Skip to main content
The Realtime API enables low-latency, bidirectional streaming over a persistent WebSocket connection. You can send and receive text and audio in real time, enabling use cases like voice assistants and interactive agents. The gateway proxies the WebSocket connection to the provider using each provider’s native SDK.
| Approach | Supported providers | Base path |
| --- | --- | --- |
| Provider proxy (native SDK) | Google Gemini, Google Vertex AI | `wss://{controlPlaneUrl}/live/{providerAccountName}` |
Before you start: Replace {controlPlaneUrl} with your gateway URL and your-tfy-api-key with your TrueFoundry API key. Replace {providerAccountName} with the display name of your provider account on TrueFoundry.
Model names: The model ID in code must match the display name of the model on your TrueFoundry provider account.
Which SDK to use: Use the google-genai Python SDK for both Google Gemini and Google Vertex AI, pointed at the gateway WebSocket URL above.
Model type: When adding a realtime model to the gateway, make sure to select Realtime as the model type.

Code snippet

After adding the models, you can get a ready-to-use code snippet from the TrueFoundry platform or use the examples below. The example below demonstrates a realtime audio session, streaming microphone input to the model and playing back audio responses through the speaker. You can adapt the code to use other modalities as needed.
"""
Gemini Live API - Realtime Audio Streaming
pip install google-genai pyaudio
"""
import asyncio
import pyaudio
from google import genai
from google.genai import types

FORMAT = pyaudio.paInt16  # 16-bit signed PCM samples
CHANNELS = 1  # mono capture and playback
SEND_SAMPLE_RATE = 16000  # microphone capture rate (Hz) sent to the model
RECEIVE_SAMPLE_RATE = 24000  # playback rate (Hz) for audio received from the model
CHUNK_SIZE = 1024  # mic frames read per iteration

API_KEY = "your-tfy-api-key"  # TrueFoundry API key, sent as a Bearer token
MODEL = "gemini-live-2.5-flash"  # actual model id
# NOTE(review): the table above gives the base path as
# wss://{controlPlaneUrl}/live/{providerAccountName}; confirm whether the
# HTTP base URL here should include the /api/llm prefix.
BASE_URL = "https://{controlPlaneUrl}/api/llm/live/{geminiProviderAccountName}"

# Point the google-genai client at the TrueFoundry gateway instead of the
# default Google endpoint; the gateway authenticates via the Bearer header.
client = genai.Client(
    api_key=API_KEY,
    http_options={
        "base_url": BASE_URL,
        "headers": {"Authorization": f"Bearer {API_KEY}"},
    },
)

# Session configuration: audio-only responses, spoken with the prebuilt
# "Zephyr" voice.
CONFIG = types.LiveConnectConfig(
    response_modalities=["AUDIO"],
    speech_config=types.SpeechConfig(
        voice_config=types.VoiceConfig(
            prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr")
        )
    ),
)

# Shared PortAudio handle; terminated in main()'s finally block.
pya = pyaudio.PyAudio()


async def main():
    """Stream microphone audio to the model and play its audio replies.

    Opens a realtime session through the gateway, then runs three concurrent
    tasks until interrupted:
      - send_audio: forwards raw PCM mic chunks to the model
      - receive_audio: queues model audio and prints any text
      - play_audio: writes queued model audio to the speaker

    Requires Python 3.11+ (asyncio.TaskGroup).
    """
    mic_stream = None
    speaker_stream = None
    try:
        async with client.aio.live.connect(model=MODEL, config=CONFIG) as session:
            print("Connected!")

            mic_info = pya.get_default_input_device_info()
            mic_stream = pya.open(
                format=FORMAT, channels=CHANNELS, rate=SEND_SAMPLE_RATE,
                input=True, input_device_index=mic_info["index"],
                frames_per_buffer=CHUNK_SIZE,
            )

            speaker_stream = pya.open(
                format=FORMAT, channels=CHANNELS, rate=RECEIVE_SAMPLE_RATE,
                output=True,
            )

            # Decouples the websocket reader from playback so slow speaker
            # writes don't stall receiving.
            audio_in_queue = asyncio.Queue()

            async def send_audio():
                # Mic reads are blocking; run them in a thread so the event
                # loop stays responsive.
                while True:
                    data = await asyncio.to_thread(
                        mic_stream.read, CHUNK_SIZE, exception_on_overflow=False
                    )
                    await session.send(input={"data": data, "mime_type": "audio/pcm"})

            async def receive_audio():
                while True:
                    turn = session.receive()
                    was_interrupted = False
                    async for response in turn:
                        if data := response.data:
                            audio_in_queue.put_nowait(data)
                        if text := response.text:
                            print(text, end="")

                        # The server marks a turn as interrupted when the user
                        # talks over the model (barge-in).
                        if (
                            hasattr(response, "server_content")
                            and response.server_content
                            and getattr(response.server_content, "interrupted", False)
                        ):
                            was_interrupted = True

                    if was_interrupted:
                        # Drop queued audio so playback doesn't finish a reply
                        # the user already cut off.
                        while not audio_in_queue.empty():
                            audio_in_queue.get_nowait()

            async def play_audio():
                while True:
                    data = await audio_in_queue.get()
                    # Speaker writes block too; offload to a thread.
                    await asyncio.to_thread(speaker_stream.write, data)

            async with asyncio.TaskGroup() as tg:
                tg.create_task(send_audio())
                tg.create_task(receive_audio())
                tg.create_task(play_audio())

    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Stop and close both PortAudio streams (the original leaked them)
        # before terminating the shared PyAudio handle.
        for stream in (mic_stream, speaker_stream):
            if stream is not None:
                stream.stop_stream()
                stream.close()
        pya.terminate()


if __name__ == "__main__":
    asyncio.run(main())

References