Spaces:
Sleeping
Sleeping
| from externals.databases.pg_crud import get_profile_by_filename, get_profile_by_id, get_profiles_by_criteria_id | |
| from externals.databases.pg_models import CVUser, CVProfile | |
| from services.models.data_model import Profile | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from typing import List, Dict, Generator, Union, Any | |
| from config.constant import AzureBlobConstants | |
| from utils.logger import get_logger | |
| logger = get_logger("get profile") | |
class KnowledgeGetProfileService:
    """Read-side service that fetches CV profiles for one user.

    Thin wrapper around the ``pg_crud`` lookup helpers that adds logging.
    Except for ``get_by_filename`` (which lets exceptions propagate), the
    lookup methods are best-effort: they log failures and return an empty
    value instead of raising.
    """

    def __init__(self, db: AsyncSession, user: CVUser):
        """Keep the database session and the user that lookups run for."""
        self.db = db
        self.user = user

    async def get_by_filename(self, filename: str) -> CVProfile | None:
        """Return the profile stored under ``filename``, or ``None``.

        ``self.user`` is forwarded to the lookup as ``current_user``
        (presumably to scope the query to that user -- confirm in pg_crud).
        Exceptions raised by the lookup are not caught here.
        """
        logger.info(f"π Fetching profile: {filename}")
        profile = await get_profile_by_filename(self.db, filename, current_user=self.user)
        # Any falsy lookup result is normalised to None.
        return profile or None

    def add_sas_blob(self, url: str) -> str:
        """Return ``url`` with the configured blob SAS token appended.

        The token is joined with ``&`` when ``url`` already has a query
        string and with ``?`` otherwise. When no token is configured the
        URL is returned unchanged.
        """
        # A token stored with a leading "?" or "&" would otherwise produce
        # "??" / "&&" in the result; an unset token would produce "?None".
        sas_token = str(AzureBlobConstants.BLOB_SAS_KEY or "").lstrip("?&")
        if not sas_token:
            return url
        separator = "&" if "?" in url else "?"
        return f"{url}{separator}{sas_token}"

    async def get_profile(self, profile_id: str) -> Union[CVProfile, Dict[str, Any]]:
        """Fetch one profile by id and add the SAS token to its ``"url"``.

        Returns:
            The profile with ``profile["url"]`` rewritten by
            ``add_sas_blob``, or ``{}`` when the profile is missing or any
            error occurs (the error is logged, never raised).
        """
        try:
            # NOTE(review): unlike the filename/criteria lookups, no
            # current_user is passed here -- confirm that is intended.
            profile = await get_profile_by_id(db=self.db, profile_id=profile_id)
            if not profile:
                # Previously reached only via a TypeError on None["url"].
                logger.warning(f"get profile: no profile found for profile_id {profile_id}")
                return {}
            profile["url"] = self.add_sas_blob(profile["url"])
            return profile
        except Exception as E:
            logger.error(f"β get profile error for profile_id {profile_id}, {E}")
            return {}

    async def get_profile_generator(self, profile_id: str):
        """Async generator yielding the profile for ``profile_id`` once.

        The lookup runs only when the generator is first iterated. If it
        raises, the error is logged and the generator finishes without
        yielding. The yielded value is whatever the lookup returned; no SAS
        token is added to it.
        """
        try:
            # NOTE(review): no current_user is passed here -- confirm intended.
            profile = await get_profile_by_id(db=self.db, profile_id=profile_id)
        except Exception as E:
            logger.error(f"β get profile generator error for profile_id {profile_id}, {E}")
            return
        # Yield outside the try so exceptions thrown in by the consumer are
        # not swallowed and misreported as lookup failures.
        yield profile

    async def get_profiles_generator(self, profile_ids: List[str]) -> Dict[str, Union[Generator, Any]]:
        """Map each profile id to a lazy async generator for its profile.

        No database access happens here: each value is an un-started
        ``get_profile_generator`` async generator, so lookup failures can
        only surface (as a logged error and an empty iteration) when the
        caller consumes it. Duplicate ids collapse to a single entry.

        Returns:
            ``{profile_id: async generator}``, or ``{}`` if building the
            mapping fails (e.g. ``profile_ids`` is not iterable).
        """
        try:
            # An async-generator object is always truthy, so there is no
            # "failed" case to detect at creation time.
            profiles = {
                profile_id: self.get_profile_generator(profile_id=profile_id)
                for profile_id in profile_ids
            }
        except Exception as E:
            logger.error(f"β get profile error, {E}")
            return {}
        logger.info("β get profiles success")
        return profiles

    async def get_profiles_by_criteria(self, criteria_id: str) -> List[CVProfile]:
        """Return the profiles linked to ``criteria_id``.

        ``self.user`` is forwarded to the lookup as ``current_user``.
        On any error the failure is logged and ``[]`` is returned.
        """
        try:
            profiles: List[CVProfile] = await get_profiles_by_criteria_id(
                self.db, criteria_id=criteria_id, current_user=self.user
            )
            # NOTE(review): this logs the full profile list at INFO level;
            # consider logging only len(profiles).
            logger.info(f"get_profiles_by_criteria/profiles: {profiles}")
            return profiles
        except Exception as E:
            logger.error(f"β get profiles by criteria id '{criteria_id}' error, {E}")
            return []