[docs]asyncdefadd_task_to_queue(task:UserTask)->TaskQueue:"""Add task to processing queue with appropriate priority. :param task: UserTask instance to queue :returns: TaskQueue entry """asyncwithSessionLocal()assession:# Get user to determine priorityuser=awaitsession.get(User,task.user_id)priority=50ifuseranduser.plan==UserPlan.PREMIUMelse100# Calculate queue positionqueue_count=awaitsession.execute(select(func.count(TaskQueue.id)).where(TaskQueue.task_id!=task.id))position=(queue_count.scalar_one()or0)+1# Estimate start time based on queue and processing statsstats=awaitget_or_create_task_statistics()estimated_wait=(stats.median_processing_time*(position-1)/max(stats.active_workers,1))estimated_start=datetime.now()+timedelta(seconds=estimated_wait)queue_entry=TaskQueue(task_id=task.id,priority=priority,queue_position=position,estimated_start_time=estimated_start,)session.add(queue_entry)awaitsession.commit()awaitsession.refresh(queue_entry)# Update queue positions for all tasksawaitupdate_queue_positions()returnqueue_entry
[docs]asyncdefupdate_queue_positions()->None:"""Update queue positions for all pending tasks based on priority and creation time."""asyncwithSessionLocal()assession:# Get all queued tasks ordered by priority and creation timeresult=awaitsession.execute(select(TaskQueue).join(UserTask).where(UserTask.status==TaskStatus.QUEUED).order_by(TaskQueue.priority.asc(),TaskQueue.created_at.asc()))queue_entries=result.scalars().all()fori,entryinenumerate(queue_entries,1):entry.queue_position=i# Update estimated start timestats=awaitget_or_create_task_statistics()estimated_wait=(stats.median_processing_time*(i-1)/max(stats.active_workers,1))entry.estimated_start_time=datetime.now()+timedelta(seconds=estimated_wait)entry.updated_at=datetime.now()awaitsession.commit()
[docs]asyncdefget_next_task_from_queue()->Optional[UserTask]:"""Get next task from queue for processing. :returns: Next UserTask to process or None if queue is empty """asyncwithSessionLocal()assession:result=awaitsession.execute(select(UserTask).join(TaskQueue).where(UserTask.status==TaskStatus.QUEUED).order_by(TaskQueue.priority.asc(),TaskQueue.created_at.asc()).limit(1))returnresult.scalar_one_or_none()