Pipeline¶
Models¶
Typed models for the research pipeline.
This module defines Pydantic models and light-weight data structures used across the pipeline: input tasks, intermediate candidates, and outputs.
- class agent.pipeline.models.AnalysisAgentOutput(*args, **kwargs)[source]¶
Bases:
BaseModel
Output schema for the analysis agent via output_type.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- class agent.pipeline.models.AnalysisInput(*args, **kwargs)[source]¶
Bases:
BaseModel
Selected, ranked input to the LLM for deep analysis.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- candidate: PaperCandidate¶
- class agent.pipeline.models.AnalysisResult(*args, **kwargs)[source]¶
Bases:
BaseModel
Outcome of a single LLM analysis of a paper candidate.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- candidate: PaperCandidate¶
- class agent.pipeline.models.DecisionReport(*args, **kwargs)[source]¶
Bases:
BaseModel
Decision agent output controlling whether to notify and the report text.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- class agent.pipeline.models.GeneratedQuery(*args, **kwargs)[source]¶
Bases:
BaseModel
Structured query item produced by the strategy agent.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- class agent.pipeline.models.PaperCandidate(*args, **kwargs)[source]¶
Bases:
BaseModel
A lightweight representation of a potential paper to evaluate.
Notes
The bm25_score is populated during ranking and defaults to 0.0.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
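The field set of PaperCandidate is not listed above; a dataclass stand-in, inferred from the surrounding docs (title and summary feed BM25 ranking, link comes from search results, and bm25_score defaults to 0.0), might look like:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class PaperCandidateSketch:
    # Hypothetical field set; the real Pydantic model may differ.
    title: str
    summary: str = ""          # abstract or snippet text
    link: str = ""             # source URL (arXiv, PubMed, GitHub, ...)
    categories: List[str] = field(default_factory=list)
    bm25_score: float = 0.0    # populated during ranking (documented default)
```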
- class agent.pipeline.models.PipelineOutput(*args, **kwargs)[source]¶
Bases:
BaseModel
Final output of the pipeline for consumer channels.
Examples
from agent.pipeline.pipeline import run_pipeline_sync

out = run_pipeline_sync(PipelineTask(query="graph neural networks for molecules"))
print(out.should_notify, len(out.analyzed))
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- analyzed: List[AnalysisResult]¶
- task: PipelineTask¶
- class agent.pipeline.models.PipelineTask(*args, **kwargs)[source]¶
Bases:
BaseModel
A high-level pipeline task describing the user’s research intent.
- Parameters:
query – Free-text task description or target area.
categories – Optional arXiv categories to constrain the search, e.g. ["cs.AI"].
max_queries – Upper bound on generated search queries. Default: 5.
bm25_top_k – Number of top-ranked candidates to keep. Default: 20.
max_analyze – Max number of candidates to analyze with LLM. Default: 10.
min_relevance – Minimum score required for inclusion in the final selection. Default: 50.0.
args (Any)
kwargs (Any)
- Return type:
Any
Examples
PipelineTask(query="RAG for small datasets", categories=["cs.AI"]) # doctest: +ELLIPSIS
- query_must_not_be_empty()¶
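The validator's behavior is not spelled out above; presumably it rejects blank queries at construction time, along these lines:

```python
def query_must_not_be_empty(query: str) -> str:
    # Assumed behavior of the documented validator: reject a query
    # that is empty or whitespace-only.
    if not query.strip():
        raise ValueError("query must not be empty")
    return query
```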
- class agent.pipeline.models.QueryPlan(*args, **kwargs)[source]¶
Bases:
BaseModel
Agentic query plan consisting of multiple queries and optional notes.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- class agent.pipeline.models.ScoredAnalysis(*args, **kwargs)[source]¶
Bases:
BaseModel
Analysis result with overall score used for decision making.
- Parameters:
args (Any)
kwargs (Any)
- Return type:
Any
- result: AnalysisResult¶
Core¶
- async agent.pipeline.pipeline.run_pipeline(task)[source]¶
Execute the end-to-end research pipeline and return structured output.
Stages:
1) Strategy: generate multiple queries for the task
2) Retrieval: collect candidates from arXiv
3) Ranking: score with BM25 over title+abstract
4) Analysis: LLM/heuristic analysis of top candidates
5) Decision: choose items and produce a report
- Parameters:
task (PipelineTask) – Validated agent.pipeline.models.PipelineTask describing user intent.
- Return type:
PipelineOutput
- Returns:
Structured PipelineOutput with analyzed items and an optional human report.
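The five stages can be sketched with stand-in data, assuming the documented 50.0 default for min_relevance; the real stages query arXiv and an LLM rather than these stubs:

```python
import asyncio

async def run_pipeline_sketch(query: str) -> dict:
    # Illustrative stand-ins for the five documented stages.
    queries = [query, f"{query} survey"]                   # 1) strategy
    candidates = [{"title": q} for q in queries]           # 2) retrieval
    ranked = sorted(candidates, key=lambda c: c["title"])  # 3) ranking stub
    analyzed = [{"candidate": c, "relevance": 60.0} for c in ranked]  # 4) analysis
    should_notify = any(a["relevance"] >= 50.0 for a in analyzed)     # 5) decision
    return {"analyzed": analyzed, "should_notify": should_notify}

result = asyncio.run(run_pipeline_sketch("graph neural networks"))
```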
- agent.pipeline.pipeline.run_pipeline_sync(task)[source]¶
Run run_pipeline() synchronously.
This helper creates and runs an event loop to execute the async pipeline in simple scripts or REPLs.
- Parameters:
task (PipelineTask) – The pipeline task to execute.
- Return type:
PipelineOutput
- Returns:
The structured pipeline output.
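The wrapper pattern amounts to running the coroutine on a fresh event loop, roughly (a generic sketch, not the module's actual code):

```python
import asyncio
from typing import Any, Awaitable, Callable

def run_sync(factory: Callable[[], Awaitable[Any]]) -> Any:
    # Create and run an event loop for an async callable, as the
    # documented helper does for simple scripts and REPLs.
    return asyncio.run(factory())

async def demo() -> int:
    return 7

value = run_sync(lambda: demo())
```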
Steps¶
LLM-based analysis stage for ranked candidates.
This module provides a minimal async analysis function that:
- Reduces context (abstract only for now)
- Calls the shared LLM model once per paper
- Returns structured AnalysisResult instances
Retries and concurrency limits can be layered in the orchestrator later.
- async agent.pipeline.analyze.analyze_candidates(*, task_query, analysis_inputs)[source]¶
Analyze candidates via agents or a heuristic fallback.
When an API key is available and the environment flag PIPELINE_USE_AGENTS_ANALYZE is set, the function uses the configured LLM agent to produce structured outputs. Otherwise, it computes a quick overlap-based heuristic.
- Parameters:
task_query (str) – The task description that guides relevance.
analysis_inputs (List[AnalysisInput]) – Ranked inputs containing candidates and optional snippets.
- Return type:
List[AnalysisResult]
- Returns:
One AnalysisResult per input, preserving order.
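The overlap-based fallback itself is not shown above; one plausible form (an assumption, not the module's actual code) scores an abstract by the share of query tokens it contains:

```python
import re

def overlap_relevance(task_query: str, abstract: str) -> float:
    # Hypothetical version of the heuristic fallback: the fraction of
    # query tokens that also appear in the abstract, scaled to [0, 100].
    tokens = lambda text: set(re.split(r"\W+", text.lower())) - {""}
    query_tokens, abstract_tokens = tokens(task_query), tokens(abstract)
    if not query_tokens:
        return 0.0
    return 100.0 * len(query_tokens & abstract_tokens) / len(query_tokens)
```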
Decisioning: scoring, selection, and reporting with agent support.
This module filters analyzed items, scores them with heuristics, and optionally uses an agent to produce a plain-text report when there are strong candidates.
- async agent.pipeline.decision.make_decision_and_report(task, selected)[source]¶
Generate a plain-text report or decide to skip notifying the user.
Uses an LLM-based reporter when available, falling back to a local template otherwise.
- Parameters:
task (PipelineTask) – The source task that describes user intent.
selected (List[ScoredAnalysis]) – A compact list of scored analyses.
- Return type:
DecisionReport
- Returns:
Decision and optional report text.
- agent.pipeline.decision.score_result(task, result)[source]¶
Compute overall score in [0, 100] using relevance and simple boosts.
- Parameters:
task (PipelineTask) – The pipeline task providing thresholds.
result (AnalysisResult) – A single analysis result to score.
- Return type:
float
- Returns:
Score in the range [0, 100].
- agent.pipeline.decision.select_top(task, analyzed)[source]¶
Score and keep items above min_relevance in descending order.
The output is trimmed to at most three items to keep reports concise.
- Parameters:
task (PipelineTask) – Pipeline task with min_relevance.
analyzed (List[AnalysisResult]) – Analysis results to select from.
- Return type:
List[ScoredAnalysis]
- Returns:
Compact, sorted selection of ScoredAnalysis.
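The documented selection rule (threshold filter, descending sort, trim to three) reduces to a few lines; here (score, result) tuples stand in for ScoredAnalysis:

```python
def select_top_sketch(min_relevance: float, scored: list) -> list:
    # Keep items at or above the threshold, sort by score descending,
    # and trim to at most three, as the docs describe.
    kept = [item for item in scored if item[0] >= min_relevance]
    return sorted(kept, key=lambda item: item[0], reverse=True)[:3]
```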
Search utilities for the pipeline.
This module provides:
- Query generation (simple heuristic without embeddings)
- Retrieval from multiple sources (arXiv, Google Scholar, PubMed, GitHub)
All functions are thin synchronous wrappers around the underlying parsers, which keeps initial integration simple. The pipeline orchestrator can run them in threads or call them directly for now.
- agent.pipeline.search.arxiv_search(*, query, categories=None, max_results=100, start=0)[source]¶
Search arXiv and convert results to PaperCandidate items.
- Parameters:
- Return type:
List[PaperCandidate]
- Returns:
A list of candidate papers converted from arXiv results.
Example:
items = arxiv_search(query="RAG AND small datasets", max_results=10)
print(len(items))
- agent.pipeline.search.collect_candidates(task, queries, per_query_limit=50)[source]¶
Run source-specific search per query and collect unique candidates.
- Parameters:
task (PipelineTask) – The pipeline task providing categories and other context.
queries (Iterable[GeneratedQuery]) – Iterable of GeneratedQuery with per-query source.
per_query_limit (int) – Max results retrieved for each query (default 50).
- Return type:
List[PaperCandidate]
- Returns:
Unique candidates from all queries.
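The uniqueness key is not documented; a sketch that deduplicates by link (an assumption, since the real code may key on an identifier instead) looks like:

```python
def dedupe_by_link(candidates: list) -> list:
    # Keep the first candidate seen for each link, preserving order.
    seen, unique = set(), []
    for candidate in candidates:
        if candidate["link"] not in seen:
            seen.add(candidate["link"])
            unique.append(candidate)
    return unique
```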
- agent.pipeline.search.github_search(*, query, max_results=50, start=0)[source]¶
Search GitHub repositories and represent them as candidates.
The pipeline treats repositories as candidates with title and snippet.
- Parameters:
- Return type:
List[PaperCandidate]
- Returns:
Candidate list with repo name and link.
- agent.pipeline.search.pubmed_search(*, query, max_results=50, start=0)[source]¶
Search PubMed and convert results to candidates.
- Parameters:
- Return type:
List[PaperCandidate]
- Returns:
Candidate list with title and PubMed link.
- agent.pipeline.search.scholar_search(*, query, max_results=50, start=0)[source]¶
Search Google Scholar and convert results to lightweight candidates.
Since Scholar results do not provide abstracts, the summary field uses the snippet text when available. Categories and arXiv-specific fields are left empty.
- Parameters:
- Return type:
List[PaperCandidate]
- Returns:
Candidate list with title and link.
BM25 ranking for candidate papers.
Implements a compact BM25 ranking using only Python standard library to keep dependencies minimal. Tokenization is a simple lowercased split on non-word characters, which is sufficient for baseline ranking.
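A stdlib-only BM25 matching that description might look like the following; the k1 and b constants are conventional defaults and an assumption, not values taken from the module:

```python
import math
import re
from collections import Counter

def tokenize(text: str) -> list:
    # The documented baseline tokenizer: lowercase, split on non-word chars.
    return [t for t in re.split(r"\W+", text.lower()) if t]

def bm25_scores(query: str, docs: list, k1: float = 1.5, b: float = 0.75) -> list:
    # Minimal stdlib BM25 over whole documents (title + summary would be
    # concatenated upstream); returns one score per document.
    corpus = [tokenize(d) for d in docs]
    n = len(corpus)
    avgdl = sum(len(d) for d in corpus) / n
    df = Counter(term for doc in corpus for term in set(doc))
    scores = []
    for doc in corpus:
        tf = Counter(doc)
        score = 0.0
        for term in tokenize(query):
            if term not in tf:
                continue
            idf = math.log(1 + (n - df[term] + 0.5) / (df[term] + 0.5))
            denom = tf[term] + k1 * (1 - b + b * len(doc) / avgdl)
            score += idf * tf[term] * (k1 + 1) / denom
        scores.append(score)
    return scores
```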
- agent.pipeline.ranking.rank_candidates(*, query, candidates, top_k)[source]¶
Rank candidates with BM25 over title + summary and return top-k.
- Parameters:
query (str) – Natural-language query.
candidates (Iterable[PaperCandidate]) – Iterable of PaperCandidate to be ranked. Candidates are copied to a list internally and scores are written to their bm25_score attribute.
top_k (int) – Number of items to return after sorting by score and recency.
- Return type:
List[PaperCandidate]
- Returns:
The top-k candidates, sorted by descending score and recency.
Formatting utilities for pipeline outputs with agent support.
- agent.pipeline.formatting.to_telegram_html(output)[source]¶
Synchronous facade that uses the local fallback formatter, avoiding the need for a running event loop.
- Parameters:
output (PipelineOutput) – Full pipeline output to format.
- Return type:
str
- Returns:
Telegram-friendly HTML string.
- async agent.pipeline.formatting.to_telegram_html_agent(output)[source]¶
Agent-based formatter; falls back to local template on failure.
- Parameters:
output (PipelineOutput) – Full pipeline output to format.
- Return type:
str
- Returns:
Telegram-friendly HTML string.
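The fallback template itself is not shown; a minimal formatter for Telegram's HTML parse mode, taking hypothetical (title, link) pairs rather than a full PipelineOutput, could be:

```python
from html import escape

def to_telegram_html_sketch(items: list) -> str:
    # Bold, escaped titles followed by links; blank lines separate
    # entries. The real template may include scores and summaries.
    lines = [f"<b>{escape(title)}</b>\n{escape(link)}" for title, link in items]
    return "\n\n".join(lines)
```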
Utilities¶
Utility helpers for the research pipeline.
This module currently provides a small set of runtime helpers that are reused across pipeline stages. The public API is intentionally minimal and stable.
Example:
from agent.pipeline.utils import retry_async
async def fetch():
# some flaky network call
return 42
result = await retry_async(lambda: fetch(), attempts=3, base_delay=1.0)
assert result == 42
- async agent.pipeline.utils.retry_async(func, *, attempts=3, base_delay=5.0, factor=2.0)[source]¶
Retry an async operation with exponential backoff.
- Parameters:
func (Callable[[], Awaitable[T]]) – Zero-argument coroutine factory to call on each attempt. Using a factory defers creation of the coroutine until it is awaited, avoiding “already awaited” errors on retries.
attempts (int) – Total attempts including the first call (>= 1). Default 3.
base_delay (float) – Initial delay in seconds before the next attempt. Default 5.0.
factor (float) – Multiplicative backoff factor after each failure. Default 2.0.
- Return type:
T
- Returns:
The value returned by the successful call to func.
- Raises:
Exception – Re-raises the last exception encountered if all attempts fail.
Example:
async def get_value() -> int:
    return 7

value = await retry_async(lambda: get_value(), attempts=5, base_delay=0.2)
assert value == 7
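Under the documented semantics (coroutine factory, attempts including the first call, multiplicative backoff, re-raise on exhaustion), a sketch implementation is:

```python
import asyncio

async def retry_async_sketch(factory, *, attempts=3, base_delay=5.0, factor=2.0):
    # Call the coroutine factory on each attempt, sleeping between
    # failures with the delay multiplied by `factor` each time, and
    # re-raise the last exception when all attempts fail.
    delay = base_delay
    last_exc = None
    for i in range(attempts):
        try:
            return await factory()
        except Exception as exc:
            last_exc = exc
            if i < attempts - 1:
                await asyncio.sleep(delay)
                delay *= factor
    raise last_exc
```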