Skip to main content
The Agent SDK is experimental and will have breaking changes.
This example builds on the "Use an agent" guide. It implements every event-handler pattern in one runnable client.

Overview

This client handles every event the harness can emit:
  • Streams assistant tokens in real time
  • Shows tool calls and tool results inline
  • Prompts the user to answer ask_user_question calls
  • Prompts the user to allow or deny tool approvals
  • Shows in-chat MCP OAuth prompts
  • Handles parallel sub-agents
  • Continues the conversation across turns
"""Terminal chat client for the TrueFoundry Agent Harness SDK."""

import argparse, json, os, sys
from typing import Optional, Union

from truefoundry_gateway_sdk import (
    ApprovalAllow,
    ApprovalDeny,
    McpAuthRequiredEvent,
    McpInitializedEvent,
    ModelMessageEvent,
    ModelMessageEventDelta,
    SandboxCreatedEvent,
    Session,
    ThreadCreatedEvent,
    ThreadDoneEvent,
    ToolApprovalRequiredEvent,
    ToolCall,
    ToolResponseEvent,
    ToolResponseRequiredEvent,
    TrueFoundryGateway,
    TurnCreatedEvent,
    TurnDoneEvent,
    TurnDoneStatus,
    TurnErrorState,
    TurnEvent,
    TurnInput,
    UserMessage,
    UserToolApproval,
    UserToolResponse,
    is_event_delta,
    merge_event_delta,
)

# Events the user must answer before the next turn is created. ChatSession queues
# them in pending_next_turn_requests and build_next_turn_input() prompts for each.
PendingNextTurnRequest = Union[ToolApprovalRequiredEvent, ToolResponseRequiredEvent, McpAuthRequiredEvent]


class ChatSession:
    """Hold turn-to-turn state for one conversation and render its events.

    The instance indexes streamed model.message events by id, tracks which
    sub-agent threads are open, and queues the events (tool approvals,
    client-side tool responses, MCP auth prompts) that must be answered before
    the next turn is created.
    """

    def __init__(self, session: Session) -> None:
        self.session = session
        # id-keyed index of assembled events. Base model.message events are stored here
        # and their model.message.delta fragments merge into them in place.
        self.events: dict[str, TurnEvent] = {}
        # Sub-agent thread ids seen in a thread.created event and not yet done.
        self.open_subagent_threads: set[str] = set()
        # Display label per thread id; sub-agent labels come from agent_info.name.
        self.thread_labels: dict[str, str] = {"main": "main"}
        # Events awaiting user input; drained by build_next_turn_input().
        self.pending_next_turn_requests: list[PendingNextTurnRequest] = []
        # id of the model.message currently streaming on each thread. A later event on
        # the same thread with a different id means that message is done, so flush it.
        self._streaming_ids: dict[str, str] = {}
        # True while an "assistant: ..." line for the main thread is open (no newline yet).
        self._main_streaming = False

    def label(self, thread_id: str) -> str:
        """Return the display label for *thread_id*.

        Threads without a registered label fall back to their raw id (not
        "main"), so output from a sub-agent whose thread.created event was not
        seen is never mistaken for main-thread output.
        """
        return self.thread_labels.get(thread_id, thread_id)

    def _handle_model_message_delta(self, event: ModelMessageEventDelta) -> None:
        """Print main-thread delta text as it arrives; other threads print on flush."""
        # Stream main-thread text live; the flush happens later, driven by id change.
        if event.thread_id == "main" and event.content:
            if not self._main_streaming:
                print("\nassistant: ", end="", flush=True)
                self._main_streaming = True
            print(event.content, end="", flush=True)

    def _flush_model_message(self, msg: ModelMessageEvent) -> None:
        """Finish rendering a fully assembled model.message.

        Closes the streamed main-thread line (or prints non-streamed content),
        prints content from open sub-agent threads, and previews every complete
        tool call except ask_user_question, which is rendered as a prompt later.
        """
        thread_id = msg.thread_id

        if thread_id == "main":
            if self._main_streaming:
                print()
                self._main_streaming = False
            elif msg.content:
                print(f"\nassistant: {msg.content}")
        elif thread_id in self.open_subagent_threads and msg.content:
            print(f"\n[{self.label(thread_id)}] {msg.content}")

        for tool_call in msg.tool_calls:
            # Skip fragments that never received an id or function name.
            if not tool_call.id or not tool_call.function.name:
                continue
            if tool_call.tool_info.name == "ask_user_question":
                continue
            self._print_tool_call(thread_id, tool_call)

    def _print_tool_call(self, thread_id: str, tool_call: ToolCall) -> None:
        """Print one tool call with its arguments truncated to 160 characters."""
        try:
            args = json.loads(tool_call.function.arguments or "{}")
        except json.JSONDecodeError:
            # Show unparseable argument text as-is.
            args = tool_call.function.arguments

        args_preview = json.dumps(args)[:160] if isinstance(args, dict) else str(args)[:160]
        print(f"\n[{self.label(thread_id)} -> {tool_call.tool_info.name}] {args_preview}")

    def _handle_tool_response(self, event: ToolResponseEvent) -> None:
        """Print the first 200 characters of a tool result on a single line."""
        preview = event.content[:200].replace("\n", " ")
        print(f"\n[{self.label(event.thread_id)} tool result] {preview}")

    def _prompt_client_side_response(
        self,
        pending_event: ToolResponseRequiredEvent,
        tool_call: ToolCall,
    ) -> UserToolResponse:
        """Ask the user to supply the result of a client-side tool call.

        ask_user_question calls show the question and any numbered options; a
        numeric answer in range selects that option, anything else is sent
        verbatim. Other tools show their raw arguments and send the typed text.
        """
        if tool_call.tool_info.name == "ask_user_question":
            try:
                args = json.loads(tool_call.function.arguments or "{}")
            except json.JSONDecodeError:
                args = {}
            if not isinstance(args, dict):
                # Valid JSON that is not an object (e.g. a bare string) has no fields to read.
                args = {}
            question = args.get("question", "Answer:")
            options = args.get("options") or []
            print(f"\nassistant asks: {question}")
            for idx, option in enumerate(options, 1):
                print(f"  {idx}. {option}")
            if options:
                print("  Or type a free-form answer.")
            answer = input("you: ").strip()
            # isdecimal(), not isdigit(): isdigit() accepts characters such as "²"
            # that int() rejects with ValueError.
            if options and answer.isdecimal() and 1 <= int(answer) <= len(options):
                content = options[int(answer) - 1]
            else:
                content = answer
        else:
            print(f"\nclient-side tool response required: {tool_call.tool_info.name}")
            print(f"  arguments: {tool_call.function.arguments}")
            content = input("you: ").strip()

        return UserToolResponse(
            thread_id=pending_event.thread_id,
            tool_call_id=tool_call.id,
            content=content,
        )

    def _prompt_approval(
        self,
        pending_event: ToolApprovalRequiredEvent,
        tool_call: ToolCall,
    ) -> UserToolApproval:
        """Ask the user to allow or deny a tool call that requires approval.

        "a"/"allow"/"y"/"yes" allow and "d"/"deny"/"n"/"no" deny (case-insensitive).
        An empty answer denies without a reason. Any other text denies with that
        text, exactly as typed, as the reason.
        """
        try:
            arguments = json.loads(tool_call.function.arguments or "{}")
        except json.JSONDecodeError:
            arguments = tool_call.function.arguments

        print(f"\napproval needed: {tool_call.tool_info.name}")
        print(f"  arguments: {json.dumps(arguments, indent=2)}")
        answer = input("  allow / deny / reason for denial: ").strip()
        # Match keywords case-insensitively but keep the user's original wording
        # for the denial reason.
        choice = answer.lower()
        if choice in {"a", "allow", "yes", "y"}:
            approval = ApprovalAllow()
        elif choice in {"", "d", "deny", "no", "n"}:
            approval = ApprovalDeny()
        else:
            approval = ApprovalDeny(reason=answer)

        return UserToolApproval(
            thread_id=pending_event.thread_id,
            tool_call_id=tool_call.id,
            approval=approval,
        )

    def _prompt_mcp_auth_required(self, event: McpAuthRequiredEvent) -> None:
        """Print each server's OAuth URL and block until the user types yes."""
        print("\n[auth] MCP authorization required. Complete OAuth for each server:")
        for server in event.servers:
            print(f"        {server.mcp_server_name}: {server.auth_url}")
        while True:
            answer = input("        Type 'yes' once you've authenticated: ").strip().lower()
            if answer in {"y", "yes"}:
                break

    def build_next_turn_input(self) -> TurnInput:
        """Prompt for every queued request and return the next turn's input items.

        Drains pending_next_turn_requests. MCP auth requests only block until
        the user confirms and contribute no input item.

        Raises:
            RuntimeError: if a referenced tool call is missing from its
                model.message (instead of leaking a bare StopIteration).
        """
        next_turn: list[UserToolApproval | UserToolResponse] = []
        pending = self.pending_next_turn_requests[:]
        self.pending_next_turn_requests.clear()

        for pending_event in pending:
            if isinstance(pending_event, McpAuthRequiredEvent):
                self._prompt_mcp_auth_required(pending_event)
                continue
            for tool_ref in pending_event.tool_calls:
                # tool_ref.event_id points at the model.message that emitted this tool
                # call; find the matching tool call inside it by id.
                model_message = self.events[tool_ref.event_id]
                tool_call = next(
                    (tc for tc in model_message.tool_calls if tc.id == tool_ref.id),
                    None,
                )
                if tool_call is None:
                    raise RuntimeError(
                        f"tool call {tool_ref.id!r} not found in model.message "
                        f"{tool_ref.event_id!r}"
                    )
                if isinstance(pending_event, ToolResponseRequiredEvent):
                    next_turn.append(
                        self._prompt_client_side_response(pending_event, tool_call)
                    )
                else:
                    next_turn.append(
                        self._prompt_approval(pending_event, tool_call)
                    )

        return next_turn

    def _handle_event(self, event: object) -> Optional[TurnDoneStatus]:
        """Render one streamed event and update session state.

        Returns the terminal status on a turn.done event, otherwise None.
        """
        # Keep the id-keyed index current: store base events, merge deltas into them.
        if is_event_delta(event):
            merge_event_delta(self.events[event.id], event)
        elif isinstance(event, ModelMessageEvent):
            self.events[event.id] = event

        # A later event on a thread with a different id means that thread's streaming
        # model.message is complete - flush it before handling the new event.
        thread_id = getattr(event, "thread_id", None)
        if thread_id is not None:
            streaming_id = self._streaming_ids.get(thread_id)
            if streaming_id is not None and streaming_id != event.id:
                self._flush_model_message(self.events[streaming_id])
                del self._streaming_ids[thread_id]

        if isinstance(event, TurnCreatedEvent):
            return None

        if isinstance(event, ThreadCreatedEvent):
            if event.thread_id != "main":
                self.open_subagent_threads.add(event.thread_id)
                name = event.agent_info.name
                self.thread_labels[event.thread_id] = name
                print(f"\n[subagent {name} started]")
            return None

        if isinstance(event, ThreadDoneEvent):
            if event.thread_id != "main":
                self.open_subagent_threads.discard(event.thread_id)
                label = self.label(event.thread_id)
                if event.status == "error":
                    print(f"\n[{label} error] {event.message}")
                else:
                    print(f"[subagent {label} finished]")
            return None

        if isinstance(event, McpInitializedEvent):
            for server in event.content:
                print(f"[mcp] connected to {server.mcp_server_name}")
            return None

        if isinstance(event, McpAuthRequiredEvent):
            self.pending_next_turn_requests.append(event)
            return None

        if isinstance(event, SandboxCreatedEvent):
            print(f"[sandbox] provisioned ({event.sandbox_id})")
            return None

        if isinstance(event, ModelMessageEvent):
            # Base model.message: a new message starts streaming on this thread. Its
            # content and tool_calls fill in as the matching deltas arrive.
            self._streaming_ids[event.thread_id] = event.id
            return None

        if isinstance(event, ModelMessageEventDelta):
            self._handle_model_message_delta(event)
            return None

        if isinstance(event, ToolResponseEvent):
            self._handle_tool_response(event)
            return None

        if isinstance(event, ToolResponseRequiredEvent):
            self.pending_next_turn_requests.append(event)
            return None

        if isinstance(event, ToolApprovalRequiredEvent):
            self.pending_next_turn_requests.append(event)
            return None

        if isinstance(event, TurnDoneEvent):
            # Turn-level event (thread_id is null), so flush any messages still
            # streaming - the last message on each thread has no later event to
            # trigger its flush.
            for streaming_id in self._streaming_ids.values():
                self._flush_model_message(self.events[streaming_id])
            self._streaming_ids.clear()

            # event.state is the terminal TurnTerminalState (same as wait_for_completion()).
            if isinstance(event.state, TurnErrorState):
                print(f"\n[error] {event.state.message}")
            return event.state.status

        print(f"\n[unknown event] {event}")
        return None

    def run_turn(self, input_items: TurnInput | None = None) -> TurnDoneStatus:
        """Create a turn with *input_items*, render its stream, return its status.

        Returns "done" if the stream ends without a turn.done event.
        """
        last_status: TurnDoneStatus = "done"
        turn = self.session.create_turn(input=input_items)
        for message in turn.stream():
            status = self._handle_event(message)
            if status is not None:
                last_status = status
        return last_status


def parse_args() -> argparse.Namespace:
    """Parse the command line for the chat client.

    The agent name comes from the optional positional argument, falling back
    to the AGENT_NAME environment variable. If neither provides a non-empty
    value, the parser reports a usage error and exits.
    """
    env_agent = os.environ.get("AGENT_NAME")
    cli = argparse.ArgumentParser(
        description="Terminal chat client for the TrueFoundry Agent Harness.",
    )
    cli.add_argument(
        "agent_name",
        nargs="?",
        default=env_agent,
        help="Registered agent name (default: AGENT_NAME environment variable)",
    )
    parsed = cli.parse_args()
    has_agent = bool(parsed.agent_name)
    if has_agent:
        return parsed
    cli.error("agent_name is required (pass as argument or set AGENT_NAME)")
    return parsed  # unreachable: ArgumentParser.error() exits


def main() -> None:
    """Run the interactive terminal chat loop for one agent session.

    Before each new user message, any requests queued by the previous turn
    (tool approvals, client-side tool responses, MCP auth prompts) are answered
    and sent as follow-up turns. Ctrl-D (EOF) or Ctrl-C at any prompt ends the
    session cleanly instead of printing a traceback.
    """
    args = parse_args()

    base_url = os.environ.get("GATEWAY_BASE_URL")
    api_key = os.environ.get("TFY_API_KEY")
    if not base_url or not api_key:
        # Report configuration errors on stderr so stdout stays clean when piped.
        print("Set GATEWAY_BASE_URL and TFY_API_KEY environment variables.", file=sys.stderr)
        sys.exit(1)

    client = TrueFoundryGateway(
        base_url=base_url,
        api_key=api_key,
    )
    session = client.agents.create_session(agent_name=args.agent_name)
    chat = ChatSession(session)

    print("Type a message, or Ctrl-D to exit.")
    try:
        while True:
            # Resolve everything the last turn asked for before taking new input.
            while chat.pending_next_turn_requests:
                chat.run_turn(chat.build_next_turn_input())

            text = input("\nyou: ").strip()
            if not text:
                continue
            chat.run_turn([UserMessage(content=text)])
    except (EOFError, KeyboardInterrupt):
        # Approval / question / auth prompts call input() too, so EOF or an
        # interrupt can arrive from any of them, not just the message prompt.
        print()


# Start the chat client only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

Sample run

$ python example.py support-bot
Type a message, or Ctrl-D to exit.

you: Refund invoice INV-2031 for tenant acme.
[mcp] connected to truefoundry-mcp
[sandbox] provisioned (my-tenant.a1b2c3d4)

assistant asks: Which refund reason should I record?
  1. Customer churn
  2. Billing error
  3. Goodwill credit
  Or type a free-form answer.
you: 2

assistant: Found invoice INV-2031 for $1,240.00 on 2026-04-12.

approval needed: process_refund
  arguments: {
  "invoice_id": "INV-2031",
  "amount": 1240.0,
  "reason": "Billing error"
}
  allow / deny / reason for denial: allow

[main tool result] {"refund_id":"RF-9914","status":"completed"}

assistant: Refund RF-9914 of $1,240.00 was processed for INV-2031.

you: ^D

Adapting the flow

A few common variations on top of the same skeleton:
  • Persisted history. Save session.id after the first Turn. On the next process start, call client.agents.get_session(session_id) and use list_turns() with turn.list_events() to rebuild UI state, or turn.stream() if the latest Turn is still running.
  • JSON output for piping. Replace the print calls inside _handle_event with print(json.dumps(event.model_dump())) so each event is emitted as one JSON object per line for downstream tools.
  • Generative UI. When you detect a fenced ```openui block in assembled model.message content, hand the block to the OpenUI React renderer instead of printing it. Everything else stays the same.
  • Browser / web UI. The same event shape is delivered over Server-Sent Events on POST /sessions/{session_id}/turns. Replace session.create_turn with an SSE consumer; the handlers above are unchanged.