Spaces:
Runtime error
Runtime error
File size: 14,931 Bytes
3d4cf60 c88d2f0 3d4cf60 c88d2f0 3d4cf60 c88d2f0 3d4cf60 36808cd 3d4cf60 f295ebe 3d4cf60 c88d2f0 3d4cf60 c88d2f0 3d4cf60 c88d2f0 3d4cf60 c88d2f0 3d4cf60 c88d2f0 3d4cf60 c88d2f0 3d4cf60 ef26fa5 3d4cf60 c88d2f0 3d4cf60 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 | # cosmosConnector.py
from jsonschema import ValidationError
from langchain_openai import AzureOpenAIEmbeddings
from models.converterModels import PowerConverter
import os
from azure.cosmos import CosmosClient, exceptions
from typing import List, Optional, Dict
import logging
import os
from dotenv import load_dotenv
from semantic_kernel.functions import kernel_function
from rapidfuzz import process, fuzz
from CosmosDBHandlers.cosmosChatHistoryHandler import ChatMemoryHandler
load_dotenv()
# Initialize logging
logger = logging.getLogger(__name__)
class CosmosLampHandler:
def __init__(self, logger: Optional[logging.Logger] = None):
self.client = CosmosClient(
os.getenv("AZURE_COSMOS_DB_ENDPOINT"),
os.getenv("AZURE_COSMOS_DB_KEY")
)
self.chat_memory_handler = ChatMemoryHandler()
self.database = self.client.get_database_client("TAL_DB")
self.container = self.database.get_container_client("Converters")
self.logger = logging.Logger("test")
# self.logger = logger
self.embedding_model = AzureOpenAIEmbeddings(
azure_endpoint=os.environ["OPENAI_API_ENDPOINT"],
azure_deployment=os.environ["OPENAI_EMBEDDINGS_MODEL_DEPLOYMENT"],
api_key=os.environ["AZURE_OPENAI_KEY"]
)
def _fuzzy_match_lamp(self, query: str, targets: list[str], threshold=60) -> list:
"""Advanced partial matching"""
from rapidfuzz import process, fuzz
normalized_query = self._normalize_lamp_name(query)
normalized_targets = [self._normalize_lamp_name(t) for t in targets]
return process.extract(
normalized_query,
normalized_targets,
scorer=fuzz.token_set_ratio,
score_cutoff=threshold
)
def _normalize_lamp_name(self,name: str) -> str:
"""Standardize lamp names for matching"""
return (
name.lower()
.replace(",", ".")
.replace("-", " ")
.replace("/", " ")
.translate(str.maketrans("", "", "()"))
.strip()
)
async def _generate_embedding(self, query: str) -> List[float]:
"""Generate embedding for the given query using Azure OpenAI"""
try:
return self.embedding_model.embed_query(query)
except Exception as e:
self.logger.error(f"Embedding generation failed: {str(e)}")
raise
async def get_converter_info(self, artnr:int) -> PowerConverter:
"""Get information about a converter from its artnr"""
try:
parameters = [{"name": "@artnr", "value": artnr}]
query = "SELECT * FROM c WHERE c.artnr = @artnr"
# Collect results properly
result = self.container.query_items(
query=query,
parameters=parameters
)
if not result:
return None
else:
for r in result:
converter = PowerConverter(**r)
return converter
except Exception as e:
self.logger.error(f"Failed to retrieve converter {artnr} - {e}")
async def get_compatible_lamps(self, artnr: int) -> List[str]:
"""Get compatible lamps for a converter with fuzzy matching"""
try:
parameters = [{"name": "@artnr", "value": artnr}]
query = "SELECT * FROM c WHERE c.artnr = @artnr"
# Collect results properly
results = [item for item in list(self.container.query_items(
query=query,
parameters=parameters
))]
if not results:
return []
return list(results[0]["lamps"].keys())
except Exception as e:
self.logger.error(f"Failed to get compatible lamps: {str(e)}")
return []
async def get_converters_by_lamp_type(self, lamp_type: str, threshold: int = 75) -> List[PowerConverter]:
"""Get converters with fuzzy-matched lamp types"""
try:
# Case-insensitive search with fuzzy matching
query = """
SELECT
*
FROM c WHERE IS_DEFINED(c.lamps)"""
converters = []
results = list(self.container.query_items(
query=query,
enable_cross_partition_query=True))
for item in results:
lamp_keys = item.get("lamps", {}).keys()
lamp_type = self._normalize_lamp_name(lamp_type)
matches = self._fuzzy_match_lamp(lamp_type, lamp_keys)
if matches:
converters.append(PowerConverter(**item))
if not converters:
return []
return converters
except Exception as e:
self.logger.error(f"Lamp type search failed: {str(e)}")
return []
async def get_lamp_limits(self, artnr: int, lamp_type: str) -> Dict[str, int]:
"""Get lamp limits with typo tolerance"""
try:
parameters = [{"name": "@artnr", "value": artnr}]
query = """
SELECT c.lamps FROM c
WHERE c.artnr = @artnr
"""
results_iter = list(self.container.query_items(
query=query,
parameters=parameters
))
results = [item for item in results_iter] # Collect results asynchronously
if not results:
return {}
lamps = results[0]["lamps"]
lamp_keys = list(lamps.keys())
# Fuzzy match with normalization
matches = self._fuzzy_match_lamp(self._normalize_lamp_name(lamp_type), lamp_keys, threshold=60)
if not matches:
raise ValueError(f"No matching lamp type found for '{lamp_type}'")
# Get best match from original keys using match index
best_match = lamp_keys[matches[0][2]]
return {
"min": int(lamps[best_match]["min"]),
"max": int(lamps[best_match]["max"])
}
except Exception as e:
self.logger.error(f"Failed to get lamp limits: {str(e)}")
raise
async def get_converters_by_dimming(
self,
dimming_type: str,
voltage_current: Optional[str] = None,
lamp_type: Optional[str] = None,
threshold: int = 75
) -> List[PowerConverter]:
"""Search converters by dimming type and voltage/current/lamp_type specifications with fuzzy matching"""
try:
# Base query construction
query = "SELECT * FROM c WHERE IS_DEFINED(c.dimmability)"
results = list(self.container.query_items(
query=query,
enable_cross_partition_query=True
))
converters = []
for item in results:
# Fuzzy match converter type if specified
if voltage_current:
item_type = item.get("type", "")
item_types = item_type.split(" ")
for conv_type in item_types: # handle types like 24V DC
if fuzz.ratio(conv_type.lower(), voltage_current.lower()) < threshold:
continue
if lamp_type:
item_lamps = item.get("lamps", "")
lamp_type = self._normalize_lamp_name(lamp_type)
lamp_matches = self._fuzzy_match_lamp(lamp_type, item_lamps.keys())
if not lamp_matches:
continue
# Fuzzy match dimming types
if dimming_type!= None:
dimmability = item.get("dimmability", "")
match_types = dimmability.split('/')
match_types += dimmability.split(" ")
for option in match_types:
if fuzz.ratio(option.lower().strip(), dimming_type.lower()) >= threshold:
converters.append(PowerConverter(**item))
break
else:
converters.append(PowerConverter(**item))
break
self.logger.info(f"Found {len(converters)} converters matching criteria")
return converters
except Exception as e:
self.logger.error(f"Dimming query failed: {str(e)}")
return []
async def query_converters(self, query: str, user_input:str) -> List[PowerConverter]:
try:
print(f"Executing query: {query}")
items = list(self.container.query_items(
query=query,
enable_cross_partition_query=True
))
print(f"Query returned {len(items)} items")
items = items[:10]
items = [PowerConverter(**item) for item in items] if items else []
self.logger.info(f"Query returned {len(items)} items after conversion")
if len(items)==0:
await self.chat_memory_handler.log_sql_query(user_input, query, "null")
else:
await self.chat_memory_handler.log_sql_query(user_input, query, "success")
return str(items)
except exceptions.CosmosHttpResponseError as ex:
await self.chat_memory_handler.log_sql_query(user_input, query, "error")
print(f"Cosmos DB error: {ex}")
self.logger.error(f"Bad request SQL failed: {str(e)}")
return []
except Exception as e:
self.logger.info(f"Query failed: {str(e)}")
return f"Query failed: {str(e)}"
async def get_converters_by_voltage_current(
self,
artnr: Optional[int] = None,
current: Optional[str]=None,
input_voltage: Optional[str] = None,
output_voltage: Optional[str] = None,
lamp_type: Optional[str] = None
) -> List[PowerConverter]:
"""Query converters by voltage ranges"""
try:
# Handle ARTNR lookup
if artnr:
converter = await self.get_converter_info(artnr)
self.logger.info(f"Used converter info returned {converter}")
return [converter] if converter else []
# Parse voltage ranges
input_min, input_max = self._parse_voltage(input_voltage) if input_voltage else (None, None)
output_min, output_max = self._parse_voltage(output_voltage) if output_voltage else (None, None)
normalized_lamp_type = self._normalize_lamp_name(lamp_type) if lamp_type else None
query_parts = []
if output_min and output_max:
if output_min==output_max and output_max in [(24.0,24.0), (48.0,48.0)]:
query_parts.append(f"c.output_voltage_v.min = {output_max} AND c.output_voltage_v.max = {output_min}")
self.logger.info(f"c.nom_input_voltage_v.min = {output_max} AND c.nom_input_voltage_v.max = {output_min}")
else:
query_parts.append(f"c.output_voltage_v.min <= {output_max} AND c.output_voltage_v.max >= {output_min}")
self.logger.info(f"c.nom_input_voltage_v.min <= {output_max} AND c.nom_input_voltage_v.max >= {output_min}")
if input_min and input_max:
query_parts.append(f"c.nom_input_voltage_v.min <= {input_max} AND c.nom_input_voltage_v.max >= {input_min}")
self.logger.info(f"c.nom_input_voltage_v.min <= {input_max} AND c.nom_input_voltage_v.max >= {input_min}")
if current:
query_parts.append(f"c.type LIKE '%{current}%'")
query = "SELECT * FROM c" + (" WHERE " + " AND ".join(query_parts) if query_parts else "")
results = list(self.container.query_items(
query=query,
enable_cross_partition_query=True
))
converters = []
for item in results:
if normalized_lamp_type:
item_lamps = item.get("lamps", {})
if not item_lamps: # Skip if no lamps data
continue
lamp_matches = self._fuzzy_match_lamp(normalized_lamp_type, item_lamps.keys())
if not lamp_matches:
continue
converters.append(PowerConverter(**item))
self.logger.info(f"Found {len(converters)} matching converters")
return converters
except Exception as e:
self.logger.error(f"Voltage query failed: {str(e)}")
return []
def _parse_voltage(self, voltage_str: str) -> tuple[float, float]:
import re
voltage_str = voltage_str.strip().replace(',', '.')
voltage_str = re.sub(r'[^0-9.\-]', '', voltage_str)
match = re.match(r"^(\d+(?:\.\d+)?)(?:-+(\d+(?:\.\d+)?))?$", voltage_str)
if match:
min_v = float(match.group(1))
max_v = float(match.group(2)) if match.group(2) else min_v
return min_v, max_v
else:
return None, None
if __name__ == "__main__":
handler = CosmosLampHandler()
# Example usage
import asyncio
async def main():
# lamps = await handler.get_compatible_lamps(930573)
# print("Compatible lamps:", lamps)
# converters = await handler.get_converters_by_dimming(voltage_current="350ma",lamp_type="haloled")
# for result in converters:
# print(f"\t{result.name} (ARTNR: {result.artnr})")
# print(f"\tLamp types: {', '.join(result.lamps.keys())}\n")
conv = await handler.get_converters_by_lamp_type("boa")
print([c.artnr for c in conv])
limits = await handler.get_lamp_limits(930544, "boa")
print("Lamp limits:", limits)
# hybrid_results = await handler.hybrid_search("give me converters for boa wc which cost less than 50 ")
# print("Hybrid search results:")
# for result in hybrid_results:
# print(f"\t{result.converter_description} (ARTNR: {result.artnr})")
# print(f"\ttypes: {result.type}")
# print(f"\tprice: {result.price}")
# print(f'\tpdf_link: {result.pdf_link}\n')
asyncio.run(main()) |