OpenAI Realtime API

Use Weave to automatically trace your calls to the OpenAI Realtime API.

Weave’s integration with OpenAI’s Realtime API allows you to automatically trace your application’s speech-to-speech interactions as they occur. You can use this to capture conversations between your agents and users to review and evaluate agent performance.

## Integrate realtime traces

Weave patches the OpenAI Realtime API and requires only a few additional lines of code to start capturing audio interactions from your application. The following code imports Weave, initializes a project, and applies the Realtime API patch:

```python
import weave
from weave.integrations import patch_openai_realtime

weave.init("your-team-name/your-project-name")
patch_openai_realtime()

# Your application's logic
```

After `weave.init()` and `patch_openai_realtime()` run, Weave automatically traces the interactions between the user and the OpenAI Realtime API.

### Run a realtime voice assistant with the OpenAI Agents SDK

This example runs a real-time voice assistant that streams microphone audio to OpenAI’s Realtime API and plays back the AI’s spoken responses through your local machine’s speaker. The application uses the OpenAI Agents SDK with RealtimeAgent and RealtimeRunner, and enables Weave tracing by patching with patch_openai_realtime().

To run the example:

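1. Start your Python environment and install the following libraries:

<span class="tab-group-start"></span>
<span class="tab-start" data-tab-title="uv"></span>
```bash
uv add weave openai-agents websockets pyaudio numpy
```
<span class="tab-end"></span>

<span class="tab-start" data-tab-title="pip"></span>
```bash
pip install weave openai-agents websockets pyaudio numpy
```
<span class="tab-end"></span>
<span class="tab-group-end"></span>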

2. Create a file titled `weave_voice_assistant.py` and add the following code to it.

Weave's integration is confined to the `weave` and `patch_openai_realtime` imports and the `init_weave()` function. The rest of the code creates the basic voice assistant app.

<Accordion title="weave_voice_assistant.py">
```python
  import argparse
  import asyncio
  import queue
  import sys
  import termios
  import threading
  import tty
  import weave
  import pyaudio
  import numpy as np
  from weave.integrations import patch_openai_realtime
  from agents.realtime import RealtimeAgent, RealtimeRunner

  DEFAULT_WEAVE_PROJECT = "<your-team-name/your-project-name>"

  FORMAT = pyaudio.paInt16
  RATE = 24000  # Required by the OpenAI Realtime API.
  CHUNK = 1024
  MAX_INPUT_CHANNELS = 1
  MAX_OUTPUT_CHANNELS = 1

  INP_DEV_IDX = None
  OUT_DEV_IDX = None

  # Parse CLI args for Weave project name and audio device selection.
  def parse_args():
      parser = argparse.ArgumentParser(description="Realtime agent with Weave logging")
      parser.add_argument(
          "--weave-project",
          default=DEFAULT_WEAVE_PROJECT,
          help=f"Weave project name (default: {DEFAULT_WEAVE_PROJECT})",
          dest="weave_project"
      )
      parser.add_argument(
          "--input-device",
          type=int,
          default=None,
          help="PyAudio input (mic) device index. Defaults to system default. Run mic_detect.py to list devices.",
          dest="input_device"
      )
      parser.add_argument(
          "--output-device",
          type=int,
          default=None,
          help="PyAudio output (speaker) device index. Defaults to system default. Run mic_detect.py to list devices.",
          dest="output_device"
      )
      return parser.parse_args()


  # Initialize Weave and patch the OpenAI Realtime API for tracing.
  def init_weave(project_name: str | None = None) -> None:
      name = project_name or DEFAULT_WEAVE_PROJECT
      weave.init(name)
      patch_openai_realtime()  # Enables automatic tracing of Realtime API sessions.


  mic_enabled = True

  # Listen for 't' key to toggle mic on/off. Runs in a daemon thread.
  def start_keylistener():
      global mic_enabled
      fd = sys.stdin.fileno()
      old_settings = termios.tcgetattr(fd)
      try:
          tty.setcbreak(fd)
          while True:
              ch = sys.stdin.read(1)
              if ch.lower() == 't':
                  mic_enabled = not mic_enabled
                  state = "ON" if mic_enabled else "OFF"
                  print(f"\n🎙  Mic {state} (press t to toggle)")
              elif ch == '\x03':  # Ctrl-C
                  break
      finally:
          termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)


  # Drain the audio queue and write to the speaker in a background thread.
  def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue):
      while True:
          data = audio_output_queue.get()
          if data is None:
              break
          output_stream.write(data)


  # Open audio streams, start a Realtime session, and run the send/receive loop.
  async def main(*, input_device_index: int | None = None, output_device_index: int | None = None):
      p = pyaudio.PyAudio()

      if input_device_index is None:
          input_device_index = int(p.get_default_input_device_info()['index'])
      if output_device_index is None:
          output_device_index = int(p.get_default_output_device_info()['index'])

      # Clamp channel count to device capabilities to avoid pyaudio errors.
      input_info = p.get_device_info_by_index(input_device_index)
      output_info = p.get_device_info_by_index(output_device_index)

      input_channels = min(int(input_info['maxInputChannels']), MAX_INPUT_CHANNELS)
      output_channels = min(int(output_info['maxOutputChannels']), MAX_OUTPUT_CHANNELS)

      mic = p.open(
          format=FORMAT,
          channels=input_channels,
          rate=RATE,
          input=True,
          output=False,
          frames_per_buffer=CHUNK,
          input_device_index=input_device_index,
          start=False,
      )
      speaker = p.open(
          format=FORMAT,
          channels=output_channels,
          rate=RATE,
          input=False,
          output=True,
          frames_per_buffer=CHUNK,
          output_device_index=output_device_index,
          start=False,
      )
      mic.start_stream()
      speaker.start_stream()

      # Buffer audio through a queue so in-flight playback can be flushed on interrupt.
      audio_output_queue = queue.Queue()
      threading.Thread(
          target=play_audio, args=(speaker, audio_output_queue), daemon=True
      ).start()

      s_agent = RealtimeAgent(
          name="Speech Assistant",
          instructions="You are a tool using AI. Use tools to accomplish a task whenever possible"
      )

      s_runner = RealtimeRunner(s_agent, config={
          "model_settings": {
              "model_name": "gpt-realtime",
              "modalities": ["audio"],
              "output_modalities": ["audio"],
              "input_audio_format": "pcm16",
              "output_audio_format": "pcm16",
              "speed": 1.2,
              "turn_detection": {
                  "prefix_padding_ms": 100,
                  "silence_duration_ms": 100,
                  "type": "server_vad",
                  "interrupt_response": True,
                  "create_response": True,
              },
          }
      })
      print("--- Session Active (Speak into mic) ---")
      print("🎙  Mic ON (press t to toggle)")

      threading.Thread(target=start_keylistener, daemon=True).start()

      async with await s_runner.run() as session:
          # Stream mic input to the Realtime API, sending silence when muted.
          async def send_mic_audio():
              silence = b'\x00' * CHUNK * 2  # 2 bytes per sample (16-bit PCM).
              try:
                  while True:
                      raw_data = mic.read(CHUNK, exception_on_overflow=False)

                      if mic_enabled:
                          audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64)
                          rms = np.sqrt(np.mean(audio_data**2))
                          meter = int(min(rms / 50, 50))
                          print(f"Mic Level: {'█' * meter}{' ' * (50-meter)} | 🎙 ON ", end="\r")
                          await session.send_audio(raw_data)
                      else:
                          print(f"Mic Level: {' ' * 50} | 🎙 OFF", end="\r")
                          await session.send_audio(silence)

                      await asyncio.sleep(0)  # Yield to the event loop between reads.
              except Exception:
                  pass

          # Receive events from the session and route audio to the speaker.
          async def handle_events():
              async for event in session:
                  if event.type == "audio":
                      audio_output_queue.put(event.audio.data)
                  elif event.type == "audio_interrupted":
                      # Flush queued AI audio so it doesn't talk over the user.
                      while not audio_output_queue.empty():
                          try:
                              audio_output_queue.get_nowait()
                          except queue.Empty:
                              break

          mic_task = asyncio.create_task(send_mic_audio())
          try:
              await handle_events()
          finally:
              mic_task.cancel()

      # Cleanup
      audio_output_queue.put(None)  # Signal playback thread to exit.
      mic.close()
      speaker.close()
      p.terminate()

  if __name__ == "__main__":
      args = parse_args()
      init_weave(args.weave_project)
      fd = sys.stdin.fileno()
      old_settings = termios.tcgetattr(fd)
      try:
          asyncio.run(main(input_device_index=args.input_device, output_device_index=args.output_device))
      finally:
          termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
  ```
</Accordion>

3. Update the `DEFAULT_WEAVE_PROJECT` value with your team and project names.

4. Set your `OPENAI_API_KEY` environment variable.

5. Run the code:

```bash
python weave_voice_assistant.py
```

Once running, press **T** on your keyboard to mute or unmute the mic. The assistant uses server-side voice activity detection to handle turn-taking and interruptions.
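
While the mic is muted, the script keeps the stream alive by sending a buffer of silence the same size as one microphone chunk. The arithmetic behind that buffer can be sketched as follows, using the `RATE` and `CHUNK` values from the script. The helper names `chunk_bytes` and `chunk_duration_ms` are illustrative assumptions and are not part of Weave, PyAudio, or the Agents SDK.

```python
# Constants copied from weave_voice_assistant.py.
RATE = 24000          # Samples per second.
CHUNK = 1024          # Samples read from the mic per call.
BYTES_PER_SAMPLE = 2  # 16-bit PCM (pyaudio.paInt16), one channel.


def chunk_bytes(samples: int = CHUNK) -> int:
    """Size in bytes of one mono 16-bit PCM chunk (hypothetical helper)."""
    return samples * BYTES_PER_SAMPLE


def chunk_duration_ms(samples: int = CHUNK, rate: int = RATE) -> float:
    """Playback time of one chunk in milliseconds (hypothetical helper)."""
    return samples / rate * 1000


# Same size as the `silence` buffer built in send_mic_audio().
silence = b"\x00" * chunk_bytes()

if __name__ == "__main__":
    print(len(silence))                   # 2048
    print(round(chunk_duration_ms(), 1))  # 42.7
```

With these values, each chunk carries roughly 43 ms of audio, so the muted loop sends about 23 silent chunks per second.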

As you speak to the assistant, Weave captures traces that you can explore in the Weave UI, including the audio from the session.
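
The `--input-device` and `--output-device` help text refers to a `mic_detect.py` script that this page does not include. A minimal sketch of what such a script might contain is shown here, assuming it only needs to print each device index with its channel counts. The function names `format_device` and `list_devices` are hypothetical; the PyAudio calls (`get_device_count()` and `get_device_info_by_index()`) are the same family the example already uses.

```python
def format_device(info: dict) -> str:
    """Render one PyAudio device-info dict as a single line."""
    return (
        f"[{info['index']}] {info['name']} "
        f"(in: {info['maxInputChannels']}, out: {info['maxOutputChannels']})"
    )


def list_devices() -> list[str]:
    """Return one formatted line per audio device that PyAudio reports."""
    import pyaudio  # Imported here so format_device() works without PyAudio.

    p = pyaudio.PyAudio()
    try:
        return [
            format_device(p.get_device_info_by_index(i))
            for i in range(p.get_device_count())
        ]
    finally:
        p.terminate()
```

Printing the result of `list_devices()` gives the indexes you can pass to `--input-device` and `--output-device`. A device that reports `in: 0` cannot be used as a microphone, and one that reports `out: 0` cannot be used as a speaker.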

### Run a realtime voice assistant using WebSockets

The following example connects directly to the OpenAI Realtime API over WebSockets. It streams microphone audio to the API, plays back spoken responses, and supports tool calling (weather lookups, math evaluation, code execution, and file writing). Weave traces the session using `weave.init()` and `patch_openai_realtime()`.
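
Over a raw WebSocket, audio travels as base64 text inside JSON events rather than as binary frames. The sketch below isolates that encoding step, using the `input_audio_buffer.append` and `response.output_audio.delta` event types that appear in the example script. The helper names `make_append_event` and `decode_audio_delta` are illustrative assumptions, not part of any library.

```python
import base64
import json


def make_append_event(pcm_chunk: bytes) -> str:
    """Wrap raw PCM16 bytes in an input_audio_buffer.append event (JSON text)."""
    return json.dumps({
        "type": "input_audio_buffer.append",
        "audio": base64.b64encode(pcm_chunk).decode("utf-8"),
    })


def decode_audio_delta(raw_message: str) -> bytes:
    """Extract PCM16 bytes from a response.output_audio.delta event."""
    event = json.loads(raw_message)
    return base64.b64decode(event.get("delta", ""))
```

Base64 adds about a third to the payload size, which is one reason the example sends small chunks frequently instead of buffering long stretches of audio.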

To run the example:

1. Start your Python environment and install the following libraries:

<span class="tab-group-start"></span>
<span class="tab-start" data-tab-title="uv"></span>
```bash
uv add weave websockets pyaudio numpy
```
<span class="tab-end"></span>

<span class="tab-start" data-tab-title="pip"></span>
```bash
pip install weave websockets pyaudio numpy
```
<span class="tab-end"></span>
<span class="tab-group-end"></span>

2. Create a file titled `tool_definitions.py` and add the following tool definitions to it. The main application imports from this module.

<Accordion title="tool_definitions.py">
```python
  import json
  import subprocess
  import tempfile
  from pathlib import Path
  import weave


  # @function_tool
  @weave.op
  def get_weather(city: str) -> str:
      """Get the current weather for a city.

      Args:
          city: The city name to get weather for.
      """
      return json.dumps({"city": city, "temperature": "72°F", "condition": "sunny"})


  @weave.op
  def calculate(expression: str) -> str:
      """Evaluate a math expression and return the result.

      Args:
          expression: A math expression to evaluate, e.g. '2 + 2'.
      """
      try:
          # eval() executes arbitrary Python, so only use this demo with trusted input.
          result = eval(expression)
          return str(result)
      except Exception as e:
          return f"Error: {e}"


  @weave.op
  def run_python_code(code: str) -> str:
      """Write and execute a Python script, returning its stdout/stderr.

      Args:
          code: The Python source code to execute.
      """
      with tempfile.NamedTemporaryFile(
          mode="w", suffix=".py", dir=tempfile.gettempdir(), delete=False
      ) as f:
          f.write(code)
          script_path = Path(f.name)

      try:
          result = subprocess.run(
              ["python", str(script_path)],
              capture_output=True,
              text=True,
              timeout=30,
          )
          output = result.stdout
          if result.stderr:
              output += f"\nSTDERR:\n{result.stderr}"
          if result.returncode != 0:
              output += f"\n(exit code {result.returncode})"
          return output or "(no output)"
      except subprocess.TimeoutExpired:
          return "Error: script timed out after 30 seconds."
      finally:
          script_path.unlink(missing_ok=True)


  @weave.op
  async def write_file(file_path: str, content: str) -> str:
      """Write content to a file on disk.

      Args:
          file_path: The path to write the file to.
          content: The content to write into the file.
      """
      try:
          path = Path(file_path)
          path.parent.mkdir(parents=True, exist_ok=True)
          path.write_text(content)
          return f"Wrote {len(content)} bytes to {file_path}"
      except Exception as e:
          return f"Error writing file: {e}"
  ```
</Accordion>

3. Create a file titled `weave_ws_voice_assistant.py` in the same directory and add the following code to it.

<Accordion title="weave_ws_voice_assistant.py">
```python
  import asyncio
  import base64
  import json
  import os
  import queue
  import threading
  from typing import Any, Callable

  import numpy as np
  import pyaudio
  import websockets

  import weave
  weave.init("<your-team-name/your-project-name>")
  from weave.integrations import patch_openai_realtime
  patch_openai_realtime()

  from tool_definitions import (
      calculate,
      get_weather,
      run_python_code,
      write_file,
  )

  # Audio format (must be PCM16 for the Realtime API).
  FORMAT = pyaudio.paInt16
  RATE = 24000
  CHUNK = 1024
  MAX_INPUT_CHANNELS = 1  # The Realtime API expects mono audio.
  MAX_OUTPUT_CHANNELS = 1

  OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
  REALTIME_URL = "wss://api.openai.com/v1/realtime?model=gpt-realtime"

  DEBUG_WRITE_LOG = False

  # Map tool name -> callable for function call dispatch.
  TOOL_REGISTRY: dict[str, Callable[..., Any]] = {
      "get_weather": get_weather,
      "calculate": calculate,
      "run_python_code": run_python_code,
      "write_file": write_file,
  }

  # Raw tool definitions for the Realtime API session config.
  TOOL_DEFINITIONS = [
      {
          "type": "function",
          "name": "get_weather",
          "description": "Get the current weather for a city.",
          "parameters": {
              "type": "object",
              "properties": {
                  "city": {
                      "type": "string",
                      "description": "The city name to get weather for.",
                  }
              },
              "required": ["city"],
          },
      },
      {
          "type": "function",
          "name": "calculate",
          "description": "Evaluate a math expression and return the result.",
          "parameters": {
              "type": "object",
              "properties": {
                  "expression": {
                      "type": "string",
                      "description": "A math expression to evaluate, e.g. '2 + 2'.",
                  }
              },
              "required": ["expression"],
          },
      },
      {
          "type": "function",
          "name": "run_python_code",
          "description": "Write and execute a Python script, returning its stdout/stderr.",
          "parameters": {
              "type": "object",
              "properties": {
                  "code": {
                      "type": "string",
                      "description": "The Python source code to execute.",
                  }
              },
              "required": ["code"],
          },
      },
      {
          "type": "function",
          "name": "write_file",
          "description": "Write content to a file on disk.",
          "parameters": {
              "type": "object",
              "properties": {
                  "file_path": {
                      "type": "string",
                      "description": "The path to write the file to.",
                  },
                  "content": {
                      "type": "string",
                      "description": "The content to write into the file.",
                  },
              },
              "required": ["file_path", "content"],
          },
      },
  ]


  async def send_event(ws, event: dict) -> None:
      await ws.send(json.dumps(event))


  async def configure_session(ws) -> None:
      event = {
          "type": "session.update",
          "session": {
              "type": "realtime",
              "model": "gpt-realtime",
              "output_modalities": ["audio"],
              "instructions": (
                  "You are a helpful AI assistant with access to tools. "
                  "Use tools to accomplish tasks whenever possible. "
                  "Speak clearly and briefly."
              ),
              "tools": TOOL_DEFINITIONS,
              "tool_choice": "auto",
              "audio": {
                  "input": {
                      "format": {"type": "audio/pcm", "rate": 24000},
                      "transcription": {"model": "gpt-4o-transcribe"},
                      "turn_detection": {
                          "type": "server_vad",
                          "threshold": 0.5,
                          "prefix_padding_ms": 300,
                          "silence_duration_ms": 500,
                      },
                  },
                  "output": {
                      "format": {"type": "audio/pcm", "rate": 24000},
                  },
              },
          },
      }
      await send_event(ws, event)
      print("Session configured.")


  async def handle_function_call(ws, call_id: str, name: str, arguments: str) -> None:
      if not name:
          raise Exception("Did not get a function name")

      print(f"\n[Function Call] {name}({arguments})")
      tool_fn = TOOL_REGISTRY.get(name)
      if tool_fn is None:
          result = json.dumps({"error": f"Unknown function: {name}"})
      else:
          try:
              args = json.loads(arguments)
              result = tool_fn(**args)
              if asyncio.iscoroutine(result):
                  result = await result
          except Exception as e:
              result = json.dumps({"error": str(e)})

      print(f"[Function Result] {result}")

      # Send the function call output back to the model.
      await send_event(ws, {
          "type": "conversation.item.create",
          "item": {
              "type": "function_call_output",
              "call_id": call_id,
              "output": result if isinstance(result, str) else json.dumps(result),
          },
      })

      # Trigger a new response so the model incorporates the function result.
      await send_event(ws, {"type": "response.create"})


  def play_audio(output_stream: pyaudio.Stream, audio_output_queue: queue.Queue):
      """Runs in a separate thread because pyaudio's write() blocks until the
      sound card consumes the samples. Decoupling playback from the async event
      loop lets us flush the queue on interrupt without waiting for in-flight
      writes to finish."""
      while True:
          data = audio_output_queue.get()
          if data is None:
              break
          output_stream.write(data)


  async def send_mic_audio(ws, mic) -> None:
      try:
          while True:
              raw_data = mic.read(CHUNK, exception_on_overflow=False)

              # Visual volume meter.
              audio_data = np.frombuffer(raw_data, dtype=np.int16).astype(np.float64)
              rms = np.sqrt(np.mean(audio_data**2))
              meter = int(min(rms / 50, 50))
              print(f"Mic Level: {'█' * meter}{' ' * (50 - meter)} |", end="\r")

              # Base64-encode and send audio chunk.
              b64_audio = base64.b64encode(raw_data).decode("utf-8")
              await send_event(ws, {
                  "type": "input_audio_buffer.append",
                  "audio": b64_audio,
              })

              await asyncio.sleep(0)
      except asyncio.CancelledError:
          pass


  async def receive_events(ws, audio_output_queue: queue.Queue) -> None:
      # Accumulate function call arguments across delta events.
      pending_calls: dict[str, dict] = {}
      async for raw_message in ws:
          if DEBUG_WRITE_LOG:
              # raw_message is already JSON text, so write it unchanged (one event per line).
              with open("data.jsonl", "a", encoding="utf-8") as f:
                  f.write(raw_message + "\n")

          event = json.loads(raw_message)
          event_type = event.get("type", "")

          if event_type == "session.created":
              print(raw_message)

          elif event_type == "session.updated":
              print(raw_message)

          elif event_type == "error":
              print(f"\n[Error] {event}")

          elif event_type == "input_audio_buffer.speech_started":
              # Flush queued AI audio so it doesn't talk over the user.
              while not audio_output_queue.empty():
                  try:
                      audio_output_queue.get_nowait()
                  except queue.Empty:
                      break

          elif event_type == "input_audio_buffer.speech_stopped":
              pass

          elif event_type == "input_audio_buffer.committed":
              pass

          elif event_type == "response.created":
              pass

          elif event_type == "response.output_text.delta":
              pass

          elif event_type == "response.output_text.done":
              pass

          # Audio output deltas - queue for playback.
          elif event_type == "response.output_audio.delta":
              audio_bytes = base64.b64decode(event.get("delta", ""))
              audio_output_queue.put(audio_bytes)

          elif event_type == "response.output_audio_transcript.delta":
              pass

          elif event_type == "response.output_audio_transcript.done":
              pass

          # Function call started - initialize pending call.
          elif event_type == "response.output_item.added":
              item = event.get("item", {})
              if item.get("type") == "function_call" and item.get("status") == "in_progress":
                  item_id = item.get("id", "")
                  pending_calls[item_id] = {
                      "call_id": item.get("call_id", ""),
                      "name": item.get("name", ""),
                      "arguments": "",
                  }
                  print(f"\n[Function Call Started] {item.get('name', '')}")

          # Function call argument deltas - accumulate.
          elif event_type == "response.function_call_arguments.delta":
              item_id = event.get("item_id", "")
              if item_id in pending_calls:
                  pending_calls[item_id]["arguments"] += event.get("delta", "")

          elif event_type == "response.function_call_arguments.done":
              item_id = event.get("item_id", "")
              call_info = pending_calls.pop(item_id, None)
              if call_info is None:
                  # Fallback: use data directly from the done event.
                  call_info = {
                      "call_id": event.get("call_id"),
                      "name": event.get("name"),
                      "arguments": event.get("arguments"),
                  }
              try:
                  await handle_function_call(
                      ws,
                      call_info["call_id"],
                      call_info["name"],
                      call_info["arguments"],
                  )
              except Exception as e:
                  print(f"Failed to call function for message {call_info}: error - {e}")

          elif event_type == "response.done":
              pass

          elif event_type == "rate_limits.updated":
              pass

          else:
              print(f"\n[Event: {event_type}]")


  async def main():
      if not OPENAI_API_KEY:
          print("Error: OPENAI_API_KEY environment variable not set")
          return

      p = pyaudio.PyAudio()

      input_device_index = int(p.get_default_input_device_info()['index'])
      output_device_index = int(p.get_default_output_device_info()['index'])

      # Channel count must match the device's capabilities or pyaudio errors upon open.
      input_info = p.get_device_info_by_index(input_device_index)
      output_info = p.get_device_info_by_index(output_device_index)
      input_channels = min(int(input_info['maxInputChannels']), 1)
      output_channels = min(int(output_info['maxOutputChannels']), 1)

      mic = p.open(
          format=FORMAT,
          channels=input_channels,
          rate=RATE,
          input=True,
          output=False,
          frames_per_buffer=CHUNK,
          input_device_index=input_device_index,
          start=False,
      )
      speaker = p.open(
          format=FORMAT,
          channels=output_channels,
          rate=RATE,
          input=False,
          output=True,
          frames_per_buffer=CHUNK,
          output_device_index=output_device_index,
          start=False,
      )
      mic.start_stream()
      speaker.start_stream()

      # Audio goes through a queue so we can flush it when the user interrupts.
      # Writing directly to the speaker makes it impossible to cancel in-flight audio.
      audio_output_queue = queue.Queue()
      threading.Thread(
          target=play_audio, args=(speaker, audio_output_queue), daemon=True
      ).start()

      headers = {
          "Authorization": f"Bearer {OPENAI_API_KEY}",
      }

      print("Connecting to OpenAI Realtime API...")

      async with websockets.connect(
          REALTIME_URL,
          additional_headers=headers,
      ) as ws:
          print("Connected! Configuring session...")
          await configure_session(ws)

          print("--- Session Active (Speak into mic) ---")

          mic_task = asyncio.create_task(send_mic_audio(ws, mic))
          try:
              await receive_events(ws, audio_output_queue)
          finally:
              mic_task.cancel()
              try:
                  await mic_task
              except asyncio.CancelledError:
                  pass

      # Cleanup
      audio_output_queue.put(None)  # Signal playback thread to exit.
      mic.close()
      speaker.close()
      p.terminate()
      print("\nSession ended.")


  if __name__ == "__main__":
      asyncio.run(main())
  ```
</Accordion>

4. Update the `weave.init()` call with your team and project names.

5. Set your `OPENAI_API_KEY` environment variable.

6. Run the code:

```bash
python weave_ws_voice_assistant.py
```

Once running, speak into your microphone. The assistant uses server-side voice activity detection to handle turn-taking and interruptions, and calls the tools defined in `tool_definitions.py` when a request needs them.
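
Tool calls arrive as a sequence of events: the call is announced, its JSON arguments stream in as fragments, and a final event marks them complete. The sketch below replays that sequence outside the WebSocket loop, mirroring the `pending_calls` logic in `receive_events()`. The function name `accumulate_calls` is hypothetical, and the sample events are hand-written for illustration rather than captured from the API.

```python
import json


def accumulate_calls(events: list[dict]) -> list[dict]:
    """Replay events and return each completed call as {call_id, name, arguments}."""
    pending: dict[str, dict] = {}
    completed: list[dict] = []
    for event in events:
        event_type = event.get("type", "")
        if event_type == "response.output_item.added":
            item = event.get("item", {})
            if item.get("type") == "function_call":
                # Start an empty argument buffer keyed by the item id.
                pending[item.get("id", "")] = {
                    "call_id": item.get("call_id", ""),
                    "name": item.get("name", ""),
                    "arguments": "",
                }
        elif event_type == "response.function_call_arguments.delta":
            call = pending.get(event.get("item_id", ""))
            if call is not None:
                call["arguments"] += event.get("delta", "")
        elif event_type == "response.function_call_arguments.done":
            call = pending.pop(event.get("item_id", ""), None)
            if call is not None:
                completed.append(call)
    return completed


# Hand-written sample events, not real API output.
sample_events = [
    {"type": "response.output_item.added",
     "item": {"id": "item_1", "type": "function_call",
              "call_id": "call_1", "name": "get_weather"}},
    {"type": "response.function_call_arguments.delta",
     "item_id": "item_1", "delta": '{"city": '},
    {"type": "response.function_call_arguments.delta",
     "item_id": "item_1", "delta": '"Paris"}'},
    {"type": "response.function_call_arguments.done", "item_id": "item_1"},
]
calls = accumulate_calls(sample_events)
parsed_args = json.loads(calls[0]["arguments"])
```

The argument string is only valid JSON once every fragment has arrived, which is why the example waits for the `done` event before calling `json.loads()` and dispatching the tool.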

As you speak to the assistant, Weave captures traces that you can explore in the Weave UI, including the audio from the session.
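
The `calculate` tool in `tool_definitions.py` passes model-generated text to `eval()`, which can run arbitrary Python. One possible alternative, sketched here under the assumption that only basic arithmetic is needed, walks the parsed expression with the standard `ast` module and rejects everything else. The names `safe_calculate` and `_evaluate` are hypothetical and not part of Weave or the original example.

```python
import ast
import operator

# Arithmetic operators this sketch allows; anything else is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def _evaluate(node: ast.AST):
    """Recursively evaluate numeric literals and whitelisted operators."""
    if isinstance(node, ast.Constant) and type(node.value) in (int, float):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
        return _BINARY_OPS[type(node.op)](_evaluate(node.left), _evaluate(node.right))
    if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
        return _UNARY_OPS[type(node.op)](_evaluate(node.operand))
    raise ValueError("unsupported expression")


def safe_calculate(expression: str) -> str:
    """Evaluate basic arithmetic and return the result or an error string."""
    try:
        tree = ast.parse(expression, mode="eval")
        return str(_evaluate(tree.body))
    except Exception as e:
        return f"Error: {e}"
```

Exponentiation is left out on purpose, because an expression such as `9 ** 9 ** 9` can consume large amounts of CPU and memory. Function calls, names, and attribute access all fall through to the `ValueError` branch, so an input like `__import__('os')` returns an error string instead of executing.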

Link last verified June 7, 2026.
Source: Weights & Biases Docs
Link last verified: 2026-04-05