app / api.py
CareerAI-app's picture
Deploy CareerAI to HuggingFace Spaces
b7934cd
"""
🚀 CareerAI — FastAPI Backend
Connects the Claude-style frontend with the existing RAG + Groq + ChromaDB engine.
Run: uvicorn api:app --reload --port 8000
"""
import os
import sys
import json
import asyncio
from datetime import datetime
from typing import List, Dict, Optional
from contextlib import asynccontextmanager
from dotenv import load_dotenv
# Load .env file
load_dotenv()
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Query, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import (
StreamingResponse,
FileResponse,
Response,
JSONResponse,
)
from pydantic import BaseModel
# Add project root to path
sys.path.insert(0, os.path.dirname(__file__))
from src.rag_engine import RAGEngine, EMBEDDING_MODELS
from src.career_assistant import CareerAssistant
from src.document_processor import DocumentProcessor
from src.exporter import (
export_to_pdf,
export_to_docx,
export_to_html,
export_to_txt,
get_smart_filename,
export_conversation_to_pdf,
export_conversation_to_docx,
export_conversation_to_html,
)
from src.profile_extractor import (
extract_profile_from_text,
generate_dashboard_insights,
skills_by_category,
skills_by_level,
experience_for_timeline,
)
# Import Auth routers
from src.auth import router as auth_router, conv_router, get_user_or_session_id
# ======================== STATE ========================
class AppState:
"""Global application state (shared across requests)."""
def __init__(self):
self.rag_engine: Optional[RAGEngine] = None
self.assistant: Optional[CareerAssistant] = None
self.api_key: str = ""
self.model: str = "llama-3.3-70b-versatile"
self.api_configured: bool = False
# Embedding model: configurable via env var for production (e.g. "gte-multilingual")
self.embedding_model: str = os.environ.get("EMBEDDING_MODEL", "bge-m3")
# Reranking: disable in production to save RAM (set ENABLE_RERANKING=false)
self.enable_reranking: bool = os.environ.get("ENABLE_RERANKING", "true").lower() in ("true", "1", "yes")
self.enable_hybrid: bool = True
def get_rag(self) -> RAGEngine:
if self.rag_engine is None:
self.rag_engine = RAGEngine(
embedding_key=self.embedding_model,
enable_reranking=self.enable_reranking,
enable_hybrid=self.enable_hybrid,
)
return self.rag_engine
def reset_rag(self):
"""Reset RAG engine (e.g. when embedding model changes)."""
self.rag_engine = None
def init_assistant(self, api_key: str, model: str):
self.assistant = CareerAssistant(api_key=api_key, model=model)
self.api_key = api_key
self.model = model
self.api_configured = True
state = AppState()
# ======================== AUTO-LOAD API KEY ========================
def _auto_load_api_key():
"""Try to load API key from environment or secrets.toml."""
# 1. Environment variable
key = os.environ.get("GROQ_API_KEY", "")
if key:
return key
# 2. .streamlit/secrets.toml
try:
import re as _re
secrets_path = os.path.join(os.path.dirname(__file__), ".streamlit", "secrets.toml")
if os.path.exists(secrets_path):
with open(secrets_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line.startswith("GROQ_API_KEY"):
m = _re.search(r'"(.+?)"', line)
if m:
return m.group(1)
except Exception:
pass
return ""
# ======================== STARTUP ========================
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize on startup."""
# Auto-configure API key
key = _auto_load_api_key()
if key:
try:
state.init_assistant(key, state.model)
print(f"✅ Auto-connected with API key (model: {state.model})")
except Exception as e:
print(f"⚠️ Could not auto-connect: {e}")
# Pre-initialize RAG engine
try:
rag = state.get_rag()
stats = rag.get_stats()
print(f"✅ RAG engine ready ({stats['total_documents']} docs, {stats['total_chunks']} chunks)")
except Exception as e:
print(f"⚠️ RAG engine init: {e}")
yield
print("🔴 CareerAI API shutting down")
# ======================== APP ========================
app = FastAPI(
title="CareerAI API",
description="Backend API for CareerAI Assistant",
version="1.0.0",
docs_url="/docs",
redoc_url=None,
lifespan=lifespan,
)
# Register specialized routers
app.include_router(auth_router)
app.include_router(conv_router)
# CORS — allow frontend
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Serve frontend static files
frontend_dir = os.path.join(os.path.dirname(__file__), "frontend")
if os.path.isdir(frontend_dir):
app.mount("/static", StaticFiles(directory=frontend_dir), name="static")
# ======================== MODELS ========================
class ChatRequest(BaseModel):
query: str
chat_history: List[Dict[str, str]] = []
mode: str = "auto" # "auto", "general", "job_match", "cover_letter", "skills_gap", "interview"
class ConfigRequest(BaseModel):
api_key: str
model: str = "llama-3.3-70b-versatile"
class RAGConfigRequest(BaseModel):
embedding_model: str = "bge-m3"
enable_reranking: bool = True
enable_hybrid: bool = True
class ExportRequest(BaseModel):
content: str
format: str = "pdf" # "pdf", "docx", "html", "txt"
class ConversationExportRequest(BaseModel):
messages: List[Dict[str, str]]
format: str = "pdf"
# ======================== ROUTES: FRONTEND ========================
@app.get("/")
async def serve_frontend():
"""Serve the main frontend page."""
index_path = os.path.join(frontend_dir, "index.html")
if os.path.exists(index_path):
return FileResponse(index_path)
return {"message": "CareerAI API is running. Frontend not found at /frontend/"}
# ======================== ROUTES: CONFIG ========================
@app.get("/api/status")
async def get_status(user_id: str = Depends(get_user_or_session_id)):
"""Get current API configuration status."""
rag = state.get_rag()
stats = rag.get_stats(user_id=user_id)
return {
"api_configured": state.api_configured,
"model": state.model,
"embedding_model": state.embedding_model,
"enable_reranking": state.enable_reranking,
"enable_hybrid": state.enable_hybrid,
"documents": stats["documents"],
"total_chunks": stats["total_chunks"],
"total_documents": stats["total_documents"],
}
# ======================== ROUTES: JOB SEARCH ========================
JSEARCH_API_KEY = os.environ.get("JSEARCH_API_KEY", "")
@app.get("/api/jobs")
async def search_jobs(
query: str = Query(..., description="Job search terms, e.g. 'Python developer remote'"),
country: str = Query("worldwide", description="Country code, e.g. 'ar', 'es', 'us'"),
date_posted: str = Query("month", description="Filter: all, today, 3days, week, month"),
employment_type: str = Query("", description="FULLTIME, PARTTIME, CONTRACTOR, INTERN (comma separated)"),
remote_only: bool = Query(False, description="Only remote jobs"),
num_pages: int = Query(1, description="Number of result pages (1 page = 10 jobs)"),
):
"""Search worldwide job listings via JSearch (LinkedIn, Indeed, Glassdoor, etc.)."""
import httpx
headers = {
"x-rapidapi-host": "jsearch.p.rapidapi.com",
"x-rapidapi-key": JSEARCH_API_KEY,
}
params = {
"query": query,
"page": "1",
"num_pages": str(num_pages),
"date_posted": date_posted,
}
if country and country != "worldwide":
params["country"] = country
if remote_only:
params["remote_jobs_only"] = "true"
if employment_type:
params["employment_types"] = employment_type
try:
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.get(
"https://jsearch.p.rapidapi.com/search",
headers=headers,
params=params,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
raise HTTPException(status_code=502, detail=f"Error consultando JSearch: {str(e)}")
jobs = data.get("data", [])
formatted = []
for j in jobs:
salary_min = j.get("job_min_salary")
salary_max = j.get("job_max_salary")
salary_currency = j.get("job_salary_currency", "")
salary_period = j.get("job_salary_period", "")
if salary_min and salary_max:
salary_str = f"{salary_currency} {int(salary_min):,}{int(salary_max):,} / {salary_period}"
elif salary_min:
salary_str = f"{salary_currency} {int(salary_min):,}+ / {salary_period}"
else:
salary_str = None
formatted.append({
"id": j.get("job_id", ""),
"title": j.get("job_title", ""),
"company": j.get("employer_name", ""),
"company_logo": j.get("employer_logo", ""),
"location": f"{j.get('job_city', '') or ''} {j.get('job_state', '') or ''} {j.get('job_country', '') or ''}".strip(),
"employment_type": j.get("job_employment_type", ""),
"is_remote": j.get("job_is_remote", False),
"description_snippet": (j.get("job_description", "")[:220] + "…") if j.get("job_description") else "",
"salary": salary_str,
"posted_at": j.get("job_posted_at_datetime_utc", ""),
"apply_link": j.get("job_apply_link", "#"),
"publisher": j.get("job_publisher", ""),
})
return {"total": len(formatted), "jobs": formatted}
@app.post("/api/config")
async def configure_api(config: ConfigRequest):
"""Configure the Groq API key and model."""
try:
state.init_assistant(config.api_key, config.model)
return {
"success": True,
"message": f"Conectado con {config.model}",
"model": config.model,
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/api/config/rag")
async def configure_rag(config: RAGConfigRequest):
"""Update RAG engine settings."""
changed = False
if config.embedding_model != state.embedding_model:
state.embedding_model = config.embedding_model
changed = True
if config.enable_reranking != state.enable_reranking:
state.enable_reranking = config.enable_reranking
changed = True
if config.enable_hybrid != state.enable_hybrid:
state.enable_hybrid = config.enable_hybrid
changed = True
if changed:
state.reset_rag()
rag = state.get_rag()
stats = rag.get_stats()
return {
"success": True,
"embedding_model": state.embedding_model,
"enable_reranking": state.enable_reranking,
"enable_hybrid": state.enable_hybrid,
"stats": stats,
}
@app.get("/api/models")
async def list_models():
"""List available LLM models."""
models = {
"llama-3.3-70b-versatile": {"name": "CareerAI Pro", "description": "Recomendado · Máxima calidad"},
"llama-3.1-8b-instant": {"name": "CareerAI Flash", "description": "Ultra rápido · Respuestas al instante"},
}
return {"models": models, "current": state.model}
@app.get("/api/embedding-models")
async def list_embedding_models():
"""List available embedding models."""
result = {}
for key, info in EMBEDDING_MODELS.items():
result[key] = {
"display": info["display"],
"description": info.get("description", ""),
"size": info.get("size", ""),
"languages": info.get("languages", ""),
"performance": info.get("performance", ""),
}
return {"models": result, "current": state.embedding_model}
@app.post("/api/model")
async def change_model(model: str = Query(...)):
"""Change the active LLM model."""
if not state.api_configured:
raise HTTPException(status_code=400, detail="API key not configured")
try:
state.init_assistant(state.api_key, model)
return {"success": True, "model": model}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
# ======================== ROUTES: CHAT ========================
@app.post("/api/chat")
async def chat(request: ChatRequest, user_id: str = Depends(get_user_or_session_id)):
"""Send a message and get AI response (non-streaming)."""
if not state.api_configured:
raise HTTPException(
status_code=400,
detail="API key not configured. Use POST /api/config first.",
)
# Auto-detect mode
mode = request.mode
if mode == "auto":
mode = state.assistant.detect_mode(request.query)
# Get RAG context
rag = state.get_rag()
context = rag.get_context(request.query, k=8, user_id=user_id)
# Get response
try:
response = state.assistant.chat(
query=request.query,
context=context,
chat_history=request.chat_history,
mode=mode,
)
return {
"response": response,
"mode": mode,
"model": state.model,
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest, user_id: str = Depends(get_user_or_session_id)):
"""Send a message and get AI response via Server-Sent Events (streaming)."""
if not state.api_configured:
raise HTTPException(
status_code=400,
detail="API key not configured",
)
# Auto-detect mode
mode = request.mode
if mode == "auto":
mode = state.assistant.detect_mode(request.query)
# Get RAG context
rag = state.get_rag()
context = rag.get_context(request.query, k=8, user_id=user_id)
async def event_generator():
"""Stream response as SSE."""
try:
# Send mode info first
yield f"data: {json.dumps({'type': 'mode', 'mode': mode})}\n\n"
# Stream tokens
for chunk in state.assistant.stream_chat(
query=request.query,
context=context,
chat_history=request.chat_history,
mode=mode,
):
yield f"data: {json.dumps({'type': 'token', 'content': chunk})}\n\n"
# Done signal
yield f"data: {json.dumps({'type': 'done'})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
# ======================== ROUTES: DOCUMENTS ========================
@app.post("/api/documents/upload")
async def upload_document(
file: UploadFile = File(...),
doc_type: str = Form("cv"),
user_id: str = Depends(get_user_or_session_id)
):
"""Upload and process a document through the RAG pipeline."""
# Validate file type
valid_extensions = [".pdf", ".txt", ".docx", ".doc", ".jpg", ".jpeg", ".png", ".webp"]
ext = os.path.splitext(file.filename)[1].lower()
if ext not in valid_extensions:
raise HTTPException(
status_code=400,
detail=f"Unsupported file type: {ext}. Supported: {', '.join(valid_extensions)}",
)
# Check if already indexed
rag = state.get_rag()
existing_docs = rag.get_document_list(user_id=user_id)
if file.filename in existing_docs:
return {
"success": True,
"already_indexed": True,
"message": f"{file.filename} ya está indexado",
"filename": file.filename,
}
# Save file
upload_dir = os.path.join(os.path.dirname(__file__), "data", "uploads")
os.makedirs(upload_dir, exist_ok=True)
file_path = os.path.join(upload_dir, file.filename)
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
# Extract text
try:
api_key = state.api_key if state.api_configured else ""
text = DocumentProcessor.extract_text(file_path, groq_api_key=api_key)
if not text.strip():
raise ValueError("No se pudo extraer texto del documento")
# Chunk
chunks = DocumentProcessor.chunk_text(text, chunk_size=400, overlap=80)
# Key info
info = DocumentProcessor.extract_key_info(text)
# Add to RAG
metadata = {
"filename": file.filename,
"doc_type": doc_type,
"upload_date": datetime.now().isoformat(),
"word_count": str(info["word_count"]),
}
num_chunks = rag.add_document(chunks, metadata, user_id=user_id)
return {
"success": True,
"already_indexed": False,
"filename": file.filename,
"doc_type": doc_type,
"text_length": len(text),
"word_count": info["word_count"],
"num_chunks": num_chunks,
"message": f"{file.filename} procesado: {info['word_count']:,} palabras, {num_chunks} chunks",
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/documents")
async def list_documents(user_id: str = Depends(get_user_or_session_id)):
"""List all indexed documents for user."""
rag = state.get_rag()
stats = rag.get_stats(user_id=user_id)
return {
"documents": stats["documents"],
"total_documents": stats["total_documents"],
"total_chunks": stats["total_chunks"],
}
@app.delete("/api/documents/{filename}")
async def delete_document(
filename: str,
user_id: str = Depends(get_user_or_session_id)
):
"""Delete a document from the index."""
try:
rag = state.get_rag()
rag.delete_document(filename, user_id=user_id)
return {"success": True, "message": f"{filename} eliminado"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ======================== ROUTES: EXPORT ========================
@app.post("/api/export")
async def export_content(request: ExportRequest):
"""Export a single message/content to PDF, DOCX, HTML, or TXT."""
fmt = request.format.lower()
filename = get_smart_filename(request.content, fmt)
try:
if fmt == "pdf":
data = export_to_pdf(request.content)
mime = "application/pdf"
elif fmt == "docx":
data = export_to_docx(request.content)
mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
elif fmt == "html":
data = export_to_html(request.content)
mime = "text/html"
elif fmt == "txt":
data = export_to_txt(request.content)
mime = "text/plain"
else:
raise HTTPException(status_code=400, detail=f"Unsupported format: {fmt}")
return Response(
content=data,
media_type=mime,
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/export/conversation")
async def export_conversation(request: ConversationExportRequest):
"""Export full conversation history."""
fmt = request.format.lower()
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
filename = f"CareerAI_Chat_{timestamp}.{fmt}"
try:
if fmt == "pdf":
data = export_conversation_to_pdf(request.messages)
mime = "application/pdf"
elif fmt == "docx":
data = export_conversation_to_docx(request.messages)
mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
elif fmt == "html":
data = export_conversation_to_html(request.messages)
mime = "text/html"
else:
raise HTTPException(status_code=400, detail=f"Unsupported format: {fmt}")
return Response(
content=data,
media_type=mime,
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ======================== ROUTES: DETECT MODE ========================
@app.get("/api/detect-mode")
async def detect_mode(query: str = Query(...)):
"""Auto-detect the best assistant mode for a query."""
if not state.api_configured:
return {"mode": "general"}
mode = state.assistant.detect_mode(query)
return {"mode": mode}
# ======================== ROUTES: DASHBOARD ========================
@app.get("/api/dashboard")
async def dashboard_data(user_id: str = Depends(get_user_or_session_id)):
"""Extract profile data from documents for dashboard charts and insights."""
if not state.api_configured:
return {
"has_data": False,
"error": "API not configured",
}
rag = state.get_rag()
all_text = rag.get_all_text(user_id=user_id)
if not all_text.strip():
return {
"has_data": False,
"error": "No documents indexed",
}
try:
# Extract profile from documents
profile = extract_profile_from_text(all_text, state.assistant.llm)
skills = profile.get("skills", [])
experience = profile.get("experience", [])
summary = profile.get("summary", {})
# Build chart data
cat_data = skills_by_category(skills)
level_data = skills_by_level(skills)
timeline = experience_for_timeline(experience)
# Generate insights
insights = generate_dashboard_insights(profile, state.assistant.llm)
return {
"has_data": True,
"summary": summary,
"skills": skills,
"skills_by_category": cat_data,
"skills_by_level": level_data,
"experience_timeline": timeline,
"insights": insights,
"total_skills": len(skills),
"total_experience": len(experience),
}
except Exception as e:
return {
"has_data": False,
"error": str(e),
}
# ======================== HEALTH ========================
@app.get("/api/health")
async def health():
return {
"status": "ok",
"timestamp": datetime.now().isoformat(),
"api_configured": state.api_configured,
"model": state.model,
}
# ======================== RUN ========================
if __name__ == "__main__":
import uvicorn
uvicorn.run("api:app", host="0.0.0.0", port=8000, reload=True)