mali / main.py
babaTEEpe's picture
Update main.py
c50efa7 verified
import json
import os
import traceback
# Allow HTTP for local development OAuth
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session
from google_auth_oauthlib.flow import Flow
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from apscheduler.schedulers.background import BackgroundScheduler
from engine import automation_engine
from sheets import get_sheets_service, get_sheet_data, extract_spreadsheet_id
import models
from models import SessionLocal, engine
import firebase_admin
from firebase_admin import credentials, firestore
# Helper to write environment variables to files (for Hugging Face secrets)
def write_secret_to_file(env_var_name, file_path):
secret_content = os.getenv(env_var_name)
if secret_content:
try:
with open(file_path, 'w') as f:
f.write(secret_content)
print(f"Successfully wrote {env_var_name} to {file_path}")
except Exception as e:
print(f"Failed to write {env_var_name} to {file_path}: {e}")
# Write credentials from environment if they exist (Production)
write_secret_to_file("GOOGLE_CREDENTIALS_JSON", "credentials.json")
write_secret_to_file("FIREBASE_CREDENTIALS_JSON", "fireB.json")
# Initialize Firebase
try:
if not firebase_admin._apps:
# Check if the file exists (either uploaded or written from env)
if os.path.exists("fireB.json"):
cred = credentials.Certificate("fireB.json")
firebase_admin.initialize_app(cred)
db_firestore = firestore.client()
else:
print("Warning: fireB.json not found. Activation will be disabled.")
db_firestore = None
else:
db_firestore = firestore.client()
except Exception as e:
print(f"Firebase initialization failed: {e}")
db_firestore = None
# Temporary storage for PKCE verifiers (keyed by state)
# In production, use session middleware or Redis
state_storage = {}
# Init DB
models.Base.metadata.create_all(bind=engine)
# Init Scheduler
scheduler = BackgroundScheduler()
scheduler.add_job(automation_engine, 'interval', minutes=5)
scheduler.start()
app = FastAPI()
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow all for now to debug connectivity
allow_credentials=False, # Standard fetch doesn't need this, and "*" requires False
allow_methods=["*"],
allow_headers=["*"],
)
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Load credentials from the file provided by user
CLIENT_SECRETS_FILE = "credentials.json"
# In a real app, these would be in environment variables or config
SCOPES = [
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/spreadsheets.readonly',
'https://www.googleapis.com/auth/userinfo.email',
'openid'
]
# Health check for deployment testing
@app.get("/health")
def health_check():
return {"status": "healthy", "version": "1.0.1"}
# Set these in your Hugging Face Space secrets!
GOOGLE_REDIRECT_URI = os.getenv("GOOGLE_REDIRECT_URI", "http://localhost:8000/auth/callback")
FRONTEND_URL = os.getenv("FRONTEND_URL", "http://localhost:3000")
def get_flow(redirect_uri: str = GOOGLE_REDIRECT_URI):
return Flow.from_client_secrets_file(
CLIENT_SECRETS_FILE,
scopes=SCOPES,
redirect_uri=redirect_uri
)
@app.get("/auth/login")
async def login():
flow = get_flow()
authorization_url, state = flow.authorization_url(
access_type='offline',
include_granted_scopes='true',
prompt='consent'
)
# Save the verifier for the callback
state_storage[state] = flow.code_verifier
return {"url": authorization_url}
@app.get("/auth/callback")
async def callback(request: Request, db: Session = Depends(get_db)):
try:
state = request.query_params.get('state')
flow = get_flow()
# Restore the verifier
if state in state_storage:
flow.code_verifier = state_storage.pop(state)
flow.fetch_token(authorization_response=str(request.url))
credentials = flow.credentials
# Use UserInfo to get the email address of the person who just logged in
service = build('oauth2', 'v2', credentials=credentials)
user_info = service.userinfo().get().execute()
email = user_info['email']
# Store user and token in DB
user = db.query(models.User).filter(models.User.email == email).first()
token_data = {
'token': credentials.token,
'refresh_token': credentials.refresh_token,
'token_uri': credentials.token_uri,
'client_id': credentials.client_id,
'client_secret': credentials.client_secret,
'scopes': credentials.scopes
}
if not user:
user = models.User(email=email, google_token=token_data)
db.add(user)
else:
user.google_token = token_data
db.commit()
# Redirect back to frontend settings page upon success
return RedirectResponse(url=f"{FRONTEND_URL}/settings?status=success")
except Exception as e:
print(f"Error in OAuth callback: {e}")
traceback.print_exc()
return {"error": str(e), "details": traceback.format_exc()}
@app.get("/auth/me")
async def get_me(db: Session = Depends(get_db)):
# In a real app, this would use session/auth headers
# For now, we return the first user in the DB (the primary user)
user = db.query(models.User).order_by(models.User.id.desc()).first()
if not user:
return {"connected": False}
return {
"connected": True,
"email": user.email,
"id": user.id,
"sender_name": user.sender_name,
"reply_to": user.reply_to
}
@app.get("/stats")
async def get_stats(db: Session = Depends(get_db)):
"""Returns overall platform stats for the dashboard."""
total_sent = db.query(models.Lead).filter(models.Lead.status.in_(["emailed", "replied"])).count()
total_replied = db.query(models.Lead).filter(models.Lead.status == "replied").count()
total_leads = db.query(models.Lead).count()
active_campaigns = db.query(models.Campaign).filter(models.Campaign.status == "active").count()
reply_rate = (total_replied / total_sent * 100) if total_sent > 0 else 0
return {
"total_sent": total_sent,
"active_campaigns": active_campaigns,
"reply_rate": f"{reply_rate:.1f}%",
"total_leads": total_leads,
"name": "User" # Placeholder for Firebase name later
}
@app.post("/auth/settings")
async def update_settings(request: Request, db: Session = Depends(get_db)):
data = await request.json()
sender_name = data.get("sender_name")
reply_to = data.get("reply_to")
# In a real app, identify user from session
user = db.query(models.User).order_by(models.User.id.desc()).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
user.sender_name = sender_name
user.reply_to = reply_to
db.commit()
return {"message": "Settings updated"}
@app.post("/auth/disconnect")
async def disconnect(db: Session = Depends(get_db)):
# Delete the most recent user to "disconnect"
# In a real app with sessions, we'd delete based on session_id
user = db.query(models.User).order_by(models.User.id.desc()).first()
if user:
db.delete(user)
db.commit()
return {"message": "Successfully disconnected"}
@app.post("/auth/activate")
async def activate(request: Request, db: Session = Depends(get_db)):
data = await request.json()
code = data.get("code")
if not db_firestore:
raise HTTPException(status_code=500, detail="Firebase not initialized. Check fireB.json.")
try:
# Search for the code in the 'activation_keys' collection
keys_ref = db_firestore.collection("activation_keys")
query = keys_ref.where("code", "==", code).where("used", "==", False).limit(1).get()
if not query:
raise HTTPException(status_code=401, detail="Invalid or already used activation code.")
# Mark as used
doc = query[0]
doc.reference.update({
"used": True,
"activated_at": firestore.SERVER_TIMESTAMP
})
return {"token": "activated_session_" + doc.id}
except HTTPException as he:
raise he
except Exception as e:
print(f"Activation error: {e}")
raise HTTPException(status_code=500, detail="Internal server error during activation.")
@app.get("/campaigns")
async def get_campaigns(db: Session = Depends(get_db)):
campaigns = db.query(models.Campaign).all()
results = []
for camp in campaigns:
sent = db.query(models.Lead).filter(models.Lead.campaign_id == camp.id, models.Lead.status.in_(["emailed", "replied"])).count()
replies = db.query(models.Lead).filter(models.Lead.campaign_id == camp.id, models.Lead.status == "replied").count()
leads_count = db.query(models.Lead).filter(models.Lead.campaign_id == camp.id).count()
# Convert SQLAlchemy model to dict and add counts
camp_dict = {
"id": camp.id,
"name": camp.name,
"subject": camp.subject,
"status": camp.status,
"created_at": camp.created_at,
"steps": camp.steps,
"stats": {
"sent": sent,
"replies": replies,
"leads": leads_count,
"reply_rate": f"{(replies/sent*100):.1f}%" if sent > 0 else "0%"
}
}
results.append(camp_dict)
return results
@app.get("/campaigns/{campaign_id}")
async def get_campaign(campaign_id: int, db: Session = Depends(get_db)):
campaign = db.query(models.Campaign).filter(models.Campaign.id == campaign_id).first()
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
return campaign
@app.put("/campaigns/{campaign_id}")
async def update_campaign(campaign_id: int, request: Request, db: Session = Depends(get_db)):
campaign = db.query(models.Campaign).filter(models.Campaign.id == campaign_id).first()
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
data = await request.json()
campaign.name = data.get("name", campaign.name)
campaign.subject = data.get("subject", campaign.subject)
campaign.steps = data.get("steps", campaign.steps)
campaign.user_id = data.get("user_id", campaign.user_id)
db.commit()
return {"message": "Campaign updated"}
@app.get("/campaigns/{campaign_id}/leads")
async def get_leads(campaign_id: int, db: Session = Depends(get_db)):
return db.query(models.Lead).filter(models.Lead.campaign_id == campaign_id).all()
db.query(models.Lead).filter(models.Lead.campaign_id == campaign_id).delete()
db.commit()
return {"message": "Audience cleared successfully"}
@app.post("/campaigns/{campaign_id}/reset-leads")
async def reset_leads(campaign_id: int, db: Session = Depends(get_db)):
"""Resets all leads in a campaign to 'pending' to allow resending."""
db.query(models.Lead).filter(models.Lead.campaign_id == campaign_id).update({
models.Lead.status: "pending",
models.Lead.current_step: 0,
models.Lead.last_contact_at: None,
models.Lead.thread_id: None
})
db.commit()
return {"message": "All leads reset to pending for resending"}
@app.put("/leads/{lead_id}")
async def update_lead(lead_id: int, request: Request, db: Session = Depends(get_db)):
lead = db.query(models.Lead).filter(models.Lead.id == lead_id).first()
if not lead:
raise HTTPException(status_code=404, detail="Lead not found")
data = await request.json()
lead.email = data.get("email", lead.email)
lead.data = data.get("data", lead.data)
db.commit()
return {"message": "Lead updated"}
@app.delete("/leads/{lead_id}")
async def delete_lead(lead_id: int, db: Session = Depends(get_db)):
lead = db.query(models.Lead).filter(models.Lead.id == lead_id).first()
if not lead:
raise HTTPException(status_code=404, detail="Lead not found")
db.delete(lead)
db.commit()
return {"message": "Lead deleted"}
@app.post("/campaigns")
async def create_campaign(request: Request, db: Session = Depends(get_db)):
data = await request.json()
name = data.get("name")
subject = data.get("subject")
steps = data.get("steps")
user_id = data.get("user_id")
if not name or not subject or not steps:
raise HTTPException(status_code=400, detail="Missing name, subject, or steps")
# If user_id is missing, try to associate with the last active user
if not user_id:
active_user = db.query(models.User).order_by(models.User.id.desc()).first()
if active_user:
user_id = active_user.id
campaign = models.Campaign(name=name, subject=subject, steps=steps, user_id=user_id)
db.add(campaign)
db.commit()
return {"message": "Campaign created", "id": campaign.id}
@app.post("/campaigns/{campaign_id}/run")
async def run_campaign_manually(campaign_id: int, db: Session = Depends(get_db)):
try:
automation_engine(campaign_id=campaign_id)
return {"message": f"Automation engine triggered for campaign {campaign_id}"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/campaigns/{campaign_id}/test-send")
async def test_send(campaign_id: int, db: Session = Depends(get_db)):
"""Sends a preview of Step 1 to the user's own email address."""
campaign = db.query(models.Campaign).filter(models.Campaign.id == campaign_id).first()
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
user = db.query(models.User).filter(models.User.id == campaign.user_id).first()
if not user or not user.google_token:
# Fallback to the current user if campaign user_id is missing or disconnected
user = db.query(models.User).order_by(models.User.id.desc()).first()
if not user or not user.google_token:
raise HTTPException(status_code=404, detail="No connected Google account found")
from engine import get_gmail_service, send_email
try:
service = get_gmail_service(user.google_token)
first_step = campaign.steps[0]
# Sample data for preview
sample_data = {"name": "Test Name", "company": "Test Company"}
msg_body = first_step.get('html_form') or first_step.get('message', '')
msg_subject = f"[TEST] {campaign.subject}"
for key, val in sample_data.items():
placeholder = f"{{{key}}}"
msg_body = msg_body.replace(placeholder, str(val))
msg_subject = msg_subject.replace(placeholder, str(val))
send_email(service, user.email, msg_subject, f"<h3>Campaign Preview (Step 1)</h3><hr/>{msg_body}", user.email, user.sender_name, user.reply_to)
return {"message": f"Test email sent to {user.email}"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/campaigns/{campaign_id}/import-leads")
async def import_leads(campaign_id: int, request: Request, db: Session = Depends(get_db)):
data = await request.json()
sheet_url = data.get("sheet_url")
user_id = data.get("user_id")
if not sheet_url or not user_id:
raise HTTPException(status_code=400, detail="Missing sheet_url or user_id")
user = db.query(models.User).filter(models.User.id == user_id).first()
if not user or not user.google_token:
raise HTTPException(status_code=404, detail="User not found or Google not connected")
try:
service = get_sheets_service(user.google_token)
spreadsheet_id = extract_spreadsheet_id(url=sheet_url)
if not spreadsheet_id:
raise HTTPException(status_code=400, detail="Invalid spreadsheet URL")
sheet_data = get_sheet_data(service, spreadsheet_id)
# Import to DB with Upsert logic
imported_count = 0
updated_count = 0
for row in sheet_data:
email = row.get("email")
if not email:
continue
# Check if lead already exists in this campaign
existing_lead = db.query(models.Lead).filter(
models.Lead.campaign_id == campaign_id,
models.Lead.email == email
).first()
if existing_lead:
existing_lead.data = row
updated_count += 1
else:
lead = models.Lead(
email=email,
campaign_id=campaign_id,
data=row,
status="pending"
)
db.add(lead)
imported_count += 1
db.commit()
return {
"message": f"Successfully processed leads",
"imported": imported_count,
"updated": updated_count
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
@app.post("/campaigns/{campaign_id}/import-manual")
async def import_leads_manual(campaign_id: int, request: Request, db: Session = Depends(get_db)):
data = await request.json()
leads_list = data.get("leads") # Expecting a list of dicts
if not leads_list or not isinstance(leads_list, list):
raise HTTPException(status_code=400, detail="Invalid leads data. Expected a list of objects.")
try:
imported_count = 0
updated_count = 0
for entry in leads_list:
email = entry.get("email")
if not email:
continue
# Upsert logic
existing_lead = db.query(models.Lead).filter(
models.Lead.campaign_id == campaign_id,
models.Lead.email == email
).first()
if existing_lead:
existing_lead.data = entry
updated_count += 1
else:
lead = models.Lead(
email=email,
campaign_id=campaign_id,
data=entry,
status="pending"
)
db.add(lead)
imported_count += 1
db.commit()
return {
"message": f"Successfully processed manual leads",
"imported": imported_count,
"updated": updated_count
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)