# NOTE: Hugging Face Spaces page residue captured during extraction (status
# badges: "Spaces: Sleeping / Sleeping") — not part of the application code.
| """ | |
| FastAPI application β Lead Generation & Outreach System. | |
| Provides REST endpoints for scraping Google Maps leads, | |
| managing leads/templates, and exporting data. | |
| """ | |
| import logging | |
| import os | |
| import threading | |
| from pathlib import Path | |
| from typing import Optional | |
| from fastapi import FastAPI, HTTPException, Query, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import Response, FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| import re | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import database as db | |
| from models import ( | |
| ScrapeRequest, | |
| ScrapeStatusResponse, | |
| LeadUpdate, | |
| TemplateCreate, | |
| TemplateUpdate, | |
| DashboardStats, | |
| ) | |
| from scraper import scrape_google_maps | |
# ──────────────────────── Logging ────────────────────────
# Root-logger configuration: timestamped, level-tagged lines on stderr.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
# Module-level logger, per stdlib convention.
logger = logging.getLogger(__name__)
# ──────────────────────── App Setup ────────────────────────
app = FastAPI(
    title="Lead Generation System",
    description="Scrape Google Maps leads, manage them, and send WhatsApp outreach.",
    version="1.0.0",
)
# CORS — allow the frontend to call the API from any origin.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# disallowed by the CORS spec (browsers reject credentialed wildcard
# responses) — consider pinning explicit origins for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Initialize database on startup
def startup():
    """Run one-time database initialization (table creation etc.).

    NOTE(review): no route/event decorator is visible on this function in
    the extracted source — presumably it was registered as a FastAPI
    startup hook (decorators appear to have been lost in extraction);
    confirm it is actually wired up.
    """
    db.init_db()
    logger.info("Database initialized β")
# ──────────────────────── Scraper State ────────────────────────
# Process-wide progress record for the single background scrape job.
# Writers hold `lock` when updating several fields together so readers
# never observe a torn status/message pair.
scrape_state = {
    "status": "idle",  # idle | running | completed | failed
    "total_found": 0,  # leads found so far (live) / inserted (final)
    "message": "",     # human-readable progress / error text
    "lock": threading.Lock(),
}
def fetch_socials_background(lead_id: int, website: str):
    """Best-effort enrichment: scrape a lead's website for contact details.

    Fetches *website* and extracts an email address (``mailto:`` links
    first, then a regex sweep of the raw page text), plus the first
    Instagram and LinkedIn links found, then writes whatever was found
    back onto the lead row via ``db.update_lead``.

    Intended to run in a daemon thread: every failure is logged as a
    warning and swallowed so a broken site can never affect the scrape.

    Args:
        lead_id: Primary key of the lead to update.
        website: URL (possibly scheme-less) of the lead's website; a
            falsy value is a no-op.
    """
    if not website:
        return
    try:
        # Google Maps often returns bare domains — default to https.
        if not website.startswith('http'):
            website = 'https://' + website
        headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'}
        r = requests.get(website, headers=headers, timeout=10)
        # Don't mine contact info out of 4xx/5xx error pages — the raised
        # HTTPError is caught below and logged like any other failure.
        r.raise_for_status()
        soup = BeautifulSoup(r.text, 'html.parser')
        email = instagram = linkedin = None
        for a in soup.find_all('a', href=True):
            href = a['href']
            # Case-insensitive scheme check; strip only the prefix (not
            # every occurrence) and drop any ?subject=... query part.
            if not email and href.lower().startswith('mailto:'):
                email = href[len('mailto:'):].split('?')[0].strip()
            if not instagram and 'instagram.com' in href:
                instagram = href.strip()
            if not linkedin and 'linkedin.com' in href:
                linkedin = href.strip()
        if not email:
            # Fall back to scanning the raw page text for an address.
            matches = re.findall(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', r.text)
            # Skip common false positives such as "logo@2x.png" asset names.
            asset_suffixes = ('.png', '.jpg', '.jpeg', '.gif', '.webp')
            email = next((m for m in matches if not m.endswith(asset_suffixes)), None)
        updates = {}
        if email:
            updates['email'] = email
        if instagram:
            updates['instagram'] = instagram
        if linkedin:
            updates['linkedin'] = linkedin
        if updates:
            db.update_lead(lead_id, **updates)
            logger.info(f"Socials found for lead {lead_id}: {updates}")
    except Exception as e:
        # Best-effort by design: log and move on.
        logger.warning(f"website scrape failed for {website}: {e}")
def _run_scrape_task(query: str, limit: int, lat: Optional[float] = None, lng: Optional[float] = None, zoom: int = 14):
    """Background scraping task.

    Runs ``scrape_google_maps``, inserts every named result into the
    database, and spawns a per-lead daemon thread to enrich leads that
    have a website. Progress and the final outcome are published through
    the shared ``scrape_state`` dict; multi-field updates are performed
    under its lock so readers never see a torn state.

    Args:
        query: Search phrase passed to the Google Maps scraper.
        limit: Maximum number of results to collect.
        lat: Optional latitude to center the search on.
        lng: Optional longitude to center the search on.
        zoom: Map zoom level for the search (default 14).
    """
    with scrape_state["lock"]:
        scrape_state["status"] = "running"
        scrape_state["total_found"] = 0
        scrape_state["message"] = f"Scraping: {query}"

    def on_progress(count, msg):
        # Hold the lock so count and message are always updated as a pair.
        with scrape_state["lock"]:
            scrape_state["total_found"] = count
            scrape_state["message"] = msg

    try:
        leads = scrape_google_maps(query, limit, lat, lng, zoom, progress_callback=on_progress)
        # Insert leads into the database; skip unnamed (junk) results.
        added = 0
        for lead in leads:
            if lead.get("name"):
                lead_id = db.insert_lead(
                    name=lead["name"],
                    phone=lead.get("phone"),
                    address=lead.get("address"),
                    website=lead.get("website"),
                )
                added += 1
                # Enrich asynchronously so slow sites don't stall the scrape.
                if lead.get("website"):
                    threading.Thread(
                        target=fetch_socials_background,
                        args=(lead_id, lead.get("website")),
                        daemon=True,
                    ).start()
        with scrape_state["lock"]:
            scrape_state["status"] = "completed"
            scrape_state["total_found"] = added
            scrape_state["message"] = f"Done! Added {added} leads from '{query}'"
        logger.info(f"Scrape complete: {added} leads added for '{query}'")
    except Exception as e:
        logger.error(f"Scrape failed: {e}")
        with scrape_state["lock"]:
            scrape_state["status"] = "failed"
            scrape_state["message"] = f"Error: {str(e)}"
# ──────────────────────── Scraper Endpoints ────────────────────────
def trigger_scrape(req: ScrapeRequest, background_tasks: BackgroundTasks):
    """Start a scraping job in the background.

    Raises:
        HTTPException: 409 if a scrape is already in flight.
    """
    # Check-and-set under the lock: without it, two concurrent requests
    # could both pass the "running" check and start duplicate scrapes.
    with scrape_state["lock"]:
        if scrape_state["status"] == "running":
            raise HTTPException(status_code=409, detail="A scrape is already running")
        # Claim the slot immediately; _run_scrape_task re-asserts it.
        scrape_state["status"] = "running"
        scrape_state["message"] = f"Started scraping: {req.query}"
    background_tasks.add_task(_run_scrape_task, req.query, req.limit, req.lat, req.lng, req.zoom)
    return ScrapeStatusResponse(status="running", message=f"Started scraping: {req.query}")
def get_scrape_status():
    """Return a snapshot of the background scrape job's progress."""
    snapshot = {key: scrape_state[key] for key in ("status", "total_found", "message")}
    return ScrapeStatusResponse(**snapshot)
# ──────────────────────── Leads Endpoints ────────────────────────
def get_leads(
    search: Optional[str] = Query(None, description="Search by name, phone, or address"),
    status: Optional[str] = Query(None, description="Filter by status"),
    sort_by: str = Query("created_at", description="Sort field"),
    sort_order: str = Query("desc", description="Sort order: asc or desc"),
):
    """List leads, with optional free-text search, status filter, and sorting."""
    rows = db.get_all_leads(search=search, status=status, sort_by=sort_by, sort_order=sort_order)
    return {"leads": rows, "count": len(rows)}
# NOTE: Static paths must be defined BEFORE parameterized /{lead_id} paths
def export_csv(status: Optional[str] = Query(None)):
    """Return all (optionally status-filtered) leads as a CSV download."""
    payload = db.export_leads_csv(status=status)
    download_headers = {"Content-Disposition": "attachment; filename=leads_export.csv"}
    return Response(content=payload, media_type="text/csv", headers=download_headers)
def get_lead(lead_id: int):
    """Fetch one lead by primary key; 404 when it does not exist."""
    record = db.get_lead_by_id(lead_id)
    if not record:
        raise HTTPException(status_code=404, detail="Lead not found")
    return record
def update_lead(lead_id: int, data: LeadUpdate):
    """Apply a partial update (status, notes, ...) to one lead.

    400 when the payload carries no fields; 404 when the lead is missing.
    """
    fields = data.model_dump(exclude_unset=True)
    if not fields:
        raise HTTPException(status_code=400, detail="No fields to update")
    if not db.update_lead(lead_id, **fields):
        raise HTTPException(status_code=404, detail="Lead not found")
    return {"message": "Lead updated", "lead": db.get_lead_by_id(lead_id)}
def delete_lead(lead_id: int):
    """Permanently remove a lead; 404 when the id is unknown."""
    if not db.delete_lead(lead_id):
        raise HTTPException(status_code=404, detail="Lead not found")
    return {"message": "Lead deleted"}
# ──────────────────────── Stats Endpoint ────────────────────────
def get_stats():
    """Return aggregate counts for the dashboard view."""
    return db.get_dashboard_stats()
# ──────────────────────── Template Endpoints ────────────────────────
def get_templates():
    """Return every saved message template."""
    return {"templates": db.get_all_templates()}
def create_template(data: TemplateCreate):
    """Persist a new message template and return its new id."""
    new_id = db.create_template(data.name, data.content)
    return {"message": "Template created", "id": new_id}
def update_template(template_id: int, data: TemplateUpdate):
    """Update a message template.

    Mirrors ``update_lead``: 400 for an empty payload (previously the
    empty dict was forwarded to ``db.update_template`` unchecked), 404
    when the template does not exist.
    """
    update_data = data.model_dump(exclude_unset=True)
    if not update_data:
        raise HTTPException(status_code=400, detail="No fields to update")
    success = db.update_template(template_id, **update_data)
    if not success:
        raise HTTPException(status_code=404, detail="Template not found")
    return {"message": "Template updated"}
def delete_template(template_id: int):
    """Permanently remove a message template; 404 when the id is unknown."""
    if not db.delete_template(template_id):
        raise HTTPException(status_code=404, detail="Template not found")
    return {"message": "Template deleted"}
# ──────────────────────── Serve Frontend ────────────────────────
# Resolve the frontend directory relative to this file (../frontend).
FRONTEND_DIR = Path(__file__).resolve().parent.parent / "frontend"
if FRONTEND_DIR.exists():
    # Serve static files (CSS, JS) from /static when the frontend is present.
    app.mount("/static", StaticFiles(directory=str(FRONTEND_DIR)), name="static")
def serve_index():
    """Serve the single-page-app entry point (frontend/index.html)."""
    index_page = FRONTEND_DIR / "index.html"
    return FileResponse(str(index_page))
def serve_frontend(path: str):
    """Serve frontend static files (CSS, JS, etc) with SPA fallback.

    Unknown paths fall back to index.html so client-side routing works.
    The requested path is resolved and confined to FRONTEND_DIR: the
    original joined user input directly (``FRONTEND_DIR / path``), which
    allowed ``../`` segments to escape the frontend tree (path traversal).
    """
    file_path = (FRONTEND_DIR / path).resolve()
    try:
        # Raises ValueError when file_path is outside the frontend tree.
        file_path.relative_to(FRONTEND_DIR.resolve())
    except ValueError:
        return FileResponse(str(FRONTEND_DIR / "index.html"))
    if file_path.exists() and file_path.is_file():
        return FileResponse(str(file_path))
    return FileResponse(str(FRONTEND_DIR / "index.html"))
# ──────────────────────── Run ────────────────────────
if __name__ == "__main__":
    import uvicorn
    # PORT is supplied by the hosting platform; 7860 is the local default.
    port = int(os.environ.get("PORT", 7860))
    # NOTE(review): reload=True is a development convenience — disable for
    # production deployments.
    uvicorn.run("main:app", host="0.0.0.0", port=port, reload=True)