Spaces:
Sleeping
Sleeping
File size: 7,771 Bytes
3d4cf60 1e6c696 3d4cf60 1e6c696 3d4cf60 1e6c696 3d4cf60 1e6c696 3d4cf60 1e6c696 3d4cf60 92406dd 3d4cf60 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# cosmosConnector.py
from azure.cosmos import exceptions
from datetime import datetime, timedelta, timezone
import uuid
from langchain_openai import AzureOpenAIEmbeddings
import os
from azure.cosmos import CosmosClient, PartitionKey
from typing import List, Optional, Dict
import logging
import os
from dotenv import load_dotenv
load_dotenv()
# Initialize Cosmos DB containers
class ChatMemoryHandler:
    """Persist chat interactions and generated SQL queries to Azure Cosmos DB
    and retrieve semantically clustered FAQs via Cosmos vector search.

    Containers created on construction:
      - ``ChatHistory``        (partition key ``/functionUsed``, vector index on ``/embedding``)
      - ``GeneratedQueries``   (partition key ``/state``)
    """

    def __init__(self, logger: Optional[logging.Logger] = None):
        """Connect to Cosmos DB and ensure the database/containers exist.

        Args:
            logger: Optional logger; defaults to a module logger so error
                reporting never dereferences ``None``.
        """
        self.cosmos_client = CosmosClient(
            os.getenv("AZURE_COSMOS_DB_ENDPOINT"),
            os.getenv("AZURE_COSMOS_DB_KEY"),
        )
        # Fall back to a module logger so every self.logger.error(...) call is safe.
        self.logger = logger or logging.getLogger(__name__)
        self.indexing_policy = {
            "indexingMode": "consistent",
            # Index all properties (including nested) ...
            "includedPaths": [{"path": "/*"}],
            # ... except _etag and the raw embedding vector (indexed separately).
            "excludedPaths": [
                {"path": '/"_etag"/?'},
                {"path": "/embedding/*"},
            ],
        }
        self.vector_embedding_policy = {
            "vectorEmbeddings": [
                {
                    "path": "/embedding",
                    "dataType": "float32",
                    "distanceFunction": "cosine",
                    # 1536 = dimensionality of text-embedding-ada-002 / 3-small.
                    "dimensions": 1536,
                }
            ]
        }
        self.embedding_model = AzureOpenAIEmbeddings(
            azure_endpoint=os.environ["OPENAI_API_ENDPOINT"],
            azure_deployment=os.environ["OPENAI_EMBEDDINGS_MODEL_DEPLOYMENT"],
            api_key=os.environ["AZURE_OPENAI_KEY"],
        )
        self.database = self.cosmos_client.create_database_if_not_exists("TAL_ChatData")
        # Container for chat history (with vector index for semantic search).
        self.chat_container = self.database.create_container_if_not_exists(
            id="ChatHistory",
            partition_key=PartitionKey(path="/functionUsed"),
            indexing_policy=self.indexing_policy,
            vector_embedding_policy=self.vector_embedding_policy,
        )
        # Container for generated SQL queries.
        self.sql_container = self.database.create_container_if_not_exists(
            id="GeneratedQueries",
            partition_key=PartitionKey(path="/state"),
        )

    async def _generate_embedding(self, query: str) -> List[float]:
        """Generate an embedding for *query* using Azure OpenAI.

        Raises:
            Exception: re-raised after logging if embedding generation fails.
        """
        try:
            return self.embedding_model.embed_query(query)
        except Exception as e:
            self.logger.error(f"Embedding generation failed: {str(e)}")
            raise

    async def log_interaction(self, session_id: str, question: str, function_used: str, answer: str) -> None:
        """Store one Q/A turn (plus its question embedding) in ChatHistory.

        Failures are logged, not raised — logging must never break the chat flow.
        """
        try:
            chat_item = {
                "id": str(uuid.uuid4()),
                "sessionId": session_id,
                "question": question,
                "functionUsed": function_used,
                "answer": answer,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "embedding": await self._generate_embedding(question),
            }
            self.chat_container.create_item(body=chat_item)
        except Exception as e:
            self.logger.error(f"Failed to log chat interaction: {str(e)}")

    async def log_sql_query(self, original_question: str, generated_sql: str, state: str = "success") -> None:
        """Store a generated SQL query in GeneratedQueries (best-effort)."""
        try:
            sql_item = {
                "id": str(uuid.uuid4()),
                "originalQuestion": original_question,
                "generatedSql": generated_sql,
                "state": state,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }
            self.sql_container.create_item(body=sql_item)
        except Exception as e:
            self.logger.error(f"Failed to log SQL query: {str(e)}")

    async def get_semantic_faqs(self, limit: int = 6, threshold: float = 0.1) -> List[Dict]:
        """Retrieve up to *limit* FAQ clusters using vector similarity.

        For each of the most frequent questions, finds semantically similar
        questions via Cosmos ``VectorDistance`` and groups them into a cluster.

        Args:
            limit: Maximum number of clusters to return.
            threshold: Minimum cosine similarity (1 - distance) for a question
                to join a cluster.

        Returns:
            Clusters sorted by total occurrences (descending); empty list on error.
        """
        try:
            from collections import Counter

            raw_results = list(self.chat_container.query_items(
                query="SELECT c.question FROM c",
                enable_cross_partition_query=True,
                max_item_count=-1,
            ))
            question_counts = Counter(item["question"] for item in raw_results)
            top_questions = question_counts.most_common(limit)

            clustered_faqs: List[Dict] = []
            processed: set = set()
            for question_text, count in top_questions:
                # Skip questions already absorbed into an earlier cluster.
                if question_text in processed:
                    continue
                embedding = await self._generate_embedding(question_text)
                similar_results = list(self.chat_container.query_items(
                    query="""
                        SELECT TOP 50 c.question, VectorDistance(c.embedding, @embedding) AS distance
                        FROM c
                        ORDER BY VectorDistance(c.embedding, @embedding)
                    """,
                    parameters=[{"name": "@embedding", "value": embedding}],
                    enable_cross_partition_query=True,
                ))
                # Keep only questions that are at least `threshold` similar.
                # BUG FIX: original used `<=`, which kept the *dissimilar* ones.
                filtered_questions: List[str] = []
                similarity_scores: Dict[str, float] = {}
                for item in similar_results:
                    similarity = 1 - item["distance"]  # cosine distance -> similarity
                    if similarity >= threshold:
                        filtered_questions.append(item["question"])
                        similarity_scores[item["question"]] = similarity
                similar_question_counts = Counter(filtered_questions)
                # BUG FIX: original appended every cluster twice (duplicates).
                # If vector search returned nothing above threshold, fall back
                # to a singleton cluster for the representative question.
                clustered_faqs.append({
                    "representative_question": question_text,
                    "similar_questions": list(similar_question_counts.keys()) or [question_text],
                    "total_occurrences": sum(similar_question_counts.values()) or count,
                    "similarity_scores": similarity_scores or {question_text: 1.0},
                })
                processed.update(filtered_questions)
                processed.add(question_text)
            # BUG FIX: sort *before* slicing so the top clusters are kept.
            return sorted(clustered_faqs, key=lambda x: x["total_occurrences"], reverse=True)[:limit]
        except exceptions.CosmosHttpResponseError as ex:
            # BUG FIX: original logged undefined name `e` here (NameError).
            self.logger.error(f"Cosmos DB error during FAQ retrieval: {str(ex)}")
            return []
        except Exception as e:
            self.logger.error(f"Semantic FAQ retrieval failed: {str(e)}")
            return []
if __name__ == "__main__":
    # BUG FIX: the handler was previously constructed at import time, opening
    # network connections (Cosmos DB, Azure OpenAI) whenever this module was
    # imported. Construct it only when run as a script.
    import asyncio

    async def main() -> None:
        """Demo driver: print each FAQ cluster's representative and members."""
        handler = ChatMemoryHandler()
        faqs = await handler.get_semantic_faqs()
        for faq in faqs:
            print("\n", faq["representative_question"], faq["similar_questions"], "\n")

    asyncio.run(main())
|