# NOTE(review): removed stray VCS residue ("Seth" / "update" / commit hash)
# that preceded the imports — those bare names made the module fail at import
# time with a NameError before the app could even start.
from fastapi import FastAPI, UploadFile, File, Depends, HTTPException, Query
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pathlib import Path
from sqlalchemy.orm import Session
import pandas as pd
import uuid
import os
import csv
import io
import concurrent.futures
from typing import Dict, List
import json
import asyncio
import math
from datetime import datetime
from .database import get_db, UploadedFile, Prompt, GeneratedSequence, SmartleadRun
from .models import UploadResponse, PromptSaveRequest, SequenceResponse, SmartleadPushRequest, SmartleadRunResponse
from .gpt_service import generate_email_sequence
from .smartlead_client import SmartleadClient
# ---- Application setup (module-level side effects) ----
app = FastAPI()

# Wide-open CORS so any frontend origin can call the API.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers per the CORS spec — confirm whether credentialed
# requests are actually needed, or pin the origins list.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Create uploads directory — uploaded CSVs are persisted here, keyed by UUID.
# Created eagerly at import time so the first upload never races mkdir.
UPLOAD_DIR = Path("/data/uploads")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# ---- API ----
@app.get("/api/health")
def health():
    """Liveness probe: always reports that the service is up."""
    payload = {"status": "ok"}
    return payload
@app.get("/api/hello")
def hello():
    """Smoke-test endpoint returning a fixed greeting."""
    greeting = {"message": "Hello from FastAPI"}
    return greeting
@app.post("/api/upload-csv")
async def upload_csv(file: UploadFile = File(...), db: Session = Depends(get_db)):
    """Upload and parse CSV file from Apollo.

    Persists the raw upload under UPLOAD_DIR keyed by a fresh UUID, counts the
    rows with pandas, and records the upload in the database.

    Returns:
        dict with ``file_id``, ``contact_count`` and a success message.

    Raises:
        HTTPException: 500 on any save/parse/DB failure. The partially written
        file is removed and the DB session rolled back so nothing is orphaned.
    """
    # Generate unique file ID up front so cleanup on failure knows the path.
    file_id = str(uuid.uuid4())
    file_path = UPLOAD_DIR / f"{file_id}.csv"
    try:
        # Save the raw bytes first; pandas then reads them back from disk.
        content = await file.read()
        file_path.write_bytes(content)

        # Parse CSV to count contacts (pandas excludes the header row).
        df = pd.read_csv(file_path)
        contact_count = len(df)

        # Save upload metadata to database.
        db_file = UploadedFile(
            file_id=file_id,
            filename=file.filename,
            contact_count=contact_count,
            file_path=str(file_path),
        )
        db.add(db_file)
        db.commit()

        return {
            "file_id": file_id,
            "contact_count": contact_count,
            "message": "File uploaded successfully"
        }
    except Exception as e:
        # Don't leave a half-written or unparseable file behind, and don't
        # leave the session dirty for the next request sharing it.
        file_path.unlink(missing_ok=True)
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Error uploading file: {str(e)}")
@app.post("/api/save-prompts")
async def save_prompts(request: PromptSaveRequest, db: Session = Depends(get_db)):
    """Save prompt templates for products.

    Replaces all prompts previously stored for ``request.file_id`` with the
    ones in ``request.prompts`` (product name -> template text). Delete and
    insert share one transaction, so a failure leaves the old prompts intact.

    Raises:
        HTTPException: 500 on any DB failure (session is rolled back).
    """
    try:
        # Delete existing prompts for this file; committed together with the
        # inserts below so the swap is atomic.
        db.query(Prompt).filter(Prompt.file_id == request.file_id).delete()
        # Save new prompts.
        for product_name, prompt_template in request.prompts.items():
            prompt = Prompt(
                file_id=request.file_id,
                product_name=product_name,
                prompt_template=prompt_template,
            )
            db.add(prompt)
        db.commit()
        return {"message": "Prompts saved successfully"}
    except Exception as e:
        # Roll back so the pending delete/inserts don't poison the session.
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Error saving prompts: {str(e)}")
@app.get("/api/generation-status")
async def generation_status(file_id: str = Query(...), db: Session = Depends(get_db)):
    """Return progress for sequence generation (so frontend can resume after sleep/reconnect)."""
    uploaded = db.query(UploadedFile).filter(UploadedFile.file_id == file_id).first()
    if uploaded is None:
        raise HTTPException(status_code=404, detail="File not found")

    total = uploaded.contact_count or 0
    # Each distinct sequence_id corresponds to one finished contact.
    done = (
        db.query(GeneratedSequence.sequence_id)
        .filter(GeneratedSequence.file_id == file_id)
        .distinct()
        .count()
    )
    finished = total > 0 and done >= total
    return {
        "file_id": file_id,
        "total_contacts": total,
        "completed_count": done,
        "is_complete": finished,
    }
@app.get("/api/sequences")
async def get_sequences(file_id: str = Query(...), db: Session = Depends(get_db)):
    """Return all generated sequences for a file (for catch-up after reconnect)."""
    rows = (
        db.query(GeneratedSequence)
        .filter(GeneratedSequence.file_id == file_id)
        .order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number)
        .all()
    )
    # Serialize each stored email into the camelCase shape the frontend uses.
    return {
        "sequences": [
            {
                "id": row.sequence_id,
                "emailNumber": row.email_number,
                "firstName": row.first_name,
                "lastName": row.last_name,
                "email": row.email,
                "company": row.company,
                "title": row.title or "",
                "product": row.product,
                "subject": row.subject,
                "emailContent": row.email_content,
            }
            for row in rows
        ]
    }
@app.get("/api/generate-sequences")
async def generate_sequences(
    file_id: str = Query(...),
    reset: bool = Query(True),
    db: Session = Depends(get_db),
):
    """Generate email sequences using GPT with Server-Sent Events streaming.

    Use reset=1 for a fresh run (clears existing). Use reset=0 to resume after
    disconnect/sleep: contacts already stored in the DB are replayed from
    there and only the remainder is sent to GPT.

    Streams SSE frames of types ``sequence``, ``progress``, ``complete`` and
    ``error``.
    """

    def _payload(seq_id, email_number, first_name, last_name, email, company,
                 title, product, subject, email_content):
        # Common camelCase shape the frontend expects for one email.
        return {
            "id": seq_id,
            "emailNumber": email_number,
            "firstName": first_name,
            "lastName": last_name,
            "email": email,
            "company": company,
            "title": title,
            "product": product,
            "subject": subject,
            "emailContent": email_content,
        }

    async def event_generator():
        # One shared worker pool for the whole run. The original created and
        # tore down a fresh ThreadPoolExecutor per contact, which is wasteful
        # and leaks worker threads if the client disconnects mid-iteration.
        executor = concurrent.futures.ThreadPoolExecutor()
        try:
            db_file = db.query(UploadedFile).filter(UploadedFile.file_id == file_id).first()
            if not db_file:
                yield f"data: {json.dumps({'type': 'error', 'error': 'File not found'})}\n\n"
                return
            df = pd.read_csv(db_file.file_path)
            prompts = db.query(Prompt).filter(Prompt.file_id == file_id).all()
            prompt_dict = {p.product_name: p.prompt_template for p in prompts}
            if not prompt_dict:
                yield f"data: {json.dumps({'type': 'error', 'error': 'No prompts found'})}\n\n"
                return
            products = list(prompt_dict.keys())
            if reset:
                # Fresh run: wipe any previous results for this file.
                db.query(GeneratedSequence).filter(GeneratedSequence.file_id == file_id).delete()
                db.commit()
            total_contacts = len(df)
            loop = asyncio.get_running_loop()
            sequence_id = 1
            for idx, row in df.iterrows():
                # Resume support: replay rows already stored for this contact
                # instead of regenerating them.
                existing = (
                    db.query(GeneratedSequence)
                    .filter(
                        GeneratedSequence.file_id == file_id,
                        GeneratedSequence.sequence_id == sequence_id,
                    )
                    .order_by(GeneratedSequence.email_number)
                    .all()
                )
                if existing:
                    for seq in existing:
                        sequence_response = _payload(
                            seq.sequence_id, seq.email_number, seq.first_name,
                            seq.last_name, seq.email, seq.company,
                            seq.title or "", seq.product, seq.subject,
                            seq.email_content,
                        )
                        yield f"data: {json.dumps({'type': 'sequence', 'sequence': sequence_response})}\n\n"
                    progress = min(100, max(0, (sequence_id / total_contacts) * 100)) if total_contacts > 0 else 0
                    yield f"data: {json.dumps({'type': 'progress', 'progress': float(progress)})}\n\n"
                    sequence_id += 1
                    await asyncio.sleep(0.05)
                    continue
                contact = row.to_dict()
                # Round-robin the configured products across contacts.
                product_name = products[sequence_id % len(products)]
                prompt_template = prompt_dict[product_name]
                # The GPT call is blocking; run it off the event loop on the
                # shared pool so other requests stay responsive.
                sequence_data_list = await loop.run_in_executor(
                    executor,
                    generate_email_sequence,
                    contact,
                    prompt_template,
                    product_name,
                )
                for seq_data in sequence_data_list:
                    db.add(GeneratedSequence(
                        file_id=file_id,
                        sequence_id=sequence_id,
                        email_number=seq_data["email_number"],
                        first_name=seq_data["first_name"],
                        last_name=seq_data["last_name"],
                        email=seq_data["email"],
                        company=seq_data["company"],
                        title=seq_data.get("title", ""),
                        product=seq_data["product"],
                        subject=seq_data["subject"],
                        email_content=seq_data["email_content"],
                    ))
                db.commit()
                for seq_data in sequence_data_list:
                    sequence_response = _payload(
                        sequence_id, seq_data["email_number"],
                        seq_data["first_name"], seq_data["last_name"],
                        seq_data["email"], seq_data["company"],
                        seq_data.get("title", ""), seq_data["product"],
                        seq_data["subject"], seq_data["email_content"],
                    )
                    yield f"data: {json.dumps({'type': 'sequence', 'sequence': sequence_response})}\n\n"
                progress = min(100, max(0, (sequence_id / total_contacts) * 100)) if total_contacts > 0 else 0
                yield f"data: {json.dumps({'type': 'progress', 'progress': float(progress)})}\n\n"
                sequence_id += 1
                await asyncio.sleep(0.1)
            yield f"data: {json.dumps({'type': 'complete'})}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n"
        finally:
            # Release worker threads even if the client disconnected mid-stream.
            executor.shutdown(wait=False)
    return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.get("/api/download-sequences")
async def download_sequences(file_id: str = Query(...), db: Session = Depends(get_db)):
    """Download generated sequences as CSV with all subject/body fields.

    Produces one row per contact with columns subject1/body1..subjectN/bodyN,
    where N is the largest email number seen for any contact. Body newlines
    are converted to ``<br>`` because Smartlead's rich-text editor expects
    HTML line breaks.

    Raises:
        HTTPException: 404 when no sequences exist for the file, 500 on any
        other failure.
    """
    try:
        # Get all sequences for this file, ordered so grouping by contact works.
        sequences = db.query(GeneratedSequence).filter(
            GeneratedSequence.file_id == file_id
        ).order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number).all()
        if not sequences:
            raise HTTPException(status_code=404, detail="No sequences found")
        # Group sequences by contact.
        contacts = {}
        max_email_number = 0  # Longest sequence across all contacts.
        for seq in sequences:
            contact_key = f"{seq.sequence_id}"
            if contact_key not in contacts:
                contacts[contact_key] = {
                    'first_name': seq.first_name,
                    'last_name': seq.last_name,
                    'email': seq.email,
                    'company': seq.company,
                    'title': seq.title or '',
                    'product': seq.product,
                    'subjects': {},
                    'bodies': {}
                }
            contacts[contact_key]['subjects'][seq.email_number] = seq.subject
            contacts[contact_key]['bodies'][seq.email_number] = seq.email_content
            if seq.email_number > max_email_number:
                max_email_number = seq.email_number
        # Build CSV in memory. QUOTE_MINIMAL still quotes fields containing
        # newlines, which Smartlead's importer requires.
        output = io.StringIO()
        writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL)
        # Header: fixed format subject1, body1, subject2, body2, ...
        header = ['First Name', 'Last Name', 'Email', 'Company', 'Title', 'Product']
        for i in range(1, max_email_number + 1):
            header.extend([f'subject{i}', f'body{i}'])
        writer.writerow(header)
        # One row per contact, padding missing emails with empty strings.
        for contact in contacts.values():
            row = [
                contact['first_name'],
                contact['last_name'],
                contact['email'],
                contact['company'],
                contact['title'],
                contact['product']
            ]
            for i in range(1, max_email_number + 1):
                subject = contact['subjects'].get(i, '') or ''
                body = contact['bodies'].get(i, '') or ''
                # Newlines -> <br> so Smartlead renders line breaks as HTML.
                body_html = body.replace('\n', '<br>').replace('\r', '') if body else ''
                row.append(subject)
                row.append(body_html)
            writer.writerow(row)
        output.seek(0)
        return StreamingResponse(
            iter([output.getvalue()]),
            media_type="text/csv",
            headers={
                "Content-Disposition": "attachment; filename=email_sequences_fixed.csv"
            }
        )
    except HTTPException:
        # BUG FIX: the generic handler below used to swallow the deliberate
        # 404 above and re-raise it as a 500; let HTTPExceptions pass through.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error downloading sequences: {str(e)}")
@app.get("/api/smartlead-campaigns")
async def get_smartlead_campaigns():
    """Get list of campaigns from Smartlead"""
    try:
        smartlead = SmartleadClient()
        formatted = []
        for raw in smartlead.get_campaigns():
            # Smartlead responses are inconsistent about field names, so
            # probe every known alias for the id and the display name.
            cid = raw.get('campaign_id') or raw.get('id') or raw.get('campaignId')
            label = raw.get('name') or raw.get('campaign_name') or raw.get('title') or 'Unnamed Campaign'
            if cid:
                formatted.append({"id": str(cid), "name": label})
        return {"campaigns": formatted}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch campaigns: {str(e)}")
@app.post("/api/push-to-smartlead")
async def push_to_smartlead(request: SmartleadPushRequest, db: Session = Depends(get_db)):
    """Push generated sequences to Smartlead campaign (add leads to existing campaign).

    Groups stored sequences by contact, records a SmartleadRun audit row, and
    (unless ``dry_run``) adds the contacts as leads to the chosen campaign in
    batches of 50.

    Raises:
        HTTPException: 404 when no sequences exist, 400 when the Smartlead
        client cannot be configured, 500 on any other failure.
    """
    try:
        # Get all sequences for this file, ordered so grouping works.
        sequences = db.query(GeneratedSequence).filter(
            GeneratedSequence.file_id == request.file_id
        ).order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number).all()
        if not sequences:
            raise HTTPException(status_code=404, detail="No sequences found")
        # Group sequences by contact (one entry per sequence_id).
        contacts = {}
        for seq in sequences:
            contact_key = f"{seq.sequence_id}"
            if contact_key not in contacts:
                contacts[contact_key] = {
                    'first_name': seq.first_name,
                    'last_name': seq.last_name,
                    'email': seq.email,
                    'company': seq.company,
                    'title': seq.title or '',
                    'subjects': {},
                    'bodies': {}
                }
            contacts[contact_key]['subjects'][seq.email_number] = seq.subject
            contacts[contact_key]['bodies'][seq.email_number] = seq.email_content
        # Initialize Smartlead client (400 when credentials are missing/bad).
        try:
            client = SmartleadClient()
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
        # Resolve the campaign's display name for logging; Smartlead is
        # inconsistent about field names, so probe the known aliases.
        campaigns = client.get_campaigns()
        campaign_name = 'Unknown Campaign'
        for campaign in campaigns:
            campaign_id = campaign.get('campaign_id') or campaign.get('id') or campaign.get('campaignId')
            if str(campaign_id) == str(request.campaign_id):
                campaign_name = campaign.get('name') or campaign.get('campaign_name') or campaign.get('title') or 'Unknown Campaign'
                break
        # Create the run record before doing any work so failures are traceable.
        run_id = str(uuid.uuid4())
        run = SmartleadRun(
            run_id=run_id,
            file_id=request.file_id,
            mode='existing',  # Always 'existing' now
            campaign_id=request.campaign_id,
            campaign_name=campaign_name,
            steps_count=0,  # Not needed for existing campaigns
            dry_run=1 if request.dry_run else 0,
            total_leads=len(contacts),
            status='pending'
        )
        db.add(run)
        db.commit()
        if request.dry_run:
            # Dry run - just return what would be sent.
            return {
                "run_id": run_id,
                "campaign_id": request.campaign_id,
                "campaign_name": campaign_name,
                "total": len(contacts),
                "added": 0,
                "skipped": 0,
                "failed": 0,
                "errors": [],
                "status": "dry_run_completed",
                "message": "Dry run completed. No leads were sent to Smartlead."
            }
        campaign_id = request.campaign_id

        def safe_str(value):
            # Coerce a possibly-None/NaN value (pandas artifacts) to a
            # stripped string. Defined once here, not per loop iteration.
            if value is None:
                return ""
            if isinstance(value, float):
                if math.isnan(value):
                    return ""
                return str(value).strip()
            return str(value).strip() if value else ""

        # Prepare leads for Smartlead.
        leads = []
        errors = []
        added_count = 0
        skipped_count = 0  # reserved: nothing is skipped in this flow yet
        failed_count = 0
        for contact in contacts.values():
            try:
                if not contact.get('email'):
                    errors.append({
                        "email": contact.get('email', 'unknown'),
                        "error": "Missing email address"
                    })
                    failed_count += 1
                    continue
                first_name = safe_str(contact.get('first_name', '')) or 'Contact'
                last_name = safe_str(contact.get('last_name', ''))
                email = safe_str(contact.get('email', ''))
                if not email:
                    errors.append({
                        "email": "unknown",
                        "error": "Missing email address"
                    })
                    failed_count += 1
                    continue
                # Smartlead's add-leads API rejects "company", "title" and
                # custom variables, so only the three allowed fields are sent.
                leads.append({
                    "email": email,
                    "first_name": first_name,
                    "last_name": last_name
                })
            except Exception as e:
                errors.append({
                    "email": contact.get('email', 'unknown'),
                    "error": str(e)
                })
                failed_count += 1
        # Add leads to campaign in batches of 50.
        batch_size = 50
        for i in range(0, len(leads), batch_size):
            batch = leads[i:i + batch_size]
            try:
                client.add_leads_to_campaign(campaign_id, batch)
                added_count += len(batch)
            except Exception as e:
                # Mark every lead of the failed batch.
                for lead in batch:
                    errors.append({
                        "email": lead.get('email', 'unknown'),
                        "error": f"Failed to add lead: {str(e)}"
                    })
                    failed_count += 1
        # Update run record with final tallies.
        run.status = 'completed'
        run.added_leads = added_count
        run.skipped_leads = skipped_count
        run.failed_leads = failed_count
        run.error_details = json.dumps(errors) if errors else None
        run.completed_at = datetime.utcnow()
        db.commit()
        return {
            "run_id": run_id,
            "campaign_id": campaign_id,
            "campaign_name": campaign_name,
            "total": len(contacts),
            "added": added_count,
            "skipped": skipped_count,
            "failed": failed_count,
            "errors": errors[:10],  # Return first 10 errors
            "status": "completed"
        }
    except HTTPException:
        raise
    except Exception as e:
        if 'run' in locals():
            # Clear any pending state before persisting the failure marker.
            db.rollback()
            run.status = 'failed'
            run.error_details = str(e)
            db.commit()
        raise HTTPException(status_code=500, detail=f"Error pushing to Smartlead: {str(e)}")
@app.get("/api/smartlead-runs")
async def get_smartlead_runs(file_id: str = Query(None), db: Session = Depends(get_db)):
    """Get Smartlead run history"""
    try:
        base_query = db.query(SmartleadRun)
        if file_id:
            base_query = base_query.filter(SmartleadRun.file_id == file_id)
        # Most recent first, capped at 50 entries.
        recent = base_query.order_by(SmartleadRun.created_at.desc()).limit(50).all()

        history = []
        for entry in recent:
            history.append({
                "run_id": entry.run_id,
                "file_id": entry.file_id,
                "campaign_id": entry.campaign_id,
                "campaign_name": entry.campaign_name,
                "mode": entry.mode,
                "steps_count": entry.steps_count,
                "dry_run": bool(entry.dry_run),
                "total_leads": entry.total_leads,
                "added_leads": entry.added_leads,
                "skipped_leads": entry.skipped_leads,
                "failed_leads": entry.failed_leads,
                "status": entry.status,
                "created_at": entry.created_at.isoformat() if entry.created_at else None,
                "completed_at": entry.completed_at.isoformat() if entry.completed_at else None,
            })
        return history
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching runs: {str(e)}")
# ---- Frontend static serving ----
FRONTEND_DIST = Path(__file__).resolve().parents[2] / "frontend" / "dist"
INDEX_FILE = FRONTEND_DIST / "index.html"

if FRONTEND_DIST.exists():
    # Serve the built React app; html=True makes "/" resolve to index.html.
    app.mount("/", StaticFiles(directory=str(FRONTEND_DIST), html=True), name="static")

# SPA fallback: any non-/api route should return React index.html so
# client-side routing survives hard refreshes and deep links.
@app.get("/{full_path:path}")
def spa_fallback(full_path: str):
    if full_path.startswith("api/"):
        # BUG FIX: previously returned HTTP 200 with a {"detail": "Not Found"}
        # body; unknown API routes must produce a real 404.
        raise HTTPException(status_code=404, detail="Not Found")
    return FileResponse(str(INDEX_FILE))