Skip to content

Index

code_context_agent.consumer

Event consumer package for agent streaming output.

EventConsumer

Bases: ABC

Protocol for consuming agent streaming events.

Implement this interface to create custom event consumers for different output formats (Rich terminal, JSON, web UI, etc.).

The consumer receives typed AG-UI events and can render them appropriately. Events follow a lifecycle: run starts -> messages/tools -> run finishes.

Example

class LoggingConsumer(EventConsumer):
    async def on_run_started(self, thread_id: str, run_id: str) -> None:
        print(f"Run started: {run_id}")

on_run_started abstractmethod async

on_run_started(thread_id, run_id)

Handle run started event.

Parameters:

Name Type Description Default
thread_id str

Unique identifier for the conversation thread.

required
run_id str

Unique identifier for this agent run.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_run_started(self, thread_id: str, run_id: str) -> None:
    """React to the start of an agent run.

    Args:
        thread_id: Unique identifier for the conversation thread.
        run_id: Unique identifier for this agent run.
    """

on_text_start abstractmethod async

on_text_start(message_id, role)

Handle start of a text message.

Parameters:

Name Type Description Default
message_id str

Unique identifier for the message.

required
role str

Message role (usually "assistant").

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_text_start(self, message_id: str, role: str) -> None:
    """React to the beginning of a text message.

    Args:
        message_id: Unique identifier for the message.
        role: Role of the message author (usually "assistant").
    """

on_text_content abstractmethod async

on_text_content(message_id, delta)

Handle streaming text content.

Parameters:

Name Type Description Default
message_id str

Identifier of the message being streamed.

required
delta str

New text chunk to append.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_text_content(self, message_id: str, delta: str) -> None:
    """React to a chunk of streamed message text.

    Args:
        message_id: Identifier of the message being streamed.
        delta: Newly received text chunk to append.
    """

on_text_end abstractmethod async

on_text_end(message_id)

Handle end of a text message.

Parameters:

Name Type Description Default
message_id str

Identifier of the completed message.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_text_end(self, message_id: str) -> None:
    """React to the end of a text message.

    Args:
        message_id: Identifier of the message that has completed.
    """

on_tool_start abstractmethod async

on_tool_start(tool_call_id, tool_name)

Handle start of tool execution.

Parameters:

Name Type Description Default
tool_call_id str

Unique identifier for this tool call.

required
tool_name str

Name of the tool being called.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_tool_start(self, tool_call_id: str, tool_name: str) -> None:
    """React to the start of a tool execution.

    Args:
        tool_call_id: Unique identifier for this tool call.
        tool_name: Name of the tool being invoked.
    """

on_tool_args abstractmethod async

on_tool_args(tool_call_id, args_delta)

Handle streaming tool arguments.

Parameters:

Name Type Description Default
tool_call_id str

Identifier of the tool call.

required
args_delta str

JSON string chunk of tool arguments.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_tool_args(self, tool_call_id: str, args_delta: str) -> None:
    """React to a chunk of streamed tool arguments.

    Args:
        tool_call_id: Identifier of the tool call.
        args_delta: Chunk of the JSON string holding the tool arguments.
    """

on_tool_result abstractmethod async

on_tool_result(tool_call_id, result)

Handle tool execution result.

Parameters:

Name Type Description Default
tool_call_id str

Identifier of the completed tool call.

required
result Any

Result returned by the tool.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_tool_result(self, tool_call_id: str, result: Any) -> None:
    """React to the result of a tool execution.

    Args:
        tool_call_id: Identifier of the tool call that produced the result.
        result: Value returned by the tool.
    """

on_tool_end abstractmethod async

on_tool_end(tool_call_id)

Handle end of tool execution.

Parameters:

Name Type Description Default
tool_call_id str

Identifier of the completed tool call.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_tool_end(self, tool_call_id: str) -> None:
    """React to the end of a tool execution.

    Args:
        tool_call_id: Identifier of the tool call that has completed.
    """

on_state_snapshot abstractmethod async

on_state_snapshot(snapshot)

Handle state snapshot event.

Parameters:

Name Type Description Default
snapshot dict[str, Any]

Complete state snapshot dictionary.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_state_snapshot(self, snapshot: dict[str, Any]) -> None:
    """React to a state snapshot event.

    Args:
        snapshot: Dictionary holding the complete state snapshot.
    """

on_run_finished abstractmethod async

on_run_finished(thread_id, run_id)

Handle run finished event.

Parameters:

Name Type Description Default
thread_id str

Identifier of the conversation thread.

required
run_id str

Identifier of the completed run.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_run_finished(self, thread_id: str, run_id: str) -> None:
    """React to the end of an agent run.

    Args:
        thread_id: Identifier of the conversation thread.
        run_id: Identifier of the run that has completed.
    """

on_error abstractmethod async

on_error(message, code=None)

Handle error event.

Parameters:

Name Type Description Default
message str

Error message description.

required
code str | None

Optional error code.

None
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_error(self, message: str, code: str | None = None) -> None:
    """Handle error event.

    Args:
        message: Error message description.
        code: Optional error code.
    """

start async

start()

Initialize the consumer (optional).

Override to perform setup before events start streaming. Called before the first event is received.

Source code in src/code_context_agent/consumer/base.py
async def start(self) -> None:
    """Optional setup hook; the base implementation does nothing.

    Subclasses may override this to prepare resources before events
    start streaming. It is called before the first event is received.
    """

stop async

stop()

Cleanup the consumer (optional).

Override to perform cleanup after events stop streaming. Called after the last event is received or on error.

Source code in src/code_context_agent/consumer/base.py
async def stop(self) -> None:
    """Optional cleanup hook; the base implementation does nothing.

    Subclasses may override this to release resources after events stop
    streaming. It is called after the last event is received or on error.
    """

QuietConsumer

QuietConsumer()

Bases: EventConsumer

Silent consumer that only writes errors to stderr.

Initialize quiet consumer.

Source code in src/code_context_agent/consumer/rich_consumer.py
def __init__(self) -> None:
    """Create the stderr console used to report errors."""
    # Plain output only: colour and automatic highlighting are both disabled.
    stderr_console = Console(stderr=True, no_color=True, highlight=False)
    self._stderr = stderr_console

start async

start()

Initialize the consumer (optional).

Override to perform setup before events start streaming. Called before the first event is received.

Source code in src/code_context_agent/consumer/base.py
async def start(self) -> None:
    """Optional setup hook; the base implementation does nothing.

    Subclasses may override this to prepare resources before events
    start streaming. It is called before the first event is received.
    """

stop async

stop()

Cleanup the consumer (optional).

Override to perform cleanup after events stop streaming. Called after the last event is received or on error.

Source code in src/code_context_agent/consumer/base.py
async def stop(self) -> None:
    """Optional cleanup hook; the base implementation does nothing.

    Subclasses may override this to release resources after events stop
    streaming. It is called after the last event is received or on error.
    """

on_run_started async

on_run_started(thread_id, run_id)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_run_started(self, thread_id: str, run_id: str) -> None:
    """Ignore the run-started event; nothing is written."""

on_text_start async

on_text_start(message_id, role)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_text_start(self, message_id: str, role: str) -> None:
    """Ignore the text-start event; nothing is written."""

on_text_content async

on_text_content(message_id, delta)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_text_content(self, message_id: str, delta: str) -> None:
    """Ignore streamed text; nothing is written."""

on_text_end async

on_text_end(message_id)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_text_end(self, message_id: str) -> None:
    """Ignore the text-end event; nothing is written."""

on_tool_start async

on_tool_start(tool_call_id, tool_name)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_start(self, tool_call_id: str, tool_name: str) -> None:
    """Ignore the tool-start event; nothing is written."""

on_tool_args async

on_tool_args(tool_call_id, args_delta)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_args(self, tool_call_id: str, args_delta: str) -> None:
    """Ignore streamed tool arguments; nothing is written."""

on_tool_result async

on_tool_result(tool_call_id, result)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_result(self, tool_call_id: str, result: Any) -> None:
    """Ignore the tool result; nothing is written."""

on_tool_end async

on_tool_end(tool_call_id)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_end(self, tool_call_id: str) -> None:
    """Ignore the tool-end event; nothing is written."""

on_state_snapshot async

on_state_snapshot(snapshot)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_state_snapshot(self, snapshot: dict[str, Any]) -> None:
    """Ignore the state snapshot; nothing is written."""

on_run_finished async

on_run_finished(thread_id, run_id)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_run_finished(self, thread_id: str, run_id: str) -> None:
    """Ignore the run-finished event; nothing is written."""

on_error async

on_error(message, code=None)

Print error to stderr.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_error(self, message: str, code: str | None = None) -> None:
    """Print error to stderr."""
    error_text = f"[{code}] {message}" if code else message
    self._stderr.print(f"Error: {error_text}")

RichEventConsumer

RichEventConsumer(console=None, *, mode='standard')

Bases: EventConsumer

Dashboard-style consumer for agent execution.

Shows a fixed-height panel with:

- Timer and progress bar
- Tool category breakdown with mini bars
- Active tool with spinner
- Recent tool history with timing and status

No streaming text display. The agent's reasoning is not shown — only tool execution status matters for a long-running analysis agent.

Initialize the dashboard consumer.

Parameters:

Name Type Description Default
console Console | None

Optional Rich Console instance.

None
mode str

Analysis mode ("standard" or "full").

'standard'
Source code in src/code_context_agent/consumer/rich_consumer.py
def __init__(self, console: Console | None = None, *, mode: str = "standard") -> None:
    """Set up the dashboard consumer.

    Args:
        console: Rich Console to render on; a new one is created when
            none is supplied.
        mode: Analysis mode ("standard" or "full").
    """
    self.console = console if console else Console()
    self._mode = mode
    # The Live display is only created once start() is called.
    self._live: Live | None = None
    self.state = AgentDisplayState()

start async

start()

Start the dashboard display.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def start(self) -> None:
    """Reset the display state and start the live dashboard."""
    self.state.reset()
    live = Live(
        self._build_display(),
        console=self.console,
        refresh_per_second=2,
        transient=True,
        vertical_overflow="ellipsis",
    )
    self._live = live
    # Rich's auto-refresh asks get_renderable for what to draw; routing it to
    # our builder means each refresh renders current state (timer, tool
    # elapsed, etc.) instead of the renderable captured at construction.
    live.get_renderable = self._build_display  # type: ignore[assignment]
    live.start()

stop async

stop()

Stop the dashboard and print final summary.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def stop(self) -> None:
    """Stop the dashboard and print a one-line final summary.

    The summary reports the number of completed tools, the elapsed time,
    and how many of those tools succeeded versus reported an error.
    """
    if self._live:
        self._live.stop()
        self._live = None

    # Print a static final summary after Live is gone
    elapsed = self.state.get_elapsed_seconds()
    total = len(self.state.completed_tools)
    errors = self.state.tool_errors
    succeeded = total - errors
    # Separate and label each figure; previously the elapsed time, success
    # count and error count were rendered back to back with nothing between.
    self.console.print(
        f"[dim]  {total} tools in {self._format_time(elapsed)} | {succeeded} ok | {errors} errors[/dim]",
    )

on_run_started async

on_run_started(thread_id, run_id)

Handle run started.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_run_started(self, thread_id: str, run_id: str) -> None:
    """Record the run identifiers and the run's start time."""
    run_state = self.state
    run_state.thread_id, run_state.run_id = thread_id, run_id
    # Monotonic clock, so elapsed time is unaffected by wall-clock changes.
    run_state.start_time = time.monotonic()

on_text_start async

on_text_start(message_id, role)

Handle text start — no display action.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_text_start(self, message_id: str, role: str) -> None:
    """Remember which message is streaming; nothing is displayed."""
    display_state = self.state
    display_state.active_message_id = message_id

on_text_content async

on_text_content(message_id, delta)

Handle text delta — silently accumulate (not displayed).

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_text_content(self, message_id: str, delta: str) -> None:
    """Append the text chunk to the state's buffer without displaying it."""
    display_state = self.state
    display_state.text_buffer = display_state.text_buffer + delta

on_text_end async

on_text_end(message_id)

Handle text end — increment turn counter.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_text_end(self, message_id: str) -> None:
    """Close out the current message: bump the turn counter and clear the buffer."""
    display_state = self.state
    display_state.active_message_id = None
    display_state.turn_count = display_state.turn_count + 1
    display_state.text_buffer = ""

on_tool_start async

on_tool_start(tool_call_id, tool_name)

Handle tool start — show in active spinner.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_start(self, tool_call_id: str, tool_name: str) -> None:
    """Make the tool the active one, timestamp it, and run phase detection."""
    started_call = ToolCallState(tool_call_id=tool_call_id, tool_name=tool_name)
    self.state.active_tool = started_call
    self.state.tool_start_time = time.monotonic()
    self._detect_phase(tool_name)

on_tool_args async

on_tool_args(tool_call_id, args_delta)

Handle tool args — silently accumulate.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_args(self, tool_call_id: str, args_delta: str) -> None:
    """Append the argument chunk to the active tool's buffer.

    Chunks for any tool call other than the active one are dropped.
    """
    current = self.state.active_tool
    if not current or current.tool_call_id != tool_call_id:
        return
    current.args_buffer += args_delta

on_tool_result async

on_tool_result(tool_call_id, result)

Handle tool result — check for errors and extract discoveries.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_result(self, tool_call_id: str, result: Any) -> None:
    """Store the active tool's result, count errors, and record discoveries.

    Results for any tool call other than the active one are ignored.
    """
    current = self.state.active_tool
    if not current or current.tool_call_id != tool_call_id:
        return

    current.result = result

    # A result counts as an error when it is a string mentioning "error"
    # (case-insensitive) or a dict with a truthy "error" entry.
    if isinstance(result, str):
        failed = "error" in result.lower()
    elif isinstance(result, dict):
        failed = bool(result.get("error"))
    else:
        failed = False
    if failed:
        self.state.tool_errors += 1

    # Record a discovery event when the extractor produces one.
    found = self._extract_discovery(current.tool_name, result)
    if found:
        self.state.add_discovery(found)

on_tool_end async

on_tool_end(tool_call_id)

Handle tool end — move to completed list.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_end(self, tool_call_id: str) -> None:
    """Mark the active tool completed when the id matches it."""
    current = self.state.active_tool
    if not current or current.tool_call_id != tool_call_id:
        return
    self.state.complete_active_tool()

on_state_snapshot async

on_state_snapshot(snapshot)

Handle state snapshot.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_state_snapshot(self, snapshot: dict[str, Any]) -> None:
    """Store the snapshot and, if it has a "phase" key, update the current phase."""
    display_state = self.state
    display_state.state_snapshot = snapshot
    if "phase" not in snapshot:
        return
    display_state.current_phase = str(snapshot["phase"])

on_run_finished async

on_run_finished(thread_id, run_id)

Handle run finished.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_run_finished(self, thread_id: str, run_id: str) -> None:
    """Flag the run as completed in the display state."""
    display_state = self.state
    display_state.completed = True

on_error async

on_error(message, code=None)

Handle error.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_error(self, message: str, code: str | None = None) -> None:
    """Handle error."""
    error_text = f"[{code}] {message}" if code else message
    self.state.error = error_text

AgentDisplayState

Bases: StrictModel

Mutable state for agent display rendering.

This class tracks all state needed to render the agent's progress in a terminal or UI. It accumulates streaming text, tracks active and completed tool calls, and maintains error state.

Attributes:

Name Type Description
current_phase str

Description of current analysis phase.

text_buffer str

Accumulated streaming text from agent.

active_message_id str | None

ID of currently streaming message.

active_tool ToolCallState | None

Currently executing tool state (if any).

completed_tools list[ToolCallState]

List of completed tool executions.

state_snapshot dict[str, Any]

Latest state snapshot from agent.

error str | None

Error message if run failed.

completed bool

Whether the run has finished.

thread_id str | None

Current thread identifier.

run_id str | None

Current run identifier.

Example

state = AgentDisplayState()
state.text_buffer += "Analyzing repository..."
state.active_tool = ToolCallState(tool_call_id="t1", tool_name="rg_search")

clear_text_buffer

clear_text_buffer()

Clear and return the text buffer.

Returns:

Type Description
str

The text buffer contents before clearing.

Source code in src/code_context_agent/consumer/state.py
def clear_text_buffer(self) -> str:
    """Empty the text buffer and hand back what it held.

    Returns:
        The buffer contents from before it was cleared.
    """
    drained, self.text_buffer = self.text_buffer, ""
    return drained

complete_active_tool

complete_active_tool(result=None)

Mark the active tool as completed and move to history.

Parameters:

Name Type Description Default
result Any

Optional result to store with the tool call.

None
Source code in src/code_context_agent/consumer/state.py
def complete_active_tool(self, result: Any = None) -> None:
    """Mark the active tool as completed and move it to the history.

    Does nothing when there is no active tool.

    Args:
        result: Optional result to store with the tool call. When left as
            ``None``, a result already recorded on the tool is kept.
    """
    if self.active_tool:
        self.active_tool.status = "completed"
        # Only overwrite when a result is supplied: a no-argument call must
        # not erase a result that was stored on the tool earlier.
        if result is not None:
            self.active_tool.result = result
        self.completed_tools.append(self.active_tool)
        self.active_tool = None

get_recent_tools

get_recent_tools(count=5)

Get the most recent completed tools.

Parameters:

Name Type Description Default
count int

Maximum number of tools to return.

5

Returns:

Type Description
list[ToolCallState]

List of recently completed tool states.

Source code in src/code_context_agent/consumer/state.py
def get_recent_tools(self, count: int = 5) -> list[ToolCallState]:
    """Get the most recent completed tools.

    Args:
        count: Maximum number of tools to return. Zero or a negative
            value yields an empty list.

    Returns:
        List of the most recently completed tool states, oldest first.
    """
    # Guard the slice: [-0:] is the whole list, and a negative count would
    # turn into a positive start index and return an unrelated slice.
    if count <= 0:
        return []
    return self.completed_tools[-count:]

get_tool_stats

get_tool_stats()

Get tool call counts grouped by prefix.

Groups tools by the part of their name before the first underscore (e.g., lsp_*, code_*) for display in the TUI dashboard.

Returns:

Type Description
dict[str, int]

Dictionary mapping tool prefixes to call counts.

Source code in src/code_context_agent/consumer/state.py
def get_tool_stats(self) -> dict[str, int]:
    """Count completed tool calls per name prefix.

    A tool name containing an underscore is grouped under its first
    segment followed by ``_*`` (e.g. ``lsp_hover`` -> ``lsp_*``,
    ``code_graph_query`` -> ``code_*``). A name without an underscore is
    its own group.

    Returns:
        Dictionary mapping each prefix group to its call count.
    """
    counts: dict[str, int] = {}
    for call in self.completed_tools:
        head, separator, _rest = call.tool_name.partition("_")
        # An empty separator means the name has no underscore to group on.
        group = f"{head}_*" if separator else call.tool_name
        counts[group] = counts.get(group, 0) + 1
    return counts

get_elapsed_seconds

get_elapsed_seconds()

Get elapsed time since run started.

Returns:

Type Description
float

Elapsed seconds, or 0.0 if not started.

Source code in src/code_context_agent/consumer/state.py
def get_elapsed_seconds(self) -> float:
    """Report how long the run has been going.

    Returns:
        Seconds since ``start_time``, or 0.0 when no start time is set.
    """
    started = self.start_time
    return 0.0 if started is None else time.monotonic() - started

get_tool_elapsed_seconds

get_tool_elapsed_seconds()

Get elapsed time for current tool.

Returns:

Type Description
float

Elapsed seconds for active tool, or 0.0 if no active tool.

Source code in src/code_context_agent/consumer/state.py
def get_tool_elapsed_seconds(self) -> float:
    """Report how long the current tool has been running.

    Returns:
        Seconds since ``tool_start_time``, or 0.0 when no tool start time
        is set.
    """
    started = self.tool_start_time
    return 0.0 if started is None else time.monotonic() - started

get_success_count

get_success_count()

Get count of successful tool calls.

Returns:

Type Description
int

Number of completed tools minus errors.

Source code in src/code_context_agent/consumer/state.py
def get_success_count(self) -> int:
    """Count the tool calls that finished without a recorded error.

    Returns:
        Number of completed tools minus the error count.
    """
    finished = len(self.completed_tools)
    return finished - self.tool_errors

advance_phase

advance_phase(phase)

Advance to a new phase if it's higher than the current one.

Parameters:

Name Type Description Default
phase Any

AnalysisPhase enum value to advance to.

required
Source code in src/code_context_agent/consumer/state.py
def advance_phase(self, phase: Any) -> None:
    """Move to ``phase`` if it is higher than the most recent phase.

    Args:
        phase: AnalysisPhase enum value to advance to.
    """
    from .phases import PHASE_DESCRIPTIONS, PHASE_NAMES, PhaseState

    latest = self.phases[-1] if self.phases else None

    # Phases only move forward: ignore an equal or lower phase.
    if latest is not None and phase <= latest.phase:
        return

    timestamp = time.monotonic()

    # Close the previous phase at the moment the new one begins.
    if latest is not None and not latest.is_complete:
        latest.completed_at = timestamp

    entry = PhaseState(
        phase=phase,
        name=PHASE_NAMES.get(phase, f"Phase {phase}"),
        description=PHASE_DESCRIPTIONS.get(phase, ""),
        started_at=timestamp,
    )
    self.phases.append(entry)
    self.current_phase_index = len(self.phases) - 1

add_discovery

add_discovery(event)

Add a discovery event, evicting oldest if at capacity.

Parameters:

Name Type Description Default
event Any

DiscoveryEvent to add.

required
Source code in src/code_context_agent/consumer/state.py
def add_discovery(self, event: Any) -> None:
    """Append a discovery event, dropping the oldest ones beyond capacity.

    Args:
        event: DiscoveryEvent to add.
    """
    recorded = self.discoveries
    recorded.append(event)
    # Trim from the front until the list is within max_discoveries.
    while len(recorded) > self.max_discoveries:
        del recorded[0]

reset

reset()

Reset state for a new run.

Source code in src/code_context_agent/consumer/state.py
def reset(self) -> None:
    """Restore every tracked field to its initial value for a new run."""
    # Text and message tracking
    self.current_phase = ""
    self.text_buffer = ""
    self.active_message_id = None

    # Tool tracking
    self.active_tool = None
    self.completed_tools = []

    # Run status
    self.state_snapshot = {}
    self.error = None
    self.completed = False

    # Metrics
    self.start_time = None
    self.turn_count = 0
    self.tool_errors = 0
    self.tool_start_time = None

    # Phase tracking (v7)
    self.phases = []
    self.current_phase_index = -1
    self.discoveries = []

ToolCallState

Bases: StrictModel

State for a single tool call.

Attributes:

Name Type Description
tool_call_id str

Unique identifier for the tool call.

tool_name str

Name of the tool being executed.

args_buffer str

Accumulated tool arguments (streaming).

result Any

Tool execution result (when complete).

status str

Current status ("running", "completed", "error").