Spaces:
Runtime error
Runtime error
File size: 5,142 Bytes
6cb456f 434bd0b b8e9f2b 9b636da abcbaa7 733d2db b8e9f2b 434bd0b 15f88b7 b8e9f2b f0110ec b8e9f2b 6cb456f 9b636da abcbaa7 9b636da abcbaa7 b8e9f2b abcbaa7 7d48373 b8e9f2b a7b1d6f b8e9f2b 733d2db 6dbe3b3 b8e9f2b 733d2db b8e9f2b 733d2db 0287eeb e5129b2 733d2db e5129b2 733d2db b8e9f2b 746a838 0ed7385 746a838 f0110ec ab38f64 f0110ec 0ed7385 f0110ec 5a5766c f0110ec b8e9f2b cdb0794 0376f20 cdb0794 0376f20 cdb0794 15f88b7 cdb0794 0376f20 f0110ec cdb0794 0376f20 cdb0794 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
from fastapi import FastAPI, File, UploadFile, Form
from pydantic import BaseModel
from typing import List
from pathlib import Path
import shutil
import tempfile
import os
import uuid
from langchain_docling import DoclingLoader
from langchain_docling.loader import ExportType
# ASGI application object; the route decorators in this module register on it.
app = FastAPI()
# In-memory stores. They are plain module-level lists, so their contents are
# lost whenever the process restarts.
# Parsed documents uploaded with type == "resume" (appended by the /upload handler).
resumes = []
# Parsed documents uploaded with type == "job" (appended by the /upload handler).
jobs = []
# Score records with resume_id / job_id / rank_score / suggestion_score keys
# (appended by the /scoring/ handler).
scoring = []
# Relative "uploads" directory, created at import time if it does not exist.
# NOTE(review): nothing in the visible code reads or writes UPLOAD_DIR —
# confirm whether it is still needed.
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
@app.post("/upload")
async def upload_file(file: UploadFile = File(...), type: str = Form(...)):
    """Parse an uploaded document with Docling and store it in memory.

    The upload is spooled to a temporary file in ``/tmp`` because
    ``DoclingLoader`` is given a filesystem path. The temporary file is removed
    once parsing has finished, whether or not it succeeded.

    Args:
        file: The uploaded document. Its extension is reused as the temp-file
            suffix; ``.pdf`` is assumed when there is none.
        type: Form field selecting the store: ``"resume"`` or ``"job"``. Any
            other value is parsed and returned but not stored. (The name
            shadows the builtin, but it is the public form-field name and so
            cannot be renamed without breaking clients.)

    Returns:
        An envelope dict whose ``data`` is the first parsed document dumped to
        a dict, with a freshly generated ``id``.

    Raises:
        HTTPException: 422 when Docling returns no documents for the file.
    """
    # Local import keeps this block self-contained; the module-level fastapi
    # import does not pull in HTTPException.
    from fastapi import HTTPException

    # ``filename`` can be None; fall back to ".pdf" as for extension-less names.
    suffix = os.path.splitext(file.filename or "")[-1] or ".pdf"
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix, dir="/tmp") as tmp:
        shutil.copyfileobj(file.file, tmp)
        tmp_path = tmp.name

    try:
        # The file is closed (and therefore flushed) here, so the size is final.
        size = os.path.getsize(tmp_path)
        print(f"Saved {file.filename} -> {tmp_path} ({size} bytes)")
        print("[TMP PATH]", str(tmp_path))

        loader = DoclingLoader(file_path=str(tmp_path), export_type=ExportType.MARKDOWN)
        docs = loader.load()
    finally:
        # delete=False means nothing else cleans this up; without this every
        # upload would leave a file behind in /tmp.
        Path(tmp_path).unlink(missing_ok=True)

    if not docs:
        # Previously this surfaced as an IndexError / opaque 500.
        raise HTTPException(
            status_code=422,
            detail="No content could be extracted from the uploaded file.",
        )

    result = docs[0].model_dump()
    result["id"] = str(uuid.uuid4())

    if type == "resume":
        resumes.append(result)
    elif type == "job":
        jobs.append(result)

    return {
        "code": 201,
        "message": "Request was successful.",
        "data": result,
    }
@app.get("/jobs")
def get_jobs():
    # Returns every stored job document inside the API's response envelope.
    # (Kept as a comment rather than a docstring so the generated OpenAPI
    # description of this route stays unchanged.)
    envelope = dict(code=200, message="Request was successful.", data=jobs)
    return envelope
@app.get("/resumes")
def get_resumes():
    # Returns every stored resume document inside the API's response envelope.
    # (Kept as a comment rather than a docstring so the generated OpenAPI
    # description of this route stays unchanged.)
    envelope = dict(code=200, message="Request was successful.", data=resumes)
    return envelope
@app.get("/scoring/")
async def get_scoring():
    """Score every not-yet-scored (resume, job) pair and return all scores.

    For each stored resume and job, a record is appended to ``scoring`` with
    the result of ``process_input`` (``rank_score``) and
    ``process_input_suggestion`` (``suggestion_score``). Pairs that already
    have a record are skipped.

    Returns:
        An envelope dict whose ``data`` is the full ``scoring`` list.
    """
    # Track pairs, not the two id lists separately: checking the ids
    # independently would skip a pair that was never scored whenever both of
    # its ids happen to appear in *other* records (e.g. after an earlier call
    # failed part-way through the loops).
    scored_pairs = {(entry["resume_id"], entry["job_id"]) for entry in scoring}

    for resume in resumes:
        for job in jobs:
            pair = (resume["id"], job["id"])
            if pair in scored_pairs:
                continue

            rank_score = process_input(job["page_content"], [resume["page_content"]])
            suggest_score = process_input_suggestion(
                resume["page_content"], [job["page_content"]]
            )
            scoring.append({
                "resume_id": resume["id"],
                "job_id": job["id"],
                "rank_score": rank_score,
                "suggestion_score": suggest_score,
            })
            scored_pairs.add(pair)

    return {
        "code": 200,
        "message": "Request was successful.",
        "data": scoring,
    }
# class InputResume(BaseModel):
# content: str
# @app.post("/suggest/")
# async def suggestion(data: InputResume):
# return {
# "code":201,
# "message":"Request was successful.",
# "data": InputResume.model_dump_json()
# }
from ranker import rank_resume
from embeddings import rank_jobs
# Function to wrap the existing rank_resume
def process_input(job_description, resumes):
    """Score resumes against a job description via ``rank_resume``.

    Falsy or whitespace-only resume entries are dropped first. When the job
    description is blank, or no resume survives that filtering, a message
    string is returned instead of scores.

    Args:
        job_description: Job description text.
        resumes: Iterable of resume texts.

    Returns:
        Element ``[1]`` of ``rank_resume(job_description, usable_resumes)``,
        or the message string when the input is insufficient.
    """
    print("[JOB DESC]", job_description)
    print("[RESUMES]", resumes)

    usable_resumes = [text for text in resumes if text and text.strip()]
    if job_description.strip() and usable_resumes:
        return rank_resume(job_description, usable_resumes)[1]
    return "Please provide both job description and at least one resume."
def process_input_suggestion(resume, job_descriptions):
    """Score job descriptions against one resume via ``rank_jobs``.

    No input validation is performed here; the arguments are handed straight
    to ``rank_jobs`` (job descriptions first, resume second).

    Args:
        resume: Resume text.
        job_descriptions: Job description texts to compare with the resume.

    Returns:
        Element ``[1]`` of the ``rank_jobs`` result.
    """
    ranked = rank_jobs(job_descriptions, resume)
    return ranked[1]
# results = zip(*rank_jobs(resumes, job_description))
# formatted_output = ""
# for i, (resume, score) in enumerate(results, 1):
# formatted_output += f"Job #{i}:\nScore: {score:.2f}\nJob Description Snippet: {resume[:200]}...\n\n-------\n\n"
# return formatted_output
# The "@" was missing: a bare ``app.get("/")`` builds a decorator and discards
# it, so this handler was never registered and "/" was not served by it.
@app.get("/")
def read_root():
    """Return a static greeting payload for the root path."""
    return {"message": "Hello, World!"}
# Request body for POST /rank/: several resumes scored against one job
# description. Documented with comments rather than a docstring so the
# generated JSON schema description is unchanged.
class InputData(BaseModel):
    # Resume texts to be scored.
    resumes: List[str]

    # Job description the resumes are compared against.
    job_description: str
# Request body for POST /suggest/: several job descriptions scored against one
# resume. Documented with comments rather than a docstring so the generated
# JSON schema description is unchanged.
class InputData2(BaseModel):
    # Job description texts to be scored.
    job_descriptions: List[str]

    # Resume the job descriptions are compared against.
    resume: str
# class InputData3(BaseModel):
# job_descriptions: List[str]
# resumes: List[str]
@app.post("/rank/")
async def process_data(data: InputData):
    # Delegates to process_input and wraps its result under the "scores" key.
    # (Comment instead of docstring: keeps the OpenAPI description unchanged.)
    scores = process_input(data.job_description, data.resumes)
    return {"scores": scores}
@app.post("/suggest/")
async def suggestion(data: InputData2):
    # Delegates to process_input_suggestion and wraps its result under "scores".
    # (Comment instead of docstring: keeps the OpenAPI description unchanged.)
    scores = process_input_suggestion(data.resume, data.job_descriptions)
    return dict(scores=scores)
|