Database Module

Backward compatibility layer for shared.db imports.

This module provides the same interface as the old shared.db module but uses the new modular database structure from shared.database.

class shared.db.AgentStatus(*args, **kwargs)[source]

Bases: Base

Real-time agent status tracking.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

class shared.db.ArxivPaper(*args, **kwargs)[source]

Bases: Base

ArXiv papers for analysis.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

class shared.db.Base(*args, **kwargs)[source]

Bases: AsyncAttrs, DeclarativeBase

Base class for all database models.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

class shared.db.Finding(*args, **kwargs)[source]

Bases: Base

Research findings from task processing.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

class shared.db.PaperAnalysis(*args, **kwargs)[source]

Bases: Base

Analysis of paper relevance to research topics.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

class shared.db.RateLimitRecord(*args, **kwargs)[source]

Bases: Base

Rate limiting records for anti-spam protection.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

class shared.db.ResearchTopic(*args, **kwargs)[source]

Bases: Base

Research topics for arXiv analysis.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

class shared.db.SearchQuery(*args, **kwargs)[source]

Bases: Base

Search queries for task processing.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

class shared.db.TaskQueue(*args, **kwargs)[source]

Bases: Base

Global task queue with priority management.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

class shared.db.TaskStatistics(*args, **kwargs)[source]

Bases: Base

Global task processing statistics for time estimation.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

class shared.db.TaskStatus(*values)[source]

Bases: str, Enum

Task status types.

ACTIVE = 'active'
CANCELLED = 'cancelled'
COMPLETED = 'completed'
FAILED = 'failed'
PAUSED = 'paused'
PROCESSING = 'processing'
QUEUED = 'queued'
class shared.db.User(*args, **kwargs)[source]

Bases: Base

User model with plan management and rate limiting.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

class shared.db.UserPlan(*values)[source]

Bases: str, Enum

User subscription plan types.

FREE = 'free'
PREMIUM = 'premium'
class shared.db.UserSettings(*args, **kwargs)[source]

Bases: Base

User settings for filtering and analysis.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

class shared.db.UserTask(*args, **kwargs)[source]

Bases: Base

Enhanced user task model with queue support.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

async shared.db.add_task_to_queue(task)[source]

Add task to processing queue with appropriate priority.

Parameters:

task (UserTask) – UserTask instance to queue

Return type:

TaskQueue

Returns:

TaskQueue entry

async shared.db.check_rate_limit(user_id, action_type)[source]

Check if user action is within rate limits.

Parameters:
  • user_id (int) – Internal user ID (not telegram_id)

  • action_type (str) – Type of action being performed

Return type:

Tuple[bool, str]

Returns:

Tuple of (allowed: bool, reason: str)

async shared.db.check_user_can_create_task(user)[source]

Check if user can create a new task based on limits.

Parameters:

user (User) – User instance

Return type:

Tuple[bool, str]

Returns:

Tuple of (can_create: bool, reason: str)

async shared.db.complete_task_processing(task_id, success=True, error_message=None)[source]

Complete task processing and update status.

Parameters:
  • task_id (int) – Task ID

  • success (bool) – Whether task completed successfully

  • error_message (Optional[str]) – Error message if task failed

Return type:

bool

Returns:

True if successfully completed

async shared.db.count_analyses_for_user(user_id)[source]

Count analyses for user.

Parameters:

user_id (int) – User ID

Return type:

int

Returns:

Analysis count

async shared.db.count_relevant_analyses_for_user(user_id, min_overall)[source]

Count relevant analyses for user.

Parameters:
  • user_id (int) – User ID

  • min_overall (float) – Minimum relevance score

Return type:

int

Returns:

Relevant analysis count

async shared.db.create_arxiv_paper(data)[source]

Create an ArXiv paper.

Parameters:

data (dict[str, Any]) – Paper data

Return type:

ArxivPaper

Returns:

ArxivPaper instance

async shared.db.create_paper_analysis(*, paper_id, topic_id, relevance, summary, status='analyzed', key_fragments=None, contextual_reasoning=None)[source]

Create a paper analysis.

Parameters:
  • paper_id (int) – Paper ID

  • topic_id (int) – Topic ID

  • relevance (float) – Relevance score

  • summary (Optional[str]) – Analysis summary

  • status (str) – Analysis status

  • key_fragments (Optional[str]) – Key fragments

  • contextual_reasoning (Optional[str]) – Contextual reasoning

Return type:

PaperAnalysis

Returns:

PaperAnalysis instance

async shared.db.create_research_topic(user_id, target_topic, search_area)[source]

Create a research topic.

Parameters:
  • user_id (int) – User ID

  • target_topic (str) – Target topic

  • search_area (str) – Search area

Return type:

ResearchTopic

Returns:

ResearchTopic instance

async shared.db.create_research_topic_for_user_task(user_task)[source]

Create a ResearchTopic from UserTask for agent compatibility.

This bridges the new UserTask system with the legacy ResearchTopic system that the agent pipeline expects.

Parameters:

user_task (UserTask) – UserTask instance

Return type:

Optional[ResearchTopic]

Returns:

ResearchTopic instance or None if user not found

async shared.db.create_search_query(*, task_id, query_text, rationale=None, categories=None, time_from=None, time_to=None, status='active')[source]

Create a new search query.

Parameters:
Return type:

SearchQuery

Returns:

SearchQuery instance

async shared.db.create_task(task_type, data, status='pending', result=None)[source]

Create a generic task.

Parameters:
  • task_type (str) – Task type

  • data (dict[str, Any]) – Task data

  • status (str) – Task status

  • result (Optional[str]) – Task result

Return type:

Task

Returns:

Task instance

async shared.db.create_user_task(user_id, description)[source]

Create a user task (legacy function for compatibility).

Parameters:
  • user_id (int) – Telegram user ID (will be converted to internal user ID)

  • description (str) – Task description

Return type:

UserTask

Returns:

UserTask instance

async shared.db.create_user_task_with_queue(user, description)[source]

Create a new user task and add it to the processing queue.

Parameters:
  • user (User) – User instance

  • description (str) – Task description

Return type:

Tuple[UserTask, TaskQueue]

Returns:

Tuple of (UserTask, TaskQueue)

async shared.db.deactivate_user_tasks(user_id)[source]

Deactivate all active tasks for a user.

Parameters:

user_id (int) – Internal user ID

Return type:

None

async shared.db.deactivate_user_topics(user_id)[source]

Deactivate all user topics.

Parameters:

user_id (int) – User ID

Return type:

None

shared.db.ensure_connection()[source]

Async SQLAlchemy manages connections via the session. No-op retained for compatibility.

Return type:

None

async shared.db.get_active_topic_by_user(user_id)[source]

Get active topic for user.

Parameters:

user_id (int) – User ID

Return type:

Optional[ResearchTopic]

Returns:

ResearchTopic instance or None

async shared.db.get_agent_status(agent_id)[source]

Get agent status.

Parameters:

agent_id (str) – Agent ID

Return type:

Optional[AgentStatus]

Returns:

AgentStatus instance or None

async shared.db.get_analysis_with_entities(analysis_id)[source]

Get analysis with related entities.

Parameters:

analysis_id (int) – Analysis ID

Return type:

Optional[Tuple[PaperAnalysis, ArxivPaper, ResearchTopic]]

Returns:

Tuple of (PaperAnalysis, ArxivPaper, ResearchTopic) or None

async shared.db.get_arxiv_paper_by_arxiv_id(arxiv_id)[source]

Get ArXiv paper by ArXiv ID.

Parameters:

arxiv_id (str) – ArXiv ID

Return type:

Optional[ArxivPaper]

Returns:

ArxivPaper instance or None

async shared.db.get_most_recent_active_user_task()[source]

Return the most recently updated active user task, or None if none exist.

Return type:

Optional[UserTask]

Returns:

A single UserTask instance or None when no active tasks.

async shared.db.get_next_queued_task()[source]

Get next task from queue for agent processing.

This function bridges the new UserTask/TaskQueue system with the agent.

Return type:

Optional[UserTask]

Returns:

Next UserTask ready for processing or None if queue is empty

async shared.db.get_next_task_from_queue()[source]

Get next task from queue for processing.

Return type:

Optional[UserTask]

Returns:

Next UserTask to process or None if queue is empty

async shared.db.get_or_create_task_statistics()[source]

Get current task statistics or create default if none exist.

Return type:

TaskStatistics

Returns:

TaskStatistics instance

async shared.db.get_or_create_user(telegram_id, username=None, first_name=None, last_name=None)[source]

Get user by telegram_id or create new user with default free plan.

Parameters:
  • telegram_id (int) – Telegram user ID

  • username (Optional[str]) – Telegram username (optional)

  • first_name (Optional[str]) – User’s first name (optional)

  • last_name (Optional[str]) – User’s last name (optional)

Return type:

User

Returns:

User instance

async shared.db.get_or_create_user_settings(user_id)[source]

Get or create user settings.

Parameters:

user_id (int) – User ID

Return type:

UserSettings

Returns:

UserSettings instance

async shared.db.get_task(task_id)[source]

Get task by ID.

Parameters:

task_id (int) – Task ID

Return type:

Optional[Task]

Returns:

Task instance or None

async shared.db.get_topic_by_user_and_text(user_id, target_topic, search_area)[source]

Get topic by user and text.

Parameters:
  • user_id (int) – User ID

  • target_topic (str) – Target topic

  • search_area (str) – Search area

Return type:

Optional[ResearchTopic]

Returns:

ResearchTopic instance or None

async shared.db.get_user_settings(user_id)[source]

Get user settings.

Parameters:

user_id (int) – User ID

Return type:

Optional[UserSettings]

Returns:

UserSettings instance or None

async shared.db.get_user_task_results(task_id)[source]

Get analysis results for a user task.

Parameters:

task_id (int) – UserTask ID

Return type:

List[Tuple[PaperAnalysis, ArxivPaper]]

Returns:

List of (PaperAnalysis, ArxivPaper) tuples

async shared.db.get_user_tasks(user_id)[source]

Get all tasks for a user with eager loading to avoid lazy loading issues.

Parameters:

user_id (int) – Internal user ID

Return type:

List[UserTask]

Returns:

List of UserTask instances

async shared.db.has_paper_analysis(paper_id, topic_id)[source]

Check if paper analysis exists.

Parameters:
  • paper_id (int) – Paper ID

  • topic_id (int) – Topic ID

Return type:

bool

Returns:

True if analysis exists

async shared.db.init_db()[source]

Initialize database and create all tables including new user management and queue tables.

Return type:

None

async shared.db.initialize_database()

Initialize database and create all tables including new user management and queue tables.

Return type:

None

Link a paper analysis to a user task for proper result tracking.

Creates a Finding record that connects the analysis to the user’s task.

Parameters:
Return type:

None

async shared.db.list_active_queries_for_task(task_id)[source]

List active search queries for a task.

Parameters:

task_id (int) – Task ID

Return type:

List[SearchQuery]

Returns:

List of SearchQuery instances

async shared.db.list_active_topics()[source]

List all active topics.

Return type:

List[ResearchTopic]

Returns:

List of ResearchTopic instances

async shared.db.list_active_user_tasks()[source]

List all active user tasks.

Return type:

List[UserTask]

Returns:

List of active UserTask instances

async shared.db.list_completed_tasks_since(last_id)[source]

List completed tasks since last ID.

Parameters:

last_id (int) – Last task ID

Return type:

List[Task]

Returns:

List of Task instances

async shared.db.list_new_analyses_since(last_id, min_overall)[source]

List new analyses since last ID.

Parameters:
  • last_id (int) – Last analysis ID

  • min_overall (float) – Minimum relevance score

Return type:

List[PaperAnalysis]

Returns:

List of PaperAnalysis instances

async shared.db.list_pending_tasks()[source]

List pending tasks.

Return type:

List[Task]

Returns:

List of Task instances

async shared.db.list_recent_analyses_for_user(user_id, limit=5)[source]

List recent analyses for user.

Parameters:
  • user_id (int) – User ID

  • limit (int) – Result limit

Return type:

List[Tuple[PaperAnalysis, ArxivPaper]]

Returns:

List of (PaperAnalysis, ArxivPaper) tuples

async shared.db.list_user_tasks(user_id)[source]

List all tasks for a user.

Parameters:

user_id (int) – Internal user ID

Return type:

List[UserTask]

Returns:

List of UserTask instances

async shared.db.mark_analysis_notified(analysis_id)[source]

Mark analysis as notified.

Parameters:

analysis_id (int) – Analysis ID

Return type:

None

async shared.db.mark_analysis_queued(analysis_id)[source]

Mark analysis as queued.

Parameters:

analysis_id (int) – Analysis ID

Return type:

None

async shared.db.mark_task_completed(task_id, result_text)[source]

Mark task as completed.

Parameters:
  • task_id (int) – Task ID

  • result_text (Optional[str]) – Result text

Return type:

None

async shared.db.mark_task_failed(task_id, error_text)[source]

Mark task as failed.

Parameters:
  • task_id (int) – Task ID

  • error_text (str) – Error text

Return type:

None

async shared.db.mark_task_sent(task_id)[source]

Mark task as sent.

Parameters:

task_id (int) – Task ID

Return type:

None

async shared.db.record_finding(task_id, paper_id, relevance, summary)[source]

Record a research finding.

Parameters:
  • task_id (int) – Task ID

  • paper_id (int) – Paper ID

  • relevance (float) – Relevance score

  • summary (Optional[str]) – Finding summary

Return type:

Finding

Returns:

Finding instance

async shared.db.reset_daily_counters_if_needed(user)[source]

Reset daily counters if a day has passed.

Parameters:

user (User) – User instance

Return type:

User

Returns:

Updated user instance

async shared.db.start_task_processing(task_id)[source]

Start processing a queued task.

Updates task status to PROCESSING and records start time.

Parameters:

task_id (int) – Task ID

Return type:

bool

Returns:

True if successfully started, False if task not found or already processing

async shared.db.swap_user_active_topics(user_id)[source]

Swap user active topics.

Parameters:

user_id (int) – User ID

Return type:

Optional[ResearchTopic]

Returns:

ResearchTopic instance or None

async shared.db.update_agent_status(*, agent_id, status, activity, current_user_id=None, current_topic_id=None, papers_processed=0, papers_found=0)[source]

Update agent status.

Parameters:
  • agent_id (str) – Agent ID

  • status (str) – Agent status

  • activity (str) – Agent activity

  • current_user_id (Optional[int]) – Current user ID

  • current_topic_id (Optional[int]) – Current topic ID

  • papers_processed (int) – Papers processed count

  • papers_found (int) – Papers found count

Return type:

None

async shared.db.update_queue_positions()[source]

Update queue positions for all pending tasks based on priority and creation time.

Return type:

None

async shared.db.update_search_query_stats(query_id, success_increment=0)[source]

Update search query statistics.

Parameters:
  • query_id (int) – Query ID

  • success_increment (int) – Success count increment

Return type:

None

async shared.db.update_task_statistics(processing_time_seconds, success=True)[source]

Update global task processing statistics.

Parameters:
  • processing_time_seconds (float) – Time taken to process the task

  • success (bool) – Whether the task completed successfully

Return type:

None

async shared.db.update_user_settings(user_id, **fields)[source]

Update user settings.

Parameters:
  • user_id (int) – User ID

  • fields (Any) – Fields to update

Return type:

None

async shared.db.update_user_task_status(task_id, status)[source]

Update task status.

Parameters:
Return type:

None

async shared.db.update_user_task_status_for_user(user_id, task_id, status)[source]

Safely update task status ensuring ownership by user.

Parameters:
  • user_id (int) – Internal user ID

  • task_id (int) – Task ID

  • status (TaskStatus) – New status

Return type:

bool

Returns:

True if updated successfully, False if user not found

async shared.db.upgrade_user_plan(telegram_id, plan, expires_at=None)[source]

Upgrade user plan and adjust limits.

Parameters:
  • telegram_id (int) – Telegram user ID

  • plan (UserPlan) – New plan type

  • expires_at (Optional[datetime]) – Plan expiration date (for premium)

Return type:

bool

Returns:

True if upgraded successfully, False if user not found