Skip to content

rich_consumer

code_context_agent.consumer.rich_consumer

Rich terminal display consumer for agent events.

This module provides a dashboard-style event consumer focused on tool execution status rather than streaming text. The display is fixed-height and never grows — it shows what's running, what finished, and progress.

RichEventConsumer

RichEventConsumer(console=None, *, mode='standard')

Bases: EventConsumer

Dashboard-style consumer for agent execution.

Shows a fixed-height panel with: - Timer and progress bar - Tool category breakdown with mini bars - Active tool with spinner - Recent tool history with timing and status

No streaming text display. The agent's reasoning is not shown — only tool execution status matters for a long-running analysis agent.

Initialize the dashboard consumer.

Parameters:

Name Type Description Default
console Console | None

Optional Rich Console instance.

None
mode str

Analysis mode ("standard" or "full").

'standard'
Source code in src/code_context_agent/consumer/rich_consumer.py
def __init__(self, console: Console | None = None, *, mode: str = "standard") -> None:
    """Initialize the dashboard consumer.

    Args:
        console: Optional Rich Console instance.
        mode: Analysis mode ("standard" or "full").
    """
    self.console = console or Console()
    self.state = AgentDisplayState()
    self._live: Live | None = None
    self._mode = mode

start async

start()

Start the dashboard display.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def start(self) -> None:
    """Start the dashboard display."""
    self.state.reset()
    self._live = Live(
        self._build_display(),
        console=self.console,
        refresh_per_second=2,
        transient=True,
        vertical_overflow="ellipsis",
    )
    # Rich auto-refresh calls get_renderable — point it at our builder
    # so the dashboard always shows fresh state (timer, tool elapsed, etc.)
    self._live.get_renderable = self._build_display  # type: ignore[assignment]
    self._live.start()

stop async

stop()

Stop the dashboard and print final summary.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def stop(self) -> None:
    """Stop the dashboard and print final summary."""
    if self._live:
        self._live.stop()
        self._live = None

    # Print a static final summary after Live is gone
    elapsed = self.state.get_elapsed_seconds()
    total = len(self.state.completed_tools)
    errors = self.state.tool_errors
    self.console.print(
        f"[dim]  {total} tools in {self._format_time(elapsed)}{total - errors}{errors}[/dim]",
    )

on_run_started async

on_run_started(thread_id, run_id)

Handle run started.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_run_started(self, thread_id: str, run_id: str) -> None:
    """Handle run started."""
    self.state.thread_id = thread_id
    self.state.run_id = run_id
    self.state.start_time = time.monotonic()

on_text_start async

on_text_start(message_id, role)

Handle text start — no display action.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_text_start(self, message_id: str, role: str) -> None:
    """Handle text start — no display action."""
    self.state.active_message_id = message_id

on_text_content async

on_text_content(message_id, delta)

Handle text delta — silently accumulate (not displayed).

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_text_content(self, message_id: str, delta: str) -> None:
    """Handle text delta — silently accumulate (not displayed)."""
    self.state.text_buffer += delta

on_text_end async

on_text_end(message_id)

Handle text end — increment turn counter.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_text_end(self, message_id: str) -> None:
    """Handle text end — increment turn counter."""
    self.state.active_message_id = None
    self.state.turn_count += 1
    self.state.text_buffer = ""

on_tool_start async

on_tool_start(tool_call_id, tool_name)

Handle tool start — show in active spinner.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_start(self, tool_call_id: str, tool_name: str) -> None:
    """Handle tool start — show in active spinner."""
    self.state.active_tool = ToolCallState(
        tool_call_id=tool_call_id,
        tool_name=tool_name,
    )
    self.state.tool_start_time = time.monotonic()
    self._detect_phase(tool_name)

on_tool_args async

on_tool_args(tool_call_id, args_delta)

Handle tool args — silently accumulate.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_args(self, tool_call_id: str, args_delta: str) -> None:
    """Handle tool args — silently accumulate."""
    if self.state.active_tool and self.state.active_tool.tool_call_id == tool_call_id:
        self.state.active_tool.args_buffer += args_delta

on_tool_result async

on_tool_result(tool_call_id, result)

Handle tool result — check for errors and extract discoveries.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_result(self, tool_call_id: str, result: Any) -> None:
    """Handle tool result — check for errors and extract discoveries."""
    if self.state.active_tool and self.state.active_tool.tool_call_id == tool_call_id:
        self.state.active_tool.result = result
        if (isinstance(result, str) and "error" in result.lower()) or (
            isinstance(result, dict) and result.get("error")
        ):
            self.state.tool_errors += 1
        # Extract discovery events
        discovery = self._extract_discovery(self.state.active_tool.tool_name, result)
        if discovery:
            self.state.add_discovery(discovery)

on_tool_end async

on_tool_end(tool_call_id)

Handle tool end — move to completed list.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_end(self, tool_call_id: str) -> None:
    """Handle tool end — move to completed list."""
    if self.state.active_tool and self.state.active_tool.tool_call_id == tool_call_id:
        self.state.complete_active_tool()

on_state_snapshot async

on_state_snapshot(snapshot)

Handle state snapshot.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_state_snapshot(self, snapshot: dict[str, Any]) -> None:
    """Handle state snapshot."""
    self.state.state_snapshot = snapshot
    if "phase" in snapshot:
        self.state.current_phase = str(snapshot["phase"])

on_run_finished async

on_run_finished(thread_id, run_id)

Handle run finished.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_run_finished(self, thread_id: str, run_id: str) -> None:
    """Handle run finished."""
    self.state.completed = True

on_error async

on_error(message, code=None)

Handle error.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_error(self, message: str, code: str | None = None) -> None:
    """Handle error."""
    error_text = f"[{code}] {message}" if code else message
    self.state.error = error_text

QuietConsumer

QuietConsumer()

Bases: EventConsumer

Silent consumer that only writes errors to stderr.

Initialize quiet consumer.

Source code in src/code_context_agent/consumer/rich_consumer.py
def __init__(self) -> None:
    """Initialize quiet consumer."""
    self._stderr = Console(stderr=True, no_color=True, highlight=False)

on_run_started async

on_run_started(thread_id, run_id)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_run_started(self, thread_id: str, run_id: str) -> None:
    """No output."""

on_text_start async

on_text_start(message_id, role)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_text_start(self, message_id: str, role: str) -> None:
    """No output."""

on_text_content async

on_text_content(message_id, delta)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_text_content(self, message_id: str, delta: str) -> None:
    """No output."""

on_text_end async

on_text_end(message_id)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_text_end(self, message_id: str) -> None:
    """No output."""

on_tool_start async

on_tool_start(tool_call_id, tool_name)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_start(self, tool_call_id: str, tool_name: str) -> None:
    """No output."""

on_tool_args async

on_tool_args(tool_call_id, args_delta)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_args(self, tool_call_id: str, args_delta: str) -> None:
    """No output."""

on_tool_result async

on_tool_result(tool_call_id, result)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_result(self, tool_call_id: str, result: Any) -> None:
    """No output."""

on_tool_end async

on_tool_end(tool_call_id)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_tool_end(self, tool_call_id: str) -> None:
    """No output."""

on_state_snapshot async

on_state_snapshot(snapshot)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_state_snapshot(self, snapshot: dict[str, Any]) -> None:
    """No output."""

on_run_finished async

on_run_finished(thread_id, run_id)

No output.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_run_finished(self, thread_id: str, run_id: str) -> None:
    """No output."""

on_error async

on_error(message, code=None)

Print error to stderr.

Source code in src/code_context_agent/consumer/rich_consumer.py
async def on_error(self, message: str, code: str | None = None) -> None:
    """Print error to stderr."""
    error_text = f"[{code}] {message}" if code else message
    self._stderr.print(f"Error: {error_text}")

start async

start()

Initialize the consumer (optional).

Override to perform setup before events start streaming. Called before the first event is received.

Source code in src/code_context_agent/consumer/base.py
async def start(self) -> None:
    """Initialize the consumer (optional).

    Override to perform setup before events start streaming.
    Called before the first event is received.
    """

stop async

stop()

Cleanup the consumer (optional).

Override to perform cleanup after events stop streaming. Called after the last event is received or on error.

Source code in src/code_context_agent/consumer/base.py
async def stop(self) -> None:
    """Cleanup the consumer (optional).

    Override to perform cleanup after events stop streaming.
    Called after the last event is received or on error.
    """