""" Lead Generation Database Module ================================ Manages SQLite database for storing and tracking leads. """ import sqlite3 import os from datetime import datetime from typing import Optional, List, Dict, Any DB_PATH = os.path.join(os.path.dirname(__file__), "leads.db") def get_connection() -> sqlite3.Connection: """Get a database connection with row factory enabled.""" conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db() -> None: """Initialize the database with required tables.""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS leads ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, phone TEXT, email TEXT, address TEXT, website TEXT, rating REAL, reviews_count INTEGER, source TEXT DEFAULT 'google_maps', niche TEXT NOT NULL, country TEXT NOT NULL, city TEXT, status TEXT DEFAULT 'new', whatsapp_sent BOOLEAN DEFAULT 0, instagram_sent BOOLEAN DEFAULT 0, replied BOOLEAN DEFAULT 0, notes TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS outreach_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, lead_id INTEGER NOT NULL, channel TEXT NOT NULL, message_template TEXT, sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, response TEXT, FOREIGN KEY (lead_id) REFERENCES leads(id) ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_leads_status ON leads(status) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_leads_niche ON leads(niche) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_leads_country ON leads(country) """) conn.commit() conn.close() print(f"[OK] Base de datos inicializada en: {DB_PATH}") def add_lead(lead_data: Dict[str, Any]) -> int: """ Add a new lead to the database. Args: lead_data: Dictionary with lead information. Returns: The ID of the inserted lead. """ conn = get_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO leads (name, phone, email, address, website, rating, reviews_count, source, niche, country, city, notes) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( lead_data.get("name"), lead_data.get("phone"), lead_data.get("email"), lead_data.get("address"), lead_data.get("website"), lead_data.get("rating"), lead_data.get("reviews_count"), lead_data.get("source", "google_maps"), lead_data.get("niche"), lead_data.get("country"), lead_data.get("city"), lead_data.get("notes") )) lead_id = cursor.lastrowid conn.commit() conn.close() return lead_id def get_leads( niche: Optional[str] = None, country: Optional[str] = None, status: Optional[str] = None, limit: int = 100 ) -> List[Dict[str, Any]]: """ Retrieve leads with optional filters. Args: niche: Filter by niche (real_estate, insurance) country: Filter by country (usa, venezuela) status: Filter by status (new, contacted, replied, converted) limit: Maximum number of results Returns: List of lead dictionaries. """ conn = get_connection() cursor = conn.cursor() query = "SELECT * FROM leads WHERE 1=1" params = [] if niche: query += " AND niche = ?" params.append(niche) if country: query += " AND country = ?" params.append(country) if status: query += " AND status = ?" params.append(status) query += " ORDER BY created_at DESC LIMIT ?" params.append(limit) cursor.execute(query, params) rows = cursor.fetchall() conn.close() return [dict(row) for row in rows] def update_lead_status(lead_id: int, status: str, notes: Optional[str] = None) -> None: """Update the status of a lead.""" conn = get_connection() cursor = conn.cursor() if notes: cursor.execute(""" UPDATE leads SET status = ?, notes = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (status, notes, lead_id)) else: cursor.execute(""" UPDATE leads SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (status, lead_id)) conn.commit() conn.close() def mark_outreach_sent(lead_id: int, channel: str, template: str) -> None: """Mark that outreach was sent to a lead and log it.""" conn = get_connection() cursor = conn.cursor() # Update lead flags if channel == "whatsapp": cursor.execute("UPDATE leads SET whatsapp_sent = 1, status = 'contacted' WHERE id = ?", (lead_id,)) elif channel == "instagram": cursor.execute("UPDATE leads SET instagram_sent = 1, status = 'contacted' WHERE id = ?", (lead_id,)) # Log the outreach cursor.execute(""" INSERT INTO outreach_log (lead_id, channel, message_template) VALUES (?, ?, ?) """, (lead_id, channel, template)) conn.commit() conn.close() def get_pending_outreach(channel: str, niche: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]: """Get leads that haven't been contacted yet via a specific channel.""" conn = get_connection() cursor = conn.cursor() query = "SELECT * FROM leads WHERE status = 'new'" params = [] if channel == "whatsapp": query += " AND whatsapp_sent = 0 AND phone IS NOT NULL" elif channel == "instagram": query += " AND instagram_sent = 0" if niche: query += " AND niche = ?" params.append(niche) query += " LIMIT ?" params.append(limit) cursor.execute(query, params) rows = cursor.fetchall() conn.close() return [dict(row) for row in rows] def get_stats() -> Dict[str, Any]: """Get statistics about the leads database.""" conn = get_connection() cursor = conn.cursor() stats = {} cursor.execute("SELECT COUNT(*) FROM leads") stats["total_leads"] = cursor.fetchone()[0] cursor.execute("SELECT niche, COUNT(*) FROM leads GROUP BY niche") stats["by_niche"] = dict(cursor.fetchall()) cursor.execute("SELECT country, COUNT(*) FROM leads GROUP BY country") stats["by_country"] = dict(cursor.fetchall()) cursor.execute("SELECT status, COUNT(*) FROM leads GROUP BY status") stats["by_status"] = dict(cursor.fetchall()) cursor.execute("SELECT COUNT(*) FROM leads WHERE whatsapp_sent = 1") stats["whatsapp_sent"] = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM leads WHERE instagram_sent = 1") stats["instagram_sent"] = cursor.fetchone()[0] conn.close() return stats if __name__ == "__main__": # Initialize the database when run directly init_db() # Show current stats stats = get_stats() print(f"\n[ESTADISTICAS] Estadisticas de la Base de Datos:") print(f" Total leads: {stats['total_leads']}") print(f" Por nicho: {stats['by_niche']}") print(f" Por paĆ­s: {stats['by_country']}")