code_context_agent.consumer ¶
Event consumer package for agent streaming output.
EventConsumer ¶
Bases: ABC
Abstract base class for consuming agent streaming events.
Implement this interface to create custom event consumers for different output formats (Rich terminal, JSON, web UI, etc.).
The consumer receives typed AG-UI events and can render them appropriately. Events follow a lifecycle: run starts -> messages/tools -> run finishes.
Example
    class LoggingConsumer(EventConsumer):
        async def on_run_started(self, thread_id: str, run_id: str) -> None:
            print(f"Run started: {run_id}")
on_run_started abstractmethod async ¶
Handle run started event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Unique identifier for the conversation thread. | required |
| run_id | str | Unique identifier for this agent run. | required |
Source code in src/code_context_agent/consumer/base.py
on_text_start abstractmethod async ¶
Handle start of a text message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_id | str | Unique identifier for the message. | required |
| role | str | Message role (usually "assistant"). | required |
on_text_content abstractmethod async ¶
Handle streaming text content.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_id | str | Identifier of the message being streamed. | required |
| delta | str | New text chunk to append. | required |
on_text_end abstractmethod async ¶
Handle end of a text message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_id | str | Identifier of the completed message. | required |
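
The three text callbacks arrive as one start, zero or more deltas, and one end per message. The following is a minimal sketch of how an implementation might buffer deltas per `message_id`. `TextBuffer` is a hypothetical helper, not part of the package, and it does not subclass `EventConsumer` so that the snippet runs on its own.

```python
import asyncio


class TextBuffer:
    """Hypothetical helper: collects streamed text per message_id."""

    def __init__(self) -> None:
        self._open: dict[str, list[str]] = {}  # messages still streaming
        self.finished: dict[str, str] = {}     # completed message text

    async def on_text_start(self, message_id: str, role: str) -> None:
        self._open[message_id] = []

    async def on_text_content(self, message_id: str, delta: str) -> None:
        # Tolerate a delta for a message whose start event was missed.
        self._open.setdefault(message_id, []).append(delta)

    async def on_text_end(self, message_id: str) -> None:
        # Join the chunks once; repeated string concatenation is avoided.
        self.finished[message_id] = "".join(self._open.pop(message_id, []))


async def demo() -> str:
    buf = TextBuffer()
    await buf.on_text_start("m1", "assistant")
    await buf.on_text_content("m1", "Hello, ")
    await buf.on_text_content("m1", "world")
    await buf.on_text_end("m1")
    return buf.finished["m1"]


result = asyncio.run(demo())
```

Keeping the chunks in a list and joining them in `on_text_end` is one possible design; a consumer that renders text live would instead write each `delta` as it arrives.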
on_tool_start abstractmethod async ¶
Handle start of tool execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tool_call_id | str | Unique identifier for this tool call. | required |
| tool_name | str | Name of the tool being called. | required |
Source code in src/code_context_agent/consumer/base.py
on_tool_args abstractmethod async ¶
Handle streaming tool arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tool_call_id | str | Identifier of the tool call. | required |
| args_delta | str | JSON string chunk of tool arguments. | required |
Source code in src/code_context_agent/consumer/base.py
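Because `args_delta` is a chunk of a JSON string, a single chunk is generally not parseable on its own. The sketch below shows one way to accumulate chunks per `tool_call_id` and parse them only once the call is complete. `ToolArgsAssembler`, its method names, and the sample arguments are hypothetical; the fallback to a `_raw` key on malformed JSON is likewise an assumption, not documented behavior.

```python
import json
from typing import Any


class ToolArgsAssembler:
    """Hypothetical helper: joins args_delta chunks and parses them once complete."""

    def __init__(self) -> None:
        self._buffers: dict[str, str] = {}

    def add(self, tool_call_id: str, args_delta: str) -> None:
        # Append the new chunk to whatever has been received for this call.
        self._buffers[tool_call_id] = self._buffers.get(tool_call_id, "") + args_delta

    def finish(self, tool_call_id: str) -> Any:
        raw = self._buffers.pop(tool_call_id, "")
        if not raw:
            return {}
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            # Assumption: keep the raw text rather than failing the run.
            return {"_raw": raw}


assembler = ToolArgsAssembler()
assembler.add("t1", '{"pattern": "TO')
assembler.add("t1", 'DO", "path": "src"}')
parsed = assembler.finish("t1")
```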
on_tool_result abstractmethod async ¶
Handle tool execution result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tool_call_id | str | Identifier of the completed tool call. | required |
| result | Any | Result returned by the tool. | required |
on_tool_end abstractmethod async ¶
Handle end of tool execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tool_call_id | str | Identifier of the completed tool call. | required |
on_state_snapshot abstractmethod async ¶
Handle state snapshot event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| snapshot | dict[str, Any] | Complete state snapshot dictionary. | required |
on_run_finished abstractmethod async ¶
Handle run finished event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Identifier of the conversation thread. | required |
| run_id | str | Identifier of the completed run. | required |
Source code in src/code_context_agent/consumer/base.py
on_error abstractmethod async ¶
Handle error event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | str | Error message description. | required |
| code | str \| None | Optional error code. | None |
start async ¶
Initialize the consumer (optional).
Override to perform setup before events start streaming. Called before the first event is received.
stop async ¶
Cleanup the consumer (optional).
Override to perform cleanup after events stop streaming. Called after the last event is received or on error.
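Since `stop` is documented as running after the last event or on error, a caller would typically wrap event delivery in `try`/`finally`. The sketch below illustrates that ordering. `RecordingConsumer` and `drive` are hypothetical stand-ins written for this example; the package's actual driver is not shown in this reference.

```python
import asyncio


class RecordingConsumer:
    """Stand-in consumer that records the order of calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def start(self) -> None:
        self.calls.append("start")

    async def stop(self) -> None:
        self.calls.append("stop")

    async def on_run_started(self, thread_id: str, run_id: str) -> None:
        self.calls.append(f"run_started:{run_id}")

    async def on_run_finished(self, thread_id: str, run_id: str) -> None:
        self.calls.append(f"run_finished:{run_id}")


async def drive(consumer: RecordingConsumer, fail: bool = False) -> None:
    """Hypothetical driver: start, deliver events, always stop."""
    await consumer.start()
    try:
        await consumer.on_run_started("thread-1", "run-1")
        if fail:
            raise RuntimeError("simulated stream failure")
        await consumer.on_run_finished("thread-1", "run-1")
    finally:
        # Runs on both the success path and the error path.
        await consumer.stop()


ok = RecordingConsumer()
asyncio.run(drive(ok))

failed = RecordingConsumer()
try:
    asyncio.run(drive(failed, fail=True))
except RuntimeError:
    pass
```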
QuietConsumer ¶
Bases: EventConsumer
Silent consumer that only writes errors to stderr.
Initialize quiet consumer.
Source code in src/code_context_agent/consumer/rich_consumer.py
start async ¶
Initialize the consumer (optional).
Override to perform setup before events start streaming. Called before the first event is received.
stop async ¶
Cleanup the consumer (optional).
Override to perform cleanup after events stop streaming. Called after the last event is received or on error.
on_run_started async ¶
on_text_start async ¶
on_text_content async ¶
on_text_end async ¶
on_tool_start async ¶
on_tool_args async ¶
on_tool_result async ¶
on_tool_end async ¶
on_state_snapshot async ¶
on_run_finished async ¶
on_error async ¶
Print error to stderr.
RichEventConsumer ¶
Bases: EventConsumer
Dashboard-style consumer for agent execution.
Shows a fixed-height panel with:
- Timer and progress bar
- Tool category breakdown with mini bars
- Active tool with spinner
- Recent tool history with timing and status
Streaming text is not displayed. The agent's reasoning is hidden because only tool execution status matters for a long-running analysis agent.
Initialize the dashboard consumer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| console | Console \| None | Optional Rich Console instance. | None |
| mode | str | Analysis mode ("standard" or "full"). | 'standard' |
Source code in src/code_context_agent/consumer/rich_consumer.py
start async ¶
Start the dashboard display.
Source code in src/code_context_agent/consumer/rich_consumer.py
stop async ¶
Stop the dashboard and print final summary.
Source code in src/code_context_agent/consumer/rich_consumer.py
on_run_started async ¶
on_text_start async ¶
on_text_content async ¶
on_text_end async ¶
Handle text end — increment turn counter.
on_tool_start async ¶
Handle tool start — show in active spinner.
Source code in src/code_context_agent/consumer/rich_consumer.py
on_tool_args async ¶
Handle tool args — silently accumulate.
Source code in src/code_context_agent/consumer/rich_consumer.py
on_tool_result async ¶
Handle tool result — check for errors and extract discoveries.
Source code in src/code_context_agent/consumer/rich_consumer.py
on_tool_end async ¶
Handle tool end — move to completed list.
on_state_snapshot async ¶
Handle state snapshot.
on_run_finished async ¶
on_error async ¶
AgentDisplayState ¶
Bases: StrictModel
Mutable state for agent display rendering.
This class tracks all state needed to render the agent's progress in a terminal or UI. It accumulates streaming text, tracks active and completed tool calls, and maintains error state.
Attributes:
| Name | Type | Description |
|---|---|---|
| current_phase | str | Description of current analysis phase. |
| text_buffer | str | Accumulated streaming text from agent. |
| active_message_id | str \| None | ID of currently streaming message. |
| active_tool | ToolCallState \| None | Currently executing tool state (if any). |
| completed_tools | list[ToolCallState] | List of completed tool executions. |
| state_snapshot | dict[str, Any] | Latest state snapshot from agent. |
| error | str \| None | Error message if run failed. |
| completed | bool | Whether the run has finished. |
| thread_id | str \| None | Current thread identifier. |
| run_id | str \| None | Current run identifier. |
Example
    state = AgentDisplayState()
    state.text_buffer += "Analyzing repository..."
    state.active_tool = ToolCallState(tool_call_id="t1", tool_name="rg_search")
clear_text_buffer ¶
Clear and return the text buffer.
Returns:
| Type | Description |
|---|---|
| str | The text buffer contents before clearing. |
complete_active_tool ¶
Mark the active tool as completed and move to history.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| result | Any | Optional result to store with the tool call. | None |
Source code in src/code_context_agent/consumer/state.py
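The move from active tool to history can be pictured with the sketch below. It uses plain dataclasses as stand-ins for `ToolCallState` and `AgentDisplayState` (the real classes derive from `StrictModel`), and it assumes that a call with no active tool is a no-op and that completion sets `status` to `"completed"`; neither detail is confirmed by this reference.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToolCall:
    """Stand-in for ToolCallState."""
    tool_call_id: str
    tool_name: str
    args_buffer: str = ""
    result: Any = None
    status: str = "running"


@dataclass
class DisplayState:
    """Stand-in for the tool-tracking part of AgentDisplayState."""
    active_tool: ToolCall | None = None
    completed_tools: list[ToolCall] = field(default_factory=list)

    def complete_active_tool(self, result: Any = None) -> None:
        if self.active_tool is None:
            return  # assumption: nothing to do without an active tool
        self.active_tool.result = result
        self.active_tool.status = "completed"
        self.completed_tools.append(self.active_tool)
        self.active_tool = None


state = DisplayState(active_tool=ToolCall("t1", "rg_search"))
state.complete_active_tool(result="3 matches")
```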
get_recent_tools ¶
Get the most recent completed tools.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| count | int | Maximum number of tools to return. | 5 |
Returns:
| Type | Description |
|---|---|
| list[ToolCallState] | List of recently completed tool states. |
Source code in src/code_context_agent/consumer/state.py
get_tool_stats ¶
Get tool call counts grouped by prefix.
Groups tools by their name prefix (e.g., lsp_, code_graph_) for display in the TUI dashboard.
Returns:
| Type | Description |
|---|---|
| dict[str, int] | Dictionary mapping tool prefixes to call counts. |
Source code in src/code_context_agent/consumer/state.py
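Grouping by prefix could be done along the lines of the sketch below. Only the `lsp_` and `code_graph_` prefixes come from the description above; the fixed prefix list, the `"other"` bucket, the helper names, and the sample tool names other than `rg_search` are assumptions made for illustration.

```python
from collections import Counter

# Assumed prefix list; only "lsp_" and "code_graph_" are named in the docs.
KNOWN_PREFIXES = ("code_graph_", "lsp_")


def tool_prefix(tool_name: str) -> str:
    """Return the first known prefix the name starts with, else "other"."""
    for prefix in KNOWN_PREFIXES:
        if tool_name.startswith(prefix):
            return prefix
    return "other"


def tool_stats(tool_names: list[str]) -> dict[str, int]:
    """Count tool calls per prefix."""
    return dict(Counter(tool_prefix(name) for name in tool_names))


stats = tool_stats(["lsp_references", "lsp_hover", "code_graph_build", "rg_search"])
```

A fixed prefix list is used here because splitting on the first underscore would turn `code_graph_` into `code_`.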
get_elapsed_seconds ¶
Get elapsed time since run started.
Returns:
| Type | Description |
|---|---|
| float | Elapsed seconds, or 0.0 if not started. |
Source code in src/code_context_agent/consumer/state.py
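The "elapsed or 0.0" contract can be sketched as follows. `RunTimer` is a hypothetical class, and the use of `time.monotonic()` is an assumption; the real implementation may record its start time differently.

```python
import time


class RunTimer:
    """Hypothetical timer that returns 0.0 until it has been started."""

    def __init__(self) -> None:
        self._started_at: float | None = None

    def start(self) -> None:
        # monotonic() is unaffected by system clock adjustments.
        self._started_at = time.monotonic()

    def elapsed_seconds(self) -> float:
        if self._started_at is None:
            return 0.0
        return time.monotonic() - self._started_at


timer = RunTimer()
before = timer.elapsed_seconds()
timer.start()
after = timer.elapsed_seconds()
```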
get_tool_elapsed_seconds ¶
Get elapsed time for current tool.
Returns:
| Type | Description |
|---|---|
| float | Elapsed seconds for active tool, or 0.0 if no active tool. |
Source code in src/code_context_agent/consumer/state.py
get_success_count ¶
Get count of successful tool calls.
Returns:
| Type | Description |
|---|---|
| int | Number of completed tools minus errors. |
advance_phase ¶
Advance to a new phase if it's higher than the current one.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| phase | Any | AnalysisPhase enum value to advance to. | required |
Source code in src/code_context_agent/consumer/state.py
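The "only move forward" rule can be illustrated with an ordered enum. In the sketch below, `Phase` is a hypothetical stand-in for `AnalysisPhase`: its member names and values are invented, and `PhaseTracker` exists only for this example.

```python
from enum import IntEnum


class Phase(IntEnum):
    """Hypothetical stand-in for AnalysisPhase; member names are invented."""
    INDEXING = 1
    SEARCHING = 2
    REPORTING = 3


class PhaseTracker:
    def __init__(self) -> None:
        self.phase = Phase.INDEXING

    def advance_phase(self, phase: Phase) -> None:
        # Ignore requests that would move backwards or stay in place.
        if phase > self.phase:
            self.phase = phase


tracker = PhaseTracker()
tracker.advance_phase(Phase.REPORTING)
tracker.advance_phase(Phase.SEARCHING)  # ignored: lower than REPORTING
```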
add_discovery ¶
Add a discovery event, evicting oldest if at capacity.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | Any | DiscoveryEvent to add. | required |
Source code in src/code_context_agent/consumer/state.py
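Evicting the oldest entry at capacity is the behavior of a bounded queue. One way to get it in Python is `collections.deque` with `maxlen`, as in the sketch below. The capacity of 3 and the string events are assumptions; the real capacity and the shape of `DiscoveryEvent` are not given in this reference, and the actual implementation may use a different container.

```python
from collections import deque

MAX_DISCOVERIES = 3  # assumed capacity, chosen for the example

# A deque with maxlen silently drops the oldest item when full.
discoveries = deque(maxlen=MAX_DISCOVERIES)

for event in ["found A", "found B", "found C", "found D"]:
    discoveries.append(event)

remaining = list(discoveries)
```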
reset ¶
Reset state for a new run.
Source code in src/code_context_agent/consumer/state.py
ToolCallState ¶
Bases: StrictModel
State for a single tool call.
Attributes:
| Name | Type | Description |
|---|---|---|
| tool_call_id | str | Unique identifier for the tool call. |
| tool_name | str | Name of the tool being executed. |
| args_buffer | str | Accumulated tool arguments (streaming). |
| result | Any | Tool execution result (when complete). |
| status | str | Current status ("running", "completed", "error"). |