import asyncio import json import os import shutil import sqlite3 import sys import traceback import uuid from pathlib import Path import gradio_client.utils as _gc_utils _orig_get_type = _gc_utils.get_type _orig_jstpt = _gc_utils._json_schema_to_python_type def _safe_get_type(schema): if not isinstance(schema, dict): return "Any" return _orig_get_type(schema) def _safe_jstpt(schema, defs=None): if not isinstance(schema, dict): return "Any" return _orig_jstpt(schema, defs) _gc_utils.get_type = _safe_get_type _gc_utils._json_schema_to_python_type = _safe_jstpt import gradio as gr INPUT_DIR = Path(".data/input") DB_PATH = Path(".db/Callytics.sqlite") def _fetch_last_record(): if not DB_PATH.exists(): return None con = sqlite3.connect(DB_PATH) con.row_factory = sqlite3.Row cur = con.cursor() cur.execute( """ SELECT f.*, t.Name AS Topic FROM File f LEFT JOIN Topic t ON t.ID = f.TopicID ORDER BY f.ID DESC LIMIT 1 """ ) file_row = cur.fetchone() if file_row is None: con.close() return None cur.execute( """ SELECT Speaker, Sequence, StartTime, EndTime, Content, Sentiment, Profane FROM Utterance WHERE FileID = ? ORDER BY Sequence """, (file_row["ID"],), ) utterances = [dict(r) for r in cur.fetchall()] con.close() return dict(file_row), utterances def _format_transcript(utterances): if not utterances: return "_no utterances_" lines = [] for u in utterances: flag = " [PROFANE]" if u.get("Profane") else "" sent = f" ({u['Sentiment']})" if u.get("Sentiment") else "" lines.append( f"**[{u['StartTime']:.2f}s] {u['Speaker']}**{sent}{flag}: {u['Content']}" ) return "\n\n".join(lines) def _summary_block(file_row): if not file_row: return "_no data in DB yet_" return ( f"- **Topic**: {file_row.get('Topic') or 'Unknown'}\n" f"- **Conflict**: {'yes' if file_row.get('Conflict') else 'no'}\n" f"- **Silence (s)**: {file_row.get('Silence'):.3f}\n" f"- **Duration (s)**: {file_row.get('Duration'):.2f}\n" f"- **Sample rate**: {file_row.get('Rate')}\n" f"- **Channels**: {file_row.get('Channels')}\n\n" f"**Summary**\n\n{file_row.get('Summary') or '_empty_'}" ) async def _run_pipeline(audio_path: str): from main import main as pipeline_main await pipeline_main(audio_path) def process_audio(audio_file): if audio_file is None: return "Upload a file first.", "", {} INPUT_DIR.mkdir(parents=True, exist_ok=True) src = Path(audio_file) dst = INPUT_DIR / f"{uuid.uuid4().hex}{src.suffix.lower() or '.mp3'}" shutil.copy(src, dst) try: asyncio.run(_run_pipeline(str(dst))) except Exception: tb = traceback.format_exc() return f"### Pipeline error\n```\n{tb}\n```", "", {} finally: if dst.exists(): dst.unlink(missing_ok=True) result = _fetch_last_record() if result is None: return "_no record in DB — pipeline may have skipped (no dialogue?)_", "", {} file_row, utterances = result return ( _summary_block(file_row), _format_transcript(utterances), {"file": file_row, "utterances": utterances}, ) with gr.Blocks(title="Callytics") as demo: gr.Markdown("# Callytics — speech analytics\nUpload a call recording (mp3/wav/flac). The full pipeline runs on CPU; first run downloads models and may take several minutes.") with gr.Row(): audio_in = gr.Audio(label="Audio file", type="filepath", sources=["upload"]) run_btn = gr.Button("Run pipeline", variant="primary") summary_md = gr.Markdown(label="Summary") transcript_md = gr.Markdown(label="Transcript") raw_json = gr.JSON(label="Raw record") run_btn.click( process_audio, inputs=[audio_in], outputs=[summary_md, transcript_md, raw_json], ) if __name__ == "__main__": port = int(os.environ.get("PORT") or os.environ.get("GRADIO_SERVER_PORT") or "7860") demo.queue(default_concurrency_limit=1).launch( server_name="0.0.0.0", server_port=port, show_api=False, share=False, )