| | from fastapi import FastAPI, UploadFile, File, Depends, HTTPException, Query |
| | from fastapi.responses import FileResponse, StreamingResponse |
| | from fastapi.staticfiles import StaticFiles |
| | from fastapi.middleware.cors import CORSMiddleware |
| | from pathlib import Path |
| | from sqlalchemy.orm import Session |
| | import pandas as pd |
| | import uuid |
| | import os |
| | import csv |
| | import io |
| | import concurrent.futures |
| | from typing import Dict, List |
| | import json |
| | import asyncio |
| | import math |
| | from datetime import datetime |
| |
|
| | from .database import get_db, UploadedFile, Prompt, GeneratedSequence, SmartleadRun |
| | from .models import UploadResponse, PromptSaveRequest, SequenceResponse, SmartleadPushRequest, SmartleadRunResponse |
| | from .gpt_service import generate_email_sequence |
| | from .smartlead_client import SmartleadClient |
| |
|
# Application instance shared by every route in this module.
app = FastAPI()

# Permit browser clients from any origin to reach the API
# (all methods and headers, credentials allowed).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Uploaded CSVs are persisted here; ensure the directory exists at import time.
UPLOAD_DIR = Path("/data/uploads")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
| |
|
| | |
@app.get("/api/health")
def health():
    """Liveness probe: always reports the service as up."""
    payload = {"status": "ok"}
    return payload
| |
|
@app.get("/api/hello")
def hello():
    """Trivial smoke-test endpoint returning a greeting."""
    greeting = {"message": "Hello from FastAPI"}
    return greeting
| |
|
| |
|
@app.post("/api/upload-csv")
async def upload_csv(file: UploadFile = File(...), db: Session = Depends(get_db)):
    """Upload and parse a CSV file exported from Apollo.

    Persists the raw bytes under UPLOAD_DIR keyed by a generated UUID,
    counts the contact rows with pandas, and records the upload in the DB.

    Returns:
        dict with ``file_id``, ``contact_count`` and a success message.

    Raises:
        HTTPException: 400 when pandas cannot parse the file as CSV,
            500 for any other failure.
    """
    file_id = str(uuid.uuid4())
    file_path = UPLOAD_DIR / f"{file_id}.csv"
    try:
        # Write the upload to disk first so parsing reads from a real file.
        content = await file.read()
        file_path.write_bytes(content)

        # Row count == contact count (pandas excludes the header row).
        df = pd.read_csv(file_path)
        contact_count = len(df)

        db_file = UploadedFile(
            file_id=file_id,
            filename=file.filename,
            contact_count=contact_count,
            file_path=str(file_path),
        )
        db.add(db_file)
        db.commit()

        return {
            "file_id": file_id,
            "contact_count": contact_count,
            "message": "File uploaded successfully",
        }
    except (pd.errors.ParserError, pd.errors.EmptyDataError) as e:
        # A malformed/empty CSV is a client error, not a server fault
        # (the original reported it as a 500).
        file_path.unlink(missing_ok=True)
        raise HTTPException(status_code=400, detail=f"Invalid CSV file: {str(e)}")
    except Exception as e:
        # Don't leave a partial file behind on unexpected failures.
        file_path.unlink(missing_ok=True)
        raise HTTPException(status_code=500, detail=f"Error uploading file: {str(e)}")
| |
|
| |
|
@app.post("/api/save-prompts")
async def save_prompts(request: PromptSaveRequest, db: Session = Depends(get_db)):
    """Save prompt templates for products, replacing any previous set.

    Deletes every existing Prompt row for the file, inserts one row per
    (product_name, prompt_template) pair, then commits in one transaction.

    Raises:
        HTTPException: 500 on any database failure (after rolling back).
    """
    try:
        # Replace semantics: drop existing prompts for this file first.
        db.query(Prompt).filter(Prompt.file_id == request.file_id).delete()

        for product_name, prompt_template in request.prompts.items():
            db.add(Prompt(
                file_id=request.file_id,
                product_name=product_name,
                prompt_template=prompt_template,
            ))

        db.commit()
        return {"message": "Prompts saved successfully"}
    except Exception as e:
        # Roll back the half-applied delete/inserts so the session stays
        # usable (the original left the transaction dangling).
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Error saving prompts: {str(e)}")
| |
|
| |
|
@app.get("/api/generation-status")
async def generation_status(file_id: str = Query(...), db: Session = Depends(get_db)):
    """Report sequence-generation progress so the frontend can resume after sleep/reconnect."""
    uploaded = db.query(UploadedFile).filter(UploadedFile.file_id == file_id).first()
    if uploaded is None:
        raise HTTPException(status_code=404, detail="File not found")

    total = uploaded.contact_count or 0
    # Each contact owns one distinct sequence_id, so counting distinct ids
    # gives the number of contacts already generated.
    done = (
        db.query(GeneratedSequence.sequence_id)
        .filter(GeneratedSequence.file_id == file_id)
        .distinct()
        .count()
    )
    return {
        "file_id": file_id,
        "total_contacts": total,
        "completed_count": done,
        "is_complete": total > 0 and done >= total,
    }
| |
|
| |
|
@app.get("/api/sequences")
async def get_sequences(file_id: str = Query(...), db: Session = Depends(get_db)):
    """Return every generated sequence for a file (catch-up after reconnect)."""
    rows = (
        db.query(GeneratedSequence)
        .filter(GeneratedSequence.file_id == file_id)
        .order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number)
        .all()
    )
    # Serialize each row into the camelCase shape the frontend expects.
    payload = [
        {
            "id": row.sequence_id,
            "emailNumber": row.email_number,
            "firstName": row.first_name,
            "lastName": row.last_name,
            "email": row.email,
            "company": row.company,
            "title": row.title or "",
            "product": row.product,
            "subject": row.subject,
            "emailContent": row.email_content,
        }
        for row in rows
    ]
    return {"sequences": payload}
| |
|
| |
|
@app.get("/api/generate-sequences")
async def generate_sequences(
    file_id: str = Query(...),
    reset: bool = Query(True),
    db: Session = Depends(get_db),
):
    """Generate email sequences with GPT, streamed as Server-Sent Events.

    Use reset=1 for a fresh run (clears existing rows). Use reset=0 to resume
    after a disconnect/sleep: contacts that already have stored rows are
    replayed from the database instead of being regenerated.
    """

    async def event_generator():
        try:
            db_file = db.query(UploadedFile).filter(UploadedFile.file_id == file_id).first()
            if not db_file:
                yield f"data: {json.dumps({'type': 'error', 'error': 'File not found'})}\n\n"
                return

            df = pd.read_csv(db_file.file_path)
            prompts = db.query(Prompt).filter(Prompt.file_id == file_id).all()
            prompt_dict = {p.product_name: p.prompt_template for p in prompts}

            if not prompt_dict:
                yield f"data: {json.dumps({'type': 'error', 'error': 'No prompts found'})}\n\n"
                return

            products = list(prompt_dict.keys())
            if reset:
                db.query(GeneratedSequence).filter(GeneratedSequence.file_id == file_id).delete()
                db.commit()

            total_contacts = len(df)
            sequence_id = 1

            # get_running_loop() is the supported call inside a coroutine
            # (get_event_loop() is deprecated there). One executor is reused
            # for the whole run; the original built and tore down a fresh
            # thread pool for every contact.
            loop = asyncio.get_running_loop()
            with concurrent.futures.ThreadPoolExecutor() as executor:
                for idx, row in df.iterrows():
                    # Resume support: if this contact already has rows in the
                    # DB, replay them instead of calling GPT again.
                    existing = (
                        db.query(GeneratedSequence)
                        .filter(
                            GeneratedSequence.file_id == file_id,
                            GeneratedSequence.sequence_id == sequence_id,
                        )
                        .order_by(GeneratedSequence.email_number)
                        .all()
                    )
                    if existing:
                        for seq in existing:
                            sequence_response = {
                                "id": seq.sequence_id,
                                "emailNumber": seq.email_number,
                                "firstName": seq.first_name,
                                "lastName": seq.last_name,
                                "email": seq.email,
                                "company": seq.company,
                                "title": seq.title or "",
                                "product": seq.product,
                                "subject": seq.subject,
                                "emailContent": seq.email_content,
                            }
                            yield f"data: {json.dumps({'type': 'sequence', 'sequence': sequence_response})}\n\n"
                        progress = min(100, max(0, (sequence_id / total_contacts) * 100)) if total_contacts > 0 else 0
                        yield f"data: {json.dumps({'type': 'progress', 'progress': float(progress)})}\n\n"
                        sequence_id += 1
                        await asyncio.sleep(0.05)
                        continue

                    contact = row.to_dict()
                    # Round-robin product assignment across contacts.
                    product_name = products[sequence_id % len(products)]
                    prompt_template = prompt_dict[product_name]

                    # Run the blocking GPT call off the event-loop thread.
                    sequence_data_list = await loop.run_in_executor(
                        executor,
                        generate_email_sequence,
                        contact,
                        prompt_template,
                        product_name,
                    )

                    # Persist the whole sequence before streaming it so a
                    # later resume can replay it from the DB.
                    for seq_data in sequence_data_list:
                        db_sequence = GeneratedSequence(
                            file_id=file_id,
                            sequence_id=sequence_id,
                            email_number=seq_data["email_number"],
                            first_name=seq_data["first_name"],
                            last_name=seq_data["last_name"],
                            email=seq_data["email"],
                            company=seq_data["company"],
                            title=seq_data.get("title", ""),
                            product=seq_data["product"],
                            subject=seq_data["subject"],
                            email_content=seq_data["email_content"],
                        )
                        db.add(db_sequence)

                    db.commit()

                    for seq_data in sequence_data_list:
                        sequence_response = {
                            "id": sequence_id,
                            "emailNumber": seq_data["email_number"],
                            "firstName": seq_data["first_name"],
                            "lastName": seq_data["last_name"],
                            "email": seq_data["email"],
                            "company": seq_data["company"],
                            "title": seq_data.get("title", ""),
                            "product": seq_data["product"],
                            "subject": seq_data["subject"],
                            "emailContent": seq_data["email_content"],
                        }
                        yield f"data: {json.dumps({'type': 'sequence', 'sequence': sequence_response})}\n\n"

                    progress = min(100, max(0, (sequence_id / total_contacts) * 100)) if total_contacts > 0 else 0
                    yield f"data: {json.dumps({'type': 'progress', 'progress': float(progress)})}\n\n"
                    sequence_id += 1
                    await asyncio.sleep(0.1)

            yield f"data: {json.dumps({'type': 'complete'})}\n\n"

        except Exception as e:
            # Surface any failure to the client as an SSE error event.
            yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
| |
|
| |
|
@app.get("/api/download-sequences")
async def download_sequences(file_id: str = Query(...), db: Session = Depends(get_db)):
    """Download generated sequences as a CSV with one row per contact.

    Columns: contact fields plus subjectN/bodyN pairs up to the highest
    email number present. Body newlines are converted to <br> so each
    cell stays a single logical line.

    Raises:
        HTTPException: 404 when no sequences exist for the file,
            500 on any other failure.
    """
    try:
        sequences = db.query(GeneratedSequence).filter(
            GeneratedSequence.file_id == file_id
        ).order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number).all()

        if not sequences:
            raise HTTPException(status_code=404, detail="No sequences found")

        # Pivot: group per-email rows by sequence_id so each contact
        # becomes exactly one CSV row.
        contacts = {}
        max_email_number = 0
        for seq in sequences:
            contact_key = f"{seq.sequence_id}"
            if contact_key not in contacts:
                contacts[contact_key] = {
                    'first_name': seq.first_name,
                    'last_name': seq.last_name,
                    'email': seq.email,
                    'company': seq.company,
                    'title': seq.title or '',
                    'product': seq.product,
                    'subjects': {},
                    'bodies': {}
                }
            contacts[contact_key]['subjects'][seq.email_number] = seq.subject
            contacts[contact_key]['bodies'][seq.email_number] = seq.email_content

            if seq.email_number > max_email_number:
                max_email_number = seq.email_number

        output = io.StringIO()
        writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL)

        header = ['First Name', 'Last Name', 'Email', 'Company', 'Title', 'Product']
        for i in range(1, max_email_number + 1):
            header.extend([f'subject{i}', f'body{i}'])
        writer.writerow(header)

        for contact in contacts.values():
            row = [
                contact['first_name'],
                contact['last_name'],
                contact['email'],
                contact['company'],
                contact['title'],
                contact['product']
            ]
            for i in range(1, max_email_number + 1):
                subject = contact['subjects'].get(i, '') or ''
                body = contact['bodies'].get(i, '') or ''
                # Newlines -> <br> (and strip CRs) so tools that mishandle
                # embedded newlines in CSV cells still parse the file.
                body_html = body.replace('\n', '<br>').replace('\r', '') if body else ''
                row.append(subject)
                row.append(body_html)
            writer.writerow(row)

        output.seek(0)

        return StreamingResponse(
            iter([output.getvalue()]),
            media_type="text/csv",
            headers={
                "Content-Disposition": "attachment; filename=email_sequences_fixed.csv"
            }
        )

    except HTTPException:
        # Bug fix: the bare `except Exception` below used to swallow the
        # 404 raised above and re-wrap it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error downloading sequences: {str(e)}")
| |
|
| |
|
@app.get("/api/smartlead-campaigns")
async def get_smartlead_campaigns():
    """List Smartlead campaigns as {id, name} pairs for the frontend picker."""
    try:
        raw_campaigns = SmartleadClient().get_campaigns()

        formatted = []
        for entry in raw_campaigns:
            # The Smartlead payload has several possible key spellings for
            # both the id and the display name; try each in turn.
            cid = entry.get('campaign_id') or entry.get('id') or entry.get('campaignId')
            label = entry.get('name') or entry.get('campaign_name') or entry.get('title') or 'Unnamed Campaign'

            # Entries with no resolvable id are dropped.
            if cid:
                formatted.append({"id": str(cid), "name": label})

        return {"campaigns": formatted}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch campaigns: {str(e)}")
| |
|
| |
|
@app.post("/api/push-to-smartlead")
async def push_to_smartlead(request: SmartleadPushRequest, db: Session = Depends(get_db)):
    """Push generated sequences to an existing Smartlead campaign as leads.

    Records a SmartleadRun row for auditing. With ``dry_run`` set, nothing is
    sent to Smartlead. Leads are uploaded in batches of 50; per-lead and
    per-batch failures are collected rather than aborting the run.

    Raises:
        HTTPException: 404 when no sequences exist, 400 when the Smartlead
            client cannot be configured, 500 on any other failure.
    """
    # NOTE: the original re-imported uuid locally here; the module-level
    # import already provides it.
    try:
        sequences = db.query(GeneratedSequence).filter(
            GeneratedSequence.file_id == request.file_id
        ).order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number).all()

        if not sequences:
            raise HTTPException(status_code=404, detail="No sequences found")

        # Group per-email rows into one contact per sequence_id.
        contacts = {}
        for seq in sequences:
            contact_key = f"{seq.sequence_id}"
            if contact_key not in contacts:
                contacts[contact_key] = {
                    'first_name': seq.first_name,
                    'last_name': seq.last_name,
                    'email': seq.email,
                    'company': seq.company,
                    'title': seq.title or '',
                    'subjects': {},
                    'bodies': {}
                }
            contacts[contact_key]['subjects'][seq.email_number] = seq.subject
            contacts[contact_key]['bodies'][seq.email_number] = seq.email_content

        try:
            client = SmartleadClient()
        except ValueError as e:
            # Client construction signals config problems via ValueError.
            raise HTTPException(status_code=400, detail=str(e))

        # Resolve a display name for the target campaign (the payload has
        # several possible key spellings).
        campaigns = client.get_campaigns()
        campaign_name = 'Unknown Campaign'
        for campaign in campaigns:
            campaign_id = campaign.get('campaign_id') or campaign.get('id') or campaign.get('campaignId')
            if str(campaign_id) == str(request.campaign_id):
                campaign_name = campaign.get('name') or campaign.get('campaign_name') or campaign.get('title') or 'Unknown Campaign'
                break

        # Create the audit record up front so failures can be recorded on it.
        run_id = str(uuid.uuid4())
        run = SmartleadRun(
            run_id=run_id,
            file_id=request.file_id,
            mode='existing',
            campaign_id=request.campaign_id,
            campaign_name=campaign_name,
            steps_count=0,
            dry_run=1 if request.dry_run else 0,
            total_leads=len(contacts),
            status='pending'
        )
        db.add(run)
        db.commit()

        if request.dry_run:
            return {
                "run_id": run_id,
                "campaign_id": request.campaign_id,
                "campaign_name": campaign_name,
                "total": len(contacts),
                "added": 0,
                "skipped": 0,
                "failed": 0,
                "errors": [],
                "status": "dry_run_completed",
                "message": "Dry run completed. No leads were sent to Smartlead."
            }

        campaign_id = request.campaign_id

        def safe_str(value):
            # Normalize DB/pandas values: None and float NaN become "";
            # everything else is stringified and stripped.
            # (Hoisted: the original redefined this inside the contact loop.)
            if value is None:
                return ""
            if isinstance(value, float):
                if math.isnan(value):
                    return ""
                return str(value).strip()
            return str(value).strip() if value else ""

        leads = []
        errors = []
        added_count = 0
        skipped_count = 0
        failed_count = 0

        for contact in contacts.values():
            try:
                if not contact.get('email'):
                    errors.append({
                        "email": contact.get('email', 'unknown'),
                        "error": "Missing email address"
                    })
                    failed_count += 1
                    continue

                first_name = safe_str(contact.get('first_name', '')) or 'Contact'
                last_name = safe_str(contact.get('last_name', ''))
                # Computed for parity with the contact record; not currently
                # included in the Smartlead lead payload.
                company = safe_str(contact.get('company', ''))
                title = safe_str(contact.get('title', ''))
                email = safe_str(contact.get('email', ''))

                # Second check: safe_str can reduce a whitespace/NaN email to "".
                if not email:
                    errors.append({
                        "email": "unknown",
                        "error": "Missing email address"
                    })
                    failed_count += 1
                    continue

                leads.append({
                    "email": email,
                    "first_name": first_name,
                    "last_name": last_name
                })

            except Exception as e:
                errors.append({
                    "email": contact.get('email', 'unknown'),
                    "error": str(e)
                })
                failed_count += 1

        # Upload in batches; a failed batch marks each of its leads as
        # failed but does not stop the remaining batches.
        batch_size = 50
        for i in range(0, len(leads), batch_size):
            batch = leads[i:i + batch_size]
            try:
                client.add_leads_to_campaign(campaign_id, batch)
                added_count += len(batch)
            except Exception as e:
                for lead in batch:
                    errors.append({
                        "email": lead.get('email', 'unknown'),
                        "error": f"Failed to add lead: {str(e)}"
                    })
                    failed_count += 1

        # Finalize the audit record.
        run.status = 'completed'
        run.added_leads = added_count
        run.skipped_leads = skipped_count
        run.failed_leads = failed_count
        run.error_details = json.dumps(errors) if errors else None
        run.completed_at = datetime.utcnow()
        db.commit()

        return {
            "run_id": run_id,
            "campaign_id": campaign_id,
            "campaign_name": campaign_name,
            "total": len(contacts),
            "added": added_count,
            "skipped": skipped_count,
            "failed": failed_count,
            "errors": errors[:10],  # cap error detail in the response body
            "status": "completed"
        }

    except HTTPException:
        raise
    except Exception as e:
        # Best effort: mark the run failed if it was created before the error.
        if 'run' in locals():
            run.status = 'failed'
            run.error_details = str(e)
            db.commit()
        raise HTTPException(status_code=500, detail=f"Error pushing to Smartlead: {str(e)}")
| |
|
| |
|
@app.get("/api/smartlead-runs")
async def get_smartlead_runs(file_id: str = Query(None), db: Session = Depends(get_db)):
    """Get Smartlead run history (most recent first, capped at 50)."""

    def serialize(run):
        # Flatten a SmartleadRun row into a JSON-safe dict.
        return {
            "run_id": run.run_id,
            "file_id": run.file_id,
            "campaign_id": run.campaign_id,
            "campaign_name": run.campaign_name,
            "mode": run.mode,
            "steps_count": run.steps_count,
            "dry_run": bool(run.dry_run),
            "total_leads": run.total_leads,
            "added_leads": run.added_leads,
            "skipped_leads": run.skipped_leads,
            "failed_leads": run.failed_leads,
            "status": run.status,
            "created_at": run.created_at.isoformat() if run.created_at else None,
            "completed_at": run.completed_at.isoformat() if run.completed_at else None,
        }

    try:
        query = db.query(SmartleadRun)
        # Optional narrowing to a single uploaded file.
        if file_id:
            query = query.filter(SmartleadRun.file_id == file_id)

        recent = query.order_by(SmartleadRun.created_at.desc()).limit(50).all()
        return [serialize(run) for run in recent]
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching runs: {str(e)}")
| |
|
| |
|
| | |
# Location of the built frontend bundle (repo-root/frontend/dist) and its
# SPA entry point.
FRONTEND_DIST = Path(__file__).resolve().parents[2] / "frontend" / "dist"
INDEX_FILE = FRONTEND_DIST / "index.html"

# Serve the static bundle from the site root only when it has been built.
if FRONTEND_DIST.exists():
    app.mount("/", StaticFiles(directory=str(FRONTEND_DIST), html=True), name="static")
| |
|
| | |
@app.get("/{full_path:path}")
def spa_fallback(full_path: str):
    """Serve the SPA's index.html for any non-API route (client-side routing).

    API paths reaching this handler are unknown endpoints, so answer with a
    real 404 status (the original returned a "Not Found" body with HTTP 200,
    which breaks client-side error handling).
    """
    if full_path.startswith("api/"):
        raise HTTPException(status_code=404, detail="Not Found")
    if not INDEX_FILE.exists():
        # Frontend bundle has not been built/deployed.
        raise HTTPException(status_code=404, detail="Frontend not built")
    return FileResponse(str(INDEX_FILE))
| |
|