async def create_user_task_with_queue(user: User, description: str) -> Tuple[UserTask, TaskQueue]:
    """Persist a new task for ``user`` and hand it to the processing queue.

    The task row is committed and refreshed first, then passed to
    ``add_task_to_queue``. Afterwards the user's daily counter and
    ``updated_at`` are changed on the passed-in ``user`` object, merged into
    the session and saved with a second commit.

    :param user: User instance that owns the new task (mutated in place)
    :param description: Task description; also the source of the task title
    :returns: Tuple of (UserTask, TaskQueue)
    """
    async with SessionLocal() as session:
        # Cycle budget depends on the plan: 100 for premium, 5 otherwise.
        if user.plan == UserPlan.PREMIUM:
            cycle_limit = 100
        else:
            cycle_limit = 5

        # The title is the description itself, or its first 100 characters
        # followed by an ellipsis when the description is longer than that.
        if len(description) > 100:
            short_title = description[:100] + "..."
        else:
            short_title = description

        new_task = UserTask(
            user_id=user.id,
            title=short_title,
            description=description,
            status=TaskStatus.QUEUED,
            max_cycles=cycle_limit,
        )
        session.add(new_task)
        await session.commit()
        # Reload the row after the commit, before handing it to the queue helper.
        await session.refresh(new_task)

        # Enqueue the freshly stored task.
        queued = await add_task_to_queue(new_task)

        # Bump the user's daily counter and persist it.
        user.daily_tasks_created += 1
        user.updated_at = datetime.now()
        await session.merge(user)
        await session.commit()

        return new_task, queued
async def get_user_tasks(user_id: int) -> List[UserTask]:
    """Fetch every task of one user, newest first, with ``queue_entry`` eagerly loaded.

    :param user_id: Internal user ID
    :returns: List of UserTask instances ordered by ``created_at`` descending
    """
    async with SessionLocal() as session:
        statement = (
            select(UserTask)
            # Eager load queue_entry so it is fetched together with the tasks.
            .options(selectinload(UserTask.queue_entry))
            .where(UserTask.user_id == user_id)
            .order_by(UserTask.created_at.desc())
        )
        rows = await session.execute(statement)
        found = rows.scalars().all()
        return list(found)
async def update_user_task_status(task_id: int, status: TaskStatus) -> None:
    """Set a new status on the task with the given ID.

    Does nothing when no task with ``task_id`` exists.

    :param task_id: Task ID
    :param status: New status
    """
    async with SessionLocal() as session:
        record = await session.get(UserTask, task_id)
        if record is not None:
            record.status = status
            record.updated_at = datetime.now()
            await session.commit()
async def update_user_task_status_for_user(user_id: int, task_id: int, status: TaskStatus) -> bool:
    """Update a task's status only if the task belongs to the given user.

    :param user_id: Internal user ID expected to own the task
    :param task_id: Task ID
    :param status: New status
    :returns: True if the status was updated; False if the task does not
        exist or is owned by a different user
    """
    async with SessionLocal() as session:
        record = await session.get(UserTask, task_id)

        # Ownership check: the task must exist and carry the caller's user ID.
        is_owned = record is not None and record.user_id == user_id
        if not is_owned:
            return False

        record.status = status
        record.updated_at = datetime.now()
        await session.commit()
        return True
async def deactivate_user_tasks(user_id: int) -> None:
    """Switch every ACTIVE task of a user to PAUSED.

    Each affected task also gets its ``updated_at`` refreshed; all changes
    are saved with a single commit.

    :param user_id: Internal user ID
    """
    async with SessionLocal() as session:
        # Only tasks of this user that are currently ACTIVE are touched.
        active_for_user = and_(
            UserTask.user_id == user_id,
            UserTask.status == TaskStatus.ACTIVE,
        )
        rows = await session.execute(select(UserTask).where(active_for_user))

        for record in rows.scalars().all():
            record.status = TaskStatus.PAUSED
            record.updated_at = datetime.now()

        await session.commit()
async def list_active_user_tasks() -> List[UserTask]:
    """Return every task whose status is ACTIVE, oldest first.

    :returns: List of active UserTask instances ordered by ``created_at`` ascending
    """
    async with SessionLocal() as session:
        statement = (
            select(UserTask)
            .where(UserTask.status == TaskStatus.ACTIVE)
            .order_by(UserTask.created_at.asc())
        )
        rows = await session.execute(statement)
        active = rows.scalars().all()
        return list(active)
async def get_most_recent_active_user_task() -> Optional[UserTask]:
    """Fetch the ACTIVE task that was updated most recently.

    Ordering is by ``updated_at`` descending, then ``created_at`` descending,
    and the query is limited to one row.

    :returns: A single UserTask instance, or None when no active task exists.
    """
    async with SessionLocal() as session:
        statement = (
            select(UserTask)
            .where(UserTask.status == TaskStatus.ACTIVE)
            .order_by(UserTask.updated_at.desc(), UserTask.created_at.desc())
            .limit(1)
        )
        rows = await session.execute(statement)
        latest = rows.scalars().first()
        return latest
async def list_user_tasks(user_id: int) -> List[UserTask]:
    """Return all tasks that belong to a user, newest first.

    :param user_id: Internal user ID
    :returns: List of UserTask instances ordered by ``created_at`` descending
    """
    async with SessionLocal() as session:
        statement = (
            select(UserTask)
            .where(UserTask.user_id == user_id)
            .order_by(UserTask.created_at.desc())
        )
        rows = await session.execute(statement)
        owned = rows.scalars().all()
        return list(owned)