Source code for shared.database.operations.integration

"""Integration operations between bot and agent systems."""

import html
from datetime import datetime
from typing import List, Optional, Tuple

from sqlalchemy import select, and_

from ..connection import SessionLocal
from ..models import (
    User,
    UserTask,
    TaskQueue,
    ResearchTopic,
    Finding,
    PaperAnalysis,
    ArxivPaper,
)
from ..enums import TaskStatus, UserPlan
from .generic_task import create_task


async def get_next_queued_task() -> Optional[UserTask]:
    """Fetch the queued task the agent should process next.

    Bridges the UserTask/TaskQueue system with the agent: each task is joined
    to its queue entry, only tasks in ``TaskStatus.QUEUED`` are considered, and
    the first one by ascending queue priority, then ascending queue creation
    time, is selected.

    :returns: The next ``UserTask`` to process, or ``None`` when no queued
        task exists
    """
    next_task_query = (
        select(UserTask)
        .join(TaskQueue)
        .where(UserTask.status == TaskStatus.QUEUED)
        .order_by(TaskQueue.priority.asc(), TaskQueue.created_at.asc())
        .limit(1)
    )
    async with SessionLocal() as session:
        rows = await session.execute(next_task_query)
        return rows.scalar_one_or_none()
async def start_task_processing(task_id: int) -> bool:
    """Mark a queued task as being processed.

    Moves the task from ``TaskStatus.QUEUED`` to ``TaskStatus.PROCESSING`` and
    stamps the start time on the task and, when one exists, on its queue
    entry. Every timestamp written by one call comes from a single clock
    reading, so the task's ``processing_started_at`` and the queue entry's
    ``started_at`` are identical.

    :param task_id: Task ID
    :returns: True if the task was started, False if it does not exist or is
        in any status other than QUEUED
    """
    async with SessionLocal() as session:
        task = await session.get(UserTask, task_id)
        if task is None or task.status != TaskStatus.QUEUED:
            return False

        # Read the clock once so all fields written below agree exactly.
        now = datetime.now()

        task.status = TaskStatus.PROCESSING
        task.processing_started_at = now
        task.updated_at = now

        # A task may have no queue entry; only stamp the entry when present.
        queue_result = await session.execute(
            select(TaskQueue).where(TaskQueue.task_id == task.id)
        )
        queue_entry = queue_result.scalar_one_or_none()
        if queue_entry:
            queue_entry.started_at = now
            queue_entry.updated_at = now

        await session.commit()
        return True
async def complete_task_processing(
    task_id: int, success: bool = True, error_message: Optional[str] = None
) -> bool:
    """Finish one processing cycle of a task and persist the outcome.

    On success the task's cycle counter is incremented. If the counter has
    reached ``max_cycles`` the task becomes ``COMPLETED``; otherwise it goes
    back to ``QUEUED`` and its queue entry (if any) has its worker assignment
    and start time cleared so it can be picked up again. On failure the task
    becomes ``FAILED`` and ``error_message`` is stored.

    The cycle-limit notification is issued only after the status change has
    been committed, so a user is never told about a completion that was not
    persisted, and a failing notification cannot leave the task uncommitted.
    Global task statistics are updated last.

    :param task_id: Task ID
    :param success: Whether the cycle finished successfully
    :param error_message: Error message stored on the task when it failed
    :returns: True if the task was found and updated, False if it does not
        exist
    """
    async with SessionLocal() as session:
        task = await session.get(UserTask, task_id)
        if task is None:
            return False

        # One clock reading for the duration and for every stored timestamp,
        # so the recorded processing time matches the stored completion time.
        now = datetime.now()

        processing_time = 0.0
        if task.processing_started_at:
            processing_time = (now - task.processing_started_at).total_seconds()

        cycle_limit_reached = False
        if success:
            task.cycles_completed = task.cycles_completed + 1

            if task.cycles_completed >= task.max_cycles:
                # Final cycle: the task is done.
                task.status = TaskStatus.COMPLETED
                task.processing_completed_at = now
                cycle_limit_reached = True
            else:
                # More cycles remain: requeue. processing_completed_at stays
                # unset because the task as a whole is not finished.
                task.status = TaskStatus.QUEUED

                queue_result = await session.execute(
                    select(TaskQueue).where(TaskQueue.task_id == task.id)
                )
                queue_entry = queue_result.scalar_one_or_none()
                if queue_entry:
                    queue_entry.worker_id = None  # release worker assignment
                    queue_entry.started_at = None  # cleared for reprocessing
                    queue_entry.updated_at = now
        else:
            task.status = TaskStatus.FAILED
            # Completion time is recorded for failures as well.
            task.processing_completed_at = now
            task.error_message = error_message

        task.updated_at = now
        await session.commit()

        if cycle_limit_reached:
            # Reload the row so its attributes are populated after the commit
            # and remain readable once the session is closed.
            await session.refresh(task)

    if cycle_limit_reached:
        results = await get_user_task_results(task.id)
        await _notify_cycle_limit_reached(task, bool(results))

    # Imported at call time rather than at module level — presumably to avoid
    # a circular import; confirm before hoisting.
    from .task_statistics import update_task_statistics

    await update_task_statistics(processing_time, success)
    return True
async def create_research_topic_for_user_task(
    user_task: UserTask,
) -> Optional[ResearchTopic]:
    """Return the legacy ResearchTopic matching a UserTask, creating it if needed.

    The agent pipeline works with ``ResearchTopic`` rows, which are keyed by
    the user's ``telegram_id`` rather than the internal user ID. An active
    topic for that user whose ``target_topic`` equals the task description
    exactly is reused; otherwise a new active topic is inserted, with
    ``search_area`` taken from the task title or, when the title is empty,
    the first 100 characters of the description.

    :param user_task: UserTask instance
    :returns: The existing or newly created ResearchTopic, or None if the
        task's user does not exist
    """
    async with SessionLocal() as session:
        owner = await session.get(User, user_task.user_id)
        if owner is None:
            return None

        # Legacy tables identify users by telegram_id.
        legacy_user_id = owner.telegram_id

        lookup = select(ResearchTopic).where(
            and_(
                ResearchTopic.user_id == legacy_user_id,
                ResearchTopic.target_topic == user_task.description,
                ResearchTopic.is_active,
            )
        )
        found = (await session.execute(lookup)).scalar_one_or_none()
        if found:
            return found

        new_topic = ResearchTopic(
            user_id=legacy_user_id,
            target_topic=user_task.description,
            search_area=user_task.title or user_task.description[:100],
            is_active=True,
        )
        session.add(new_topic)
        await session.commit()
        await session.refresh(new_topic)
        return new_topic
async def get_user_task_results(task_id: int) -> List[Tuple[PaperAnalysis, ArxivPaper]]:
    """Load the analysed papers attached to a user task.

    Selects every ``PaperAnalysis`` together with its ``ArxivPaper`` where a
    ``Finding`` for the given task references the same paper, ordered by
    analysis relevance, highest first.

    :param task_id: UserTask ID
    :returns: List of (PaperAnalysis, ArxivPaper) tuples
    """
    results_query = (
        select(PaperAnalysis, ArxivPaper)
        .join(Finding, PaperAnalysis.paper_id == Finding.paper_id)
        .join(ArxivPaper, PaperAnalysis.paper_id == ArxivPaper.id)
        .where(Finding.task_id == task_id)
        .order_by(PaperAnalysis.relevance.desc())
    )
    async with SessionLocal() as session:
        fetched = (await session.execute(results_query)).all()
        return [(analysis, paper) for analysis, paper in fetched]
# Legacy function for compatibility (used in bot/handlers/task.py)
async def create_user_task(user_id: int, description: str) -> UserTask:
    """Create a task for a Telegram user (legacy entry point).

    Looks up the user by Telegram ID, creating the user record when it does
    not exist yet, then delegates to ``create_user_task_with_queue`` and
    returns only the task from its result.

    :param user_id: Telegram user ID (resolved to the internal user record)
    :param description: Task description
    :returns: UserTask instance
    """
    # Both helpers are imported at call time, as in the rest of this module.
    from .user import get_or_create_user

    account = await get_or_create_user(telegram_id=user_id)

    from .task import create_user_task_with_queue

    new_task, _ = await create_user_task_with_queue(account, description)
    return new_task
def _format_cycle_limit_message(
    user_task: UserTask, plan_name: str, has_results: bool
) -> str:
    """Build the HTML-tagged notification text for a task that hit its cycle limit."""
    description = user_task.description or ""
    # The description is user-supplied and is embedded in text that carries
    # <b>/<i> markup, so escape it. Truncate first so that an escaped entity
    # is never cut in half.
    preview = html.escape(description[:100], quote=False)
    if len(description) > 100:
        preview += "..."

    cycles_line = (
        f"🔄 <b>Cycles completed:</b> "
        f"{user_task.cycles_completed}/{user_task.max_cycles} (Plan: {plan_name})"
    )

    if has_results:
        # Results exist: congratulate and offer ways to continue.
        lines = [
            f"🎉 <b>Task #{user_task.id} completed!</b>",
            "",
            "✅ <b>Results found for your query:</b>",
            f"📝 <i>{preview}</i>",
            "",
            cycles_line,
            "",
            "🤖 Hope the results were helpful!",
            "",
            "💡 <b>Want to continue research?</b>",
            "• Create a new task with a refined query",
            "• Or get a Premium subscription for unlimited search cycles",
            "",
            "Use /task to create a new task or /status to view results.",
        ]
    else:
        # Nothing found: suggest refining the query or upgrading.
        lines = [
            f"🔄 <b>Task #{user_task.id} completed</b>",
            "",
            f"📝 <i>{preview}</i>",
            "",
            cycles_line,
            "",
            "❌ <b>Unfortunately, no results found for this query.</b>",
            "",
            "💡 <b>Recommendations:</b>",
            "• Try reformulating the query more specifically",
            "• Use different keywords",
            "• Or get a Premium subscription for more search cycles",
            "",
            "Use /task to create a new task with a refined query.",
        ]
    return "\n".join(lines)


async def _notify_cycle_limit_reached(user_task: UserTask, has_results: bool) -> None:
    """Record a notification telling the user their task reached its cycle limit.

    Looks up the task's user to obtain the Telegram ID and plan, builds a
    message that depends on whether the task produced results, and stores it
    through ``create_task`` as an already-completed
    ``cycle_limit_notification`` task. Does nothing when the user no longer
    exists.

    :param user_task: The completed user task
    :param has_results: Whether the task produced any results
    """
    async with SessionLocal() as session:
        user = await session.get(User, user_task.user_id)
        if user is None:
            return
        telegram_id = user.telegram_id
        plan_name = "Premium" if user.plan == UserPlan.PREMIUM else "Free"

    message = _format_cycle_limit_message(user_task, plan_name, has_results)

    # The notification is stored as a task whose result is the message text.
    data = {"task_type": "cycle_limit_notification", "user_id": telegram_id}
    await create_task(
        task_type="cycle_limit_notification",
        data=data,
        status="completed",
        result=message,
    )