Scraping / backend /database.py
Harshasnade's picture
🚀 LeadFlow - Lead Generation & Outreach System with premium UI
48a3682
"""
SQLite database setup and CRUD operations for leads and message templates.
"""
import sqlite3
import csv
import io
import os
from datetime import datetime
from typing import Optional
# Path of the SQLite database file: "leads.db" in the same directory as this
# module. Every connection opened by get_connection() targets this file.
DB_PATH = os.path.join(os.path.dirname(__file__), "leads.db")
def get_connection():
    """Open the SQLite database at ``DB_PATH`` and return the connection.

    The connection's row factory is ``sqlite3.Row``, so fetched rows can be
    read by column name and converted to dictionaries with ``dict(row)``.
    The caller is responsible for closing the returned connection.
    """
    connection = sqlite3.connect(DB_PATH)
    # sqlite3.Row gives name-based access instead of plain tuples.
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the database schema and seed the default message template.

    Steps, all performed on a single connection:

    1. Create the ``leads`` table if it does not exist.
    2. Add the ``email``, ``instagram`` and ``linkedin`` columns to ``leads``
       when an older database file lacks them.
    3. Create the ``templates`` table if it does not exist.
    4. Insert the "Default Outreach" template when ``templates`` is empty.

    The function can be called repeatedly; existing tables, columns and
    templates are left untouched. The connection is always closed, even when
    a statement fails.

    Raises:
        sqlite3.Error: If any schema or seed statement fails.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS leads (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                phone TEXT,
                address TEXT,
                website TEXT,
                email TEXT,
                instagram TEXT,
                linkedin TEXT,
                status TEXT DEFAULT 'new',
                notes TEXT DEFAULT '',
                follow_up_date TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            )
        """)

        # Migrate databases created before these columns existed. Inspect the
        # actual schema instead of attempting the ALTER and swallowing every
        # OperationalError, which would also hide unrelated failures.
        cursor.execute("PRAGMA table_info(leads)")
        # Column 1 of each table_info row is the column name.
        existing_columns = {row[1] for row in cursor.fetchall()}
        for column in ("email", "instagram", "linkedin"):
            if column not in existing_columns:
                cursor.execute(f"ALTER TABLE leads ADD COLUMN {column} TEXT")

        cursor.execute("""
            CREATE TABLE IF NOT EXISTS templates (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                content TEXT NOT NULL,
                created_at TEXT DEFAULT (datetime('now'))
            )
        """)

        # Seed a default template if none exist
        cursor.execute("SELECT COUNT(*) FROM templates")
        if cursor.fetchone()[0] == 0:
            cursor.execute(
                "INSERT INTO templates (name, content) VALUES (?, ?)",
                (
                    "Default Outreach",
                    "Hi, I saw your business online and noticed you don't have a proper website. "
                    "I help businesses get more customers by building simple, fast websites. "
                    "Would you like to see a demo?",
                ),
            )
        conn.commit()
    finally:
        conn.close()
# ──────────────────────────── Lead CRUD ────────────────────────────
def insert_lead(
    name: str,
    phone: Optional[str] = None,
    address: Optional[str] = None,
    website: Optional[str] = None,
    email: Optional[str] = None,
    instagram: Optional[str] = None,
    linkedin: Optional[str] = None,
):
    """Insert a new lead and return its row id.

    Args:
        name: Lead name; the ``leads.name`` column is declared NOT NULL.
        phone: Phone number, stored as NULL when omitted.
        address: Address, stored as NULL when omitted.
        website: Website URL, stored as NULL when omitted.
        email: Email address, stored as NULL when omitted.
        instagram: Instagram handle or URL, stored as NULL when omitted.
        linkedin: LinkedIn handle or URL, stored as NULL when omitted.

    Returns:
        The ``lastrowid`` reported by the cursor for the inserted row.

    Raises:
        sqlite3.Error: If the insert fails (for example ``name`` is None).
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO leads (name, phone, address, website, email, instagram, linkedin) VALUES (?, ?, ?, ?, ?, ?, ?)",
            (name, phone, address, website, email, instagram, linkedin),
        )
        conn.commit()
        return cursor.lastrowid
    finally:
        # Close even when the insert raises so the connection is not leaked.
        conn.close()
def get_all_leads(
    search: Optional[str] = None,
    status: Optional[str] = None,
    sort_by: str = "created_at",
    sort_order: str = "desc",
):
    """Return leads as dictionaries, optionally searched, filtered and sorted.

    Args:
        search: When non-empty, keep only leads whose name, phone or address
            contains this text (SQL ``LIKE '%search%'``).
        status: When non-empty, keep only leads with exactly this status.
        sort_by: Column to order by. Must be one of ``created_at``, ``name``,
            ``status`` or ``follow_up_date``; anything else falls back to
            ``created_at``.
        sort_order: ``"desc"`` (case-insensitive) for descending order; any
            other string sorts ascending. ``None`` is treated as the default
            ``"desc"`` instead of raising.

    Returns:
        A list with one ``dict`` per matching lead, keyed by column name.

    Raises:
        sqlite3.Error: If the query fails.
    """
    query = "SELECT * FROM leads WHERE 1=1"
    params = []

    if search:
        query += " AND (name LIKE ? OR phone LIKE ? OR address LIKE ?)"
        like = f"%{search}%"
        params.extend([like, like, like])

    if status:
        query += " AND status = ?"
        params.append(status)

    # ORDER BY cannot be parameterized, so the column is whitelisted and the
    # direction is reduced to one of two fixed keywords before interpolation.
    allowed_sort = {"created_at", "name", "status", "follow_up_date"}
    if sort_by not in allowed_sort:
        sort_by = "created_at"
    if sort_order is None:
        sort_order = "desc"
    order = "DESC" if sort_order.lower() == "desc" else "ASC"
    query += f" ORDER BY {sort_by} {order}"

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        rows = cursor.fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    return [dict(row) for row in rows]
def get_lead_by_id(lead_id: int):
    """Fetch a single lead by its primary key.

    Args:
        lead_id: Value of the ``id`` column to look up.

    Returns:
        The lead as a ``dict`` keyed by column name, or ``None`` when no row
        has that id.

    Raises:
        sqlite3.Error: If the query fails.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM leads WHERE id = ?", (lead_id,))
        row = cursor.fetchone()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    return dict(row) if row else None
def update_lead(lead_id: int, **kwargs):
    """Update the given columns of one lead.

    Keyword arguments map column names to new values. Arguments whose value
    is ``None`` are ignored, so this function cannot set a column to NULL.

    Column names are spliced into the SQL text (identifiers cannot be bound
    as parameters), so every name is first checked against the columns that
    actually exist on the ``leads`` table. This prevents a crafted keyword
    name from injecting SQL.

    Args:
        lead_id: Value of the ``id`` column of the lead to update.
        **kwargs: ``column=value`` pairs to write.

    Returns:
        ``True`` if a row was updated; ``False`` if there was nothing to
        update or no lead has that id.

    Raises:
        sqlite3.OperationalError: If a keyword is not a column of ``leads``
            (the same exception type SQLite raises for an unknown column).
        sqlite3.Error: If the update itself fails.
    """
    # Filter out None values
    updates = {column: value for column, value in kwargs.items() if value is not None}
    if not updates:
        return False

    conn = get_connection()
    try:
        cursor = conn.cursor()

        # Column 1 of each table_info row is the column name. SQLite matches
        # column names case-insensitively, so compare in lower case.
        cursor.execute("PRAGMA table_info(leads)")
        known_columns = {row[1].lower() for row in cursor.fetchall()}
        unknown = sorted(c for c in updates if c.lower() not in known_columns)
        if unknown:
            raise sqlite3.OperationalError(
                "no such column: " + ", ".join(unknown)
            )

        set_clause = ", ".join(f"{column} = ?" for column in updates)
        values = [*updates.values(), lead_id]
        cursor.execute(f"UPDATE leads SET {set_clause} WHERE id = ?", values)
        conn.commit()
        return cursor.rowcount > 0
    finally:
        # Close even when validation or the update raises.
        conn.close()
def delete_lead(lead_id: int):
    """Delete one lead by its primary key.

    Args:
        lead_id: Value of the ``id`` column of the lead to delete.

    Returns:
        ``True`` if a row was deleted, ``False`` if no lead has that id.

    Raises:
        sqlite3.Error: If the delete fails.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM leads WHERE id = ?", (lead_id,))
        conn.commit()
        return cursor.rowcount > 0
    finally:
        # Close even when the delete raises so the connection is not leaked.
        conn.close()
# ──────────────────────────── Stats ────────────────────────────
def get_dashboard_stats():
    """Return lead counts for the dashboard.

    Returns:
        A dict with the keys ``total``, ``new``, ``contacted``, ``replied``
        and ``closed``. ``total`` counts every lead; the other keys count
        leads whose ``status`` equals the key. Every value is a number,
        never ``None``, even when the table is empty.

    Raises:
        sqlite3.Error: If the query fails.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT
                COUNT(*) as total,
                COALESCE(SUM(CASE WHEN status = 'new' THEN 1 ELSE 0 END), 0) as new,
                COALESCE(SUM(CASE WHEN status = 'contacted' THEN 1 ELSE 0 END), 0) as contacted,
                COALESCE(SUM(CASE WHEN status = 'replied' THEN 1 ELSE 0 END), 0) as replied,
                COALESCE(SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END), 0) as closed
            FROM leads
        """)
        row = cursor.fetchone()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()

    # SUM over zero rows is NULL; COALESCE already covers that, and the
    # `or 0` below is a second guard so callers never see None.
    return {key: (value or 0) for key, value in dict(row).items()}
# ──────────────────────────── CSV Export ────────────────────────────
def export_leads_csv(status: Optional[str] = None):
    """Serialize leads to CSV text, optionally restricted to one status.

    The first line is a header row; each following line is one lead as
    returned by ``get_all_leads(status=status)``.
    """
    rows = get_all_leads(status=status)

    # Column order of the exported file.
    columns = [
        "id",
        "name",
        "phone",
        "address",
        "website",
        "email",
        "instagram",
        "linkedin",
        "status",
        "notes",
        "follow_up_date",
        "created_at",
    ]

    buffer = io.StringIO()
    csv_writer = csv.DictWriter(buffer, fieldnames=columns)
    csv_writer.writeheader()
    for lead in rows:
        csv_writer.writerow(lead)
    return buffer.getvalue()
# ──────────────────────────── Template CRUD ────────────────────────────
def get_all_templates():
    """Return every message template, newest first.

    Returns:
        A list with one ``dict`` per template, keyed by column name and
        ordered by ``created_at`` descending.

    Raises:
        sqlite3.Error: If the query fails.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM templates ORDER BY created_at DESC")
        rows = cursor.fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    return [dict(row) for row in rows]
def create_template(name: str, content: str):
    """Insert a new message template and return its row id.

    Args:
        name: Template name; the column is declared NOT NULL.
        content: Template body text; the column is declared NOT NULL.

    Returns:
        The ``lastrowid`` reported by the cursor for the inserted row.

    Raises:
        sqlite3.Error: If the insert fails (for example a value is None).
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO templates (name, content) VALUES (?, ?)",
            (name, content),
        )
        conn.commit()
        return cursor.lastrowid
    finally:
        # Close even when the insert raises so the connection is not leaked.
        conn.close()
def update_template(template_id: int, **kwargs):
    """Update the given columns of one message template.

    Keyword arguments map column names to new values. Arguments whose value
    is ``None`` are ignored, so this function cannot set a column to NULL.

    Column names are spliced into the SQL text (identifiers cannot be bound
    as parameters), so every name is first checked against the columns that
    actually exist on the ``templates`` table. This prevents a crafted
    keyword name from injecting SQL.

    Args:
        template_id: Value of the ``id`` column of the template to update.
        **kwargs: ``column=value`` pairs to write.

    Returns:
        ``True`` if a row was updated; ``False`` if there was nothing to
        update or no template has that id.

    Raises:
        sqlite3.OperationalError: If a keyword is not a column of
            ``templates`` (the same exception type SQLite raises for an
            unknown column).
        sqlite3.Error: If the update itself fails.
    """
    updates = {column: value for column, value in kwargs.items() if value is not None}
    if not updates:
        return False

    conn = get_connection()
    try:
        cursor = conn.cursor()

        # Column 1 of each table_info row is the column name. SQLite matches
        # column names case-insensitively, so compare in lower case.
        cursor.execute("PRAGMA table_info(templates)")
        known_columns = {row[1].lower() for row in cursor.fetchall()}
        unknown = sorted(c for c in updates if c.lower() not in known_columns)
        if unknown:
            raise sqlite3.OperationalError(
                "no such column: " + ", ".join(unknown)
            )

        set_clause = ", ".join(f"{column} = ?" for column in updates)
        values = [*updates.values(), template_id]
        cursor.execute(f"UPDATE templates SET {set_clause} WHERE id = ?", values)
        conn.commit()
        return cursor.rowcount > 0
    finally:
        # Close even when validation or the update raises.
        conn.close()
def delete_template(template_id: int):
    """Delete one message template by its primary key.

    Args:
        template_id: Value of the ``id`` column of the template to delete.

    Returns:
        ``True`` if a row was deleted, ``False`` if no template has that id.

    Raises:
        sqlite3.Error: If the delete fails.
    """
    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM templates WHERE id = ?", (template_id,))
        conn.commit()
        return cursor.rowcount > 0
    finally:
        # Close even when the delete raises so the connection is not leaked.
        conn.close()