Spaces:
Running
Running
| """ | |
| utils/database.py β SQLite persistence layer for leads. | |
| Provides CRUD operations and cross-run deduplication so leads | |
| survive server restarts and are never scraped twice. | |
| Uses WAL journal mode for concurrent read access from Flask threads. | |
| """ | |
| import os | |
| import re | |
| import shutil | |
| import sqlite3 | |
| import threading | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Tuple | |
| import config | |
| from models import Lead | |
| from utils.logger import get_logger | |
logger = get_logger(__name__)

# Thread-local storage for connections: sqlite3 handles should not be
# shared across threads, so each Flask worker thread gets its own.
_local = threading.local()

# ──────────────────────────────────────────────────────────────────────────────
# Schema
# ──────────────────────────────────────────────────────────────────────────────

# DDL executed on every startup (idempotent via IF NOT EXISTS).
# UNIQUE(fingerprint) is what enforces cross-run dedup; the indexes back
# the dashboard filters (score/status/service) and the name search.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS leads (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    business_name TEXT NOT NULL,
    service_category TEXT DEFAULT '',
    phone TEXT DEFAULT '',
    email TEXT DEFAULT '',
    address TEXT DEFAULT '',
    website TEXT DEFAULT '',
    contact_person TEXT DEFAULT '',
    source TEXT DEFAULT '',
    lead_source_scraper TEXT DEFAULT '',
    notes TEXT DEFAULT '',
    user_notes TEXT DEFAULT '',
    score TEXT DEFAULT 'LOW',
    score_points INTEGER DEFAULT 0,
    industry TEXT DEFAULT '',
    product_category TEXT DEFAULT '',
    status TEXT DEFAULT 'New',
    fingerprint TEXT DEFAULT '',
    -- Website analysis flags
    has_https INTEGER DEFAULT 0,
    is_mobile_friendly INTEGER DEFAULT 1,
    is_outdated INTEGER DEFAULT 0,
    has_chatbot INTEGER DEFAULT 0,
    has_online_booking INTEGER DEFAULT 0,
    social_facebook TEXT DEFAULT '',
    social_instagram TEXT DEFAULT '',
    has_contact_form INTEGER DEFAULT 0,
    ssl_expiry_days INTEGER,
    page_speed_score REAL,
    load_time_sec REAL,
    site_status TEXT DEFAULT '',
    gmb_exists INTEGER DEFAULT 0,
    review_count INTEGER DEFAULT 0,
    opportunity_pitch TEXT DEFAULT '',
    -- Timestamps
    scraped_at TEXT DEFAULT '',
    updated_at TEXT DEFAULT '',
    -- Dedup
    UNIQUE(fingerprint)
);
CREATE INDEX IF NOT EXISTS idx_leads_score ON leads(score);
CREATE INDEX IF NOT EXISTS idx_leads_status ON leads(status);
CREATE INDEX IF NOT EXISTS idx_leads_service ON leads(service_category);
CREATE INDEX IF NOT EXISTS idx_leads_fingerprint ON leads(fingerprint);
CREATE INDEX IF NOT EXISTS idx_leads_name ON leads(business_name COLLATE NOCASE);
"""
# ──────────────────────────────────────────────────────────────────────────────
# Connection management
# ──────────────────────────────────────────────────────────────────────────────
def _get_conn() -> sqlite3.Connection:
    """Return this thread's cached SQLite connection, creating it lazily.

    The handle lives on a threading.local so each Flask worker thread has
    its own connection, configured with WAL journaling (concurrent
    readers), a busy timeout, and foreign keys enabled.
    """
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = sqlite3.connect(
            config.DB_PATH,
            check_same_thread=False,
            timeout=10,
        )
        conn.row_factory = sqlite3.Row
        for pragma in ("journal_mode=WAL", "busy_timeout=5000", "foreign_keys=ON"):
            conn.execute(f"PRAGMA {pragma}")
        _local.conn = conn
    return conn
def init_db() -> None:
    """Initialise the schema (idempotent) and optionally back up the DB file.

    When DB_BACKUP_ON_STARTUP is set and a database file already exists,
    a best-effort ``.bak`` copy is written before the DDL runs; backup
    failure is logged but never blocks startup.
    """
    if config.DB_BACKUP_ON_STARTUP and os.path.isfile(config.DB_PATH):
        backup_path = config.DB_PATH + ".bak"
        try:
            shutil.copy2(config.DB_PATH, backup_path)
        except Exception as e:
            logger.warning(f"DB backup failed: {e}")
        else:
            logger.debug(f"Database backed up to {backup_path}")

    conn = _get_conn()
    conn.executescript(_SCHEMA)
    conn.commit()
    _maybe_migrate(conn)
    logger.info(f"Database initialised at {config.DB_PATH}")
def _maybe_migrate(conn: sqlite3.Connection) -> None:
    """Bring an older ``leads`` table up to the current schema.

    Compares the live column list (PRAGMA table_info) against the v2
    additions and issues one ALTER TABLE per missing column.  Safe to
    call on every startup.
    """
    present = {
        row["name"] for row in conn.execute("PRAGMA table_info(leads)").fetchall()
    }
    # Columns introduced in schema v2, with their column definitions.
    v2_columns = (
        ("contact_person", "TEXT DEFAULT ''"),
        ("lead_source_scraper", "TEXT DEFAULT ''"),
        ("user_notes", "TEXT DEFAULT ''"),
        ("industry", "TEXT DEFAULT ''"),
        ("product_category", "TEXT DEFAULT ''"),
        ("status", "TEXT DEFAULT 'New'"),
        ("social_facebook", "TEXT DEFAULT ''"),
        ("social_instagram", "TEXT DEFAULT ''"),
        ("has_contact_form", "INTEGER DEFAULT 0"),
        ("ssl_expiry_days", "INTEGER"),
        ("page_speed_score", "REAL"),
        ("load_time_sec", "REAL"),
        ("site_status", "TEXT DEFAULT ''"),
        ("gmb_exists", "INTEGER DEFAULT 0"),
        ("review_count", "INTEGER DEFAULT 0"),
        ("opportunity_pitch", "TEXT DEFAULT ''"),
    )
    for name, definition in v2_columns:
        if name in present:
            continue
        try:
            conn.execute(f"ALTER TABLE leads ADD COLUMN {name} {definition}")
        except sqlite3.OperationalError:
            pass  # Column already exists
        else:
            logger.info(f"Migration: added column '{name}'")
    conn.commit()
# ──────────────────────────────────────────────────────────────────────────────
# CRUD Operations
# ──────────────────────────────────────────────────────────────────────────────
def insert_lead(lead: Lead) -> Optional[int]:
    """
    Insert a Lead into the database.
    Returns the row ID on success, or None if the lead already exists
    (fingerprint conflict).
    """
    conn = _get_conn()
    try:
        # Stamp timestamps before persisting; scraped_at is only set
        # here if the scraper didn't already stamp it.
        lead.stamp_updated()
        if not lead.scraped_at:
            lead.stamp_scraped()
        # INSERT OR IGNORE leans on UNIQUE(fingerprint): a duplicate
        # fingerprint becomes a silent no-op (rowcount == 0) instead of
        # raising IntegrityError.  Column order here must stay in sync
        # with the VALUES tuple below (34 columns, 34 placeholders).
        cursor = conn.execute("""
            INSERT OR IGNORE INTO leads (
                business_name, service_category, phone, email, address, website,
                contact_person, source, lead_source_scraper, notes, user_notes,
                score, score_points, industry, product_category, status,
                fingerprint, has_https, is_mobile_friendly, is_outdated,
                has_chatbot, has_online_booking, social_facebook,
                social_instagram, has_contact_form, ssl_expiry_days,
                page_speed_score, load_time_sec, site_status, gmb_exists,
                review_count, opportunity_pitch, scraped_at, updated_at
            ) VALUES (
                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
            )
        """, (
            lead.business_name, lead.service_category, lead.phone, lead.email,
            lead.address, lead.website, lead.contact_person, lead.source,
            lead.lead_source_scraper, lead.notes, lead.user_notes,
            lead.score, lead.score_points, lead.industry, lead.product_category,
            lead.status, lead.fingerprint,
            # Booleans are stored as SQLite INTEGER 0/1.
            int(lead.has_https), int(lead.is_mobile_friendly),
            int(lead.is_outdated), int(lead.has_chatbot),
            int(lead.has_online_booking), lead.social_facebook,
            lead.social_instagram, int(lead.has_contact_form),
            lead.ssl_expiry_days, lead.page_speed_score, lead.load_time_sec,
            lead.site_status, int(lead.gmb_exists), lead.review_count,
            lead.opportunity_pitch, lead.scraped_at, lead.updated_at,
        ))
        conn.commit()
        if cursor.rowcount > 0:
            # Record the new primary key on the in-memory object too.
            lead.db_id = cursor.lastrowid
            return cursor.lastrowid
        return None  # Already existed
    except sqlite3.Error as e:
        logger.error(f"DB insert error: {e}")
        conn.rollback()
        return None
def insert_leads(leads: List[Lead]) -> Tuple[int, int]:
    """Insert multiple leads, skipping duplicates.

    Args:
        leads: Lead objects to persist.

    Returns:
        ``(inserted_count, skipped_count)`` — a lead is "skipped" when
        insert_lead returned None (fingerprint already present, or a DB
        error logged by insert_lead).
    """
    inserted = 0
    skipped = 0
    for lead in leads:
        # insert_lead returns Optional[int]; compare against None
        # explicitly so a falsy-but-valid row id (0) could never be
        # miscounted as a skip.
        if insert_lead(lead) is not None:
            inserted += 1
        else:
            skipped += 1
    logger.info(f"Bulk insert: {inserted} new, {skipped} skipped (duplicates)")
    return inserted, skipped
def lead_exists(fingerprint: str) -> bool:
    """Return True if a lead with this fingerprint is already stored."""
    cursor = _get_conn().execute(
        "SELECT 1 FROM leads WHERE fingerprint = ? LIMIT 1",
        (fingerprint,),
    )
    return cursor.fetchone() is not None
def get_all_leads(
    page: int = 1,
    per_page: int = 25,
    score_filter: str = "",
    service_filter: str = "",
    status_filter: str = "",
    search: str = "",
    sort_by: str = "score_points",
    sort_dir: str = "DESC",
) -> Tuple[List[Lead], int]:
    """
    Retrieve leads from the database with filtering, sorting, and pagination.

    Filters are ANDed together; empty or "all"/"ALL" values disable a
    filter.  ``search`` substring-matches name, email, phone and address.
    ``sort_by`` is checked against a whitelist and ``sort_dir`` is
    normalised, so neither can inject SQL.

    Returns (list_of_leads, total_count) where total_count reflects the
    filters but ignores pagination.
    """
    conn = _get_conn()

    # Guard against nonsensical pagination: page < 1 would yield a
    # negative OFFSET, and per_page <= 0 would become a non-positive
    # LIMIT, which SQLite treats as "no limit" — returning every row.
    page = max(1, page)
    per_page = max(1, per_page)

    where_clauses = []
    params = []
    if score_filter and score_filter.upper() != "ALL":
        where_clauses.append("score = ?")
        params.append(score_filter.upper())
    if service_filter and service_filter.lower() != "all":
        where_clauses.append("service_category = ?")
        params.append(service_filter.lower())
    if status_filter and status_filter != "all":
        where_clauses.append("status = ?")
        params.append(status_filter)
    if search:
        where_clauses.append(
            "(business_name LIKE ? OR email LIKE ? OR phone LIKE ? OR address LIKE ?)"
        )
        like = f"%{search}%"
        params.extend([like, like, like, like])

    where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

    # Validate sort column to prevent SQL injection (identifiers cannot
    # be bound as parameters, so a whitelist is the only safe option).
    allowed_sorts = {
        "score_points", "business_name", "service_category",
        "score", "status", "scraped_at", "updated_at",
    }
    if sort_by not in allowed_sorts:
        sort_by = "score_points"
    sort_dir = "DESC" if sort_dir.upper() == "DESC" else "ASC"

    # Total count (same filters, no pagination).
    count_sql = f"SELECT COUNT(*) FROM leads{where_sql}"
    total = conn.execute(count_sql, params).fetchone()[0]

    # Paginated query.
    offset = (page - 1) * per_page
    data_sql = (
        f"SELECT * FROM leads{where_sql} "
        f"ORDER BY {sort_by} {sort_dir} "
        f"LIMIT ? OFFSET ?"
    )
    rows = conn.execute(data_sql, params + [per_page, offset]).fetchall()
    leads = [_row_to_lead(row) for row in rows]
    return leads, total
def get_lead_by_id(db_id: int) -> Optional[Lead]:
    """Fetch one lead by primary key, or None when no such row exists."""
    row = _get_conn().execute(
        "SELECT * FROM leads WHERE id = ?", (db_id,)
    ).fetchone()
    if row is None:
        return None
    return _row_to_lead(row)
def update_lead_status(db_id: int, status: str) -> bool:
    """Update the status of a lead (New/Contacted/Converted/Rejected).

    Args:
        db_id: Primary key of the lead.
        status: One of the valid workflow states.

    Returns:
        True only if a row was actually updated; False for an invalid
        status or an id that matches no row.
    """
    valid = {"New", "Contacted", "Converted", "Rejected"}
    if status not in valid:
        logger.warning(f"Invalid status: {status}")
        return False
    conn = _get_conn()
    cursor = conn.execute(
        "UPDATE leads SET status = ?, updated_at = ? WHERE id = ?",
        (status, datetime.now().isoformat(), db_id),
    )
    conn.commit()
    # rowcount distinguishes "updated" from "id not found"; the previous
    # version reported success unconditionally.
    return cursor.rowcount > 0
def bulk_update_status(db_ids: List[int], status: str) -> int:
    """Update status for multiple leads in one statement.

    Args:
        db_ids: Primary keys to update; an empty list is a no-op.
        status: One of New/Contacted/Converted/Rejected.

    Returns:
        The number of rows actually updated — ids that match no row are
        not counted (the previous version over-reported with len(db_ids)).
    """
    valid = {"New", "Contacted", "Converted", "Rejected"}
    if status not in valid or not db_ids:
        return 0
    conn = _get_conn()
    now = datetime.now().isoformat()
    # Identifier lists can't be bound as one parameter; build one "?"
    # per id and bind them positionally.
    placeholders = ",".join("?" * len(db_ids))
    cursor = conn.execute(
        f"UPDATE leads SET status = ?, updated_at = ? WHERE id IN ({placeholders})",
        [status, now] + list(db_ids),
    )
    conn.commit()
    return cursor.rowcount
def update_lead_notes(db_id: int, notes: str) -> bool:
    """Store user-entered notes on a lead and refresh its updated_at."""
    conn = _get_conn()
    stamped = datetime.now().isoformat()
    conn.execute(
        "UPDATE leads SET user_notes = ?, updated_at = ? WHERE id = ?",
        (notes, stamped, db_id),
    )
    conn.commit()
    return True
def update_lead_full(lead: Lead) -> bool:
    """Update all fields of an existing lead.

    Requires ``lead.db_id`` to be set (returns False otherwise).  The SET
    list deliberately omits user_notes, status, fingerprint and
    scraped_at: notes/status have dedicated updaters above, and the
    fingerprint/scrape timestamp identify the original scrape.  Column
    order must stay in sync with the positional parameter tuple below.
    """
    if lead.db_id is None:
        return False
    conn = _get_conn()
    # Refresh updated_at on the in-memory object before writing it out.
    lead.stamp_updated()
    conn.execute("""
        UPDATE leads SET
            business_name=?, service_category=?, phone=?, email=?, address=?,
            website=?, contact_person=?, source=?, lead_source_scraper=?,
            notes=?, score=?, score_points=?, industry=?, product_category=?,
            has_https=?, is_mobile_friendly=?, is_outdated=?, has_chatbot=?,
            has_online_booking=?, social_facebook=?, social_instagram=?,
            has_contact_form=?, ssl_expiry_days=?, page_speed_score=?,
            load_time_sec=?, site_status=?, gmb_exists=?, review_count=?,
            opportunity_pitch=?, updated_at=?
        WHERE id=?
    """, (
        lead.business_name, lead.service_category, lead.phone, lead.email,
        lead.address, lead.website, lead.contact_person, lead.source,
        lead.lead_source_scraper, lead.notes, lead.score, lead.score_points,
        lead.industry, lead.product_category,
        # Booleans are stored as SQLite INTEGER 0/1.
        int(lead.has_https), int(lead.is_mobile_friendly),
        int(lead.is_outdated), int(lead.has_chatbot),
        int(lead.has_online_booking), lead.social_facebook,
        lead.social_instagram, int(lead.has_contact_form),
        lead.ssl_expiry_days, lead.page_speed_score, lead.load_time_sec,
        lead.site_status, int(lead.gmb_exists), lead.review_count,
        lead.opportunity_pitch, lead.updated_at, lead.db_id,
    ))
    conn.commit()
    return True
def get_stats() -> Dict:
    """Aggregate statistics for the dashboard.

    Returns totals, per-score/per-service/per-status breakdowns, the
    average score, and website-analysis percentages (rounded to one
    decimal; 0 when the table is empty).
    """
    conn = _get_conn()

    total = conn.execute("SELECT COUNT(*) FROM leads").fetchone()[0]

    scores = {
        row["score"]: row["c"]
        for row in conn.execute("SELECT score, COUNT(*) as c FROM leads GROUP BY score")
    }
    services = {
        row["service_category"]: row["c"]
        for row in conn.execute("SELECT service_category, COUNT(*) as c FROM leads GROUP BY service_category")
    }
    statuses = {
        row["status"]: row["c"]
        for row in conn.execute("SELECT status, COUNT(*) as c FROM leads GROUP BY status")
    }

    avg_score = conn.execute("SELECT AVG(score_points) FROM leads").fetchone()[0] or 0

    # MAX(COUNT(*), 1) avoids division by zero on an empty table; the
    # SUMs are NULL in that case and the `or 0` fallbacks below apply.
    analysis = conn.execute("""
        SELECT
            ROUND(100.0 * SUM(CASE WHEN website != '' THEN 1 ELSE 0 END) / MAX(COUNT(*), 1), 1) as has_website_pct,
            ROUND(100.0 * SUM(CASE WHEN website != '' AND has_https = 0 THEN 1 ELSE 0 END) / MAX(COUNT(*), 1), 1) as no_https_pct,
            ROUND(100.0 * SUM(CASE WHEN is_outdated = 1 THEN 1 ELSE 0 END) / MAX(COUNT(*), 1), 1) as outdated_pct,
            ROUND(100.0 * SUM(CASE WHEN has_chatbot = 0 AND has_online_booking = 0 THEN 1 ELSE 0 END) / MAX(COUNT(*), 1), 1) as no_automation_pct
        FROM leads
    """).fetchone()

    return {
        "total": total,
        "by_score": {
            "HIGH": scores.get("HIGH", 0),
            "MEDIUM": scores.get("MEDIUM", 0),
            "LOW": scores.get("LOW", 0),
        },
        "by_service": {
            "web": services.get("web", 0),
            "app": services.get("app", 0),
            "ai": services.get("ai", 0),
        },
        "by_status": {
            "New": statuses.get("New", 0),
            "Contacted": statuses.get("Contacted", 0),
            "Converted": statuses.get("Converted", 0),
            "Rejected": statuses.get("Rejected", 0),
        },
        "avg_score": round(avg_score, 1),
        "has_website_pct": analysis["has_website_pct"] or 0,
        "no_https_pct": analysis["no_https_pct"] or 0,
        "outdated_pct": analysis["outdated_pct"] or 0,
        "no_automation_pct": analysis["no_automation_pct"] or 0,
    }
def get_top_leads(limit: int = 5) -> List[Lead]:
    """Return the N highest-scoring leads, best first."""
    cursor = _get_conn().execute(
        "SELECT * FROM leads ORDER BY score_points DESC LIMIT ?", (limit,)
    )
    return [_row_to_lead(r) for r in cursor.fetchall()]
def delete_all_leads() -> int:
    """Delete ALL leads from the database. Returns the number of rows deleted."""
    conn = _get_conn()
    deleted = conn.execute("DELETE FROM leads").rowcount
    conn.commit()
    logger.info(f"Deleted all leads: {deleted} rows removed")
    return deleted
def search_by_name(name: str, threshold: float = 0.85) -> List[Lead]:
    """Search for leads whose business name contains *name* (for cross-run dedup).

    Args:
        name: Substring to look for.  LIKE metacharacters (``%``, ``_``,
            ``\\``) in it are escaped so they match literally instead of
            silently widening the result set.
        threshold: Unused here; kept for backward compatibility with
            callers that pass a similarity threshold.

    Returns:
        All leads with a matching business_name.
    """
    conn = _get_conn()
    # Escape the backslash first, then the LIKE wildcards, and tell
    # SQLite '\' is the escape character via the ESCAPE clause.
    escaped = name.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    rows = conn.execute(
        "SELECT * FROM leads WHERE business_name LIKE ? ESCAPE '\\'",
        (f"%{escaped}%",),
    ).fetchall()
    return [_row_to_lead(row) for row in rows]
# ──────────────────────────────────────────────────────────────────────────────
# Row → Lead conversion
# ──────────────────────────────────────────────────────────────────────────────
def _row_to_lead(row: sqlite3.Row) -> Lead:
    """Hydrate a Lead from a SQLite row, coercing NULL/empty values to defaults."""

    def text(col: str, default: str = "") -> str:
        # NULL (and empty string) collapse to the field's default.
        return row[col] or default

    def flag(col: str) -> bool:
        # INTEGER 0/1 (or NULL) stored flags back to bool.
        return bool(row[col])

    return Lead(
        business_name=text("business_name"),
        service_category=text("service_category"),
        phone=text("phone"),
        email=text("email"),
        address=text("address"),
        website=text("website"),
        contact_person=text("contact_person"),
        source=text("source"),
        lead_source_scraper=text("lead_source_scraper"),
        notes=text("notes"),
        user_notes=text("user_notes"),
        score=text("score", "LOW"),
        score_points=row["score_points"] or 0,
        industry=text("industry"),
        product_category=text("product_category"),
        status=text("status", "New"),
        db_id=row["id"],
        scraped_at=text("scraped_at"),
        updated_at=text("updated_at"),
        has_https=flag("has_https"),
        is_mobile_friendly=flag("is_mobile_friendly"),
        is_outdated=flag("is_outdated"),
        has_chatbot=flag("has_chatbot"),
        has_online_booking=flag("has_online_booking"),
        social_facebook=text("social_facebook"),
        social_instagram=text("social_instagram"),
        has_contact_form=flag("has_contact_form"),
        ssl_expiry_days=row["ssl_expiry_days"],    # nullable: keep None as-is
        page_speed_score=row["page_speed_score"],  # nullable: keep None as-is
        load_time_sec=row["load_time_sec"],        # nullable: keep None as-is
        site_status=text("site_status"),
        gmb_exists=flag("gmb_exists"),
        review_count=row["review_count"] or 0,
        opportunity_pitch=text("opportunity_pitch"),
        fingerprint=text("fingerprint"),
    )