2026-04-16 10:00:37 -04:00

Agent guide — server/

The audio pipeline and FastAPI surface. The video stack lives under video_models/ and has its own guide.

Module map

  • main.py — FastAPI app, lifespan, /ws/chat WebSocket, video/avatar HTTP endpoints. Keep business logic out of here; this is a transport layer.
  • models.py — ModelManager.load_all(). All models are loaded once at startup in a fixed order: VAD → ASR → LLM → TTS → (optional) Video. video_engine stays None when config.video.enabled is false — callers MUST tolerate that.
  • config.py — thin YAML loader, exposes a single config dict. Don't scatter yaml.safe_load elsewhere.
  • pipeline.py — ConversationSession, one instance per WebSocket. Owns per-session state (VAD stream, conversation history, KV cache, cancel event). Orchestrates VAD → ASR → LLM → TTS and optionally the video branch.
  • vad.py — Silero VAD via ONNX Runtime on CPU. StreamingVAD.process_chunk(pcm_16k) → utterance | None. Returns a full utterance only on a speech→silence transition; every other call returns None.
  • asr.py — Qwen3-ASR wrapper. Sync, called under asyncio.to_thread.
  • llm.py — two backends behind a common generate(history, max_new_tokens, kv_cache_state) → (text, KVCacheState) signature: LLMEngine (local transformers) and LMStudioEngine (HTTP, no KV cache).
  • tts.py — Kokoro wrapper. The per-segment generator yields (graphemes, _ps, audio_f32) tuples at 24 kHz.
  • audio_utils.py — pcm_bytes_to_float32 / float32_to_pcm_bytes. The WebSocket protocol is 16 kHz int16 PCM in, 24 kHz int16 PCM out.
  • video.py — VideoConfig, LoRASpec, VideoEngine. Orchestrates Wan2.2 + MuseTalk. Gated by config.video.enabled.
  • video_models/ — see video_models/AGENT.md for Blackwell/GGUF gotchas.
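
The int16 ↔ float32 conversion behind the WebSocket audio format can be sketched as follows. This is a stdlib-only illustration, not the contents of audio_utils.py: the function names match the module map, but the little-endian byte order, the 32768/32767 scaling, and the clipping behaviour are assumptions, and the real helpers may well operate on NumPy arrays instead.

```python
import sys
from array import array


def pcm_bytes_to_float32(data: bytes) -> array:
    """Decode int16 PCM bytes into float32 samples in [-1.0, 1.0)."""
    samples = array("h")
    samples.frombytes(data)  # raises ValueError on a truncated (odd-length) frame
    if sys.byteorder == "big":
        samples.byteswap()  # assumption: the wire format is little-endian
    return array("f", (s / 32768.0 for s in samples))


def float32_to_pcm_bytes(samples) -> bytes:
    """Encode float samples as int16 PCM bytes, clipping to [-1.0, 1.0] first."""
    clipped = (max(-1.0, min(1.0, s)) for s in samples)
    out = array("h", (int(round(s * 32767)) for s in clipped))
    if sys.byteorder == "big":
        out.byteswap()
    return out.tobytes()
```

Clipping before the cast matters: an out-of-range float would otherwise overflow the int16 range and fail or wrap instead of saturating.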

Session lifecycle (pipeline.py)

  1. Client connects → ConversationSession(models, send_json, send_bytes). start() emits {type: "status", state: "listening"} and a {type: "video_mode", ...} hint.
  2. Inbound binary frames are 16 kHz int16 PCM → handle_audio_chunk → VAD. On speech→silence the session kicks off _process_utterance as an asyncio.Task so it never blocks the WebSocket receive loop.
  3. _process_utterance flow: ASR → LLM → TTS stream. Each blocking call is wrapped in asyncio.to_thread.
  4. The reply text is split into short segments via _split_into_segments before synthesis so the streamed audio chunks stay small.
  5. TTS runs on a background threading.Thread, feeding a queue.Queue that the async loop drains.
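
Step 5 can be sketched as below: a worker thread synthesises segments into a queue.Queue and the async side drains it. Everything here is illustrative. The names stream_tts, _producer, and synth are hypothetical, and waiting on the queue via asyncio.to_thread is one possible drain strategy; pipeline.py may poll the queue differently.

```python
import asyncio
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def _producer(segments, synth, out_q, cancel_event):
    """Worker thread: synthesise each segment and enqueue the audio."""
    try:
        for seg in segments:
            if cancel_event.is_set():
                break  # stop synthesising once a barge-in has been signalled
            out_q.put(synth(seg))
    finally:
        out_q.put(_DONE)  # always unblock the consumer, even on error


async def stream_tts(segments, synth, send_bytes, cancel_event):
    """Drain the worker's queue from the event loop without blocking it."""
    out_q = queue.Queue()
    worker = threading.Thread(
        target=_producer,
        args=(segments, synth, out_q, cancel_event),
        daemon=True,
    )
    worker.start()
    while True:
        # Queue.get blocks, so wait for it on a helper thread.
        item = await asyncio.to_thread(out_q.get)
        if item is _DONE:
            break
        if cancel_event.is_set():
            continue  # keep draining to the sentinel, but send nothing
        await send_bytes(item)
    await asyncio.to_thread(worker.join)
```

The sentinel in the finally block is the important part: without it, an exception inside synth would leave the drain loop waiting forever.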

Barge-in

  • cancel_event: threading.Event is the single stop signal. Checked between every stage and inside the TTS queue drain loop.
  • Two ways to trigger: new VAD utterance while is_responding (handle_audio_chunk), or an explicit {type: "interrupt", last_chunk_id} text message (interrupt).
  • On cancel: set the event, send {type: "interrupt"} to the client so it flushes its audio buffer, and discard any pending video clip.
  • Don't add work after cancel_event.is_set() without checking — that's how zombie audio/video reaches the client after a barge-in.
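
A minimal sketch of the "check between every stage" rule, assuming a simplified session. TurnSketch, run_turn, and the stage callables are hypothetical stand-ins for ConversationSession and its ASR/LLM/TTS calls; only cancel_event and the {type: "interrupt"} message come from this guide.

```python
import asyncio
import threading


class TurnSketch:
    def __init__(self, send_json):
        self.cancel_event = threading.Event()
        self._send_json = send_json

    async def interrupt(self):
        """Barge-in: set the stop signal, then tell the client to flush audio."""
        self.cancel_event.set()
        await self._send_json({"type": "interrupt"})

    async def run_turn(self, stages, payload):
        """Run blocking stages in order, bailing out as soon as cancel is set."""
        self.cancel_event.clear()
        for stage in stages:
            if self.cancel_event.is_set():
                return None  # drop the turn; nothing further reaches the client
            payload = await asyncio.to_thread(stage, payload)
        # Re-check after the last stage: a cancel may have landed while it ran.
        return None if self.cancel_event.is_set() else payload
```

Any new stage added to a turn belongs inside that loop (or behind an equivalent check), so that its output is dropped when a barge-in arrives while it is running.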

Video branch

The audio pipeline changes shape when video_engine.is_ready():

  • PCM chunks and response_text are not streamed during the turn — they're buffered.
  • TTS audio is concatenated into one float32 array.
  • After TTS completes, video_engine.generate_speaking_clip(audio, sr, reply_text) renders the MP4 (blocking, wrapped in to_thread).
  • The full clip + final text is sent as a single speaking_clip message.

If you extend the audio pipeline, preserve this dual-mode behaviour: the client's UX is very different between the two paths, and mixing them (e.g. sending PCM and a clip) will double-play the audio.
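
The either/or shape of the two paths can be sketched like this. It is an assumption-laden outline, not the code in pipeline.py: deliver_turn, send_audio, and send_clip are hypothetical, and audio segments are modelled as plain lists of floats. Only is_ready() and generate_speaking_clip(audio, sr, reply_text) are taken from this guide.

```python
import asyncio


async def deliver_turn(segments_audio, reply_text, video_engine,
                       send_audio, send_clip, sr=24000):
    """Send a turn down exactly one path: streamed PCM or a single clip."""
    # video_engine is None when video is disabled, so guard before calling it.
    use_video = video_engine is not None and video_engine.is_ready()

    if not use_video:
        for audio in segments_audio:
            await send_audio(audio)  # audio path: stream each segment as it comes
        return "audio"

    # Video path: buffer everything, render once, send one message.
    buffered = []
    for audio in segments_audio:
        buffered.extend(audio)
    clip = await asyncio.to_thread(
        video_engine.generate_speaking_clip, buffered, sr, reply_text
    )
    await send_clip(clip, reply_text)
    return "video"
```

Deciding use_video once per turn and returning from the audio branch early keeps the two paths from interleaving, which is what prevents the double-play described above.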

Conventions

  • Dataclasses for structured config (see VideoConfig, LoRASpec). Parse once in *.from_dict; don't re-read config.yml mid-session.
  • asyncio.to_thread for any sync model call from an async context. Never call .generate() / .transcribe() / .pipeline() directly on the event loop.
  • Locks: VideoEngine._lock serialises model state mutations; ConversationSession is not locked because each WebSocket gets its own instance.
  • Logging: one log = logging.getLogger(__name__) per module. INFO for lifecycle and per-turn milestones; avoid DEBUG spam in hot loops.
  • Keep main.py thin. New endpoints should delegate to a method on ModelManager or an engine class.
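
The parse-once dataclass convention might look like the sketch below. The class names carry a Sketch suffix because the real fields of VideoConfig and LoRASpec are not listed in this guide: enabled mirrors config.video.enabled, while path, weight, and loras are hypothetical fields chosen for illustration.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional, Tuple


@dataclass(frozen=True)
class LoRASpecSketch:
    path: str            # hypothetical field
    weight: float = 1.0  # hypothetical field

    @classmethod
    def from_dict(cls, raw: Mapping[str, Any]) -> "LoRASpecSketch":
        return cls(path=str(raw["path"]), weight=float(raw.get("weight", 1.0)))


@dataclass(frozen=True)
class VideoConfigSketch:
    enabled: bool = False
    loras: Tuple[LoRASpecSketch, ...] = ()

    @classmethod
    def from_dict(cls, raw: Optional[Mapping[str, Any]]) -> "VideoConfigSketch":
        raw = raw or {}  # a missing video section means "disabled"
        return cls(
            enabled=bool(raw.get("enabled", False)),
            loras=tuple(LoRASpecSketch.from_dict(x) for x in raw.get("loras", [])),
        )
```

Freezing the dataclasses makes the "don't re-read config mid-session" rule cheap to enforce: the parsed object can be shared across sessions without anyone mutating it.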

Testing

  • tests/unit/test_pipeline_video_branch.py — the video vs. audio path selection. Update it if you change the use_video condition.
  • tests/unit/test_video_config.py / test_video_engine_logic.py — config parsing and the pure logic in video.py.
  • Component tests live in tests/component/ and require the Docker GPU environment. See tests/README.md.