Spaces:
Sleeping
Sleeping
| """ | |
| SQLite database setup and CRUD operations for leads and message templates. | |
| """ | |
| import sqlite3 | |
| import csv | |
| import io | |
| import os | |
| from datetime import datetime | |
| from typing import Optional | |
# The SQLite database file lives in the same directory as this module.
_MODULE_DIR = os.path.dirname(__file__)
DB_PATH = os.path.join(_MODULE_DIR, "leads.db")
def get_connection():
    """Open a connection to the database at ``DB_PATH``.

    The connection's ``row_factory`` is set to ``sqlite3.Row`` so fetched
    rows can be read by column name and converted with ``dict(row)``.
    The caller is responsible for closing the returned connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the ``leads`` and ``templates`` tables and seed a default template.

    Safe to call repeatedly: both tables are created only if missing, the
    ``email`` / ``instagram`` / ``linkedin`` columns are added to ``leads``
    only when an older database lacks them, and the default template is
    inserted only when ``templates`` is empty.

    Raises:
        sqlite3.Error: If any statement fails. The connection is closed
            either way.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS leads (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                phone TEXT,
                address TEXT,
                website TEXT,
                email TEXT,
                instagram TEXT,
                linkedin TEXT,
                status TEXT DEFAULT 'new',
                notes TEXT DEFAULT '',
                follow_up_date TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            )
        """)

        # Databases created before these columns existed need them added.
        # Inspect the live schema rather than swallowing OperationalError, so
        # genuine failures (e.g. a locked database) are not silently ignored.
        cursor.execute("PRAGMA table_info(leads)")
        # Index 1 of each table_info row is the column name.
        existing_columns = {row[1] for row in cursor.fetchall()}
        for column in ("email", "instagram", "linkedin"):
            if column not in existing_columns:
                cursor.execute(f"ALTER TABLE leads ADD COLUMN {column} TEXT")

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS templates (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                content TEXT NOT NULL,
                created_at TEXT DEFAULT (datetime('now'))
            )
        """)

        # Seed a default template if none exist
        cursor.execute("SELECT COUNT(*) FROM templates")
        if cursor.fetchone()[0] == 0:
            cursor.execute(
                "INSERT INTO templates (name, content) VALUES (?, ?)",
                (
                    "Default Outreach",
                    "Hi, I saw your business online and noticed you don't have a proper website. "
                    "I help businesses get more customers by building simple, fast websites. "
                    "Would you like to see a demo?",
                ),
            )

        conn.commit()
    finally:
        # Release the connection even when a statement above raised.
        conn.close()
# ──────────────────────────── Lead CRUD ────────────────────────────
def insert_lead(
    name: str,
    phone: Optional[str] = None,
    address: Optional[str] = None,
    website: Optional[str] = None,
    email: Optional[str] = None,
    instagram: Optional[str] = None,
    linkedin: Optional[str] = None,
):
    """Insert a new lead and return its row id.

    Args:
        name: Lead name (the ``leads.name`` column is ``NOT NULL``).
        phone, address, website, email, instagram, linkedin: Optional
            contact details; ``None`` is stored as SQL ``NULL``.

    Returns:
        The ``id`` of the newly inserted row.

    Raises:
        sqlite3.Error: If the insert fails. The connection is closed
            either way.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO leads (name, phone, address, website, email, instagram, linkedin) VALUES (?, ?, ?, ?, ?, ?, ?)",
            (name, phone, address, website, email, instagram, linkedin),
        )
        conn.commit()
        return cursor.lastrowid
    finally:
        # Release the connection even when the insert raised.
        conn.close()
def get_all_leads(
    search: Optional[str] = None,
    status: Optional[str] = None,
    sort_by: str = "created_at",
    sort_order: str = "desc",
):
    """Return leads as a list of dicts, optionally searched, filtered and sorted.

    Args:
        search: Text matched with SQL ``LIKE '%search%'`` against name, phone
            and address. ``%`` and ``_`` inside it act as LIKE wildcards.
            Falsy values disable the search filter.
        status: When truthy, only leads with exactly this status are returned.
        sort_by: One of ``created_at``, ``name``, ``status`` or
            ``follow_up_date``; any other value falls back to ``created_at``.
        sort_order: ``"desc"`` (case-insensitive) sorts descending; any other
            string sorts ascending. ``None`` is treated as the default
            ``"desc"``.

    Returns:
        A list with one ``dict`` per matching row (all columns).
    """
    # Build the statement before opening the connection so that a bad
    # argument cannot leave a connection open.
    query = "SELECT * FROM leads WHERE 1=1"
    params = []

    if search:
        query += " AND (name LIKE ? OR phone LIKE ? OR address LIKE ?)"
        like = f"%{search}%"
        params.extend([like, like, like])

    if status:
        query += " AND status = ?"
        params.append(status)

    # Whitelist sort columns to prevent SQL injection: identifiers cannot be
    # bound as parameters, so only known column names reach the f-string.
    allowed_sort = {"created_at", "name", "status", "follow_up_date"}
    if sort_by not in allowed_sort:
        sort_by = "created_at"
    if sort_order is None:
        sort_order = "desc"
    order = "DESC" if sort_order.lower() == "desc" else "ASC"
    query += f" ORDER BY {sort_by} {order}"

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        rows = cursor.fetchall()
    finally:
        # Release the connection even when the query raised.
        conn.close()

    return [dict(row) for row in rows]
def get_lead_by_id(lead_id: int):
    """Return the lead with the given id as a dict, or ``None`` if no row matches."""
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM leads WHERE id = ?", (lead_id,))
        row = cursor.fetchone()
    finally:
        # Release the connection even when the query raised.
        conn.close()
    return dict(row) if row else None
# Columns callers may change through update_lead(). Column names cannot be
# bound as SQL parameters, so they are checked against this set before being
# interpolated into the UPDATE statement.
_LEAD_UPDATABLE_COLUMNS = frozenset(
    {
        "name",
        "phone",
        "address",
        "website",
        "email",
        "instagram",
        "linkedin",
        "status",
        "notes",
        "follow_up_date",
    }
)


def update_lead(lead_id: int, **kwargs):
    """Update selected columns of a lead.

    Args:
        lead_id: ``id`` of the lead to update.
        **kwargs: Column name / new value pairs. Pairs whose value is
            ``None`` are ignored, so a column cannot be set to NULL
            through this function.

    Returns:
        ``True`` if a row was updated; ``False`` if there was nothing to
        update or no lead has that id.

    Raises:
        ValueError: If a keyword with a non-``None`` value is not an
            updatable ``leads`` column. This stops arbitrary text from
            being spliced into the SQL statement.
    """
    # Filter out None values
    updates = {k: v for k, v in kwargs.items() if v is not None}
    if not updates:
        return False

    unknown = sorted(set(updates) - _LEAD_UPDATABLE_COLUMNS)
    if unknown:
        raise ValueError(f"Cannot update unknown lead column(s): {', '.join(unknown)}")

    set_clause = ", ".join(f"{k} = ?" for k in updates)
    values = list(updates.values()) + [lead_id]

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(f"UPDATE leads SET {set_clause} WHERE id = ?", values)
        conn.commit()
        return cursor.rowcount > 0
    finally:
        # Release the connection even when the update raised.
        conn.close()
def delete_lead(lead_id: int):
    """Delete the lead with the given id.

    Returns:
        ``True`` if a row was deleted, ``False`` if no lead has that id.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM leads WHERE id = ?", (lead_id,))
        conn.commit()
        return cursor.rowcount > 0
    finally:
        # Release the connection even when the delete raised.
        conn.close()
# ──────────────────────────── Stats ────────────────────────────
def get_dashboard_stats():
    """Return lead counts for the dashboard.

    Returns:
        A dict with the keys ``total``, ``new``, ``contacted``, ``replied``
        and ``closed``. ``total`` counts every lead; the others count leads
        whose ``status`` equals the key. Every value is an integer, ``0``
        when the table is empty.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT
                COUNT(*) as total,
                COALESCE(SUM(CASE WHEN status = 'new' THEN 1 ELSE 0 END), 0) as new,
                COALESCE(SUM(CASE WHEN status = 'contacted' THEN 1 ELSE 0 END), 0) as contacted,
                COALESCE(SUM(CASE WHEN status = 'replied' THEN 1 ELSE 0 END), 0) as replied,
                COALESCE(SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END), 0) as closed
            FROM leads
        """)
        row = cursor.fetchone()
    finally:
        # Release the connection even when the query raised.
        conn.close()

    # Ensure all values are integers (not None)
    result = dict(row)
    return {k: (v or 0) for k, v in result.items()}
# ──────────────────────────── CSV Export ────────────────────────────
def export_leads_csv(status: Optional[str] = None):
    """Export leads as CSV text.

    Args:
        status: When truthy, only leads with exactly this status are exported.

    Returns:
        A string holding a header row followed by one row per lead, as
        returned by ``get_all_leads(status=status)``.
    """
    leads = get_all_leads(status=status)
    output = io.StringIO()
    writer = csv.DictWriter(
        output,
        fieldnames=["id", "name", "phone", "address", "website", "email", "instagram", "linkedin", "status", "notes", "follow_up_date", "created_at"],
        # Lead dicts come from ``SELECT *``; skip any column not listed above
        # instead of letting DictWriter raise ValueError on it.
        extrasaction="ignore",
    )
    writer.writeheader()
    writer.writerows(leads)
    return output.getvalue()
# ──────────────────────────── Template CRUD ────────────────────────────
def get_all_templates():
    """Return every message template as a dict, ordered by ``created_at`` descending."""
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM templates ORDER BY created_at DESC")
        rows = cursor.fetchall()
    finally:
        # Release the connection even when the query raised.
        conn.close()
    return [dict(row) for row in rows]
def create_template(name: str, content: str):
    """Insert a new message template and return its row id.

    Args:
        name: Template name (``NOT NULL`` in the schema).
        content: Template body text (``NOT NULL`` in the schema).

    Returns:
        The ``id`` of the newly inserted row.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO templates (name, content) VALUES (?, ?)",
            (name, content),
        )
        conn.commit()
        return cursor.lastrowid
    finally:
        # Release the connection even when the insert raised.
        conn.close()
# Columns callers may change through update_template(). Column names cannot
# be bound as SQL parameters, so they are checked against this set before
# being interpolated into the UPDATE statement.
_TEMPLATE_UPDATABLE_COLUMNS = frozenset({"name", "content"})


def update_template(template_id: int, **kwargs):
    """Update selected columns of a message template.

    Args:
        template_id: ``id`` of the template to update.
        **kwargs: Column name / new value pairs. Pairs whose value is
            ``None`` are ignored.

    Returns:
        ``True`` if a row was updated; ``False`` if there was nothing to
        update or no template has that id.

    Raises:
        ValueError: If a keyword with a non-``None`` value is not an
            updatable ``templates`` column. This stops arbitrary text from
            being spliced into the SQL statement.
    """
    updates = {k: v for k, v in kwargs.items() if v is not None}
    if not updates:
        return False

    unknown = sorted(set(updates) - _TEMPLATE_UPDATABLE_COLUMNS)
    if unknown:
        raise ValueError(f"Cannot update unknown template column(s): {', '.join(unknown)}")

    set_clause = ", ".join(f"{k} = ?" for k in updates)
    values = list(updates.values()) + [template_id]

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(f"UPDATE templates SET {set_clause} WHERE id = ?", values)
        conn.commit()
        return cursor.rowcount > 0
    finally:
        # Release the connection even when the update raised.
        conn.close()
def delete_template(template_id: int):
    """Delete the template with the given id.

    Returns:
        ``True`` if a row was deleted, ``False`` if no template has that id.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM templates WHERE id = ?", (template_id,))
        conn.commit()
        return cursor.rowcount > 0
    finally:
        # Release the connection even when the delete raised.
        conn.close()