# Source code for agent.manager

"""Autonomous agent manager loop.

Polls active user tasks, runs a single pipeline iteration per task, persists
results (unless dry-run), and triggers Telegram notifications via DB tasks.
"""

import asyncio
from datetime import datetime
import json
import os
from dataclasses import dataclass
from typing import List, Optional, Tuple

from shared.logging import get_logger
from shared.db import (
    UserSettings,
    UserTask,
    create_arxiv_paper,
    create_paper_analysis,
    create_task,
    get_user_settings,
    list_active_queries_for_task,
    update_agent_status,
    get_arxiv_paper_by_arxiv_id,
    # Integration functions
    get_next_queued_task,
    start_task_processing,
    complete_task_processing,
    create_research_topic_for_user_task,
    link_analysis_to_user_task,
)

from agent.pipeline.pipeline import run_pipeline
from agent.pipeline.models import PipelineOutput, PipelineTask


logger = get_logger(__name__)


@dataclass
class RuntimeConfig:
    """Runtime settings for the agent manager, held in memory.

    :ivar poll_seconds: Poll interval in seconds for the main loop.
    :ivar dry_run: If ``True``, do not persist analyses; only notify.
    :ivar agent_id: Identifier reported in status updates.
    :ivar test_user_id: Optional override to send notifications to a test user.
    """

    poll_seconds: int = 30
    dry_run: bool = False
    agent_id: str = "main_agent"
    test_user_id: Optional[int] = None
def _read_config() -> RuntimeConfig:
    """Load runtime config from environment variables.

    Malformed numeric overrides fall back to their defaults instead of
    crashing the agent at startup.

    :returns: A populated :class:`RuntimeConfig` instance.
    """
    try:
        poll = int(os.getenv("AGENT_POLL_SECONDS", "30"))
    except ValueError:
        # BUG FIX: a bad AGENT_POLL_SECONDS previously crashed startup.
        poll = 30
    dry = os.getenv("AGENT_DRY_RUN", "0").lower() in {"1", "true", "yes"}
    agent_id = os.getenv("AGENT_ID", "main_agent")
    test_uid: Optional[int] = None
    if os.getenv("AGENT_TEST_USER_ID"):
        try:
            test_uid = int(os.getenv("AGENT_TEST_USER_ID", "").strip())
        except ValueError:
            # Narrowed from `except Exception`: only a parse failure is expected.
            test_uid = None
    return RuntimeConfig(
        poll_seconds=poll, dry_run=dry, agent_id=agent_id, test_user_id=test_uid
    )


def _build_pipeline_task(
    *,
    user_task: UserTask,
    settings: Optional[UserSettings],
    explicit_queries: Optional[List[str]] = None,
) -> PipelineTask:
    """Convert DB ``UserTask`` into a :class:`PipelineTask` for the pipeline.

    Tries to extract optional fields (``queries``, ``categories``) from the
    task description if it contains JSON.

    :param user_task: Source task from the database.
    :param settings: Per-user settings influencing thresholds.
    :param explicit_queries: Optional explicit queries associated with the task.
    :returns: A :class:`PipelineTask` ready for execution.
    """
    raw_text = (user_task.description or user_task.title or "").strip()
    min_rel = float(getattr(settings, "min_relevance", 50.0)) if settings else 50.0
    queries: Optional[List[str]] = explicit_queries if explicit_queries else None
    categories: Optional[List[str]] = None
    query_text: str = raw_text

    # Attempt to parse JSON payloads in description for advanced control.
    try:
        data = json.loads(raw_text)
        if isinstance(data, dict):
            if isinstance(data.get("query"), str):
                query_text = data["query"].strip()
            if isinstance(data.get("queries"), list):
                queries = [str(x) for x in data["queries"] if str(x).strip()]
            if isinstance(data.get("categories"), list):
                categories = [str(x) for x in data["categories"] if str(x).strip()]
    except ValueError:
        # Not JSON (json.JSONDecodeError is a ValueError); use raw text.
        pass

    return PipelineTask(
        query=query_text,
        min_relevance=min_rel,
        queries=queries,
        categories=categories,
    )


async def _persist_selected(
    output: PipelineOutput, *, user_task: UserTask, topic_id: int = 0
) -> List[Tuple[int, int]]:
    """Persist selected items into DB: ensure paper and create analysis.

    :param output: Pipeline output with selected items.
    :param user_task: The UserTask instance for proper integration.
    :param topic_id: Research topic ID for legacy compatibility.
    :returns: List of ``(analysis_id, paper_id)`` pairs.
    """
    saved: List[Tuple[int, int]] = []
    for s in output.selected:
        c = s.result.candidate
        # Ensure the paper row exists; reuse it when already stored.
        existing = await get_arxiv_paper_by_arxiv_id(c.arxiv_id)
        if existing is None:
            # Fallbacks for non-arXiv items that may lack timestamps.
            # NOTE(review): datetime.now() is naive — confirm the DB layer
            # expects naive local timestamps rather than UTC-aware ones.
            published_ts: datetime = c.published or c.updated or datetime.now()
            updated_ts: datetime = c.updated or c.published or published_ts
            paper = await create_arxiv_paper(
                {
                    "arxiv_id": c.arxiv_id,
                    "title": c.title,
                    "authors": json.dumps([]),  # unknown authors here
                    "summary": c.summary,
                    "categories": json.dumps(c.categories or []),
                    "published": published_ts,
                    "updated": updated_ts,
                    "pdf_url": c.pdf_url or "",
                    "abs_url": c.abs_url or "",
                    "journal_ref": c.journal_ref,
                    "doi": c.doi,
                    "comment": c.comment,
                    "primary_category": c.primary_category,
                }
            )
        else:
            paper = existing

        # Create the analysis row for this selection.
        analysis = await create_paper_analysis(
            paper_id=paper.id,
            topic_id=topic_id,
            relevance=float(s.overall_score),
            summary=s.result.summary,
            key_fragments=s.result.key_fragments,
            contextual_reasoning=s.result.contextual_reasoning,
        )
        # Link analysis to user task through Finding.
        await link_analysis_to_user_task(analysis, user_task)
        saved.append((analysis.id, paper.id))
    return saved


async def _notify_report(user_id: int, report_text: str) -> None:
    """Create a completed Task row that the bot will pick up and send to the user.

    :param user_id: Telegram or internal user identifier.
    :param report_text: Plain-text report to be delivered.
    :returns: ``None``.
    """
    data = {"task_type": "agent_report", "user_id": user_id}
    logger.info(f"Enqueuing completed agent_report for user {user_id}")
    await create_task(
        task_type="agent_report", data=data, status="completed", result=report_text
    )


async def _process_user_task(rt: RuntimeConfig, user_task: UserTask) -> None:
    """Process one user task: run pipeline, persist, and notify if needed.

    :param rt: Runtime configuration.
    :param user_task: Task pulled from the database queue.
    :returns: ``None``.
    """
    task_success = False
    error_message = None
    try:
        # Mark the task as in-progress; bail out if the claim fails.
        if not await start_task_processing(user_task.id):
            logger.error(f"Failed to start processing task {user_task.id}")
            return

        # Create research topic for legacy compatibility.
        research_topic = await create_research_topic_for_user_task(user_task)
        if research_topic is None:
            logger.error(f"Failed to create research topic for task {user_task.id}")
            await complete_task_processing(
                user_task.id, False, "Failed to create research topic"
            )
            return

        # Get user settings (using telegram_id for legacy compatibility).
        settings = await get_user_settings(research_topic.user_id)

        # Load explicit queries if configured for the task (best effort;
        # any lookup failure falls back to the task description).
        explicit_queries: Optional[List[str]] = None
        try:
            active_queries = await list_active_queries_for_task(user_task.id)
            if active_queries:
                explicit_queries = [
                    q.query_text for q in active_queries if q.query_text
                ]
        except Exception:
            explicit_queries = None

        pipeline_task = _build_pipeline_task(
            user_task=user_task, settings=settings, explicit_queries=explicit_queries
        )

        await update_agent_status(
            agent_id=rt.agent_id,
            status="running",
            activity=f"processing user task {user_task.id}",
            current_user_id=research_topic.user_id,  # Use telegram_id for status
        )
        # BUG FIX: description may be None; guard before slicing for the log.
        logger.info(
            f"Running pipeline for task {user_task.id}: "
            f"{(user_task.description or '')[:50]}..."
        )

        output: PipelineOutput = await run_pipeline(pipeline_task)

        # Handle notifications.
        if output.should_notify and output.report_text:
            target_user = rt.test_user_id or research_topic.user_id
            await _notify_report(target_user, output.report_text)

        # Persist results if not dry run.
        if not rt.dry_run and output.selected:
            try:
                await _persist_selected(
                    output, user_task=user_task, topic_id=research_topic.id
                )
                logger.info(
                    f"Persisted {len(output.selected)} results for task {user_task.id}"
                )
            except Exception as e:
                logger.error(f"Persist selected failed for task {user_task.id}: {e}")
                error_message = f"Failed to persist results: {str(e)}"
                raise

        task_success = True
        logger.info(f"Successfully completed task {user_task.id}")

    except Exception as e:
        logger.error(f"Error processing task {user_task.id}: {e}")
        error_message = str(e)
        task_success = False
    finally:
        # Always report completion status, even on failure.
        await complete_task_processing(user_task.id, task_success, error_message)
        await update_agent_status(
            agent_id=rt.agent_id,
            status="idle",
            activity="waiting",
            current_user_id=None,
        )
async def main() -> None:
    """Agent main loop: poll tasks and process them autonomously.

    :returns: ``None``.
    """
    cfg = _read_config()
    logger.info(
        f"Agent starting (poll={cfg.poll_seconds}s, dry_run={'yes' if cfg.dry_run else 'no'}, agent_id={cfg.agent_id})"
    )
    while True:
        try:
            # Get next task from queue (QUEUED status).
            task = await get_next_queued_task()
            if not task:
                await update_agent_status(
                    agent_id=cfg.agent_id,
                    status="idle",
                    activity="waiting for queued tasks",
                )
                await asyncio.sleep(cfg.poll_seconds)
                continue

            # BUG FIX: description may be None; guard before slicing for the log.
            logger.info(
                f"Processing queued task {task.id}: {(task.description or '')[:50]}..."
            )
            await _process_user_task(cfg, task)

            # Brief pause between tasks to allow for proper status updates.
            await asyncio.sleep(1)

        except Exception as loop_error:
            # Top-level boundary: log, report, back off, and keep polling.
            logger.error(f"Agent loop error: {loop_error}")
            await update_agent_status(
                agent_id=cfg.agent_id,
                status="error",
                activity=f"error: {str(loop_error)[:100]}",
            )
            await asyncio.sleep(min(60, cfg.poll_seconds))