from externals.databases.pg_crud import get_profile_by_filename, get_profile_by_id, get_profiles_by_criteria_id
from externals.databases.pg_models import CVUser, CVProfile
from services.models.data_model import Profile
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List, Dict, Generator, Union, Any
from config.constant import AzureBlobConstants
from utils.logger import get_logger

logger = get_logger("get profile")


class KnowledgeGetProfileService:
    """Read-only lookups of CV profiles on behalf of a given user.

    Every public coroutine logs and swallows lookup errors, returning an
    "empty" value instead of raising (see the individual methods for the
    exact fallback).
    """

    def __init__(self, db: AsyncSession, user: CVUser):
        """Store the database session and the user the lookups are made for."""
        self.db = db
        self.user = user

    async def get_by_filename(self, filename: str) -> CVProfile | None:
        """Fetch the profile stored under ``filename`` for the current user.

        Args:
            filename: Name passed through to ``get_profile_by_filename``.

        Returns:
            The profile, or ``None`` when the lookup yields a falsy result.
        """
        logger.info(f"📄 Fetching profile: {filename}")
        profile = await get_profile_by_filename(self.db, filename, current_user=self.user)
        # Any falsy lookup result is normalised to None.
        return profile or None

    def add_sas_blob(self, url: str) -> str:
        """Append the configured blob SAS token to ``url`` as a query string.

        The token is joined with ``&`` when ``url`` already contains a ``?``
        and with ``?`` otherwise.

        Args:
            url: Blob URL to sign.

        Returns:
            ``url`` with ``AzureBlobConstants.BLOB_SAS_KEY`` appended.
        """
        # NOTE(review): assumes BLOB_SAS_KEY carries no leading "?" or "&";
        # a token copied with its "?" prefix would yield "??" — TODO confirm.
        sas_token = AzureBlobConstants.BLOB_SAS_KEY
        separator = "&" if "?" in url else "?"
        return f"{url}{separator}{sas_token}"

    async def get_profile(self, profile_id: str) -> Union[CVProfile, Dict[str, Any]]:
        """Fetch one profile by id and sign its ``"url"`` entry with the SAS token.

        Args:
            profile_id: Identifier passed to ``get_profile_by_id``.

        Returns:
            The profile with ``profile["url"]`` rewritten by ``add_sas_blob``,
            or an empty dict when anything in the lookup or signing fails
            (the error is logged, not raised).
        """
        # NOTE(review): unlike the filename/criteria lookups, no current_user
        # is passed here — confirm get_profile_by_id enforces access itself.
        try:
            profile = await get_profile_by_id(db=self.db, profile_id=profile_id)
            profile["url"] = self.add_sas_blob(profile["url"])
            return profile
        except Exception as exc:
            logger.error(f"❌ get profile error for profile_id {profile_id}, {exc}")
            return {}

    async def get_profile_generator(self, profile_id: str):
        """Async generator yielding the profile for ``profile_id`` once.

        The lookup only runs when the generator is first iterated. If it
        fails, the error is logged and the generator finishes without
        yielding anything. The yielded profile is returned as-is; its URL is
        not passed through ``add_sas_blob``.

        Args:
            profile_id: Identifier passed to ``get_profile_by_id``.
        """
        try:
            profile = await get_profile_by_id(db=self.db, profile_id=profile_id)
            yield profile
        except Exception as exc:
            logger.error(f"❌ get profile generator error for profile_id {profile_id}, {exc}")
            return

    async def get_profiles_generator(self, profile_ids: List[str]) -> Dict[str, Union[Generator, Any]]:
        """Build one lazy profile generator per id.

        No database access happens here: each value is the un-iterated async
        generator returned by ``get_profile_generator``. A failed lookup can
        therefore only be observed by the consumer, as a generator that
        yields nothing. This is why no per-id failure accounting is done in
        this method (an async generator object is always truthy, so it
        cannot be used to detect failure).

        Args:
            profile_ids: Ids to create generators for. Duplicate ids collapse
                to a single entry.

        Returns:
            Mapping of profile id to its async generator, or an empty dict if
            building the mapping raises (e.g. ``profile_ids`` is not iterable).
        """
        try:
            profiles = {
                profile_id: self.get_profile_generator(profile_id=profile_id)
                for profile_id in profile_ids
            }
            logger.info("✅ get profiles success")
            return profiles
        except Exception as exc:
            logger.error(f"❌ get profile error, {exc}")
            return {}

    async def get_profiles_by_criteria(self, criteria_id: str) -> List[CVProfile]:
        """Fetch every profile attached to ``criteria_id`` for the current user.

        Args:
            criteria_id: Identifier passed to ``get_profiles_by_criteria_id``.

        Returns:
            The profiles returned by the lookup, or an empty list when it
            raises (the error is logged, not raised).
        """
        try:
            profiles: List[CVProfile] = await get_profiles_by_criteria_id(
                self.db, criteria_id=criteria_id, current_user=self.user
            )
            # NOTE(review): this logs the full profile objects at INFO level;
            # consider logging only a count if profiles hold personal data.
            logger.info(f"get_profiles_by_criteria/profiles: {profiles}")
            return profiles
        except Exception as exc:
            logger.error(f"❌ get profiles by criteria id '{criteria_id}' error, {exc}")
            return []