Spaces:
Sleeping
Sleeping
| import time | |
| from uuid import uuid4 | |
| from pydantic import Field | |
| from typing import TypedDict, List, Dict, Union | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from services.models.data_model import AIProfile, RawProfile, Profiles | |
| # from services.base.BaseGenerator import BaseAIGenerator, MetadataObservability | |
| from services.base.BaseGenerator_v2 import BaseAIGenerator, MetadataObservability | |
| from services.llms.LLM import model_5mini, model_4o_2 | |
| from utils.decorator import trace_runtime | |
| from utils.logger import get_logger | |
| from fastapi import HTTPException, status | |
| logger = get_logger("profile extraction") | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from externals.databases.pg_crud import ( | |
| get_file_by_filename, | |
| get_profile_by_filename, | |
| mark_file_extracted, | |
| create_profile, | |
| ) | |
| from externals.databases.pg_models import CVUser, CVProfile | |
| from utils.logger import get_logger | |
| from externals.storages.azure_blob import download_blob_by_filename | |
| from services.extractor.pdf import extract_text_from_pdf_bytes | |
| logger = get_logger("knowledge.extract") | |
class KnowledgeExtractService:
    """Turn an uploaded CV file into a structured profile and persist it.

    The service holds the database session and the user on whose behalf the
    extraction runs; both are supplied by the caller.
    """

    # Prompt sent to the LLM. ``{cv}`` is the only template variable and is
    # filled with the text extracted from the PDF.
    _EXTRACT_PROMPT = """
You are an intelligent information extraction assistant.
Your task is to read the following Curriculum Vitae (CV) text and extract structured information according to the expected output below.
----------------------------
candidate's curriculum vitae:
{cv}
----------------------------
**Expected Output**:
Follow the data schema from AIProfile
**Instructions**:
1. Read the provided CV and extract information needed based on expected output.
2. Be Careful when to extract information for gpa_edu_1, univ_edu_1, major_edu_1 or gpa_edu_2, univ_edu_2, major_edu_2 or gpa_edu_3, univ_edu_3, major_edu_3
..._edu_1 is only for bachelor/undergraduate/sarjana degree
..._edu_2 is only for master/postgraduate degree
..._edu_3 is only for doctor/phd degree
3. Reformat the extracted info using correct word spacing.
4. Do not verbose, just return the final answer.
""".strip()

    def __init__(self, db: AsyncSession, user: CVUser):
        """Store the session and the user used by every later call."""
        self.db = db
        self.user = user

    async def extract(self, filename: str) -> CVProfile:
        """Extract and persist the profile for one of the user's files.

        Args:
            filename: Name of the uploaded file to process.

        Returns:
            The stored profile. If the file is already flagged as extracted
            and a profile is found for it, that existing profile is returned
            and no new extraction is performed.

        Raises:
            HTTPException: 404 when the file is not found, 400 when the PDF
                yields no text, 500 when the AI extraction returns nothing.
        """
        # 1. Fetch the file metadata for this user.
        file = await get_file_by_filename(
            self.db,
            filename=filename,
            user_id=self.user.user_id,
        )
        if not file:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"File '{filename}' not found",
            )

        if file.is_extracted:
            logger.info(f"ℹ️ File already extracted: {filename}")
            existing = await get_profile_by_filename(
                self.db, filename=filename, current_user=self.user
            )
            if existing:
                return existing
            # The flag is set but no profile was found: make the inconsistency
            # visible, then fall through and extract again.
            logger.warning(
                f"⚠️ File flagged as extracted but no profile found, re-extracting: {filename}"
            )

        # 2. Download the PDF.
        pdf_bytes: bytes = await download_blob_by_filename(
            filename=file.filename,
            tenant_id=self.user.tenant_id,
            user_id=self.user.user_id,
        )

        # 3. Extract the text. A None result is treated like empty text so it
        #    yields the 400 below instead of an AttributeError on .strip().
        text = await extract_text_from_pdf_bytes(pdf_bytes)
        if not text or not text.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="PDF contains no extractable text",
            )

        # 4. Build the RawProfile handed to the LLM step.
        raw_profile = RawProfile(
            filename=file.filename,
            content_type=file.file_type,
            content=text,
            profile_id=str(uuid4()),
        )

        # 5. Extract with AI. The helper reports failure with a falsy value.
        ai_profile = await self._extract_with_ai(raw_profile)
        if not ai_profile:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="AI extraction failed",
            )

        # 6. INSERT the profile into cv_profile.
        profile = await create_profile(
            db=self.db,
            filename=file.filename,
            file_id=file.file_id,
            profile=ai_profile,
        )

        # 7. UPDATE cv_file.is_extracted, then commit both changes together.
        await mark_file_extracted(
            db=self.db,
            file_id=file.file_id,
        )
        await self.db.commit()

        logger.info(
            "✅ Profile extracted & persisted",
            extra={"context": {
                "filename": filename,
                "profile_id": profile.profile_id,
            }},
        )
        return profile

    async def _extract_with_ai(self, raw_profile: RawProfile) -> Union[AIProfile, dict]:
        """Extract a structured profile from the CV content with the LLM.

        Args:
            raw_profile: Raw CV record; its ``content`` is sent to the model
                and its ``profile_id`` is used in log messages. It is read
                dict-style via ``.get``.

        Returns:
            The generator's result when it is truthy, otherwise an empty dict.
            Errors are logged and also reported as an empty dict, so callers
            only need a truthiness check.
        """
        try:
            prompt = ChatPromptTemplate.from_template(self._EXTRACT_PROMPT)
            llm = model_4o_2.with_structured_output(AIProfile)
            gen_ai = BaseAIGenerator(
                task_name="extract profile",
                prompt=prompt,
                input_llm={"cv": raw_profile.get("content")},
                llm=llm,
                metadata_observability=MetadataObservability(
                    fullname=self.user.full_name,
                    task_id=str(uuid4()),
                    # Reported under the public entry point's name ("extract").
                    agent=self.extract.__name__,
                    user_id=self.user.email,
                ),
            )
            result = await gen_ai.agenerate()
        except Exception as exc:
            # Best-effort by design: the caller maps a falsy result to a 500.
            # logger.exception keeps the traceback the old logger.error lost.
            logger.exception(f"""❌ extract one profile error for profile_id {raw_profile.get("profile_id")}, {exc}""")
            return {}

        if result:
            logger.info("✅ extract one profile success")
            return result

        logger.error(f"""❌ extract one profile failed, something went wrong for profile_id {raw_profile.get("profile_id")}""")
        return {}
| # @trace_runtime | |
| # async def extract_bulk_profile(self, raw_profiles: List[RawProfile]) -> Profiles: | |
| # try: | |
| # profiles = [] | |
| # failed_id = [] | |
| # for _, cv_text in enumerate(raw_profiles): | |
| # profile = await self.extract(cv_text.get("content")) | |
| # time.sleep(2) | |
| # if profile: | |
| # logger.info(f"✅ extract bulk profile success [{_+1}/{len(raw_profiles)}]") | |
| # profiles.append(profile) | |
| # else: | |
| # logger.info(f"""❌ extract bulk profile error for profile {cv_text.get("profile_id")}, [{_+1}/{len(raw_profiles)}]""") | |
| # profiles[cv_text.get("profile_id")] = profile | |
| # failed_id.append(cv_text.get("profile_id")) | |
| # bulk_profile = Profiles( | |
| # profiles=profiles | |
| # ) | |
| # return bulk_profile | |
| # except Exception as E: | |
| # logger.error(f"❌ extract bulk profile error for profile_id {failed_id}, {E}") | |
| # return Profiles( | |
| # profiles=[] | |
| # ) | |