"""Integration operations between bot and agent systems."""
from datetime import datetime
from typing import List, Optional, Tuple
from sqlalchemy import select, and_
from ..connection import SessionLocal
from ..models import (
User,
UserTask,
TaskQueue,
ResearchTopic,
Finding,
PaperAnalysis,
ArxivPaper,
)
from ..enums import TaskStatus, UserPlan
from .generic_task import create_task
[docs]
async def get_next_queued_task() -> Optional[UserTask]:
    """Fetch the queued task the agent should process next.

    Joins ``UserTask`` to ``TaskQueue`` and selects at most one task whose
    status is ``QUEUED``, ordered by ascending queue priority and then by
    ascending queue creation time. The task is only read here; its status is
    left unchanged.

    :returns: Next UserTask ready for processing or None if queue is empty
    """
    next_in_line = (
        select(UserTask)
        .join(TaskQueue)
        .where(UserTask.status == TaskStatus.QUEUED)
        .order_by(TaskQueue.priority.asc(), TaskQueue.created_at.asc())
        .limit(1)
    )
    async with SessionLocal() as session:
        fetched = await session.execute(next_in_line)
        return fetched.scalar_one_or_none()
[docs]
async def start_task_processing(task_id: int) -> bool:
    """Move a queued task into the PROCESSING state.

    Stamps the task's processing start and update times and, when a
    ``TaskQueue`` row exists for the task, stamps that row as well. All
    changes are committed together.

    :param task_id: Task ID
    :returns: True if the task was started, False if it does not exist or is
        not currently in the QUEUED state
    """
    async with SessionLocal() as session:
        task = await session.get(UserTask, task_id)
        can_start = task is not None and task.status == TaskStatus.QUEUED
        if not can_start:
            return False

        task.status = TaskStatus.PROCESSING
        task.processing_started_at = datetime.now()
        task.updated_at = datetime.now()

        # The queue row is optional; only stamp it when one is present.
        queue_lookup = select(TaskQueue).where(TaskQueue.task_id == task.id)
        queue_entry = (await session.execute(queue_lookup)).scalar_one_or_none()
        if queue_entry:
            queue_entry.started_at = datetime.now()
            queue_entry.updated_at = datetime.now()

        await session.commit()
        return True
[docs]
async def complete_task_processing(
    task_id: int, success: bool = True, error_message: Optional[str] = None
) -> bool:
    """Record the outcome of one processing pass over a task.

    On failure the task is marked FAILED with a completion time and the given
    error message. On success the cycle counter is incremented; the task is
    then either marked COMPLETED (cycle limit reached, user notified) or put
    back into QUEUED with its queue entry reset for another pass. Global task
    statistics are updated after the commit for every task that was found.

    :param task_id: Task ID
    :param success: Whether task completed successfully
    :param error_message: Error message if task failed
    :returns: True if the outcome was recorded, False if the task was not found
    """
    async with SessionLocal() as session:
        task = await session.get(UserTask, task_id)
        if task is None:
            return False

        # Measure elapsed time first, before any status field is modified.
        elapsed_seconds = 0.0
        if task.processing_started_at:
            finished_at = datetime.now()
            elapsed_seconds = (
                finished_at - task.processing_started_at
            ).total_seconds()

        if not success:
            task.status = TaskStatus.FAILED
            # A completion time is recorded even though the task failed.
            task.processing_completed_at = datetime.now()
            task.error_message = error_message
        else:
            # Count this pass before comparing against the limit.
            task.cycles_completed = task.cycles_completed + 1

            if task.cycles_completed >= task.max_cycles:
                # Cycle limit reached: the task is finished.
                task.status = TaskStatus.COMPLETED
                task.processing_completed_at = datetime.now()

                # Tell the user, noting whether anything was found.
                found = await get_user_task_results(task.id)
                await _notify_cycle_limit_reached(task, len(found) > 0)
            else:
                # More passes remain: requeue without a completion time.
                task.status = TaskStatus.QUEUED

                queue_lookup = select(TaskQueue).where(
                    TaskQueue.task_id == task.id
                )
                queue_entry = (
                    await session.execute(queue_lookup)
                ).scalar_one_or_none()
                if queue_entry:
                    # Clear worker assignment and start time for the next pass.
                    queue_entry.worker_id = None
                    queue_entry.started_at = None
                    queue_entry.updated_at = datetime.now()

        task.updated_at = datetime.now()
        await session.commit()

        # Imported at call time, as in the rest of this module.
        from .task_statistics import update_task_statistics

        await update_task_statistics(elapsed_seconds, success)
        return True
[docs]
async def create_research_topic_for_user_task(
    user_task: UserTask,
) -> Optional[ResearchTopic]:
    """Create a ResearchTopic from UserTask for agent compatibility.

    This bridges the new UserTask system with the legacy ResearchTopic system
    that the agent pipeline expects. An active topic owned by the same user
    with exactly the same description is reused instead of creating a new one.

    The lookup tolerates duplicate matching topics: this function checks and
    then inserts without any uniqueness guarantee visible here, so concurrent
    calls can create duplicates. With ``scalar_one_or_none()`` such duplicates
    would make every later call raise ``MultipleResultsFound``; instead one
    matching row is returned.

    :param user_task: UserTask instance
    :returns: ResearchTopic instance or None if user not found
    """
    async with SessionLocal() as session:
        # user_task.user_id is the internal user ID.
        user = await session.get(User, user_task.user_id)
        if user is None:
            return None

        # Legacy topics are keyed by telegram_id, not the internal user ID.
        # The full description is compared for an exact match.
        existing_result = await session.execute(
            select(ResearchTopic)
            .where(
                and_(
                    ResearchTopic.user_id == user.telegram_id,
                    ResearchTopic.target_topic == user_task.description,
                    ResearchTopic.is_active,
                )
            )
            # One row is enough; which duplicate is returned is unspecified.
            .limit(1)
        )
        existing_topic = existing_result.scalars().first()
        if existing_topic is not None:
            return existing_topic

        topic = ResearchTopic(
            user_id=user.telegram_id,  # telegram_id for legacy compatibility
            target_topic=user_task.description,
            # Fall back to the first 100 characters when there is no title.
            search_area=user_task.title or user_task.description[:100],
            is_active=True,
        )
        session.add(topic)
        await session.commit()
        # Reload the row after the commit so the returned object is populated.
        await session.refresh(topic)
        return topic
[docs]
async def link_analysis_to_user_task(
    analysis: PaperAnalysis, user_task: UserTask
) -> None:
    """Attach a paper analysis to a user task by storing a Finding.

    The Finding copies the paper ID, relevance and summary from the analysis
    and references the task by ID, then is committed in its own session.

    :param analysis: PaperAnalysis instance
    :param user_task: UserTask instance
    """
    link = Finding(
        task_id=user_task.id,
        paper_id=analysis.paper_id,
        relevance=analysis.relevance,
        summary=analysis.summary,
    )
    async with SessionLocal() as session:
        session.add(link)
        await session.commit()
[docs]
async def get_user_task_results(task_id: int) -> List[Tuple[PaperAnalysis, ArxivPaper]]:
    """Return the analyses linked to a user task, most relevant first.

    Analyses are matched to the task through ``Finding`` rows sharing the
    same paper ID, and each is paired with its ``ArxivPaper``. Results are
    ordered by descending analysis relevance.

    :param task_id: UserTask ID
    :returns: List of (PaperAnalysis, ArxivPaper) tuples
    """
    linked_analyses = (
        select(PaperAnalysis, ArxivPaper)
        .join(Finding, PaperAnalysis.paper_id == Finding.paper_id)
        .join(ArxivPaper, PaperAnalysis.paper_id == ArxivPaper.id)
        .where(Finding.task_id == task_id)
        .order_by(PaperAnalysis.relevance.desc())
    )
    async with SessionLocal() as session:
        fetched = await session.execute(linked_analyses)
        # Convert result rows into plain 2-tuples.
        return [(analysis, paper) for analysis, paper in fetched.all()]
# Legacy function for compatibility (used in bot/handlers/task.py)
[docs]
async def create_user_task(user_id: int, description: str) -> UserTask:
    """Create a task for a Telegram user (legacy compatibility entry point).

    Looks up or creates the user for the given Telegram ID, then delegates to
    ``create_user_task_with_queue`` and returns only the task from its result.

    :param user_id: Telegram user ID (will be converted to internal user ID)
    :param description: Task description
    :returns: UserTask instance
    """
    # Both helpers are imported at call time, as elsewhere in this module.
    from .user import get_or_create_user
    from .task import create_user_task_with_queue

    owner = await get_or_create_user(telegram_id=user_id)

    # The second element of the returned pair is not needed by legacy callers.
    new_task, _unused = await create_user_task_with_queue(owner, description)
    return new_task
async def _notify_cycle_limit_reached(user_task: UserTask, has_results: bool) -> None:
    """Send notification to user when cycle limit is reached.

    Builds one of two messages (results found / no results) and stores it as
    an already-completed ``cycle_limit_notification`` task via ``create_task``.
    Nothing is sent if the task's user cannot be found.

    The message uses ``<b>``/``<i>`` markup, so the user-supplied description
    is escaped before interpolation; otherwise a description containing
    ``<``, ``>`` or ``&`` would produce malformed markup.
    NOTE(review): presumably delivered with Telegram's HTML parse mode —
    confirm against the code that consumes this notification.

    :param user_task: The completed user task
    :param has_results: Whether the task produced any results
    """
    # Function-scope import, matching this module's other local imports.
    from html import escape

    # Look up the recipient's telegram_id and plan.
    async with SessionLocal() as session:
        user = await session.get(User, user_task.user_id)
        if user is None:
            return
        telegram_id = user.telegram_id
        plan_name = "Premium" if user.plan == UserPlan.PREMIUM else "Free"

    # Truncate the raw text first, then escape, so that an entity such as
    # "&amp;" is never cut in half. quote=False leaves quotes untouched;
    # only <, > and & are replaced.
    raw_description = user_task.description
    preview = escape(raw_description[:100], quote=False)
    if len(raw_description) > 100:
        preview += "..."

    if has_results:
        # Results exist: congratulate and offer ways to continue.
        message = f"""
🎉 <b>Task #{user_task.id} completed!</b>
✅ <b>Results found for your query:</b>
📝 <i>{preview}</i>
🔄 <b>Cycles completed:</b> {user_task.cycles_completed}/{user_task.max_cycles} (Plan: {plan_name})
🤖 Hope the results were helpful!
💡 <b>Want to continue research?</b>
• Create a new task with a refined query
• Or get a Premium subscription for unlimited search cycles
Use /task to create a new task or /status to view results.
""".strip()
    else:
        # Nothing found: suggest refining the query or upgrading.
        message = f"""
🔄 <b>Task #{user_task.id} completed</b>
📝 <i>{preview}</i>
🔄 <b>Cycles completed:</b> {user_task.cycles_completed}/{user_task.max_cycles} (Plan: {plan_name})
❌ <b>Unfortunately, no results found for this query.</b>
💡 <b>Recommendations:</b>
• Try reformulating the query more specifically
• Use different keywords
• Or get a Premium subscription for more search cycles
Use /task to create a new task with a refined query.
""".strip()

    # The notification is recorded as a finished generic task whose result
    # is the message text.
    data = {"task_type": "cycle_limit_notification", "user_id": telegram_id}
    await create_task(
        task_type="cycle_limit_notification",
        data=data,
        status="completed",
        result=message,
    )