CandidateExplorer / services /knowledge /extract_profile.py
ishaq101's picture
[KM-383] [CEX] [AI] Deployment AI Engine / BE
f3bdba1
import time
from uuid import uuid4
from pydantic import Field
from typing import TypedDict, List, Dict, Union
from langchain_core.prompts import ChatPromptTemplate
from services.models.data_model import AIProfile, RawProfile, Profiles
# from services.base.BaseGenerator import BaseAIGenerator, MetadataObservability
from services.base.BaseGenerator_v2 import BaseAIGenerator, MetadataObservability
from services.llms.LLM import model_5mini, model_4o_2
from utils.decorator import trace_runtime
from utils.logger import get_logger
from fastapi import HTTPException, status
logger = get_logger("profile extraction")
from sqlalchemy.ext.asyncio import AsyncSession
from externals.databases.pg_crud import (
get_file_by_filename,
get_profile_by_filename,
mark_file_extracted,
create_profile,
)
from externals.databases.pg_models import CVUser, CVProfile
from utils.logger import get_logger
from externals.storages.azure_blob import download_blob_by_filename
from services.extractor.pdf import extract_text_from_pdf_bytes
# Module-level logger used by the service below. This assignment rebinds
# `logger`, replacing the "profile extraction" logger created earlier in the
# import section.
logger = get_logger("knowledge.extract")
class KnowledgeExtractService:
    """Turn an uploaded CV file into a persisted, AI-extracted profile.

    The service is bound to one database session and one user. ``extract``
    is the entry point; ``_extract_with_ai`` performs the LLM call.
    """

    def __init__(self, db: AsyncSession, user: CVUser):
        """Keep the database session and the user the extraction runs for."""
        self.db = db
        self.user = user

    @trace_runtime
    async def extract(self, filename: str) -> CVProfile:
        """Extract and persist the profile for the file named ``filename``.

        Steps: look up the file record, short-circuit if a profile already
        exists, download the PDF, extract its text, run the AI extraction,
        insert the profile, flag the file as extracted, and commit.

        Args:
            filename: Name of the file to look up for the current user.

        Returns:
            The existing profile when the file is already extracted and a
            profile is found, otherwise the newly created profile.

        Raises:
            HTTPException: 404 when no file record is found, 400 when the
                PDF yields no text, 500 when the AI extraction returns a
                falsy result.
        """
        # 1. Fetch file metadata
        file = await get_file_by_filename(
            self.db,
            filename=filename,
            user_id=self.user.user_id,
        )
        if not file:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"File '{filename}' not found",
            )

        if file.is_extracted:
            logger.info(f"ℹ️ File already extracted: {filename}")
            existing = await get_profile_by_filename(
                self.db, filename=filename, current_user=self.user
            )
            if existing:
                return existing
            # Flag is set but no profile was found: fall through and
            # extract again.

        # 2. Download PDF
        pdf_bytes: bytes = await download_blob_by_filename(
            filename=file.filename,
            tenant_id=self.user.tenant_id,
            user_id=self.user.user_id,
        )

        # 3. Extract text
        text = await extract_text_from_pdf_bytes(pdf_bytes)
        # Guard against a None result as well as an empty/whitespace string,
        # so both cases produce the 400 below instead of an AttributeError.
        if not text or not text.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="PDF contains no extractable text",
            )

        # 4. Build RawProfile
        raw_profile = RawProfile(
            filename=file.filename,
            content_type=file.file_type,
            content=text,
            profile_id=str(uuid4()),
        )

        # 5. Extract with AI
        ai_profile: AIProfile = await self._extract_with_ai(raw_profile)
        if not ai_profile:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="AI extraction failed",
            )

        try:
            # 6. INSERT profile into cv_profile
            profile = await create_profile(
                db=self.db,
                filename=file.filename,
                file_id=file.file_id,
                profile=ai_profile,
            )

            # 7. UPDATE cv_file.is_extracted
            await mark_file_extracted(
                db=self.db,
                file_id=file.file_id,
            )

            await self.db.commit()
        except Exception:
            # Discard any pending, uncommitted changes so the session is not
            # left holding a profile insert without the matching
            # is_extracted update; then let the caller see the error.
            await self.db.rollback()
            raise

        logger.info(
            "✅ Profile extracted & persisted",
            extra={"context": {
                "filename": filename,
                "profile_id": profile.profile_id,
            }},
        )

        return profile

    @trace_runtime
    async def _extract_with_ai(self, raw_profile: RawProfile) -> AIProfile:
        """Extract a structured profile from the CV text in ``raw_profile``.

        Builds a prompt around the raw CV content and runs it through
        ``model_4o_2`` configured for structured ``AIProfile`` output.

        Args:
            raw_profile: Raw profile whose ``content`` is sent to the model
                and whose ``profile_id`` is used in error logs.

        Returns:
            The generator result when it is truthy. On a falsy result or on
            any exception, an empty dict is returned (callers test the
            result for truthiness); the failure is logged.
        """
        # The prompt text is kept flush-left so the string sent to the model
        # is exactly what is written here.
        extract_one_profile_prompt = """
You are an intelligent information extraction assistant.
Your task is to read the following Curriculum Vitae (CV) text and extract structured information according to the expected output below.
----------------------------
candidate's curriculum vitae:
{cv}
----------------------------
**Expected Output**:
Follow the data schema from AIProfile
**Instructions**:
1. Read the provided CV and extract information needed based on expected output.
2. Be Careful when to extract information for gpa_edu_1, univ_edu_1, major_edu_1 or gpa_edu_2, univ_edu_2, major_edu_2 or gpa_edu_3, univ_edu_3, major_edu_3
..._edu_1 is only for bachelor/undergraduate/sarjana degree
..._edu_2 is only for master/postgraduate degree
..._edu_3 is only for doctor/phd degree
3. Reformat the extracted info using correct word spacing.
4. Do not verbose, just return the final answer.
""".strip()

        try:
            prompt = ChatPromptTemplate.from_template(extract_one_profile_prompt)

            input_llm = {
                "cv": raw_profile.get("content")
            }

            llm = model_4o_2.with_structured_output(AIProfile)

            gen_ai = BaseAIGenerator(
                task_name="extract profile",
                prompt=prompt,
                input_llm=input_llm,
                llm=llm,
                metadata_observability=MetadataObservability(
                    fullname=self.user.full_name,
                    task_id=str(uuid4()),
                    # NOTE(review): this reports the name of `extract`, not
                    # of this method — confirm that is the intended agent.
                    agent=self.extract.__name__,
                    user_id=self.user.email,
                ),
            )

            result = await gen_ai.agenerate()
            if result:
                logger.info("✅ extract one profile success")
                return result

            logger.error(f"""❌ extract one profile failed, something went wrong for profile_id {raw_profile.get("profile_id")}""")
            return {}
        except Exception as E:
            # Best-effort by design: the caller turns the empty result into
            # an HTTP 500. exc_info keeps the traceback in the log.
            logger.error(
                f"""❌ extract one profile error for profile_id {raw_profile.get("profile_id")}, {E}""",
                exc_info=True,
            )
            return {}
# @trace_runtime
# async def extract_bulk_profile(self, raw_profiles: List[RawProfile]) -> Profiles:
# try:
# profiles = []
# failed_id = []
# for _, cv_text in enumerate(raw_profiles):
# profile = await self.extract(cv_text.get("content"))
# time.sleep(2)
# if profile:
# logger.info(f"✅ extract bulk profile success [{_+1}/{len(raw_profiles)}]")
# profiles.append(profile)
# else:
# logger.info(f"""❌ extract bulk profile error for profile {cv_text.get("profile_id")}, [{_+1}/{len(raw_profiles)}]""")
# profiles[cv_text.get("profile_id")] = profile
# failed_id.append(cv_text.get("profile_id"))
# bulk_profile = Profiles(
# profiles=profiles
# )
# return bulk_profile
# except Exception as E:
# logger.error(f"❌ extract bulk profile error for profile_id {failed_id}, {E}")
# return Profiles(
# profiles=[]
# )