Spaces:
Running
Running
"""
Database layer — SQLite via Python's built-in sqlite3.
Handles schema creation and all CRUD operations for the telecalling agent.
"""
| import sqlite3 | |
| import json | |
| import logging | |
| from datetime import datetime | |
| from contextlib import contextmanager | |
| from typing import Optional | |
| from config import DB_PATH | |
# Module-level logger named after this module; used for startup and data-quality messages.
logger = logging.getLogger(__name__)
# ── Schema ────────────────────────────────────────────────────────────────────
# DDL executed by init_db(). Every statement uses IF NOT EXISTS, so running the
# script against an already-initialised database is a no-op.
#   calls    - one row per call/session, progressively filled in by the pipeline
#   bookings - one row per confirmed meeting, linked back to its call
SCHEMA = """
CREATE TABLE IF NOT EXISTS calls (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp       TEXT NOT NULL,                -- ISO-8601 UTC
    session_id      TEXT NOT NULL,                -- UUID per Gradio session
    raw_transcript  TEXT NOT NULL DEFAULT '',
    intent          TEXT,                         -- book_meeting | reschedule | cancel | ...
    caller_name     TEXT,
    preferred_date  TEXT,                         -- YYYY-MM-DD
    preferred_time  TEXT,                         -- HH:MM (24h)
    duration_mins   INTEGER,
    participants    TEXT DEFAULT '[]',            -- JSON array of strings
    meeting_type    TEXT,                         -- phone | video | in_person
    notes           TEXT,
    confidence      REAL,                         -- 0.0-1.0 from Qwen
    decision        TEXT,                         -- schedule | ask_followup | reject
    reasoning       TEXT,                         -- MiniCPM's explanation
    status          TEXT NOT NULL DEFAULT 'open'  -- open | confirmed | cancelled
);
CREATE TABLE IF NOT EXISTS bookings (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    call_id         INTEGER NOT NULL REFERENCES calls(id),
    booked_date     TEXT NOT NULL,                -- YYYY-MM-DD
    booked_time     TEXT NOT NULL,                -- HH:MM
    duration_mins   INTEGER NOT NULL,
    caller_name     TEXT NOT NULL,
    meeting_type    TEXT,
    created_at      TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_calls_session ON calls(session_id);
CREATE INDEX IF NOT EXISTS idx_bookings_date ON bookings(booked_date);
"""
# ── Connection helper ─────────────────────────────────────────────────────────
@contextmanager
def get_conn():
    """Yield a SQLite connection that commits on success and rolls back on error.

    A fresh connection to ``DB_PATH`` is opened for every ``with`` block and is
    always closed on exit. Rows come back as ``sqlite3.Row`` so callers can
    index columns by name.

    Yields:
        sqlite3.Connection: an open connection with WAL journaling requested.

    Raises:
        Exception: anything raised inside the ``with`` body (or while setting
            up the connection) is re-raised after the transaction is rolled back.
    """
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    try:
        # Setup runs inside the try so the connection is still closed if the
        # PRAGMA itself fails.
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")  # safe for concurrent Gradio threads
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
def init_db():
    """Apply SCHEMA to the database at DB_PATH.

    The schema script only uses ``IF NOT EXISTS`` statements, so this can be
    invoked on every application start without harming existing data.
    """
    with get_conn() as db:
        db.executescript(SCHEMA)
    logger.info(f"Database ready at {DB_PATH}")
# ── Call record helpers ───────────────────────────────────────────────────────
def create_call(session_id: str) -> int:
    """Insert a bare call record at the start of a session.

    Only the timestamp, session id and an empty transcript are written; the
    remaining columns are filled in by later pipeline steps.

    Args:
        session_id: identifier of the session the call belongs to.

    Returns:
        The id of the new ``calls`` row, so downstream steps can update it.
    """
    from datetime import timezone  # module level only imports `datetime`

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the stored text keeps the same naive ISO-8601
    # format that existing rows already use.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    with get_conn() as conn:
        cur = conn.execute(
            "INSERT INTO calls (timestamp, session_id, raw_transcript) VALUES (?, ?, '')",
            (now_utc.isoformat(), session_id),
        )
        return cur.lastrowid
def append_transcript(call_id: int, new_text: str):
    """Append a transcribed utterance to the call's running transcript.

    Utterances are separated by a single newline. The first utterance is
    stored without a leading newline, and empty / whitespace-only (or None)
    input is ignored so the transcript never accumulates blank lines.

    Args:
        call_id: id of the ``calls`` row to update.
        new_text: the newly transcribed text; surrounding whitespace is stripped.
    """
    utterance = (new_text or "").strip()
    if not utterance:
        return  # nothing to record

    with get_conn() as conn:
        conn.execute(
            # char(10) is a newline; the CASE avoids prefixing the very first
            # utterance with a separator when the transcript is still ''.
            """UPDATE calls
               SET raw_transcript = CASE
                       WHEN raw_transcript = '' THEN ?
                       ELSE raw_transcript || char(10) || ?
                   END
               WHERE id = ?""",
            (utterance, utterance, call_id),
        )
def update_call_intent(call_id: int, parsed: dict):
    """Write Qwen's structured JSON output into the call record.

    Missing keys are stored as NULL. ``participants`` is always stored as a
    JSON array: a missing key *or* an explicit null both become ``[]``, which
    matches the column's default and keeps readers from having to handle the
    JSON text ``null``.

    Args:
        call_id: id of the ``calls`` row to update.
        parsed: dict expected to match the scheduling JSON schema from config
            (keys: intent, caller_name, preferred_date, preferred_time,
            duration_minutes, participants, meeting_type, notes, confidence).
    """
    params = {
        "intent": parsed.get("intent"),
        "caller_name": parsed.get("caller_name"),
        "preferred_date": parsed.get("preferred_date"),
        "preferred_time": parsed.get("preferred_time"),
        # The JSON key is `duration_minutes`; the column is `duration_mins`.
        "duration_minutes": parsed.get("duration_minutes"),
        # `or []` also covers {"participants": None}, which .get(..., []) does not.
        "participants": json.dumps(parsed.get("participants") or []),
        "meeting_type": parsed.get("meeting_type"),
        "notes": parsed.get("notes"),
        "confidence": parsed.get("confidence"),
        "id": call_id,
    }
    with get_conn() as conn:
        conn.execute(
            """UPDATE calls SET
                   intent = :intent,
                   caller_name = :caller_name,
                   preferred_date = :preferred_date,
                   preferred_time = :preferred_time,
                   duration_mins = :duration_minutes,
                   participants = :participants,
                   meeting_type = :meeting_type,
                   notes = :notes,
                   confidence = :confidence
               WHERE id = :id""",
            params,
        )
def update_call_decision(call_id: int, decision: str, reasoning: str):
    """Store the evaluation verdict and its explanation on an existing call row.

    Args:
        call_id: id of the ``calls`` row to update.
        decision: value written to the ``decision`` column.
        reasoning: free-text explanation written to the ``reasoning`` column.
    """
    statement = "UPDATE calls SET decision = ?, reasoning = ? WHERE id = ?"
    values = (decision, reasoning, call_id)
    with get_conn() as db:
        db.execute(statement, values)
def confirm_booking(call_id: int, parsed: dict) -> int:
    """Insert a confirmed booking row and mark the call as confirmed.

    Both statements run in one transaction, so either the booking exists and
    the call is marked confirmed, or neither change is applied.

    Args:
        call_id: id of the ``calls`` row this booking belongs to.
        parsed: scheduling dict; ``preferred_date`` and ``preferred_time`` are
            required. ``duration_minutes``, ``caller_name`` and ``meeting_type``
            fall back to 30, "Unknown" and "phone" when missing or null.

    Returns:
        The id of the new ``bookings`` row.

    Raises:
        KeyError: if ``preferred_date`` or ``preferred_time`` is missing
            (raised before any database connection is opened).
    """
    from datetime import timezone  # module level only imports `datetime`

    # Build the row first: a missing required key fails fast without touching
    # the database. `or` (rather than .get(key, default)) also replaces
    # explicit nulls, which would otherwise violate the NOT NULL columns.
    booking_row = (
        call_id,
        parsed["preferred_date"],
        parsed["preferred_time"],
        parsed.get("duration_minutes") or 30,
        parsed.get("caller_name") or "Unknown",
        parsed.get("meeting_type") or "phone",
        # Same naive-UTC ISO-8601 text as before, without the deprecated utcnow().
        datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
    )
    with get_conn() as conn:
        cur = conn.execute(
            """INSERT INTO bookings
                   (call_id, booked_date, booked_time, duration_mins,
                    caller_name, meeting_type, created_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            booking_row,
        )
        conn.execute(
            "UPDATE calls SET status = 'confirmed' WHERE id = ?", (call_id,)
        )
        return cur.lastrowid
# ── Availability check ────────────────────────────────────────────────────────
def is_slot_available(date: str, time: str, duration_mins: int = 30) -> bool:
    """Return True if no existing booking on ``date`` overlaps the requested slot.

    Every slot (requested and existing) is extended by a mandatory 15-minute
    buffer after its end before the overlap test. Only bookings stored under
    the same ``booked_date`` are considered.

    Args:
        date: requested day, ``YYYY-MM-DD``.
        time: requested start time, ``HH:MM`` (24h).
        duration_mins: requested meeting length in minutes.

    Returns:
        False if the request is malformed (unparsable date/time or a
        non-numeric duration) or if it collides with an existing booking;
        True otherwise.
    """
    from datetime import datetime, timedelta

    buffer = timedelta(minutes=15)
    try:
        start = datetime.strptime(f"{date} {time}", "%Y-%m-%d %H:%M")
        end = start + timedelta(minutes=duration_mins) + buffer
    except (ValueError, TypeError, OverflowError):
        # TypeError covers e.g. duration_mins=None; treat it like any other
        # invalid request instead of crashing the caller.
        return False

    with get_conn() as conn:
        rows = conn.execute(
            "SELECT booked_time, duration_mins FROM bookings WHERE booked_date = ?",
            (date,)
        ).fetchall()

    for row in rows:
        try:
            existing_start = datetime.strptime(
                f"{date} {row['booked_time']}", "%Y-%m-%d %H:%M"
            )
        except ValueError:
            # Stored times are not validated on insert; one bad row must not
            # break every availability check for that date.
            logger.warning(
                "Skipping booking on %s with unparsable time %r",
                date, row["booked_time"],
            )
            continue
        existing_end = existing_start + timedelta(minutes=row["duration_mins"]) + buffer
        # Two half-open intervals overlap iff each starts before the other ends.
        if start < existing_end and end > existing_start:
            return False
    return True
def get_booked_slots(date: str) -> list[dict]:
    """List the bookings stored for one day, ordered by start time, for the UI.

    Args:
        date: the ``booked_date`` value to look up (``YYYY-MM-DD``).

    Returns:
        One dict per booking with the keys ``booked_time``, ``duration_mins``,
        ``caller_name`` and ``meeting_type``; an empty list if there are none.
    """
    query = """SELECT booked_time, duration_mins, caller_name, meeting_type
               FROM bookings WHERE booked_date = ? ORDER BY booked_time"""
    with get_conn() as db:
        found = db.execute(query, (date,)).fetchall()
        return list(map(dict, found))
def get_recent_calls(limit: int = 20) -> list[dict]:
    """Return the newest call records (highest id first) for the call log panel.

    Args:
        limit: maximum number of rows to return.

    Returns:
        One dict per call with the keys ``id``, ``timestamp``, ``caller_name``,
        ``intent``, ``decision`` and ``status``.
    """
    query = """SELECT id, timestamp, caller_name, intent, decision, status
               FROM calls ORDER BY id DESC LIMIT ?"""
    with get_conn() as db:
        found = db.execute(query, (limit,)).fetchall()
        return list(map(dict, found))