""" SQLite database setup and CRUD operations for leads and message templates. """ import sqlite3 import csv import io import os from datetime import datetime from typing import Optional DB_PATH = os.path.join(os.path.dirname(__file__), "leads.db") def get_connection(): """Get a database connection with row factory.""" conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): """Initialize the database tables and seed default template.""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS leads ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, phone TEXT, address TEXT, website TEXT, email TEXT, instagram TEXT, linkedin TEXT, status TEXT DEFAULT 'new', notes TEXT DEFAULT '', follow_up_date TEXT, created_at TEXT DEFAULT (datetime('now')) ) """) # Gracefully add new columns to an existing database try: cursor.execute("ALTER TABLE leads ADD COLUMN email TEXT") except sqlite3.OperationalError: pass try: cursor.execute("ALTER TABLE leads ADD COLUMN instagram TEXT") except sqlite3.OperationalError: pass try: cursor.execute("ALTER TABLE leads ADD COLUMN linkedin TEXT") except sqlite3.OperationalError: pass cursor.execute(""" CREATE TABLE IF NOT EXISTS templates ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, content TEXT NOT NULL, created_at TEXT DEFAULT (datetime('now')) ) """) # Seed a default template if none exist cursor.execute("SELECT COUNT(*) FROM templates") if cursor.fetchone()[0] == 0: cursor.execute( "INSERT INTO templates (name, content) VALUES (?, ?)", ( "Default Outreach", "Hi, I saw your business online and noticed you don't have a proper website. " "I help businesses get more customers by building simple, fast websites. " "Would you like to see a demo?", ), ) conn.commit() conn.close() # ──────────────────────────── Lead CRUD ──────────────────────────── def insert_lead(name: str, phone: str = None, address: str = None, website: str = None, email: str = None, instagram: str = None, linkedin: str = None): """Insert a new lead into the database.""" conn = get_connection() cursor = conn.cursor() cursor.execute( "INSERT INTO leads (name, phone, address, website, email, instagram, linkedin) VALUES (?, ?, ?, ?, ?, ?, ?)", (name, phone, address, website, email, instagram, linkedin), ) conn.commit() lead_id = cursor.lastrowid conn.close() return lead_id def get_all_leads( search: Optional[str] = None, status: Optional[str] = None, sort_by: str = "created_at", sort_order: str = "desc", ): """Get all leads with optional search and filter.""" conn = get_connection() cursor = conn.cursor() query = "SELECT * FROM leads WHERE 1=1" params = [] if search: query += " AND (name LIKE ? OR phone LIKE ? OR address LIKE ?)" like = f"%{search}%" params.extend([like, like, like]) if status: query += " AND status = ?" params.append(status) # Whitelist sort columns to prevent SQL injection allowed_sort = {"created_at", "name", "status", "follow_up_date"} if sort_by not in allowed_sort: sort_by = "created_at" order = "DESC" if sort_order.lower() == "desc" else "ASC" query += f" ORDER BY {sort_by} {order}" cursor.execute(query, params) rows = cursor.fetchall() conn.close() return [dict(row) for row in rows] def get_lead_by_id(lead_id: int): """Get a single lead by ID.""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM leads WHERE id = ?", (lead_id,)) row = cursor.fetchone() conn.close() return dict(row) if row else None def update_lead(lead_id: int, **kwargs): """Update a lead's fields.""" conn = get_connection() cursor = conn.cursor() # Filter out None values updates = {k: v for k, v in kwargs.items() if v is not None} if not updates: conn.close() return False set_clause = ", ".join(f"{k} = ?" for k in updates) values = list(updates.values()) + [lead_id] cursor.execute(f"UPDATE leads SET {set_clause} WHERE id = ?", values) conn.commit() affected = cursor.rowcount conn.close() return affected > 0 def delete_lead(lead_id: int): """Delete a lead by ID.""" conn = get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM leads WHERE id = ?", (lead_id,)) conn.commit() affected = cursor.rowcount conn.close() return affected > 0 # ──────────────────────────── Stats ──────────────────────────── def get_dashboard_stats(): """Get aggregated stats for the dashboard.""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" SELECT COUNT(*) as total, COALESCE(SUM(CASE WHEN status = 'new' THEN 1 ELSE 0 END), 0) as new, COALESCE(SUM(CASE WHEN status = 'contacted' THEN 1 ELSE 0 END), 0) as contacted, COALESCE(SUM(CASE WHEN status = 'replied' THEN 1 ELSE 0 END), 0) as replied, COALESCE(SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END), 0) as closed FROM leads """) row = cursor.fetchone() conn.close() # Ensure all values are integers (not None) result = dict(row) return {k: (v or 0) for k, v in result.items()} # ──────────────────────────── CSV Export ──────────────────────────── def export_leads_csv(status: Optional[str] = None): """Export leads to CSV string.""" leads = get_all_leads(status=status) output = io.StringIO() writer = csv.DictWriter( output, fieldnames=["id", "name", "phone", "address", "website", "email", "instagram", "linkedin", "status", "notes", "follow_up_date", "created_at"], ) writer.writeheader() writer.writerows(leads) return output.getvalue() # ──────────────────────────── Template CRUD ──────────────────────────── def get_all_templates(): """Get all message templates.""" conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM templates ORDER BY created_at DESC") rows = cursor.fetchall() conn.close() return [dict(row) for row in rows] def create_template(name: str, content: str): """Create a new message template.""" conn = get_connection() cursor = conn.cursor() cursor.execute( "INSERT INTO templates (name, content) VALUES (?, ?)", (name, content), ) conn.commit() tid = cursor.lastrowid conn.close() return tid def update_template(template_id: int, **kwargs): """Update a template.""" conn = get_connection() cursor = conn.cursor() updates = {k: v for k, v in kwargs.items() if v is not None} if not updates: conn.close() return False set_clause = ", ".join(f"{k} = ?" for k in updates) values = list(updates.values()) + [template_id] cursor.execute(f"UPDATE templates SET {set_clause} WHERE id = ?", values) conn.commit() affected = cursor.rowcount conn.close() return affected > 0 def delete_template(template_id: int): """Delete a template.""" conn = get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM templates WHERE id = ?", (template_id,)) conn.commit() affected = cursor.rowcount conn.close() return affected > 0