"""Knowledge/CV extraction service.

Fetches an uploaded CV file's metadata, downloads the PDF from blob
storage, extracts its raw text, runs an LLM structured-extraction pass
(``AIProfile``), and persists the resulting profile row.
"""

import time
from uuid import uuid4
from typing import TypedDict, List, Dict, Union

from fastapi import HTTPException, status
from langchain_core.prompts import ChatPromptTemplate
from pydantic import Field
from sqlalchemy.ext.asyncio import AsyncSession

from externals.databases.pg_crud import (
    get_file_by_filename,
    get_profile_by_filename,
    mark_file_extracted,
    create_profile,
)
from externals.databases.pg_models import CVUser, CVProfile
from externals.storages.azure_blob import download_blob_by_filename
# from services.base.BaseGenerator import BaseAIGenerator, MetadataObservability
from services.base.BaseGenerator_v2 import BaseAIGenerator, MetadataObservability
from services.extractor.pdf import extract_text_from_pdf_bytes
from services.llms.LLM import model_5mini, model_4o_2
from services.models.data_model import AIProfile, RawProfile, Profiles
from utils.decorator import trace_runtime
from utils.logger import get_logger

# Single module logger. (The original created a throwaway
# `get_logger("profile extraction")` that was immediately shadowed by
# this one; the dead assignment has been removed.)
logger = get_logger("knowledge.extract")


class KnowledgeExtractService:
    """Orchestrates CV extraction for one authenticated user.

    Holds the request-scoped async DB session and the current user so
    each public call can enforce per-user/per-tenant access.
    """

    def __init__(self, db: AsyncSession, user: CVUser):
        self.db = db
        self.user = user

    @trace_runtime
    async def extract(self, filename: str) -> CVProfile:
        """Extract a structured profile from the user's uploaded CV.

        Idempotent: if the file was already extracted and a profile row
        exists, that row is returned without re-running the pipeline.

        Args:
            filename: Blob/file name previously uploaded by this user.

        Returns:
            The persisted ``CVProfile`` row.

        Raises:
            HTTPException: 404 if the file is unknown, 400 if the PDF
                yields no text, 500 if the LLM extraction fails.
        """
        # 1️⃣ Look up the file's metadata, scoped to the current user.
        file = await get_file_by_filename(
            self.db, filename=filename, user_id=self.user.user_id
        )
        if not file:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                # Fixed: the f-string previously contained a literal
                # "(unknown)" instead of the filename placeholder.
                detail=f"File '{filename}' not found",
            )

        # Short-circuit if this file was already processed and persisted.
        if file.is_extracted:
            logger.info(f"ℹ️ File already extracted: {filename}")
            existing = await get_profile_by_filename(
                self.db, filename=filename, current_user=self.user
            )
            if existing:
                return existing
            # No profile row despite the flag — fall through and re-extract.

        # 2️⃣ Download the PDF bytes from tenant-scoped blob storage.
        pdf_bytes: bytes = await download_blob_by_filename(
            filename=file.filename,
            tenant_id=self.user.tenant_id,
            user_id=self.user.user_id,
        )

        # 3️⃣ Extract raw text; a text-free PDF (e.g. scanned images) is
        # a client error, not a server failure.
        text = await extract_text_from_pdf_bytes(pdf_bytes)
        if not text.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="PDF contains no extractable text",
            )

        # 4️⃣ Build the intermediate RawProfile payload for the LLM step.
        raw_profile = RawProfile(
            filename=file.filename,
            content_type=file.file_type,
            content=text,
            profile_id=str(uuid4()),
        )

        # 5️⃣ Run the structured LLM extraction.
        ai_profile = await self._extract_with_ai(raw_profile)
        if not ai_profile:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="AI extraction failed",
            )

        # 6️⃣ Persist the extracted profile.
        profile = await create_profile(
            db=self.db,
            # user_id=self.user.user_id,
            # tenant_id=self.user.tenant_id,
            filename=file.filename,
            file_id=file.file_id,
            profile=ai_profile,
        )

        # 7️⃣ Flag the source file as extracted, then commit both writes
        # together so the flag and the profile row stay consistent.
        await mark_file_extracted(
            db=self.db,
            file_id=file.file_id,
        )
        await self.db.commit()

        logger.info(
            "✅ Profile extracted & persisted",
            extra={"context": {
                "filename": filename,
                "profile_id": profile.profile_id,
            }},
        )
        return profile

    @trace_runtime
    async def _extract_with_ai(
        self, raw_profile: RawProfile
    ) -> Union[AIProfile, None]:
        """Extract a structured ``AIProfile`` from raw CV text via LLM.

        Args:
            raw_profile: Mapping with at least ``content`` (CV text) and
                ``profile_id`` keys.

        Returns:
            The parsed ``AIProfile`` on success, or ``None`` on failure.
            (Previously returned ``{}``; callers only truth-test the
            result, so ``None`` is equivalent and honestly typed.)
        """
        try:
            extract_one_profile_prompt = """
You are an intelligent information extraction assistant. Your task is to read the following Curriculum Vitae (CV) text and extract structured information according to the expected output below.

----------------------------
candidate's curriculum vitae:
{cv}
----------------------------

**Expected Output**:
Follow the data schema from AIProfile

**Instructions**:
1. Read the provided CV and extract information needed based on expected output.
2. Be Careful when to extract information for gpa_edu_1, univ_edu_1, major_edu_1 or gpa_edu_2, univ_edu_2, major_edu_2 or gpa_edu_3, univ_edu_3, major_edu_3
..._edu_1 is only for bachelor/undergraduate/sarjana degree
..._edu_2 is only for master/postgraduate degree
..._edu_3 is only for doctor/phd degree
3. Reformat the extracted info using correct word spacing.
4. Do not verbose, just return the final answer.
""".strip()

            prompt = ChatPromptTemplate.from_template(extract_one_profile_prompt)
            input_llm = {
                "cv": raw_profile.get("content")
            }
            # Bind the AIProfile schema so the model returns parsed output.
            llm = model_4o_2.with_structured_output(AIProfile)

            gen_ai = BaseAIGenerator(
                task_name="extract profile",
                prompt=prompt,
                input_llm=input_llm,
                llm=llm,
                metadata_observability=MetadataObservability(
                    fullname=self.user.full_name,
                    task_id=str(uuid4()),
                    agent=self.extract.__name__,
                    user_id=self.user.email,
                ),
            )
            result = await gen_ai.agenerate()

            if result:
                logger.info("✅ extract one profile success")
                return result

            logger.error(
                f"""❌ extract one profile failed, something went wrong for profile_id {raw_profile.get("profile_id")}"""
            )
            return None
        except Exception as E:
            # logger.exception also records the traceback, unlike .error.
            logger.exception(
                f"""❌ extract one profile error for profile_id {raw_profile.get("profile_id")}, {E}"""
            )
            return None