# EZOFISOCR / backend / app / auth_routes.py
# Seth
# update
# 4312115
import os
from fastapi import APIRouter, Depends, HTTPException, Body
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
from .models import User, APIKey
from .auth import create_access_token, get_current_user
from .firebase_auth import verify_firebase_token
from .otp_service import request_otp, verify_otp
from .email_validator import validate_business_email, is_business_email
from .api_key_auth import generate_api_key, hash_api_key, get_api_key_prefix
from .db import SessionLocal
def get_db():
    """Yield a database session and close it once the caller is done.

    Used as a FastAPI dependency via ``Depends(get_db)``. The ``finally``
    clause guarantees the session is closed even when the code consuming
    the session raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
# Router on which every endpoint in this module is registered.
router = APIRouter()
class FirebaseLoginRequest(BaseModel):
    """Request body for the Firebase login endpoint."""

    # Firebase ID token; handed to verify_firebase_token by the endpoint.
    id_token: str
class OTPRequestRequest(BaseModel):
    """Request body for requesting a one-time password (OTP)."""

    # Address the OTP is requested for (typed as pydantic EmailStr).
    email: EmailStr
class OTPVerifyRequest(BaseModel):
    """Request body for verifying a one-time password (OTP)."""

    # Address the OTP belongs to (typed as pydantic EmailStr).
    email: EmailStr
    # OTP code as submitted by the client.
    otp: str
class CreateAPIKeyRequest(BaseModel):
    """Request body for creating a new API key."""

    name: str  # User-friendly name for the API key
def _split_full_name(full_name):
    """Split a display name into ``(first_name, last_name)``.

    Splits on the first run of whitespace so that repeated spaces do not
    leak into the last name. Either element is ``None`` when it cannot be
    derived (empty, whitespace-only, ``None`` or single-word names).
    """
    parts = (full_name or "").strip().split(None, 1)
    first_name = parts[0] if parts else None
    last_name = parts[1] if len(parts) > 1 else None
    return first_name, last_name


def _org_name_from_email(email):
    """Derive a fallback organization name from the email's domain label."""
    _, at_sign, domain = email.rpartition('@')
    if not at_sign or not domain:
        return None
    return domain.split('.')[0].capitalize()


async def _sync_new_user_contact(email, user_info):
    """Enrich a new user's contact via Apollo.io and push it to Brevo.

    Best-effort only: any failure is logged and swallowed so that a broken
    third-party integration never fails account creation.
    """
    try:
        # Imported lazily so the integrations are only loaded when needed.
        from .apollo_service import enrich_contact_by_email
        from .brevo_service import create_brevo_contact, BREVO_TRIAL_LIST_ID

        # Normalise "no enrichment available" to an empty dict so every
        # lookup below simply yields None.
        enriched_data = await enrich_contact_by_email(email) or {}

        first_name = enriched_data.get("first_name")
        last_name = enriched_data.get("last_name")
        # Fall back to the Firebase display name for whatever Apollo lacks.
        if not first_name or not last_name:
            fallback_first, fallback_last = _split_full_name(user_info.get('name'))
            first_name = first_name or fallback_first
            last_name = last_name or fallback_last

        # Fall back to the email domain when Apollo has no organization.
        org_name = enriched_data.get("organization_name") or _org_name_from_email(email)

        await create_brevo_contact(
            email=email,
            first_name=first_name,
            last_name=last_name,
            organization_name=org_name,
            phone_number=enriched_data.get("phone_number"),
            linkedin_url=enriched_data.get("linkedin_url"),
            title=enriched_data.get("title"),
            headline=enriched_data.get("headline"),
            organization_website=enriched_data.get("organization_website"),
            organization_address=enriched_data.get("organization_address"),
            list_id=BREVO_TRIAL_LIST_ID
        )
    except Exception as e:
        # Don't fail user creation if integrations fail
        print(f"[WARNING] Failed to enrich/update contact for {email}: {str(e)}")


@router.post("/api/auth/firebase/login")
async def firebase_login(
    request: FirebaseLoginRequest,
    db: Session = Depends(get_db)
):
    """
    Login with Firebase ID token.

    Verifies the token, rejects non-business email addresses, then creates
    the user (first login) or refreshes the stored profile (returning user)
    and returns a JWT together with the user's public fields.

    Args:
        request: Body carrying the Firebase ``id_token``.
        db: Database session supplied by ``get_db``.

    Returns:
        ``{"token": <jwt>, "user": {id, email, name, picture, auth_method}}``.

    Raises:
        HTTPException: 400 when the token carries no email, the email is not
            a business address, or any other step of the login fails.
    """
    try:
        # Verify Firebase token
        user_info = await verify_firebase_token(request.id_token)
        email = user_info.get('email')
        if not email:
            raise HTTPException(status_code=400, detail="Email not found in Firebase token")

        # Validate business email
        if not is_business_email(email):
            raise HTTPException(
                status_code=400,
                detail="Only business email addresses are allowed. Personal email accounts (Gmail, Yahoo, Outlook, etc.) are not permitted. Please use your work email address."
            )

        firebase_uid = user_info['uid']
        normalized_email = email.lower()

        # Get or create user (matched by email or by Firebase UID)
        user = db.query(User).filter(
            (User.email == normalized_email) | (User.firebase_uid == firebase_uid)
        ).first()

        if not user:
            user = User(
                email=normalized_email,
                name=user_info.get('name'),
                picture=user_info.get('picture'),
                firebase_uid=firebase_uid,
                auth_method='firebase',
                email_verified=True
            )
            db.add(user)
            db.commit()
            db.refresh(user)
            print(f"[INFO] New user created via Firebase: {email}")
            # Enrich contact data from Apollo.io and update Brevo
            await _sync_new_user_contact(email, user_info)
        else:
            # Update user info
            user.firebase_uid = firebase_uid
            user.email_verified = True
            # Use `or` rather than a .get() default: a key that is present
            # with a None value must not wipe the stored name/picture.
            user.name = user_info.get('name') or user.name
            user.picture = user_info.get('picture') or user.picture
            if user.auth_method != 'firebase':
                user.auth_method = 'firebase'
            db.commit()
            print(f"[INFO] User logged in via Firebase: {email}")

        # Generate JWT token
        token = create_access_token(data={"sub": user.id})
        return {
            "token": token,
            "user": {
                "id": user.id,
                "email": user.email,
                "name": user.name,
                "picture": user.picture,
                "auth_method": user.auth_method
            }
        }
    except HTTPException:
        raise
    except Exception as e:
        # NOTE(review): the raw exception text is echoed to the client; kept
        # for backward compatibility, but consider a generic message.
        print(f"[ERROR] Firebase login failed: {str(e)}")
        raise HTTPException(status_code=400, detail=f"Authentication failed: {str(e)}")
@router.post("/api/auth/otp/request")
async def request_otp_endpoint(
    request: OTPRequestRequest,
    db: Session = Depends(get_db)
):
    """
    Send a one-time password to a business email address.

    The address is checked with ``validate_business_email`` first; whatever
    ``request_otp`` returns is passed back to the client unchanged.

    Raises:
        HTTPException: propagated untouched when raised by validation or by
            the OTP service; any other failure is reported as a 500.
    """
    target_email = request.email
    try:
        # Reject non-business addresses before any OTP is issued.
        validate_business_email(target_email)
        return await request_otp(target_email, db)
    except HTTPException:
        raise
    except Exception as e:
        print(f"[ERROR] OTP request failed: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to send OTP: {str(e)}")
@router.post("/api/auth/otp/verify")
async def verify_otp_endpoint(
    request: OTPVerifyRequest,
    db: Session = Depends(get_db)
):
    """
    Check a submitted OTP and, on success, log the user in.

    Returns a JWT plus the user's public fields (id, email, name, picture,
    auth_method).

    Raises:
        HTTPException: propagated untouched when raised by validation or by
            the OTP service; any other failure is reported as a 400.
    """
    try:
        # Only business addresses may log in.
        validate_business_email(request.email)

        user = await verify_otp(request.email, request.otp, db)

        # Issue the JWT first, then assemble the public profile.
        access_token = create_access_token(data={"sub": user.id})
        public_fields = ("id", "email", "name", "picture", "auth_method")
        profile = {field: getattr(user, field) for field in public_fields}
        return {"token": access_token, "user": profile}
    except HTTPException:
        raise
    except Exception as e:
        print(f"[ERROR] OTP verification failed: {str(e)}")
        raise HTTPException(status_code=400, detail=f"OTP verification failed: {str(e)}")
@router.get("/api/auth/me")
async def get_current_user_info(current_user: User = Depends(get_current_user)):
    """Return the public profile fields of the authenticated user."""
    exposed_fields = ("id", "email", "name", "picture", "auth_method")
    return {field: getattr(current_user, field) for field in exposed_fields}
# API Key Management Endpoints (newly added for external API access)
@router.post("/api/auth/api-key/create")
async def create_api_key(
    request: CreateAPIKeyRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Issue a new API key for the authenticated user.

    Only the hash and prefix of the key are stored; the plaintext key is
    included in this response and nowhere else, so the caller must save it.

    Raises:
        HTTPException: 400 when the supplied name is empty or whitespace.
    """
    key_name = request.name.strip() if request.name else ""
    if not key_name:
        raise HTTPException(status_code=400, detail="API key name is required")

    # Generate the key and derive what gets persisted.
    plaintext_key = generate_api_key()
    digest = hash_api_key(plaintext_key)
    key_prefix = get_api_key_prefix(plaintext_key)

    api_key_record = APIKey(
        user_id=current_user.id,
        name=key_name,
        key_hash=digest,
        key_prefix=key_prefix,
        is_active=True
    )
    db.add(api_key_record)
    db.commit()
    db.refresh(api_key_record)
    print(f"[INFO] API key created for user {current_user.email}: {key_prefix}")

    if api_key_record.created_at:
        created_at_iso = api_key_record.created_at.isoformat()
    else:
        created_at_iso = None

    response = {
        "success": True,
        "api_key": plaintext_key,  # Only returned once - user must save this!
        "key_id": api_key_record.id,
        "key_prefix": key_prefix,
        "name": api_key_record.name,
        "created_at": created_at_iso,
        "message": "API key created successfully. Store this key securely - it will not be shown again!"
    }
    return response
@router.get("/api/auth/api-keys")
async def list_api_keys(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Return the authenticated user's API keys, newest first.

    Each entry exposes only the key prefix and metadata, never the full key.
    """
    def _iso_or_none(moment):
        # Timestamps may be unset, in which case None is returned.
        return moment.isoformat() if moment else None

    owned_keys = db.query(APIKey).filter(APIKey.user_id == current_user.id)
    newest_first = owned_keys.order_by(APIKey.created_at.desc()).all()

    summaries = [
        {
            "id": key.id,
            "name": key.name,
            "key_prefix": key.key_prefix,
            "is_active": key.is_active,
            "last_used_at": _iso_or_none(key.last_used_at),
            "created_at": _iso_or_none(key.created_at),
        }
        for key in newest_first
    ]
    return {"success": True, "api_keys": summaries}
@router.delete("/api/auth/api-key/{key_id}")
async def delete_api_key(
    key_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Deactivate one of the authenticated user's API keys.

    The row is kept and only ``is_active`` is set to False (soft delete).

    Raises:
        HTTPException: 404 when no key with this id belongs to the user.
    """
    # Both criteria are ANDed, so another user's key is never matched.
    owned_key_query = db.query(APIKey).filter(
        APIKey.id == key_id,
        APIKey.user_id == current_user.id,
    )
    api_key = owned_key_query.first()
    if not api_key:
        raise HTTPException(status_code=404, detail="API key not found")

    api_key.is_active = False
    db.commit()
    print(f"[INFO] API key {api_key.key_prefix} deactivated for user {current_user.email}")

    outcome = {
        "success": True,
        "message": "API key deactivated successfully"
    }
    return outcome