An AI agent debugger that captures every decision, tool call, and LLM interaction as a queryable timeline — so you can debug agents like distributed systems, not black boxes.
When you ask an AI agent to do something — book a flight, write code, analyze data — a lot happens behind the scenes. The agent makes decisions, calls tools, talks to LLMs, and occasionally refuses unsafe actions.
When something goes wrong — the agent loops forever, picks the wrong tool, or gives a nonsensical answer — you're stuck. Traditional debugging tools can't see inside the agent's reasoning.
OpenTelemetry sees infrastructure metrics, not reasoning chains. LangSmith is SaaS-first with no local option. Sentry captures errors, not the reasoning that led to them. Peaky Peek is agent-decision-aware and local-first by default.
Peaky Peek wraps your agent in a context manager that captures every action as an event. These events form a timeline you can search, replay, and analyze.
Here's the core idea in code — three lines to start debugging any agent:
from agent_debugger_sdk import TraceContext, init

init()

async with TraceContext(agent_name="my_agent") as ctx:
    await ctx.record_decision(
        reasoning="User asked for weather data",
        confidence=0.85,
        chosen_action="call_weather_tool",
    )
    await ctx.record_tool_call("weather_api", {"location": "Seattle"})
Pull in the two things we need: TraceContext (the recorder) and init (the setup).
Initialize Peaky Peek. This sets up the connection to the local server.
Open the recording session — like pressing "record" on a camera.
Log a decision: the agent thought about weather data, was 85% confident, and chose to call the weather tool.
Log a tool call: the agent called the weather_api with "Seattle" as the location.
See every reasoning step as an interactive tree. Click nodes to inspect why the agent chose each action.
Time-travel through agent execution. Play, pause, step, and seek to any point in the trace.
Find specific events across all sessions. Search by keyword, filter by event type.
Full safety trail: policy → tool guard → block → violation → refusal.
Three layers work together: the SDK captures events, the API stores and serves them, and the frontend lets you explore.
The recording equipment. You add two lines to your agent code, and the SDK captures every decision, tool call, LLM interaction, and checkpoint.
The control room. Receives events from the SDK, stores them in SQLite, serves them via REST and real-time streaming, and handles search and replay.
The viewing gallery. Decision trees, timeline replays, tool inspectors, safety audit trails, and live monitoring — all rendered in your browser.
The three layers communicate through two channels: REST API for on-demand data, and SSE (Server-Sent Events) for live streaming.
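The SSE channel delivers events as newline-delimited frames on the wire. Here's a minimal sketch of parsing that format — the JSON payload shape shown is illustrative, not the server's exact schema:

```python
import json

def parse_sse_lines(lines):
    """Parse Server-Sent Events wire format into JSON payloads.

    Each `data:` line carries a chunk of the payload; a blank line
    terminates the frame.
    """
    buffer = []
    for line in lines:
        if line.startswith("data:"):
            buffer.append(line[len("data:"):].strip())
        elif line == "" and buffer:
            yield json.loads("\n".join(buffer))
            buffer = []

# Frames as they might appear on the wire (payload shape is illustrative):
stream = [
    'data: {"event_type": "tool_call", "name": "weather_api"}',
    "",
    'data: {"event_type": "decision", "name": "choose_tool"}',
    "",
]
events = list(parse_sse_lines(stream))
```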
Every agent action becomes an event. Events form a tree, flow through a pipeline, and end up on your screen.
Peaky Peek models agent execution as a hierarchy — like a family tree for your agent's actions:
The container for everything. One user request = one session. It tracks total tokens, cost, tool calls, and whether it succeeded or failed.
The full event timeline for a session. Every event gets a sequence number so you can replay them in order.
Any observable action. Each event has a type (decision, tool_call, error...), a parent ID for nesting, and an importance score.
A state snapshot for time-travel debugging. Restore from a checkpoint to replay the agent from that exact point.
The EventType enum defines 14 event types covering everything that can happen during agent execution. Here are the key ones:
The agent chose an action. Includes reasoning, confidence score, and evidence.
The agent invoked an external tool and got a result back. The call/result pair shows what was asked and what was returned.
What was sent to the AI model and what came back. Includes token counts, cost, and the full conversation.
Something went wrong. Includes the error type, message, and full stack trace.
The agent's safety guardrails in action. Did it pass a check? Refuse an action? Violate a policy?
State snapshots, speaker turns in multi-agent chats, and detected anomalies.
Every event is a dataclass with a shared set of fields. Here's the foundation:
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

@dataclass(kw_only=True)
class TraceEvent:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    session_id: str = ""
    parent_id: str | None = None
    event_type: EventType = EventType.AGENT_START  # EventType: the project's event enum
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    name: str = ""
    data: dict[str, Any] = field(default_factory=dict)
    importance: float = 0.5
    upstream_event_ids: list[str] = field(default_factory=list)
This is a data blueprint. Every event automatically gets a unique ID and timestamp when it's created.
Each event belongs to a session (so you can group all events from one agent run together).
parent_id creates the tree structure — a tool_result can point back to the tool_call that triggered it.
event_type tells us what kind of thing happened — a decision, tool call, error, etc.
data is a catch-all dictionary for event-specific information (different for each event type).
importance is a score from 0-1 that determines how prominently this event shows up in the UI.
The parent_id field is what makes events form a tree. A tool_result's parent is the tool_call. A decision that led to a tool call becomes its grandparent. This is how the Decision Tree visualization knows what connects to what.
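The parent-child linking above can be sketched with a simple index — a minimal illustration using plain dicts in place of TraceEvent instances (the `id`/`parent_id` field names match the dataclass; everything else is illustrative):

```python
from collections import defaultdict

def build_tree(events):
    """Index events by parent_id so children can be looked up per node.

    Events with parent_id=None are roots. Assumes each event carries
    `id` and `parent_id` fields, as TraceEvent does.
    """
    children = defaultdict(list)
    for ev in events:
        children[ev["parent_id"]].append(ev["id"])
    return children

events = [
    {"id": "d1", "parent_id": None},    # decision (root)
    {"id": "tc1", "parent_id": "d1"},   # tool_call under the decision
    {"id": "tr1", "parent_id": "tc1"},  # tool_result under the call
]
tree = build_tree(events)
```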
Click through each step to follow an event from creation to your screen.
The central piece of the SDK. TraceContext manages the entire lifecycle of a debugging session.
When you write async with TraceContext(...) as ctx:, three things happen silently. Here's the setup code:
async def __aenter__(self) -> TraceContext:
    _current_session_id.set(self.session_id)
    _current_parent_id.set(None)
    _event_sequence.set(0)
    _current_context.set(self)

    from agent_debugger_sdk.config import get_config
    config = get_config()
    if config.mode == "cloud" and config.api_key:
        from agent_debugger_sdk.transport import HttpTransport
        self._transport = HttpTransport(
            config.endpoint, config.api_key
        )

    start_event = TraceEvent(
        session_id=self.session_id,
        event_type=EventType.AGENT_START,
        name="session_start",
        data={"agent_name": self.session.agent_name},
        importance=0.2,
    )
    await self._emit_event(start_event)
    return self
Set context variables (ContextVars) so any code inside the session can find the active TraceContext.
If we're in "cloud mode" (user provided an API key), create an HTTP transport to send events to a remote server.
Create the very first event — a "session_start" marker — and emit it through the pipeline.
Return the context so the developer can use it with as ctx:
The ContextVar pattern is crucial. Regular global variables would leak between concurrent agent sessions. ContextVars ensure that when two agents run at the same time, each one's events go to the right session.
When TraceContext emits an event, it doesn't just store it locally. It sends the event through a pipeline with three destinations:
persist_event
Save to SQLite so events survive server restarts and can be searched later
EventBuffer.publish
Push to an in-memory buffer that feeds live SSE streams to the frontend
session_update_hook
Update the session's running totals (tokens, cost, tool call count)
This pipeline is connected when the server starts up, using configure_event_pipeline():
configure_event_pipeline(
    buffer,
    persist_event=_services.persist_event,
    persist_checkpoint=_services.persist_checkpoint,
    persist_session_start=_services.persist_session_start,
    persist_session_update=_services.persist_session_update,
)
Wire up the pipeline: events go to the live buffer AND to the database. Checkpoints get their own persistence path. Session start/update each get their own hooks.
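A toy sketch of the fan-out idea (not the library's actual wiring): one emit call drives all three destinations, with plain functions standing in for the SQLite writer, the EventBuffer, and the session-totals hook.

```python
persisted, buffered, totals = [], [], {"tool_calls": 0}

def persist_event(event):          # stand-in for the SQLite write
    persisted.append(event)

def publish(event):                # stand-in for EventBuffer.publish
    buffered.append(event)

def session_update_hook(event):    # stand-in for running-total updates
    if event["event_type"] == "tool_call":
        totals["tool_calls"] += 1

def emit(event):
    # One event, three destinations -- the pipeline's core shape.
    for hook in (persist_event, publish, session_update_hook):
        hook(event)

emit({"event_type": "tool_call", "name": "weather_api"})
emit({"event_type": "decision", "name": "choose_tool"})
```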
Imagine you're playing a video game and you reach a tricky boss fight. You save your game before the fight so if you die, you can reload and try a different strategy. That's exactly what checkpoints do for agent debugging.
You create a checkpoint at any point during execution by calling ctx.create_checkpoint(). It captures the agent's current state — its memory, its position in the task, whatever the developer decides is important.
The developer decides when to create checkpoints. This is intentional — only the developer knows which moments are worth saving. The importance score (0-1) helps the UI prioritize which checkpoints to show first when there are many.
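The mechanics can be sketched with a toy checkpoint store — an illustration of the save/restore idea, not the SDK's implementation:

```python
import copy

class CheckpointStore:
    """Snapshot the agent's state dict so a replay can restore it later."""

    def __init__(self):
        self._snapshots: dict[str, dict] = {}

    def create(self, name: str, state: dict, importance: float = 0.5):
        # Deep-copy so later mutations don't corrupt the snapshot.
        self._snapshots[name] = copy.deepcopy(state)

    def restore(self, name: str) -> dict:
        return copy.deepcopy(self._snapshots[name])

store = CheckpointStore()
state = {"step": 3, "memory": ["found flight", "checked price"]}
store.create("before_booking", state, importance=0.8)

state["step"] = 7                        # agent goes down a bad path...
state = store.restore("before_booking")  # ...and rewinds to the save point
```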
Watch how the system components talk to each other when a checkpoint is created:
Hundreds of events fire during a single session. How does Peaky Peek figure out which ones matter?
Every event gets a score from 0.0 to 1.0. The ImportanceScorer looks at the event type first, then adjusts based on context:
def score(self, event: TraceEvent) -> float:
    base_scores: dict[EventType, float] = {
        EventType.ERROR: 0.9,
        EventType.REFUSAL: 0.85,
        EventType.BEHAVIOR_ALERT: 0.88,
        EventType.POLICY_VIOLATION: 0.92,
        EventType.DECISION: 0.7,
        EventType.SAFETY_CHECK: 0.75,
        EventType.CHECKPOINT: 0.6,
        EventType.TOOL_CALL: 0.4,
        EventType.LLM_REQUEST: 0.3,
        EventType.AGENT_START: 0.2,
    }
    score = base_scores.get(event.event_type, 0.3)
    if event.event_type == EventType.DECISION:
        confidence = float(_event_value(event, "confidence", 0.5))
        score += 0.3 * abs(0.5 - confidence) * 2
    return min(score, 1.0)
Start with a base score for each event type. Errors and policy violations are inherently important (0.9+). Session starts are routine (0.2).
For decisions, boost the score if the agent was uncertain. A confidence of 0.1 or 0.9 is more interesting than 0.5 — extreme confidence (or extreme uncertainty) is a signal worth investigating.
Cap at 1.0 — no event can be "more than maximally important."
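Plugging numbers into the decision branch makes the boost concrete — a tiny standalone version of the same formula:

```python
def decision_score(confidence: float) -> float:
    # Decision base score is 0.7; the boost rewards distance from 0.5.
    return min(0.7 + 0.3 * abs(0.5 - confidence) * 2, 1.0)

# A very confident decision (0.9) gets 0.7 + 0.3 * 0.4 * 2 = 0.94,
# while a perfectly hedged one (0.5) gets no boost at all.
high = decision_score(0.9)
mid = decision_score(0.5)
```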
The LiveMonitor watches events as they stream in and derives alerts from patterns it detects. Think of it as a security camera with AI — it doesn't just record, it notices when something's off.
The same tool called 3+ times in a row. Detected by checking the last three tool_call events — if they all target the same tool, that's a loop.
The agent bounces between two actions — like A→B→A→B. The detect_oscillation function looks for repeating patterns of length 2-4.
2+ safety events (refusals, violations, failed checks) in the recent window. The agent is being pushed against its limits.
The agent's last two decisions had different chosen_actions. A sudden strategy change mid-execution is worth flagging.
The oscillation detection algorithm is elegantly simple. Here's the core logic:
def detect_oscillation(
    events: list[TraceEvent], window: int = 10
) -> OscillationAlert | None:
    # Work on (event_type, name) pairs from the recent window
    # (reconstructed line -- the original snippet elided how
    # `sequence` is built).
    sequence = [(e.event_type, e.name) for e in events[-window:]]
    for pattern_len in [2, 3, 4]:
        if len(sequence) < pattern_len * 2:
            continue
        pattern = sequence[:pattern_len]
        repeats = 1
        for i in range(pattern_len,
                       len(sequence) - pattern_len + 1,
                       pattern_len):
            if sequence[i:i + pattern_len] == pattern:
                repeats += 1
        if repeats >= 2:
            return OscillationAlert(
                pattern="->".join(p[1] for p in pattern),
                repeat_count=repeats,
                severity=min(1.0, repeats / 3.0),
            )
    return None
Look at the recent events (last 10 by default).
Try to find repeating patterns of length 2, 3, or 4 events.
If a pattern repeats at least twice (e.g., search→read→search→read), that's oscillation.
Severity scales with how many times it repeated — 2 repeats is notable, 3+ is alarming.
The live monitor produces two kinds of alerts. Captured alerts come from events the SDK explicitly recorded (like behavior_alert events). Derived alerts are detected by the monitor itself — like a tool loop that no one explicitly flagged, but the pattern is obvious in hindsight.
What if you could debug an agent without changing a single line of its code?
Most AI agents are built on LLM client libraries — the OpenAI SDK, the Anthropic SDK, or frameworks like LangChain. These libraries all have a central function where LLM calls happen.
Peaky Peek's auto-patch system monkey-patches these central functions to wrap them with tracing. The original function still runs — the patch just adds event recording around it.
Each LLM library gets its own adapter that knows how to extract tracing data from that library's specific response format. The OpenAI adapter knows how to read OpenAI's response object, the Anthropic adapter knows Anthropic's format, and so on.
Here's the shared implementation that all adapters use. When an agent makes an LLM call, this wrapper fires instead of the original function:
def _call_sync(self, original, self_client, *args, **kwargs):
    session_id = get_or_create_session(
        self._transport, self._config.agent_name, self.name
    )
    request_id = self._emit_request(kwargs, session_id)
    response = None  # keeps the finally block safe if the call raises
    start = time.perf_counter()
    try:
        response = original(self_client, *args, **kwargs)
        return response
    finally:
        duration_ms = (time.perf_counter() - start) * 1000
        self._emit_response(
            response, request_id, session_id, duration_ms
        )
Get or create a session for this agent so we know which session to attribute events to.
Emit a "request" event BEFORE the call — capture what the agent is asking the LLM.
Start a timer, then call the REAL original function. The finally block ensures we always measure time, even if the call crashes.
Emit a "response" event AFTER the call — capture what the LLM said back and how long it took.
Return the original response unchanged — the agent has no idea we were recording.
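The patching technique itself is plain Python. Here's a self-contained sketch of wrap, record, and restore — with a fake client standing in for a real SDK, so none of the names below belong to Peaky Peek or any LLM library:

```python
import time

class FakeLLMClient:                  # stand-in for a real SDK client
    def complete(self, prompt: str) -> str:
        return f"echo: {prompt}"

recorded = []

def patch(cls):
    original = cls.complete
    def wrapper(self, prompt):
        start = time.perf_counter()
        try:
            return original(self, prompt)   # the real call still runs
        finally:
            recorded.append({
                "prompt": prompt,
                "duration_ms": (time.perf_counter() - start) * 1000,
            })
    cls.complete = wrapper
    return original                   # keep a handle for unpatching

original = patch(FakeLLMClient)
result = FakeLLMClient().complete("hi")   # behaves as before, now traced
FakeLLMClient.complete = original         # unpatch: restore the original
```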
The PatchRegistry is the control center. It holds all adapters and applies them in one shot:
def apply(self, config, names=None):
    for adapter in self._adapters:
        if names and adapter.name not in names:
            continue
        if not adapter.is_available():
            continue
        try:
            adapter.patch(config)
            self._patched.append(adapter)
        except Exception:
            logger.warning("Failed to patch")
For each registered adapter (OpenAI, Anthropic, etc.)...
If the user specified specific adapters by name, skip the rest.
Check if the library is actually installed — skip if not.
Apply the patch. If it fails, log a warning but keep going — never crash the agent because tracing failed.
Notice the try/except in the apply loop. If patching OpenAI fails for some reason, Peaky Peek doesn't crash your agent — it just logs a warning and moves on. The debugging tool should never break the thing it's debugging.
Design decisions, the transport layer, and how to extend the system.
Data stays on your machine by default. No external telemetry, no cloud dependency. Your agent's prompts and reasoning never leave your disk unless you explicitly configure it.
Every failure path in the SDK is wrapped in try/except. If event persistence fails, the agent keeps running. If transport fails, events buffer locally. The debugger is a passenger, not a driver.
The SDK doesn't import collector modules at the top level. This avoids circular dependencies — you can use the SDK without the server, and the server can use the SDK without the collector.
The HTTP transport retries transient errors (timeouts, 5xx) with exponential backoff: 0.5s, 1s, 2s. Permanent errors (4xx) fail immediately. This prevents cascading failures.
The HttpTransport class is responsible for sending events to the server in cloud mode. It classifies errors into two types:
TransientError
Temporary problems — timeouts, server errors (5xx), network issues. These get retried with exponential backoff (0.5s, 1s, 2s).
PermanentError
Fatal problems — auth failures (401, 403), not found (404). No point retrying — the result won't change.
This matters because agent execution is time-sensitive. If the debugger's transport layer goes into an infinite retry loop, it blocks the agent. The 3-retry cap with backoff ensures fast failure.
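The retry policy can be sketched as follows. The exception names and delays mirror the text, but `send_with_retry` and the injectable `sleep` are illustrative, not the real HttpTransport API:

```python
class TransientError(Exception): ...
class PermanentError(Exception): ...

def send_with_retry(send, sleep=lambda s: None, delays=(0.5, 1.0, 2.0)):
    # Try once, then up to len(delays) retries; None marks "no retries left".
    for delay in (*delays, None):
        try:
            return send()
        except PermanentError:
            raise              # 4xx: retrying won't change the answer
        except TransientError:
            if delay is None:
                raise          # retries exhausted: fail fast
            sleep(delay)       # 0.5s, 1s, 2s exponential backoff

attempts = {"n": 0}
def flaky():
    # Fails twice with a timeout, then succeeds.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TransientError("timeout")
    return "ok"

result = send_with_retry(flaky)
```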
Peaky Peek is designed to be extensible at every layer:
Define a new dataclass in core/events/, register it in EVENT_TYPE_REGISTRY, and update the UI types in frontend/src/types/index.ts.
Create a new adapter in auto_patch/adapters/, extend BaseAdapter, implement is_available(), patch(), and unpatch(). Register it in the PatchRegistry.
Create a new file in api/, define Pydantic schemas in schemas.py, and register the router in main.py. Update the frontend client.ts to call it.
Create a new module in collector/, add a detection function, and integrate it into the analysis pipeline. The LiveMonitor already shows how to derive alerts from patterns.
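As a sketch of the adapter extension point, here is a skeleton with the three methods named above. The class shape and the "mylib" target are assumptions for illustration, not the real BaseAdapter API:

```python
import importlib.util

class MyLibAdapter:
    """Hypothetical adapter skeleton -- method names follow the text."""
    name = "mylib"

    def is_available(self) -> bool:
        # Only patch if the target library is actually installed.
        return importlib.util.find_spec("mylib") is not None

    def patch(self, config) -> None:
        # Wrap mylib's central call function with event recording here.
        ...

    def unpatch(self) -> None:
        # Restore the original function here.
        ...

adapter = MyLibAdapter()
available = adapter.is_available()  # False unless "mylib" is installed
```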
You now understand how an AI agent debugger works — from the event pipeline to the intelligence layer. Next time your agent loops forever, you'll know exactly where to look.