Agent Platform is currently in beta. The API may introduce breaking changes.

Overview

This page covers running an agent that is already saved in the Agent Registry. To create or configure an agent, see Agent. The Agent Harness ships with Python and TypeScript SDKs for interacting with agents and consuming the event stream. We also provide a React component for building a chat interface for an agent.

How interaction works

Interaction is organized as a hierarchy: Session → Turn → Events, with threads running underneath.
  • Session — a long-running agent process. The session holds the agent configuration and persists across many Turns.
  • Thread — an execution context inside the session. The main thread is the root agent, which interacts with the user and acts as a coordinator for other agents. It can spin up additional threads, each running a sub-agent. Threads are long-lived - they can live across Turns.
  • Turn — a single user interaction with the session, and the request boundary. Turns within a session are chained, so each Turn builds on the history of the ones before it. Only one Turn can be running in a session at any point in time. A Turn runs until the agent finishes, or until it hits the iteration limit or timeout configured for the agent. A Turn can be cancelled while it is running.
  • Events — a Turn streams events as the agent works. Each event identifies the thread it came from.
  • Event Delta — some events are streamed as a base event followed by a series of deltas that you merge into it, rather than a single payload. The clearest example is model output: a base model.message followed by model.message.delta chunks - all sharing the base event’s id - that you merge into the base as they arrive.

Figure: Sequence diagram of a client creating a session and then chaining multiple turns. Each turn streams SSE events such as turn.created and model.message, ends with turn.done, and can be resumed with follow-up user input such as tool approvals or responses.

Creating a Session

A Session is the conversation context for a saved agent. Create one by name with create_session().
import os
from truefoundry_gateway_sdk import TrueFoundryGateway

client = TrueFoundryGateway(
    base_url=os.environ["GATEWAY_BASE_URL"],
    api_key=os.environ["TFY_API_KEY"],
)

# A Session is the conversation context for a saved agent.
session = client.agents.create_session(agent_name="support-bot")

Listing sessions for an agent

List the existing Sessions for a saved agent with list_sessions(). It iterates newest-first, transparently paginating. Use it to resume an earlier conversation - each Session exposes its id (persist or reuse it to continue).
import os
from truefoundry_gateway_sdk import TrueFoundryGateway

client = TrueFoundryGateway(
    base_url=os.environ["GATEWAY_BASE_URL"],
    api_key=os.environ["TFY_API_KEY"],
)

for session in client.agents.list_sessions(agent_name="support-bot"):
    print("session id:", session.id)
    print("created at:", session.created_at)

Interacting with a Session by creating a Turn

Create a Turn in the session with create_turn(), then call stream() to start the Turn and stream its events as they arrive. The first event is always turn.created (carrying the turn_id); the stream closes with turn.done. Once the stream closes, the Turn is terminal. Call state() to read its final state. To continue the conversation, create another Turn on the same session. Turns within a session are chained automatically, so each Turn sees the history of the ones before it.
Creating a new Turn in a session automatically cancels any other Turn that is still running in that session.
import os
from truefoundry_gateway_sdk import TrueFoundryGateway, UserMessage

client = TrueFoundryGateway(
    base_url=os.environ["GATEWAY_BASE_URL"],
    api_key=os.environ["TFY_API_KEY"],
)
session = client.agents.create_session(agent_name="support-bot")

# Each Turn is one request/response cycle.
turn = session.create_turn(
    input=[UserMessage(content="I would like to file a support ticket.")]
)
for message in turn.stream():
    print(message)

# Once the stream closes the Turn is terminal. state() returns its final state.
turn_state = turn.state()
print("final status:", turn_state.status)

# Create another Turn on the same session. Turns are chained automatically,
# so this turn sees the history of the first one.
turn = session.create_turn(
    input=[UserMessage(content="Use my last order, ORD-2031.")]
)
for message in turn.stream():
    print(message)

Listing Turns

List the Turns in a session with list_turns(). Each Turn exposes its input, and state() returns the current state of the run.
list_turns() always returns Turns in descending order (newest-first).
import os
from truefoundry_gateway_sdk import (
    TrueFoundryGateway,
    TurnRunningState,
    TurnDoneState,
    TurnCancelledState,
    TurnErrorState,
)

client = TrueFoundryGateway(
    base_url=os.environ["GATEWAY_BASE_URL"],
    api_key=os.environ["TFY_API_KEY"],
)

session = client.agents.get_session(session_id="sess-abc123")

# list_turns() always returns Turns in descending order (newest-first).
for turn in session.list_turns():
    turn_state = turn.state()
    print("turn id:", turn.id)
    # input is the list that started the turn: UserMessage(s), or
    # UserToolApproval / UserToolResponse items.
    print("input:", turn.input)

    if isinstance(turn_state, TurnRunningState):
        print("status: running (no output yet)")
    elif isinstance(turn_state, TurnDoneState):
        print("status: done")
        # output is the final assistant message (model.message), or None.
        if turn_state.output is not None:
            print("output:", turn_state.output.content)
        # required_actions lists any pause events the Turn surfaced
        # (tool.approval_required, tool.response_required, mcp.auth_required).
        for item in turn_state.required_actions:
            print("required action:", item)
    elif isinstance(turn_state, TurnCancelledState):
        print("status: cancelled", turn_state.reason)
    elif isinstance(turn_state, TurnErrorState):
        print("status: error", turn_state.message)

Handling Event Delta while streaming

Most events in a Turn are complete on their own - a single payload you can use directly. Some updates are streamed instead: the base event arrives first, followed by a series of Event Deltas - incremental fragments that you merge into the base. All deltas for one update share the base event’s id, while sequence_number increases across the base and all its deltas.
// Base assembled event arrives first
{ "type": "model.message", "id": "0f3a9c2b-7d41-4e8a-b2c6-1a5f9e3d2b48", "sequence_number": 6, "content": "" }

// Deltas follow: same id, increasing sequence_number, each merged into the base
{ "type": "model.message.delta", "id": "0f3a9c2b-7d41-4e8a-b2c6-1a5f9e3d2b48", "sequence_number": 7, "content": "Hel" }
{ "type": "model.message.delta", "id": "0f3a9c2b-7d41-4e8a-b2c6-1a5f9e3d2b48", "sequence_number": 8, "content": "lo" }
{ "type": "model.message.delta", "id": "0f3a9c2b-7d41-4e8a-b2c6-1a5f9e3d2b48", "sequence_number": 9, "content": "!", "finish_reason": "stop" }

// After merging every delta, the base holds the full message
{ "type": "model.message", "id": "0f3a9c2b-7d41-4e8a-b2c6-1a5f9e3d2b48", "content": "Hello!", "finish_reason": "stop" }
Because every delta carries the base event’s id, keep an id-keyed index of assembled events: store each non-delta event under its id, and merge each delta into the base with the same id. The id is unique per message, so deltas from concurrently streaming threads (the main agent and any sub-agents) always merge into the right base.
Event Deltas appear only while streaming. When you list events via the events API, the deltas are already merged into a single assembled event.
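The id-keyed merge described above can be sketched without the SDK, over plain dicts that mirror the JSON shown earlier. This is only an illustration under assumptions: apply_event is a hypothetical helper, and the merge rule (append content, copy finish_reason when present) is inferred from the example payloads. In real code, the SDK's merge_event_delta is the supported way to do this.

```python
from typing import Any

Event = dict[str, Any]


def apply_event(index: dict[str, Event], event: Event) -> None:
    """Store a base event, or merge a delta into the base that shares its id."""
    if event["type"].endswith(".delta"):
        base = index[event["id"]]  # the base event arrives before its deltas
        # Assumed merge rule: text is appended, finish_reason is copied when set.
        base["content"] = base.get("content", "") + event.get("content", "")
        if event.get("finish_reason") is not None:
            base["finish_reason"] = event["finish_reason"]
    else:
        index[event["id"]] = dict(event)


# Hand-written events in the same shape as the wire example ("m1" is a made-up id).
stream = [
    {"type": "model.message", "id": "m1", "sequence_number": 6, "content": ""},
    {"type": "model.message.delta", "id": "m1", "sequence_number": 7, "content": "Hel"},
    {"type": "model.message.delta", "id": "m1", "sequence_number": 8, "content": "lo"},
    {"type": "model.message.delta", "id": "m1", "sequence_number": 9,
     "content": "!", "finish_reason": "stop"},
]

index: dict[str, Event] = {}
for event in stream:
    apply_event(index, event)

print(index["m1"]["content"], index["m1"]["finish_reason"])
```

Because the index is keyed by id rather than by arrival order, the same loop keeps working when deltas from several threads are interleaved.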
The most common example is the assistant’s model output: a base ModelMessageEvent followed by ModelMessageEventDelta deltas that carry incremental text and tool-call chunks. Merge the deltas into the base as they arrive - read the base’s growing content for a live typing effect.
import os
from truefoundry_gateway_sdk import (
    TrueFoundryGateway,
    TurnEvent,
    UserMessage,
    is_event_delta,
    merge_event_delta,
)

client = TrueFoundryGateway(
    base_url=os.environ["GATEWAY_BASE_URL"],
    api_key=os.environ["TFY_API_KEY"],
)
session = client.agents.create_session(agent_name="support-bot")

events: dict[str, TurnEvent] = {}  # event id -> assembled event

turn = session.create_turn(
    input=[UserMessage(content="Summarize my last order.")]
)
for message in turn.stream():
    if is_event_delta(message):
        base = events[message.id]
        merge_event_delta(base, message)
    else:
        events[message.id] = message

Resume streaming

If you lose the original stream() connection (for example, after a page reload), fetch the Turn again with get_turn() using its turn_id and check its state. If it is still running, reconnect to its live event stream with stream() and keep merging; if it has already finished, rebuild the index from its stored event log with list_events() instead. When resuming a running Turn, pass after_sequence_number to continue after a known point (it defaults to 0, which replays from the first event). The SDK also resumes transparently from the last delivered event on any mid-stream connection drop, and the stream closes when the Turn reaches a terminal state. Either way, merge into the same id-keyed index of assembled events, so base events and their deltas keep merging seamlessly across the reconnect.
from truefoundry_gateway_sdk import TurnRunningState, is_event_delta, merge_event_delta

# `client`, `session_id`, `turn_id`, `events`, and `last_sequence_number` already exist from the original stream:
#   client: TrueFoundryGateway    - the client created as in the earlier examples
#   session_id: str               - the id of the Session
#   turn_id: str                  - the id of the Turn you were streaming
#   events: dict[str, TurnEvent]  - the id-keyed index of assembled events
#   last_sequence_number: int     - the sequence_number of the last event you processed
# Persist the last four (for example, in your session store) so you can resume after a restart.

session = client.agents.get_session(session_id=session_id)
turn = session.get_turn(turn_id=turn_id)

if isinstance(turn.state(), TurnRunningState):
    # Still running: reconnect to the live stream. after_sequence_number skips events
    # you've already processed; the SDK also auto-resumes from the last delivered
    # event on any mid-stream connection drop.
    for message in turn.stream(after_sequence_number=last_sequence_number):
        last_sequence_number = message.sequence_number
        if is_event_delta(message):
            merge_event_delta(events[message.id], message)
        else:
            events[message.id] = message
else:
    # Already finished: rebuild the index from the stored event log, which is
    # authoritative and overwrites any partial state from the lost stream.
    # list_events() returns already-merged events, so there are no deltas to merge here.
    for event in turn.list_events():
        events[event.id] = event

Handling Threads

A single Turn stream interleaves events from the root agent and any sub-agents that run in parallel. Every event carries a thread_id: "main" for the root agent, a unique ID for a sub-agent thread, or None for turn-level events. See Turn Events for the full reference.
import os
from collections import defaultdict
from truefoundry_gateway_sdk import (
    TrueFoundryGateway,
    TurnEvent,
    UserMessage,
    ModelMessageEvent,
    ModelMessageEventDelta,
    ThreadCreatedEvent,
    ThreadDoneEvent,
    TrueFoundrySystemToolInfo,
    is_event_delta,
    merge_event_delta,
)

client = TrueFoundryGateway(
    base_url=os.environ["GATEWAY_BASE_URL"],
    api_key=os.environ["TFY_API_KEY"],
)
session = client.agents.create_session(agent_name="support-bot")

threads: dict[str, dict[str, TurnEvent]] = defaultdict(dict)

turn = session.create_turn(
    input=[UserMessage(content="Summarize my last order.")]
)
for message in turn.stream():
    if message.thread_id is None:
        print("turn level event:", message)
        continue

    if isinstance(message, (ModelMessageEvent, ModelMessageEventDelta)):
        for tool_call in message.tool_calls or []:
            if (
                isinstance(tool_call.tool_info, TrueFoundrySystemToolInfo)
                and tool_call.tool_info.name == "create_sub_agent"
            ):
                print("sub agent is getting initialized")
    if isinstance(message, ThreadCreatedEvent):
        print(f"{message.thread_id} created: {message.title}")

    if is_event_delta(message):
        merge_event_delta(threads[message.thread_id][message.id], message)
    else:
        threads[message.thread_id][message.id] = message

    if isinstance(message, ThreadDoneEvent):
        events_for_thread = threads.pop(message.thread_id)
        print(f"{message.thread_id} done: {message.title} ({len(events_for_thread)} events)")
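To make the effect of grouping by thread_id concrete, the sketch below replays a hand-written interleaved stream as plain dicts. The sub-agent id "sub-1", the event ids, and the message texts are made up for illustration; only the type and thread_id conventions follow what this page describes.

```python
from collections import defaultdict

# A made-up interleaving of turn-level, main-thread, and sub-agent events.
interleaved = [
    {"type": "turn.created", "id": "e0", "thread_id": None},
    {"type": "model.message", "id": "e1", "thread_id": "main", "content": "Delegating."},
    {"type": "thread.created", "id": "e2", "thread_id": "sub-1"},
    {"type": "model.message", "id": "e3", "thread_id": "sub-1", "content": "Looking up the order."},
    {"type": "model.message", "id": "e4", "thread_id": "main", "content": "Waiting on the sub-agent."},
    {"type": "thread.done", "id": "e5", "thread_id": "sub-1"},
    {"type": "turn.done", "id": "e6", "thread_id": None},
]

turn_level = []                # events with thread_id None
by_thread = defaultdict(list)  # thread_id -> events in arrival order

for event in interleaved:
    if event["thread_id"] is None:
        turn_level.append(event)
    else:
        by_thread[event["thread_id"]].append(event)

for thread_id, thread_events in by_thread.items():
    texts = [e["content"] for e in thread_events if e["type"] == "model.message"]
    print(thread_id, "->", texts)
```

A chat UI could render each key of by_thread as its own panel, with the main thread as the primary conversation.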

Non-streaming Turn

If you don’t need live events, skip stream() and call wait_for_completion() instead. It blocks until the Turn finishes.
import os
from truefoundry_gateway_sdk import (
    TrueFoundryGateway,
    UserMessage,
    TurnDoneState,
    TurnCancelledState,
    TurnErrorState,
)

client = TrueFoundryGateway(
    base_url=os.environ["GATEWAY_BASE_URL"],
    api_key=os.environ["TFY_API_KEY"],
)
session = client.agents.create_session(agent_name="support-bot")

turn = session.create_turn(
    input=[UserMessage(content="Summarize my last order.")],
)

turn_state = turn.wait_for_completion()
if isinstance(turn_state, TurnDoneState):
    # output and required_actions are mutually exclusive: if output is present,
    # required_actions is empty, and vice versa.
    # output is the final assistant message, or None if the Turn paused.
    if turn_state.output is not None:
        print(turn_state.output.content)
    # required_actions holds any pause events (tool.approval_required,
    # tool.response_required, mcp.auth_required).
    else:
        print("required actions:", turn_state.required_actions)
elif isinstance(turn_state, TurnCancelledState):
    print("cancelled:", turn_state.reason)
elif isinstance(turn_state, TurnErrorState):
    print("error:", turn_state.message)

print("completed at:", turn_state.completed_at)

Attaching images or files to a Turn

A UserMessage’s content can be a plain string or a list of content parts. Use content parts to send text alongside one or more file uploads (images, PDFs, and other documents). Each file is passed as a data URI of the form data:<mime>;base64,<payload>.
import base64
import os
from truefoundry_gateway_sdk import (
    TrueFoundryGateway,
    UserMessage,
    TextContent,
    FileUploadContent,
    FileUpload,
)

client = TrueFoundryGateway(
    base_url=os.environ["GATEWAY_BASE_URL"],
    api_key=os.environ["TFY_API_KEY"],
)
session = client.agents.create_session(agent_name="support-bot")

def to_data_uri(path: str, mime: str) -> str:
    with open(path, "rb") as f:
        return f"data:{mime};base64,{base64.b64encode(f.read()).decode()}"

# Attach both an image and a document in the same message.
image_uri = to_data_uri("invoice.png", "image/png")
doc_uri = to_data_uri("notes.txt", "text/plain")

turn = session.create_turn(
    input=[
        UserMessage(
            content=[
                TextContent(text="Does the invoice total match the notes?"),
                FileUploadContent(
                    file=FileUpload(filename="invoice.png", file_data=image_uri)
                ),
                FileUploadContent(
                    file=FileUpload(filename="notes.txt", file_data=doc_uri)
                ),
            ]
        )
    ]
)
for message in turn.stream():
    print(message)
Whether an attached file is understood depends on the agent’s model. Send images only to a vision-capable model; document handling (for example, PDFs) likewise depends on model and agent configuration.
Non-image files such as PDFs require the sandbox to be enabled on the agent - the harness uses it to process the document.
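The to_data_uri helper above takes the MIME type as an argument. If you would rather infer it from the filename, Python's standard mimetypes module can guess it from the extension. A small sketch follows; the fallback to application/octet-stream is an assumption about a reasonable default, not something the platform documents.

```python
import base64
import mimetypes
import os
import tempfile


def to_data_uri(path: str) -> str:
    """Build a data:<mime>;base64,<payload> URI, guessing the MIME type from the extension."""
    mime, _encoding = mimetypes.guess_type(path)
    if mime is None:
        mime = "application/octet-stream"  # assumed fallback for unknown extensions
    with open(path, "rb") as f:
        payload = base64.b64encode(f.read()).decode("ascii")
    return f"data:{mime};base64,{payload}"


# Demonstrate on a throwaway file so the example does not depend on local files.
with tempfile.TemporaryDirectory() as directory:
    notes_path = os.path.join(directory, "notes.txt")
    with open(notes_path, "wb") as f:
        f.write(b"total: 42")
    uri = to_data_uri(notes_path)

print(uri)
```

Guessing by extension is only as reliable as the filename, so pass the MIME type explicitly when you already know it.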

Cancelling a Turn

To stop the currently running Turn, call cancel() on its session. Cancelling aborts any in-flight model request, waits for running MCP tool calls to finish, and force-stops any sandbox the Turn provisioned. Cancellation is idempotent. You can continue by creating a new Turn, which chains on the cancelled Turn’s history.
import os
from truefoundry_gateway_sdk import TrueFoundryGateway, UserMessage

client = TrueFoundryGateway(
    base_url=os.environ["GATEWAY_BASE_URL"],
    api_key=os.environ["TFY_API_KEY"],
)
session = client.agents.create_session(agent_name="support-bot")

# Start a turn, then cancel the session partway through the stream.
turn = session.create_turn(
    input=[UserMessage(content="Run a long research task.")]
)
for i, message in enumerate(turn.stream()):
    print(message)
    if i == 5:
        # Don't break here. After cancel(), the backend closes the SSE stream
        # gracefully: it emits a terminal turn.done event and then ends the
        # stream, which closes this iterator on its own. Keep consuming until
        # the loop finishes so cancellation completes cleanly.
        session.cancel()

# A cancelled turn is terminal, so its event log is stored and can be replayed.
for event in turn.list_events(order="asc"):
    print(event)

# Continue the conversation: a new turn chains on the cancelled turn's history.
turn = session.create_turn(
    input=[UserMessage(content="Actually, just give me a quick summary instead.")]
)
for message in turn.stream():
    print(message)

Listing Events

Fetch the full event log of a finished Turn with list_events(). It yields the Turn’s events in order, auto-paginating. Pass order="asc" (default, oldest-first) or order="desc" to control the direction. Unlike stream(), list_events() returns already-merged events: each model message arrives as a single assembled ModelMessageEvent, never as a base plus deltas. There is nothing to merge - you can use each event directly. For example, a model message that stream() delivers as a base event followed by deltas:
// On stream(): a base event plus its deltas, all sharing one id
{ "type": "model.message",       "id": "0f3a...", "content": "" }
{ "type": "model.message.delta", "id": "0f3a...", "content": "Hel" }
{ "type": "model.message.delta", "id": "0f3a...", "content": "lo!", "finish_reason": "stop" }
is returned by list_events() as one fully-merged event:
// On list_events(): the same message, already assembled
{ "type": "model.message", "id": "0f3a...", "content": "Hello!", "finish_reason": "stop" }
list_events() is only available for Turns that have reached a terminal state (done, cancelled, or errored). A running Turn has no stored event log yet - use stream() for live delivery instead.
import os
from truefoundry_gateway_sdk import TrueFoundryGateway, TurnRunningState

client = TrueFoundryGateway(
    base_url=os.environ["GATEWAY_BASE_URL"],
    api_key=os.environ["TFY_API_KEY"],
)

session = client.agents.get_session(session_id="sess-abc123")

# Find the latest completed turn (newest-first).
for turn in session.list_turns():
    # list_events() is only available once the turn has completed.
    if isinstance(turn.state(), TurnRunningState):
        continue
    # order="asc" (default) yields oldest-first; use "desc" for newest-first.
    for event in turn.list_events(order="asc"):
        print(event)
    break

Handling MCP Outbound Auth

When the agent needs to call a tool on an MCP server that requires a separate outbound authentication, it emits an mcp.auth_required event and the Turn ends. The event lists each server that needs authentication along with an auth_url. Depending on how the server is configured, this may be an OAuth flow or an API-key entry - see MCP authentication scenarios for details. Send the user to that URL to complete authentication, then resume by creating a new Turn.
When the previous Turn ended with mcp.auth_required, passing a UserMessage in the resuming Turn is not allowed.
import os
from truefoundry_gateway_sdk import (
    TrueFoundryGateway,
    UserMessage,
    McpAuthRequiredEvent,
)

client = TrueFoundryGateway(
    base_url=os.environ["GATEWAY_BASE_URL"],
    api_key=os.environ["TFY_API_KEY"],
)
session = client.agents.create_session(agent_name="support-bot")

# First turn: the agent needs an MCP server the user hasn't authorized yet.
# A Turn emits at most one mcp.auth_required event.
pending_auth = None
turn = session.create_turn(
    input=[UserMessage(content="Summarize my latest GitHub issues.")]
)
for message in turn.stream():
    print(message)
    if isinstance(message, McpAuthRequiredEvent):
        pending_auth = message

# pending_auth stays None when the Turn finished without needing authentication.
if pending_auth is not None:
    # Direct the user to each server's auth URL and wait for them to finish.
    for server in pending_auth.servers:
        print(f"Authorize {server.mcp_server_name}: {server.auth_url}")
    input("Press Enter once you have authorized the server(s)...")

    # Second turn: resume - the agent continues the interrupted work.
    turn = session.create_turn()
    for message in turn.stream():
        print(message)

Handling tool approvals

When a tool call is configured to require human approval, the agent emits a tool.approval_required event and the Turn ends. Each event carries a thread_id and the tool_calls awaiting a decision - a single Turn can emit more than one (for example, when parallel threads each call a gated tool), so collect all of them. Resume by creating a new Turn with one UserToolApproval per pending tool call - allow it, or deny it with an optional reason. Each pending ToolCallRef carries the event_id of the model.message that emitted the tool call. Keep the same id-keyed event index you build while streaming, then look up events[event_id] to read the tool’s name and arguments - no separate bookkeeping required.
import os
from truefoundry_gateway_sdk import (
    TrueFoundryGateway,
    TurnEvent,
    UserMessage,
    UserToolApproval,
    ApprovalAllow,
    ApprovalDeny,
    ToolApprovalRequiredEvent,
    is_event_delta,
    merge_event_delta,
)

client = TrueFoundryGateway(
    base_url=os.environ["GATEWAY_BASE_URL"],
    api_key=os.environ["TFY_API_KEY"],
)
session = client.agents.create_session(agent_name="support-bot")

# First turn: the agent calls one or more tools that need approval.
events: dict[str, TurnEvent] = {}  # event id -> assembled event
pending_approvals = []             # a Turn can emit multiple tool.approval_required events

turn = session.create_turn(
    input=[UserMessage(content="Restart the billing service.")]
)
for message in turn.stream():
    if is_event_delta(message):
        merge_event_delta(events[message.id], message)
    else:
        events[message.id] = message
    if isinstance(message, ToolApprovalRequiredEvent):
        pending_approvals.append(message)

# Decide on each pending tool call, showing its name and arguments.
approvals = []
for pending in pending_approvals:
    for ref in pending.tool_calls:
        # Look up the model.message that emitted this tool call, then find the call.
        model_message = events[ref.event_id]
        tool_call = next(tc for tc in model_message.tool_calls if tc.id == ref.id)
        print(f"\napproval needed: {tool_call.tool_info.name}")
        print(f"  arguments: {tool_call.function.arguments}")
        answer = input("allow? [y/n] ").strip().lower()
        approvals.append(
            UserToolApproval(
                thread_id=pending.thread_id,
                tool_call_id=ref.id,
                approval=ApprovalAllow() if answer == "y" else ApprovalDeny(reason="denied by user"),
            )
        )

# Second turn: resume with the approval decisions.
turn = session.create_turn(input=approvals)
for message in turn.stream():
    print(message)

Answering agent’s questions

When the agent needs input it cannot safely assume, it can ask the user a structured question via the built-in client-side ask_user_question tool. Since the tool runs on your side, the agent emits a tool.response_required event and the Turn ends. The pending tool call’s arguments carry the question and its options. Collect the user’s answer and resume by creating a new Turn with one UserToolResponse per pending tool call. Each pending ToolCallRef carries the event_id of the model.message that emitted the tool call. Keep the same id-keyed event index you build while streaming, then look up events[event_id] to read the tool’s name and arguments - no separate bookkeeping required.
import json
import os
from truefoundry_gateway_sdk import (
    TrueFoundryGateway,
    TurnEvent,
    UserMessage,
    UserToolResponse,
    ToolResponseRequiredEvent,
    TrueFoundrySystemToolInfo,
    is_event_delta,
    merge_event_delta,
)

client = TrueFoundryGateway(
    base_url=os.environ["GATEWAY_BASE_URL"],
    api_key=os.environ["TFY_API_KEY"],
)
session = client.agents.create_session(agent_name="support-bot")

# First turn: the agent asks the user a question.
events: dict[str, TurnEvent] = {}  # event id -> assembled event
pending_questions = []             # a Turn can emit multiple tool.response_required events

turn = session.create_turn(
    input=[UserMessage(content="Create a new environment called testing-1.")]
)
for message in turn.stream():
    if is_event_delta(message):
        merge_event_delta(events[message.id], message)
    else:
        events[message.id] = message
    if isinstance(message, ToolResponseRequiredEvent):
        pending_questions.append(message)

# Answer each pending question, reading the prompt and options from arguments.
responses = []
for pending in pending_questions:
    for ref in pending.tool_calls:
        # Look up the model.message that emitted this tool call, then find the call.
        model_message = events[ref.event_id]
        tool_call = next(tc for tc in model_message.tool_calls if tc.id == ref.id)
        # tool.response_required covers any client-side tool; handle ask_user_question here.
        if not (
            isinstance(tool_call.tool_info, TrueFoundrySystemToolInfo)
            and tool_call.tool_info.name == "ask_user_question"
        ):
            continue
        args = json.loads(tool_call.function.arguments or "{}")
        print(f"\n{args.get('question', '')}")
        for idx, option in enumerate(args.get("options") or [], 1):
            print(f"  {idx}. {option}")
        answer = input("your answer: ").strip()
        responses.append(
            UserToolResponse(
                thread_id=pending.thread_id,
                tool_call_id=ref.id,
                # content is a free-form string - the chosen option, or any text.
                content=answer,
            )
        )

# Second turn: resume with the user's answers.
turn = session.create_turn(input=responses)
for message in turn.stream():
    print(message)
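The example above sends back whatever the user typed. If you want a typed option number to resolve to the option text, a helper along these lines could sit between input() and UserToolResponse. It assumes options is a list of strings, which this page does not specify; resolve_answer is a hypothetical name, and the question text and second option are made up.

```python
import json


def resolve_answer(raw_arguments: str, typed: str) -> str:
    """Map a 1-based option number to its option text; otherwise return the typed text."""
    args = json.loads(raw_arguments or "{}")
    options = args.get("options") or []
    typed = typed.strip()
    if typed.isdecimal() and 1 <= int(typed) <= len(options):
        return str(options[int(typed) - 1])
    return typed  # free-form answers pass through unchanged


# Arguments as they might appear on an assembled ask_user_question tool call.
arguments = json.dumps({
    "question": "Which cluster should host the environment?",
    "options": ["tfy-prod-us (production)", "tfy-dev-eu (development)"],
})

print(resolve_answer(arguments, "1"))
print(resolve_answer(arguments, "a new cluster"))
```

Out-of-range numbers are returned as typed rather than rejected, which keeps the content field free-form as described above.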

Turn input

Each Turn’s input is a list whose items are each one of the types below. Resuming a Turn paused by mcp.auth_required needs no input - omit input or pass [].
User messages (UserMessage) cannot be mixed with tool approvals or client-side tool responses in the same input list. UserToolApproval and UserToolResponse may be mixed together.
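The mixing rule can be checked on the client before a Turn is created. The sketch below works on the JSON shapes documented in the sections that follow; validate_turn_input is a hypothetical helper, not part of the SDK, and the server remains the authority on what it accepts.

```python
from typing import Any

RESUME_TYPES = {"user.tool_approval", "user.tool_response"}
KNOWN_TYPES = RESUME_TYPES | {"user.message"}


def validate_turn_input(items: list[dict[str, Any]]) -> None:
    """Raise ValueError if user messages are mixed with approvals or tool responses."""
    types = {item["type"] for item in items}
    unknown = types - KNOWN_TYPES
    if unknown:
        raise ValueError(f"unknown input types: {sorted(unknown)}")
    if "user.message" in types and types & RESUME_TYPES:
        raise ValueError(
            "user.message cannot be mixed with tool approvals or tool responses"
        )


# An empty list is valid: resuming after mcp.auth_required needs no input.
validate_turn_input([])
validate_turn_input([{"type": "user.message", "content": "Hello"}])
# Approvals and tool responses may be mixed together.
validate_turn_input([
    {"type": "user.tool_approval", "thread_id": "main",
     "tool_call_id": "call_restart_billing", "approval": {"status": "allow"}},
    {"type": "user.tool_response", "thread_id": "main",
     "tool_call_id": "call_a1b2", "content": "tfy-prod-us (production)"},
])
print("all inputs valid")
```

Failing fast like this gives a clearer error than a rejected request, but it duplicates a server-side rule and should be kept in sync with it.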

UserMessage

Start a new conversation or send the next user message. content is either a plain string or a list of content parts, letting you attach files alongside text.
{ "type": "user.message", "content": "I would like to file a support ticket." }
{
    "type": "user.message",
    "content": [
        { "type": "text", "text": "Please review this document." },
        {
            "type": "file_upload",
            "file": {
                "filename": "report.pdf",
                "file_data": "data:application/pdf;base64,JVBERi0xLjQK..."
            }
        }
    ]
}
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "user.message" | Yes | |
| content | string \| UserContent[] | Yes | The message text, or a list of content parts (text and file uploads). |

UserContent

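A content part is one of the following.

Text
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "text" | Yes | |
| text | string | Yes | The message text. |

File upload
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "file_upload" | Yes | |
| file.filename | string | Yes | Name of the uploaded file. |
| file.file_data | string | Yes | Data URI: data:<mime>;base64,<payload>. |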

UserToolApproval

Sent to resume a Turn paused by tool.approval_required. One item per pending tool call.
{
    "type": "user.tool_approval",
    "thread_id": "main",
    "tool_call_id": "call_restart_billing",
    "approval": { "status": "allow" }
}
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "user.tool_approval" | Yes | |
| thread_id | string | Yes | thread_id from the tool.approval_required event. |
| tool_call_id | string | Yes | ID of the tool call being approved or denied. |
| approval | ApprovalAllow \| ApprovalDeny | Yes | Use {"status": "allow"} to permit the call, or {"status": "deny", "reason": "..."} to block it. reason is optional. |

UserToolResponse

Sent to resume a Turn paused by tool.response_required. One item per pending tool call.
{
    "type": "user.tool_response",
    "thread_id": "main",
    "tool_call_id": "call_a1b2",
    "content": "tfy-prod-us (production)"
}
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "user.tool_response" | Yes | |
| thread_id | string | Yes | thread_id from the tool.response_required event. |
| tool_call_id | string | Yes | ID of the tool call whose result is being supplied. |
| content | string | Yes | The result to return to the agent. |

Turn Events

A Turn’s stream() delivers Server-Sent Events (SSE). Each event is a JSON object with a type field.
  • The stream always opens with turn.created and closes with turn.done.
  • All events carry a thread_id. Thread-scoped events use "main" (root agent) or a unique sub-agent ID; turn-level events use None.
  • thread.created and thread.done track sub-agent threads only - they are not emitted for the main thread, which is created automatically on the first Turn and lives for the session’s lifetime.
  • All events carry an id. It identifies the logical event and is present in both stream() and list_events().
  • Events delivered via stream() also carry a sequence_number, monotonically increasing within a Turn.
  • All events carry a created_at ISO-8601 timestamp marking when the event was emitted.
  • turn.created and turn.done are stream-only: they appear on stream() but are not returned by list_events().

| type | Description |
| --- | --- |
| turn.created | First event on the stream. Carries turn_id. |
| turn.done | Last event on every stream. Carries the final state. |

TurnCreatedEvent

First event on every Turn stream, carrying the Turn’s turn_id and metadata. Stream-only.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "turn.created" | Yes | |
| id | string | Yes | Unique event identifier. |
| thread_id | null | Yes | Always null: a turn-level event, not tied to a thread. |
| turn_id | string | Yes | Unique ID for this Turn. |
| previous_turn_id | string \| None | No | Turn ID this Turn chains from, or None for the first Turn in a session. |
| created_by | string | Yes | Subject (user / service account) that created this Turn. |
| created_at | string | Yes | ISO-8601 timestamp when the Turn was created. |

ModelMessageEventDelta

The most frequent SSE event. Carries an incremental LLM delta for one message - assistant text and/or tool call chunks. Every delta shares the id of the base ModelMessageEvent it belongs to; merge each delta into that base event in your id-keyed event index. A delta with a non-null finish_reason signals the message is complete.
Use merge_event_delta from truefoundry_gateway_sdk to fold a delta into its base event in place. Alternatively, ModelMessageBuilder accumulates deltas on their own (without a base event) into a complete ModelMessage: add() returns the accumulated message so far, and build_and_reset() returns the complete message once finish_reason is set.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "model.message.delta" | Yes | |
| id | string | Yes | Shared by all deltas of one message; equal to the assembled ModelMessageEvent id. Use sequence_number to distinguish individual deltas. |
| thread_id | string | Yes | Thread this message belongs to. "main" for the root agent; a unique ID for sub-agents. |
| content | string | No | Incremental text content for this delta. |
| reasoning_content | string | No | Incremental reasoning/thinking text for this delta; concatenate across deltas. |
| tool_calls | ToolCallDelta[] | No | Tool call chunks to accumulate by index; fully assembled when finish_reason is set. |
| finish_reason | string | No | Set on the final delta. Signals the accumulated message is complete. |
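The text side of the merge can be pictured with plain dictionaries. This is a minimal sketch and not the SDK's merge_event_delta: apply_delta and the dict-based event index are hypothetical stand-ins, and only the field names are taken from the table above.

```python
from typing import Any

Event = dict[str, Any]


def apply_delta(index: dict[str, Event], delta: Event) -> Event:
    """Fold one model.message.delta into its base event, looked up by the shared id."""
    base = index[delta["id"]]
    # Text fields are incremental: append each fragment to what is already there.
    for field in ("content", "reasoning_content"):
        fragment = delta.get(field)
        if fragment:
            base[field] = (base.get(field) or "") + fragment
    # finish_reason appears only on the final delta and marks the message complete.
    if delta.get("finish_reason") is not None:
        base["finish_reason"] = delta["finish_reason"]
    return base


# Hypothetical sample data: one base event followed by three deltas sharing its id.
index = {"evt_1": {"type": "model.message", "id": "evt_1", "thread_id": "main"}}
deltas = [
    {"type": "model.message.delta", "id": "evt_1", "thread_id": "main", "content": "Hel"},
    {"type": "model.message.delta", "id": "evt_1", "thread_id": "main", "content": "lo"},
    {"type": "model.message.delta", "id": "evt_1", "thread_id": "main", "finish_reason": "stop"},
]
for delta in deltas:
    message = apply_delta(index, delta)

print(message["content"], message["finish_reason"])
```

Tool call chunks are left out here on purpose; they need per-index accumulation rather than simple concatenation.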
Each ToolCallDelta is shaped like an OpenAI streaming tool-call chunk. id, type, and tool_info appear only on the first chunk for a given index; function.arguments is a partial JSON string concatenated across chunks.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| index | int | Yes | Position in the tool_calls array; used to merge chunks. |
| id | string | No | Tool call ID. First chunk only. |
| type | "function" | No | First chunk only. |
| function | ToolCallFunctionDelta | Yes | The function name and partial arguments for this chunk. |
| tool_info | ToolCallInfo | No | Tool metadata. First chunk only. |
ToolCallFunctionDelta:
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | No | Tool name. Present only in the first chunk for this index. |
| arguments | string | No | Partial JSON string; concatenate across chunks. |
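Accumulating tool call chunks by index can be sketched as follows. This is an illustration with plain dictionaries, assuming chunks shaped like the ToolCallDelta and ToolCallFunctionDelta tables above; accumulate_tool_calls is a hypothetical helper (not an SDK function), and tool_info is omitted for brevity.

```python
import json
from typing import Any


def accumulate_tool_calls(chunks: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Merge ToolCallDelta-shaped chunks into complete tool calls, keyed by index."""
    calls: dict[int, dict[str, Any]] = {}
    for chunk in chunks:
        call = calls.setdefault(
            chunk["index"],
            {"id": None, "type": None, "function": {"name": None, "arguments": ""}},
        )
        # id, type, and the function name arrive only on the first chunk for an index.
        for key in ("id", "type"):
            if chunk.get(key) is not None:
                call[key] = chunk[key]
        function = chunk.get("function") or {}
        if function.get("name") is not None:
            call["function"]["name"] = function["name"]
        # arguments is a partial JSON string: concatenate now, parse once complete.
        call["function"]["arguments"] += function.get("arguments") or ""
    return [calls[i] for i in sorted(calls)]


# Hypothetical chunks for two interleaved tool calls.
chunks = [
    {"index": 0, "id": "call_1", "type": "function",
     "function": {"name": "get_weather", "arguments": '{"ci'}},
    {"index": 1, "id": "call_2", "type": "function",
     "function": {"name": "get_time", "arguments": ""}},
    {"index": 0, "function": {"arguments": 'ty": "Paris"}'}},
    {"index": 1, "function": {"arguments": '{"tz": "CET"}'}},
]
tool_calls = accumulate_tool_calls(chunks)
parsed = [json.loads(call["function"]["arguments"]) for call in tool_calls]
print(parsed)
```

Parsing is deferred until all chunks have arrived because an intermediate arguments string is usually not valid JSON.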
ToolCallInfo carries metadata about the tool, discriminated on type. Read the tool name from name. For MCP-backed tools (type: "mcp"):
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "mcp" | Yes | |
| mcp_server_id | string | Yes | ID of the MCP server backing this tool. |
| mcp_server_name | string | Yes | Name of the MCP server backing this tool. |
| name | string | Yes | The tool’s original name on the MCP server. |
| is_deferred | bool | No | Whether the tool call is deferred. |
For built-in TrueFoundry system tools (type: "truefoundry-system"), for example ask_user_question:
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "truefoundry-system" | Yes | |
| name | string | Yes | The system tool’s name. |

ModelMessage

The complete, assembled form of a model message: fully concatenated content and fully reconstructed tool_calls, with finish_reason set. This is the shape you get after merging all of a message’s deltas into its base event, or from ModelMessageBuilder.build_and_reset().
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| id | string | Yes | Shared across all deltas of one message; equal to the assembled ModelMessageEvent id. |
| thread_id | string | Yes | Thread this message belongs to. "main" for the root agent; a unique ID for sub-agents. |
| content | string | No | The complete assistant text. |
| reasoning_content | string | No | The complete reasoning/thinking text. |
| tool_calls | ToolCall[] | No | Fully assembled tool calls. |
| finish_reason | string | Yes | Why the message ended (for example, "stop" or "tool_calls"). |

ModelMessageEvent

The event form of ModelMessage (adds type and id). list_events() returns it fully assembled; on the stream, a base model.message arrives first and the assistant output then arrives as ModelMessageEventDelta deltas that merge into it.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "model.message" | Yes | |
| id | string | Yes | Unique event identifier. Equal to the id carried by this message’s model.message.delta events. |
| thread_id | string | Yes | Thread this message belongs to. "main" for the root agent; a unique ID for sub-agents. |
| content | string | No | The complete assistant text. |
| reasoning_content | string | No | The complete reasoning/thinking text. |
| tool_calls | ToolCall[] | No | Fully assembled tool calls. |
| finish_reason | string | Yes | Why the message ended (for example, "stop" or "tool_calls"). |
Each ToolCall is shaped like OpenAI’s ChatCompletionMessageToolCall. Read the tool name from tool_info.name.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| id | string | Yes | Tool call ID. |
| type | "function" | Yes | Always "function". |
| function | ToolCallFunction | Yes | The tool name and complete arguments. |
| tool_info | ToolCallInfo | No | Tool metadata (same shape as above). |
ToolCallFunction:
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | Yes | Tool name. |
| arguments | string | Yes | Complete JSON string of the tool’s input arguments. |

ToolResponseEvent

A complete server-side tool result returned to the LLM. Not a streaming delta - one event carries the full result.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "tool.response" | Yes | |
| thread_id | string | Yes | Thread this result belongs to. |
| tool_call_id | string | Yes | Links this result back to the originating tool call. |
| content | string | Yes | The tool result. |
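The tool_call_id link can be used to pair each result with the call that produced it. The sketch below works on plain dictionaries shaped like the tables on this page; pair_tool_results is a hypothetical helper for illustration, not part of the SDK.

```python
from typing import Any

Event = dict[str, Any]


def pair_tool_results(events: list[Event]) -> dict[str, dict[str, Any]]:
    """Map each tool call id to its tool name and, once seen, its result."""
    paired: dict[str, dict[str, Any]] = {}
    for event in events:
        if event["type"] == "model.message":
            for call in event.get("tool_calls") or []:
                paired[call["id"]] = {"name": call["function"]["name"], "result": None}
        elif event["type"] == "tool.response":
            # A result for an unknown call is kept rather than silently dropped.
            entry = paired.setdefault(
                event["tool_call_id"], {"name": None, "result": None}
            )
            entry["result"] = event["content"]
    return paired


# Hypothetical assembled events for one tool round trip.
events = [
    {"type": "model.message", "id": "evt_1", "thread_id": "main",
     "tool_calls": [{"id": "call_1", "type": "function",
                     "function": {"name": "lookup_invoice", "arguments": "{}"}}]},
    {"type": "tool.response", "thread_id": "main",
     "tool_call_id": "call_1", "content": '{"status": "paid"}'},
]
paired = pair_tool_results(events)
print(paired["call_1"])
```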

ThreadCreatedEvent

Emitted when a sub-agent thread starts.
Not emitted for the main thread. The main thread (thread_id: "main") is the root agent; it is created automatically on the session’s first Turn and is never the subject of a thread.created or thread.done event. thread.created/thread.done track only sub-agent threads.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "thread.created" | Yes | |
| thread_id | string | Yes | Unique ID for the sub-agent thread. |
| title | string | No | Human-readable label for the sub-agent thread. |
| parent | AgentParent | Yes | Parent thread and tool call that spawned this sub-agent. |
| agent_info | AgentInfo | Yes | Type, name, and input of the thread’s agent. |
AgentInfo:
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "dynamic" | Yes | How the sub-agent was defined. |
| name | string | Yes | The sub-agent’s name. |
| input | string | No | The input the parent thread handed to the sub-agent. |

ThreadDoneEvent

Emitted when a sub-agent thread reaches a terminal state. Does not terminate the overall Turn stream.
Not emitted for the main thread. The main thread is never “done” - it lives across Turns for the lifetime of the session. The overall Turn’s completion is signalled by turn.done, and the session ends only when it is cancelled.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "thread.done" | Yes | |
| thread_id | string | Yes | Unique ID for the sub-agent thread. |
| title | string | No | Human-readable label for the sub-agent thread. |
| status | "done" \| "error" | Yes | done - completed normally, output is present. error - failed, message is present. |
| output | ModelMessage | No | Final accumulated message. Present when status is "done". |
| parent | AgentParent | Yes | Parent thread and tool call that spawned this sub-agent. |
| message | string | No | Present when status is "error". |

McpInitializedEvent

Emitted when one or more MCP server sessions are initialized for a thread.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "mcp.initialize" | Yes | |
| thread_id | string | Yes | |
| content | McpInitializationInfo[] | Yes | List of initialized MCP servers, each with mcp_server_name and session_id. |

SandboxCreatedEvent

Emitted once when a sandbox is provisioned for the Turn. The sandbox is reused across Turns within the session.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "sandbox.created" | Yes | |
| thread_id | null | Yes | Always null - turn-level event, not tied to a thread. |
| sandbox_id | string | Yes | Unique identifier for the provisioned sandbox. |

McpAuthRequiredEvent

Emitted when one or more MCP servers require OAuth authorization before the agent can proceed. The stream ends after this event. Resume by creating a new Turn on the same session and starting it (for example with stream()) after the user completes the OAuth flow - no input is required (omit input or pass []).
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "mcp.auth_required" | Yes | |
| thread_id | null | Yes | Always null - turn-level event. Blocked threads are listed per server in servers[].thread_ids. |
| servers | McpServerAuthInfo[] | Yes | List of servers needing authorization, each with mcp_server_name, auth_url, and thread_ids. |

ToolApprovalRequiredEvent

Emitted when one or more tool calls require explicit human approval before they can run. The stream ends after this event. Resume by creating a new Turn with UserToolApproval items - one per pending tool_call_id - and starting it (for example with stream()).
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "tool.approval_required" | Yes | |
| thread_id | string | Yes | |
| tool_calls | ToolCallRef[] | Yes | The tool calls awaiting approval, each with a tool_call_id (id) and the event_id of the model.message that emitted it - look it up in your event index for the tool name and arguments. |

ToolResponseRequiredEvent

Emitted when the agent has called a client-side tool and is waiting for the result. The stream ends after this event. Resume by creating a new Turn with UserToolResponse items - one per pending tool_call_id - and starting it (for example with stream()).
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "tool.response_required" | Yes | |
| thread_id | string | Yes | |
| tool_calls | ToolCallRef[] | Yes | The tool calls awaiting a client-supplied result, each with a tool_call_id (id) and the event_id of the model.message that emitted it - look it up in your event index for the tool name and arguments. |
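Both tool.approval_required and tool.response_required carry only references, so the tool name and arguments have to be looked up in the event index. A minimal sketch of that lookup with plain dictionaries follows; resolve_tool_calls and the sample data are hypothetical, and the ToolCallRef keys (id, event_id) follow the table descriptions above.

```python
from typing import Any

Event = dict[str, Any]


def resolve_tool_calls(index: dict[str, Event], pause_event: Event) -> list[Event]:
    """Look up the full tool call behind each ToolCallRef in a pause event."""
    resolved = []
    for ref in pause_event["tool_calls"]:
        message = index[ref["event_id"]]  # the model.message that emitted the call
        match = next(
            (tc for tc in message.get("tool_calls") or [] if tc["id"] == ref["id"]),
            None,
        )
        if match is None:
            raise LookupError(
                f"tool call {ref['id']} not found in event {ref['event_id']}"
            )
        resolved.append(match)
    return resolved


# Hypothetical index holding one assembled model.message.
index = {
    "evt_7": {
        "type": "model.message",
        "id": "evt_7",
        "thread_id": "main",
        "tool_calls": [
            {"id": "call_a", "type": "function",
             "function": {"name": "process_refund",
                          "arguments": '{"invoice_id": "INV-2031"}'}},
        ],
    }
}
pause = {
    "type": "tool.approval_required",
    "thread_id": "main",
    "tool_calls": [{"id": "call_a", "event_id": "evt_7"}],
}
for call in resolve_tool_calls(index, pause):
    print(call["function"]["name"], call["function"]["arguments"])
```

Raising on a missing match, rather than skipping it, keeps a pending tool_call_id from being left unanswered in the next Turn's input.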

TurnDoneEvent

Final event on every Turn stream. Stream-only. Its state is the terminal state object - the same value returned by wait_for_completion() - so callers don’t need a separate Turn fetch to know what happened.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | "turn.done" | Yes | |
| thread_id | null | Yes | Always null - turn-level event, not tied to a thread. |
| state | TurnTerminalState | Yes | The Turn’s terminal state; never running. See TurnTerminalState. |

TurnTerminalState

The terminal lifecycle state of a Turn, returned by wait_for_completion() and carried by TurnDoneEvent’s state. It is one of three objects, discriminated on status: TurnDoneState, TurnCancelledState, or TurnErrorState. (The non-terminal TurnRunningState, returned by state() while a Turn is still running, has only status: "running".)

TurnDoneState

The Turn completed normally.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| status | "done" | Yes | |
| output | ModelMessageEvent \| null | No | The final assistant message, or null if the Turn paused (then required_actions is populated). |
| required_actions | (ToolApprovalRequiredEvent \| ToolResponseRequiredEvent \| McpAuthRequiredEvent)[] | No | Pause events to act on. Empty if none. |
| completed_at | string | Yes | ISO-8601 timestamp when the Turn completed. |

TurnCancelledState

The Turn was cancelled.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| status | "cancelled" | Yes | |
| reason | string \| null | No | Why the Turn was cancelled. |
| completed_at | string | Yes | ISO-8601 timestamp when the Turn was cancelled. |

TurnErrorState

The Turn failed.
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| status | "error" | Yes | |
| message | string | Yes | Human-readable error description. |
| completed_at | string | Yes | ISO-8601 timestamp when the Turn failed. |
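Because the three terminal states are discriminated on status, handling them reduces to a branch per value. The sketch below uses plain dictionaries shaped like the three tables above; summarize_terminal_state is a hypothetical helper, and with the SDK you would branch on the typed state objects instead.

```python
from typing import Any


def summarize_terminal_state(state: dict[str, Any]) -> str:
    """Return a one-line summary for a TurnTerminalState-shaped dict."""
    status = state["status"]
    if status == "done":
        actions = state.get("required_actions") or []
        if actions:
            # A "done" Turn with pending actions has paused and is waiting for input.
            kinds = ", ".join(action["type"] for action in actions)
            return f"paused, waiting on: {kinds}"
        output = state.get("output") or {}
        return f"done: {output.get('content') or ''}"
    if status == "cancelled":
        return f"cancelled: {state.get('reason') or 'no reason given'}"
    if status == "error":
        return f"error: {state['message']}"
    # "running" is not terminal, so it never appears on turn.done.
    raise ValueError(f"not a terminal status: {status!r}")


print(summarize_terminal_state(
    {"status": "done", "output": None,
     "required_actions": [{"type": "tool.approval_required"}]}
))
```

Note that status "done" alone does not mean the conversation can continue with a fresh user message: check required_actions first.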

Complete usage example

This is a complete, runnable terminal chat client built on the SDK. It handles every event the harness can emit:
  • streams assistant tokens in real time,
  • shows tool calls and tool results inline,
  • prompts the user to answer ask_user_question calls,
  • prompts the user to allow or deny tool approvals,
  • shows in-chat MCP OAuth prompts,
  • handles parallel sub-agents,
  • continues the conversation across Turns.
"""Terminal chat client for the TrueFoundry Agent Harness SDK."""

import argparse, json, os, sys
from typing import Optional, Union

from truefoundry_gateway_sdk import (
    ApprovalAllow,
    ApprovalDeny,
    McpAuthRequiredEvent,
    McpInitializedEvent,
    ModelMessageEvent,
    ModelMessageEventDelta,
    SandboxCreatedEvent,
    Session,
    ThreadCreatedEvent,
    ThreadDoneEvent,
    ToolApprovalRequiredEvent,
    ToolCall,
    ToolResponseEvent,
    ToolResponseRequiredEvent,
    TrueFoundryGateway,
    TurnCreatedEvent,
    TurnDoneEvent,
    TurnDoneStatus,
    TurnErrorState,
    TurnEvent,
    TurnInput,
    UserMessage,
    UserToolApproval,
    UserToolResponse,
    is_event_delta,
    merge_event_delta,
)

PendingNextTurnRequest = Union[ToolApprovalRequiredEvent, ToolResponseRequiredEvent, McpAuthRequiredEvent]


class ChatSession:
    """Holds turn-to-turn state for one conversation."""

    def __init__(self, session: Session) -> None:
        self.session = session
        # id-keyed index of assembled events. Base model.message events are stored here
        # and their model.message.delta fragments merge into them in place.
        self.events: dict[str, TurnEvent] = {}
        self.open_subagent_threads: set[str] = set()
        self.thread_labels: dict[str, str] = {"main": "main"}
        self.pending_next_turn_requests: list[PendingNextTurnRequest] = []
        # id of the model.message currently streaming on each thread. A later event on
        # the same thread with a different id means that message is done, so flush it.
        self._streaming_ids: dict[str, str] = {}
        self._main_streaming = False

    def label(self, thread_id: str) -> str:
        return self.thread_labels.get(thread_id, "main")

    def _handle_model_message_delta(self, event: ModelMessageEventDelta) -> None:
        # Stream main-thread text live; the flush happens later, driven by id change.
        if event.thread_id == "main" and event.content:
            if not self._main_streaming:
                print("\nassistant: ", end="", flush=True)
                self._main_streaming = True
            print(event.content, end="", flush=True)

    def _flush_model_message(self, msg: ModelMessageEvent) -> None:
        # The message is complete: finalize the streamed line, print any non-streamed
        # content, and emit the now fully assembled tool calls.
        thread_id = msg.thread_id
        label = self.label(thread_id)

        if thread_id == "main":
            if self._main_streaming:
                print()
                self._main_streaming = False
            elif msg.content:
                print(f"\nassistant: {msg.content}")
        elif thread_id in self.open_subagent_threads and msg.content:
            print(f"\n[{label}] {msg.content}")

        # tool_calls is optional on a model.message, so treat a missing list as empty.
        for tool_call in msg.tool_calls or []:
            if not tool_call.id or not tool_call.function.name:
                continue
            if tool_call.tool_info.name == "ask_user_question":
                continue
            self._print_tool_call(thread_id, tool_call)

    def _print_tool_call(self, thread_id: str, tool_call: ToolCall) -> None:
        label = self.label(thread_id)
        try:
            args = json.loads(tool_call.function.arguments or "{}")
        except json.JSONDecodeError:
            args = tool_call.function.arguments

        args_preview = json.dumps(args)[:160] if isinstance(args, dict) else str(args)[:160]
        prefix = "main" if thread_id == "main" else label
        print(f"\n[{prefix} -> {tool_call.tool_info.name}] {args_preview}")

    def _handle_tool_response(self, event: ToolResponseEvent) -> None:
        label = self.label(event.thread_id)
        prefix = "main" if event.thread_id == "main" else label
        preview = event.content[:200].replace("\n", " ")
        print(f"\n[{prefix} tool result] {preview}")

    def _prompt_client_side_response(
        self,
        pending_event: ToolResponseRequiredEvent,
        tool_call: ToolCall,
    ) -> UserToolResponse:
        if tool_call.tool_info.name == "ask_user_question":
            try:
                args = json.loads(tool_call.function.arguments or "{}")
            except json.JSONDecodeError:
                args = {}
            question = args.get("question", "Answer:")
            options = args.get("options") or []
            print(f"\nassistant asks: {question}")
            for idx, option in enumerate(options, 1):
                print(f"  {idx}. {option}")
            if options:
                print("  Or type a free-form answer.")
            answer = input("you: ").strip()
            if options and answer.isdigit() and 1 <= int(answer) <= len(options):
                content = options[int(answer) - 1]
            else:
                content = answer
        else:
            print(f"\nclient-side tool response required: {tool_call.tool_info.name}")
            print(f"  arguments: {tool_call.function.arguments}")
            content = input("you: ").strip()

        return UserToolResponse(
            thread_id=pending_event.thread_id,
            tool_call_id=tool_call.id,
            content=content,
        )

    def _prompt_approval(
        self,
        pending_event: ToolApprovalRequiredEvent,
        tool_call: ToolCall,
    ) -> UserToolApproval:
        try:
            arguments = json.loads(tool_call.function.arguments or "{}")
        except json.JSONDecodeError:
            arguments = tool_call.function.arguments

        print(f"\napproval needed: {tool_call.tool_info.name}")
        print(f"  arguments: {json.dumps(arguments, indent=2)}")
        raw = input("  allow / deny / reason for denial: ").strip()
        choice = raw.lower()
        if choice in {"a", "allow", "yes", "y"}:
            approval = ApprovalAllow()
        elif choice in {"d", "deny", "no", "n"}:
            approval = ApprovalDeny()
        else:
            # Any other text is a denial; pass it on as the reason with its original casing.
            approval = ApprovalDeny(reason=raw)

        return UserToolApproval(
            thread_id=pending_event.thread_id,
            tool_call_id=tool_call.id,
            approval=approval,
        )

    def _prompt_mcp_auth_required(self, event: McpAuthRequiredEvent) -> None:
        print("\n[auth] MCP authorization required. Complete OAuth for each server:")
        for server in event.servers:
            print(f"        {server.mcp_server_name}: {server.auth_url}")
        while True:
            answer = input("        Type 'yes' once you've authenticated: ").strip().lower()
            if answer in {"y", "yes"}:
                break

    def build_next_turn_input(self) -> TurnInput:
        next_turn: list[UserToolApproval | UserToolResponse] = []
        pending = self.pending_next_turn_requests[:]
        self.pending_next_turn_requests.clear()

        for pending_event in pending:
            if isinstance(pending_event, McpAuthRequiredEvent):
                self._prompt_mcp_auth_required(pending_event)
                continue
            for tool_ref in pending_event.tool_calls:
                # tool_ref.event_id points at the model.message that emitted this tool
                # call; find the matching tool call inside it by id.
                model_message = self.events[tool_ref.event_id]
                tool_call = next(
                    tc for tc in model_message.tool_calls if tc.id == tool_ref.id
                )
                if isinstance(pending_event, ToolResponseRequiredEvent):
                    next_turn.append(
                        self._prompt_client_side_response(pending_event, tool_call)
                    )
                else:
                    next_turn.append(
                        self._prompt_approval(pending_event, tool_call)
                    )

        return next_turn

    def _handle_event(self, event: object) -> Optional[TurnDoneStatus]:
        # Keep the id-keyed index current: store base events, merge deltas into them.
        if is_event_delta(event):
            merge_event_delta(self.events[event.id], event)
        elif isinstance(event, ModelMessageEvent):
            self.events[event.id] = event

        # A later event on a thread with a different id means that thread's streaming
        # model.message is complete - flush it before handling the new event.
        thread_id = getattr(event, "thread_id", None)
        if thread_id is not None:
            streaming_id = self._streaming_ids.get(thread_id)
            if streaming_id is not None and streaming_id != event.id:
                self._flush_model_message(self.events[streaming_id])
                del self._streaming_ids[thread_id]

        if isinstance(event, TurnCreatedEvent):
            return None

        if isinstance(event, ThreadCreatedEvent):
            if event.thread_id != "main":
                self.open_subagent_threads.add(event.thread_id)
                name = event.agent_info.name
                self.thread_labels[event.thread_id] = name
                print(f"\n[subagent {name} started]")
            return None

        if isinstance(event, ThreadDoneEvent):
            if event.thread_id != "main":
                self.open_subagent_threads.discard(event.thread_id)
                label = self.label(event.thread_id)
                if event.status == "error":
                    print(f"\n[{label} error] {event.message}")
                else:
                    print(f"[subagent {label} finished]")
            return None

        if isinstance(event, McpInitializedEvent):
            for server in event.content:
                print(f"[mcp] connected to {server.mcp_server_name}")
            return None

        if isinstance(event, McpAuthRequiredEvent):
            self.pending_next_turn_requests.append(event)
            return None

        if isinstance(event, SandboxCreatedEvent):
            print(f"[sandbox] provisioned ({event.sandbox_id})")
            return None

        if isinstance(event, ModelMessageEvent):
            # Base model.message: a new message starts streaming on this thread. Its
            # content and tool_calls fill in as the matching deltas arrive.
            self._streaming_ids[event.thread_id] = event.id
            return None

        if isinstance(event, ModelMessageEventDelta):
            self._handle_model_message_delta(event)
            return None

        if isinstance(event, ToolResponseEvent):
            self._handle_tool_response(event)
            return None

        if isinstance(event, ToolResponseRequiredEvent):
            self.pending_next_turn_requests.append(event)
            return None

        if isinstance(event, ToolApprovalRequiredEvent):
            self.pending_next_turn_requests.append(event)
            return None

        if isinstance(event, TurnDoneEvent):
            # Turn-level event (thread_id is null), so flush any messages still
            # streaming - the last message on each thread has no later event to
            # trigger its flush.
            for streaming_id in self._streaming_ids.values():
                self._flush_model_message(self.events[streaming_id])
            self._streaming_ids.clear()

            # event.state is the terminal TurnTerminalState (same as wait_for_completion()).
            if isinstance(event.state, TurnErrorState):
                print(f"\n[error] {event.state.message}")
            return event.state.status

        print(f"\n[unknown event] {event}")
        return None

    def run_turn(self, input_items: TurnInput | None = None) -> TurnDoneStatus:
        last_status: TurnDoneStatus = "done"
        turn = self.session.create_turn(input=input_items)
        for message in turn.stream():
            status = self._handle_event(message)
            if status is not None:
                last_status = status
        return last_status


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Terminal chat client for the TrueFoundry Agent Harness.",
    )
    parser.add_argument(
        "agent_name",
        nargs="?",
        default=os.environ.get("AGENT_NAME"),
        help="Registered agent name (default: AGENT_NAME environment variable)",
    )
    args = parser.parse_args()
    if not args.agent_name:
        parser.error("agent_name is required (pass as argument or set AGENT_NAME)")
    return args


def main() -> None:
    args = parse_args()

    base_url = os.environ.get("GATEWAY_BASE_URL")
    api_key = os.environ.get("TFY_API_KEY")
    if not base_url or not api_key:
        print("Set GATEWAY_BASE_URL and TFY_API_KEY environment variables.")
        sys.exit(1)

    client = TrueFoundryGateway(
        base_url=base_url,
        api_key=api_key,
    )
    session = client.agents.create_session(agent_name=args.agent_name)
    chat = ChatSession(session)

    print("Type a message, or Ctrl-D to exit.")
    while True:
        while chat.pending_next_turn_requests:
            chat.run_turn(chat.build_next_turn_input())

        try:
            text = input("\nyou: ").strip()
        except EOFError:
            print()
            break
        if not text:
            continue
        chat.run_turn([UserMessage(content=text)])


if __name__ == "__main__":
    main()

Sample run

$ python example.py support-bot
Type a message, or Ctrl-D to exit.

you: Refund invoice INV-2031 for tenant acme.
[mcp] connected to truefoundry-mcp
[sandbox] provisioned (my-tenant.a1b2c3d4)

assistant asks: Which refund reason should I record?
  1. Customer churn
  2. Billing error
  3. Goodwill credit
  Or type a free-form answer.
you: 2

assistant: Found invoice INV-2031 for $1,240.00 on 2026-04-12.

approval needed: process_refund
  arguments: {
  "invoice_id": "INV-2031",
  "amount": 1240.0,
  "reason": "Billing error"
}
  allow / deny / reason for denial: allow

[main tool result] {"refund_id":"RF-9914","status":"completed"}

assistant: Refund RF-9914 of $1,240.00 was processed for INV-2031.

you: ^D

Adapting the flow

A few common variations on top of the same skeleton:
  • Persisted history. Save session.id after the first Turn. On the next process start, call client.agents.get_session(session_id) and use list_turns() with turn.list_events() to rebuild UI state, or turn.stream() if the latest Turn is still running.
  • JSON output for piping. Replace the print calls inside _handle_event with json.dumps(event.model_dump()) to emit one event per line for downstream tools.
  • Generative UI. When you detect a fenced ```openui block in assembled model.message content, hand the block to the OpenUI React renderer instead of printing it. Everything else stays the same.
  • Browser / web UI. The same event shape is delivered over Server-Sent Events on POST /sessions/{session_id}/turns. Replace session.create_turn with an SSE consumer; the handlers above are unchanged.
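For the SSE variation, the consumer has to split the byte stream into messages before the handlers can run. The sketch below parses generic Server-Sent Events framing (data: lines, blank-line terminators, : comments). It assumes each message's data is a single JSON-encoded event, which this page does not spell out, and iter_sse_events is a hypothetical helper rather than an SDK function.

```python
import json
from typing import Any, Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict[str, Any]]:
    """Yield one decoded JSON payload per SSE message."""
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates a message: dispatch whatever was buffered.
            if data_lines:
                yield json.loads("\n".join(data_lines))
                data_lines = []
        elif line.startswith(":"):
            continue  # SSE comment, commonly used as a keep-alive
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            data_lines.append(value[1:] if value.startswith(" ") else value)
        # Other fields (event:, id:, retry:) are ignored in this sketch.


# Hypothetical wire data: two messages and one keep-alive comment.
wire = [
    'data: {"type": "turn.created", "turn_id": "t1"}\n',
    "\n",
    ": ping\n",
    'data: {"type":\n',
    'data: "turn.done"}\n',
    "\n",
]
events = list(iter_sse_events(wire))
print([event["type"] for event in events])
```

A message that is still buffered when the stream ends without a terminating blank line is discarded, which matches how SSE clients treat incomplete messages.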

Reference

create_session

client.agents.create_session(agent_name, title=None) -> Session
Create a conversation Session for a saved agent.
| Param | Type | Required | Description |
| --- | --- | --- | --- |
| agent_name | string | Yes | Name of the saved agent to invoke. |
| title | string | No | Optional human-readable title for the session. |

list_sessions

client.agents.list_sessions(agent_name, created_by_subject_slug=None, start_timestamp=None, end_timestamp=None, limit=100) -> Iterator[Session]
Iterate over the Sessions for a saved agent, newest-first, transparently paginating through all pages.
| Param | Type | Required | Description |
| --- | --- | --- | --- |
| agent_name | string | Yes | Filter to sessions for a specific named agent. |
| created_by_subject_slug | string | No | Filter to sessions created by a specific user / service account. |
| start_timestamp | string | No | ISO-8601 lower bound on session creation time. |
| end_timestamp | string | No | ISO-8601 upper bound on session creation time. |
| limit | int | No | Number of sessions fetched per page (not a total cap). Defaults to 100. |
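The difference between a page size and a total cap is easiest to see in a toy paginating iterator. The sketch below is not the SDK's implementation: iterate_all, fake_fetch_page, and the cursor-based paging scheme are all assumptions made for illustration.

```python
from typing import Callable, Iterator, Optional

Page = tuple[list[str], Optional[str]]  # (items, next-page cursor or None)


def iterate_all(
    fetch_page: Callable[[Optional[str], int], Page], limit: int = 100
) -> Iterator[str]:
    """Yield every item across pages; limit only controls the size of each fetch."""
    cursor: Optional[str] = None
    while True:
        items, cursor = fetch_page(cursor, limit)
        yield from items
        if cursor is None:
            return


# Hypothetical backend: five session ids, newest first.
SESSIONS = ["s5", "s4", "s3", "s2", "s1"]
calls: list[int] = []


def fake_fetch_page(cursor: Optional[str], limit: int) -> Page:
    start = int(cursor) if cursor else 0
    calls.append(start)
    end = start + limit
    next_cursor = str(end) if end < len(SESSIONS) else None
    return SESSIONS[start:end], next_cursor


result = list(iterate_all(fake_fetch_page, limit=2))
print(result, len(calls))
```

With limit=2 the iterator still yields all five sessions; it simply takes three fetches to do so. To cap the total, stop iterating early (for example with itertools.islice).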

Session

The conversation context for a saved agent. Turns created within a Session are chained automatically, so each Turn sees the history of earlier ones. Key members:
| Member | Type | Description |
| --- | --- | --- |
| id | string | Unique session identifier. Persist it to resume the conversation later via client.agents.get_session(session_id). |
| create_turn(...) | method | Prepare a Turn and return a LazyTurn without any network call. Start it with stream() (to stream events) or wait_for_completion() / state() (without streaming). See create_turn. |
| list_turns() | method | Iterate over the Turns in this session, always in descending order (newest-first). |
| get_turn(turn_id) | method | Fetch a single Turn by ID. |
| cancel() | method | Cancel the session and any currently running Turn. Idempotent; after this, create_turn() raises. |

create_turn

session.create_turn(input=None, previous_turn_id="auto") -> LazyTurn
Prepare a Turn in the session. This makes no network call and returns a LazyTurn with no id yet. The Turn is created server-side on the first method call, and the SDK branches on which one you call first: stream() opens an SSE stream, while wait_for_completion() and state() create the Turn without streaming. Calling cancel() or list_events() before the Turn is started raises.
| Param | Type | Required | Description |
| --- | --- | --- | --- |
| input | TurnInput | No | Input items for this Turn. See Turn input. Omit or pass [] to resume after mcp.auth_required. |
| previous_turn_id | string \| "auto" | No | Turn chaining point. Defaults to "auto", letting the server chain to the latest Turn. |

Turn

A single request/response cycle within a Session. Transitions: running → done | cancelled | error. list_turns() and get_turn() return a fully-qualified Turn whose id and metadata are always populated. create_turn() instead returns a LazyTurn - the same type with the same methods, but its id, previous_turn_id, created_by, and created_at are None until the Turn is started. Starting it via stream(), wait_for_completion(), or state() creates the Turn server-side and populates these (for stream(), from the first turn.created event). Calling list_events() or cancel() on a LazyTurn before it is started raises.
| Member | Type | Description |
| --- | --- | --- |
| id | string \| None | Unique turn identifier (UUIDv7). None on a LazyTurn until it is started. |
| session | Session | The Session this Turn belongs to. |
| previous_turn_id | string \| None | Turn ID this Turn chains from, or None for the first Turn (also None on a LazyTurn until started). |
| created_by | string \| None | Subject (user / service account) that created this Turn. None on a LazyTurn until started. |
| created_at | string \| None | ISO-8601 timestamp of Turn creation. None on a LazyTurn until started. |
| input | TurnInput \| None | The Turn input that triggered this Turn, if any. |
| state() | method | Fetch the current lifecycle state from the server. Returns one of TurnRunningState, TurnDoneState, TurnCancelledState, or TurnErrorState. On an unstarted LazyTurn, creates the Turn first. |
| stream(after_sequence_number=0) | method | Start an unstarted LazyTurn and stream its events, or reconnect to a running Turn’s live SSE stream. On reconnect, resumes after after_sequence_number (default 0, from the first event); also auto-resumes on any mid-stream drop. Closes when the Turn reaches a terminal state. On a started Turn, raises if it is not running - use list_events() for completed Turns. |
| list_events(...) | method | Return a point-in-time snapshot of events accumulated so far, paginating automatically. Only available for completed Turns. order="asc" is natural for replaying history forward. Raises on an unstarted LazyTurn. |
| wait_for_completion() | method | Block until the Turn reaches a terminal state and return it. Returns immediately if already terminal. On an unstarted LazyTurn, creates the Turn first. |
| cancel() | method | Request cancellation. Idempotent; safe to call on already-terminal Turns. Cancellation is asynchronous - a few more events may arrive before the stream closes. Raises on an unstarted LazyTurn. |
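When a client reconnects to a running Turn from a new process, it needs to know which sequence_number to resume after. One way to track that is sketched below with plain dictionaries. StreamCursor is a hypothetical helper, and the sketch assumes sequence numbers are positive, so that 0 means "nothing seen yet", in line with the after_sequence_number default.

```python
from typing import Any, Iterable, Iterator


class StreamCursor:
    """Remembers the highest sequence_number seen so a reconnect can resume after it."""

    def __init__(self) -> None:
        self.last_sequence_number = 0  # 0 means no event has been seen yet

    def track(self, events: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
        for event in events:
            seq = event["sequence_number"]
            if seq <= self.last_sequence_number:
                continue  # already handled before the reconnect; skip the duplicate
            self.last_sequence_number = seq
            yield event


cursor = StreamCursor()
# Hypothetical first connection that drops after three events.
first = [{"sequence_number": n, "type": "model.message.delta"} for n in (1, 2, 3)]
# Hypothetical reconnect that replays event 3 before continuing.
second = [{"sequence_number": n, "type": "model.message.delta"} for n in (3, 4, 5)]

seen = [e["sequence_number"] for e in cursor.track(first)]
seen += [e["sequence_number"] for e in cursor.track(second)]
print(seen, cursor.last_sequence_number)
```

After a drop, cursor.last_sequence_number is the value to pass as after_sequence_number when calling stream() again; the duplicate filter is only a safety net in case of overlap.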

To define or configure the agent itself, see Agent.