#!/usr/bin/env python3
# Timestamp: "2026-02-06 23:02:36 (ywatanabe)"
# File: /home/ywatanabe/proj/scitex-python/src/scitex/audio/_mcp/handlers.py
"""Utility handlers for the scitex-audio MCP server."""
import asyncio
import base64
from datetime import datetime
from pathlib import Path
# Public API of this module: the async handler coroutines defined below.
__all__ = [
    "speak_handler",
    "generate_audio_handler",
    "list_backends_handler",
    "list_voices_handler",
    "play_audio_handler",
    "list_audio_files_handler",
    "clear_audio_cache_handler",
    "check_audio_status_handler",
    "speech_queue_status_handler",
    "announce_context_handler",
]
def _get_audio_dir() -> Path:
    """Return the directory that generated TTS files are written to.

    The location is resolved at call time by ``tts_output_dir`` from the
    package's ``_state_paths`` module. It is documented as
    ``~/.scitex/audio/runtime/tts/`` (the ``runtime/`` carve-out, the only
    untracked subtree of the audio state dir); the actual value is whatever
    that helper returns.
    """
    from .._state_paths import tts_output_dir as resolve_tts_dir

    return resolve_tts_dir()
def _get_signature() -> str:
    """Build the spoken signature ``"<hostname>. <project>. [<branch>. ]"``.

    The project is the basename of the current working directory. The branch
    comes from ``git rev-parse --abbrev-ref HEAD`` run in that directory and
    is left out when git fails, times out (5 s), or prints nothing.
    """
    import os
    import socket
    import subprocess

    machine = socket.gethostname()
    workdir = os.getcwd()
    segments = [machine, os.path.basename(workdir)]

    try:
        probe = subprocess.run(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            capture_output=True,
            text=True,
            cwd=workdir,
            timeout=5,
        )
        branch_name = probe.stdout.strip() if probe.returncode == 0 else ""
    except Exception:
        # Best effort: any git problem simply means "no branch segment".
        branch_name = ""

    if branch_name:
        segments.append(branch_name)

    # Every segment, including the last, is terminated by ". ".
    return "".join(f"{segment}. " for segment in segments)
[docs]
async def generate_audio_handler(
text: str,
backend: str | None = None,
voice: str | None = None,
output_path: str | None = None,
return_base64: bool = False,
speak_fn=None,
audio_dir=None,
) -> dict:
"""Generate audio file without playing.
Args:
speak_fn: Injectable TTS function (testing). Defaults to
``scitex_audio.speak``.
audio_dir: Injectable output directory (testing). Defaults to
``_get_audio_dir()``.
"""
try:
if speak_fn is not None:
tts_speak = speak_fn
else:
from .. import speak as tts_speak
loop = asyncio.get_event_loop()
if not output_path:
base_dir = audio_dir if audio_dir is not None else _get_audio_dir()
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = str(base_dir / f"tts_{timestamp}.mp3")
def do_generate():
return tts_speak(
text=text,
backend=backend,
voice=voice,
play=False,
output_path=output_path,
fallback=True,
)
result_path = await loop.run_in_executor(None, do_generate)
result = {
"success": True,
"path": str(result_path),
"text": text,
"backend": backend,
"timestamp": datetime.now().isoformat(),
}
if result_path.exists():
result["size_kb"] = round(result_path.stat().st_size / 1024, 2)
if return_base64 and result_path.exists():
with open(result_path, "rb") as f:
result["base64"] = base64.b64encode(f.read()).decode()
return result
except Exception as e:
return {"success": False, "error": str(e)}
[docs]
async def list_backends_handler(available_fn=None, fallback_order=None) -> dict:
    """Report every known TTS backend and which one would be used by default.

    Args:
        available_fn: Injectable available-backends function (testing).
            Defaults to ``scitex_audio.available_backends``.
        fallback_order: Injectable fallback order (testing). Defaults to
            ``scitex_audio.FALLBACK_ORDER``.

    Returns:
        ``{"success": True, "backends": [...], "available": ..., "default": ...}``
        where ``default`` is the first entry of the fallback order that is
        available (None if there is none), or
        ``{"success": False, "error": <message>}`` on any exception.
    """
    blurbs = {
        "elevenlabs": "ElevenLabs - Paid, high quality",
        "gtts": "Google TTS - Free, requires internet",
        "pyttsx3": "System TTS - Offline, uses espeak/SAPI5",
    }
    try:
        # Only touch the package when at least one dependency was not injected.
        if available_fn is None or fallback_order is None:
            from .. import FALLBACK_ORDER, available_backends

            available_fn = available_backends if available_fn is None else available_fn
            fallback_order = FALLBACK_ORDER if fallback_order is None else fallback_order

        usable = available_fn()
        catalog = [
            {
                "name": name,
                "available": name in usable,
                "description": blurbs.get(name, ""),
            }
            for name in fallback_order
        ]
        # The effective default is the first usable backend in fallback order.
        preferred = next((name for name in fallback_order if name in usable), None)
        return {
            "success": True,
            "backends": catalog,
            "available": usable,
            "default": preferred,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
[docs]
async def list_voices_handler(backend: str = "gtts", get_tts_fn=None) -> dict:
    """Return the voices offered by one TTS backend.

    Args:
        backend: Backend name handed to the engine factory.
        get_tts_fn: Injectable engine factory (testing). Defaults to
            ``scitex_audio.get_tts``.

    Returns:
        ``{"success": True, "backend": ..., "voices": ..., "count": ...}`` or
        ``{"success": False, "error": <message>}`` on any exception.
    """
    try:
        engine_factory = get_tts_fn
        if engine_factory is None:
            from .. import get_tts as engine_factory

        # Engine construction and voice enumeration run in the default
        # executor rather than on the event loop.
        voices = await asyncio.get_event_loop().run_in_executor(
            None, lambda: engine_factory(backend).get_voices()
        )
        return {
            "success": True,
            "backend": backend,
            "voices": voices,
            "count": len(voices),
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
[docs]
async def play_audio_handler(path: str, player=None) -> dict:
    """Play an audio file.

    Args:
        path: Path of the file to play.
        player: Injectable playback callable taking a ``Path`` (testing).
            Defaults to ``BaseTTS._play_audio``.

    Returns:
        ``{"success": True, "played": <path>, "timestamp": <iso>}`` after
        playback returns, ``{"success": False, "error": "File not found: ..."}``
        when the path does not exist, or ``{"success": False, "error": ...}``
        on any exception.
    """
    try:
        path_obj = Path(path)
        if not path_obj.exists():
            return {"success": False, "error": f"File not found: {path}"}

        if player is not None:
            play = player
        else:
            # Import the engine base only on the default path, like the other
            # handlers do, so an injected player never needs the engine stack.
            from .._engines._base import BaseTTS

            def play(target):
                # Called unbound with None as ``self``; presumably
                # ``_play_audio`` does not read instance state - TODO confirm.
                BaseTTS._play_audio(None, target)

        # Playback blocks until done; keep it off the event loop.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, play, path_obj)
        return {
            "success": True,
            "played": str(path_obj),
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
[docs]
async def list_audio_files_handler(limit: int = 20, audio_dir=None) -> dict:
    """List generated audio files, newest first.

    Args:
        limit: Maximum number of files to report. Negative values are
            treated as 0; None means no limit.
        audio_dir: Injectable directory to scan (testing), ``str`` or
            ``Path``. Defaults to ``_get_audio_dir()``.

    Returns:
        ``{"success": True, "files": [...], "count": n, "total_size_mb": x,
        "audio_dir": <path>}``. Each file entry has ``name``, ``path``,
        ``size_kb`` and ``created`` (the file's modification time, ISO
        formatted). ``total_size_mb`` sums every regular file matching
        ``*.*`` in the directory, not only the listed audio files. A missing
        directory yields the same shape with no files. Any exception yields
        ``{"success": False, "error": <message>}``.
    """
    try:
        audio_dir = Path(audio_dir) if audio_dir is not None else _get_audio_dir()
        if not audio_dir.is_dir():
            return {
                "success": True,
                "files": [],
                "count": 0,
                "total_size_mb": 0.0,
                "audio_dir": str(audio_dir),
            }

        # A negative slice bound would silently drop entries from the tail.
        if limit is not None and limit < 0:
            limit = 0

        # Stat each candidate once; a file may vanish between glob and stat.
        stamped = []
        for pattern in ("*.mp3", "*.wav"):
            for entry in audio_dir.glob(pattern):
                try:
                    stamped.append((entry, entry.stat()))
                except OSError:
                    continue
        stamped.sort(key=lambda pair: pair[1].st_mtime, reverse=True)

        files = [
            {
                "name": entry.name,
                "path": str(entry),
                "size_kb": round(info.st_size / 1024, 2),
                "created": datetime.fromtimestamp(info.st_mtime).isoformat(),
            }
            for entry, info in stamped[:limit]
        ]

        # Only regular files count towards the total; "*.*" also matches
        # directories whose name contains a dot.
        total_size = 0
        for entry in audio_dir.glob("*.*"):
            try:
                if entry.is_file():
                    total_size += entry.stat().st_size
            except OSError:
                continue

        return {
            "success": True,
            "files": files,
            "count": len(files),
            "total_size_mb": round(total_size / (1024 * 1024), 2),
            "audio_dir": str(audio_dir),
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
[docs]
async def clear_audio_cache_handler(max_age_hours: float = 24, audio_dir=None) -> dict:
    """Delete cached ``*.mp3`` / ``*.wav`` files older than a given age.

    Args:
        max_age_hours: Files whose modification time is more than this many
            hours in the past are deleted. ``0`` deletes every audio file
            regardless of age.
        audio_dir: Injectable directory to clear (testing), ``str`` or
            ``Path``. Defaults to ``_get_audio_dir()``.

    Returns:
        ``{"success": True, "deleted": n, "failed": m, "max_age_hours": ...}``
        where ``failed`` counts files that could not be inspected or removed
        (for example because of permissions or because they vanished
        concurrently). A missing directory yields zero counts. Any other
        exception yields ``{"success": False, "error": <message>}``.
    """
    try:
        audio_dir = Path(audio_dir) if audio_dir is not None else _get_audio_dir()
        if not audio_dir.is_dir():
            return {
                "success": True,
                "deleted": 0,
                "failed": 0,
                "max_age_hours": max_age_hours,
            }

        deleted = 0
        failed = 0
        # Compare POSIX timestamps instead of naive local datetimes so the
        # computed age is not skewed by a DST change in between.
        now_ts = datetime.now().timestamp()

        # Materialize the listing before unlinking anything.
        candidates = list(audio_dir.glob("*.mp3")) + list(audio_dir.glob("*.wav"))
        for entry in candidates:
            try:
                if max_age_hours != 0:
                    age_hours = (now_ts - entry.stat().st_mtime) / 3600
                    if not age_hours > max_age_hours:
                        continue
                entry.unlink()
                deleted += 1
            except OSError:
                # Best effort per file: keep going, but report the miss.
                failed += 1

        return {
            "success": True,
            "deleted": deleted,
            "failed": failed,
            "max_age_hours": max_age_hours,
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
[docs]
async def check_audio_status_handler(status_fn=None) -> dict:
    """Report WSL audio connectivity and available playback methods.

    Args:
        status_fn: Injectable status probe (testing). Defaults to
            ``scitex_audio.check_wsl_audio``.

    Returns:
        The mapping returned by the probe, with ``success=True`` and an ISO
        ``timestamp`` written into it, or
        ``{"success": False, "error": <message>}`` on any exception.
    """
    try:
        probe = status_fn
        if probe is None:
            from .. import check_wsl_audio as probe

        report = probe()
        # The probe's own mapping is annotated in place and handed back.
        report["success"] = True
        report["timestamp"] = datetime.now().isoformat()
        return report
    except Exception as exc:
        return {"success": False, "error": str(exc)}
def _emit_browser_speech(text: str) -> None:
    """Write a custom OSC escape carrying ``text`` to stderr.

    The emitted sequence is::

        \\x1b]9999;speak:<base64-text>\\x07

    It is meant for SciTeX Cloud Apptainer containers, which have no local
    audio sink: the sequence is intended to travel PTY → WebSocket → browser
    xterm.js → speakText() so the browser speaks the text.
    """
    import sys

    encoded = base64.b64encode(text.encode()).decode()
    # stderr rather than stdout keeps the MCP stdio protocol stream clean;
    # the PTY captures both streams.
    sequence = "\x1b]9999;speak:" + encoded + "\x07"
    sys.stderr.write(sequence)
    sys.stderr.flush()
[docs]
async def speak_handler(
text: str,
backend: str | None = None,
voice: str | None = None,
rate: int = 150,
speed: float = 1.5,
play: bool = True,
save: bool = False,
output_path: str | None = None,
fallback: bool = True,
agent_id: str | None = None,
wait: bool = True,
signature: bool = False,
num_threads: int | None = None,
speak_fn=None,
audio_dir=None,
signature_fn=None,
) -> dict:
"""Convert text to speech with fallback.
Args:
save: If True and output_path is None, auto-generate a timestamped path.
output_path: Explicit path to save the audio file (overrides save flag).
signature: If True, prepend hostname/project/branch to text.
num_threads: CPU thread count for LuxTTS backend (None=default).
speak_fn: Injectable TTS function (testing). Defaults to
``scitex_audio.speak``.
audio_dir: Injectable output directory (testing). Defaults to
``_get_audio_dir()``.
signature_fn: Injectable signature builder (testing). Defaults to
:func:`_get_signature`.
"""
import os
try:
# Prepend signature if requested
final_text = text
sig = None
if signature:
build_sig = signature_fn if signature_fn is not None else _get_signature
sig = build_sig()
final_text = sig + text
# SciTeX Cloud container mode: relay speech to browser via OSC escape
if os.environ.get("SCITEX_CLOUD") == "true":
_emit_browser_speech(final_text)
result = {
"success": True,
"text": text,
"backend": "browser_relay",
"played": True,
"play_requested": play,
"mode": "cloud_relay",
"timestamp": datetime.now().isoformat(),
}
if signature:
result["signature"] = sig
result["full_text"] = final_text
return result
# Local mode: use scitex.audio directly
if speak_fn is not None:
tts_speak = speak_fn
else:
from .. import speak as tts_speak
loop = asyncio.get_event_loop()
if output_path is None and save:
base_dir = audio_dir if audio_dir is not None else _get_audio_dir()
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = str(base_dir / f"tts_{timestamp}.mp3")
def do_speak():
return tts_speak(
text=final_text,
backend=backend,
voice=voice,
rate=rate,
speed=speed,
play=play,
output_path=output_path,
fallback=fallback,
num_threads=num_threads,
)
speak_result = await loop.run_in_executor(None, do_speak)
result = {
"success": speak_result.get("success", True),
"text": text,
"backend": speak_result.get("backend", backend),
"played": speak_result.get("played", False),
"play_requested": play,
"mode": speak_result.get("mode", "local"),
"timestamp": datetime.now().isoformat(),
}
if signature:
result["signature"] = sig
result["full_text"] = final_text
if speak_result.get("path"):
result["path"] = str(speak_result["path"])
return result
except Exception as e:
return {"success": False, "error": str(e)}
[docs]
async def speech_queue_status_handler() -> dict:
    """Report whether the cross-process audio playback lock is currently held.

    The lock is probed by trying to acquire it with a 0.1 s timeout and
    releasing it straight away on success.

    Returns:
        ``{"locked": bool, "message": str, "success": True}``. If probing the
        lock raises, ``locked`` is reported as False with the message
        "Could not check lock state". If the lock cannot be imported or
        constructed, ``{"success": False, "error": <message>}``.
    """
    try:
        from .._cross_process_lock import AudioPlaybackLock

        probe = AudioPlaybackLock()
        try:
            if probe.acquire(timeout=0.1):
                probe.release()
                locked, message = False, "Audio playback queue is idle"
            else:
                locked, message = True, "Audio playback in progress"
        except Exception:
            locked, message = False, "Could not check lock state"

        return {"locked": locked, "message": message, "success": True}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
[docs]
async def announce_context_handler(
    include_full_path: bool = False,
    branch_resolver=None,
    speak_fn=None,
) -> dict:
    """Announce current working directory and git branch.

    Args:
        include_full_path: If True, announce the full working directory
            instead of just its basename.
        branch_resolver: Injectable callable returning the git branch name
            (or None) for ``cwd`` (testing). Defaults to a real ``git
            rev-parse`` subprocess, bounded by a 5 second timeout and run in
            the default executor.
        speak_fn: Injectable speak handler (testing). Defaults to
            :func:`speak_handler`. It is awaited with ``text=`` and
            ``speed=1.5``.

    Returns:
        ``{"success": True, "directory": ..., "branch": ..., "announced": ...,
        "speak_result": ...}`` or ``{"success": False, "error": <message>}``
        on any exception.
    """
    try:
        import os
        import subprocess

        cwd = os.getcwd()
        dir_name = cwd if include_full_path else os.path.basename(cwd)

        def _git_branch(path):
            # Best effort: a missing, failing, or slow git means "no branch".
            try:
                proc = subprocess.run(
                    ["git", "rev-parse", "--abbrev-ref", "HEAD"],
                    capture_output=True,
                    text=True,
                    cwd=path,
                    # Same bound as _get_signature; without it a hung git
                    # would stall the handler indefinitely.
                    timeout=5,
                )
            except (OSError, subprocess.SubprocessError, ValueError):
                return None
            return proc.stdout.strip() if proc.returncode == 0 else None

        if branch_resolver is not None:
            branch = branch_resolver(cwd)
        else:
            # Blocking subprocess call: keep it off the event loop.
            loop = asyncio.get_running_loop()
            branch = await loop.run_in_executor(None, _git_branch, cwd)

        if branch:
            text = f"Working in {dir_name}, on branch {branch}"
        else:
            text = f"Working in {dir_name}"

        speak = speak_fn if speak_fn is not None else speak_handler
        speak_result = await speak(text=text, speed=1.5)
        return {
            "success": True,
            "directory": dir_name,
            "branch": branch,
            "announced": text,
            "speak_result": speak_result,
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
# EOF