"""Utility helpers for the research pipeline.This module currently provides a small set of runtime helpers that are reusedacross pipeline stages. The public API is intentionally minimal and stable.Example:: from agent.pipeline.utils import retry_async async def fetch(): # some flaky network call return 42 result = await retry_async(lambda: fetch(), attempts=3, base_delay=1.0) assert result == 42"""importasynciofromtypingimportAwaitable,Callable,Optional,TypeVarfromshared.loggingimportget_loggerlogger=get_logger(__name__)T=TypeVar("T")
[docs]asyncdefretry_async(func:Callable[[],Awaitable[T]],*,attempts:int=3,base_delay:float=5.0,factor:float=2.0,)->T:"""Retry an async operation with exponential backoff. :param func: Zero-argument coroutine factory to call on each attempt. Using a factory defers creation of the coroutine until it is awaited, avoiding "already awaited" errors on retries. :param attempts: Total attempts including the first call (>= 1). Default 3. :param base_delay: Initial delay in seconds before the next attempt. Default 5.0. :param factor: Multiplicative backoff factor after each failure. Default 2.0. :returns: The value returned by the successful call to ``func``. :raises Exception: Re-raises the last exception encountered if all attempts fail. Example:: async def get_value() -> int: return 7 value = await retry_async(lambda: get_value(), attempts=5, base_delay=0.2) assert value == 7 """delay=base_delaylast_error:Optional[Exception]=Noneforattemptinrange(1,attempts+1):try:returnawaitfunc()exceptExceptionaserror:# noqa: BLE001last_error=errorifattempt>=attempts:breaklogger.warning(f"Retryable error on attempt {attempt}/{attempts}: {error}. Sleeping {delay:.1f}s")awaitasyncio.sleep(delay)delay*=factorassertlast_errorisnotNoneraiselast_error