Spaces:
Runtime error
Runtime error
| from fastapi import FastAPI, File, UploadFile, Form | |
| from pydantic import BaseModel | |
| from typing import List | |
| from pathlib import Path | |
| import shutil | |
| import tempfile | |
| import os | |
| import uuid | |
| from langchain_docling import DoclingLoader | |
| from langchain_docling.loader import ExportType | |
| app = FastAPI() | |
| resumes = [] | |
| jobs = [] | |
| scoring = [] | |
| UPLOAD_DIR = Path("uploads") | |
| UPLOAD_DIR.mkdir(exist_ok=True) | |
| async def upload_file(file: UploadFile = File(...), type: str = Form(...)): | |
| # print(file) | |
| # file_path = Path(file.filename) | |
| # with file_path.open("wb") as buffer: | |
| # shutil.copyfileobj(file.file, buffer) | |
| # with tempfile.NamedTemporaryFile(delete=False, suffix=file.filename) as temp_file: | |
| # # Efficiently write the uploaded file's content to the temporary file | |
| # contents = await file.read() | |
| # temp_file.write(contents) | |
| # temp_file_path = temp_file.name | |
| suffix = os.path.splitext(file.filename)[-1] or ".pdf" | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=suffix, dir="/tmp") as tmp: | |
| shutil.copyfileobj(file.file, tmp) | |
| tmp_path = tmp.name | |
| # At this point, tmp_path is a real file path in /tmp | |
| # Debug: check if file is valid | |
| size = os.path.getsize(tmp_path) | |
| print(f"Saved {file.filename} -> {tmp_path} ({size} bytes)") | |
| print("[TMP PATH]", str(tmp_path)) | |
| loader = DoclingLoader(file_path="" + str(tmp_path), export_type=ExportType.MARKDOWN) | |
| docs = loader.load() | |
| # docs = docs.model_dump() | |
| result = docs[0].model_dump() | |
| result["id"] = str(uuid.uuid4()) | |
| if type == "resume": | |
| resumes.append(result) | |
| elif type == "job": | |
| jobs.append(result) | |
| return { | |
| "code":201, | |
| "message":"Request was successful.", | |
| "data": result | |
| } | |
| def get_jobs(): | |
| return { | |
| "code":200, | |
| "message":"Request was successful.", | |
| "data": jobs | |
| } | |
| def get_resumes(): | |
| return { | |
| "code":200, | |
| "message":"Request was successful.", | |
| "data": resumes | |
| } | |
| async def get_scoring(): | |
| # resume_ids = [x["id"] for x in resumes] | |
| # job_ids = [x["id"] for x in jobs] | |
| score_resume_ids = [x["resume_id"] for x in scoring] | |
| score_job_ids = [x["job_id"] for x in scoring] | |
| for resume in resumes: | |
| for job in jobs: | |
| if resume["id"] not in score_resume_ids or job["id"] not in score_job_ids: | |
| rank_score = process_input(job["page_content"], [resume["page_content"]]) | |
| suggest_score = process_input_suggestion(resume["page_content"], [job["page_content"]]) | |
| scoring.append({ | |
| "resume_id":resume["id"], | |
| "job_id":job["id"], | |
| "rank_score":rank_score, | |
| "suggestion_score":suggest_score | |
| }) | |
| return { | |
| "code":200, | |
| "message":"Request was successful.", | |
| "data": scoring | |
| } | |
| # class InputResume(BaseModel): | |
| # content: str | |
| # @app.post("/suggest/") | |
| # async def suggestion(data: InputResume): | |
| # return { | |
| # "code":201, | |
| # "message":"Request was successful.", | |
| # "data": InputResume.model_dump_json() | |
| # } | |
| from ranker import rank_resume | |
| from embeddings import rank_jobs | |
| # Function to wrap the existing rank_resume | |
| def process_input(job_description, resumes): | |
| print("[JOB DESC]", job_description) | |
| print("[RESUMES]", resumes) | |
| resumes = [r for r in resumes if r and r.strip() != ""] # Remove empty | |
| if not job_description.strip() or not resumes: | |
| return "Please provide both job description and at least one resume." | |
| return rank_resume(job_description, resumes)[1] | |
| def process_input_suggestion(resume, job_descriptions): | |
| # print("[JOB DESC]", job_description) | |
| # print("[RESUMES]", resumes) | |
| # resumes = [r for r in resumes if r and r.strip() != ""] # Remove empty | |
| # if not job_description.strip() or not resumes: | |
| # return "Please provide both resume and at least one job description." | |
| return rank_jobs(job_descriptions, resume)[1] | |
| # results = zip(*rank_jobs(resumes, job_description)) | |
| # formatted_output = "" | |
| # for i, (resume, score) in enumerate(results, 1): | |
| # formatted_output += f"Job #{i}:\nScore: {score:.2f}\nJob Description Snippet: {resume[:200]}...\n\n-------\n\n" | |
| # return formatted_output | |
| app.get("/") | |
| def read_root(): | |
| return {"message": "Hello, World!"} | |
| class InputData(BaseModel): | |
| resumes: List[str] | |
| job_description: str | |
| class InputData2(BaseModel): | |
| job_descriptions: List[str] | |
| resume: str | |
| # class InputData3(BaseModel): | |
| # job_descriptions: List[str] | |
| # resumes: List[str] | |
| async def process_data(data: InputData): | |
| return dict(scores=process_input(data.job_description, data.resumes)) | |
| async def suggestion(data: InputData2): | |
| return { | |
| "scores":process_input_suggestion(data.resume, data.job_descriptions) | |
| } | |