# visionextract-crm / rag_core.py
# Author: Jash Doshi
# Commit b0e5f42: Fix delete all persistence - use bulk delete for ChromaDB
# metadata and documents
# rag_core.py - Chroma Cloud Integration
import os
import sys
import numpy as np
import json
from sentence_transformers import SentenceTransformer, CrossEncoder
import hashlib
import requests
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.stem import PorterStemmer
from typing import List, Dict, Tuple
import time
from dotenv import load_dotenv
import chromadb
# Load environment variables from a local .env file (no-op if the file is absent).
load_dotenv()
# --- ROBUST NLTK SETUP ---
# Point NLTK to the local 'nltk_data' directory if it exists.
# On Render, this is created during the build step by download_nltk.py
local_nltk_data_path = os.path.join(os.path.dirname(__file__), 'nltk_data')
if os.path.exists(local_nltk_data_path):
    # Prepend so the bundled corpora take precedence over system-wide NLTK data.
    nltk.data.path.insert(0, local_nltk_data_path)
# If nltk_data doesn't exist locally, NLTK will use default paths or download on-demand
# --- END SETUP ---
# Model configuration - matching app.py
# NOTE(review): keys are UI-facing labels; several map to models from other
# vendors (e.g. 'gemini' -> a Gemma model) -- confirm the mapping is intended.
MODEL_MAP = {
    'gemini': 'google/gemma-3-4b-it:free',
    'deepseek': 'google/gemma-3-27b-it:free',
    'qwen': 'mistralai/mistral-small-3.1-24b-instruct:free',
    'nvidia': 'nvidia/nemotron-nano-12b-v2-vl:free',
    'amazon': 'amazon/nova-2-lite-v1:free'
}
# Best → fallback order (OCR strength)
FALLBACK_ORDER = [
    'gemini',
    'deepseek',
    'qwen',
    'nvidia',
    'amazon'
]
# Chroma Cloud configuration -- all three must be set or RAG stays disabled.
CHROMA_TENANT = os.getenv("CHROMA_TENANT")
CHROMA_DATABASE = os.getenv("CHROMA_DATABASE")
CHROMA_API_KEY = os.getenv("CHROMA_API_KEY")
# Lazily-initialized singletons, populated by initialize_rag_system().
embedding_model = None  # SentenceTransformer once initialized
reranker_model = None   # CrossEncoder once initialized
chroma_client = None    # chromadb.CloudClient once connected
# Cache of Chroma collections keyed by collection name.
collections: Dict[str, chromadb.Collection] = {}
# In-memory keyword index: mode -> user_api_key -> index dict.
keyword_indexes: Dict[str, Dict[str, Dict]] = {}
EMBEDDING_DIM = 768  # output dimension of all-mpnet-base-v2
CHUNK_SIZE = 300     # target chunk size, in characters
CHUNK_OVERLAP = 50   # overlap between consecutive chunks, in words
# Track if RAG system is properly initialized
_rag_system_available = False
# Initialize components
def initialize_rag_system():
    """Load the embedding model, reranker, and connect to Chroma Cloud.

    Returns:
        bool: True when every component initialized, False otherwise. On
        failure the RAG layer is disabled but the rest of the app keeps
        working (OCR does not need it).
    """
    global embedding_model, reranker_model, chroma_client, _rag_system_available

    print("RAG Core: Initializing Advanced RAG System with Chroma Cloud...")

    # Missing credentials are handled gracefully rather than raising.
    have_credentials = CHROMA_TENANT and CHROMA_DATABASE and CHROMA_API_KEY
    if not have_credentials:
        print("WARNING: Chroma Cloud credentials not found. RAG system will be disabled.")
        print(" Set CHROMA_TENANT, CHROMA_DATABASE, and CHROMA_API_KEY to enable RAG.")
        _rag_system_available = False
        return False

    try:
        # Connect to Chroma Cloud first; model loading is pointless without it.
        print("RAG Core: Connecting to Chroma Cloud...")
        chroma_client = chromadb.CloudClient(
            tenant=CHROMA_TENANT,
            database=CHROMA_DATABASE,
            api_key=CHROMA_API_KEY,
        )
        print("RAG Core: Successfully connected to Chroma Cloud!")

        print("RAG Core: Loading advanced embedding model...")
        embedding_model = SentenceTransformer('all-mpnet-base-v2')
        print("RAG Core: Loading cross-encoder reranker...")
        reranker_model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
        print("RAG Core: Advanced models loaded successfully.")

        _rag_system_available = True
        return True
    except Exception as exc:
        print(f"ERROR: Failed to initialize RAG system: {exc}")
        print(" RAG system will be disabled. The app will still work for OCR.")
        _rag_system_available = False
        return False
def is_rag_available():
    """Return True when initialize_rag_system() completed successfully."""
    return _rag_system_available
def _call_openrouter_api_with_fallback(api_key, selected_model_key, prompt):
    """Send a text-only prompt to OpenRouter, falling back across models.

    Tries the selected model first, then every other key in FALLBACK_ORDER,
    returning the first successful completion. Never raises: on total failure
    a user-facing error string is returned instead.
    """
    # Selected model first, then the remaining fallbacks in rank order.
    candidate_keys = [selected_model_key]
    candidate_keys += [key for key in FALLBACK_ORDER if key != selected_model_key]

    last_error = None
    for model_key in candidate_keys:
        model_name = MODEL_MAP.get(model_key)
        if not model_name:
            continue  # unknown key -- skip silently
        print(f"RAG: Attempting API call with model: {model_name}...")
        try:
            response = requests.post(
                url="https://openrouter.ai/api/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                },
                json={
                    "model": model_name,
                    "messages": [{"role": "user", "content": prompt}]
                },
                timeout=15  # fail fast so the next fallback gets a chance
            )
            response.raise_for_status()
            payload = response.json()
            choices = payload.get('choices')
            if not choices:
                print(f"RAG: Model {model_name} returned unexpected response format")
                last_error = f"Model {model_name} returned unexpected response format"
                continue
            result = choices[0]['message']['content']
            print(f"RAG: Successfully processed with model: {model_name}")
            return result
        except requests.exceptions.HTTPError as http_err:
            error_msg = f"RAG: HTTP error for model {model_name}: {http_err}"
            if hasattr(response, 'text'):
                error_msg += f"\nResponse: {response.text}"
            print(error_msg)
            last_error = f"API request failed for {model_name} with status {response.status_code}."
            continue
        except Exception as err:
            print(f"RAG: Error with model {model_name}: {err}")
            last_error = f"An unexpected error occurred with model {model_name}."
            continue

    # Every model failed -- surface a friendly message instead of raising.
    return f"I'm having trouble connecting to the AI models right now. Please check your API key and try again. Last error: {last_error}"
def _get_collection_name(user_api_key, mode):
"""
Creates a unique collection name for a user based on a hash of their API key.
"""
user_hash = hashlib.sha256(user_api_key.encode()).hexdigest()[:16]
return f"{user_hash}_{mode}"
def _get_or_create_collection(user_api_key, mode):
    """Return the (process-cached) Chroma collection for this user/mode pair."""
    collection_name = _get_collection_name(user_api_key, mode)
    cached = collections.get(collection_name)
    if cached is not None:
        return cached

    print(f"RAG Core: Getting/creating collection '{collection_name}' in Chroma Cloud")
    collection = chroma_client.get_or_create_collection(
        name=collection_name,
        metadata={"hnsw:space": "cosine"},  # cosine distance for normalized embeddings
    )
    collections[collection_name] = collection
    # Hydrate the keyword index alongside the vector collection.
    _load_keyword_index(user_api_key, mode)
    return collection
def _load_keyword_index(user_api_key, mode):
    """
    Loads keyword index from Chroma Cloud collection metadata.

    Populates keyword_indexes[mode][user_api_key] at most once per process:
    if an entry already exists this is a no-op. Falls back to an empty index
    structure whenever the collection is not cached yet or the stored
    document cannot be read.
    """
    collection_name = _get_collection_name(user_api_key, mode)
    # Ensure the per-mode bucket exists before any early return.
    if mode not in keyword_indexes:
        keyword_indexes[mode] = {}
    # Already loaded -- keep the in-memory copy authoritative.
    if user_api_key in keyword_indexes[mode]:
        return
    try:
        # Only consults the cache; deliberately does not create a collection.
        collection = collections.get(collection_name)
        if collection:
            # The index is persisted as one JSON document under a reserved id
            # (written by _save_keyword_index).
            results = collection.get(
                ids=["__keyword_index__"],
                include=["documents"]
            )
            if results and results['documents'] and results['documents'][0]:
                keyword_indexes[mode][user_api_key] = json.loads(results['documents'][0])
                print(f"RAG Core: Loaded keyword index from Chroma Cloud")
            else:
                # Nothing persisted yet -- start with an empty index.
                keyword_indexes[mode][user_api_key] = {"documents": {}, "vocabulary": {}, "entities": {}}
        else:
            keyword_indexes[mode][user_api_key] = {"documents": {}, "vocabulary": {}, "entities": {}}
    except Exception as e:
        # Best effort: a load failure degrades to an empty index, never crashes.
        print(f"RAG Core: Could not load keyword index: {e}")
        keyword_indexes[mode][user_api_key] = {"documents": {}, "vocabulary": {}, "entities": {}}
def _save_keyword_index(user_api_key, mode):
    """Persist the in-memory keyword index as a reserved document in Chroma."""
    collection = collections.get(_get_collection_name(user_api_key, mode))
    index = keyword_indexes.get(mode, {}).get(user_api_key)
    # Nothing to do without both a cached collection and an index to save.
    if collection is None or index is None:
        return

    payload = json.dumps(index)
    try:
        # Upsert under a reserved id so repeated saves overwrite in place.
        collection.upsert(
            ids=["__keyword_index__"],
            documents=[payload],
            metadatas=[{"type": "keyword_index"}],
        )
        print("RAG Core: Saved keyword index to Chroma Cloud")
    except Exception as exc:
        print(f"RAG Core: Error saving keyword index: {exc}")
def _smart_chunking(text, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP):
    """
    Intelligent chunking that preserves context and meaning.

    Splits on blank-line paragraph boundaries first, falling back to NLTK
    sentence segmentation for paragraphs longer than ``chunk_size``. Note the
    mixed units: ``chunk_size`` is measured in characters while
    ``chunk_overlap`` is a number of words prepended from the previous chunk.

    Returns a list of chunk strings (empty for blank/non-string input).
    """
    if not isinstance(text, str) or not text.strip():
        return []
    paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
    chunks = []
    current_chunk = ""
    for paragraph in paragraphs:
        # Greedily pack whole paragraphs while they fit.
        # NOTE(review): the check ignores the 2-char "\n\n" joiner, so a
        # chunk can slightly exceed chunk_size -- presumably acceptable.
        if len(current_chunk) + len(paragraph) <= chunk_size:
            if current_chunk:
                current_chunk += "\n\n" + paragraph
            else:
                current_chunk = paragraph
        else:
            # Flush what we have before handling the oversized remainder.
            if current_chunk:
                chunks.append(current_chunk.strip())
            if len(paragraph) > chunk_size:
                # Paragraph alone exceeds the budget: pack by sentence instead.
                sentences = nltk.sent_tokenize(paragraph)
                temp_chunk = ""
                for sentence in sentences:
                    if len(temp_chunk) + len(sentence) <= chunk_size:
                        temp_chunk += " " + sentence if temp_chunk else sentence
                    else:
                        if temp_chunk:
                            chunks.append(temp_chunk.strip())
                        # A single sentence longer than chunk_size becomes its
                        # own (oversized) chunk.
                        temp_chunk = sentence
                # Carry the tail forward so following paragraphs can join it.
                current_chunk = temp_chunk
            else:
                current_chunk = paragraph
    if current_chunk:
        chunks.append(current_chunk.strip())
    # Prepend the last `chunk_overlap` WORDS of the previous chunk to each
    # chunk (except the first) so retrieval keeps cross-chunk context.
    final_chunks = []
    for i, chunk in enumerate(chunks):
        if i > 0 and chunk_overlap > 0:
            prev_words = chunks[i-1].split()[-chunk_overlap:]
            if prev_words:
                chunk = " ".join(prev_words) + " " + chunk
        final_chunks.append(chunk)
    return final_chunks
def _enhanced_query_expansion(query: str) -> List[str]:
"""
Advanced query expansion with business context awareness.
"""
query_lower = query.lower()
expanded_queries = {query}
business_expansions = {
r"\bgeneral manager\b": ["GM", "manager", "head", "director", "chief"],
r"\bCEO\b": ["chief executive officer", "president", "director"],
r"\bCFO\b": ["chief financial officer", "finance director"],
r"\blocation\b": ["address", "located", "office", "headquarters", "branch"],
r"\boffice\b": ["location", "branch", "headquarters", "situated"],
r"\bservices\b": ["offerings", "products", "solutions", "business"],
r"\bcompany\b": ["business", "organization", "firm", "corporation", "enterprise"],
r"\bcontact\b": ["reach", "get in touch", "communicate"],
r"\bbranch\b": ["office", "location", "division", "subsidiary"],
r"\bheadquarters\b": ["main office", "head office", "corporate office"],
}
location_patterns = {
r"\bhong\s*kong\b": ["HK", "hongkong"],
r"\bsingapore\b": ["SG", "sing"],
r"\bunited\s*states\b": ["USA", "US", "America"],
r"\bunited\s*kingdom\b": ["UK", "Britain"],
}
for pattern, replacements in business_expansions.items():
if re.search(pattern, query_lower):
for replacement in replacements:
expanded_query = re.sub(pattern, replacement, query, flags=re.IGNORECASE)
expanded_queries.add(expanded_query)
for pattern, replacements in location_patterns.items():
if re.search(pattern, query_lower):
for replacement in replacements:
expanded_query = re.sub(pattern, replacement, query, flags=re.IGNORECASE)
expanded_queries.add(expanded_query)
return list(expanded_queries)
def _build_enhanced_keyword_index(text, doc_id, user_api_key, mode):
    """
    Index one document's stemmed vocabulary and named entities in memory.

    Updates keyword_indexes[mode][user_api_key] in place: stemmed non-stopword
    terms and detected company/address entities each map to lists of doc ids.
    """
    if not isinstance(text, str) or not text.strip():
        return

    # Lazily create the nested index structure for this user/mode.
    per_mode = keyword_indexes.setdefault(mode, {})
    keyword_index = per_mode.setdefault(
        user_api_key, {"documents": {}, "vocabulary": {}, "entities": {}}
    )

    words = re.findall(r'\b[a-zA-Z]{2,}\b', text.lower())
    stop_words = set(stopwords.words('english'))
    stemmer = PorterStemmer()

    # Capitalized phrases ending in a company suffix or street/building word.
    business_entities = re.findall(r'\b[A-Z][a-zA-Z&\s]{1,30}(?:Ltd|Inc|Corp|Company|Group|Holdings|Limited|Corporation|Enterprise|Solutions)\b', text)
    locations = re.findall(r'\b[A-Z][a-zA-Z\s]{2,20}(?:Street|Road|Avenue|Lane|Drive|Plaza|Square|Center|Centre|Building|Tower|Floor)\b', text)

    # Vocabulary postings: stemmed term -> list of doc ids (no duplicates).
    for word in words:
        if word in stop_words or len(word) <= 2:
            continue
        postings = keyword_index["vocabulary"].setdefault(stemmer.stem(word), [])
        if doc_id not in postings:
            postings.append(doc_id)

    # Entity postings: lowercase entity -> list of doc ids.
    entity_index = keyword_index.setdefault("entities", {})
    for entity in business_entities + locations:
        postings = entity_index.setdefault(entity.lower(), [])
        if doc_id not in postings:
            postings.append(doc_id)

    # Per-document stats used later for length normalization.
    keyword_index["documents"][doc_id] = {
        "text": text,
        "length": len(text),
        "word_count": len(words),
        "entities": business_entities + locations
    }
def _enhanced_keyword_search(query, user_api_key, mode, top_k=10):
    """
    Score documents by stemmed-term and entity overlap with the query.

    Term hits score 1.0, entity hits 2.0 (per occurrence), then scores are
    length-normalized so long documents do not dominate. Returns up to
    ``top_k`` document ids, best first; [] when no index exists.
    """
    if mode not in keyword_indexes or user_api_key not in keyword_indexes[mode]:
        return []
    keyword_index = keyword_indexes[mode][user_api_key]
    ps = PorterStemmer()
    # Build the stop-word SET once. The original rebuilt the full NLTK
    # stop-word list (a Python list) for every query term inside the
    # comprehension, making each membership test O(n) on a fresh list.
    stop_words = set(stopwords.words('english'))
    query_words = query.lower().split()
    query_terms = [ps.stem(term) for term in query_words
                   if term not in stop_words and len(term) > 2]
    # Entity matches are strong signals; duplicates intentionally stack.
    entity_matches = []
    for entity, docs in keyword_index.get("entities", {}).items():
        if any(term in entity for term in query_words):
            entity_matches.extend(docs)
    doc_scores: Dict[str, float] = {}
    for term in query_terms:
        for doc_id in keyword_index.get("vocabulary", {}).get(term, []):
            doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 1.0
    for doc_id in entity_matches:
        doc_scores[doc_id] = doc_scores.get(doc_id, 0) + 2.0
    # Length normalization: damp scores of long documents logarithmically.
    final_scores = {}
    for doc_id, score in doc_scores.items():
        if doc_id in keyword_index.get("documents", {}):
            doc_length = keyword_index["documents"][doc_id].get("word_count", 1)
            final_scores[doc_id] = score / (1 + np.log(1 + doc_length))
    sorted_docs = sorted(final_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
    return [doc_id for doc_id, score in sorted_docs]
def add_document_to_knowledge_base(user_api_key, document_text, document_id, mode):
    """
    Chunk, embed, and index one document into Chroma Cloud.

    Also updates and persists the keyword index. When the text produces no
    chunks, only the keyword index is saved.

    Raises:
        Exception: re-raises any indexing failure after logging, so the
        caller can surface the error to the user.
    """
    try:
        print(f"\nRAG: Adding document '{document_id}' to Chroma Cloud...")
        collection = _get_or_create_collection(user_api_key, mode)
        chunks = _smart_chunking(document_text)
        print(f"RAG: Created {len(chunks)} intelligent chunks")
        _build_enhanced_keyword_index(document_text, document_id, user_api_key, mode)
        print("RAG: Built enhanced keyword index")
        if not chunks:
            print("RAG: No chunks to vectorize, saving keyword index only")
            _save_keyword_index(user_api_key, mode)
            return
        chunk_embeddings = embedding_model.encode(chunks, normalize_embeddings=True)
        print("RAG: Generated embeddings")
        # Chunk ids embed the source doc id so deletes-by-doc work later.
        ids = [f"{document_id}_chunk_{i}" for i in range(len(chunks))]
        metadatas = [
            {
                "source_doc": document_id,
                "chunk_id": i,
                "length": len(chunk),
                "type": "document_chunk"
            }
            for i, chunk in enumerate(chunks)
        ]
        # Upsert so re-adding the same document overwrites its old chunks.
        collection.upsert(
            ids=ids,
            embeddings=chunk_embeddings.tolist(),
            documents=chunks,
            metadatas=metadatas
        )
        _save_keyword_index(user_api_key, mode)
        print(f"RAG: Successfully indexed document to Chroma Cloud. Total chunks: {len(chunks)}")
    except Exception as e:
        print(f"CRITICAL ERROR in add_document_to_knowledge_base: {e}")
        import traceback
        traceback.print_exc()
        # Bare `raise` preserves the original traceback; the previous
        # `raise e` rewrote it to point at this line.
        raise
def remove_document_from_knowledge_base(user_api_key, document_id, mode):
    """
    Delete a document's chunks from Chroma and purge it from the keyword index.

    Best-effort: errors are logged, never raised.
    """
    try:
        collection = _get_or_create_collection(user_api_key, mode)
        # One where-filtered delete removes every chunk tagged with this doc.
        collection.delete(where={"source_doc": document_id})

        keyword_index = keyword_indexes.get(mode, {}).get(user_api_key)
        if keyword_index is not None:
            # Strip this doc id from both posting maps, dropping entries that
            # become empty. Iterate over a snapshot since we delete keys.
            for section in ("vocabulary", "entities"):
                postings_map = keyword_index.get(section)
                if not postings_map:
                    continue
                for key in list(postings_map):
                    postings = postings_map[key]
                    if document_id in postings:
                        postings.remove(document_id)
                        if not postings:
                            del postings_map[key]
            # Drop the per-document stats record as well.
            keyword_index.get("documents", {}).pop(document_id, None)
            _save_keyword_index(user_api_key, mode)
        print(f"RAG: Removed document '{document_id}' from Chroma Cloud")
    except Exception as e:
        print(f"Error removing document: {e}")
        import traceback
        traceback.print_exc()
def _advanced_hybrid_search(query, user_api_key, mode, top_k=10):
    """
    Hybrid retrieval: dense vector search over Chroma plus a keyword boost.

    Returns up to ``top_k`` dicts with 'text', 'source_doc', 'chunk_id',
    'length' and 'score' keys, best first. Returns [] when the collection is
    empty or unreachable.
    """
    collection = _get_or_create_collection(user_api_key, mode)
    # Check if collection has documents
    try:
        count = collection.count()
        if count == 0:
            return []
    except Exception:
        # Narrowed from a bare `except:` which also swallowed
        # KeyboardInterrupt/SystemExit.
        return []
    # Query a few variants and keep the best score seen per chunk id.
    expanded_queries = _enhanced_query_expansion(query)
    all_results = {}
    for q in expanded_queries[:3]:  # Limit to avoid too much noise
        query_embedding = embedding_model.encode([q], normalize_embeddings=True)
        try:
            results = collection.query(
                query_embeddings=query_embedding.tolist(),
                n_results=min(top_k * 2, count),
                where={"type": "document_chunk"},
                include=["documents", "metadatas", "distances"]
            )
            if results and results['ids'] and results['ids'][0]:
                for doc_id, doc, metadata, distance in zip(
                    results['ids'][0],
                    results['documents'][0],
                    results['metadatas'][0],
                    results['distances'][0]
                ):
                    # Chroma returns cosine DISTANCE; similarity = 1 - distance.
                    # Compare against None, not truthiness: a perfect match has
                    # distance 0.0, which the old `if distance` branch scored
                    # as 0 (worst) instead of 1 (best).
                    score = 1 - distance if distance is not None else 0
                    if doc_id not in all_results or all_results[doc_id]['score'] < score:
                        all_results[doc_id] = {
                            'text': doc,
                            'source_doc': metadata.get('source_doc', ''),
                            'chunk_id': metadata.get('chunk_id', 0),
                            'length': metadata.get('length', 0),
                            'score': score
                        }
        except Exception as e:
            print(f"RAG: Search error: {e}")
            continue
    # Enhanced keyword search boost
    keyword_doc_ids = set(_enhanced_keyword_search(query, user_api_key, mode, top_k=top_k*2))
    # Chunks whose parent document also matched by keywords get a flat boost.
    for doc_id, result in all_results.items():
        if result.get('source_doc') in keyword_doc_ids:
            result['score'] = result.get('score', 0) + 0.4
    # Sort and return top results
    sorted_results = sorted(all_results.items(), key=lambda x: x[1]['score'], reverse=True)[:top_k]
    return [result for doc_id, result in sorted_results]
def _intelligent_rerank(query, candidate_chunks, top_k=5):
"""
Intelligent reranking that considers both relevance and context completeness.
"""
if not candidate_chunks or not reranker_model:
return candidate_chunks[:top_k]
# Use cross-encoder for initial scoring
pairs = [(query, chunk["text"]) for chunk in candidate_chunks]
cross_encoder_scores = reranker_model.predict(pairs)
# Additional scoring based on content completeness
enhanced_scores = []
for i, (chunk, ce_score) in enumerate(zip(candidate_chunks, cross_encoder_scores)):
text = chunk["text"]
# Bonus for chunks that seem to contain complete information
completeness_bonus = 0
if any(marker in text.lower() for marker in ["located", "address", "office", "branch"]):
completeness_bonus += 0.1
if any(marker in text.lower() for marker in ["manager", "director", "ceo", "head"]):
completeness_bonus += 0.1
if any(marker in text.lower() for marker in ["company", "business", "organization"]):
completeness_bonus += 0.05
final_score = ce_score + completeness_bonus
enhanced_scores.append((chunk, final_score))
# Sort by enhanced scores and return top results
reranked = sorted(enhanced_scores, key=lambda x: x[1], reverse=True)
return [chunk for chunk, score in reranked[:top_k]]
def query_knowledge_base(user_api_key, query_text, mode, selected_model_key):
    """
    Answer a user question from the indexed documents.

    Pipeline: query expansion -> hybrid (vector + keyword) retrieval ->
    cross-encoder reranking -> LLM answer via OpenRouter with model fallback.
    Returns a user-facing string in every code path (never raises).
    """
    # Single copy of the empty-knowledge-base message (was duplicated).
    empty_kb_message = "I don't have any documents in my knowledge base yet. Please upload some brochures or business cards first, and I'll be happy to help you find information from them!"
    collection = _get_or_create_collection(user_api_key, mode)
    try:
        count = collection.count()
        # The count includes the reserved keyword-index document, so <= 1
        # means no real content has been indexed yet.
        if count <= 1:
            return empty_kb_message
    except Exception:
        # Narrowed from a bare `except:` (which also caught KeyboardInterrupt).
        return empty_kb_message
    print(f"RAG: Processing query: '{query_text}' with model: {selected_model_key}")
    # Optimized search - use only 2 query variations for speed
    expanded_queries = _enhanced_query_expansion(query_text)
    print(f"RAG: Expanded to {len(expanded_queries)} query variations")
    all_candidates = []
    seen_texts = set()
    for variant in expanded_queries[:2]:  # 2 variants keeps latency low
        candidates = _advanced_hybrid_search(variant, user_api_key, mode, top_k=5)
        for candidate in candidates:
            text = candidate.get('text', '')
            # Deduplicate identical chunk texts across query variants.
            if text and text not in seen_texts:
                seen_texts.add(text)
                all_candidates.append(candidate)
    # Rerank down to 3 chunks so the LLM prompt stays small and fast.
    top_chunks = _intelligent_rerank(query_text, all_candidates, top_k=3)
    if not top_chunks:
        return f"I couldn't find specific information about '{query_text}' in the uploaded documents. Could you try rephrasing your question or check if the information might be in a document that hasn't been uploaded yet?"
    # Prepare context for AI model
    context = "\n\n---DOCUMENT SECTION---\n\n".join([chunk["text"] for chunk in top_chunks])
    print(f"RAG: Found {len(top_chunks)} relevant sections. Generating response with {selected_model_key}...")
    try:
        prompt = f"""You are a world-class AI assistant providing beautifully formatted, accurate answers based on document data.
**FORMATTING RULES (CRITICAL):**
- Use **bold** for names, companies, and important terms
- Use bullet points (•) for lists of items
- Use numbered lists (1. 2. 3.) for steps or rankings
- Keep responses concise but complete - aim for 2-4 sentences unless more detail is needed
- Structure longer responses with clear sections
- For contact info, format cleanly: **Name** - email@example.com - +1234567890
**ACCURACY RULES:**
- Only state facts found in the documents below
- Be direct and specific - give the exact answer first, then context
- If asked "who is X" or "what is X's role", lead with the answer immediately
**USER QUESTION:** {query_text}
**DOCUMENT DATA:**
{context}
**YOUR RESPONSE (formatted beautifully with markdown):**"""
        return _call_openrouter_api_with_fallback(user_api_key, selected_model_key, prompt)
    except Exception as e:
        print(f"RAG: An unexpected error occurred during response generation: {e}")
        import traceback
        traceback.print_exc()
        return "I found relevant information but ran into an unexpected error while processing it. Please try again."
# ============================================
# METADATA PERSISTENCE FUNCTIONS
# ============================================
def save_metadata_to_chroma(user_api_key, mode, document_id, metadata_dict):
    """
    Persist contact/brochure metadata in ChromaDB so it survives restarts.

    The dict is serialized to JSON and upserted under a reserved
    '__metadata__<doc_id>' id with a zero vector (these records are fetched
    by metadata filter, never by similarity, but the collection requires an
    embedding of the right dimension).
    """
    if not _rag_system_available:
        print("RAG: System not available, cannot save metadata to ChromaDB")
        return False
    try:
        collection = _get_or_create_collection(user_api_key, mode)
        metadata_id = f"__metadata__{document_id}"
        payload = json.dumps(metadata_dict, ensure_ascii=False)
        collection.upsert(
            ids=[metadata_id],
            documents=[payload],
            embeddings=[[0.0] * EMBEDDING_DIM],  # dummy vector, dimension must match
            metadatas=[{
                "type": "metadata",
                "mode": mode,
                "document_id": document_id,
                "timestamp": str(time.time())
            }]
        )
        print(f"RAG: Saved metadata for {document_id} to ChromaDB")
        return True
    except Exception as exc:
        print(f"RAG: Error saving metadata to ChromaDB: {exc}")
        return False
def load_all_metadata_from_chroma(user_api_key, mode):
    """
    Fetch every persisted metadata record for this user/mode.

    Returns a list of metadata dicts (possibly empty), newest first when the
    records carry a '_timestamp' field.
    """
    if not _rag_system_available:
        print("RAG: System not available, cannot load metadata from ChromaDB")
        return []
    try:
        collection = _get_or_create_collection(user_api_key, mode)
        results = collection.get(
            where={"type": "metadata"},
            include=["documents", "metadatas"]
        )
        docs = results['documents'] if results else None
        if not docs:
            return []
        metadata_list = []
        for doc, meta in zip(docs, results['metadatas']):
            if not doc or meta.get('type') != 'metadata':
                continue
            try:
                metadata_list.append(json.loads(doc))
            except json.JSONDecodeError:
                continue  # skip records with corrupt JSON
        # NOTE(review): sorts on a '_timestamp' key inside the JSON payload,
        # which this module never writes (save_metadata_to_chroma stores
        # 'timestamp' in the Chroma metadata) -- presumably the caller injects
        # it; verify against app.py.
        metadata_list.sort(key=lambda x: x.get('_timestamp', 0), reverse=True)
        print(f"RAG: Loaded {len(metadata_list)} metadata records from ChromaDB for {mode}")
        return metadata_list
    except Exception as exc:
        print(f"RAG: Error loading metadata from ChromaDB: {exc}")
        return []
def delete_metadata_from_chroma(user_api_key, mode, document_id):
    """Remove one persisted metadata record; returns True on success."""
    if not _rag_system_available:
        return False
    try:
        collection = _get_or_create_collection(user_api_key, mode)
        # Metadata records live under a reserved, deterministic id.
        collection.delete(ids=[f"__metadata__{document_id}"])
        print(f"RAG: Deleted metadata for {document_id} from ChromaDB")
        return True
    except Exception as exc:
        print(f"RAG: Error deleting metadata from ChromaDB: {exc}")
        return False
def delete_all_metadata_from_chroma(user_api_key, mode):
    """
    Bulk-delete every metadata record for a user/mode ('delete all' feature).

    Returns:
        int: number of records removed (0 on error or when none exist).
    """
    if not _rag_system_available:
        print("RAG: System not available, cannot delete metadata from ChromaDB")
        return 0
    try:
        collection = _get_or_create_collection(user_api_key, mode)
        # Fetch matching ids first so we can delete explicitly and report an
        # accurate count (get() always returns ids).
        found = collection.get(
            where={"type": "metadata"},
            include=["metadatas"]
        )
        ids = found['ids'] if found else None
        if not ids:
            print(f"RAG: No metadata to delete for {mode}")
            return 0
        collection.delete(ids=ids)
        print(f"RAG: Deleted {len(ids)} metadata records from ChromaDB for {mode}")
        return len(ids)
    except Exception as exc:
        print(f"RAG: Error deleting all metadata from ChromaDB: {exc}")
        import traceback
        traceback.print_exc()
        return 0
def delete_all_documents_from_chroma(user_api_key, mode):
    """
    Bulk-delete every document chunk (the RAG knowledge base) for a user/mode.

    Also resets and persists an empty keyword index so a restart cannot
    resurrect stale keyword data.

    Returns:
        int: number of chunks removed (0 on error or when none exist).
    """
    if not _rag_system_available:
        print("RAG: System not available, cannot delete documents from ChromaDB")
        return 0
    try:
        collection = _get_or_create_collection(user_api_key, mode)
        # Collect the chunk ids up front for an accurate count.
        found = collection.get(
            where={"type": "document_chunk"},
            include=["metadatas"]
        )
        ids = found['ids'] if found else None
        if not ids:
            print(f"RAG: No document chunks to delete for {mode}")
            return 0
        collection.delete(ids=ids)
        print(f"RAG: Deleted {len(ids)} document chunks from ChromaDB for {mode}")
        # Reset the in-memory keyword index and persist the empty version.
        if user_api_key in keyword_indexes.get(mode, {}):
            keyword_indexes[mode][user_api_key] = {"documents": {}, "vocabulary": {}, "entities": {}}
            _save_keyword_index(user_api_key, mode)
        return len(ids)
    except Exception as exc:
        print(f"RAG: Error deleting all documents from ChromaDB: {exc}")
        import traceback
        traceback.print_exc()
        return 0
def update_metadata_in_chroma(user_api_key, mode, document_id, field, value, contact_id=None):
    """
    Update one field of a persisted metadata record.

    For mode 'cards' the field is set on the top-level dict; for 'brochures'
    the contact whose 'id' equals ``contact_id`` is updated instead. Returns
    True when the updated record was saved back, False otherwise.
    """
    if not _rag_system_available:
        return False
    try:
        collection = _get_or_create_collection(user_api_key, mode)
        metadata_id = f"__metadata__{document_id}"
        results = collection.get(ids=[metadata_id], include=["documents"])
        stored = results['documents'][0] if results and results['documents'] else None
        if not stored:
            print(f"RAG: Metadata not found for {document_id}")
            return False
        metadata = json.loads(stored)
        if mode == 'cards':
            metadata[field] = value
        elif mode == 'brochures' and contact_id:
            # NOTE(review): when no contact matches contact_id the record is
            # re-saved unchanged and True is returned -- confirm intended.
            for contact in metadata.get('contacts', []):
                if contact.get('id') == contact_id:
                    contact[field] = value
                    break
        # Persist via the normal save path (upsert under the reserved id).
        return save_metadata_to_chroma(user_api_key, mode, document_id, metadata)
    except Exception as exc:
        print(f"RAG: Error updating metadata in ChromaDB: {exc}")
        return False
# ============================================
# CHAT MEMORY FUNCTIONS
# ============================================
def save_chat_message(user_api_key, mode, role, content):
    """
    Append one chat turn to persistent conversation memory.

    ``role`` should be 'user' or 'assistant'. Messages are stored as JSON
    documents under a millisecond-timestamp id with a zero vector (they are
    retrieved by metadata filter, never by similarity).
    """
    if not _rag_system_available:
        return False
    try:
        collection = _get_or_create_collection(user_api_key, mode)
        # Millisecond timestamp keeps ids unique and naturally ordered.
        message_id = f"__chat__{mode}_{int(time.time() * 1000)}"
        record = json.dumps(
            {"role": role, "content": content, "timestamp": time.time()},
            ensure_ascii=False
        )
        collection.upsert(
            ids=[message_id],
            documents=[record],
            embeddings=[[0.0] * EMBEDDING_DIM],  # dummy vector, dimension must match
            metadatas=[{
                "type": "chat_message",
                "mode": mode,
                "role": role,
                "timestamp": str(time.time())
            }]
        )
        return True
    except Exception as exc:
        print(f"RAG: Error saving chat message: {exc}")
        return False
def get_chat_history(user_api_key, mode, limit=10):
    """
    Return up to ``limit`` most recent chat messages, oldest first.

    Each item is a dict with 'role', 'content' and 'timestamp' keys. Returns
    [] when RAG is unavailable, on error, or when limit <= 0.
    """
    if not _rag_system_available:
        return []
    # Guard non-positive limits explicitly: the old `messages[-limit:]` with
    # limit == 0 sliced the WHOLE list (lst[-0:] == lst), returning the
    # entire history instead of nothing.
    if limit <= 0:
        return []
    try:
        collection = _get_or_create_collection(user_api_key, mode)
        results = collection.get(
            where={"type": "chat_message"},
            include=["documents", "metadatas"]
        )
        if not results or not results['documents']:
            return []
        messages = []
        for doc, meta in zip(results['documents'], results['metadatas']):
            try:
                if doc and meta.get('type') == 'chat_message':
                    messages.append(json.loads(doc))
            except json.JSONDecodeError:
                continue  # skip corrupt records
        # Oldest first so the caller can replay the conversation in order.
        messages.sort(key=lambda x: x.get('timestamp', 0))
        return messages[-limit:]
    except Exception as e:
        print(f"RAG: Error loading chat history: {e}")
        return []
def clear_chat_history(user_api_key, mode):
    """Delete every stored chat message for this user/mode; True on success."""
    if not _rag_system_available:
        return False
    try:
        collection = _get_or_create_collection(user_api_key, mode)
        # Collect the ids of every chat message, then delete them in one call.
        found = collection.get(
            where={"type": "chat_message"},
            include=["metadatas"]
        )
        message_ids = found['ids'] if found else []
        if message_ids:
            collection.delete(ids=message_ids)
            print(f"RAG: Cleared {len(message_ids)} chat messages for {mode}")
        return True
    except Exception as exc:
        print(f"RAG: Error clearing chat history: {exc}")
        return False