| import asyncio |
| import json |
| import os |
| import shutil |
| import sqlite3 |
| import sys |
| import traceback |
| import uuid |
| from pathlib import Path |
|
|
| import gradio_client.utils as _gc_utils |
| _orig_get_type = _gc_utils.get_type |
| _orig_jstpt = _gc_utils._json_schema_to_python_type |
| def _safe_get_type(schema): |
| if not isinstance(schema, dict): |
| return "Any" |
| return _orig_get_type(schema) |
| def _safe_jstpt(schema, defs=None): |
| if not isinstance(schema, dict): |
| return "Any" |
| return _orig_jstpt(schema, defs) |
| _gc_utils.get_type = _safe_get_type |
| _gc_utils._json_schema_to_python_type = _safe_jstpt |
|
|
| import gradio as gr |
|
|
| INPUT_DIR = Path(".data/input") |
| DB_PATH = Path(".db/Callytics.sqlite") |
|
|
|
|
| def _fetch_last_record(): |
| if not DB_PATH.exists(): |
| return None |
| con = sqlite3.connect(DB_PATH) |
| con.row_factory = sqlite3.Row |
| cur = con.cursor() |
| cur.execute( |
| """ |
| SELECT f.*, t.Name AS Topic |
| FROM File f LEFT JOIN Topic t ON t.ID = f.TopicID |
| ORDER BY f.ID DESC LIMIT 1 |
| """ |
| ) |
| file_row = cur.fetchone() |
| if file_row is None: |
| con.close() |
| return None |
| cur.execute( |
| """ |
| SELECT Speaker, Sequence, StartTime, EndTime, Content, Sentiment, Profane |
| FROM Utterance WHERE FileID = ? ORDER BY Sequence |
| """, |
| (file_row["ID"],), |
| ) |
| utterances = [dict(r) for r in cur.fetchall()] |
| con.close() |
| return dict(file_row), utterances |
|
|
|
|
| def _format_transcript(utterances): |
| if not utterances: |
| return "_no utterances_" |
| lines = [] |
| for u in utterances: |
| flag = " [PROFANE]" if u.get("Profane") else "" |
| sent = f" ({u['Sentiment']})" if u.get("Sentiment") else "" |
| lines.append( |
| f"**[{u['StartTime']:.2f}s] {u['Speaker']}**{sent}{flag}: {u['Content']}" |
| ) |
| return "\n\n".join(lines) |
|
|
|
|
| def _summary_block(file_row): |
| if not file_row: |
| return "_no data in DB yet_" |
| return ( |
| f"- **Topic**: {file_row.get('Topic') or 'Unknown'}\n" |
| f"- **Conflict**: {'yes' if file_row.get('Conflict') else 'no'}\n" |
| f"- **Silence (s)**: {file_row.get('Silence'):.3f}\n" |
| f"- **Duration (s)**: {file_row.get('Duration'):.2f}\n" |
| f"- **Sample rate**: {file_row.get('Rate')}\n" |
| f"- **Channels**: {file_row.get('Channels')}\n\n" |
| f"**Summary**\n\n{file_row.get('Summary') or '_empty_'}" |
| ) |
|
|
|
|
| async def _run_pipeline(audio_path: str): |
| from main import main as pipeline_main |
| await pipeline_main(audio_path) |
|
|
|
|
| def process_audio(audio_file): |
| if audio_file is None: |
| return "Upload a file first.", "", {} |
| INPUT_DIR.mkdir(parents=True, exist_ok=True) |
| src = Path(audio_file) |
| dst = INPUT_DIR / f"{uuid.uuid4().hex}{src.suffix.lower() or '.mp3'}" |
| shutil.copy(src, dst) |
| try: |
| asyncio.run(_run_pipeline(str(dst))) |
| except Exception: |
| tb = traceback.format_exc() |
| return f"### Pipeline error\n```\n{tb}\n```", "", {} |
| finally: |
| if dst.exists(): |
| dst.unlink(missing_ok=True) |
|
|
| result = _fetch_last_record() |
| if result is None: |
| return "_no record in DB — pipeline may have skipped (no dialogue?)_", "", {} |
| file_row, utterances = result |
| return ( |
| _summary_block(file_row), |
| _format_transcript(utterances), |
| {"file": file_row, "utterances": utterances}, |
| ) |
|
|
|
|
| with gr.Blocks(title="Callytics") as demo: |
| gr.Markdown("# Callytics — speech analytics\nUpload a call recording (mp3/wav/flac). The full pipeline runs on CPU; first run downloads models and may take several minutes.") |
| with gr.Row(): |
| audio_in = gr.Audio(label="Audio file", type="filepath", sources=["upload"]) |
| run_btn = gr.Button("Run pipeline", variant="primary") |
| summary_md = gr.Markdown(label="Summary") |
| transcript_md = gr.Markdown(label="Transcript") |
| raw_json = gr.JSON(label="Raw record") |
|
|
| run_btn.click( |
| process_audio, |
| inputs=[audio_in], |
| outputs=[summary_md, transcript_md, raw_json], |
| ) |
|
|
|
|
| if __name__ == "__main__": |
| port = int(os.environ.get("PORT") or os.environ.get("GRADIO_SERVER_PORT") or "7860") |
| demo.queue(default_concurrency_limit=1).launch( |
| server_name="0.0.0.0", |
| server_port=port, |
| show_api=False, |
| share=False, |
| ) |
|
|