Skip to content

base

code_context_agent.consumer.base

Abstract base class for consuming agent streaming events.

This module defines the EventConsumer abstract base class that all consumers must implement. Consumers receive typed events from the AG-UI protocol and can render them in different ways (Rich terminal, JSON output, web UI, etc.).

EventConsumer

Bases: ABC

Abstract interface for consuming agent streaming events.

Implement this interface to create custom event consumers for different output formats (Rich terminal, JSON, web UI, etc.).

The consumer receives typed AG-UI events and can render them appropriately. Events follow a lifecycle: run starts -> messages/tools -> run finishes.

Example

class LoggingConsumer(EventConsumer):
    async def on_run_started(self, thread_id: str, run_id: str) -> None:
        print(f"Run started: {run_id}")

    # The remaining abstract on_* handlers must be implemented as well.

on_run_started abstractmethod async

on_run_started(thread_id, run_id)

Handle run started event.

Parameters:

Name Type Description Default
thread_id str

Unique identifier for the conversation thread.

required
run_id str

Unique identifier for this agent run.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_run_started(self, thread_id: str, run_id: str) -> None:
    """Respond to the event that marks the start of an agent run.

    Args:
        thread_id: Unique ID of the conversation thread.
        run_id: Unique ID of this agent run.
    """

on_text_start abstractmethod async

on_text_start(message_id, role)

Handle start of a text message.

Parameters:

Name Type Description Default
message_id str

Unique identifier for the message.

required
role str

Message role (usually "assistant").

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_text_start(self, message_id: str, role: str) -> None:
    """Respond to the beginning of a text message.

    Args:
        message_id: Unique ID of the message.
        role: Role attached to the message (usually "assistant").
    """

on_text_content abstractmethod async

on_text_content(message_id, delta)

Handle streaming text content.

Parameters:

Name Type Description Default
message_id str

Identifier of the message being streamed.

required
delta str

New text chunk to append.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_text_content(self, message_id: str, delta: str) -> None:
    """Respond to a chunk of streamed text content.

    Args:
        message_id: ID of the message currently being streamed.
        delta: The new piece of text to append.
    """

on_text_end abstractmethod async

on_text_end(message_id)

Handle end of a text message.

Parameters:

Name Type Description Default
message_id str

Identifier of the completed message.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_text_end(self, message_id: str) -> None:
    """Respond to the completion of a text message.

    Args:
        message_id: ID of the message that has finished.
    """

on_tool_start abstractmethod async

on_tool_start(tool_call_id, tool_name)

Handle start of tool execution.

Parameters:

Name Type Description Default
tool_call_id str

Unique identifier for this tool call.

required
tool_name str

Name of the tool being called.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_tool_start(self, tool_call_id: str, tool_name: str) -> None:
    """Respond to the beginning of a tool execution.

    Args:
        tool_call_id: Unique ID of this tool call.
        tool_name: Name of the tool that is being called.
    """

on_tool_args abstractmethod async

on_tool_args(tool_call_id, args_delta)

Handle streaming tool arguments.

Parameters:

Name Type Description Default
tool_call_id str

Identifier of the tool call.

required
args_delta str

JSON string chunk of tool arguments.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_tool_args(self, tool_call_id: str, args_delta: str) -> None:
    """Respond to a chunk of streamed tool arguments.

    Args:
        tool_call_id: ID of the tool call.
        args_delta: A chunk of the tool arguments' JSON string.
    """

on_tool_result abstractmethod async

on_tool_result(tool_call_id, result)

Handle tool execution result.

Parameters:

Name Type Description Default
tool_call_id str

Identifier of the completed tool call.

required
result Any

Result returned by the tool.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_tool_result(self, tool_call_id: str, result: Any) -> None:
    """Respond to the result of a tool execution.

    Args:
        tool_call_id: ID of the tool call that completed.
        result: The value the tool returned.
    """

on_tool_end abstractmethod async

on_tool_end(tool_call_id)

Handle end of tool execution.

Parameters:

Name Type Description Default
tool_call_id str

Identifier of the completed tool call.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_tool_end(self, tool_call_id: str) -> None:
    """Respond to the end of a tool execution.

    Args:
        tool_call_id: ID of the tool call that completed.
    """

on_state_snapshot abstractmethod async

on_state_snapshot(snapshot)

Handle state snapshot event.

Parameters:

Name Type Description Default
snapshot dict[str, Any]

Complete state snapshot dictionary.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_state_snapshot(self, snapshot: dict[str, Any]) -> None:
    """Respond to a state snapshot event.

    Args:
        snapshot: Dictionary holding the complete state snapshot.
    """

on_run_finished abstractmethod async

on_run_finished(thread_id, run_id)

Handle run finished event.

Parameters:

Name Type Description Default
thread_id str

Identifier of the conversation thread.

required
run_id str

Identifier of the completed run.

required
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_run_finished(self, thread_id: str, run_id: str) -> None:
    """Respond to the event that marks the end of an agent run.

    Args:
        thread_id: ID of the conversation thread.
        run_id: ID of the run that completed.
    """

on_error abstractmethod async

on_error(message, code=None)

Handle error event.

Parameters:

Name Type Description Default
message str

Error message description.

required
code str | None

Optional error code.

None
Source code in src/code_context_agent/consumer/base.py
@abstractmethod
async def on_error(self, message: str, code: str | None = None) -> None:
    """Handle error event.

    Args:
        message: Error message description.
        code: Optional error code.
    """

start async

start()

Initialize the consumer (optional).

Override to perform setup before events start streaming. Called before the first event is received.

Source code in src/code_context_agent/consumer/base.py
async def start(self) -> None:
    """Set up the consumer (optional hook).

    Subclasses may override this to do setup work before event streaming
    begins; it is called before the first event is received. The default
    implementation does nothing.
    """

stop async

stop()

Cleanup the consumer (optional).

Override to perform cleanup after events stop streaming. Called after the last event is received or on error.

Source code in src/code_context_agent/consumer/base.py
async def stop(self) -> None:
    """Tear down the consumer (optional hook).

    Subclasses may override this to do cleanup work once event streaming
    has stopped; it is called after the last event is received or on
    error. The default implementation does nothing.
    """