# Agent
triage.Agent is the core wrapper. It runs your async callable in a retry loop, classifying failures and dispatching recovery actions.
## Constructor
```python
Agent(
    fn: Callable[..., Awaitable[Any]],
    policy: FailurePolicy,
    *,
    classifier: Classifier | None = None,
    checkpoint_store: CheckpointStore | None = None,
    max_recovery_attempts: int = 3,
    max_total_attempts: int | None = None,
    auto_checkpoint: bool = False,
)
```
| Parameter | Default | Description |
|---|---|---|
| `fn` | required | The async agent callable to wrap |
| `policy` | required | Maps `FailureType` values to recovery strategies |
| `classifier` | `RulesClassifier()` | Classifies failures from the trajectory |
| `checkpoint_store` | `InMemoryCheckpointStore()` | Stores and retrieves checkpoints |
| `max_recovery_attempts` | `3` | Hard cap on recovery-loop iterations per `run()` call |
| `max_total_attempts` | `None` | Global cap on `len(attempt_history)` across all failure types; `None` disables it |
| `auto_checkpoint` | `False` | If `True`, saves a checkpoint after every `record_step()` call |
### `max_total_attempts` vs `max_recovery_attempts`
`max_recovery_attempts` counts loop iterations. An agent that alternates between `EXTERNAL_FAULT` and `LOOP_DETECTED` can cycle up to `max_recovery_attempts` times, regardless of which failure types appear.

`max_total_attempts` counts the total entries in `attempt_history`: one per failure+dispatch pair, across all failure types. It fires before `max_recovery_attempts` whenever the accumulated total reaches its limit first:
```python
import triage

agent = triage.Agent(
    my_agent, policy=policy,
    max_recovery_attempts=5,  # per-loop guard
    max_total_attempts=3,     # global guard: fires first if reached
)
```
## `clone()`
Returns a new `Agent` that shares the same `fn`, `policy`, `classifier`, and `checkpoint_store` but has fresh per-run state. A single instance is not safe for concurrent `run()` calls, so use one clone per task when dispatching concurrently:
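```python
import asyncio

async def run_all(agent, tasks):
    # One clone per task; each clone has fresh per-run state
    agents = [agent.clone() for _ in tasks]
    return await asyncio.gather(*[ag.run(t) for ag, t in zip(agents, tasks)])
```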
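Why a shared instance is unsafe: any state the instance keeps during `run()` is overwritten by concurrent calls. The sketch uses a hypothetical `ToyAgent` (not triage's class) whose only per-run state is a `history` list; its `clone()` shares `fn` and allocates a fresh list, mirroring the behaviour documented above.

```python
import asyncio
from typing import Awaitable, Callable

Recorder = Callable[[str], None]


class ToyAgent:
    """Hypothetical stand-in: shared fn plus per-instance run state."""

    def __init__(self, fn: Callable[[str, Recorder], Awaitable[str]]) -> None:
        self.fn = fn                  # configuration: safe to share
        self.history: list[str] = []  # per-run state: not safe to share

    def clone(self) -> "ToyAgent":
        # Same fn, fresh history
        return ToyAgent(self.fn)

    async def run(self, task: str) -> str:
        self.history.clear()  # each run starts from empty state
        return await self.fn(task, self.history.append)


async def work(task: str, record: Recorder) -> str:
    for i in range(3):
        record(f"{task}:{i}")
        await asyncio.sleep(0)  # yield so concurrent runs interleave
    return task.upper()


async def with_clones(base: ToyAgent, tasks: list[str]) -> tuple[list[str], list[list[str]]]:
    agents = [base.clone() for _ in tasks]
    results = await asyncio.gather(*[ag.run(t) for ag, t in zip(agents, tasks)])
    return list(results), [ag.history for ag in agents]


async def shared_instance(base: ToyAgent, tasks: list[str]) -> list[str]:
    # Anti-pattern: concurrent run() calls on one instance clobber its history
    await asyncio.gather(*[base.run(t) for t in tasks])
    return base.history


results, histories = asyncio.run(with_clones(ToyAgent(work), ["a", "b", "c"]))
mixed = asyncio.run(shared_instance(ToyAgent(work), ["a", "b", "c"]))
```

With clones, each history holds only its own task's steps; with the shared instance, `mixed` ends up containing interleaved entries from several tasks.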
## `run()`
Runs the wrapped agent. On failure, it classifies the trajectory, dispatches the recovery strategy the policy maps to that failure type, and re-runs the agent with injected recovery context. Returns the agent's result on success.
Raises:
- `TriageEscalationError`: a strategy returns `ESCALATE`, `max_recovery_attempts` is exceeded, or `max_total_attempts` is exceeded
- `TriageAbortError`: a strategy returns `ABORT`
### Wrapped function contract
```python
from typing import Any, Callable

from triage.taxonomy import Step, TriageContext


async def my_agent(
    task: str,
    *,
    record_step: Callable[[Step], None],
    update_state: Callable[[dict], None],
    _triage_context: TriageContext | None = None,  # injected after any failure
    _triage_hint: str | None = None,               # backward-compat scalar hint
    _triage_subgoal: str | None = None,            # injected on RESUME
    _triage_state: dict | None = None,             # injected on ROLLBACK
    **kwargs,
) -> Any:
    ...
```
- `record_step`: call once per observable action; drives trajectory classification.
- `update_state`: persists data that is restored on `ROLLBACK`.
- `_triage_context`: a typed `TriageContext` with `failure_type`, `hint`, `subgoal`, `state`, and `attempt_number`; available on every recovery attempt.
#### `contextvars`: zero-signature-change injection
If you cannot or do not want to change a function's signature, call `triage.get_recorder()` and `triage.get_state_updater()` inside the agent body:
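```python
from triage.agent import get_recorder, get_state_updater
from triage.taxonomy import Step


async def my_agent(task: str, **kwargs) -> str:
    record = get_recorder()  # works inside an Agent.run() context
    update = get_state_updater()
    data = fetch_data(task)  # fetch_data: placeholder for your own code
    record(Step(index=0, action="fetch", tool_output=data))
    update({"data": data})
    return process(data)  # process: placeholder for your own code
```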
Both raise RuntimeError if called outside a run() context.
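A common way to build this kind of injection is a `contextvars.ContextVar` that the wrapper sets for the duration of the call. The sketch assumes triage works along these lines but does not reproduce its code; `_recorder`, `run_with_recorder`, and `plain_agent` are hypothetical names. `ContextVar.set()` returns a token and `reset(token)` restores the previous value, so the recorder is visible only while the wrapped coroutine runs; a state updater would follow the same pattern.

```python
import asyncio
import contextvars
from typing import Any, Awaitable, Callable

# Hypothetical slot holding the active recorder; unset outside a run
_recorder = contextvars.ContextVar("recorder")


def get_recorder() -> Callable[[Any], None]:
    try:
        return _recorder.get()
    except LookupError:
        raise RuntimeError("get_recorder() called outside a run() context") from None


async def run_with_recorder(
    fn: Callable[[str], Awaitable[Any]], task: str
) -> tuple[Any, list[Any]]:
    steps: list[Any] = []
    token = _recorder.set(steps.append)  # visible to fn and everything it awaits
    try:
        result = await fn(task)
    finally:
        _recorder.reset(token)  # restore the previous (unset) value
    return result, steps


async def plain_agent(task: str) -> str:
    record = get_recorder()  # no extra parameters in the signature
    record(f"fetch {task}")
    return task.upper()


result, steps = asyncio.run(run_with_recorder(plain_agent, "q1"))
```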
## Decorator form
```python
import triage

@triage.agent(policy=my_policy, auto_checkpoint=True)
async def my_agent(task: str, *, record_step, update_state, **kwargs) -> str:
    ...
```
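A decorator that takes options, like `@triage.agent(...)`, is typically a decorator factory: calling it with keyword arguments returns the actual decorator. The sketch shows that pattern with hypothetical names (`toy_agent`, `ToyWrapped`); it does not describe what `triage.agent` returns.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable

AgentFn = Callable[..., Awaitable[Any]]


class ToyWrapped:
    """Hypothetical decorator result: the function plus its options."""

    def __init__(self, fn: AgentFn, **options: Any) -> None:
        self.fn = fn
        self.options = options
        functools.update_wrapper(self, fn)  # copy __name__, __doc__, etc.

    async def run(self, task: str) -> Any:
        steps: list[Any] = []
        state: dict = {}
        return await self.fn(task, record_step=steps.append, update_state=state.update)


def toy_agent(**options: Any) -> Callable[[AgentFn], ToyWrapped]:
    # Factory: called with options first, returns the real decorator
    def decorate(fn: AgentFn) -> ToyWrapped:
        return ToyWrapped(fn, **options)

    return decorate


@toy_agent(auto_checkpoint=True)
async def my_agent(task: str, *, record_step, update_state, **kwargs) -> str:
    record_step("start")
    return f"handled {task}"


result = asyncio.run(my_agent.run("q1"))
```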
## `TriageEscalationError`
Raised when a strategy returns ESCALATE or either attempt cap is exceeded.
## `TriageAbortError`
Raised when a strategy returns `ABORT`. This is a hard stop: no further recovery is attempted.
## Example
```python
import asyncio

import triage
from triage.strategies.retry import backoff_and_retry
from triage.taxonomy import Step, TriageContext


async def my_agent(task: str, *, record_step, update_state, **kwargs) -> str:
    # Present only on recovery attempts; None on the first attempt
    tc: TriageContext | None = kwargs.get("_triage_context")
    if tc:
        print(f"Recovery attempt {tc.attempt_number}: {tc.hint}")
    data = fetch_data(task)  # fetch_data: placeholder for your own code
    record_step(Step(index=0, action="fetch", tool_output=data))
    update_state({"data": data})  # restored on ROLLBACK
    return process(data)  # process: placeholder for your own code


policy = triage.FailurePolicy(
    EXTERNAL_FAULT=backoff_and_retry(max_attempts=3),
    default=triage.FailurePolicy.escalate_by_default(),
)

agent = triage.Agent(
    my_agent, policy=policy,
    max_total_attempts=5,
    auto_checkpoint=True,
)


async def main() -> None:
    try:
        result = await agent.run("analyse Q1 data")
        print(result)
    except triage.TriageEscalationError as exc:
        print(f"Needs review: {exc.context.failure_type.value}")


asyncio.run(main())
```