Source code for scitex_audio._mcp.handlers

#!/usr/bin/env python3
# Timestamp: "2026-02-06 23:02:36 (ywatanabe)"
# File: /home/ywatanabe/proj/scitex-python/src/scitex/audio/_mcp/handlers.py


"""Utility handlers for the scitex-audio MCP server."""

import asyncio
import base64
from datetime import datetime
from pathlib import Path

# Public API of this module: the async handler coroutines defined below.
# The underscore-prefixed helpers (_get_audio_dir, _get_signature,
# _emit_browser_speech) are deliberately not exported.
__all__ = [
    "speak_handler",
    "generate_audio_handler",
    "list_backends_handler",
    "list_voices_handler",
    "play_audio_handler",
    "list_audio_files_handler",
    "clear_audio_cache_handler",
    "check_audio_status_handler",
    "speech_queue_status_handler",
    "announce_context_handler",
]


def _get_audio_dir() -> Path:
    """Return the directory that generated TTS files are written to.

    Thin wrapper around ``_state_paths.tts_output_dir``; the import is kept
    local to the call. Per the original notes the result is
    ``~/.scitex/audio/runtime/tts/``, i.e. inside the ``runtime/`` carve-out
    (the only untracked subtree of the audio state dir).
    """
    from .._state_paths import tts_output_dir as _resolve_tts_dir

    return _resolve_tts_dir()


def _get_signature() -> str:
    """Build a spoken signature: ``"<hostname>. <project>. [<branch>. ]"``.

    The project is the basename of the current working directory. The branch
    is whatever ``git rev-parse --abbrev-ref HEAD`` prints; it is left out
    when git fails, times out (5 s), or prints nothing.
    """
    import os
    import socket
    import subprocess

    host = socket.gethostname()
    workdir = os.getcwd()
    pieces = [host, os.path.basename(workdir)]

    # Best effort only: any failure while asking git simply omits the branch.
    try:
        proc = subprocess.run(
            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
            capture_output=True,
            text=True,
            cwd=workdir,
            timeout=5,
        )
        branch_name = proc.stdout.strip() if proc.returncode == 0 else None
    except Exception:
        branch_name = None

    if branch_name:
        pieces.append(branch_name)

    # Every piece is followed by ". ", including the last one.
    return "".join(f"{piece}. " for piece in pieces)


[docs] async def generate_audio_handler( text: str, backend: str | None = None, voice: str | None = None, output_path: str | None = None, return_base64: bool = False, speak_fn=None, audio_dir=None, ) -> dict: """Generate audio file without playing. Args: speak_fn: Injectable TTS function (testing). Defaults to ``scitex_audio.speak``. audio_dir: Injectable output directory (testing). Defaults to ``_get_audio_dir()``. """ try: if speak_fn is not None: tts_speak = speak_fn else: from .. import speak as tts_speak loop = asyncio.get_event_loop() if not output_path: base_dir = audio_dir if audio_dir is not None else _get_audio_dir() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_path = str(base_dir / f"tts_{timestamp}.mp3") def do_generate(): return tts_speak( text=text, backend=backend, voice=voice, play=False, output_path=output_path, fallback=True, ) result_path = await loop.run_in_executor(None, do_generate) result = { "success": True, "path": str(result_path), "text": text, "backend": backend, "timestamp": datetime.now().isoformat(), } if result_path.exists(): result["size_kb"] = round(result_path.stat().st_size / 1024, 2) if return_base64 and result_path.exists(): with open(result_path, "rb") as f: result["base64"] = base64.b64encode(f.read()).decode() return result except Exception as e: return {"success": False, "error": str(e)}
async def list_backends_handler(available_fn=None, fallback_order=None) -> dict:
    """List the known TTS backends and which of them are available.

    Args:
        available_fn: Injectable available-backends function (testing).
            Defaults to ``scitex_audio.available_backends``.
        fallback_order: Injectable fallback order, any iterable of backend
            names (testing). Defaults to ``scitex_audio.FALLBACK_ORDER``.

    Returns:
        On success a dict with ``backends`` (one ``name`` / ``available`` /
        ``description`` entry per name in the fallback order), ``available``
        (the value returned by ``available_fn``) and ``default`` (the first
        name in the fallback order that is available, or None). On failure
        ``{"success": False, "error": ...}``.
    """
    try:
        if available_fn is None or fallback_order is None:
            from .. import FALLBACK_ORDER, available_backends

            if available_fn is None:
                available_fn = available_backends
            if fallback_order is None:
                fallback_order = FALLBACK_ORDER

        # The order is walked twice below; materialize it so that a one-shot
        # iterable is not exhausted by the first pass.
        fallback_order = tuple(fallback_order)
        backends = available_fn()

        # Built once rather than on every loop iteration.
        descriptions = {
            "elevenlabs": "ElevenLabs - Paid, high quality",
            "gtts": "Google TTS - Free, requires internet",
            "pyttsx3": "System TTS - Offline, uses espeak/SAPI5",
        }

        info = [
            {
                "name": name,
                "available": name in backends,
                "description": descriptions.get(name, ""),
            }
            for name in fallback_order
        ]

        # The actual default is the first available backend in fallback order.
        default = next((name for name in fallback_order if name in backends), None)

        return {
            "success": True,
            "backends": info,
            "available": backends,
            "default": default,
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
async def list_voices_handler(backend: str = "gtts", get_tts_fn=None) -> dict:
    """Return the voices offered by one TTS backend.

    Args:
        backend: Backend name handed to the engine factory.
        get_tts_fn: Injectable engine factory (testing). Defaults to
            ``scitex_audio.get_tts``.

    Returns:
        ``{"success": True, "backend", "voices", "count"}`` on success,
        ``{"success": False, "error": ...}`` if anything raises.
    """
    try:
        if get_tts_fn is None:
            from .. import get_tts as engine_factory
        else:
            engine_factory = get_tts_fn

        event_loop = asyncio.get_event_loop()

        # Engine construction and voice lookup are blocking, so both happen
        # on the default executor.
        def fetch_voices():
            engine = engine_factory(backend)
            return engine.get_voices()

        voices = await event_loop.run_in_executor(None, fetch_voices)
        return dict(success=True, backend=backend, voices=voices, count=len(voices))
    except Exception as exc:
        return {"success": False, "error": str(exc)}
async def play_audio_handler(path: str, player=None) -> dict:
    """Play an existing audio file.

    Args:
        path: Path of the file to play.
        player: Injectable playback callable taking a ``Path`` (testing).
            Defaults to ``BaseTTS._play_audio``.

    Returns:
        ``{"success": True, "played": <path>, "timestamp": ...}`` once the
        playback callable has returned, or ``{"success": False, "error": ...}``
        when the file does not exist or anything raises.
    """
    try:
        path_obj = Path(path)
        if not path_obj.exists():
            return {"success": False, "error": f"File not found: {path}"}

        if player is not None:
            play = player
        else:
            # Imported only when the default player is needed, so an injected
            # player (and the missing-file check above) never depends on the
            # engine module being importable.
            from .._engines._base import BaseTTS

            def play(target):
                # Called unbound with no instance, exactly as before.
                BaseTTS._play_audio(None, target)

        # Playback blocks until finished, so it runs on the default executor.
        await asyncio.get_running_loop().run_in_executor(None, play, path_obj)

        return {
            "success": True,
            "played": str(path_obj),
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
async def list_audio_files_handler(limit: int = 20, audio_dir=None) -> dict:
    """List generated ``.mp3`` / ``.wav`` files, newest first.

    Args:
        limit: Maximum number of files to report. ``None`` reports all;
            negative values are treated as 0.
        audio_dir: Injectable directory to scan, ``str`` or ``Path``
            (testing). Defaults to ``_get_audio_dir()``.

    Returns:
        If the directory does not exist:
        ``{"success": True, "files": [], "count": 0}``. Otherwise a dict with
        ``files`` (``name``, ``path``, ``size_kb``, ``created``), ``count``,
        ``total_size_mb`` and ``audio_dir``. ``created`` is derived from the
        file's modification time. ``total_size_mb`` covers every directory
        entry matching ``*.*``, not only audio files. On failure
        ``{"success": False, "error": ...}``.
    """
    try:
        audio_dir = Path(audio_dir) if audio_dir is not None else _get_audio_dir()
        if not audio_dir.exists():
            return {"success": True, "files": [], "count": 0}

        # Stat each audio file exactly once; an entry that disappears between
        # the directory scan and the stat call is skipped instead of failing
        # the whole listing.
        stamped = []
        for pattern in ("*.mp3", "*.wav"):
            for entry in audio_dir.glob(pattern):
                try:
                    stamped.append((entry, entry.stat()))
                except OSError:
                    continue
        stamped.sort(key=lambda pair: pair[1].st_mtime, reverse=True)

        # A negative slice bound would silently drop the oldest entries.
        if limit is not None and limit < 0:
            limit = 0

        files = [
            {
                "name": entry.name,
                "path": str(entry),
                "size_kb": round(info.st_size / 1024, 2),
                "created": datetime.fromtimestamp(info.st_mtime).isoformat(),
            }
            for entry, info in stamped[:limit]
        ]

        total_size = 0
        for entry in audio_dir.glob("*.*"):
            try:
                total_size += entry.stat().st_size
            except OSError:
                continue

        return {
            "success": True,
            "files": files,
            "count": len(files),
            "total_size_mb": round(total_size / (1024 * 1024), 2),
            "audio_dir": str(audio_dir),
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
async def clear_audio_cache_handler(max_age_hours: float = 24, audio_dir=None) -> dict:
    """Delete generated ``.mp3`` / ``.wav`` files from the audio directory.

    Args:
        max_age_hours: Files whose modification time is more than this many
            hours in the past are deleted. ``0`` deletes every audio file
            regardless of age.
        audio_dir: Injectable directory to clear, ``str`` or ``Path``
            (testing). Defaults to ``_get_audio_dir()``.

    Returns:
        ``{"success": True, "deleted": 0}`` if the directory does not exist,
        otherwise ``{"success": True, "deleted": n, "max_age_hours": ...}``.
        On failure ``{"success": False, "error": ...}``.
    """
    import time

    try:
        audio_dir = Path(audio_dir) if audio_dir is not None else _get_audio_dir()
        if not audio_dir.exists():
            return {"success": True, "deleted": 0}

        # Ages are computed from epoch seconds. Subtracting naive local
        # datetimes is off by an hour across a daylight-saving change.
        now = time.time()
        # Snapshot the directory before deleting anything from it.
        targets = [*audio_dir.glob("*.mp3"), *audio_dir.glob("*.wav")]

        deleted = 0
        for entry in targets:
            try:
                if max_age_hours != 0:
                    age_hours = (now - entry.stat().st_mtime) / 3600
                    if not age_hours > max_age_hours:
                        continue
                entry.unlink()
                deleted += 1
            except OSError:
                # Best effort: a file that vanished or cannot be removed is
                # skipped. Non-filesystem errors (e.g. a bad max_age_hours
                # type) are reported by the outer handler instead of hidden.
                continue

        return {
            "success": True,
            "deleted": deleted,
            "max_age_hours": max_age_hours,
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
async def check_audio_status_handler(status_fn=None) -> dict:
    """Report WSL audio connectivity and available playback methods.

    Args:
        status_fn: Injectable status probe (testing). Defaults to
            ``scitex_audio.check_wsl_audio``.

    Returns:
        The mapping produced by the probe, updated in place with
        ``"success": True`` and an ISO ``"timestamp"``; or
        ``{"success": False, "error": ...}`` if anything raises.
    """
    try:
        probe = status_fn
        if probe is None:
            from .. import check_wsl_audio as probe

        report = probe()
        # The probe's own mapping is annotated and returned (no copy is made).
        report["success"] = True
        report["timestamp"] = datetime.now().isoformat()
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return report
def _emit_browser_speech(text: str) -> None:
    """Write a custom OSC "speak" escape sequence carrying *text* to stderr.

    The sequence has the form::

        \\x1b]9999;speak:<base64-text>\\x07

    where ``<base64-text>`` is the base64 of the encoded text. Per the
    original design notes this is meant for SciTeX Cloud Apptainer
    containers, which have no local audio sink: the sequence travels
    PTY → WebSocket → browser xterm.js → speakText().
    """
    import sys

    payload = base64.b64encode(text.encode()).decode()

    # stderr rather than stdout, so the MCP stdio protocol is not disturbed.
    # The original notes state that the PTY captures both streams.
    stream = sys.stderr
    stream.write(f"\x1b]9999;speak:{payload}\x07")
    stream.flush()
[docs] async def speak_handler( text: str, backend: str | None = None, voice: str | None = None, rate: int = 150, speed: float = 1.5, play: bool = True, save: bool = False, output_path: str | None = None, fallback: bool = True, agent_id: str | None = None, wait: bool = True, signature: bool = False, num_threads: int | None = None, speak_fn=None, audio_dir=None, signature_fn=None, ) -> dict: """Convert text to speech with fallback. Args: save: If True and output_path is None, auto-generate a timestamped path. output_path: Explicit path to save the audio file (overrides save flag). signature: If True, prepend hostname/project/branch to text. num_threads: CPU thread count for LuxTTS backend (None=default). speak_fn: Injectable TTS function (testing). Defaults to ``scitex_audio.speak``. audio_dir: Injectable output directory (testing). Defaults to ``_get_audio_dir()``. signature_fn: Injectable signature builder (testing). Defaults to :func:`_get_signature`. """ import os try: # Prepend signature if requested final_text = text sig = None if signature: build_sig = signature_fn if signature_fn is not None else _get_signature sig = build_sig() final_text = sig + text # SciTeX Cloud container mode: relay speech to browser via OSC escape if os.environ.get("SCITEX_CLOUD") == "true": _emit_browser_speech(final_text) result = { "success": True, "text": text, "backend": "browser_relay", "played": True, "play_requested": play, "mode": "cloud_relay", "timestamp": datetime.now().isoformat(), } if signature: result["signature"] = sig result["full_text"] = final_text return result # Local mode: use scitex.audio directly if speak_fn is not None: tts_speak = speak_fn else: from .. 
import speak as tts_speak loop = asyncio.get_event_loop() if output_path is None and save: base_dir = audio_dir if audio_dir is not None else _get_audio_dir() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_path = str(base_dir / f"tts_{timestamp}.mp3") def do_speak(): return tts_speak( text=final_text, backend=backend, voice=voice, rate=rate, speed=speed, play=play, output_path=output_path, fallback=fallback, num_threads=num_threads, ) speak_result = await loop.run_in_executor(None, do_speak) result = { "success": speak_result.get("success", True), "text": text, "backend": speak_result.get("backend", backend), "played": speak_result.get("played", False), "play_requested": play, "mode": speak_result.get("mode", "local"), "timestamp": datetime.now().isoformat(), } if signature: result["signature"] = sig result["full_text"] = final_text if speak_result.get("path"): result["path"] = str(speak_result["path"]) return result except Exception as e: return {"success": False, "error": str(e)}
async def speech_queue_status_handler() -> dict:
    """Report whether the audio playback lock is currently held.

    The check briefly tries to take the lock itself (0.1 s timeout) and
    releases it again right away if that works.

    Returns:
        ``{"locked": bool, "message": str, "success": True}``. ``locked`` is
        False both when the lock was free and when the probe itself raised;
        the message tells the two apart. If the lock class cannot be
        imported or constructed: ``{"success": False, "error": ...}``.
    """
    try:
        from .._cross_process_lock import AudioPlaybackLock

        probe = AudioPlaybackLock()
        try:
            got_lock = probe.acquire(timeout=0.1)
            if got_lock:
                probe.release()
        except Exception:
            is_locked = False
            message = "Could not check lock state"
        else:
            is_locked = not got_lock
            message = (
                "Audio playback queue is idle"
                if got_lock
                else "Audio playback in progress"
            )

        return {"locked": is_locked, "message": message, "success": True}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
async def announce_context_handler(
    include_full_path: bool = False,
    branch_resolver=None,
    speak_fn=None,
) -> dict:
    """Announce current working directory and git branch.

    Args:
        include_full_path: If True, announce the full working directory
            instead of only its basename.
        branch_resolver: Injectable callable returning the git branch name
            (or None) for ``cwd`` (testing). Defaults to a real
            ``git rev-parse`` subprocess.
        speak_fn: Injectable speak handler (testing). Defaults to
            :func:`speak_handler`.

    Returns:
        ``{"success": True, "directory", "branch", "announced",
        "speak_result"}``, where ``speak_result`` is whatever the speak
        handler returned (its own ``success`` flag is not merged into the
        outer one). On failure ``{"success": False, "error": ...}``.
    """
    try:
        import os
        import subprocess

        cwd = os.getcwd()
        dir_name = cwd if include_full_path else os.path.basename(cwd)

        if branch_resolver is not None:
            branch = branch_resolver(cwd)
        else:
            branch = None
            # Best effort: any git failure just means no branch is announced.
            try:
                result = subprocess.run(
                    ["git", "rev-parse", "--abbrev-ref", "HEAD"],
                    capture_output=True,
                    text=True,
                    cwd=cwd,
                    # Same bound as _get_signature, so that a hung git cannot
                    # block the handler indefinitely.
                    timeout=5,
                )
                if result.returncode == 0:
                    branch = result.stdout.strip()
            except Exception:
                pass

        if branch:
            text = f"Working in {dir_name}, on branch {branch}"
        else:
            text = f"Working in {dir_name}"

        speak = speak_fn if speak_fn is not None else speak_handler
        speak_result = await speak(text=text, speed=1.5)

        return {
            "success": True,
            "directory": dir_name,
            "branch": branch,
            "announced": text,
            "speak_result": speak_result,
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
# EOF