Pipeline

Models

Typed models for the research pipeline.

This module defines Pydantic models and light-weight data structures used across the pipeline: input tasks, intermediate candidates, and outputs.

class agent.pipeline.models.AnalysisAgentOutput(*args, **kwargs)

Bases: BaseModel

Output schema for the analysis agent, supplied to it via output_type.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

contextual_reasoning: Optional[str] = None
key_fragments: Optional[str] = None
relevance: float
summary: str
class agent.pipeline.models.AnalysisInput(*args, **kwargs)

Bases: BaseModel

Selected, ranked input to the LLM for deep analysis.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

candidate: PaperCandidate
class agent.pipeline.models.AnalysisResult(*args, **kwargs)

Bases: BaseModel

Outcome of a single LLM analysis of a paper candidate.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

candidate: PaperCandidate
contextual_reasoning: Optional[str] = None
key_fragments: Optional[str] = None
relevance: float
summary: str
class agent.pipeline.models.DecisionReport(*args, **kwargs)

Bases: BaseModel

Decision agent output controlling whether to notify and the report text.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

report_text: Optional[str]
should_notify: bool
class agent.pipeline.models.GeneratedQuery(*args, **kwargs)

Bases: BaseModel

Structured query item produced by the strategy agent.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

categories: Optional[List[str]] = None
query_text: str
rationale: Optional[str] = None
source: Literal['arxiv', 'scholar', 'pubmed', 'github']
time_from: Optional[str] = None
time_to: Optional[str] = None
class agent.pipeline.models.PaperCandidate(*args, **kwargs)

Bases: BaseModel

A lightweight representation of a potential paper to evaluate.

Notes

The bm25_score is populated during ranking and defaults to 0.0.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

abs_url: Optional[str] = None
arxiv_id: str
bm25_score: float = 0.0
comment: Optional[str] = None
doi: Optional[str] = None
journal_ref: Optional[str] = None
pdf_url: Optional[str] = None
primary_category: Optional[str] = None
published: Optional[datetime] = None
summary: str
title: str
updated: Optional[datetime] = None
class agent.pipeline.models.PipelineOutput(*args, **kwargs)

Bases: BaseModel

Final output of the pipeline for consumer channels.

Examples

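from agent.pipeline.models import PipelineTask
from agent.pipeline.pipeline import run_pipeline_sync

# Run the whole pipeline for one task and inspect the result.
out = run_pipeline_sync(PipelineTask(query="graph neural networks for molecules"))
print(out.should_notify, len(out.analyzed))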
Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

analyzed: List[AnalysisResult]
generated_queries: List[str]
report_text: Optional[str] = None
should_notify: bool = False
task: PipelineTask
class agent.pipeline.models.PipelineTask(*args, **kwargs)

Bases: BaseModel

A high-level pipeline task describing the user’s research intent.

Parameters:
  • query – Free-text task description or target area.

  • categories – Optional arXiv categories to constrain the search, e.g. ["cs.AI"].

  • max_queries – Upper bound on generated search queries. Default: 5.

  • bm25_top_k – Number of top-ranked candidates to keep. Default: 20.

  • max_analyze – Max number of candidates to analyze with LLM. Default: 10.

  • min_relevance – Minimum score required for inclusion in the final selection. Default: 50.0.

  • args (Any)

  • kwargs (Any)

Return type:

Any

Examples

PipelineTask(query="RAG for small datasets", categories=["cs.AI"])
categories: Optional[List[str]] = None
query: str
query_must_not_be_empty()
class agent.pipeline.models.QueryPlan(*args, **kwargs)

Bases: BaseModel

Agentic query plan consisting of multiple queries and optional notes.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

notes: Optional[str] = None
class agent.pipeline.models.ScoredAnalysis(*args, **kwargs)

Bases: BaseModel

Analysis result with overall score used for decision making.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

overall_score: float
reasoning: Optional[str] = None
result: AnalysisResult
class agent.pipeline.models.TelegramSummary(*args, **kwargs)

Bases: BaseModel

Output schema for Telegram formatting agent.

Parameters:
  • args (Any)

  • kwargs (Any)

Return type:

Any

html: str

Core

async agent.pipeline.pipeline.run_pipeline(task)

Execute the end-to-end research pipeline and return structured output.

Stages:

1) Strategy: generate multiple queries for the task
2) Retrieval: collect candidates from arXiv
3) Ranking: score with BM25 over title+abstract
4) Analysis: LLM/heuristic analysis of top candidates
5) Decision: choose items and produce a report
Parameters:

task (PipelineTask) – Validated agent.pipeline.models.PipelineTask describing user intent.

Return type:

PipelineOutput

Returns:

Structured PipelineOutput with analyzed items and an optional human report.

agent.pipeline.pipeline.run_pipeline_sync(task)

Run run_pipeline() synchronously.

This helper creates and runs an event loop to execute the async pipeline in simple scripts or REPLs.

Parameters:

task (PipelineTask) – The pipeline task to execute.

Return type:

PipelineOutput

Returns:

The structured pipeline output.
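A minimal sketch of how a synchronous wrapper of this kind can be built, assuming it relies on asyncio.run (the actual implementation may manage the event loop differently). The names run_pipeline_stub and run_sync are hypothetical stand-ins, and the stub returns a plain dict instead of a PipelineOutput.

```python
import asyncio
from typing import Any, Dict


async def run_pipeline_stub(task: str) -> Dict[str, Any]:
    """Hypothetical stand-in for the async pipeline entry point."""
    await asyncio.sleep(0)  # yield to the event loop once, like real async I/O would
    return {"task": task, "should_notify": False}


def run_sync(task: str) -> Dict[str, Any]:
    """Run the async stub to completion from synchronous code."""
    # asyncio.run creates a fresh event loop, runs the coroutine, then closes the loop.
    return asyncio.run(run_pipeline_stub(task))


result = run_sync("graph neural networks for molecules")
print(result["should_notify"])
```

Note that asyncio.run raises RuntimeError when called from a thread that already has a running event loop, so inside async code or a notebook with an active loop it is safer to await the async function directly.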

Steps

LLM-based analysis stage for ranked candidates.

This module provides a minimal async analysis function that:
  • Reduces context (abstract only for now)

  • Calls the shared LLM model once per paper

  • Returns structured AnalysisResult instances

Retries and concurrency limits can be layered in the orchestrator later.

async agent.pipeline.analyze.analyze_candidates(*, task_query, analysis_inputs)

Analyze candidates via agents or a heuristic fallback.

When an API key is available and the environment flag PIPELINE_USE_AGENTS_ANALYZE is set, the function uses the configured LLM agent to produce structured outputs. Otherwise, it computes a quick overlap-based heuristic.

Parameters:
  • task_query (str) – The task description that guides relevance.

  • analysis_inputs (List[AnalysisInput]) – Ranked inputs containing candidates and optional snippets.

Return type:

List[AnalysisResult]

Returns:

One AnalysisResult per input, preserving order.
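The overlap-based fallback is not specified in detail here. One plausible reading, sketched below, scores a candidate by the share of query terms that also appear in its title or summary. The function names, the tokenization, and the 0 to 100 scale are assumptions chosen to match the score range used elsewhere in the pipeline.

```python
import re
from typing import Set


def tokens(text: str) -> Set[str]:
    """Lowercase the text and split it on runs of non-word characters."""
    return {t for t in re.split(r"\W+", text.lower()) if t}


def overlap_relevance(task_query: str, title: str, summary: str) -> float:
    """Hypothetical heuristic: percentage of query terms found in the candidate."""
    query_terms = tokens(task_query)
    if not query_terms:
        return 0.0  # nothing to match against
    doc_terms = tokens(title + " " + summary)
    return 100.0 * len(query_terms & doc_terms) / len(query_terms)


# Two of the three query terms ("graph", "networks") occur in the candidate text.
score = overlap_relevance("graph neural networks", "Graph networks", "We study molecules")
print(round(score, 1))
```

A heuristic like this needs no network access, which is what makes it useful as a fallback when no API key is configured.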

Decisioning: scoring, selection, and reporting with agent support.

This module filters analyzed items, scores them with heuristics, and optionally uses an agent to produce a plain-text report when there are strong candidates.

async agent.pipeline.decision.make_decision_and_report(task, selected)

Generate a plain-text report or decide to skip notifying the user.

Uses an LLM-based reporter when available, falling back to a local template otherwise.

Parameters:
  • task

  • selected

Return type:

DecisionReport

Returns:

Decision and optional report text.

agent.pipeline.decision.score_result(task, result)

Compute overall score in [0, 100] using relevance and simple boosts.

Parameters:
  • task

  • result

Return type:

float

Returns:

Score in the range [0, 100].

agent.pipeline.decision.select_top(task, analyzed)

Score each item, keep those above min_relevance, and sort them by descending score.

The output is trimmed to at most three items to keep reports concise.

Parameters:
  • task

  • analyzed

Return type:

List[ScoredAnalysis]

Returns:

Compact, sorted selection of ScoredAnalysis.
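The scoring and selection steps described above can be sketched as clamp, filter, sort, and trim. This is an illustration under assumptions: the Scored dataclass stands in for ScoredAnalysis, the comparison is taken to be strictly greater than min_relevance, and the boost values are invented for the example.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Scored:
    """Hypothetical stand-in for ScoredAnalysis with only the fields used here."""
    title: str
    overall_score: float


def clamp_score(value: float) -> float:
    """Keep a relevance-plus-boost value inside the documented [0, 100] range."""
    return max(0.0, min(100.0, value))


def select_top_sketch(items: List[Scored], min_relevance: float = 50.0, limit: int = 3) -> List[Scored]:
    # Assumption: "above" means strictly greater than the threshold.
    kept = [s for s in items if s.overall_score > min_relevance]
    kept.sort(key=lambda s: s.overall_score, reverse=True)  # best first
    return kept[:limit]  # trim to keep the report concise


items = [
    Scored("a", clamp_score(85.0 + 5.0)),  # illustrative boost of 5
    Scored("b", 40.0),
    Scored("c", 75.0),
    Scored("d", 60.0),
    Scored("e", 55.0),
]
top_titles = [s.title for s in select_top_sketch(items)]
print(top_titles)
```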

Search utilities for the pipeline.

This module provides:
  • Query generation (simple heuristic without embeddings)

  • Retrieval from multiple sources (arXiv, Google Scholar, PubMed, GitHub)

All functions are synchronous wrappers around synchronous parsers, which keeps the initial integration simple. For now, the pipeline orchestrator can call them directly or run them in threads.

Search arXiv and convert results to PaperCandidate items.

Parameters:
  • query (str) – Search query string.

  • categories (Optional[List[str]]) – Optional list of arXiv categories, e.g. ["cs.AI", "cs.LG"].

  • max_results (int) – Page size for the search request (default 100).

  • start (int) – Offset for pagination (default 0).

Return type:

List[PaperCandidate]

Returns:

A list of candidate papers converted from arXiv results.

Example:

items = arxiv_search(query="RAG AND small datasets", max_results=10)
print(len(items))
agent.pipeline.search.collect_candidates(task, queries, per_query_limit=50)

Run source-specific search per query and collect unique candidates.

Parameters:
  • task (PipelineTask) – The pipeline task providing categories and other context.

  • queries (Iterable[GeneratedQuery]) – Iterable of GeneratedQuery with per-query source.

  • per_query_limit (int) – Max results retrieved for each query (default 50).

Return type:

List[PaperCandidate]

Returns:

Unique candidates from all queries.
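A rough sketch of per-source dispatch with de-duplication follows. How the real function identifies duplicates is not stated, so keying on arxiv_id is an assumption, as are the names collect_unique, fake_arxiv, and fake_github. Candidates are plain dicts with made-up identifiers to keep the example self-contained.

```python
from typing import Callable, Dict, Iterable, List, Set, Tuple

Candidate = Dict[str, str]
SearchFn = Callable[[str, int], List[Candidate]]


def collect_unique(
    queries: Iterable[Tuple[str, str]],
    backends: Dict[str, SearchFn],
    per_query_limit: int = 50,
) -> List[Candidate]:
    """Run each (source, query_text) pair and keep the first copy of every candidate."""
    seen: Set[str] = set()
    unique: List[Candidate] = []
    for source, query_text in queries:
        search = backends.get(source)
        if search is None:
            continue  # no backend registered for this source
        for cand in search(query_text, per_query_limit):
            key = cand["arxiv_id"]  # assumed de-duplication key
            if key not in seen:
                seen.add(key)
                unique.append(cand)
    return unique


def fake_arxiv(query: str, limit: int) -> List[Candidate]:
    hits = [{"arxiv_id": "id-1", "title": "Paper A"}, {"arxiv_id": "id-2", "title": "Paper B"}]
    return hits[:limit]


def fake_github(query: str, limit: int) -> List[Candidate]:
    hits = [{"arxiv_id": "id-2", "title": "Paper B (repo)"}, {"arxiv_id": "id-3", "title": "Repo C"}]
    return hits[:limit]


found = collect_unique(
    [("arxiv", "rag"), ("github", "rag")],
    {"arxiv": fake_arxiv, "github": fake_github},
)
ids = [c["arxiv_id"] for c in found]
print(ids)
```

Keeping the first occurrence preserves the order in which queries were issued, which may matter if earlier queries are considered more targeted.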

Search GitHub repositories and represent them as candidates.

The pipeline treats repositories as candidates with title and snippet.

Parameters:
  • query (str) – Search query string.

  • max_results (int) – Page size for the search request (default 50).

  • start (int) – Offset for pagination (default 0).

Return type:

List[PaperCandidate]

Returns:

Candidate list with repo name and link.

Search PubMed and convert results to candidates.

Parameters:
  • query (str) – Search query string.

  • max_results (int) – Page size for the search request (default 50).

  • start (int) – Offset for pagination (default 0).

Return type:

List[PaperCandidate]

Returns:

Candidate list with title and PubMed link.

Search Google Scholar and convert results to lightweight candidates.

Since Scholar results do not provide abstracts, the summary field uses the snippet text when available. Categories and arXiv-specific fields are left empty.

Parameters:
  • query (str) – Search query string.

  • max_results (int) – Page size for the search request (default 50).

  • start (int) – Offset for pagination (default 0).

Return type:

List[PaperCandidate]

Returns:

Candidate list with title and link.

BM25 ranking for candidate papers.

Implements a compact BM25 ranking using only the Python standard library to keep dependencies minimal. Tokenization is a simple lowercased split on non-word characters, which is sufficient for baseline ranking.

agent.pipeline.ranking.rank_candidates(*, query, candidates, top_k)

Rank candidates with BM25 over title + summary and return top-k.

Parameters:
  • query (str) – Natural-language query.

  • candidates (Iterable[PaperCandidate]) – Iterable of PaperCandidate to be ranked. Candidates are copied to a list internally and scores are written to their bm25_score attribute.

  • top_k (int) – Number of items to return after sorting by score and recency.

Return type:

List[PaperCandidate]

Returns:

The top-k candidates, sorted by descending score and recency.
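For readers unfamiliar with BM25, here is a compact standard-library sketch of the scoring step. It is not the module's code: the function names are hypothetical, k1=1.5 and b=0.75 are common defaults assumed for illustration, the IDF uses the non-negative "plus one" variant, and the recency tie-break mentioned above is omitted.

```python
import math
import re
from collections import Counter
from typing import List


def tokenize(text: str) -> List[str]:
    """Lowercase the text and split it on runs of non-word characters."""
    return [t for t in re.split(r"\W+", text.lower()) if t]


def bm25_scores(query: str, docs: List[str], k1: float = 1.5, b: float = 0.75) -> List[float]:
    """Return one BM25 score per document for the given query."""
    tokenized = [tokenize(d) for d in docs]
    n_docs = len(tokenized)
    if n_docs == 0:
        return []
    avg_len = sum(len(d) for d in tokenized) / n_docs
    # Document frequency: in how many documents each term occurs.
    df: Counter = Counter()
    for d in tokenized:
        df.update(set(d))
    query_terms = set(tokenize(query))
    scores: List[float] = []
    for d in tokenized:
        tf = Counter(d)
        score = 0.0
        for term in query_terms:
            if term not in tf:
                continue
            idf = math.log(1.0 + (n_docs - df[term] + 0.5) / (df[term] + 0.5))
            # Length normalization: longer-than-average documents are penalized.
            norm = tf[term] + k1 * (1.0 - b + b * len(d) / (avg_len or 1.0))
            score += idf * tf[term] * (k1 + 1.0) / norm
        scores.append(score)
    return scores


docs = [
    "Graph neural networks for molecules",
    "Retrieval augmented generation for small datasets",
    "Protein folding with transformers",
]
scores = bm25_scores("graph neural networks", docs)
ranked = sorted(range(len(docs)), key=lambda i: scores[i], reverse=True)
print(ranked[0])
```

In the pipeline, each document would be the concatenated title and summary of a candidate, and the resulting score would be stored in bm25_score.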

Formatting utilities for pipeline outputs with agent support.

agent.pipeline.formatting.to_telegram_html(output)

Synchronous facade that uses the local fallback template, so no event loop is required.

Parameters:

output (PipelineOutput) – Full pipeline output to format.

Return type:

str

Returns:

Telegram-friendly HTML string.
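The layout of the fallback template is not documented here. As a hedged illustration of what "Telegram-friendly HTML" involves, the sketch below escapes user-supplied text and uses only bold and link tags, which Telegram's HTML parse mode accepts. The function name and the bullet layout are assumptions.

```python
import html
from typing import List, Tuple


def render_telegram_html(report_title: str, items: List[Tuple[str, str]]) -> str:
    """Hypothetical fallback renderer: a bold heading followed by one link per item."""
    # Escape &, < and > so paper titles cannot break the markup.
    lines = [f"<b>{html.escape(report_title)}</b>"]
    for title, url in items:
        safe_url = html.escape(url, quote=True)  # also escapes quotes inside the attribute
        lines.append(f'• <a href="{safe_url}">{html.escape(title)}</a>')
    return "\n".join(lines)


message = render_telegram_html(
    "Results <new>",
    [("A & B", "https://example.org/a?x=1&y=2")],
)
print(message)
```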

async agent.pipeline.formatting.to_telegram_html_agent(output)

Agent-based formatter; falls back to local template on failure.

Parameters:

output (PipelineOutput) – Full pipeline output to format.

Return type:

str

Returns:

Telegram-friendly HTML string.

Utilities

Utility helpers for the research pipeline.

This module currently provides a small set of runtime helpers that are reused across pipeline stages. The public API is intentionally minimal and stable.

Example:

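import asyncio

from agent.pipeline.utils import retry_async

async def fetch():
    # some flaky network call
    return 42

async def main():
    # Pass the coroutine function itself; it is called again on each attempt.
    result = await retry_async(fetch, attempts=3, base_delay=1.0)
    assert result == 42

asyncio.run(main())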
async agent.pipeline.utils.retry_async(func, *, attempts=3, base_delay=5.0, factor=2.0)

Retry an async operation with exponential backoff.

Parameters:
  • func (Callable[[], Awaitable[TypeVar(T)]]) – Zero-argument coroutine factory to call on each attempt. Using a factory defers creation of the coroutine until it is awaited, avoiding “already awaited” errors on retries.

  • attempts (int) – Total attempts including the first call (>= 1). Default 3.

  • base_delay (float) – Initial delay in seconds before the next attempt. Default 5.0.

  • factor (float) – Multiplicative backoff factor after each failure. Default 2.0.

Return type:

TypeVar(T)

Returns:

The value returned by the successful call to func.

Raises:

Exception – Re-raises the last exception encountered if all attempts fail.

Example:

import asyncio

async def get_value() -> int:
    return 7

async def main() -> None:
    value = await retry_async(get_value, attempts=5, base_delay=0.2)
    assert value == 7

asyncio.run(main())
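To make the backoff behaviour concrete, here is a minimal sketch of a helper with the same signature and documented semantics. It is a hypothetical re-implementation named retry_async_sketch, not the module's source. Rejecting attempts below 1 with ValueError is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Dict, TypeVar

T = TypeVar("T")


async def retry_async_sketch(
    func: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    base_delay: float = 5.0,
    factor: float = 2.0,
) -> T:
    """Call func until it succeeds, sleeping base_delay, base_delay * factor, ... between tries."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")  # assumed validation
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            # Calling the factory creates a fresh coroutine for every attempt.
            return await func()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: re-raise the last exception
            await asyncio.sleep(delay)
            delay *= factor
    raise AssertionError("unreachable")


calls: Dict[str, int] = {"n": 0}


async def flaky() -> str:
    """Fail twice with a transient error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


outcome = asyncio.run(retry_async_sketch(flaky, attempts=3, base_delay=0.01))
print(outcome, calls["n"])
```

With attempts=3 and the defaults base_delay=5.0 and factor=2.0, a helper like this would wait 5 seconds after the first failure and 10 seconds after the second before the final try.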