"""HF Resume Ingestion Service.

Accepts an Excel sheet of candidates, downloads each resume (PDF/DOCX),
extracts text plus a heuristic skill list, and forwards the result to an
n8n webhook for downstream processing.
"""

import io
import logging
import os

import pandas as pd
import requests
from docx import Document
from fastapi import FastAPI, File, HTTPException, UploadFile
from pypdf import PdfReader

# =========================
# LOGGING
# =========================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)

# =========================
# ENV
# =========================
N8N_WEBHOOK_URL = os.getenv("N8N_WEBHOOK_URL")
if not N8N_WEBHOOK_URL:
    # Fail fast at startup rather than on the first upload.
    raise RuntimeError("❌ N8N_WEBHOOK_URL not set")
logger.info("✅ N8N_WEBHOOK_URL = %s", N8N_WEBHOOK_URL)

# =========================
# APP
# =========================
app = FastAPI(title="HF Resume Ingestion Service")

# Columns every uploaded sheet must provide.
REQUIRED_COLUMNS = {"name", "email", "phone", "jobId", "resume_url"}

# Known skills matched (case-insensitively, by substring) against resume text.
_COMMON_SKILLS = (
    "python", "java", "javascript", "react", "node", "firebase",
    "sql", "mongodb", "docker", "aws", "git", "linux",
)


# =========================
# UTILITIES
# =========================

def normalize_resume_url(url: str) -> str:
    """Convert Google Drive share links to direct download URLs.

    Any URL that is not a ``drive.google.com/file/d/<id>/...`` link is
    returned unchanged.
    """
    if "drive.google.com" in url and "/file/d/" in url:
        file_id = url.split("/file/d/")[1].split("/")[0]
        return f"https://drive.google.com/uc?export=download&id={file_id}"
    return url


def download_file(url: str) -> bytes:
    """Download the resume at *url* and return its raw bytes.

    Raises ``requests.HTTPError`` on a non-2xx response.
    """
    url = normalize_resume_url(url)
    logger.info("⬇️ Downloading resume (normalized): %s", url)
    r = requests.get(url, timeout=30)
    r.raise_for_status()
    return r.content


def extract_text_from_pdf(data: bytes) -> str:
    """Extract text from every page of a PDF byte blob."""
    reader = PdfReader(io.BytesIO(data))
    # extract_text() can return None for image-only pages -> substitute "".
    return "\n".join(page.extract_text() or "" for page in reader.pages)


def extract_text_from_docx(data: bytes) -> str:
    """Extract paragraph text from a DOCX byte blob."""
    doc = Document(io.BytesIO(data))
    return "\n".join(p.text for p in doc.paragraphs)


def extract_resume_text(resume_url: str) -> str:
    """Download a resume and return its plain text.

    File type is detected by content signature (magic bytes), not by the
    URL, so extension-less share links still work.

    Raises ``ValueError`` for anything that is neither PDF nor DOCX.
    """
    data = download_file(resume_url)
    if data[:4] == b"%PDF":   # PDF signature
        return extract_text_from_pdf(data)
    if data[:2] == b"PK":     # DOCX is a ZIP container
        return extract_text_from_docx(data)
    raise ValueError("Unsupported resume format (only pdf/docx)")


def basic_skill_extraction(text: str) -> list[str]:
    """Lightweight heuristic skill extraction (NO AI evaluation here).

    Returns a sorted, de-duplicated list of every known skill whose name
    appears as a case-insensitive substring of *text*.
    """
    text_lower = text.lower()
    return sorted({skill for skill in _COMMON_SKILLS if skill in text_lower})


def send_to_n8n(payload: dict) -> None:
    """POST *payload* to the configured n8n webhook.

    Raises ``RuntimeError`` (chained to the underlying requests error) if
    the request fails or n8n answers with a non-2xx status.
    """
    logger.info("➡️ Sending payload to n8n")
    logger.info(payload)
    try:
        r = requests.post(
            N8N_WEBHOOK_URL,
            json=payload,
            timeout=15,
        )
        logger.info("⬅️ n8n response status: %s", r.status_code)
        logger.info("⬅️ n8n response body: %s", r.text)
        r.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error("❌ Failed to send data to n8n")
        # Chain the original error so callers keep the root cause.
        raise RuntimeError(str(e)) from e


def _cell(row: pd.Series, key: str) -> str:
    """Return the cell value as a stripped string; '' for missing/NaN.

    Prevents the literal strings "nan"/"None" from leaking into payloads
    when a spreadsheet cell is empty.
    """
    value = row.get(key)
    if value is None or (isinstance(value, float) and pd.isna(value)):
        return ""
    return str(value).strip()


# =========================
# API
# =========================

@app.post("/upload-excel")
async def upload_excel(file: UploadFile = File(...)):
    """Ingest an .xlsx of candidates and forward each parsed resume to n8n.

    Returns a per-row report; a failure on one row does not abort the rest.
    """
    logger.info("📂 Received file: %s", file.filename)

    # file.filename may be None for a raw multipart part; also accept .XLSX.
    if not file.filename or not file.filename.lower().endswith(".xlsx"):
        raise HTTPException(status_code=400, detail="Only .xlsx supported")

    # Wrap bytes in BytesIO for pandas. dtype=str keeps phone numbers and
    # ids verbatim (without it pandas parses them as floats, and str()
    # later yields e.g. "9876543210.0").
    content = await file.read()
    df = pd.read_excel(io.BytesIO(content), dtype=str)

    if not REQUIRED_COLUMNS.issubset(df.columns):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid Excel format. Required columns: {REQUIRED_COLUMNS}",
        )

    report = []
    for index, row in df.iterrows():
        email = _cell(row, "email")
        logger.info("👤 Processing row %s: %s", index + 1, email)
        try:
            resume_text = extract_resume_text(_cell(row, "resume_url"))
            skills = basic_skill_extraction(resume_text)
            payload = {
                "candidate": {
                    "name": _cell(row, "name"),
                    "email": email,
                    "phone": _cell(row, "phone"),
                    "jobId": _cell(row, "jobId"),
                },
                "parsed": {
                    "skills": skills,
                    "resumeText": resume_text[:6000],  # safety cap
                },
            }
            send_to_n8n(payload)
            report.append({
                "email": email,
                "status": "sent",
            })
        except Exception as e:  # one bad row must not kill the batch
            logger.error("❌ Error processing %s: %s", email, e)
            report.append({
                "email": email,
                "status": "failed",
                "error": str(e),
            })

    return {
        "total": len(df),
        "processed": len(report),
        "results": report,
    }