TeleAgent / db.py
prianshujha
added pipeline file
9a02b57
Raw
History Blame Contribute Delete
8.64 kB
"""
Database layer β€” SQLite via Python's built-in sqlite3.
Handles schema creation and all CRUD operations for the telecalling agent.
"""
import sqlite3
import json
import logging
from datetime import datetime
from contextlib import contextmanager
from typing import Optional
from config import DB_PATH
logger = logging.getLogger(__name__)
# ── Schema ────────────────────────────────────────────────────────────────────
SCHEMA = """
CREATE TABLE IF NOT EXISTS calls (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL, -- ISO-8601 UTC
session_id TEXT NOT NULL, -- UUID per Gradio session
raw_transcript TEXT NOT NULL DEFAULT '',
intent TEXT, -- book_meeting | reschedule | cancel | ...
caller_name TEXT,
preferred_date TEXT, -- YYYY-MM-DD
preferred_time TEXT, -- HH:MM (24h)
duration_mins INTEGER,
participants TEXT DEFAULT '[]', -- JSON array of strings
meeting_type TEXT, -- phone | video | in_person
notes TEXT,
confidence REAL, -- 0.0–1.0 from Qwen
decision TEXT, -- schedule | ask_followup | reject
reasoning TEXT, -- MiniCPM's explanation
status TEXT NOT NULL DEFAULT 'open' -- open | confirmed | cancelled
);
CREATE TABLE IF NOT EXISTS bookings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
call_id INTEGER NOT NULL REFERENCES calls(id),
booked_date TEXT NOT NULL, -- YYYY-MM-DD
booked_time TEXT NOT NULL, -- HH:MM
duration_mins INTEGER NOT NULL,
caller_name TEXT NOT NULL,
meeting_type TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_calls_session ON calls(session_id);
CREATE INDEX IF NOT EXISTS idx_bookings_date ON bookings(booked_date);
"""
# ── Connection helper ─────────────────────────────────────────────────────────
@contextmanager
def get_conn():
"""Context manager β€” always commits or rolls back cleanly."""
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL") # safe for concurrent Gradio threads
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def init_db():
"""Create tables if they don't exist. Safe to call on every startup."""
with get_conn() as conn:
conn.executescript(SCHEMA)
logger.info(f"Database ready at {DB_PATH}")
# ── Call record helpers ───────────────────────────────────────────────────────
def create_call(session_id: str) -> int:
"""
Insert a bare call record at the start of a session.
Returns the new call id so downstream steps can update it.
"""
with get_conn() as conn:
cur = conn.execute(
"INSERT INTO calls (timestamp, session_id, raw_transcript) VALUES (?, ?, '')",
(datetime.utcnow().isoformat(), session_id)
)
return cur.lastrowid
def append_transcript(call_id: int, new_text: str):
"""Append a transcribed utterance to the running transcript."""
with get_conn() as conn:
conn.execute(
"""UPDATE calls
SET raw_transcript = raw_transcript || '\n' || ?
WHERE id = ?""",
(new_text.strip(), call_id)
)
def update_call_intent(call_id: int, parsed: dict):
"""
Write Qwen's structured JSON output into the call record.
`parsed` is expected to match the scheduling JSON schema from config.
"""
with get_conn() as conn:
conn.execute(
"""UPDATE calls SET
intent = :intent,
caller_name = :caller_name,
preferred_date = :preferred_date,
preferred_time = :preferred_time,
duration_mins = :duration_minutes,
participants = :participants,
meeting_type = :meeting_type,
notes = :notes,
confidence = :confidence
WHERE id = :id""",
{
"intent": parsed.get("intent"),
"caller_name": parsed.get("caller_name"),
"preferred_date": parsed.get("preferred_date"),
"preferred_time": parsed.get("preferred_time"),
"duration_minutes": parsed.get("duration_minutes"),
"participants": json.dumps(parsed.get("participants", [])),
"meeting_type": parsed.get("meeting_type"),
"notes": parsed.get("notes"),
"confidence": parsed.get("confidence"),
"id": call_id,
}
)
def update_call_decision(call_id: int, decision: str, reasoning: str):
"""Write MiniCPM's evaluation result back to the call record."""
with get_conn() as conn:
conn.execute(
"UPDATE calls SET decision = ?, reasoning = ? WHERE id = ?",
(decision, reasoning, call_id)
)
def confirm_booking(call_id: int, parsed: dict) -> int:
"""
Insert a confirmed booking row and mark the call as confirmed.
Returns the booking id.
"""
with get_conn() as conn:
cur = conn.execute(
"""INSERT INTO bookings
(call_id, booked_date, booked_time, duration_mins,
caller_name, meeting_type, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
call_id,
parsed["preferred_date"],
parsed["preferred_time"],
parsed.get("duration_minutes", 30),
parsed.get("caller_name", "Unknown"),
parsed.get("meeting_type", "phone"),
datetime.utcnow().isoformat(),
)
)
conn.execute(
"UPDATE calls SET status = 'confirmed' WHERE id = ?", (call_id,)
)
return cur.lastrowid
# ── Availability check ────────────────────────────────────────────────────────
def is_slot_available(date: str, time: str, duration_mins: int = 30) -> bool:
"""
Returns True if no existing booking overlaps with the requested slot
(including the mandatory 15-minute buffer).
date: YYYY-MM-DD, time: HH:MM
"""
from datetime import datetime, timedelta
try:
start = datetime.strptime(f"{date} {time}", "%Y-%m-%d %H:%M")
end = start + timedelta(minutes=duration_mins + 15) # +15 min buffer
except ValueError:
return False
with get_conn() as conn:
rows = conn.execute(
"SELECT booked_time, duration_mins FROM bookings WHERE booked_date = ?",
(date,)
).fetchall()
for row in rows:
existing_start = datetime.strptime(f"{date} {row['booked_time']}", "%Y-%m-%d %H:%M")
existing_end = existing_start + timedelta(minutes=row["duration_mins"] + 15)
# overlap check
if start < existing_end and end > existing_start:
return False
return True
def get_booked_slots(date: str) -> list[dict]:
"""Return all bookings for a given date for display in the UI."""
with get_conn() as conn:
rows = conn.execute(
"""SELECT booked_time, duration_mins, caller_name, meeting_type
FROM bookings WHERE booked_date = ? ORDER BY booked_time""",
(date,)
).fetchall()
return [dict(r) for r in rows]
def get_recent_calls(limit: int = 20) -> list[dict]:
"""Fetch the most recent call records for the call log panel."""
with get_conn() as conn:
rows = conn.execute(
"""SELECT id, timestamp, caller_name, intent, decision, status
FROM calls ORDER BY id DESC LIMIT ?""",
(limit,)
).fetchall()
return [dict(r) for r in rows]