import asyncio
import concurrent.futures
import csv
import io
import json
import math
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List

import pandas as pd
from fastapi import FastAPI, UploadFile, File, Depends, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from sqlalchemy.orm import Session

from .database import get_db, UploadedFile, Prompt, GeneratedSequence, SmartleadRun
from .models import UploadResponse, PromptSaveRequest, SequenceResponse, SmartleadPushRequest, SmartleadRunResponse
from .gpt_service import generate_email_sequence
from .smartlead_client import SmartleadClient

app = FastAPI()

# NOTE(review): wildcard origins combined with allow_credentials=True is a very
# permissive CORS policy -- confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Create uploads directory
UPLOAD_DIR = Path("/data/uploads")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)


# ---- API ----
@app.get("/api/health")
def health():
    """Liveness probe: always returns ``{"status": "ok"}``."""
    return {"status": "ok"}


@app.get("/api/hello")
def hello():
    """Return a static greeting payload."""
    return {"message": "Hello from FastAPI"}


@app.post("/api/upload-csv")
async def upload_csv(file: UploadFile = File(...), db: Session = Depends(get_db)):
    """Upload and parse CSV file from Apollo.

    The upload is written to ``UPLOAD_DIR`` under a fresh UUID, parsed with
    pandas to count its rows, and recorded as an ``UploadedFile`` row.

    Returns:
        A dict with ``file_id``, ``contact_count`` and a ``message``.

    Raises:
        HTTPException: 500 if saving, parsing or the database write fails.
            In that case the session is rolled back and the partially saved
            file is removed so failed uploads do not accumulate on disk.
    """
    # Generate unique file ID
    file_id = str(uuid.uuid4())
    file_path = UPLOAD_DIR / f"{file_id}.csv"

    try:
        # Save file
        content = await file.read()
        file_path.write_bytes(content)

        # Parse CSV to count contacts
        df = pd.read_csv(file_path)
        contact_count = len(df)

        # Save to database
        db_file = UploadedFile(
            file_id=file_id,
            filename=file.filename,
            contact_count=contact_count,
            file_path=str(file_path),
        )
        db.add(db_file)
        db.commit()
    except Exception as e:
        db.rollback()
        try:
            file_path.unlink(missing_ok=True)
        except OSError:
            # Best-effort cleanup: the original failure is what gets reported.
            pass
        raise HTTPException(status_code=500, detail=f"Error uploading file: {str(e)}") from e

    return {
        "file_id": file_id,
        "contact_count": contact_count,
        "message": "File uploaded successfully",
    }
def _sse_event(payload):
    """Format *payload* as one Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(payload)}\n\n"


def _progress_percent(done, total):
    """Return ``done / total`` as a percentage clamped to [0, 100]; 0 when total is not positive."""
    if total <= 0:
        return 0.0
    return float(min(100, max(0, (done / total) * 100)))


def _sequence_payload(seq):
    """Serialize a ``GeneratedSequence`` row into the camelCase dict sent to the frontend."""
    return {
        "id": seq.sequence_id,
        "emailNumber": seq.email_number,
        "firstName": seq.first_name,
        "lastName": seq.last_name,
        "email": seq.email,
        "company": seq.company,
        "title": seq.title or "",
        "product": seq.product,
        "subject": seq.subject,
        "emailContent": seq.email_content,
    }


def _group_sequences_by_contact(sequences):
    """Group sequence rows by ``sequence_id`` (one entry per contact).

    Each entry carries the contact fields from the first row seen plus
    ``subjects`` / ``bodies`` dicts keyed by email number.
    """
    contacts = {}
    for seq in sequences:
        contact_key = f"{seq.sequence_id}"
        if contact_key not in contacts:
            contacts[contact_key] = {
                'first_name': seq.first_name,
                'last_name': seq.last_name,
                'email': seq.email,
                'company': seq.company,
                'title': seq.title or '',
                'product': seq.product,
                'subjects': {},
                'bodies': {},
            }
        contacts[contact_key]['subjects'][seq.email_number] = seq.subject
        contacts[contact_key]['bodies'][seq.email_number] = seq.email_content
    return contacts


def _campaign_id(campaign):
    """Return the campaign identifier under whichever key the payload uses."""
    return campaign.get('campaign_id') or campaign.get('id') or campaign.get('campaignId')


def _campaign_name(campaign, default):
    """Return the campaign display name under whichever key the payload uses, else *default*."""
    return campaign.get('name') or campaign.get('campaign_name') or campaign.get('title') or default


def _safe_str(value):
    """Convert *value* to a stripped string; ``None``, NaN and other falsy values become ``""``."""
    if value is None:
        return ""
    if isinstance(value, float):
        # Handle NaN and other float values
        if math.isnan(value):
            return ""
        return str(value).strip()
    return str(value).strip() if value else ""


@app.post("/api/save-prompts")
async def save_prompts(request: PromptSaveRequest, db: Session = Depends(get_db)):
    """Save prompt templates for products.

    Existing prompts for ``request.file_id`` are deleted and replaced by the
    entries of ``request.prompts`` (product name -> template) in one commit.

    Raises:
        HTTPException: 500 if the database operation fails (the session is
            rolled back first).
    """
    try:
        # Delete existing prompts for this file
        db.query(Prompt).filter(Prompt.file_id == request.file_id).delete()

        # Save new prompts
        for product_name, prompt_template in request.prompts.items():
            db.add(
                Prompt(
                    file_id=request.file_id,
                    product_name=product_name,
                    prompt_template=prompt_template,
                )
            )

        db.commit()
        return {"message": "Prompts saved successfully"}
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Error saving prompts: {str(e)}") from e


@app.get("/api/generation-status")
async def generation_status(file_id: str = Query(...), db: Session = Depends(get_db)):
    """Return progress for sequence generation (so frontend can resume after sleep/reconnect).

    Raises:
        HTTPException: 404 if no uploaded file matches ``file_id``.
    """
    db_file = db.query(UploadedFile).filter(UploadedFile.file_id == file_id).first()
    if not db_file:
        raise HTTPException(status_code=404, detail="File not found")

    total_contacts = db_file.contact_count or 0
    # One distinct sequence_id per contact that already has generated emails.
    completed = (
        db.query(GeneratedSequence.sequence_id)
        .filter(GeneratedSequence.file_id == file_id)
        .distinct()
        .count()
    )
    return {
        "file_id": file_id,
        "total_contacts": total_contacts,
        "completed_count": completed,
        "is_complete": total_contacts > 0 and completed >= total_contacts,
    }


@app.get("/api/sequences")
async def get_sequences(file_id: str = Query(...), db: Session = Depends(get_db)):
    """Return all generated sequences for a file (for catch-up after reconnect)."""
    sequences = (
        db.query(GeneratedSequence)
        .filter(GeneratedSequence.file_id == file_id)
        .order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number)
        .all()
    )
    return {"sequences": [_sequence_payload(seq) for seq in sequences]}


@app.get("/api/generate-sequences")
async def generate_sequences(
    file_id: str = Query(...),
    reset: bool = Query(True),
    db: Session = Depends(get_db),
):
    """Generate email sequences using GPT with Server-Sent Events streaming.

    Use reset=1 for a fresh run (clears existing). Use reset=0 to resume after
    disconnect/sleep: contacts that already have stored sequences are replayed
    from the database instead of being regenerated.

    The stream emits JSON events of type ``sequence``, ``progress``,
    ``complete`` and ``error``.
    """

    async def event_generator():
        try:
            db_file = db.query(UploadedFile).filter(UploadedFile.file_id == file_id).first()
            if not db_file:
                yield _sse_event({'type': 'error', 'error': 'File not found'})
                return

            df = pd.read_csv(db_file.file_path)

            prompts = db.query(Prompt).filter(Prompt.file_id == file_id).all()
            prompt_dict = {p.product_name: p.prompt_template for p in prompts}
            if not prompt_dict:
                yield _sse_event({'type': 'error', 'error': 'No prompts found'})
                return
            products = list(prompt_dict.keys())

            if reset:
                db.query(GeneratedSequence).filter(GeneratedSequence.file_id == file_id).delete()
                db.commit()

            total_contacts = len(df)
            loop = asyncio.get_running_loop()

            # sequence_id is the 1-based position of the contact in the CSV.
            for sequence_id, (_, row) in enumerate(df.iterrows(), start=1):
                existing = (
                    db.query(GeneratedSequence)
                    .filter(
                        GeneratedSequence.file_id == file_id,
                        GeneratedSequence.sequence_id == sequence_id,
                    )
                    .order_by(GeneratedSequence.email_number)
                    .all()
                )

                if existing:
                    # Resume path: replay what is already stored for this contact.
                    for seq in existing:
                        yield _sse_event({'type': 'sequence', 'sequence': _sequence_payload(seq)})
                    yield _sse_event({
                        'type': 'progress',
                        'progress': _progress_percent(sequence_id, total_contacts),
                    })
                    await asyncio.sleep(0.05)
                    continue

                contact = row.to_dict()
                # Rotate through the configured products by contact position.
                product_name = products[sequence_id % len(products)]
                prompt_template = prompt_dict[product_name]

                # generate_email_sequence is a blocking call; run it on the
                # loop's default thread pool so the event loop stays responsive.
                sequence_data_list = await loop.run_in_executor(
                    None,
                    generate_email_sequence,
                    contact,
                    prompt_template,
                    product_name,
                )

                for seq_data in sequence_data_list:
                    db.add(
                        GeneratedSequence(
                            file_id=file_id,
                            sequence_id=sequence_id,
                            email_number=seq_data["email_number"],
                            first_name=seq_data["first_name"],
                            last_name=seq_data["last_name"],
                            email=seq_data["email"],
                            company=seq_data["company"],
                            title=seq_data.get("title", ""),
                            product=seq_data["product"],
                            subject=seq_data["subject"],
                            email_content=seq_data["email_content"],
                        )
                    )
                db.commit()

                for seq_data in sequence_data_list:
                    sequence_response = {
                        "id": sequence_id,
                        "emailNumber": seq_data["email_number"],
                        "firstName": seq_data["first_name"],
                        "lastName": seq_data["last_name"],
                        "email": seq_data["email"],
                        "company": seq_data["company"],
                        "title": seq_data.get("title", ""),
                        "product": seq_data["product"],
                        "subject": seq_data["subject"],
                        "emailContent": seq_data["email_content"],
                    }
                    yield _sse_event({'type': 'sequence', 'sequence': sequence_response})

                yield _sse_event({
                    'type': 'progress',
                    'progress': _progress_percent(sequence_id, total_contacts),
                })
                await asyncio.sleep(0.1)

            yield _sse_event({'type': 'complete'})
        except Exception as e:
            # Report the failure in-band: the HTTP status line has already been sent.
            yield _sse_event({'type': 'error', 'error': str(e)})

    return StreamingResponse(event_generator(), media_type="text/event-stream")


@app.get("/api/download-sequences")
async def download_sequences(file_id: str = Query(...), db: Session = Depends(get_db)):
    """Download generated sequences as CSV with all subject/body fields.

    The CSV has one row per contact with columns
    ``First Name, Last Name, Email, Company, Title, Product`` followed by
    ``subject1, body1, subject2, body2, ...`` up to the highest email number
    found for the file.

    Raises:
        HTTPException: 404 if the file has no generated sequences; 500 on any
            other failure.
    """
    try:
        # Get all sequences for this file, grouped by contact
        sequences = (
            db.query(GeneratedSequence)
            .filter(GeneratedSequence.file_id == file_id)
            .order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number)
            .all()
        )

        if not sequences:
            raise HTTPException(status_code=404, detail="No sequences found")

        contacts = _group_sequences_by_contact(sequences)
        # Maximum email number across all contacts decides the column count.
        max_email_number = max([0] + [seq.email_number for seq in sequences])

        # Create CSV in memory
        # Use QUOTE_MINIMAL to properly quote fields with newlines for Smartlead
        output = io.StringIO()
        writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL)

        # Write header - fixed format: subject1, body1, subject2, body2, ... (separate columns, not merged)
        header = ['First Name', 'Last Name', 'Email', 'Company', 'Title', 'Product']
        for i in range(1, max_email_number + 1):
            header.extend([f'subject{i}', f'body{i}'])
        writer.writerow(header)

        # Write rows (one row per contact with subjects/bodies up to max_email_number)
        for contact in contacts.values():
            row = [
                contact['first_name'],
                contact['last_name'],
                contact['email'],
                contact['company'],
                contact['title'],
                contact['product'],
            ]
            for i in range(1, max_email_number + 1):
                subject = contact['subjects'].get(i, '') or ''
                body = contact['bodies'].get(i, '') or ''
                # Convert newlines to HTML <br> tags so Smartlead renders line
                # breaks, and drop carriage returns.
                body_html = body.replace('\n', '<br>').replace('\r', '') if body else ''
                row.append(subject)
                row.append(body_html)
            writer.writerow(row)

        # Return as downloadable file (fixed format: one row per contact, separate subject/body columns)
        return StreamingResponse(
            iter([output.getvalue()]),
            media_type="text/csv",
            headers={
                "Content-Disposition": "attachment; filename=email_sequences_fixed.csv"
            },
        )
    except HTTPException:
        # Keep deliberate statuses (e.g. the 404 above) instead of masking them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error downloading sequences: {str(e)}") from e


@app.get("/api/smartlead-campaigns")
async def get_smartlead_campaigns():
    """Get list of campaigns from Smartlead.

    Campaigns without a recognizable id are omitted.

    Raises:
        HTTPException: 500 if the client cannot be created or the fetch fails.
    """
    try:
        client = SmartleadClient()
        campaigns = client.get_campaigns()

        # Format campaigns for frontend
        formatted_campaigns = []
        for campaign in campaigns:
            campaign_id = _campaign_id(campaign)
            if campaign_id:
                formatted_campaigns.append({
                    "id": str(campaign_id),
                    "name": _campaign_name(campaign, 'Unnamed Campaign'),
                })

        return {"campaigns": formatted_campaigns}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch campaigns: {str(e)}") from e


@app.post("/api/push-to-smartlead")
async def push_to_smartlead(request: SmartleadPushRequest, db: Session = Depends(get_db)):
    """Push generated sequences to Smartlead campaign (add leads to existing campaign).

    A ``SmartleadRun`` row records the attempt. With ``request.dry_run`` set,
    nothing is sent to Smartlead and the run is closed immediately. Otherwise
    leads (email, first name, last name) are sent in batches of 50 and the run
    is updated with added/failed counts and error details.

    Raises:
        HTTPException: 404 if the file has no sequences, 400 if the Smartlead
            client raises ``ValueError`` on construction, 500 on any other
            failure (the run, if already stored, is marked ``failed``).
    """
    run = None
    try:
        # Get all sequences for this file
        sequences = (
            db.query(GeneratedSequence)
            .filter(GeneratedSequence.file_id == request.file_id)
            .order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number)
            .all()
        )

        if not sequences:
            raise HTTPException(status_code=404, detail="No sequences found")

        contacts = _group_sequences_by_contact(sequences)

        # Initialize Smartlead client
        try:
            client = SmartleadClient()
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))

        # Get campaign name for logging
        campaign_name = 'Unknown Campaign'
        for campaign in client.get_campaigns():
            if str(_campaign_id(campaign)) == str(request.campaign_id):
                campaign_name = _campaign_name(campaign, 'Unknown Campaign')
                break

        # Create run record
        run_id = str(uuid.uuid4())
        run = SmartleadRun(
            run_id=run_id,
            file_id=request.file_id,
            mode='existing',  # Always 'existing' now
            campaign_id=request.campaign_id,
            campaign_name=campaign_name,
            steps_count=0,  # Not needed for existing campaigns
            dry_run=1 if request.dry_run else 0,
            total_leads=len(contacts),
            status='pending',
        )
        db.add(run)
        db.commit()

        if request.dry_run:
            # Dry run - nothing is sent; close the run so it does not stay
            # 'pending' forever in the run history.
            run.status = 'completed'
            run.completed_at = datetime.utcnow()
            db.commit()
            return {
                "run_id": run_id,
                "campaign_id": request.campaign_id,
                "campaign_name": campaign_name,
                "total": len(contacts),
                "added": 0,
                "skipped": 0,
                "failed": 0,
                "errors": [],
                "status": "dry_run_completed",
                "message": "Dry run completed. No leads were sent to Smartlead.",
            }

        campaign_id = request.campaign_id

        # Prepare leads for Smartlead
        leads = []
        errors = []
        added_count = 0
        skipped_count = 0
        failed_count = 0

        for contact in contacts.values():
            try:
                email = _safe_str(contact.get('email', ''))
                if not email:
                    errors.append({
                        "email": "unknown",
                        "error": "Missing email address",
                    })
                    failed_count += 1
                    continue

                # Build lead object - Smartlead API doesn't allow "company", "title", or custom variables when adding leads
                # Custom variables should be set manually in Smartlead via CSV import
                leads.append({
                    "email": email,
                    "first_name": _safe_str(contact.get('first_name', '')) or 'Contact',
                    "last_name": _safe_str(contact.get('last_name', '')),
                })
            except Exception as e:
                errors.append({
                    "email": contact.get('email', 'unknown'),
                    "error": str(e),
                })
                failed_count += 1

        # Add leads to campaign (chunk in batches of 50)
        batch_size = 50
        for start in range(0, len(leads), batch_size):
            batch = leads[start:start + batch_size]
            try:
                client.add_leads_to_campaign(campaign_id, batch)
                added_count += len(batch)
            except Exception as e:
                # Mark every lead in the batch as failed
                for lead in batch:
                    errors.append({
                        "email": lead.get('email', 'unknown'),
                        "error": f"Failed to add lead: {str(e)}",
                    })
                    failed_count += 1

        # Update run record
        run.status = 'completed'
        run.added_leads = added_count
        run.skipped_leads = skipped_count
        run.failed_leads = failed_count
        run.error_details = json.dumps(errors) if errors else None
        run.completed_at = datetime.utcnow()
        db.commit()

        return {
            "run_id": run_id,
            "campaign_id": campaign_id,
            "campaign_name": campaign_name,
            "total": len(contacts),
            "added": added_count,
            "skipped": skipped_count,
            "failed": failed_count,
            "errors": errors[:10],  # Return first 10 errors
            "status": "completed",
        }

    except HTTPException:
        raise
    except Exception as e:
        # Clear any failed transaction before recording the failure; committing
        # on a session in a failed state would raise and mask the real error.
        db.rollback()
        if run is not None:
            run.status = 'failed'
            run.error_details = str(e)
            db.commit()
        raise HTTPException(status_code=500, detail=f"Error pushing to Smartlead: {str(e)}") from e


@app.get("/api/smartlead-runs")
async def get_smartlead_runs(file_id: str = Query(None), db: Session = Depends(get_db)):
    """Get Smartlead run history.

    Returns up to 50 runs, newest first, optionally filtered by ``file_id``.

    Raises:
        HTTPException: 500 if the query fails.
    """
    try:
        query = db.query(SmartleadRun)
        if file_id:
            query = query.filter(SmartleadRun.file_id == file_id)

        runs = query.order_by(SmartleadRun.created_at.desc()).limit(50).all()

        return [
            {
                "run_id": run.run_id,
                "file_id": run.file_id,
                "campaign_id": run.campaign_id,
                "campaign_name": run.campaign_name,
                "mode": run.mode,
                "steps_count": run.steps_count,
                "dry_run": bool(run.dry_run),
                "total_leads": run.total_leads,
                "added_leads": run.added_leads,
                "skipped_leads": run.skipped_leads,
                "failed_leads": run.failed_leads,
                "status": run.status,
                "created_at": run.created_at.isoformat() if run.created_at else None,
                "completed_at": run.completed_at.isoformat() if run.completed_at else None,
            }
            for run in runs
        ]
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching runs: {str(e)}") from e


# ---- Frontend static serving ----
FRONTEND_DIST = Path(__file__).resolve().parents[2] / "frontend" / "dist"
INDEX_FILE = FRONTEND_DIST / "index.html"

# NOTE(review): this mount at "/" is registered before the fallback route
# below, so it likely takes precedence for every non-API path when the dist
# directory exists -- confirm the SPA fallback is actually reachable.
if FRONTEND_DIST.exists():
    app.mount("/", StaticFiles(directory=str(FRONTEND_DIST), html=True), name="static")


# SPA fallback: any non-/api route should return React index.html
@app.get("/{full_path:path}")
def spa_fallback(full_path: str):
    """Serve the frontend's ``index.html`` for non-API paths.

    Raises:
        HTTPException: 404 for unknown ``api`` paths (instead of a 200 JSON
            body) and when the built ``index.html`` does not exist.
    """
    if full_path == "api" or full_path.startswith("api/"):
        raise HTTPException(status_code=404, detail="Not Found")
    if not INDEX_FILE.is_file():
        raise HTTPException(status_code=404, detail="Not Found")
    return FileResponse(str(INDEX_FILE))