# app.py
import base64
import json
import logging
import os
import re
import unicodedata
import uuid
from datetime import datetime
from typing import List, Optional, Dict, Any, Tuple

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request, Query, Body
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, HTMLResponse, RedirectResponse
# Carrega variáveis do .env
load_dotenv(override=True)
# Módulos do projeto
from models_schemas import AnalyzeResponse, AnalyzeBatchRequest
from llm_client import analyze_cv_with_llm
from parsers import extract_text_from_pdf
# -----------------------------------------------------------------------------
# Config dinâmica (pesos do score)
# -----------------------------------------------------------------------------
def _get_weights() -> Tuple[float, float]:
try:
fit_w = float(os.getenv("FIT_WEIGHT", "0.7"))
except ValueError:
fit_w = 0.7
fit_w = max(0.0, min(1.0, fit_w))
base_w = 1.0 - fit_w
return fit_w, base_w
# -----------------------------------------------------------------------------
# Helpers de persistência (usa /data)
# -----------------------------------------------------------------------------
# Persistence is two flat JSON files under ./data — no database.
# NOTE(review): DATA_DIR depends on the process working directory; confirm
# the service is always started from the project root.
DATA_DIR = os.path.join(os.getcwd(), "data")
JOBS_PATH = os.path.join(DATA_DIR, "jobs.json")  # job postings
CVS_PATH = os.path.join(DATA_DIR, "cvs.json")  # analyzed CV records
def _ensure_data_dir() -> None:
    """Create the data directory if missing (idempotent)."""
    os.makedirs(DATA_DIR, exist_ok=True)
def _read_json(path: str):
    """Return the parsed JSON contents of *path*, or [] on any failure.

    Guarantees the data directory exists as a side effect. A missing file,
    malformed JSON or an I/O error all yield the empty-list default.
    """
    _ensure_data_dir()
    try:
        with open(path, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except Exception:  # missing file, bad JSON, permissions -> empty store
        return []
def _write_json(path: str, data: Any) -> None:
    """Serialize *data* to *path* as pretty-printed UTF-8 JSON."""
    _ensure_data_dir()
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(data, fh, indent=2, ensure_ascii=False)
# --- Jobs --------------------------------------------------------------------
def _list_jobs() -> List[Dict[str, Any]]:
    """Return all stored jobs; a stray non-list payload is wrapped in a list."""
    stored = _read_json(JOBS_PATH)
    if isinstance(stored, list):
        return stored
    return [stored]
def _write_jobs(jobs: List[Dict[str, Any]]) -> None:
    """Persist the complete jobs collection to jobs.json."""
    _write_json(JOBS_PATH, jobs)
def _get_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Look up one job by id; accepts UUIDs as well as numeric/string ids."""
    if job_id is None:
        return None
    wanted = str(job_id).strip()
    return next(
        (job for job in _list_jobs() if str(job.get("id")).strip() == wanted),
        None,
    )
def _create_job(
    title: str,
    description: str,
    details: str,
    requirements: Optional[List[str]] = None,
    job_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Append a new job record to jobs.json and return it.

    A caller-supplied *job_id* is honored (the original comment promised
    this but it was never implemented); otherwise a random UUID is used.
    No uniqueness check is performed on a supplied id.
    """
    jobs = _list_jobs()
    record = {
        "id": str(job_id) if job_id else str(uuid.uuid4()),
        "title": title,
        "description": description,
        "details": details,
        "requirements": requirements or [],
        "created_at": datetime.utcnow().isoformat(),
    }
    jobs.append(record)
    _write_jobs(jobs)
    return record
def _update_job(job_id: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Patch the allowed fields of a job; return it, or None if id unknown."""
    jobs = _list_jobs()
    wanted = str(job_id).strip()
    for job in jobs:
        if str(job.get("id")).strip() != wanted:
            continue
        # Only these whitelisted fields may be updated.
        for field in ("title", "description", "details"):
            if field in data:
                job[field] = (data[field] or "").strip()
        if "requirements" in data:
            reqs = data["requirements"]
            if isinstance(reqs, str):
                # Accept a comma-separated string as well as a list.
                reqs = [part.strip() for part in reqs.split(",") if part.strip()]
            job["requirements"] = reqs or []
        _write_jobs(jobs)
        return job
    return None
def _delete_job(job_id: str) -> bool:
    """Remove a job by id; True when a record was actually deleted."""
    wanted = str(job_id).strip()
    jobs = _list_jobs()
    remaining = [job for job in jobs if str(job.get("id")).strip() != wanted]
    if len(remaining) == len(jobs):
        return False
    _write_jobs(remaining)
    return True
# --- CVs ---------------------------------------------------------------------
def _list_cvs() -> List[Dict[str, Any]]:
    """Return all stored CV analyses; wraps a stray non-list payload."""
    stored = _read_json(CVS_PATH)
    if isinstance(stored, list):
        return stored
    return [stored]
def _write_cvs(cvs: List[Dict[str, Any]]) -> None:
    """Persist the complete CV collection to cvs.json."""
    _write_json(CVS_PATH, cvs)
def _save_cv_result(result: Dict[str, Any], job: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Persist one LLM analysis as a CV record, optionally tied to a job.

    Missing fields get empty defaults. The score is stored exactly as the
    LLM produced it (may be on a 0..10 or a 0..100 scale — normalization
    happens later at read time).
    """
    grab = result.get
    record = {
        "id": str(uuid.uuid4()),
        "name": grab("name") or "",
        "area": grab("area") or "",
        "summary": grab("summary") or "",
        "skills": grab("skills") or [],
        "education": grab("education") or "",
        "interview_questions": grab("interview_questions") or [],
        "strengths": grab("strengths") or [],
        "areas_for_development": grab("areas_for_development") or [],
        "important_considerations": grab("important_considerations") or [],
        "final_recommendations": grab("final_recommendations") or "",
        "score": float(grab("score") or 0.0),
        "created_at": datetime.utcnow().isoformat(),
        "job_id": job.get("id") if job else None,
        "job_title": job.get("title") if job else None,
    }
    records = _list_cvs()
    records.append(record)
    _write_cvs(records)
    return record
def _get_cv(cv_id: str) -> Optional[Dict[str, Any]]:
    """Find one analyzed CV by id, or None when absent."""
    wanted = str(cv_id)
    return next((cv for cv in _list_cvs() if str(cv.get("id")) == wanted), None)
def _delete_cv(cv_id: str) -> bool:
    """Remove an analyzed CV by id; True when a record was deleted."""
    wanted = str(cv_id)
    cvs = _list_cvs()
    remaining = [cv for cv in cvs if str(cv.get("id")) != wanted]
    if len(remaining) == len(cvs):
        return False
    _write_cvs(remaining)
    return True
# -----------------------------------------------------------------------------
# Normalização / Match (fit por requisitos) — mais robusto
# -----------------------------------------------------------------------------
# Canonical synonym map used by the requirement matcher: keys are cleaned
# (lowercased, de-accented) variants, values the canonical token they fold to.
CANON_EQUIV = {
    "js": "javascript",
    "nodejs": "node",
    "node.js": "node",
    # NOTE(review): maps the long form to the short one — opposite direction
    # of "js" -> "javascript" above. Confirm this asymmetry is intended.
    "typescript": "ts",
    "postgre": "postgresql",
    "postgres": "postgresql",
    # All REST/API spellings collapse to a single "api" token.
    "rest": "api",
    "apis": "api",
    "api rest": "api",
    "apis rest": "api",
    # CI/CD variants collapse to "cicd".
    "ci/cd": "cicd",
    "ci cd": "cicd",
    "ci": "cicd",
    "cd": "cicd",
    "docker compose": "docker",
    "k8s": "kubernetes",
}
def _deaccent(s: str) -> str:
if not s:
return ""
nfkd = unicodedata.normalize("NFKD", s)
return "".join([c for c in nfkd if not unicodedata.combining(c)])
def _clean(s: str) -> str:
    """Lowercase, de-accent, keep only [a-z0-9+#./ ] and squeeze whitespace."""
    lowered = _deaccent(s.lower())
    lowered = re.sub(r"[^a-z0-9+#./ ]+", " ", lowered)
    return re.sub(r"\s+", " ", lowered).strip()
def _canonize(s: str) -> str:
    """Clean *s*, fold '.' into spaces, then map to its canonical synonym."""
    cleaned = _clean(s).replace(".", " ")
    return CANON_EQUIV.get(cleaned, cleaned)
def _tokenize_rich(text_or_array: Any) -> List[str]:
    """Expand a text (or list of texts) into a de-duplicated bag of tokens.

    For each phrase it emits canonical unigrams, bigrams longer than two
    characters, and a "glued" concatenation (e.g. "ci cd" -> "cicd") so
    that spaced and collapsed spellings both match.
    """
    items = text_or_array if isinstance(text_or_array, list) else [text_or_array]
    tokens: set = set()
    for item in items:
        phrase = _canonize(str(item or ""))
        if not phrase:
            continue
        words = phrase.split()
        # Individual canonical words.
        for word in words:
            tokens.add(CANON_EQUIV.get(word, word))
        # Adjacent-pair bigrams.
        for left, right in zip(words, words[1:]):
            bigram = f"{left} {right}"
            if len(bigram) > 2:
                tokens.add(CANON_EQUIV.get(bigram, bigram))
        # Glued multi-word form.
        if len(words) > 1:
            tokens.add("".join(words))
    return list(tokens)
def _includes_match(a: str, b: str) -> bool:
return a == b or a.find(b) >= 0 or b.find(a) >= 0
def requirement_fit(requirements: List[str], candidate: Dict[str, Any]) -> int:
    """Percentage (0..100) of job requirements matched by the candidate.

    A requirement counts as matched when any of its tokens substring-matches
    any token derived from the candidate's skills, area and summary.
    Returns 0 when there are no non-blank requirements.
    """
    reqs = [_canonize(r) for r in (requirements or []) if str(r).strip()]
    if not reqs:
        return 0
    candidate_bag = set(
        _tokenize_rich(
            (candidate.get("skills") or [])
            + [candidate.get("area") or "", candidate.get("summary") or ""]
        )
    )
    hits = 0
    for req in reqs:
        req_tokens = _tokenize_rich(req)
        if any(_includes_match(ct, rt) for rt in req_tokens for ct in candidate_bag):
            hits += 1
    # The early `if not reqs` guard above made the original trailing
    # `... if reqs else 0` branch unreachable; it has been removed.
    return round(100 * (hits / len(reqs)))
def normalize_base_score(score: float) -> int:
    """Coerce an LLM-provided score into an int in the 0..100 range.

    Values <= 10 are assumed to be on a 0..10 scale and multiplied by 10;
    unparseable input becomes 0; the result is clamped to [0, 100].
    """
    try:
        value = float(score)
    except Exception:  # None, non-numeric strings, etc.
        value = 0.0
    if value <= 10.0:
        value *= 10.0
    return min(100, max(0, round(value)))
def combined_score(base_score: float, fit: int) -> int:
    """Weighted blend of the normalized base score and the fit, in 0..100."""
    normalized = normalize_base_score(base_score)
    fit_w, base_w = _get_weights()
    blended = fit_w * fit + base_w * normalized
    return min(100, max(0, round(blended)))
# -----------------------------------------------------------------------------
# App & Middlewares
# -----------------------------------------------------------------------------
# FastAPI application instance; the metadata below shows up in /docs and /redoc.
app = FastAPI(
    title="RecrAI API",
    version="1.4.0",
    description="API de triagem e análise de currículos com LLM (Groq/local) compatível com o front.",
)
# CORS: the regex admits any localhost/127.0.0.1 port for local development,
# and allow_origins adds one public origin taken from the ALLOWED_ORIGIN env var.
# NOTE(review): allow_credentials=True combined with wildcard methods/headers is
# permissive — confirm the credentialed-origin set is intentional.
app.add_middleware(
    CORSMiddleware,
    allow_origin_regex=r"https?://(localhost|127\.0\.0\.1)(:\d+)?$",
    allow_origins=[os.getenv("ALLOWED_ORIGIN", "https://viniciuskhan-recrai-backend.hf.space")],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# -----------------------------------------------------------------------------
# Rotas auxiliares (home, docs redirect, health, info)
# -----------------------------------------------------------------------------
@app.get("/", include_in_schema=False)
def home():
"""Página inicial simples."""
html = """
RecrAI API
RecrAI API
API online ✅ — documentação: /docs | /redoc
Endpoints
GET /health
GET /info
GET /jobs • GET /jobs/{id} • POST /jobs • PUT /jobs/{id} • DELETE /jobs/{id}
GET /jobs/{job_id}/candidates (ranking por vaga)
GET /jobs/{job_id}/fit/{cv_id} (fit de um talento para a vaga)
GET /cvs [ ?job_id=... ] • GET /cvs/{id} • DELETE /cvs/{id}
POST /analyze_cv (1 currículo)
POST /analyze_cv_batch_multipart (vários PDFs)
POST /analyze_cv_batch (JSON)
"""
return HTMLResponse(content=html, status_code=200)
@app.get("/docs-redirect", include_in_schema=False)
def docs_redirect():
return RedirectResponse(url="/docs")
@app.get("/health")
def health():
return {"status": "ok"}
@app.get("/info")
def info():
"""Informações não sensíveis do runtime."""
fw, bw = _get_weights()
return {
"app": "RecrAI API",
"version": "1.4.0",
"provider": os.getenv("PROVIDER", "groq"),
"model_id": os.getenv("GROQ_MODEL_ID", "deepseek-r1-distill-llama-70b"),
"temperature": float(os.getenv("TEMPERATURE", "0.7")),
"env": "spaces" if os.getenv("HF_SPACE_ID") else "local",
"fit_weight": fw,
"base_weight": bw,
}
# -----------------------------------------------------------------------------
# Handlers de erro (melhor DX)
# -----------------------------------------------------------------------------
@app.exception_handler(HTTPException)
async def http_exception_handler(_: Request, exc: HTTPException):
    """Render HTTPExceptions as a uniform {"detail": ...} JSON body."""
    return JSONResponse(content={"detail": exc.detail}, status_code=exc.status_code)
@app.exception_handler(Exception)
async def unhandled_exception_handler(_: Request, exc: Exception):
    """Catch-all handler: log server-side, return an opaque 500 to the client.

    Uses logging.exception instead of print so the full stack trace is
    recorded (print only showed the exception message).
    """
    logging.getLogger(__name__).exception("Unhandled error: %s", exc)
    return JSONResponse(status_code=500, content={"detail": "Erro interno do servidor."})
# -----------------------------------------------------------------------------
# Jobs (CRUD total no jobs.json)
# -----------------------------------------------------------------------------
@app.get("/jobs", summary="Lista vagas cadastradas")
def list_jobs():
return _list_jobs()
@app.get("/jobs/{job_id}", summary="Detalhe de uma vaga")
def get_job(job_id: str):
job = _get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Vaga não encontrada.")
return job
@app.post("/jobs", summary="Cria uma vaga")
def create_job(payload: Dict[str, Any] = Body(...)):
title = (payload.get("title") or "").strip()
description = (payload.get("description") or "").strip()
details = (payload.get("details") or "").strip()
requirements = payload.get("requirements") or []
if not title or not description or not details:
raise HTTPException(status_code=400, detail="Campos obrigatórios: title, description, details.")
if isinstance(requirements, str):
requirements = [r.strip() for r in requirements.split(",") if r.strip()]
rec = _create_job(title, description, details, requirements)
return {"message": "Vaga criada com sucesso.", "job": rec}
@app.put("/jobs/{job_id}", summary="Atualiza uma vaga")
def update_job(job_id: str, payload: Dict[str, Any] = Body(...)):
job = _update_job(job_id, payload or {})
if not job:
raise HTTPException(status_code=404, detail="Vaga não encontrada.")
return {"message": "Vaga atualizada com sucesso.", "job": job}
@app.delete("/jobs/{job_id}", summary="Exclui uma vaga")
def delete_job(job_id: str):
ok = _delete_job(job_id)
if not ok:
raise HTTPException(status_code=404, detail="Vaga não encontrada.")
return {"message": "Vaga excluída com sucesso."}
# Ranking e fit
@app.get("/jobs/{job_id}/candidates", summary="Ranking de talentos para a vaga")
def job_candidates_ranking(
job_id: str,
order_by: str = Query("combined", description="combined|fit|score"),
desc: bool = Query(True),
limit: int = Query(50),
):
job = _get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Vaga não encontrada.")
cvs = _list_cvs()
fw, bw = _get_weights()
enriched = []
for cv in cvs:
base = normalize_base_score(cv.get("score", 0.0))
fit = requirement_fit(job.get("requirements") or [], cv)
comb = combined_score(base, fit)
enriched.append(
{
**cv,
"score_base": base,
"fit": fit,
"combined": comb,
"fit_weight": fw,
"base_weight": bw,
}
)
key = {"combined": "combined", "fit": "fit", "score": "score_base"}.get(order_by, "combined")
enriched.sort(key=lambda x: x.get(key, 0), reverse=bool(desc))
if limit and limit > 0:
enriched = enriched[:limit]
return {"job": job, "candidates": enriched}
@app.get("/jobs/{job_id}/fit/{cv_id}", summary="Fit de um talento para uma vaga")
def job_fit_for_candidate(job_id: str, cv_id: str):
job = _get_job(job_id)
cv = _get_cv(cv_id)
if not job or not cv:
raise HTTPException(status_code=404, detail="Vaga ou talento não encontrado.")
base = normalize_base_score(cv.get("score", 0.0))
fit = requirement_fit(job.get("requirements") or [], cv)
return {
"job": job,
"candidate": {"id": cv["id"], "name": cv.get("name", "")},
"score_base": base,
"fit": fit,
"combined": combined_score(base, fit),
"weights": {"fit_weight": _get_weights()[0], "base_weight": _get_weights()[1]},
}
# -----------------------------------------------------------------------------
# CVs (listagem/detalhe/remoção)
# -----------------------------------------------------------------------------
@app.get("/cvs", summary="Lista currículos analisados")
def list_cvs(job_id: Optional[str] = Query(None, description="Filtra CVs associados a uma vaga")):
cvs = _list_cvs()
if job_id is not None:
cvs = [c for c in cvs if str(c.get("job_id")) == str(job_id)]
out = []
for c in cvs:
out.append(
{
"id": c.get("id"),
"name": c.get("name", ""),
"area": c.get("area", ""),
"summary": c.get("summary", ""),
"score": c.get("score", 0.0),
"created_at": c.get("created_at"),
"job_id": c.get("job_id"),
"job_title": c.get("job_title"),
}
)
return out
@app.get("/cvs/{cv_id}", summary="Detalhe do currículo analisado", response_model=AnalyzeResponse)
def get_cv(cv_id: str):
c = _get_cv(cv_id)
if not c:
raise HTTPException(status_code=404, detail="Currículo não encontrado.")
return AnalyzeResponse(**c)
@app.delete("/cvs/{cv_id}", summary="Exclui um currículo do banco")
def delete_cv(cv_id: str):
ok = _delete_cv(cv_id)
if not ok:
raise HTTPException(status_code=404, detail="Currículo não encontrado.")
return {"message": "Currículo excluído com sucesso."}
# -----------------------------------------------------------------------------
# Analyze (single) — aceita job_id OU job texto
# -----------------------------------------------------------------------------
@app.post("/analyze_cv", response_model=AnalyzeResponse, summary="Analisa um currículo (PDF ou texto)")
@app.post("/analyze_cv/") # aceita barra final também
async def analyze_cv_endpoint(
job: Optional[str] = Form(None, description="Descrição completa da vaga (alternativa a job_id)"),
job_id: Optional[str] = Form(None, description="ID de uma vaga previamente criada"),
cv_text: Optional[str] = Form(None, description="Texto do currículo (alternativa a PDF)"),
file: Optional[UploadFile] = File(None, description="Arquivo PDF do currículo"),
):
"""
Envie **cv_text** OU **file** (PDF). Se enviar ambos, retorna 422.
Informe **job_id** (id existente em /jobs) OU **job** (texto). Se nenhum for informado, a análise é genérica.
"""
if bool(cv_text) == bool(file):
raise HTTPException(status_code=422, detail="Envie apenas um: 'cv_text' OU 'file' (PDF).")
# Resolve job details
job_rec = None
job_details = "Vaga não especificada."
if job_id:
job_rec = _get_job(job_id)
if not job_rec:
raise HTTPException(status_code=404, detail="job_id não encontrado.")
job_details = (
f"**Vaga: {job_rec.get('title')}**\n\n"
f"**Descrição:**\n{job_rec.get('description')}\n\n"
f"**Detalhes:**\n{job_rec.get('details')}\n\n"
f"**Requisitos:**\n{', '.join(job_rec.get('requirements') or [])}"
)
elif job:
job_details = job
# Extrai texto do PDF se necessário
if file:
if not file.filename.lower().endswith(".pdf"):
raise HTTPException(status_code=415, detail="Apenas PDF é suportado no 'file'.")
pdf_bytes = await file.read()
cv_text = extract_text_from_pdf(pdf_bytes)
if not cv_text or not cv_text.strip():
raise HTTPException(status_code=422, detail="Não foi possível extrair texto do currículo.")
# Chama LLM
try:
result = analyze_cv_with_llm(cv_text=cv_text, job_details=job_details)
except Exception as e:
msg = str(e)
if "GROQ_API_KEY" in msg or "ausente" in msg.lower():
raise HTTPException(status_code=503, detail="Provider indisponível: verifique GROQ_API_KEY/PROVIDER.")
raise
saved = _save_cv_result(result.dict(), job=job_rec)
return AnalyzeResponse(**saved)
# -----------------------------------------------------------------------------
# Analyze (batch) — multipart: files[] (compatível com o front)
# -----------------------------------------------------------------------------
@app.post("/analyze_cv_batch_multipart", summary="Analisa vários PDFs via multipart (files[])")
@app.post("/analyze_cv_batch_multipart/") # aceita barra final
async def analyze_cv_batch_multipart(
job_id: Optional[str] = Form(None),
job: Optional[str] = Form(None),
files: List[UploadFile] = File(...),
):
job_rec = _get_job(job_id) if job_id else None
if job_id and not job_rec:
raise HTTPException(status_code=404, detail="job_id não encontrado.")
if not files:
raise HTTPException(status_code=400, detail="Envie 'files[]' com pelo menos 1 PDF.")
if job_rec:
job_details = (
f"**Vaga: {job_rec.get('title')}**\n\n"
f"**Descrição:**\n{job_rec.get('description')}\n\n"
f"**Detalhes:**\n{job_rec.get('details')}\n\n"
f"**Requisitos:**\n{', '.join(job_rec.get('requirements') or [])}"
)
else:
job_details = job or "Vaga não especificada."
results = []
for f in files:
if not f.filename.lower().endswith(".pdf"):
raise HTTPException(status_code=415, detail=f"Apenas PDF é suportado. Arquivo inválido: {f.filename}")
pdf_bytes = await f.read()
cv_text = extract_text_from_pdf(pdf_bytes)
if not cv_text.strip():
continue
try:
res = analyze_cv_with_llm(cv_text=cv_text, job_details=job_details)
except Exception as e:
msg = str(e)
if "GROQ_API_KEY" in msg or "ausente" in msg.lower():
raise HTTPException(status_code=503, detail="Provider indisponível: verifique GROQ_API_KEY/PROVIDER.")
raise
saved = _save_cv_result(res.dict(), job=job_rec)
results.append(saved)
return {"message": f"Analisados {len(results)} currículo(s).", "results": results}
# -----------------------------------------------------------------------------
# Analyze (batch) — JSON (compat.)
# -----------------------------------------------------------------------------
@app.post("/analyze_cv_batch", response_model=List[AnalyzeResponse], summary="Analisa múltiplos currículos (JSON)")
async def analyze_cv_batch_endpoint(payload: AnalyzeBatchRequest):
results: List[AnalyzeResponse] = []
for item in payload.items:
if not item.cv_text and not item.cv_pdf_b64:
raise HTTPException(status_code=400, detail="Cada item precisa de cv_text ou cv_pdf_b64.")
cv_text = item.cv_text
if not cv_text and item.cv_pdf_b64:
try:
pdf_bytes = base64.b64decode(item.cv_pdf_b64)
cv_text = extract_text_from_pdf(pdf_bytes)
except Exception:
raise HTTPException(status_code=422, detail="cv_pdf_b64 inválido (base64).")
if not cv_text or not cv_text.strip():
raise HTTPException(status_code=422, detail="Não foi possível extrair texto de um dos currículos.")
try:
res = analyze_cv_with_llm(cv_text=cv_text, job_details=item.job or "Vaga não especificada.")
except Exception as e:
msg = str(e)
if "GROQ_API_KEY" in msg or "ausente" in msg.lower():
raise HTTPException(status_code=503, detail="Provider indisponível: verifique GROQ_API_KEY/PROVIDER.")
raise
saved = _save_cv_result(res.dict(), job=None)
results.append(AnalyzeResponse(**saved))
return results
# --- DEBUG: listar rotas no startup ---
@app.on_event("startup")
async def _print_routes():
# Não cria nenhuma vaga automaticamente — jobs.json é a fonte da verdade.
print("\n=== ROTAS CARREGADAS ===")
for r in app.router.routes:
methods = getattr(r, "methods", [])
path = getattr(r, "path", "")
if methods and path:
print(f"{sorted(list(methods))} {path}")
print("========================\n")
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
if __name__ == "__main__":
# Você também pode rodar: uvicorn app:app --reload --port 7860 --env-file .env
uvicorn.run("app:app", host=os.getenv("HOST", "0.0.0.0"), port=int(os.getenv("PORT", "7860")))