Spaces:
Sleeping
Sleeping
| """ | |
| Lead Generation Database Module | |
| ================================ | |
| Manages SQLite database for storing and tracking leads. | |
| """ | |
| import sqlite3 | |
| import os | |
| from datetime import datetime | |
| from typing import Optional, List, Dict, Any | |
| DB_PATH = os.path.join(os.path.dirname(__file__), "leads.db") | |
| def get_connection() -> sqlite3.Connection: | |
| """Get a database connection with row factory enabled.""" | |
| conn = sqlite3.connect(DB_PATH) | |
| conn.row_factory = sqlite3.Row | |
| return conn | |
| def init_db() -> None: | |
| """Initialize the database with required tables.""" | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS leads ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| name TEXT NOT NULL, | |
| phone TEXT, | |
| email TEXT, | |
| address TEXT, | |
| website TEXT, | |
| rating REAL, | |
| reviews_count INTEGER, | |
| source TEXT DEFAULT 'google_maps', | |
| niche TEXT NOT NULL, | |
| country TEXT NOT NULL, | |
| city TEXT, | |
| status TEXT DEFAULT 'new', | |
| whatsapp_sent BOOLEAN DEFAULT 0, | |
| instagram_sent BOOLEAN DEFAULT 0, | |
| replied BOOLEAN DEFAULT 0, | |
| notes TEXT, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """) | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS outreach_log ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| lead_id INTEGER NOT NULL, | |
| channel TEXT NOT NULL, | |
| message_template TEXT, | |
| sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| response TEXT, | |
| FOREIGN KEY (lead_id) REFERENCES leads(id) | |
| ) | |
| """) | |
| cursor.execute(""" | |
| CREATE INDEX IF NOT EXISTS idx_leads_status ON leads(status) | |
| """) | |
| cursor.execute(""" | |
| CREATE INDEX IF NOT EXISTS idx_leads_niche ON leads(niche) | |
| """) | |
| cursor.execute(""" | |
| CREATE INDEX IF NOT EXISTS idx_leads_country ON leads(country) | |
| """) | |
| conn.commit() | |
| conn.close() | |
| print(f"[OK] Base de datos inicializada en: {DB_PATH}") | |
| def add_lead(lead_data: Dict[str, Any]) -> int: | |
| """ | |
| Add a new lead to the database. | |
| Args: | |
| lead_data: Dictionary with lead information. | |
| Returns: | |
| The ID of the inserted lead. | |
| """ | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| INSERT INTO leads (name, phone, email, address, website, rating, | |
| reviews_count, source, niche, country, city, notes) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| """, ( | |
| lead_data.get("name"), | |
| lead_data.get("phone"), | |
| lead_data.get("email"), | |
| lead_data.get("address"), | |
| lead_data.get("website"), | |
| lead_data.get("rating"), | |
| lead_data.get("reviews_count"), | |
| lead_data.get("source", "google_maps"), | |
| lead_data.get("niche"), | |
| lead_data.get("country"), | |
| lead_data.get("city"), | |
| lead_data.get("notes") | |
| )) | |
| lead_id = cursor.lastrowid | |
| conn.commit() | |
| conn.close() | |
| return lead_id | |
| def get_leads( | |
| niche: Optional[str] = None, | |
| country: Optional[str] = None, | |
| status: Optional[str] = None, | |
| limit: int = 100 | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve leads with optional filters. | |
| Args: | |
| niche: Filter by niche (real_estate, insurance) | |
| country: Filter by country (usa, venezuela) | |
| status: Filter by status (new, contacted, replied, converted) | |
| limit: Maximum number of results | |
| Returns: | |
| List of lead dictionaries. | |
| """ | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| query = "SELECT * FROM leads WHERE 1=1" | |
| params = [] | |
| if niche: | |
| query += " AND niche = ?" | |
| params.append(niche) | |
| if country: | |
| query += " AND country = ?" | |
| params.append(country) | |
| if status: | |
| query += " AND status = ?" | |
| params.append(status) | |
| query += " ORDER BY created_at DESC LIMIT ?" | |
| params.append(limit) | |
| cursor.execute(query, params) | |
| rows = cursor.fetchall() | |
| conn.close() | |
| return [dict(row) for row in rows] | |
| def update_lead_status(lead_id: int, status: str, notes: Optional[str] = None) -> None: | |
| """Update the status of a lead.""" | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| if notes: | |
| cursor.execute(""" | |
| UPDATE leads | |
| SET status = ?, notes = ?, updated_at = CURRENT_TIMESTAMP | |
| WHERE id = ? | |
| """, (status, notes, lead_id)) | |
| else: | |
| cursor.execute(""" | |
| UPDATE leads | |
| SET status = ?, updated_at = CURRENT_TIMESTAMP | |
| WHERE id = ? | |
| """, (status, lead_id)) | |
| conn.commit() | |
| conn.close() | |
| def mark_outreach_sent(lead_id: int, channel: str, template: str) -> None: | |
| """Mark that outreach was sent to a lead and log it.""" | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| # Update lead flags | |
| if channel == "whatsapp": | |
| cursor.execute("UPDATE leads SET whatsapp_sent = 1, status = 'contacted' WHERE id = ?", (lead_id,)) | |
| elif channel == "instagram": | |
| cursor.execute("UPDATE leads SET instagram_sent = 1, status = 'contacted' WHERE id = ?", (lead_id,)) | |
| # Log the outreach | |
| cursor.execute(""" | |
| INSERT INTO outreach_log (lead_id, channel, message_template) | |
| VALUES (?, ?, ?) | |
| """, (lead_id, channel, template)) | |
| conn.commit() | |
| conn.close() | |
| def get_pending_outreach(channel: str, niche: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]: | |
| """Get leads that haven't been contacted yet via a specific channel.""" | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| query = "SELECT * FROM leads WHERE status = 'new'" | |
| params = [] | |
| if channel == "whatsapp": | |
| query += " AND whatsapp_sent = 0 AND phone IS NOT NULL" | |
| elif channel == "instagram": | |
| query += " AND instagram_sent = 0" | |
| if niche: | |
| query += " AND niche = ?" | |
| params.append(niche) | |
| query += " LIMIT ?" | |
| params.append(limit) | |
| cursor.execute(query, params) | |
| rows = cursor.fetchall() | |
| conn.close() | |
| return [dict(row) for row in rows] | |
| def get_stats() -> Dict[str, Any]: | |
| """Get statistics about the leads database.""" | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| stats = {} | |
| cursor.execute("SELECT COUNT(*) FROM leads") | |
| stats["total_leads"] = cursor.fetchone()[0] | |
| cursor.execute("SELECT niche, COUNT(*) FROM leads GROUP BY niche") | |
| stats["by_niche"] = dict(cursor.fetchall()) | |
| cursor.execute("SELECT country, COUNT(*) FROM leads GROUP BY country") | |
| stats["by_country"] = dict(cursor.fetchall()) | |
| cursor.execute("SELECT status, COUNT(*) FROM leads GROUP BY status") | |
| stats["by_status"] = dict(cursor.fetchall()) | |
| cursor.execute("SELECT COUNT(*) FROM leads WHERE whatsapp_sent = 1") | |
| stats["whatsapp_sent"] = cursor.fetchone()[0] | |
| cursor.execute("SELECT COUNT(*) FROM leads WHERE instagram_sent = 1") | |
| stats["instagram_sent"] = cursor.fetchone()[0] | |
| conn.close() | |
| return stats | |
| if __name__ == "__main__": | |
| # Initialize the database when run directly | |
| init_db() | |
| # Show current stats | |
| stats = get_stats() | |
| print(f"\n[ESTADISTICAS] Estadisticas de la Base de Datos:") | |
| print(f" Total leads: {stats['total_leads']}") | |
| print(f" Por nicho: {stats['by_niche']}") | |
| print(f" Por país: {stats['by_country']}") | |