"""Terminal chat client for the TrueFoundry Agent Harness SDK."""
import argparse, json, os, sys
from typing import Any, List, Optional, Union
from truefoundry_gateway_sdk import (
AgentApprovalDecisionAllow,
AgentApprovalDecisionDeny,
AgentApprovalDecisionMessage,
AgentApprovalOrToolResponseMessage,
AgentApprovalRequired,
AgentClientSideToolResponseMessage,
AgentInputUserMessage,
AgentResponsesInput,
AgentResponsesSavedAgent,
AgentToolResponseRequired,
TrueFoundryGateway,
)
from truefoundry_gateway_sdk.helpers import (
EnrichedAssistantMessage,
EnrichedToolCall,
is_assistant_delta,
merge_assistant_message,
)
PendingNextTurnRequest = Union[AgentApprovalRequired, AgentToolResponseRequired]
class ChatSession:
"""Holds turn-to-turn state for one conversation."""
def __init__(
self,
client: TrueFoundryGateway,
request_template: AgentResponsesSavedAgent,
) -> None:
self.client = client
self.request_template = request_template
self.previous_response_id: Optional[str] = None
self.open_subagents: set[str] = set()
self.execution_labels: dict[str, str] = {"root": "root"}
self.assistant_buffers: dict[str, Optional[EnrichedAssistantMessage]] = {}
self.open_tool_calls: dict[str, EnrichedToolCall] = {}
self.pending_next_turn_requests: list[PendingNextTurnRequest] = []
self._root_streaming = False
def label(self, execution_id: str) -> str:
return self.execution_labels.get(execution_id, "root")
def _handle_assistant_delta(self, chunk: Any) -> None:
# Tool calls stream across many deltas: first chunk carries id/name/tool_info,
# later chunks append function.arguments by index, then a finish-only chunk sets
# finish_reason (often with no role or tool_calls). merge_assistant_message folds
# all of that; we only print and register tool calls after finish_reason is set.
execution_id = chunk.execution_id
self.assistant_buffers[execution_id] = merge_assistant_message(
self.assistant_buffers.get(execution_id),
chunk,
)
msg = self.assistant_buffers[execution_id]
assert msg is not None
if execution_id == "root" and chunk.content:
if not self._root_streaming:
sys.stdout.write("\nassistant: ")
self._root_streaming = True
sys.stdout.write(chunk.content)
sys.stdout.flush()
if msg.finish_reason is not None:
self._flush_assistant(execution_id, msg)
self.assistant_buffers[execution_id] = None
def _flush_assistant(self, execution_id: str, msg: EnrichedAssistantMessage) -> None:
label = self.label(execution_id)
if execution_id == "root":
if self._root_streaming:
print()
self._root_streaming = False
elif msg.content:
print(f"\nassistant: {msg.content}")
elif execution_id in self.open_subagents and msg.content:
print(f"\n[{label}] {msg.content}")
for tool_call in msg.tool_calls or []:
if not self._is_complete_tool_call(tool_call):
continue
self.open_tool_calls[tool_call.id] = tool_call
self._print_tool_call(execution_id, tool_call)
@staticmethod
def _is_complete_tool_call(tool_call: EnrichedToolCall) -> bool:
return bool(tool_call.id and tool_call.function.name)
def _print_tool_call(self, execution_id: str, tool_call: EnrichedToolCall) -> None:
label = self.label(execution_id)
tool_info = tool_call.tool_info
fn = tool_call.function
tool_name = fn.name
if tool_info and tool_info.original_tool_name:
tool_name = tool_info.original_tool_name
try:
args = json.loads(fn.arguments or "{}")
except json.JSONDecodeError:
args = fn.arguments
if tool_info and tool_info.is_client_side and fn.name == "ask_user_question":
return
if tool_info and tool_info.is_approval_required:
return
args_preview = json.dumps(args)[:160] if isinstance(args, dict) else str(args)[:160]
prefix = "root" if execution_id == "root" else label
print(f"\n[{prefix} -> {tool_name}] {args_preview}")
def _handle_tool_message(self, event: Any) -> None:
tool_call_id = getattr(event, "tool_call_id", None)
if tool_call_id:
self.open_tool_calls.pop(tool_call_id, None)
label = self.label(event.execution_id)
prefix = "root" if event.execution_id == "root" else label
content = getattr(event, "content", None) or ""
preview = content[:200].replace("\n", " ")
print(f"\n[{prefix} tool result] {preview}")
def _prompt_client_side_response(
self,
pending_event: AgentToolResponseRequired,
tool_call_id: str,
tool_call: Optional[EnrichedToolCall],
) -> AgentClientSideToolResponseMessage:
content = ""
fn_name = None
if tool_call is not None:
fn_name = tool_call.function.name
args = json.loads(tool_call.function.arguments or "{}")
tool_info = tool_call.tool_info
if fn_name == "ask_user_question" or (tool_info and tool_info.is_client_side):
question = args.get("question", "Answer:")
options = args.get("options") or []
print(f"\nassistant asks: {question}")
for idx, option in enumerate(options, 1):
print(f" {idx}. {option}")
if options:
print(" Or type a free-form answer.")
answer = input("you: ").strip()
if options and answer.isdigit() and 1 <= int(answer) <= len(options):
content = options[int(answer) - 1]
else:
content = answer
else:
print(f"\nclient-side tool response required: {fn_name}")
content = input("you: ").strip()
else:
content = input("you: ").strip()
return AgentClientSideToolResponseMessage(
execution_id=pending_event.execution_id,
tool_call_id=tool_call_id,
content=content,
)
def _prompt_approval(
self,
pending_event: AgentApprovalRequired,
tool_call_id: str,
tool_call: Optional[EnrichedToolCall],
) -> AgentApprovalDecisionMessage:
tool_name = tool_call_id
arguments: Any = {}
if tool_call is not None:
tool_info = tool_call.tool_info
tool_name = (
tool_info.original_tool_name
if tool_info
else tool_call.function.name
) or tool_call.function.name
try:
arguments = json.loads(tool_call.function.arguments or "{}")
except json.JSONDecodeError:
arguments = tool_call.function.arguments
print(f"\napproval needed: {tool_name}")
print(f" arguments: {json.dumps(arguments, indent=2)}")
choice = input(" allow / deny / reason for denial: ").strip().lower()
if choice in {"a", "allow", "yes", "y"}:
approval = AgentApprovalDecisionAllow()
elif choice in {"d", "deny", "no", "n"}:
approval = AgentApprovalDecisionDeny()
else:
approval = AgentApprovalDecisionDeny(reason=choice)
return AgentApprovalDecisionMessage(
execution_id=pending_event.execution_id,
tool_call_id=tool_call_id,
approval=approval,
)
def build_next_turn_input(self) -> List[AgentApprovalOrToolResponseMessage]:
next_turn: List[AgentApprovalOrToolResponseMessage] = []
pending = self.pending_next_turn_requests[:]
self.pending_next_turn_requests.clear()
for pending_event in pending:
for tool_ref in pending_event.tool_calls:
tool_call = self.open_tool_calls.get(tool_ref.id)
if pending_event.type == "tool.response_required":
next_turn.append(self._prompt_client_side_response(pending_event, tool_ref.id, tool_call))
elif pending_event.type == "tool.approval_required":
next_turn.append(self._prompt_approval(pending_event, tool_ref.id, tool_call))
return next_turn
def _handle_event(self, event: Any) -> None:
t = event.type
if t == "response.created":
self.previous_response_id = event.response_id
return
elif t == "agent.created":
self.open_subagents.add(event.execution_id)
name = event.agent_info.name
self.execution_labels[event.execution_id] = name
print(f"\n[subagent {name} started]")
return
elif t == "agent.done":
self.open_subagents.discard(event.execution_id)
label = self.label(event.execution_id)
print(f"[subagent {label} finished]")
return
elif t == "agent.error":
label = self.label(event.execution_id)
print(f"\n[{label} error] {event.message}")
return
elif t == "mcp.initialize":
for server in event.content:
print(f"[mcp] connected to {server.mcp_server_name}")
return
elif t == "mcp.auth_required":
print("\n[auth] MCP authorization required. Complete OAuth for each server:")
for server in event.servers:
print(f" {server.mcp_server_name}: {server.auth_url}")
print(" Press Enter once you've finished the OAuth flow.")
input()
self.run_turn(None)
return
elif t == "sandbox.created":
print(f"[sandbox] provisioned ({event.sandbox_id})")
return
elif is_assistant_delta(event):
self._handle_assistant_delta(event)
return
elif t == "agent.message" and getattr(event, "role", None) == "tool":
self._handle_tool_message(event)
return
elif t == "tool.response_required":
self.pending_next_turn_requests.append(event)
return
elif t == "tool.approval_required":
self.pending_next_turn_requests.append(event)
return
elif t == "response.done" and event.status == "error":
message = getattr(event, "message", None)
if message:
print(f"\n[error] {message}")
else:
print(f"\n[unknown event] {event}")
def run_turn(self, input_items: Optional[AgentResponsesInput]) -> str:
request = self.request_template.model_copy(
update={
"previous_response_id": self.previous_response_id,
"input": input_items,
}
)
last_status = "done"
for event in self.client.agents.responses.create(request=request):
self._handle_event(event)
if event.type == "response.done":
last_status = event.status
return last_status
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Terminal chat client for the TrueFoundry Agent Harness.",
)
parser.add_argument(
"agent_name",
nargs="?",
default=os.environ.get("AGENT_NAME"),
help="Registered agent name (default: AGENT_NAME environment variable)",
)
args = parser.parse_args()
if not args.agent_name:
parser.error("agent_name is required (pass as argument or set AGENT_NAME)")
return args
def main() -> None:
args = parse_args()
base_url = os.environ.get("GATEWAY_BASE_URL")
api_key = os.environ.get("TFY_API_KEY")
if not base_url or not api_key:
print("Set GATEWAY_BASE_URL and TFY_API_KEY environment variables.")
sys.exit(1)
client = TrueFoundryGateway(
base_url=base_url,
api_key=api_key
)
session = ChatSession(
client,
AgentResponsesSavedAgent(agent_name=args.agent_name),
)
print("Type a message, or Ctrl-D to exit.")
while True:
while session.pending_next_turn_requests:
tool_input = session.build_next_turn_input()
if tool_input:
session.run_turn(tool_input)
try:
text = input("\nyou: ").strip()
except EOFError:
print()
break
if not text:
continue
session.run_turn(
[AgentInputUserMessage(role="user", content=text)]
)
if __name__ == "__main__":
main()