import gradio as gr
import os
import hashlib
import datetime
from typing import List, Dict
import requests
import time
import uuid
from pinecone import Pinecone
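
# A Gradio chat app with a long-term "shared memory": every conversation turn
# is embedded and upserted into a Pinecone index, and each new query retrieves
# (and optionally reranks) similar past turns to ground the LLM response,
# which is generated via OpenRouter.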


class RAGMemorySystem:
    """RAG system using Pinecone with integrated inference for embeddings and vector storage"""

    def __init__(self):
        # Pinecone credentials come from the environment; never hard-code
        # API keys in source.
        self.pinecone_api_key = os.getenv("PINECONE_API_KEY")
        self.pinecone_environment = os.getenv("PINECONE_ENVIRONMENT", "us-east-1")

        # All users share one index, so every conversation enriches a common pool.
        self.index_name = os.getenv("PINECONE_INDEX_NAME", "shared-ai-experiences")

        # Hosted models used via Pinecone's integrated inference.
        self.embedding_model = os.getenv("PINECONE_EMBEDDING_MODEL", "multilingual-e5-large")
        self.rerank_model = os.getenv("PINECONE_RERANK_MODEL", "pinecone-rerank-v0")

        # OpenRouter powers the chat completions.
        self.openrouter_api_key = os.getenv("OPENROUTER_API_KEY")
        self.model_name = os.getenv("MODEL_NAME", "meta-llama/llama-3.2-3b-instruct:free")

        self.pc = None
        self.index = None

        self.init_pinecone()

    def update_model(self, new_model: str):
        """Update the OpenRouter model used for chat completions."""
        if new_model and new_model.strip():
            self.model_name = new_model.strip()
            return f"✅ Model updated to: {self.model_name}"
        return "❌ Please enter a valid model name"

    def init_pinecone(self):
        """Initialize the Pinecone connection, creating the index if needed."""
        try:
            if not self.pinecone_api_key:
                print("Warning: Pinecone API key not found. Memory storage disabled.")
                self.index = None
                return

            self.pc = Pinecone(api_key=self.pinecone_api_key)
            print("Attempting to connect to Pinecone...")

            try:
                existing_indexes = [idx.name for idx in self.pc.list_indexes()]
                print(f"Existing indexes: {existing_indexes}")
            except Exception as list_error:
                print(f"Error listing indexes: {list_error}")
                existing_indexes = []

            if self.index_name not in existing_indexes:
                print(f"Index '{self.index_name}' not found. Creating new Pinecone index with integrated inference...")
                try:
                    # Preferred path: an index with an attached embedding model,
                    # so Pinecone embeds records server-side. field_map tells the
                    # index which record field holds the text to embed.
                    index_model = self.pc.create_index_for_model(
                        name=self.index_name,
                        cloud="aws",
                        region="us-east-1",
                        embed={
                            "model": self.embedding_model,
                            "field_map": {"text": "content"}
                        }
                    )
                    print(f"Successfully created index with integrated inference: {self.index_name}")
                    print(f"Index details: {index_model}")

                    print("Waiting for index to be ready...")
                    time.sleep(10)

                except Exception as create_error:
                    print(f"Error creating index with integrated inference: {create_error}")
                    try:
                        # Fallback: a plain serverless index; embeddings are then
                        # computed client-side (1024 dims matches
                        # multilingual-e5-large).
                        print("Attempting fallback to traditional index creation...")
                        self.pc.create_index(
                            name=self.index_name,
                            dimension=1024,
                            metric="cosine",
                            spec={
                                "serverless": {
                                    "cloud": "aws",
                                    "region": "us-east-1"
                                }
                            }
                        )
                        print(f"Created fallback traditional index: {self.index_name}")
                        time.sleep(5)
                    except Exception as fallback_error:
                        print(f"Failed to create fallback index: {fallback_error}")
                        self.index = None
                        return
            else:
                print(f"Index '{self.index_name}' already exists. Connecting to existing index...")

            try:
                self.index = self.pc.Index(self.index_name)
                print(f"Successfully connected to Pinecone index: {self.index_name}")

                stats = self.index.describe_index_stats()
                print(f"Index stats: {stats}")

                total_vectors = stats.get('total_vector_count', 0)
                if total_vectors > 0:
                    print(f"Found existing index with {total_vectors} stored experiences. Continuing with shared knowledge base.")
                else:
                    print("Index is empty. Ready to start building the shared knowledge base.")

            except Exception as connect_error:
                print(f"Error connecting to index: {connect_error}")
                self.index = None

        except Exception as e:
            print(f"Error initializing Pinecone: {e}")
            self.index = None

    def _embed(self, text: str, input_type: str) -> List[float]:
        """Embed text via Pinecone's inference API; zero vector on failure."""
        try:
            if not self.pc:
                print("Pinecone client not available, returning zero vector")
                return [0.0] * 1024

            # e5-style models embed passages and queries differently, so the
            # caller states which side of the search this text is on.
            response = self.pc.inference.embed(
                model=self.embedding_model,
                inputs=[text],
                parameters={
                    "input_type": input_type,
                    "truncate": "END"
                }
            )

            if response and len(response.data) > 0:
                return response.data[0].values

            print("No embedding data received, returning zero vector")
            return [0.0] * 1024

        except Exception as e:
            print(f"Error creating {input_type} embedding with Pinecone inference: {e}")
            return [0.0] * 1024

    def create_embedding(self, text: str) -> List[float]:
        """Create a passage embedding for storage."""
        return self._embed(text, "passage")

    def create_query_embedding(self, text: str) -> List[float]:
        """Create a query embedding for search."""
        return self._embed(text, "query")

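    # Write path: storage tries Pinecone's integrated inference first (the
    # index embeds the record's "content" field server-side) and falls back to
    # embedding client-side and upserting a plain vector, mirroring the two
    # index types init_pinecone() may have created.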
    def store_experience(self, user_input: str, ai_response: str, context: str = "") -> str:
        """Store a conversation turn in Pinecone, preferring integrated inference."""
        if not self.index:
            return "Memory storage not available (Pinecone not configured)"

        try:
            # Hashing content plus timestamp plus a UUID keeps IDs unique even
            # for identical conversations.
            experience_id = hashlib.md5(
                f"{user_input}_{ai_response}_{datetime.datetime.now()}_{uuid.uuid4()}".encode()
            ).hexdigest()

            combined_text = f"User: {user_input}\nAI: {ai_response}\nContext: {context}"

            metadata = {
                "user_input": user_input[:1000],
                "ai_response": ai_response[:1000],
                "context": context[:500],
                "timestamp": datetime.datetime.now().isoformat(),
                "interaction_type": "conversation",
                "session_id": getattr(self, 'session_id', 'shared')
            }

            try:
                # Integrated-inference indexes take flat records keyed by "_id";
                # the "content" field is embedded server-side per the index's
                # field_map, and the remaining fields are stored as metadata.
                record = {"_id": experience_id, "content": combined_text, **metadata}

                # upsert_records requires a namespace; "" is the default one.
                self.index.upsert_records(namespace="", records=[record])
                return f"✅ Experience stored with integrated inference, ID: {experience_id[:8]}..."

            except Exception as integrated_error:
                print(f"Integrated inference failed: {integrated_error}")

                # Fallback for a traditional index: embed client-side and
                # upsert an (id, vector, metadata) tuple.
                embedding = self.create_embedding(combined_text)
                self.index.upsert([(experience_id, embedding, metadata)])

                return f"✅ Experience stored with manual embedding, ID: {experience_id[:8]}..."

        except Exception as e:
            return f"❌ Error storing experience: {e}"

    def retrieve_relevant_experiences(self, query: str, top_k: int = 3) -> List[Dict]:
        """Retrieve relevant past experiences for a query."""
        if not self.index:
            return []

        try:
            try:
                # Integrated-inference indexes embed the query text server-side.
                # Per the documented response shape, hits come back under
                # result.hits as {"_id", "_score", "fields"}; any shape mismatch
                # raises and drops us into the manual fallback below.
                results = self.index.search_records(
                    namespace="",
                    query={
                        "top_k": top_k,
                        "inputs": {"text": query}
                    }
                )

                relevant_experiences = []
                for hit in results["result"]["hits"]:
                    score = hit["_score"]
                    if score > 0.3:  # drop weakly related matches
                        fields = hit.get("fields", {})
                        relevant_experiences.append({
                            "score": score,
                            "user_input": fields.get("user_input", ""),
                            "ai_response": fields.get("ai_response", ""),
                            "context": fields.get("context", ""),
                            "timestamp": fields.get("timestamp", ""),
                            "id": hit["_id"]
                        })

                return relevant_experiences

            except Exception as integrated_error:
                print(f"Integrated search failed: {integrated_error}")

                # Fallback for a traditional index: embed the query client-side
                # and run a plain vector query.
                query_embedding = self.create_query_embedding(query)
                results = self.index.query(
                    vector=query_embedding,
                    top_k=top_k,
                    include_metadata=True
                )

                relevant_experiences = []
                for match in results.matches:
                    if match.score > 0.3:
                        relevant_experiences.append({
                            "score": match.score,
                            "user_input": match.metadata.get("user_input", ""),
                            "ai_response": match.metadata.get("ai_response", ""),
                            "context": match.metadata.get("context", ""),
                            "timestamp": match.metadata.get("timestamp", ""),
                            "id": match.id
                        })

                return relevant_experiences

        except Exception as e:
            print(f"Error retrieving experiences: {e}")
            return []

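    # Second-stage ranking: vector search returns approximate nearest
    # neighbors by embedding similarity; a cross-encoder rerank model then
    # re-scores each candidate against the full query text, which typically
    # improves precision at the top of the list.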
    def rerank_results(self, query: str, documents: List[str]) -> List[Dict]:
        """Rerank candidate documents with Pinecone's hosted reranking model."""
        if not self.pc or not documents:
            return []

        try:
            # The rerank API takes top_n (not top_k); plain-string documents
            # come back wrapped as {"text": ...}.
            response = self.pc.inference.rerank(
                model=self.rerank_model,
                query=query,
                documents=documents,
                top_n=min(len(documents), 5),
                return_documents=True
            )

            reranked_results = []
            if response and hasattr(response, 'data'):
                for result in response.data:
                    reranked_results.append({
                        "document": result.document["text"],
                        "score": result.score,
                        "index": result.index
                    })

            return reranked_results

        except Exception as e:
            print(f"Error reranking results: {e}")
            return []

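    # Generation: OpenRouter exposes an OpenAI-compatible /chat/completions
    # endpoint, so the request body below follows the familiar
    # {model, messages, temperature, max_tokens} schema.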
    def call_openrouter(self, messages: List[Dict], temperature: float = 0.7) -> str:
        """Call the OpenRouter chat completions API."""
        if not self.openrouter_api_key:
            return "Error: OpenRouter API key not configured. Please set the OPENROUTER_API_KEY environment variable."

        try:
            headers = {
                "Authorization": f"Bearer {self.openrouter_api_key}",
                "Content-Type": "application/json",
                # Optional headers OpenRouter uses for app attribution.
                "HTTP-Referer": "https://huggingface.co",
                "X-Title": "AI RAG Memory System"
            }

            data = {
                "model": self.model_name,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": 1000
            }

            response = requests.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers=headers,
                json=data,
                timeout=30
            )

            if response.status_code == 200:
                result = response.json()
                return result["choices"][0]["message"]["content"]
            return f"API Error: {response.status_code} - {response.text}"

        except Exception as e:
            return f"Error calling OpenRouter: {e}"

    def generate_response_with_rag(self, user_input: str, conversation_history: List = None) -> tuple:
        """Generate an AI response using RAG over stored experiences."""
        # 1. Retrieve semantically similar past interactions.
        relevant_experiences = self.retrieve_relevant_experiences(user_input)

        # 2. Build a context block, reranked when the rerank model is available.
        context_parts = []
        if relevant_experiences:
            context_parts.append("🧠 Relevant past experiences from the shared knowledge base (powered by Pinecone inference):")

            documents = [f"User: {exp['user_input']} AI: {exp['ai_response']}" for exp in relevant_experiences]
            reranked = self.rerank_results(user_input, documents)

            if reranked:
                context_parts.append(f"\n🔄 Reranked results using {self.rerank_model}:")
                for i, result in enumerate(reranked, 1):
                    context_parts.append(f"{i}. (Relevance: {result['score']:.3f}) {result['document'][:200]}...")
            else:
                # Reranking unavailable: fall back to vector-similarity order.
                for i, exp in enumerate(relevant_experiences, 1):
                    context_parts.append(f"\n{i}. Previous interaction from shared knowledge (similarity: {exp['score']:.2f}):")
                    context_parts.append(f"   👤 User: {exp['user_input'][:200]}...")
                    context_parts.append(f"   🤖 AI: {exp['ai_response'][:200]}...")
                    context_parts.append(f"   🕒 Time: {exp['timestamp'][:19]}")
                    if exp['context']:
                        context_parts.append(f"   📝 Context: {exp['context'][:100]}...")
                    context_parts.append("")
        else:
            context_parts.append("🆕 No previous relevant experiences found in the shared knowledge base. This is a fresh conversation!")

        context_str = "\n".join(context_parts)

        # 3. Assemble the prompt: system message with shared context, then
        #    recent history, then the new user turn.
        messages = [
            {
                "role": "system",
                "content": f"""You are an AI assistant with access to a shared knowledge base of past conversations and interactions through Pinecone's vector database with integrated inference.

IMPORTANT: The context below contains conversations from OTHER USERS and previous AI responses - this is NOT your personal memory, but rather a shared knowledge base that multiple users contribute to. Each conversation you have will also be added to this shared knowledge base for future users.

The embeddings are generated using {self.embedding_model} and results are reranked with {self.rerank_model}.

SHARED KNOWLEDGE BASE CONTEXT:
{context_str}

Guidelines for using shared knowledge:
- The experiences above are from OTHER USERS' conversations, not your own memories
- Use these shared experiences to provide helpful, informed responses
- When referencing past interactions, make it clear they came from the shared knowledge base
- Don't claim personal ownership of experiences that belong to other users
- Learn from the collective knowledge while maintaining your own conversational identity
- Be transparent that you're drawing from a shared pool of experiences
- Build upon the collective wisdom while providing fresh, contextual responses
- Acknowledge when information comes from the shared knowledge base vs. the current conversation

Remember: You're part of a learning system where each conversation contributes to helping future users, but you should be clear about the source of your knowledge."""
            }
        ]

        # Keep only the last five turns to stay within the context window.
        if conversation_history:
            for msg in conversation_history[-5:]:
                messages.append(msg)

        messages.append({"role": "user", "content": user_input})

        # 4. Generate the response, then store this turn for future users.
        ai_response = self.call_openrouter(messages)
        storage_result = self.store_experience(user_input, ai_response, context_str)

        return ai_response, context_str, storage_result


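# Gradio glue: the Chatbot component used below keeps history as a list of
# (user_message, ai_message) tuples; these helpers translate between that
# format and the OpenAI-style message dicts the RAG system expects.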
def chat_with_rag(message: str, history: List = None) -> tuple:
    """Main chat function for the Gradio interface."""
    if not message.strip():
        return "Please enter a message.", "", ""

    # Convert Gradio's (user, assistant) tuples into OpenAI-style messages.
    conversation_history = []
    if history:
        for user_msg, ai_msg in history:
            if user_msg:
                conversation_history.append({"role": "user", "content": user_msg})
            if ai_msg:
                conversation_history.append({"role": "assistant", "content": ai_msg})

    ai_response, context_used, storage_info = rag_system.generate_response_with_rag(
        message, conversation_history
    )

    return ai_response, context_used, storage_info


def clear_conversation():
    """Clear the chat history and side panels."""
    return [], "", "", ""


def get_system_status():
    """Summarize the state of the Pinecone and OpenRouter connections."""
    status = []

    if rag_system.index:
        try:
            stats = rag_system.index.describe_index_stats()
            total_vectors = stats.get('total_vector_count', 0)
            status.append(f"✅ Pinecone: Connected ({total_vectors} experiences)")
            status.append(f"🧠 Embedding: {rag_system.embedding_model}")
        except Exception:
            status.append("⚠️ Pinecone: Connected but stats unavailable")
    else:
        status.append("❌ Pinecone: Not connected")

    if rag_system.openrouter_api_key:
        status.append(f"✅ OpenRouter: {rag_system.model_name}")
    else:
        status.append("❌ OpenRouter: Not configured")

    return "\n".join(status)


minimal_css = """
/* Clean, minimal styling */
.gradio-container {
    max-width: 1100px !important;
    margin: 0 auto !important;
}

/* Remove excess padding and margins */
.block {
    border: none !important;
    box-shadow: none !important;
}

/* Simple header */
.header {
    text-align: center;
    padding: 1rem;
    background: linear-gradient(90deg, #4f46e5, #7c3aed);
    color: white;
    border-radius: 8px;
    margin-bottom: 1rem;
}

/* Clean chatbot styling */
.chatbot {
    border: 1px solid #e5e7eb !important;
    border-radius: 8px !important;
}

/* Simple input styling */
.input-box {
    border: 1px solid #d1d5db !important;
    border-radius: 6px !important;
}

/* Clean buttons */
.primary-btn {
    background: #4f46e5 !important;
    border: none !important;
    border-radius: 6px !important;
    color: white !important;
}

.secondary-btn {
    background: #f3f4f6 !important;
    border: 1px solid #d1d5db !important;
    border-radius: 6px !important;
    color: #374151 !important;
}

/* Context area */
.context-area {
    background: #f9fafb !important;
    border: 1px solid #e5e7eb !important;
    border-radius: 6px !important;
    font-family: monospace !important;
    font-size: 12px !important;
}

/* Status display */
.status-display {
    background: #f0f9ff !important;
    border: 1px solid #bae6fd !important;
    border-radius: 6px !important;
    font-family: monospace !important;
    font-size: 12px !important;
}

/* Memory info */
.memory-display {
    background: #f0fdf4 !important;
    border: 1px solid #bbf7d0 !important;
    border-radius: 6px !important;
    font-size: 12px !important;
}

/* Remove default gradio styling */
.gr-button {
    font-size: 14px !important;
}

.gr-textbox {
    font-size: 14px !important;
}

/* Tabs styling */
.tab-nav {
    border-bottom: 1px solid #e5e7eb;
}

/* Collapsible sections */
.accordion {
    border: 1px solid #e5e7eb;
    border-radius: 6px;
    margin: 0.5rem 0;
}
"""


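# A single module-level instance backs every Gradio session, so all users
# read from and write to the same shared Pinecone index.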
rag_system = RAGMemorySystem()


with gr.Blocks(
    title="AI Assistant with RAG",
    css=minimal_css,
    theme=gr.themes.Soft()
) as demo:

    gr.HTML("""
    <div class="header">
        <h2 style="margin: 0;">🤖 AI Assistant with RAG</h2>
        <p style="margin: 5px 0 0 0; opacity: 0.9;">Powered by Pinecone Vector Search</p>
    </div>
    """)

    # Main layout: chat on the left, retrieved context and storage status on the right.
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                height=450,
                show_label=False,
                elem_classes=["chatbot"]
            )

            with gr.Row():
                msg = gr.Textbox(
                    placeholder="Type your message...",
                    show_label=False,
                    scale=4,
                    elem_classes=["input-box"]
                )
                send_btn = gr.Button(
                    "Send",
                    variant="primary",
                    scale=1,
                    elem_classes=["primary-btn"]
                )

            with gr.Row():
                clear_btn = gr.Button(
                    "Clear Chat",
                    variant="secondary",
                    elem_classes=["secondary-btn"]
                )

        with gr.Column(scale=1):
            with gr.Accordion("Knowledge Context", open=False):
                context_display = gr.Textbox(
                    lines=8,
                    interactive=False,
                    show_label=False,
                    placeholder="Retrieved context appears here...",
                    elem_classes=["context-area"]
                )

            storage_info = gr.Textbox(
                lines=1,
                interactive=False,
                show_label=False,
                placeholder="Storage status...",
                elem_classes=["memory-display"]
            )

    with gr.Accordion("Settings", open=False):
        with gr.Row():
            with gr.Column():
                gr.Markdown("### Model Configuration")
                with gr.Row():
                    model_input = gr.Textbox(
                        label="OpenRouter Model",
                        value=rag_system.model_name,
                        placeholder="Enter model name...",
                        scale=3
                    )
                    update_btn = gr.Button(
                        "Update",
                        variant="primary",
                        scale=1,
                        elem_classes=["primary-btn"]
                    )

                model_status = gr.Textbox(
                    label="Current Model",
                    value=f"Using: {rag_system.model_name}",
                    interactive=False
                )

                gr.Markdown("""
                **Free Models:**
                - `meta-llama/llama-3.2-3b-instruct:free`
                - `microsoft/phi-3-mini-128k-instruct:free`
                - `google/gemma-2-9b-it:free`
                """)

            with gr.Column():
                gr.Markdown("### System Status")
                status_display = gr.Textbox(
                    value=get_system_status(),
                    lines=4,
                    interactive=False,
                    show_label=False,
                    elem_classes=["status-display"]
                )
                refresh_btn = gr.Button(
                    "Refresh",
                    variant="secondary",
                    elem_classes=["secondary-btn"]
                )

    with gr.Accordion("About", open=False):
        gr.Markdown("""
        ### AI Assistant with RAG

        This application uses **Retrieval-Augmented Generation** to provide more informed responses by:
        - Storing conversations in a **Pinecone vector database**
        - Retrieving relevant past experiences using **semantic search**
        - Using **multilingual-e5-large** embeddings for understanding
        - Reranking results with **pinecone-rerank-v0** for better relevance

        **Privacy:** Conversations are stored in a shared knowledge base and may be surfaced in other users' sessions, so avoid sharing personal or sensitive information.
        """)

    def respond(message, history):
        """Handle a chat turn: run RAG, append to history, clear the input box."""
        if not message:
            return history, "", "", ""

        ai_response, context_used, storage_info_text = chat_with_rag(message, history)

        if history is None:
            history = []
        history.append((message, ai_response))

        return history, "", context_used, storage_info_text

    def update_model_handler(new_model):
        """Apply a model change and surface the result in the status displays."""
        update_msg = rag_system.update_model(new_model)
        return "", update_msg, get_system_status()

    send_btn.click(
        respond,
        inputs=[msg, chatbot],
        outputs=[chatbot, msg, context_display, storage_info]
    )

    msg.submit(
        respond,
        inputs=[msg, chatbot],
        outputs=[chatbot, msg, context_display, storage_info]
    )

    clear_btn.click(
        clear_conversation,
        outputs=[chatbot, msg, context_display, storage_info]
    )

    update_btn.click(
        update_model_handler,
        inputs=[model_input],
        outputs=[model_input, model_status, status_display]
    )

    refresh_btn.click(
        get_system_status,
        outputs=[status_display]
    )


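# A sketch of the expected environment, taken from the os.getenv calls above
# (the filename app.py is an assumption, not fixed by this module):
#
#   export PINECONE_API_KEY="..."      # required for memory storage
#   export OPENROUTER_API_KEY="..."    # required for chat completions
#   export MODEL_NAME="meta-llama/llama-3.2-3b-instruct:free"  # optional override
#   python app.py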
if __name__ == "__main__":
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )