Spaces:
Sleeping
Sleeping
File size: 8,287 Bytes
48a3682 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 | """
SQLite database setup and CRUD operations for leads and message templates.
"""
import sqlite3
import csv
import io
import os
from datetime import datetime
from typing import Optional
DB_PATH = os.path.join(os.path.dirname(__file__), "leads.db")
def get_connection():
"""Get a database connection with row factory."""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
"""Initialize the database tables and seed default template."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS leads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
phone TEXT,
address TEXT,
website TEXT,
email TEXT,
instagram TEXT,
linkedin TEXT,
status TEXT DEFAULT 'new',
notes TEXT DEFAULT '',
follow_up_date TEXT,
created_at TEXT DEFAULT (datetime('now'))
)
""")
# Gracefully add new columns to an existing database
try: cursor.execute("ALTER TABLE leads ADD COLUMN email TEXT")
except sqlite3.OperationalError: pass
try: cursor.execute("ALTER TABLE leads ADD COLUMN instagram TEXT")
except sqlite3.OperationalError: pass
try: cursor.execute("ALTER TABLE leads ADD COLUMN linkedin TEXT")
except sqlite3.OperationalError: pass
cursor.execute("""
CREATE TABLE IF NOT EXISTS templates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
)
""")
# Seed a default template if none exist
cursor.execute("SELECT COUNT(*) FROM templates")
if cursor.fetchone()[0] == 0:
cursor.execute(
"INSERT INTO templates (name, content) VALUES (?, ?)",
(
"Default Outreach",
"Hi, I saw your business online and noticed you don't have a proper website. "
"I help businesses get more customers by building simple, fast websites. "
"Would you like to see a demo?",
),
)
conn.commit()
conn.close()
# ββββββββββββββββββββββββββββ Lead CRUD ββββββββββββββββββββββββββββ
def insert_lead(name: str, phone: str = None, address: str = None, website: str = None, email: str = None, instagram: str = None, linkedin: str = None):
"""Insert a new lead into the database."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"INSERT INTO leads (name, phone, address, website, email, instagram, linkedin) VALUES (?, ?, ?, ?, ?, ?, ?)",
(name, phone, address, website, email, instagram, linkedin),
)
conn.commit()
lead_id = cursor.lastrowid
conn.close()
return lead_id
def get_all_leads(
search: Optional[str] = None,
status: Optional[str] = None,
sort_by: str = "created_at",
sort_order: str = "desc",
):
"""Get all leads with optional search and filter."""
conn = get_connection()
cursor = conn.cursor()
query = "SELECT * FROM leads WHERE 1=1"
params = []
if search:
query += " AND (name LIKE ? OR phone LIKE ? OR address LIKE ?)"
like = f"%{search}%"
params.extend([like, like, like])
if status:
query += " AND status = ?"
params.append(status)
# Whitelist sort columns to prevent SQL injection
allowed_sort = {"created_at", "name", "status", "follow_up_date"}
if sort_by not in allowed_sort:
sort_by = "created_at"
order = "DESC" if sort_order.lower() == "desc" else "ASC"
query += f" ORDER BY {sort_by} {order}"
cursor.execute(query, params)
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
def get_lead_by_id(lead_id: int):
"""Get a single lead by ID."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM leads WHERE id = ?", (lead_id,))
row = cursor.fetchone()
conn.close()
return dict(row) if row else None
def update_lead(lead_id: int, **kwargs):
"""Update a lead's fields."""
conn = get_connection()
cursor = conn.cursor()
# Filter out None values
updates = {k: v for k, v in kwargs.items() if v is not None}
if not updates:
conn.close()
return False
set_clause = ", ".join(f"{k} = ?" for k in updates)
values = list(updates.values()) + [lead_id]
cursor.execute(f"UPDATE leads SET {set_clause} WHERE id = ?", values)
conn.commit()
affected = cursor.rowcount
conn.close()
return affected > 0
def delete_lead(lead_id: int):
"""Delete a lead by ID."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM leads WHERE id = ?", (lead_id,))
conn.commit()
affected = cursor.rowcount
conn.close()
return affected > 0
# ββββββββββββββββββββββββββββ Stats ββββββββββββββββββββββββββββ
def get_dashboard_stats():
"""Get aggregated stats for the dashboard."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT
COUNT(*) as total,
COALESCE(SUM(CASE WHEN status = 'new' THEN 1 ELSE 0 END), 0) as new,
COALESCE(SUM(CASE WHEN status = 'contacted' THEN 1 ELSE 0 END), 0) as contacted,
COALESCE(SUM(CASE WHEN status = 'replied' THEN 1 ELSE 0 END), 0) as replied,
COALESCE(SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END), 0) as closed
FROM leads
""")
row = cursor.fetchone()
conn.close()
# Ensure all values are integers (not None)
result = dict(row)
return {k: (v or 0) for k, v in result.items()}
# ββββββββββββββββββββββββββββ CSV Export ββββββββββββββββββββββββββββ
def export_leads_csv(status: Optional[str] = None):
"""Export leads to CSV string."""
leads = get_all_leads(status=status)
output = io.StringIO()
writer = csv.DictWriter(
output,
fieldnames=["id", "name", "phone", "address", "website", "email", "instagram", "linkedin", "status", "notes", "follow_up_date", "created_at"],
)
writer.writeheader()
writer.writerows(leads)
return output.getvalue()
# ββββββββββββββββββββββββββββ Template CRUD ββββββββββββββββββββββββββββ
def get_all_templates():
"""Get all message templates."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM templates ORDER BY created_at DESC")
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
def create_template(name: str, content: str):
"""Create a new message template."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute(
"INSERT INTO templates (name, content) VALUES (?, ?)",
(name, content),
)
conn.commit()
tid = cursor.lastrowid
conn.close()
return tid
def update_template(template_id: int, **kwargs):
"""Update a template."""
conn = get_connection()
cursor = conn.cursor()
updates = {k: v for k, v in kwargs.items() if v is not None}
if not updates:
conn.close()
return False
set_clause = ", ".join(f"{k} = ?" for k in updates)
values = list(updates.values()) + [template_id]
cursor.execute(f"UPDATE templates SET {set_clause} WHERE id = ?", values)
conn.commit()
affected = cursor.rowcount
conn.close()
return affected > 0
def delete_template(template_id: int):
"""Delete a template."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM templates WHERE id = ?", (template_id,))
conn.commit()
affected = cursor.rowcount
conn.close()
return affected > 0
|