NexusCRM / lead_gen_pro /database.py
kidpro2002's picture
🚀 Fusion: NexusCRM + Lead Gen Pro (100% Cloud Docker)
615e194
"""
Lead Generation Database Module
================================
Manages SQLite database for storing and tracking leads.
"""
import sqlite3
import os
from datetime import datetime
from typing import Optional, List, Dict, Any
DB_PATH = os.path.join(os.path.dirname(__file__), "leads.db")
def get_connection() -> sqlite3.Connection:
"""Get a database connection with row factory enabled."""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db() -> None:
"""Initialize the database with required tables."""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS leads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
phone TEXT,
email TEXT,
address TEXT,
website TEXT,
rating REAL,
reviews_count INTEGER,
source TEXT DEFAULT 'google_maps',
niche TEXT NOT NULL,
country TEXT NOT NULL,
city TEXT,
status TEXT DEFAULT 'new',
whatsapp_sent BOOLEAN DEFAULT 0,
instagram_sent BOOLEAN DEFAULT 0,
replied BOOLEAN DEFAULT 0,
notes TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS outreach_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
lead_id INTEGER NOT NULL,
channel TEXT NOT NULL,
message_template TEXT,
sent_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
response TEXT,
FOREIGN KEY (lead_id) REFERENCES leads(id)
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_leads_status ON leads(status)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_leads_niche ON leads(niche)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_leads_country ON leads(country)
""")
conn.commit()
conn.close()
print(f"[OK] Base de datos inicializada en: {DB_PATH}")
def add_lead(lead_data: Dict[str, Any]) -> int:
"""
Add a new lead to the database.
Args:
lead_data: Dictionary with lead information.
Returns:
The ID of the inserted lead.
"""
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO leads (name, phone, email, address, website, rating,
reviews_count, source, niche, country, city, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
lead_data.get("name"),
lead_data.get("phone"),
lead_data.get("email"),
lead_data.get("address"),
lead_data.get("website"),
lead_data.get("rating"),
lead_data.get("reviews_count"),
lead_data.get("source", "google_maps"),
lead_data.get("niche"),
lead_data.get("country"),
lead_data.get("city"),
lead_data.get("notes")
))
lead_id = cursor.lastrowid
conn.commit()
conn.close()
return lead_id
def get_leads(
niche: Optional[str] = None,
country: Optional[str] = None,
status: Optional[str] = None,
limit: int = 100
) -> List[Dict[str, Any]]:
"""
Retrieve leads with optional filters.
Args:
niche: Filter by niche (real_estate, insurance)
country: Filter by country (usa, venezuela)
status: Filter by status (new, contacted, replied, converted)
limit: Maximum number of results
Returns:
List of lead dictionaries.
"""
conn = get_connection()
cursor = conn.cursor()
query = "SELECT * FROM leads WHERE 1=1"
params = []
if niche:
query += " AND niche = ?"
params.append(niche)
if country:
query += " AND country = ?"
params.append(country)
if status:
query += " AND status = ?"
params.append(status)
query += " ORDER BY created_at DESC LIMIT ?"
params.append(limit)
cursor.execute(query, params)
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
def update_lead_status(lead_id: int, status: str, notes: Optional[str] = None) -> None:
"""Update the status of a lead."""
conn = get_connection()
cursor = conn.cursor()
if notes:
cursor.execute("""
UPDATE leads
SET status = ?, notes = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (status, notes, lead_id))
else:
cursor.execute("""
UPDATE leads
SET status = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (status, lead_id))
conn.commit()
conn.close()
def mark_outreach_sent(lead_id: int, channel: str, template: str) -> None:
"""Mark that outreach was sent to a lead and log it."""
conn = get_connection()
cursor = conn.cursor()
# Update lead flags
if channel == "whatsapp":
cursor.execute("UPDATE leads SET whatsapp_sent = 1, status = 'contacted' WHERE id = ?", (lead_id,))
elif channel == "instagram":
cursor.execute("UPDATE leads SET instagram_sent = 1, status = 'contacted' WHERE id = ?", (lead_id,))
# Log the outreach
cursor.execute("""
INSERT INTO outreach_log (lead_id, channel, message_template)
VALUES (?, ?, ?)
""", (lead_id, channel, template))
conn.commit()
conn.close()
def get_pending_outreach(channel: str, niche: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
"""Get leads that haven't been contacted yet via a specific channel."""
conn = get_connection()
cursor = conn.cursor()
query = "SELECT * FROM leads WHERE status = 'new'"
params = []
if channel == "whatsapp":
query += " AND whatsapp_sent = 0 AND phone IS NOT NULL"
elif channel == "instagram":
query += " AND instagram_sent = 0"
if niche:
query += " AND niche = ?"
params.append(niche)
query += " LIMIT ?"
params.append(limit)
cursor.execute(query, params)
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
def get_stats() -> Dict[str, Any]:
"""Get statistics about the leads database."""
conn = get_connection()
cursor = conn.cursor()
stats = {}
cursor.execute("SELECT COUNT(*) FROM leads")
stats["total_leads"] = cursor.fetchone()[0]
cursor.execute("SELECT niche, COUNT(*) FROM leads GROUP BY niche")
stats["by_niche"] = dict(cursor.fetchall())
cursor.execute("SELECT country, COUNT(*) FROM leads GROUP BY country")
stats["by_country"] = dict(cursor.fetchall())
cursor.execute("SELECT status, COUNT(*) FROM leads GROUP BY status")
stats["by_status"] = dict(cursor.fetchall())
cursor.execute("SELECT COUNT(*) FROM leads WHERE whatsapp_sent = 1")
stats["whatsapp_sent"] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM leads WHERE instagram_sent = 1")
stats["instagram_sent"] = cursor.fetchone()[0]
conn.close()
return stats
if __name__ == "__main__":
# Initialize the database when run directly
init_db()
# Show current stats
stats = get_stats()
print(f"\n[ESTADISTICAS] Estadisticas de la Base de Datos:")
print(f" Total leads: {stats['total_leads']}")
print(f" Por nicho: {stats['by_niche']}")
print(f" Por país: {stats['by_country']}")