# cosmosConnector.py
"""Cosmos DB access layer for power-converter and lamp-compatibility lookups."""
import asyncio
import logging
import os
import re
from typing import Dict, List, Optional, Union

from azure.cosmos import CosmosClient, exceptions
from dotenv import load_dotenv
from jsonschema import ValidationError
from langchain_openai import AzureOpenAIEmbeddings
from rapidfuzz import process, fuzz
from semantic_kernel.functions import kernel_function

from CosmosDBHandlers.cosmosChatHistoryHandler import ChatMemoryHandler
from models.converterModels import PowerConverter

load_dotenv()
# Initialize logging
logger = logging.getLogger(__name__)

# Single-pass character mapping used by CosmosLampHandler._normalize_lamp_name.
_LAMP_NAME_TRANSLATION = str.maketrans(
    {",": ".", "-": " ", "/": " ", "(": None, ")": None}
)

# Patterns used by CosmosLampHandler._parse_voltage, compiled once.
_NON_VOLTAGE_CHARS = re.compile(r"[^0-9.\-]")
_VOLTAGE_RANGE = re.compile(r"^(\d+(?:\.\d+)?)(?:-+(\d+(?:\.\d+)?))?$")

# Upper bound on the number of rows query_converters converts and returns.
_MAX_QUERY_RESULTS = 10


class CosmosLampHandler:
    """Query the ``Converters`` container of the ``TAL_DB`` Cosmos database."""

    def __init__(self, logger: Optional[logging.Logger] = None):
        """Create the Cosmos client, chat-history handler and embedding model.

        Args:
            logger: Logger to use for this handler. When omitted, the module
                logger is used.
        """
        self.client = CosmosClient(
            os.getenv("AZURE_COSMOS_DB_ENDPOINT"),
            os.getenv("AZURE_COSMOS_DB_KEY"),
        )
        self.chat_memory_handler = ChatMemoryHandler()
        self.database = self.client.get_database_client("TAL_DB")
        self.container = self.database.get_container_client("Converters")
        # Honour the caller-supplied logger instead of a detached throwaway one.
        self.logger = (
            logger if logger is not None else logging.getLogger(__name__)
        )
        self.embedding_model = AzureOpenAIEmbeddings(
            azure_endpoint=os.environ["OPENAI_API_ENDPOINT"],
            azure_deployment=os.environ["OPENAI_EMBEDDINGS_MODEL_DEPLOYMENT"],
            api_key=os.environ["AZURE_OPENAI_KEY"],
        )

    def _fuzzy_match_lamp(self, query: str, targets: list[str], threshold=60) -> list:
        """Return fuzzy matches of *query* among *targets*.

        Both the query and every target are normalized with
        ``_normalize_lamp_name`` before being scored with
        ``fuzz.token_set_ratio``.

        Args:
            query: Lamp name to look for.
            targets: Candidate lamp names.
            threshold: Minimum score passed to rapidfuzz as ``score_cutoff``.

        Returns:
            The result of ``rapidfuzz.process.extract``. Callers in this class
            use element ``[2]`` of each entry as the index into *targets*.
        """
        normalized_query = self._normalize_lamp_name(query)
        normalized_targets = [self._normalize_lamp_name(t) for t in targets]
        return process.extract(
            normalized_query,
            normalized_targets,
            scorer=fuzz.token_set_ratio,
            score_cutoff=threshold,
        )

    def _normalize_lamp_name(self, name: str) -> str:
        """Standardize a lamp name for matching.

        Lower-cases the name, turns ``,`` into ``.``, turns ``-`` and ``/``
        into spaces, drops parentheses and strips surrounding whitespace.
        """
        return name.lower().translate(_LAMP_NAME_TRANSLATION).strip()

    async def _generate_embedding(self, query: str) -> List[float]:
        """Generate an embedding for *query* with the configured model.

        Raises:
            Exception: Whatever the embedding model raised, after logging it.
        """
        try:
            return self.embedding_model.embed_query(query)
        except Exception as e:
            self.logger.error("Embedding generation failed: %s", e)
            raise

    async def get_converter_info(self, artnr: int) -> Optional[PowerConverter]:
        """Get information about a converter from its artnr.

        Args:
            artnr: Article number to look up.

        Returns:
            The first matching document as a ``PowerConverter``, or ``None``
            when nothing matches or the lookup fails (the failure is logged).
        """
        try:
            parameters = [{"name": "@artnr", "value": artnr}]
            query = "SELECT * FROM c WHERE c.artnr = @artnr"
            result = self.container.query_items(
                query=query,
                parameters=parameters,
                # Same flag as the other queries in this class.
                enable_cross_partition_query=True,
            )
            # The pager object is always truthy, so emptiness can only be
            # detected by iterating it.
            for row in result:
                return PowerConverter(**row)
            return None
        except Exception as e:
            self.logger.error("Failed to retrieve converter %s - %s", artnr, e)
            return None

    async def get_compatible_lamps(self, artnr: int) -> List[str]:
        """Get the lamp names listed for a converter.

        Args:
            artnr: Article number of the converter.

        Returns:
            The keys of the first matching document's ``lamps`` mapping, or an
            empty list when nothing matches or the lookup fails.
        """
        try:
            parameters = [{"name": "@artnr", "value": artnr}]
            query = "SELECT * FROM c WHERE c.artnr = @artnr"
            results = list(
                self.container.query_items(
                    query=query,
                    parameters=parameters,
                    # Same flag as the other queries in this class.
                    enable_cross_partition_query=True,
                )
            )
            if not results:
                return []
            return list(results[0].get("lamps") or {})
        except Exception as e:
            self.logger.error("Failed to get compatible lamps: %s", e)
            return []

    async def get_converters_by_lamp_type(
        self, lamp_type: str, threshold: int = 75
    ) -> List[PowerConverter]:
        """Get converters whose lamp names fuzzy-match *lamp_type*.

        Args:
            lamp_type: Lamp name to search for.
            threshold: Minimum fuzzy-match score for a lamp name to count.

        Returns:
            Matching converters, or an empty list when none match or the
            search fails (the failure is logged).
        """
        try:
            query = """
            SELECT
                *
            FROM c
            WHERE IS_DEFINED(c.lamps)"""
            results = self.container.query_items(
                query=query, enable_cross_partition_query=True
            )
            # Loop-invariant: normalize the search term once.
            wanted = self._normalize_lamp_name(lamp_type)

            converters = []
            for item in results:
                lamp_keys = list(item.get("lamps") or {})
                if self._fuzzy_match_lamp(wanted, lamp_keys, threshold=threshold):
                    converters.append(PowerConverter(**item))
            return converters
        except Exception as e:
            self.logger.error("Lamp type search failed: %s", e)
            return []

    async def get_lamp_limits(self, artnr: int, lamp_type: str) -> Dict[str, int]:
        """Get the min/max limits of a lamp on a converter, with typo tolerance.

        Args:
            artnr: Article number of the converter.
            lamp_type: Lamp name; matched fuzzily against the converter's lamps.

        Returns:
            ``{"min": ..., "max": ...}`` for the best-matching lamp, or an
            empty dict when no converter has that artnr.

        Raises:
            ValueError: If no lamp on the converter matches *lamp_type*.
            Exception: Any other failure is logged and re-raised.
        """
        try:
            parameters = [{"name": "@artnr", "value": artnr}]
            query = """
            SELECT c.lamps FROM c
            WHERE c.artnr = @artnr
            """
            results = list(
                self.container.query_items(query=query, parameters=parameters)
            )
            if not results:
                return {}

            lamps = results[0]["lamps"]
            lamp_keys = list(lamps.keys())

            matches = self._fuzzy_match_lamp(
                self._normalize_lamp_name(lamp_type), lamp_keys, threshold=60
            )
            if not matches:
                raise ValueError(f"No matching lamp type found for '{lamp_type}'")

            # Matches are computed on normalized names; map the best one back
            # to its original key through the index rapidfuzz reports.
            best_match = lamp_keys[matches[0][2]]
            return {
                "min": int(lamps[best_match]["min"]),
                "max": int(lamps[best_match]["max"]),
            }
        except Exception as e:
            self.logger.error("Failed to get lamp limits: %s", e)
            raise

    async def get_converters_by_dimming(
        self,
        dimming_type: Optional[str] = None,
        voltage_current: Optional[str] = None,
        lamp_type: Optional[str] = None,
        threshold: int = 75,
    ) -> List[PowerConverter]:
        """Search converters by dimming type, type token and lamp type.

        Every filter that is supplied must pass for a converter to be kept;
        filters that are omitted are skipped.

        Args:
            dimming_type: Dimming option to match against the ``dimmability``
                field, which is split on ``/`` and on spaces.
            voltage_current: Value matched against each space-separated token
                of the ``type`` field (e.g. ``24V DC``).
            lamp_type: Lamp name matched fuzzily against the ``lamps`` keys.
            threshold: Minimum ``fuzz.ratio`` score for the dimming and type
                comparisons.

        Returns:
            Matching converters, or an empty list when the query fails (the
            failure is logged).
        """
        try:
            query = "SELECT * FROM c WHERE IS_DEFINED(c.dimmability)"
            results = self.container.query_items(
                query=query, enable_cross_partition_query=True
            )

            # Loop-invariant preparation of the search terms.
            wanted_type = voltage_current.lower() if voltage_current else None
            wanted_lamp = self._normalize_lamp_name(lamp_type) if lamp_type else None
            wanted_dimming = (
                dimming_type.lower() if dimming_type is not None else None
            )

            converters = []
            for item in results:
                if wanted_type is not None:
                    type_tokens = (item.get("type") or "").split(" ")
                    # Skip the converter unless at least one token matches.
                    if not any(
                        fuzz.ratio(token.lower(), wanted_type) >= threshold
                        for token in type_tokens
                    ):
                        continue

                if wanted_lamp is not None:
                    item_lamps = item.get("lamps") or {}
                    if not self._fuzzy_match_lamp(wanted_lamp, list(item_lamps)):
                        continue

                if wanted_dimming is not None:
                    dimmability = item.get("dimmability") or ""
                    options = dimmability.split("/") + dimmability.split(" ")
                    if not any(
                        fuzz.ratio(option.lower().strip(), wanted_dimming)
                        >= threshold
                        for option in options
                    ):
                        continue

                converters.append(PowerConverter(**item))

            self.logger.info(
                "Found %d converters matching criteria", len(converters)
            )
            return converters
        except Exception as e:
            self.logger.error("Dimming query failed: %s", e)
            return []

    async def query_converters(
        self, query: str, user_input: str
    ) -> Union[str, List[PowerConverter]]:
        """Run a raw SQL query and log its outcome to the chat history.

        NOTE(review): *query* is executed verbatim; callers are responsible
        for what it contains.

        Args:
            query: SQL text to execute against the container.
            user_input: Original user request, stored next to the query.

        Returns:
            ``str()`` of up to the first 10 rows converted to
            ``PowerConverter``; an empty list on a Cosmos HTTP error; or a
            ``"Query failed: ..."`` string on any other error.
        """
        try:
            self.logger.info("Executing query: %s", query)
            rows = list(
                self.container.query_items(
                    query=query, enable_cross_partition_query=True
                )
            )
            self.logger.info("Query returned %d items", len(rows))

            items = [PowerConverter(**row) for row in rows[:_MAX_QUERY_RESULTS]]
            self.logger.info(
                "Query returned %d items after conversion", len(items)
            )

            outcome = "success" if items else "null"
            await self.chat_memory_handler.log_sql_query(user_input, query, outcome)
            return str(items)
        except exceptions.CosmosHttpResponseError as ex:
            await self.chat_memory_handler.log_sql_query(user_input, query, "error")
            self.logger.error("Bad request SQL failed: %s", ex)
            return []
        except Exception as e:
            self.logger.error("Query failed: %s", e)
            return f"Query failed: {str(e)}"

    async def get_converters_by_voltage_current(
        self,
        artnr: Optional[int] = None,
        current: Optional[str] = None,
        input_voltage: Optional[str] = None,
        output_voltage: Optional[str] = None,
        lamp_type: Optional[str] = None,
    ) -> List[PowerConverter]:
        """Query converters by voltage ranges, type text and lamp type.

        When *artnr* is given, only that converter is looked up and every
        other argument is ignored.

        Args:
            artnr: Article number for a direct lookup.
            current: Text that must occur in the ``type`` field.
            input_voltage: Voltage or ``min-max`` range compared with
                ``nom_input_voltage_v``.
            output_voltage: Voltage or ``min-max`` range compared with
                ``output_voltage_v``. A single value of 24 or 48 requires an
                exact min/max match; anything else is an overlap test.
            lamp_type: Lamp name matched fuzzily against the ``lamps`` keys.

        Returns:
            Matching converters, or an empty list when the query fails (the
            failure is logged).
        """
        try:
            if artnr:
                converter = await self.get_converter_info(artnr)
                self.logger.info("Used converter info returned %s", converter)
                return [converter] if converter else []

            input_min, input_max = (
                self._parse_voltage(input_voltage) if input_voltage else (None, None)
            )
            output_min, output_max = (
                self._parse_voltage(output_voltage) if output_voltage else (None, None)
            )
            normalized_lamp_type = (
                self._normalize_lamp_name(lamp_type) if lamp_type else None
            )

            # The voltage bounds are floats produced by _parse_voltage, so
            # formatting them into the SQL text cannot inject anything.
            query_parts = []
            parameters = []

            if output_min is not None and output_max is not None:
                if output_min == output_max and output_max in (24.0, 48.0):
                    clause = (
                        f"c.output_voltage_v.min = {output_min}"
                        f" AND c.output_voltage_v.max = {output_max}"
                    )
                else:
                    clause = (
                        f"c.output_voltage_v.min <= {output_max}"
                        f" AND c.output_voltage_v.max >= {output_min}"
                    )
                query_parts.append(clause)
                self.logger.info(clause)

            if input_min is not None and input_max is not None:
                clause = (
                    f"c.nom_input_voltage_v.min <= {input_max}"
                    f" AND c.nom_input_voltage_v.max >= {input_min}"
                )
                query_parts.append(clause)
                self.logger.info(clause)

            if current:
                # Free text is bound as a parameter, never spliced into SQL.
                query_parts.append("CONTAINS(c.type, @current)")
                parameters.append({"name": "@current", "value": current})

            query = "SELECT * FROM c"
            if query_parts:
                query += " WHERE " + " AND ".join(query_parts)

            results = self.container.query_items(
                query=query,
                parameters=parameters or None,
                enable_cross_partition_query=True,
            )

            converters = []
            for item in results:
                if normalized_lamp_type:
                    item_lamps = item.get("lamps") or {}
                    if not item_lamps:  # Skip if no lamps data
                        continue
                    if not self._fuzzy_match_lamp(
                        normalized_lamp_type, list(item_lamps)
                    ):
                        continue
                converters.append(PowerConverter(**item))

            self.logger.info("Found %d matching converters", len(converters))
            return converters
        except Exception as e:
            self.logger.error("Voltage query failed: %s", e)
            return []

    def _parse_voltage(
        self, voltage_str: str
    ) -> tuple[Optional[float], Optional[float]]:
        """Parse a voltage or voltage range into ``(min, max)`` floats.

        Decimal commas become dots and every character other than digits,
        ``.`` and ``-`` is discarded before parsing, so ``"12-24V"`` yields
        ``(12.0, 24.0)`` and ``"24V DC"`` yields ``(24.0, 24.0)``.

        Returns:
            ``(min, max)``; both equal for a single value, and
            ``(None, None)`` when the text cannot be parsed.
        """
        cleaned = _NON_VOLTAGE_CHARS.sub("", voltage_str.strip().replace(",", "."))
        match = _VOLTAGE_RANGE.match(cleaned)
        if not match:
            return None, None
        min_v = float(match.group(1))
        max_v = float(match.group(2)) if match.group(2) else min_v
        return min_v, max_v


if __name__ == "__main__":
    handler = CosmosLampHandler()

    # Example usage
    async def main():
        # lamps = await handler.get_compatible_lamps(930573)
        # print("Compatible lamps:", lamps)

        # converters = await handler.get_converters_by_dimming(voltage_current="350ma",lamp_type="haloled")
        # for result in converters:
        #     print(f"\t{result.name} (ARTNR: {result.artnr})")
        #     print(f"\tLamp types: {', '.join(result.lamps.keys())}\n")

        conv = await handler.get_converters_by_lamp_type("boa")
        print([c.artnr for c in conv])

        limits = await handler.get_lamp_limits(930544, "boa")
        print("Lamp limits:", limits)

        # hybrid_results = await handler.hybrid_search("give me converters for boa wc which cost less than 50 ")
        # print("Hybrid search results:")
        # for result in hybrid_results:
        #     print(f"\t{result.converter_description} (ARTNR: {result.artnr})")
        #     print(f"\ttypes: {result.type}")
        #     print(f"\tprice: {result.price}")
        #     print(f'\tpdf_link: {result.pdf_link}\n')

    asyncio.run(main())