File size: 8,638 Bytes
9a02b57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
"""
Database layer β€” SQLite via Python's built-in sqlite3.
Handles schema creation and all CRUD operations for the telecalling agent.
"""

import sqlite3
import json
import logging
from datetime import datetime
from contextlib import contextmanager
from typing import Optional

from config import DB_PATH

logger = logging.getLogger(__name__)


# ── Schema ────────────────────────────────────────────────────────────────────

SCHEMA = """
CREATE TABLE IF NOT EXISTS calls (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp       TEXT    NOT NULL,           -- ISO-8601 UTC
    session_id      TEXT    NOT NULL,           -- UUID per Gradio session
    raw_transcript  TEXT    NOT NULL DEFAULT '',
    intent          TEXT,                       -- book_meeting | reschedule | cancel | ...
    caller_name     TEXT,
    preferred_date  TEXT,                       -- YYYY-MM-DD
    preferred_time  TEXT,                       -- HH:MM (24h)
    duration_mins   INTEGER,
    participants    TEXT    DEFAULT '[]',        -- JSON array of strings
    meeting_type    TEXT,                       -- phone | video | in_person
    notes           TEXT,
    confidence      REAL,                       -- 0.0–1.0 from Qwen
    decision        TEXT,                       -- schedule | ask_followup | reject
    reasoning       TEXT,                       -- MiniCPM's explanation
    status          TEXT    NOT NULL DEFAULT 'open'  -- open | confirmed | cancelled
);

CREATE TABLE IF NOT EXISTS bookings (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    call_id         INTEGER NOT NULL REFERENCES calls(id),
    booked_date     TEXT    NOT NULL,           -- YYYY-MM-DD
    booked_time     TEXT    NOT NULL,           -- HH:MM
    duration_mins   INTEGER NOT NULL,
    caller_name     TEXT    NOT NULL,
    meeting_type    TEXT,
    created_at      TEXT    NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_calls_session   ON calls(session_id);
CREATE INDEX IF NOT EXISTS idx_bookings_date   ON bookings(booked_date);
"""


# ── Connection helper ─────────────────────────────────────────────────────────

@contextmanager
def get_conn():
    """Context manager β€” always commits or rolls back cleanly."""
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")   # safe for concurrent Gradio threads
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def init_db():
    """Create tables if they don't exist. Safe to call on every startup."""
    with get_conn() as conn:
        conn.executescript(SCHEMA)
    logger.info(f"Database ready at {DB_PATH}")


# ── Call record helpers ───────────────────────────────────────────────────────

def create_call(session_id: str) -> int:
    """
    Insert a bare call record at the start of a session.
    Returns the new call id so downstream steps can update it.
    """
    with get_conn() as conn:
        cur = conn.execute(
            "INSERT INTO calls (timestamp, session_id, raw_transcript) VALUES (?, ?, '')",
            (datetime.utcnow().isoformat(), session_id)
        )
        return cur.lastrowid


def append_transcript(call_id: int, new_text: str):
    """Append a transcribed utterance to the running transcript."""
    with get_conn() as conn:
        conn.execute(
            """UPDATE calls
               SET raw_transcript = raw_transcript || '\n' || ?
               WHERE id = ?""",
            (new_text.strip(), call_id)
        )


def update_call_intent(call_id: int, parsed: dict):
    """
    Write Qwen's structured JSON output into the call record.
    `parsed` is expected to match the scheduling JSON schema from config.
    """
    with get_conn() as conn:
        conn.execute(
            """UPDATE calls SET
                intent         = :intent,
                caller_name    = :caller_name,
                preferred_date = :preferred_date,
                preferred_time = :preferred_time,
                duration_mins  = :duration_minutes,
                participants   = :participants,
                meeting_type   = :meeting_type,
                notes          = :notes,
                confidence     = :confidence
               WHERE id = :id""",
            {
                "intent":           parsed.get("intent"),
                "caller_name":      parsed.get("caller_name"),
                "preferred_date":   parsed.get("preferred_date"),
                "preferred_time":   parsed.get("preferred_time"),
                "duration_minutes": parsed.get("duration_minutes"),
                "participants":     json.dumps(parsed.get("participants", [])),
                "meeting_type":     parsed.get("meeting_type"),
                "notes":            parsed.get("notes"),
                "confidence":       parsed.get("confidence"),
                "id":               call_id,
            }
        )


def update_call_decision(call_id: int, decision: str, reasoning: str):
    """Write MiniCPM's evaluation result back to the call record."""
    with get_conn() as conn:
        conn.execute(
            "UPDATE calls SET decision = ?, reasoning = ? WHERE id = ?",
            (decision, reasoning, call_id)
        )


def confirm_booking(call_id: int, parsed: dict) -> int:
    """
    Insert a confirmed booking row and mark the call as confirmed.
    Returns the booking id.
    """
    with get_conn() as conn:
        cur = conn.execute(
            """INSERT INTO bookings
               (call_id, booked_date, booked_time, duration_mins,
                caller_name, meeting_type, created_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (
                call_id,
                parsed["preferred_date"],
                parsed["preferred_time"],
                parsed.get("duration_minutes", 30),
                parsed.get("caller_name", "Unknown"),
                parsed.get("meeting_type", "phone"),
                datetime.utcnow().isoformat(),
            )
        )
        conn.execute(
            "UPDATE calls SET status = 'confirmed' WHERE id = ?", (call_id,)
        )
        return cur.lastrowid


# ── Availability check ────────────────────────────────────────────────────────

def is_slot_available(date: str, time: str, duration_mins: int = 30) -> bool:
    """
    Returns True if no existing booking overlaps with the requested slot
    (including the mandatory 15-minute buffer).
    date: YYYY-MM-DD, time: HH:MM
    """
    from datetime import datetime, timedelta

    try:
        start = datetime.strptime(f"{date} {time}", "%Y-%m-%d %H:%M")
        end   = start + timedelta(minutes=duration_mins + 15)   # +15 min buffer
    except ValueError:
        return False

    with get_conn() as conn:
        rows = conn.execute(
            "SELECT booked_time, duration_mins FROM bookings WHERE booked_date = ?",
            (date,)
        ).fetchall()

    for row in rows:
        existing_start = datetime.strptime(f"{date} {row['booked_time']}", "%Y-%m-%d %H:%M")
        existing_end   = existing_start + timedelta(minutes=row["duration_mins"] + 15)
        # overlap check
        if start < existing_end and end > existing_start:
            return False

    return True


def get_booked_slots(date: str) -> list[dict]:
    """Return all bookings for a given date for display in the UI."""
    with get_conn() as conn:
        rows = conn.execute(
            """SELECT booked_time, duration_mins, caller_name, meeting_type
               FROM bookings WHERE booked_date = ? ORDER BY booked_time""",
            (date,)
        ).fetchall()
    return [dict(r) for r in rows]


def get_recent_calls(limit: int = 20) -> list[dict]:
    """Fetch the most recent call records for the call log panel."""
    with get_conn() as conn:
        rows = conn.execute(
            """SELECT id, timestamp, caller_name, intent, decision, status
               FROM calls ORDER BY id DESC LIMIT ?""",
            (limit,)
        ).fetchall()
    return [dict(r) for r in rows]