| import json |
| import os |
| import traceback |
| |
| os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' |
| from fastapi import FastAPI, Request, HTTPException, Depends |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import RedirectResponse |
| from sqlalchemy.orm import Session |
| from google_auth_oauthlib.flow import Flow |
| from google.oauth2.credentials import Credentials |
| from googleapiclient.discovery import build |
| from apscheduler.schedulers.background import BackgroundScheduler |
| from engine import automation_engine |
| from sheets import get_sheets_service, get_sheet_data, extract_spreadsheet_id |
| import models |
| from models import SessionLocal, engine |
| import firebase_admin |
| from firebase_admin import credentials, firestore |
|
|
| |
| def write_secret_to_file(env_var_name, file_path): |
| secret_content = os.getenv(env_var_name) |
| if secret_content: |
| try: |
| with open(file_path, 'w') as f: |
| f.write(secret_content) |
| print(f"Successfully wrote {env_var_name} to {file_path}") |
| except Exception as e: |
| print(f"Failed to write {env_var_name} to {file_path}: {e}") |
|
|
| |
| write_secret_to_file("GOOGLE_CREDENTIALS_JSON", "credentials.json") |
| write_secret_to_file("FIREBASE_CREDENTIALS_JSON", "fireB.json") |
|
|
| |
| try: |
| if not firebase_admin._apps: |
| |
| if os.path.exists("fireB.json"): |
| cred = credentials.Certificate("fireB.json") |
| firebase_admin.initialize_app(cred) |
| db_firestore = firestore.client() |
| else: |
| print("Warning: fireB.json not found. Activation will be disabled.") |
| db_firestore = None |
| else: |
| db_firestore = firestore.client() |
| except Exception as e: |
| print(f"Firebase initialization failed: {e}") |
| db_firestore = None |
|
|
| |
| |
| state_storage = {} |
|
|
| |
| models.Base.metadata.create_all(bind=engine) |
|
|
| |
| scheduler = BackgroundScheduler() |
| scheduler.add_job(automation_engine, 'interval', minutes=5) |
| scheduler.start() |
|
|
| app = FastAPI() |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=False, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| def get_db(): |
| db = SessionLocal() |
| try: |
| yield db |
| finally: |
| db.close() |
|
|
| |
| CLIENT_SECRETS_FILE = "credentials.json" |
|
|
| |
| SCOPES = [ |
| 'https://www.googleapis.com/auth/gmail.send', |
| 'https://www.googleapis.com/auth/gmail.readonly', |
| 'https://www.googleapis.com/auth/spreadsheets.readonly', |
| 'https://www.googleapis.com/auth/userinfo.email', |
| 'openid' |
| ] |
|
|
| |
| @app.get("/health") |
| def health_check(): |
| return {"status": "healthy", "version": "1.0.1"} |
|
|
| |
| GOOGLE_REDIRECT_URI = os.getenv("GOOGLE_REDIRECT_URI", "http://localhost:8000/auth/callback") |
| FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:3000") |
|
|
| def get_flow(redirect_uri: str = GOOGLE_REDIRECT_URI): |
| return Flow.from_client_secrets_file( |
| CLIENT_SECRETS_FILE, |
| scopes=SCOPES, |
| redirect_uri=redirect_uri |
| ) |
|
|
| @app.get("/auth/login") |
| async def login(): |
| flow = get_flow() |
| authorization_url, state = flow.authorization_url( |
| access_type='offline', |
| include_granted_scopes='true', |
| prompt='consent' |
| ) |
| |
| state_storage[state] = flow.code_verifier |
| return {"url": authorization_url} |
|
|
| @app.get("/auth/callback") |
| async def callback(request: Request, db: Session = Depends(get_db)): |
| try: |
| state = request.query_params.get('state') |
| flow = get_flow() |
| |
| |
| if state in state_storage: |
| flow.code_verifier = state_storage.pop(state) |
| |
| flow.fetch_token(authorization_response=str(request.url)) |
| |
| credentials = flow.credentials |
| |
| |
| service = build('oauth2', 'v2', credentials=credentials) |
| user_info = service.userinfo().get().execute() |
| email = user_info['email'] |
|
|
| |
| user = db.query(models.User).filter(models.User.email == email).first() |
| token_data = { |
| 'token': credentials.token, |
| 'refresh_token': credentials.refresh_token, |
| 'token_uri': credentials.token_uri, |
| 'client_id': credentials.client_id, |
| 'client_secret': credentials.client_secret, |
| 'scopes': credentials.scopes |
| } |
|
|
| if not user: |
| user = models.User(email=email, google_token=token_data) |
| db.add(user) |
| else: |
| user.google_token = token_data |
| |
| db.commit() |
|
|
| |
| return RedirectResponse(url=f"{FRONTEND_URL}/settings?status=success") |
| |
| except Exception as e: |
| print(f"Error in OAuth callback: {e}") |
| traceback.print_exc() |
| return {"error": str(e), "details": traceback.format_exc()} |
|
|
| @app.get("/auth/me") |
| async def get_me(db: Session = Depends(get_db)): |
| |
| |
| user = db.query(models.User).order_by(models.User.id.desc()).first() |
| if not user: |
| return {"connected": False} |
| return { |
| "connected": True, |
| "email": user.email, |
| "id": user.id, |
| "sender_name": user.sender_name, |
| "reply_to": user.reply_to |
| } |
|
|
| @app.get("/stats") |
| async def get_stats(db: Session = Depends(get_db)): |
| """Returns overall platform stats for the dashboard.""" |
| total_sent = db.query(models.Lead).filter(models.Lead.status.in_(["emailed", "replied"])).count() |
| total_replied = db.query(models.Lead).filter(models.Lead.status == "replied").count() |
| total_leads = db.query(models.Lead).count() |
| active_campaigns = db.query(models.Campaign).filter(models.Campaign.status == "active").count() |
| |
| reply_rate = (total_replied / total_sent * 100) if total_sent > 0 else 0 |
| |
| return { |
| "total_sent": total_sent, |
| "active_campaigns": active_campaigns, |
| "reply_rate": f"{reply_rate:.1f}%", |
| "total_leads": total_leads, |
| "name": "User" |
| } |
|
|
| @app.post("/auth/settings") |
| async def update_settings(request: Request, db: Session = Depends(get_db)): |
| data = await request.json() |
| sender_name = data.get("sender_name") |
| reply_to = data.get("reply_to") |
| |
| |
| user = db.query(models.User).order_by(models.User.id.desc()).first() |
| if not user: |
| raise HTTPException(status_code=404, detail="User not found") |
| |
| user.sender_name = sender_name |
| user.reply_to = reply_to |
| db.commit() |
| return {"message": "Settings updated"} |
|
|
| @app.post("/auth/disconnect") |
| async def disconnect(db: Session = Depends(get_db)): |
| |
| |
| user = db.query(models.User).order_by(models.User.id.desc()).first() |
| if user: |
| db.delete(user) |
| db.commit() |
| return {"message": "Successfully disconnected"} |
|
|
| @app.post("/auth/activate") |
| async def activate(request: Request, db: Session = Depends(get_db)): |
| data = await request.json() |
| code = data.get("code") |
| |
| if not db_firestore: |
| raise HTTPException(status_code=500, detail="Firebase not initialized. Check fireB.json.") |
|
|
| try: |
| |
| keys_ref = db_firestore.collection("activation_keys") |
| query = keys_ref.where("code", "==", code).where("used", "==", False).limit(1).get() |
|
|
| if not query: |
| raise HTTPException(status_code=401, detail="Invalid or already used activation code.") |
|
|
| |
| doc = query[0] |
| doc.reference.update({ |
| "used": True, |
| "activated_at": firestore.SERVER_TIMESTAMP |
| }) |
| |
| return {"token": "activated_session_" + doc.id} |
| except HTTPException as he: |
| raise he |
| except Exception as e: |
| print(f"Activation error: {e}") |
| raise HTTPException(status_code=500, detail="Internal server error during activation.") |
|
|
| @app.get("/campaigns") |
| async def get_campaigns(db: Session = Depends(get_db)): |
| campaigns = db.query(models.Campaign).all() |
| results = [] |
| for camp in campaigns: |
| sent = db.query(models.Lead).filter(models.Lead.campaign_id == camp.id, models.Lead.status.in_(["emailed", "replied"])).count() |
| replies = db.query(models.Lead).filter(models.Lead.campaign_id == camp.id, models.Lead.status == "replied").count() |
| leads_count = db.query(models.Lead).filter(models.Lead.campaign_id == camp.id).count() |
| |
| |
| camp_dict = { |
| "id": camp.id, |
| "name": camp.name, |
| "subject": camp.subject, |
| "status": camp.status, |
| "created_at": camp.created_at, |
| "steps": camp.steps, |
| "stats": { |
| "sent": sent, |
| "replies": replies, |
| "leads": leads_count, |
| "reply_rate": f"{(replies/sent*100):.1f}%" if sent > 0 else "0%" |
| } |
| } |
| results.append(camp_dict) |
| return results |
|
|
| @app.get("/campaigns/{campaign_id}") |
| async def get_campaign(campaign_id: int, db: Session = Depends(get_db)): |
| campaign = db.query(models.Campaign).filter(models.Campaign.id == campaign_id).first() |
| if not campaign: |
| raise HTTPException(status_code=404, detail="Campaign not found") |
| return campaign |
|
|
| @app.put("/campaigns/{campaign_id}") |
| async def update_campaign(campaign_id: int, request: Request, db: Session = Depends(get_db)): |
| campaign = db.query(models.Campaign).filter(models.Campaign.id == campaign_id).first() |
| if not campaign: |
| raise HTTPException(status_code=404, detail="Campaign not found") |
| |
| data = await request.json() |
| campaign.name = data.get("name", campaign.name) |
| campaign.subject = data.get("subject", campaign.subject) |
| campaign.steps = data.get("steps", campaign.steps) |
| campaign.user_id = data.get("user_id", campaign.user_id) |
| |
| db.commit() |
| return {"message": "Campaign updated"} |
|
|
| @app.get("/campaigns/{campaign_id}/leads") |
| async def get_leads(campaign_id: int, db: Session = Depends(get_db)): |
| return db.query(models.Lead).filter(models.Lead.campaign_id == campaign_id).all() |
|
|
| db.query(models.Lead).filter(models.Lead.campaign_id == campaign_id).delete() |
| db.commit() |
| return {"message": "Audience cleared successfully"} |
|
|
| @app.post("/campaigns/{campaign_id}/reset-leads") |
| async def reset_leads(campaign_id: int, db: Session = Depends(get_db)): |
| """Resets all leads in a campaign to 'pending' to allow resending.""" |
| db.query(models.Lead).filter(models.Lead.campaign_id == campaign_id).update({ |
| models.Lead.status: "pending", |
| models.Lead.current_step: 0, |
| models.Lead.last_contact_at: None, |
| models.Lead.thread_id: None |
| }) |
| db.commit() |
| return {"message": "All leads reset to pending for resending"} |
|
|
| @app.put("/leads/{lead_id}") |
| async def update_lead(lead_id: int, request: Request, db: Session = Depends(get_db)): |
| lead = db.query(models.Lead).filter(models.Lead.id == lead_id).first() |
| if not lead: |
| raise HTTPException(status_code=404, detail="Lead not found") |
| data = await request.json() |
| lead.email = data.get("email", lead.email) |
| lead.data = data.get("data", lead.data) |
| db.commit() |
| return {"message": "Lead updated"} |
|
|
| @app.delete("/leads/{lead_id}") |
| async def delete_lead(lead_id: int, db: Session = Depends(get_db)): |
| lead = db.query(models.Lead).filter(models.Lead.id == lead_id).first() |
| if not lead: |
| raise HTTPException(status_code=404, detail="Lead not found") |
| db.delete(lead) |
| db.commit() |
| return {"message": "Lead deleted"} |
|
|
| @app.post("/campaigns") |
| async def create_campaign(request: Request, db: Session = Depends(get_db)): |
| data = await request.json() |
| name = data.get("name") |
| subject = data.get("subject") |
| steps = data.get("steps") |
| user_id = data.get("user_id") |
| |
| if not name or not subject or not steps: |
| raise HTTPException(status_code=400, detail="Missing name, subject, or steps") |
| |
| |
| if not user_id: |
| active_user = db.query(models.User).order_by(models.User.id.desc()).first() |
| if active_user: |
| user_id = active_user.id |
|
|
| campaign = models.Campaign(name=name, subject=subject, steps=steps, user_id=user_id) |
| db.add(campaign) |
| db.commit() |
| return {"message": "Campaign created", "id": campaign.id} |
|
|
| @app.post("/campaigns/{campaign_id}/run") |
| async def run_campaign_manually(campaign_id: int, db: Session = Depends(get_db)): |
| try: |
| automation_engine(campaign_id=campaign_id) |
| return {"message": f"Automation engine triggered for campaign {campaign_id}"} |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.post("/campaigns/{campaign_id}/test-send") |
| async def test_send(campaign_id: int, db: Session = Depends(get_db)): |
| """Sends a preview of Step 1 to the user's own email address.""" |
| campaign = db.query(models.Campaign).filter(models.Campaign.id == campaign_id).first() |
| if not campaign: |
| raise HTTPException(status_code=404, detail="Campaign not found") |
| |
| user = db.query(models.User).filter(models.User.id == campaign.user_id).first() |
| if not user or not user.google_token: |
| |
| user = db.query(models.User).order_by(models.User.id.desc()).first() |
| if not user or not user.google_token: |
| raise HTTPException(status_code=404, detail="No connected Google account found") |
|
|
| from engine import get_gmail_service, send_email |
| |
| try: |
| service = get_gmail_service(user.google_token) |
| first_step = campaign.steps[0] |
| |
| |
| sample_data = {"name": "Test Name", "company": "Test Company"} |
| msg_body = first_step.get('html_form') or first_step.get('message', '') |
| msg_subject = f"[TEST] {campaign.subject}" |
| |
| for key, val in sample_data.items(): |
| placeholder = f"{{{key}}}" |
| msg_body = msg_body.replace(placeholder, str(val)) |
| msg_subject = msg_subject.replace(placeholder, str(val)) |
| |
| send_email(service, user.email, msg_subject, f"<h3>Campaign Preview (Step 1)</h3><hr/>{msg_body}", user.email, user.sender_name, user.reply_to) |
| return {"message": f"Test email sent to {user.email}"} |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.post("/campaigns/{campaign_id}/import-leads") |
| async def import_leads(campaign_id: int, request: Request, db: Session = Depends(get_db)): |
| data = await request.json() |
| sheet_url = data.get("sheet_url") |
| user_id = data.get("user_id") |
|
|
| if not sheet_url or not user_id: |
| raise HTTPException(status_code=400, detail="Missing sheet_url or user_id") |
|
|
| user = db.query(models.User).filter(models.User.id == user_id).first() |
| if not user or not user.google_token: |
| raise HTTPException(status_code=404, detail="User not found or Google not connected") |
|
|
| try: |
| service = get_sheets_service(user.google_token) |
| spreadsheet_id = extract_spreadsheet_id(url=sheet_url) |
| if not spreadsheet_id: |
| raise HTTPException(status_code=400, detail="Invalid spreadsheet URL") |
| |
| sheet_data = get_sheet_data(service, spreadsheet_id) |
| |
| |
| imported_count = 0 |
| updated_count = 0 |
| for row in sheet_data: |
| email = row.get("email") |
| if not email: |
| continue |
| |
| |
| existing_lead = db.query(models.Lead).filter( |
| models.Lead.campaign_id == campaign_id, |
| models.Lead.email == email |
| ).first() |
|
|
| if existing_lead: |
| existing_lead.data = row |
| updated_count += 1 |
| else: |
| lead = models.Lead( |
| email=email, |
| campaign_id=campaign_id, |
| data=row, |
| status="pending" |
| ) |
| db.add(lead) |
| imported_count += 1 |
| |
| db.commit() |
| return { |
| "message": f"Successfully processed leads", |
| "imported": imported_count, |
| "updated": updated_count |
| } |
|
|
| except Exception as e: |
| db.rollback() |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.post("/campaigns/{campaign_id}/import-manual") |
| async def import_leads_manual(campaign_id: int, request: Request, db: Session = Depends(get_db)): |
| data = await request.json() |
| leads_list = data.get("leads") |
|
|
| if not leads_list or not isinstance(leads_list, list): |
| raise HTTPException(status_code=400, detail="Invalid leads data. Expected a list of objects.") |
|
|
| try: |
| imported_count = 0 |
| updated_count = 0 |
| for entry in leads_list: |
| email = entry.get("email") |
| if not email: |
| continue |
| |
| |
| existing_lead = db.query(models.Lead).filter( |
| models.Lead.campaign_id == campaign_id, |
| models.Lead.email == email |
| ).first() |
|
|
| if existing_lead: |
| existing_lead.data = entry |
| updated_count += 1 |
| else: |
| lead = models.Lead( |
| email=email, |
| campaign_id=campaign_id, |
| data=entry, |
| status="pending" |
| ) |
| db.add(lead) |
| imported_count += 1 |
| |
| db.commit() |
| return { |
| "message": f"Successfully processed manual leads", |
| "imported": imported_count, |
| "updated": updated_count |
| } |
|
|
| except Exception as e: |
| db.rollback() |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=8000) |
|
|