"""Typed models for the research pipeline.This module defines Pydantic models and light-weight data structures usedacross the pipeline: input tasks, intermediate candidates, and outputs."""fromdatetimeimportdatetimefromtypingimportList,Optional,LiteralfrompydanticimportBaseModel,Field,field_validator
class PipelineTask(BaseModel):
    """A high-level pipeline task describing the user's research intent.

    Parameters
    ----------
    query:
        Free-text task description or target area. Rejected when empty or
        whitespace-only.
    categories:
        Optional arXiv categories to constrain the search, e.g. ``["cs.AI"]``.
    max_queries:
        Upper bound on generated search queries (1 to 20). Default: 5.
    bm25_top_k:
        Number of top-ranked candidates to keep (5 to 100). Default: 20.
    max_analyze:
        Max number of candidates to analyze with LLM (1 to 50). Default: 10.
    min_relevance:
        Minimum score required for inclusion in the final selection
        (0.0 to 100.0). Default: 50.0.
    queries:
        Optional user-suggested queries. Default: ``None``.

    Examples
    --------
    .. code-block:: python

        PipelineTask(query="RAG for small datasets", categories=["cs.AI"])  # doctest: +ELLIPSIS
    """

    # Required free-text description; checked by ``query_must_not_be_empty``.
    query: str
    categories: Optional[List[str]] = None

    # Numeric knobs; the inclusive bounds are enforced by Pydantic via ge/le.
    max_queries: int = Field(default=5, ge=1, le=20)
    bm25_top_k: int = Field(default=20, ge=5, le=100)
    max_analyze: int = Field(default=10, ge=1, le=50)
    min_relevance: float = Field(default=50.0, ge=0.0, le=100.0)

    queries: Optional[List[str]] = Field(
        default=None,
        description=(
            "Optional user-suggested queries. "
            "The strategy agent will still decide sources for each query."
        ),
    )

    @field_validator("query")
    @classmethod
    def query_must_not_be_empty(cls, value: str) -> str:
        """Return ``value`` unchanged, or raise if it is empty or only whitespace.

        Raises
        ------
        ValueError
            If ``value`` is empty or consists solely of whitespace.
        """
        # The value is returned as given (not stripped); only blank input is rejected.
        if value and value.strip():
            return value
        raise ValueError("Query must not be empty.")
class PaperCandidate(BaseModel):
    """A lightweight representation of a potential paper to evaluate.

    Only ``arxiv_id``, ``title`` and ``summary`` are required; every other
    field has a default.

    Notes
    -----
    The ``bm25_score`` is populated during ranking and defaults to 0.0.
    """

    # Required identity and text fields.
    arxiv_id: str
    title: str
    summary: str

    # A fresh empty list per instance (default_factory), never a shared one.
    categories: List[str] = Field(default_factory=list)

    # Optional timestamps.
    published: Optional[datetime] = None
    updated: Optional[datetime] = None

    # Optional links.
    pdf_url: Optional[str] = None
    abs_url: Optional[str] = None

    # Optional bibliographic metadata.
    journal_ref: Optional[str] = None
    doi: Optional[str] = None
    comment: Optional[str] = None
    primary_category: Optional[str] = None

    # Ranking score; stays 0.0 until ranking fills it in (see Notes).
    bm25_score: float = 0.0
class AnalysisInput(BaseModel):
    """Selected, ranked input handed to the LLM for deep analysis.

    Pairs one ``PaperCandidate`` with an optional list of text snippets.
    """

    candidate: PaperCandidate
    # Defaults to a new empty list for each instance.
    snippets: List[str] = Field(default_factory=list)
class AnalysisResult(BaseModel):
    """Outcome of a single LLM analysis of a paper candidate.

    ``candidate``, ``relevance`` and ``summary`` are required; the two
    free-text detail fields default to ``None``.
    """

    candidate: PaperCandidate
    # No bounds are declared on this model.
    # NOTE(review): presumably on the same 0-100 scale as
    # ``PipelineTask.min_relevance`` - confirm.
    relevance: float
    summary: str
    key_fragments: Optional[str] = None
    contextual_reasoning: Optional[str] = None
class PipelineOutput(BaseModel):
    """Final output of the pipeline for consumer channels.

    ``task``, ``analyzed`` and ``generated_queries`` must be supplied; the
    remaining fields have defaults.

    Examples
    --------
    .. code-block:: python

        from agent.pipeline.pipeline import run_pipeline_sync

        out = run_pipeline_sync(PipelineTask(query="graph neural networks for molecules"))
        print(out.should_notify, len(out.analyzed))
    """

    task: PipelineTask
    analyzed: List[AnalysisResult]
    generated_queries: List[str]

    # Quoted forward reference: ``ScoredAnalysis`` is declared further down
    # in this module.
    selected: List["ScoredAnalysis"] = Field(default_factory=list)

    should_notify: bool = False
    report_text: Optional[str] = None
class GeneratedQuery(BaseModel):
    """Structured query item produced by the strategy agent."""

    # NOTE: Pydantic copies the class docstring into the model's JSON schema
    # description, so its wording is kept as-is; field notes live in comments.

    query_text: str
    # Closed set of accepted sources; any other value fails validation.
    source: Literal["arxiv", "scholar", "pubmed", "github"]
    rationale: Optional[str] = None
    categories: Optional[List[str]] = None
    # Optional time-window bounds, stored as plain strings.
    # NOTE(review): the expected date format is not defined here - confirm
    # with the code that consumes these.
    time_from: Optional[str] = None
    time_to: Optional[str] = None
class QueryPlan(BaseModel):
    """Agentic query plan consisting of multiple queries and optional notes."""

    # NOTE: Pydantic copies the class docstring into the model's JSON schema
    # description, so its wording is kept as-is; field notes live in comments.

    notes: Optional[str] = None
    # Defaults to a new empty list for each instance.
    queries: List[GeneratedQuery] = Field(default_factory=list)
class AnalysisAgentOutput(BaseModel):
    """Output schema for the analysis agent via ``output_type``."""

    # NOTE: Pydantic copies the class docstring into the model's JSON schema
    # description, so its wording is kept as-is; field notes live in comments.

    # Same four result fields as ``AnalysisResult``, minus ``candidate``.
    relevance: float
    summary: str
    key_fragments: Optional[str] = None
    contextual_reasoning: Optional[str] = None
class TelegramSummary(BaseModel):
    """Output schema for Telegram formatting agent."""

    # NOTE: Pydantic copies the class docstring into the model's JSON schema
    # description, so its wording is kept as-is; field notes live in comments.

    # Single required string field.
    # NOTE(review): presumably the HTML-formatted message body - confirm with
    # the formatting agent.
    html: str
class ScoredAnalysis(BaseModel):
    """Analysis result with overall score used for decision making.

    Wraps one ``AnalysisResult`` together with a required ``overall_score``
    and an optional free-text ``reasoning``.
    """

    result: AnalysisResult
    # No bounds are declared for the score on this model.
    overall_score: float
    reasoning: Optional[str] = None
class DecisionReport(BaseModel):
    """Decision agent output controlling whether to notify and the report text."""

    # NOTE: Pydantic copies the class docstring into the model's JSON schema
    # description, so its wording is kept as-is; field notes live in comments.

    should_notify: bool
    # No default is given, so under Pydantic v2 this field must be supplied
    # explicitly, although ``None`` is an accepted value.
    report_text: Optional[str]