[docs]asyncdefget_or_create_task_statistics()->TaskStatistics:"""Get current task statistics or create default if none exist. :returns: TaskStatistics instance """asyncwithSessionLocal()assession:result=awaitsession.execute(select(TaskStatistics).order_by(TaskStatistics.id.desc()).limit(1))stats=result.scalar_one_or_none()ifstatsisNone:stats=TaskStatistics()session.add(stats)awaitsession.commit()awaitsession.refresh(stats)returnstats
[docs]asyncdefupdate_task_statistics(processing_time_seconds:float,success:bool=True)->None:"""Update global task processing statistics. :param processing_time_seconds: Time taken to process the task :param success: Whether the task completed successfully """asyncwithSessionLocal()assession:stats=awaitget_or_create_task_statistics()# Update countsifsuccess:stats.total_tasks_processed+=1stats.recent_completed_tasks+=1stats.total_processing_time_seconds+=int(processing_time_seconds)else:stats.recent_failed_tasks+=1# Recalculate averagesifstats.total_tasks_processed>0:stats.avg_processing_time=(stats.total_processing_time_seconds/stats.total_tasks_processed)# Update min/max timesifsuccess:stats.min_processing_time=min(stats.min_processing_time,processing_time_seconds)stats.max_processing_time=max(stats.max_processing_time,processing_time_seconds)# Simple median estimation (can be improved with more sophisticated approach)recent_times=[stats.min_processing_time,processing_time_seconds,stats.max_processing_time,]stats.median_processing_time=sorted(recent_times)[1]# Update recent averageifstats.recent_completed_tasks>0:stats.recent_avg_time=(stats.recent_avg_time+processing_time_seconds)/2# Update queue lengthqueue_count=awaitsession.execute(select(func.count(TaskQueue.id)))stats.current_queue_length=queue_count.scalar_one()or0stats.last_updated=datetime.now()awaitsession.commit()