The Agent SDK is experimental and will have breaking changes.
Overview
This client handles every event the harness can emit:
- Streams assistant tokens in real time
- Shows tool calls and tool results inline
- Prompts the user to answer `ask_user_question` calls
- Prompts the user to allow or deny tool approvals
- Shows in-chat MCP OAuth prompts
- Handles parallel sub-agents
- Continues the conversation across turns
"""Terminal chat client for the TrueFoundry Agent Harness SDK."""
import argparse, json, os, sys
from typing import Optional, Union
from truefoundry_gateway_sdk import (
ApprovalAllow,
ApprovalDeny,
McpAuthRequiredEvent,
McpInitializedEvent,
ModelMessageEvent,
ModelMessageEventDelta,
SandboxCreatedEvent,
Session,
ThreadCreatedEvent,
ThreadDoneEvent,
ToolApprovalRequiredEvent,
ToolCall,
ToolResponseEvent,
ToolResponseRequiredEvent,
TrueFoundryGateway,
TurnCreatedEvent,
TurnDoneEvent,
TurnDoneStatus,
TurnErrorState,
TurnEvent,
TurnInput,
UserMessage,
UserToolApproval,
UserToolResponse,
is_event_delta,
merge_event_delta,
)
# Events that leave the conversation waiting on the user: they are queued while a
# turn streams and answered when the next turn's input is built.
PendingNextTurnRequest = Union[
    ToolApprovalRequiredEvent,
    ToolResponseRequiredEvent,
    McpAuthRequiredEvent,
]
class ChatSession:
    """Holds turn-to-turn state for one conversation.

    Renders each Turn's event stream to the terminal and queues the requests
    (tool approvals, client-side tool responses, MCP OAuth prompts) that must be
    answered in the input of the next Turn.
    """

    def __init__(self, session: Session) -> None:
        self.session = session
        # id-keyed index of assembled events. Base model.message events are stored here
        # and their model.message.delta fragments merge into them in place.
        self.events: dict[str, TurnEvent] = {}
        # Sub-agent threads that have been created but not yet reported done.
        self.open_subagent_threads: set[str] = set()
        # thread_id -> display label (the sub-agent's name); "main" labels itself.
        self.thread_labels: dict[str, str] = {"main": "main"}
        # Requests raised during a turn that the user must answer before the next turn.
        self.pending_next_turn_requests: list[PendingNextTurnRequest] = []
        # id of the model.message currently streaming on each thread. A later event on
        # the same thread with a different id means that message is done, so flush it.
        self._streaming_ids: dict[str, str] = {}
        # True while an "assistant: " line for the main thread is open on stdout.
        self._main_streaming = False

    def label(self, thread_id: str) -> str:
        """Return the display label for *thread_id*; unknown threads show as "main"."""
        return self.thread_labels.get(thread_id, "main")

    @staticmethod
    def _decode_arguments(tool_call: ToolCall) -> object:
        """Decode a tool call's JSON arguments.

        Missing/empty arguments decode as ``{}``; text that is not valid JSON is
        returned unchanged so it can still be displayed.
        """
        raw = tool_call.function.arguments
        try:
            return json.loads(raw or "{}")
        except json.JSONDecodeError:
            return raw

    def _handle_model_message_delta(self, event: ModelMessageEventDelta) -> None:
        """Stream main-thread text live; the flush happens later, driven by id change."""
        if event.thread_id == "main" and event.content:
            if not self._main_streaming:
                print("\nassistant: ", end="", flush=True)
                self._main_streaming = True
            print(event.content, end="", flush=True)

    def _flush_model_message(self, msg: ModelMessageEvent) -> None:
        """Finish rendering a completed model.message.

        Closes the streamed main-thread line (or prints the content if nothing
        was streamed), prints sub-agent content for open sub-agent threads, and
        emits the now fully assembled tool calls. ``ask_user_question`` calls are
        skipped here because they are rendered as a prompt instead.
        """
        thread_id = msg.thread_id
        if thread_id == "main":
            if self._main_streaming:
                print()
                self._main_streaming = False
            elif msg.content:
                print(f"\nassistant: {msg.content}")
        elif thread_id in self.open_subagent_threads and msg.content:
            print(f"\n[{self.label(thread_id)}] {msg.content}")
        for tool_call in msg.tool_calls:
            if not tool_call.id or not tool_call.function.name:
                continue
            if tool_call.tool_info.name == "ask_user_question":
                continue
            self._print_tool_call(thread_id, tool_call)

    def _print_tool_call(self, thread_id: str, tool_call: ToolCall) -> None:
        """Print a one-line tool-call preview with arguments truncated to 160 chars."""
        args = self._decode_arguments(tool_call)
        args_preview = json.dumps(args)[:160] if isinstance(args, dict) else str(args)[:160]
        # label() maps "main" to "main", so it covers main and sub-agent threads alike.
        print(f"\n[{self.label(thread_id)} -> {tool_call.tool_info.name}] {args_preview}")

    def _handle_tool_response(self, event: ToolResponseEvent) -> None:
        """Print a single-line preview (first 200 chars) of a tool result."""
        preview = event.content[:200].replace("\n", " ")
        print(f"\n[{self.label(event.thread_id)} tool result] {preview}")

    def _prompt_client_side_response(
        self,
        pending_event: ToolResponseRequiredEvent,
        tool_call: ToolCall,
    ) -> UserToolResponse:
        """Ask the user to supply the result of a client-side tool call.

        ``ask_user_question`` is rendered as a question with optional numbered
        options: an in-range number selects that option, anything else is sent
        verbatim. Any other client-side tool shows its raw arguments and sends
        the typed text as the tool result.
        """
        if tool_call.tool_info.name == "ask_user_question":
            args = self._decode_arguments(tool_call)
            # Undecodable arguments, or JSON that is not an object, fall back to a
            # bare prompt instead of crashing on .get().
            if not isinstance(args, dict):
                args = {}
            question = args.get("question", "Answer:")
            options = args.get("options") or []
            # Enumerating a string/dict "options" would list characters/keys.
            if not isinstance(options, list):
                options = []
            print(f"\nassistant asks: {question}")
            for idx, option in enumerate(options, 1):
                print(f" {idx}. {option}")
            if options:
                print(" Or type a free-form answer.")
            answer = input("you: ").strip()
            # isdecimal(), unlike isdigit(), rejects characters such as "²" that
            # int() cannot parse.
            if options and answer.isdecimal() and 1 <= int(answer) <= len(options):
                content = options[int(answer) - 1]
            else:
                content = answer
        else:
            print(f"\nclient-side tool response required: {tool_call.tool_info.name}")
            print(f" arguments: {tool_call.function.arguments}")
            content = input("you: ").strip()
        return UserToolResponse(
            thread_id=pending_event.thread_id,
            tool_call_id=tool_call.id,
            content=content,
        )

    def _prompt_approval(
        self,
        pending_event: ToolApprovalRequiredEvent,
        tool_call: ToolCall,
    ) -> UserToolApproval:
        """Ask the user to allow or deny a tool call.

        "a"/"allow"/"y"/"yes" allows; "d"/"deny"/"n"/"no" or an empty answer
        denies; any other text denies and is sent as the denial reason with its
        original casing.
        """
        arguments = self._decode_arguments(tool_call)
        print(f"\napproval needed: {tool_call.tool_info.name}")
        print(f" arguments: {json.dumps(arguments, indent=2)}")
        reply = input(" allow / deny / reason for denial: ").strip()
        # Compare case-insensitively, but keep `reply` intact for the reason text.
        choice = reply.lower()
        if choice in {"a", "allow", "yes", "y"}:
            approval = ApprovalAllow()
        elif not reply or choice in {"d", "deny", "no", "n"}:
            approval = ApprovalDeny()
        else:
            approval = ApprovalDeny(reason=reply)
        return UserToolApproval(
            thread_id=pending_event.thread_id,
            tool_call_id=tool_call.id,
            approval=approval,
        )

    def _prompt_mcp_auth_required(self, event: McpAuthRequiredEvent) -> None:
        """Show each server's OAuth URL and block until the user confirms with yes."""
        print("\n[auth] MCP authorization required. Complete OAuth for each server:")
        for server in event.servers:
            print(f" {server.mcp_server_name}: {server.auth_url}")
        while True:
            answer = input(" Type 'yes' once you've authenticated: ").strip().lower()
            if answer in {"y", "yes"}:
                break

    def _find_tool_call(self, event_id: str, tool_call_id: str) -> ToolCall:
        """Return tool call *tool_call_id* from the model.message stored as *event_id*.

        Raises:
            KeyError: if no model.message with *event_id* was seen.
            LookupError: if that message has no tool call with *tool_call_id*.
        """
        model_message = self.events[event_id]
        for tool_call in model_message.tool_calls:
            if tool_call.id == tool_call_id:
                return tool_call
        raise LookupError(
            f"tool call {tool_call_id!r} not found in model.message {event_id!r}"
        )

    def build_next_turn_input(self) -> TurnInput:
        """Prompt the user for every queued request and return the next turn's input.

        The queue is drained up front. MCP auth requests only block until the
        user confirms; they add nothing to the returned list.
        """
        next_turn: list[UserToolApproval | UserToolResponse] = []
        pending = self.pending_next_turn_requests[:]
        self.pending_next_turn_requests.clear()
        for pending_event in pending:
            if isinstance(pending_event, McpAuthRequiredEvent):
                self._prompt_mcp_auth_required(pending_event)
                continue
            for tool_ref in pending_event.tool_calls:
                # tool_ref.event_id points at the model.message that emitted this tool
                # call; find the matching tool call inside it by id.
                tool_call = self._find_tool_call(tool_ref.event_id, tool_ref.id)
                if isinstance(pending_event, ToolResponseRequiredEvent):
                    next_turn.append(
                        self._prompt_client_side_response(pending_event, tool_call)
                    )
                else:
                    next_turn.append(self._prompt_approval(pending_event, tool_call))
        return next_turn

    def _handle_event(self, event: object) -> Optional[TurnDoneStatus]:
        """Render one stream event and update session state.

        Returns the turn's terminal status for turn.done events, else None.
        """
        # Keep the id-keyed index current: store base events, merge deltas into them.
        if is_event_delta(event):
            merge_event_delta(self.events[event.id], event)
        elif isinstance(event, ModelMessageEvent):
            self.events[event.id] = event
        # A later event on a thread with a different id means that thread's streaming
        # model.message is complete - flush it before handling the new event.
        thread_id = getattr(event, "thread_id", None)
        if thread_id is not None:
            streaming_id = self._streaming_ids.get(thread_id)
            if streaming_id is not None and streaming_id != event.id:
                self._flush_model_message(self.events[streaming_id])
                del self._streaming_ids[thread_id]

        if isinstance(event, TurnCreatedEvent):
            return None
        if isinstance(event, ThreadCreatedEvent):
            if event.thread_id != "main":
                self.open_subagent_threads.add(event.thread_id)
                name = event.agent_info.name
                self.thread_labels[event.thread_id] = name
                print(f"\n[subagent {name} started]")
            return None
        if isinstance(event, ThreadDoneEvent):
            if event.thread_id != "main":
                self.open_subagent_threads.discard(event.thread_id)
                label = self.label(event.thread_id)
                if event.status == "error":
                    print(f"\n[{label} error] {event.message}")
                else:
                    print(f"[subagent {label} finished]")
            return None
        if isinstance(event, McpInitializedEvent):
            for server in event.content:
                print(f"[mcp] connected to {server.mcp_server_name}")
            return None
        if isinstance(event, McpAuthRequiredEvent):
            self.pending_next_turn_requests.append(event)
            return None
        if isinstance(event, SandboxCreatedEvent):
            print(f"[sandbox] provisioned ({event.sandbox_id})")
            return None
        if isinstance(event, ModelMessageEvent):
            # Base model.message: a new message starts streaming on this thread. Its
            # content and tool_calls fill in as the matching deltas arrive.
            self._streaming_ids[event.thread_id] = event.id
            return None
        if isinstance(event, ModelMessageEventDelta):
            self._handle_model_message_delta(event)
            return None
        if isinstance(event, ToolResponseEvent):
            self._handle_tool_response(event)
            return None
        if isinstance(event, (ToolResponseRequiredEvent, ToolApprovalRequiredEvent)):
            self.pending_next_turn_requests.append(event)
            return None
        if isinstance(event, TurnDoneEvent):
            # Turn-level event (thread_id is null), so flush any messages still
            # streaming - the last message on each thread has no later event to
            # trigger its flush.
            for streaming_id in self._streaming_ids.values():
                self._flush_model_message(self.events[streaming_id])
            self._streaming_ids.clear()
            # event.state is the terminal TurnTerminalState (same as wait_for_completion()).
            if isinstance(event.state, TurnErrorState):
                print(f"\n[error] {event.state.message}")
            return event.state.status
        print(f"\n[unknown event] {event}")
        return None

    def run_turn(self, input_items: TurnInput | None = None) -> TurnDoneStatus:
        """Create a turn with *input_items*, render its stream, and return its status.

        Returns "done" if the stream ends without a turn.done event.
        """
        last_status: TurnDoneStatus = "done"
        turn = self.session.create_turn(input=input_items)
        for message in turn.stream():
            status = self._handle_event(message)
            if status is not None:
                last_status = status
        return last_status
def parse_args() -> argparse.Namespace:
    """Parse the command line.

    ``agent_name`` is an optional positional argument; when omitted, the value
    of the ``AGENT_NAME`` environment variable is used. If neither provides a
    non-empty name, argparse prints a usage error and exits.
    """
    fallback_name = os.environ.get("AGENT_NAME")
    arg_parser = argparse.ArgumentParser(
        description="Terminal chat client for the TrueFoundry Agent Harness."
    )
    arg_parser.add_argument(
        "agent_name",
        nargs="?",
        default=fallback_name,
        help="Registered agent name (default: AGENT_NAME environment variable)",
    )
    namespace = arg_parser.parse_args()
    missing_name = not namespace.agent_name
    if missing_name:
        arg_parser.error("agent_name is required (pass as argument or set AGENT_NAME)")
    return namespace
def main() -> None:
    """Run the interactive chat loop until EOF (Ctrl-D) or Ctrl-C.

    Reads GATEWAY_BASE_URL and TFY_API_KEY from the environment, opens a session
    for the agent named on the command line, and then alternates between
    answering requests queued by the previous turn and reading a new message.
    """
    args = parse_args()
    base_url = os.environ.get("GATEWAY_BASE_URL")
    api_key = os.environ.get("TFY_API_KEY")
    if not base_url or not api_key:
        # sys.exit(str) writes the message to stderr and exits with status 1.
        sys.exit("Set GATEWAY_BASE_URL and TFY_API_KEY environment variables.")
    client = TrueFoundryGateway(
        base_url=base_url,
        api_key=api_key,
    )
    session = client.agents.create_session(agent_name=args.agent_name)
    chat = ChatSession(session)
    print("Type a message, or Ctrl-D to exit.")
    try:
        while True:
            # Answer approvals, client-side tool calls and MCP auth first; each
            # resulting turn may queue new requests, so loop until none remain.
            while chat.pending_next_turn_requests:
                chat.run_turn(chat.build_next_turn_input())
            text = input("\nyou: ").strip()
            if text:
                chat.run_turn([UserMessage(content=text)])
    except (EOFError, KeyboardInterrupt):
        # Ctrl-D / Ctrl-C at any prompt (including approval and answer prompts)
        # or mid-stream ends the chat without a traceback.
        print()


if __name__ == "__main__":
    main()
Sample run
$ python example.py support-bot
Type a message, or Ctrl-D to exit.
you: Refund invoice INV-2031 for tenant acme.
[mcp] connected to truefoundry-mcp
[sandbox] provisioned (my-tenant.a1b2c3d4)
assistant asks: Which refund reason should I record?
1. Customer churn
2. Billing error
3. Goodwill credit
Or type a free-form answer.
you: 2
assistant: Found invoice INV-2031 for $1,240.00 on 2026-04-12.
approval needed: process_refund
arguments: {
"invoice_id": "INV-2031",
"amount": 1240.0,
"reason": "Billing error"
}
allow / deny / reason for denial: allow
[main tool result] {"refund_id":"RF-9914","status":"completed"}
assistant: Refund RF-9914 of $1,240.00 was processed for INV-2031.
you: ^D
Adapting the flow
A few common variations on top of the same skeleton:
- Persisted history. Save `session.id` after the first Turn. On the next process start, call `client.agents.get_session(session_id)` and use `list_turns()` with `turn.list_events()` to rebuild UI state, or `turn.stream()` if the latest Turn is still running.
- JSON output for piping. Replace the human-readable `print` calls inside `_handle_event` with `print(json.dumps(event.model_dump()))` to emit one event per line for downstream tools.
- Generative UI. When you detect a fenced `openui` code block in the assembled `model.message` content, hand the block to the OpenUI React renderer instead of printing it. Everything else stays the same.
- Browser / web UI. The same event shape is delivered over Server-Sent Events on `POST /sessions/{session_id}/turns`. Replace `session.create_turn` with an SSE consumer; the handlers above are unchanged.