"""Retrieval-augmented generation (RAG) helpers built on the OpenAI chat API."""

import json
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

try:
    import openai
except ImportError:
    # Keep the module importable (the formatting helpers are pure Python);
    # RAGHandler.__init__ raises a clear ImportError if the SDK is missing.
    openai = None  # type: ignore[assignment]

logger = logging.getLogger(__name__)

_RAG_SYSTEM_PROMPT = """You are a helpful business assistant with access to a company's transaction history and business information.

Your role is to answer user questions based on the provided context from the company's records.

Guidelines:
1. Answer based ONLY on the provided context
2. If the context doesn't contain enough information, say so clearly
3. Be specific and cite relevant details from the context
4. Maintain a professional, helpful tone
5. If asked about specific dates, transactions, or events, reference the exact information from context
6. If the context contains multiple relevant items, summarize them appropriately
7. Don't make up information not present in the context

Context format: Each document has a 'document' field with the actual content and 'metadata' with additional details like timestamps."""

_QUERY_ENHANCEMENT_SYSTEM_PROMPT = """You are an expert at reformulating search queries for business records retrieval.

Given a user's question, create an enhanced search query that will better match relevant business documents in a vector database.

Guidelines:
1. Extract key business concepts (products, suppliers, customers, dates, amounts)
2. Add relevant synonyms and related terms
3. Focus on business transaction terminology
4. Keep it concise but comprehensive
5. Don't change the core intent of the original query

Examples:
- "When is my meeting with George?" → "meeting George supplier customer appointment scheduled"
- "Show me laptop purchases" → "laptop computer purchase buy bought supplier transaction"
- "Similar sales to John" → "John customer sale sold transaction similar"

Return only the enhanced query, nothing else."""


class RAGHandler:
    """Answer questions over retrieved business records using OpenAI chat completions."""

    # Chat model used by every request; override per instance via ``__init__``.
    model: str = "gpt-4o-mini"

    # Transaction fields copied into the "Details:" line, in display order.
    _DETAIL_FIELDS = ('product', 'quantity', 'supplier', 'customer', 'total', 'unit_price')

    # Divider placed between consecutive documents in the LLM context.
    _DOC_SEPARATOR = "\n" + "-" * 50 + "\n"

    def __init__(self, api_key: Optional[str] = None, model: Optional[str] = None):
        """Initialize the OpenAI client used for RAG responses.

        Args:
            api_key: OpenAI API key. Falls back to the ``OPENAI_API_KEY``
                environment variable when omitted.
            model: Optional chat model name; defaults to the class-level
                ``model`` attribute.

        Raises:
            ImportError: If the ``openai`` package is not installed.
        """
        if openai is None:
            raise ImportError(
                "The 'openai' package is required to create a RAGHandler."
            )
        self.client = openai.OpenAI(
            api_key=api_key or os.getenv('OPENAI_API_KEY')
        )
        if model:
            self.model = model

    def generate_rag_response(self, user_query: str, retrieved_documents: List[Dict[str, Any]]) -> str:
        """Generate a response using RAG (Retrieval-Augmented Generation).

        Args:
            user_query: The user's original query.
            retrieved_documents: Documents returned by the retrieval step. Each
                is a dict that may carry 'document', 'metadata' and 'distance'
                keys.

        Returns:
            The model's answer. If the request fails or the model returns no
            text, an error notice followed by a plain listing of the retrieved
            records. A fixed "nothing found" message when no documents are
            supplied.
        """
        if not retrieved_documents:
            return "I couldn't find any relevant information to answer your query."

        context = self._format_context(retrieved_documents)
        user_prompt = (
            f'Based on the following business records, please answer this question: "{user_query}"\n\n'
            f"Context from company records:\n{context}\n\n"
            "Please provide a comprehensive answer based on the available information."
        )

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": _RAG_SYSTEM_PROMPT},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=0.3,
                max_tokens=800,
            )
            # ``content`` can be None; treat that like any other failure so the
            # user still receives the retrieved records.
            answer = (response.choices[0].message.content or "").strip()
            if not answer:
                raise ValueError("the model returned an empty response")
            return answer
        except Exception as e:
            # Deliberate best-effort boundary: never raise to the caller,
            # degrade to a listing of what retrieval found.
            logger.exception("RAG response generation failed")
            return f"I encountered an error while processing your query: {str(e)}\n\nHowever, I found these relevant records:\n{self._format_fallback_response(retrieved_documents)}"

    @staticmethod
    def _format_date(timestamp: Any) -> Optional[str]:
        """Return the first 10 characters of a timestamp (the date part of an ISO-8601 value), or None if unusable."""
        if isinstance(timestamp, datetime):
            timestamp = timestamp.isoformat()
        if isinstance(timestamp, str) and timestamp:
            return timestamp[:10]
        return None

    @classmethod
    def _format_details(cls, raw_data: Any) -> Optional[str]:
        """Render the known transaction fields of ``raw_data`` as 'field: value, ...', or None if there are none."""
        if isinstance(raw_data, str):
            try:
                raw_data = json.loads(raw_data)
            except ValueError:  # json.JSONDecodeError is a ValueError subclass
                return None
        if not isinstance(raw_data, dict):
            return None
        parts = [
            f"{field}: {raw_data[field]}"
            for field in cls._DETAIL_FIELDS
            if raw_data.get(field) is not None
        ]
        return ', '.join(parts) if parts else None

    def _format_document(self, index: int, doc: Dict[str, Any]) -> str:
        """Render one retrieved document as a newline-terminated context entry."""
        lines = [
            f"Document {index}:",
            f"Content: {doc.get('document', 'No content available')}",
        ]

        metadata = doc.get('metadata')
        if not isinstance(metadata, dict):
            metadata = {}

        date_part = self._format_date(metadata.get('timestamp'))
        if date_part:
            lines.append(f"Date: {date_part}")

        if 'type' in metadata:
            lines.append(f"Type: {metadata['type']}")

        details = self._format_details(metadata.get('data'))
        if details:
            lines.append(f"Details: {details}")

        distance = doc.get('distance')
        if distance is not None:
            try:
                # NOTE(review): assumes a distance where 1 - d is a meaningful
                # similarity (e.g. cosine distance) - confirm against the store.
                similarity = 1 - float(distance)
            except (TypeError, ValueError):
                pass  # non-numeric distance: omit the relevance line
            else:
                lines.append(f"Relevance: {similarity:.2f}")

        return "\n".join(lines) + "\n"

    def _format_context(self, documents: List[Dict[str, Any]]) -> str:
        """Format retrieved documents as context for the LLM.

        Args:
            documents: Retrieved documents (see ``generate_rag_response``).

        Returns:
            One entry per document, separated by a dashed divider line, or a
            fixed "nothing found" message for an empty list.
        """
        if not documents:
            return "No relevant documents found."

        entries = [
            self._format_document(i, doc)
            for i, doc in enumerate(documents, 1)
        ]
        # The divider must itself be the join separator; joining on "\n" and
        # merely prefixing the dashes would emit the rule only once.
        return self._DOC_SEPARATOR.join(entries)

    def _format_fallback_response(self, documents: List[Dict[str, Any]]) -> str:
        """Create a fallback response when the LLM fails.

        Args:
            documents: Retrieved documents (see ``generate_rag_response``).

        Returns:
            A numbered, one-line-per-document listing with the date appended
            when a usable timestamp is present.
        """
        if not documents:
            return "No relevant information found."

        response_parts = []
        for i, doc in enumerate(documents, 1):
            entry = f"{i}. {doc.get('document', 'No content available')}"

            # Metadata may be missing or None; this runs inside an error
            # handler, so it must not raise.
            metadata = doc.get('metadata')
            if isinstance(metadata, dict):
                date_part = self._format_date(metadata.get('timestamp'))
                if date_part:
                    entry += f" (Date: {date_part})"

            response_parts.append(entry)

        return "\n".join(response_parts)

    def enhance_search_query(self, user_query: str) -> str:
        """Enhance the user's search query for better vector retrieval.

        Args:
            user_query: Original user query.

        Returns:
            The model's reformulated query, or ``user_query`` unchanged when it
            is blank, the request fails, or the reformulation is shorter than
            three characters.
        """
        if not user_query or not user_query.strip():
            return user_query  # nothing to enhance; skip the API call

        user_prompt = f'Enhance this search query for better business records retrieval: "{user_query}"'

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": _QUERY_ENHANCEMENT_SYSTEM_PROMPT},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=0.2,
                max_tokens=100,
            )
            enhanced_query = (response.choices[0].message.content or "").strip()
        except Exception as e:
            # Best effort: retrieval still works with the original query.
            logger.warning("Query enhancement failed: %s", e)
            return user_query

        # Fall back to the original if the enhancement is empty or degenerate.
        if len(enhanced_query) < 3:
            return user_query
        return enhanced_query