# krish-mind-chat / app_backup.py
# Author: Krishkanth
# Last update: ChatGPT-style file upload, PDF/DOCX/PPT support (commit 999fe83)
"""
Krish Mind Local Server (GGUF)
==============================
Works with index_local.html
Features: GGUF Model + Web Search + Image Generation + RAG + File Upload
"""
import os
import sys
import urllib.parse
import pickle
import tempfile
from datetime import datetime
# Startup banner printed before any heavy imports so the user sees progress.
print("=" * 60)
print("🧠 Krish Mind Local Server (GGUF)")
print("=" * 60)
# --- Core dependencies ---
# Hard requirements: the server cannot run without the GGUF runtime and the
# HTTP stack, so a missing import aborts startup with an install hint.
try:
    from llama_cpp import Llama
    print("βœ… llama-cpp-python")
except ImportError:
    print("❌ Run: pip install llama-cpp-python")
    sys.exit(1)
try:
    from fastapi import FastAPI, UploadFile, File
    from fastapi.middleware.cors import CORSMiddleware
    from pydantic import BaseModel
    import uvicorn
    print("βœ… fastapi + uvicorn")
except ImportError:
    print("❌ Run: pip install fastapi uvicorn python-multipart")
    sys.exit(1)
# --- Config ---
# Absolute path to the quantized GGUF checkpoint served via llama.cpp.
GGUF_PATH = "d:/Krish Mind/gguf/krish-mind-standalone-Q4.gguf"
# Pre-computed sentence-transformer embeddings for the college knowledge base.
EMBEDDINGS_FILE = "../data/krce_embeddings.pkl"
# Raw JSONL facts; used to rebuild embeddings when the pickle is missing.
DATA_FILE = "../data/krce_college_data.jsonl"
# --- Load GGUF Model ---
print(f"\n⏳ Loading GGUF model...")
try:
    # CPU-only inference (n_gpu_layers=0) with a 4k-token context window.
    model = Llama(
        model_path=GGUF_PATH,
        n_ctx=4096,
        n_gpu_layers=0,
        verbose=False
    )
    print("βœ… Model loaded!")
except Exception as e:
    # The model is mandatory — abort if it cannot be loaded.
    print(f"❌ Model error: {e}")
    sys.exit(1)
# --- DuckDuckGo Web Search (optional) ---
print("\nπŸ“¦ Loading optional features...")
ddgs = None  # stays None when the duckduckgo_search package is unavailable
try:
    import warnings
    warnings.filterwarnings("ignore")
    from duckduckgo_search import DDGS
    ddgs = DDGS()
    print("βœ… DuckDuckGo web search")
except Exception as e:
    # Broad catch on purpose: any failure only disables web search, never startup.
    print(f"⚠️ Web search disabled: {e}")
# --- RAG SETUP (Load Pre-computed Embeddings) ---
print("πŸ“š Loading Knowledge Base...")
knowledge_base = []    # list of {"instruction": ..., "output": ...} fact records
doc_embeddings = None  # matrix with one embedding row per fact (None => RAG disabled)
rag_model = None       # SentenceTransformer used to embed queries at request time
# Try to load pre-computed embeddings first (fast path, built offline).
if os.path.exists(EMBEDDINGS_FILE):
    try:
        import numpy as np
        from sentence_transformers import SentenceTransformer
        print(f"πŸ“‚ Loading pre-computed embeddings from {EMBEDDINGS_FILE}...")
        with open(EMBEDDINGS_FILE, 'rb') as f:
            data = pickle.load(f)
        knowledge_base = data['knowledge_base']
        doc_embeddings = data['embeddings']
        # Load the model for query encoding (needed for search); must be the
        # same model the pickle was built with, hence the stored model_name.
        rag_model = SentenceTransformer(data.get('model_name', 'all-MiniLM-L6-v2'))
        print(f"βœ… Embeddings loaded! ({len(knowledge_base)} facts)")
    except Exception as e:
        print(f"⚠️ Could not load embeddings: {e}")
        print(" Falling back to live embedding...")
# Fallback: compute embeddings live if the pickle was absent or unreadable.
if doc_embeddings is None and os.path.exists(DATA_FILE):
    try:
        from sentence_transformers import SentenceTransformer
        import numpy as np
        import json
        rag_model = SentenceTransformer('all-MiniLM-L6-v2')
        print("βœ… Embedding model loaded (SentenceTransformer)")
        with open(DATA_FILE, 'r') as f:
            for line in f:
                if line.strip():
                    try:
                        knowledge_base.append(json.loads(line))
                    except:
                        # Skip malformed JSONL lines rather than failing startup.
                        pass
        if knowledge_base:
            # Embed instruction+answer together so either side of a fact can match.
            docs = [f"{k['instruction']} {k['output']}" for k in knowledge_base]
            doc_embeddings = rag_model.encode(docs)
            print(f"βœ… Embeddings computed! ({len(knowledge_base)} facts)")
            print(" ⚠️ Run 'python scripts/build_embeddings.py' for faster startup!")
    except Exception as e:
        print(f"❌ RAG disabled: {e}")
        rag_model = None
else:
    if doc_embeddings is None:
        print("⚠️ Data file not found! RAG disabled.")
# Initialize Cross-Encoder for re-ranking (optional quality boost in search_krce).
cross_encoder = None
try:
    from sentence_transformers import CrossEncoder
    cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
    print("βœ… Cross-Encoder loaded for re-ranking")
except Exception as e:
    print(f"⚠️ Cross-Encoder not available: {e}")
# ============================================
# FILE UPLOAD RAG: Session-based document analysis
# ============================================
# Store uploaded file embeddings per session (in-memory only; lost on restart).
# Maps session_id -> {"filename", "chunks", "embeddings", "full_text"}.
session_file_data = {}
def extract_text_from_file(file_path: str, filename: str) -> str:
    """Extract plain text from an uploaded file.

    The parser is chosen by the *filename* extension (the path itself may be a
    temp file with a different name).  PDFs try PyPDF2 first and fall back to
    PyMuPDF; Word files use python-docx; anything unrecognized is read as
    UTF-8 text with errors ignored.

    Returns the stripped text, or a string starting with "Error" describing
    why extraction failed (callers check for that prefix).
    """
    text = ""
    ext = filename.lower().split('.')[-1]
    try:
        if ext == 'txt':
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text = f.read()
        elif ext == 'pdf':
            try:
                import PyPDF2
                with open(file_path, 'rb') as f:
                    reader = PyPDF2.PdfReader(f)
                    for page in reader.pages:
                        # extract_text() may yield None/empty for image-only
                        # pages; `or ""` avoids a TypeError on concatenation.
                        text += (page.extract_text() or "") + "\n"
            except ImportError:
                try:
                    import fitz  # PyMuPDF
                    doc = fitz.open(file_path)
                    for page in doc:
                        text += page.get_text() + "\n"
                    doc.close()
                except ImportError:
                    return "Error: Install PyPDF2 or PyMuPDF to read PDFs"
        elif ext in ['doc', 'docx']:
            try:
                import docx
                doc = docx.Document(file_path)
                text = "\n".join([para.text for para in doc.paragraphs])
            except ImportError:
                return "Error: Install python-docx to read Word files"
        elif ext in ['md', 'json', 'csv']:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text = f.read()
        else:
            # Unknown extension: best-effort read as plain text.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                text = f.read()
    except Exception as e:
        return f"Error reading file: {e}"
    return text.strip()
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list:
    """Split *text* into word-based chunks for embedding.

    Consecutive chunks of *chunk_size* words share *overlap* words so that
    sentences straddling a chunk boundary remain retrievable.

    Raises ValueError if overlap >= chunk_size — previously that either raised
    an obscure range() error or silently returned [] (negative step).
    """
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    words = text.split()
    chunks = []
    step = chunk_size - overlap  # hoisted loop-invariant stride
    for i in range(0, len(words), step):
        chunk = " ".join(words[i:i + chunk_size])
        if chunk.strip():
            chunks.append(chunk)
    return chunks
# ============================================
# ADVANCED RAG: Query Expansion + Hybrid Search
# ============================================
# Domain abbreviation -> expansion text appended to the query so vector search
# also matches documents that spell the term out.
ABBREVIATIONS = {
    "aids": "AI&DS Artificial Intelligence and Data Science",
    "ai&ds": "AI&DS Artificial Intelligence and Data Science",
    "aid": "AI&DS Artificial Intelligence and Data Science",
    "aiml": "AI&ML Artificial Intelligence and Machine Learning",
    "ai&ml": "AI&ML Artificial Intelligence and Machine Learning",
    "cse": "Computer Science Engineering CSE",
    "ece": "Electronics Communication Engineering ECE",
    "eee": "Electrical Electronics Engineering EEE",
    "mech": "Mechanical Engineering",
    "it": "Information Technology IT",
    "hod": "Head of Department HOD",
    "mam": "madam professor female faculty",
    "sir": "male professor faculty",
    "staffs": "staff faculty members",
    "sase": "Sasikumar Sasidevi",
    "krce": "K. Ramakrishnan College of Engineering",
}
def expand_query(query):
    """Lowercase *query* and append the expansion of every abbreviation that
    appears as a whole word, improving recall of the vector search.

    The query is tokenized once up front (instead of re-splitting the growing
    string per abbreviation), which is faster and guarantees that appended
    expansion text can never re-trigger further expansions.
    """
    expanded = query.lower()
    tokens = set(expanded.split())  # O(1) whole-word membership tests
    for abbr, full in ABBREVIATIONS.items():
        if abbr in tokens:
            expanded = expanded + " " + full
    return expanded
def search_krce(query, threshold=0.25):
    """Advanced RAG: Query Expansion + Vector Search + Cross-Encoder Re-ranking.

    Returns the concatenated "output" fields of the best-matching knowledge
    base facts, or "" when RAG is unavailable or nothing clears the thresholds.

    Bug fix: the original referenced `final_ranking` even when no cross-encoder
    was loaded, raising a NameError that the broad except swallowed — silently
    disabling RAG.  The vector-score fallback now runs whenever re-ranking is
    unavailable or rejects every candidate.
    """
    if not rag_model or doc_embeddings is None:
        return ""
    try:
        expanded_query = expand_query(query)
        print(f"\nπŸ“Š RAG Search")
        print(f" Original: '{query}'")
        print(f" Expanded: '{expanded_query}'")
        print("-" * 50)
        from sklearn.metrics.pairwise import cosine_similarity
        q_emb = rag_model.encode([expanded_query])
        vector_scores = cosine_similarity(q_emb, doc_embeddings).flatten()
        # Top-10 candidates by cosine similarity, best first.
        top_indices = vector_scores.argsort()[-10:][::-1]
        top_candidates = [(idx, vector_scores[idx]) for idx in top_indices]
        print("Vector Search Results:")
        for i, (idx, v) in enumerate(top_candidates[:5]):
            instruction = knowledge_base[idx]['instruction'][:35]
            print(f" #{i+1} V:{v:.3f} | {instruction}...")
        if cross_encoder:
            # Re-rank with the cross-encoder, scoring the ORIGINAL query
            # against instruction+output of each candidate.
            pairs = [[query, f"{knowledge_base[idx]['instruction']} {knowledge_base[idx]['output']}"]
                     for idx, _ in top_candidates]
            ce_scores = cross_encoder.predict(pairs)
            final_ranking = sorted(zip(top_candidates, ce_scores), key=lambda x: x[1], reverse=True)
            print("Cross-Encoder Re-ranking:")
            for i, ((idx, v), ce) in enumerate(final_ranking[:3]):
                instruction = knowledge_base[idx]['instruction'][:35]
                print(f" #{i+1} CE:{ce:.3f} | {instruction}...")
            print("-" * 50)
            final_context = []
            print(f"βœ… RAG Retrieval (Top 5):")
            for i, ((idx, v), ce) in enumerate(final_ranking[:5]):
                if ce > -6.0:  # ms-marco CE scores are logits; -6 is a loose cutoff
                    content = knowledge_base[idx]['output']
                    final_context.append(content)
                    print(f" Took #{i+1}: {knowledge_base[idx]['instruction'][:30]}...")
            if final_context:
                return "\n\n".join(final_context)
        # Fallback: plain vector-similarity threshold.  Reached when no
        # cross-encoder is loaded OR it rejected every candidate.
        final_context = [knowledge_base[idx]['output']
                         for idx, score in top_candidates[:5] if score > threshold]
        if final_context:
            return "\n\n".join(final_context)
        print("❌ No confident match found")
        return ""
    except Exception as e:
        print(f"RAG Error: {e}")
        return ""
def search_file_context(query: str, session_id: str) -> str:
    """Search the session's uploaded document for context relevant to *query*.

    General questions about the document ("summarize", "overview", ...) get
    the leading chunks verbatim; specific questions get the top-3 chunks by
    cosine similarity.  Output is capped at MAX_CONTEXT_CHARS to protect the
    model's 4k-token context window.  Returns "" when nothing is usable.

    Bug fix: the retrieved filename is now actually interpolated into the
    "[Content from ...]" headers (it was fetched but never used).
    """
    if session_id not in session_file_data or not rag_model:
        print(f"⚠️ File context not found: session_id={session_id}, in_session={session_id in session_file_data}")
        return ""
    # Maximum characters to return (prevent context overflow)
    MAX_CONTEXT_CHARS = 2000
    try:
        file_data = session_file_data[session_id]
        chunks = file_data['chunks']
        embeddings = file_data['embeddings']
        filename = file_data.get('filename', 'uploaded file')
        # Detect general queries about the file (summarize, what's in the file, etc.)
        general_triggers = ['summarize', 'summary', 'what is in', "what's in", 'tell me about',
                            'describe', 'overview', 'main points', 'key points', 'the file',
                            'the document', 'uploaded', 'attached', 'read the']
        is_general_query = any(t in query.lower() for t in general_triggers)
        if is_general_query:
            # For general queries, return leading content up to the limit.
            print(f"πŸ“„ General file query detected - returning limited content")
            all_content = ""
            for chunk in chunks:
                if len(all_content) + len(chunk) > MAX_CONTEXT_CHARS:
                    break
                all_content += chunk + "\n\n"
            if len(all_content) < sum(len(c) for c in chunks):
                all_content += f"\n[...content truncated, showing {len(all_content)} of {sum(len(c) for c in chunks)} chars]"
            return f"[Content from {filename}]:\n\n{all_content.strip()}"
        # For specific queries, use semantic search over the chunk embeddings.
        from sklearn.metrics.pairwise import cosine_similarity
        q_emb = rag_model.encode([query])
        scores = cosine_similarity(q_emb, embeddings).flatten()
        # Get top 3 most relevant chunks, best first.
        top_indices = scores.argsort()[-3:][::-1]
        context_parts = []
        total_chars = 0
        for idx in top_indices:
            if scores[idx] > 0.15 and total_chars < MAX_CONTEXT_CHARS:
                chunk = chunks[idx]
                if total_chars + len(chunk) > MAX_CONTEXT_CHARS:
                    # Truncate to fit the remaining budget.
                    remaining = MAX_CONTEXT_CHARS - total_chars
                    chunk = chunk[:remaining] + "..."
                context_parts.append(chunk)
                total_chars += len(chunk)
        if context_parts:
            print(f"πŸ“„ File context found ({len(context_parts)} chunks, {total_chars} chars)")
            return f"[Content from {filename}]:\n\n" + "\n\n".join(context_parts)
        # Fallback: if no good matches, return first chunk truncated.
        print(f"πŸ“„ Low confidence match - returning truncated first chunk")
        first_chunk = chunks[0][:MAX_CONTEXT_CHARS] if chunks else ""
        return f"[Content from {filename}]:\n\n{first_chunk}"
    except Exception as e:
        print(f"File search error: {e}")
        return ""
def search_web(query):
    """Return up to three DuckDuckGo results as markdown, or "" on any failure.

    Best-effort by design: web search is optional, so rate limits and network
    errors degrade to an empty string instead of propagating to the endpoint.
    """
    if not ddgs:
        return ""
    try:
        results = ddgs.text(query, max_results=3)
        if not results:
            return ""
        return "\n\n".join([f"**{r['title']}**\n{r['body']}" for r in results])
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt / SystemExit
        # still propagate during shutdown.
        return ""
# --- FastAPI ---
app = FastAPI()
# Wide-open CORS so the static index_local.html (file:// or any origin) can call us.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
class ChatRequest(BaseModel):
    """Request body for POST /chat."""
    message: str               # current user message
    max_tokens: int = 512      # generation cap passed to llama.cpp
    temperature: float = 0.7   # sampling temperature
    summary: str = ""          # compressed summary of older conversation turns
    history: list = []         # recent {"role","content"} turns (pydantic copies the default)
    session_id: str = ""       # For file upload sessions
class SummarizeRequest(BaseModel):
    """Request body for POST /summarize."""
    messages: list  # list of {"role", "content"} dicts to be condensed
@app.get("/")
async def root():
    """Health/status probe reporting which optional features are active."""
    return {"name": "Krish Mind", "status": "online", "rag": rag_model is not None, "web": ddgs is not None}
@app.post("/upload")
async def upload_file(file: UploadFile = File(...), session_id: str = "default"):
    """Upload a file for RAG analysis.

    Spools the upload to a temp file, extracts its text, chunks and embeds it,
    and stores the result in ``session_file_data`` keyed by *session_id*
    (replacing any previous upload for that session).  Always returns a JSON
    dict with a "success" flag rather than raising.

    Fix: the temp file is now removed in a ``finally`` block, so it no longer
    leaks when text extraction raises.
    """
    if not rag_model:
        return {"success": False, "error": "Embedding model not available"}
    try:
        # Save uploaded file temporarily, preserving the extension so the
        # extractor can pick the right parser.
        suffix = '.' + file.filename.split('.')[-1] if '.' in file.filename else '.txt'
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            content = await file.read()
            tmp.write(content)
            tmp_path = tmp.name
        try:
            # Extract text
            print(f"πŸ“€ Processing uploaded file: {file.filename}")
            text = extract_text_from_file(tmp_path, file.filename)
        finally:
            # Clean up temp file even if extraction raised.
            os.unlink(tmp_path)
        if text.startswith("Error"):
            return {"success": False, "error": text}
        if not text.strip():
            return {"success": False, "error": "Could not extract text from file"}
        # Chunk text
        chunks = chunk_text(text)
        if not chunks:
            return {"success": False, "error": "File too small or empty"}
        # Create embeddings
        print(f"πŸ”„ Creating embeddings for {len(chunks)} chunks...")
        embeddings = rag_model.encode(chunks)
        # Store in session (overwrites any previous upload for this session).
        session_file_data[session_id] = {
            "filename": file.filename,
            "chunks": chunks,
            "embeddings": embeddings,
            "full_text": text[:2000]  # First 2000 chars for context
        }
        print(f"βœ… File processed: {len(chunks)} chunks, {len(text)} chars")
        return {
            "success": True,
            "filename": file.filename,
            "chunks": len(chunks),
            "chars": len(text),
            "preview": text[:200] + "..." if len(text) > 200 else text
        }
    except Exception as e:
        print(f"❌ Upload error: {e}")
        return {"success": False, "error": str(e)}
@app.delete("/upload/{session_id}")
async def clear_file(session_id: str):
    """Drop any uploaded-file data held for *session_id*."""
    # pop() both tests for and removes the entry in a single lookup.
    if session_file_data.pop(session_id, None) is not None:
        return {"success": True, "message": "File cleared"}
    return {"success": False, "message": "No file found for session"}
@app.post("/summarize")
async def summarize(request: SummarizeRequest):
    """Summarize older messages to compress context.

    Flattens the messages into a "Role: content" transcript, asks the local
    model for a 2-3 sentence summary at low temperature, and returns it.
    On failure the summary is "" so the client can keep the full history.
    """
    try:
        messages_text = ""
        for msg in request.messages:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            messages_text += f"{role.capitalize()}: {content}\n"
        # Llama-3-style chat template; generation stops at <|eot_id|>.
        summary_prompt = f"""<|start_header_id|>system<|end_header_id|>
You are a conversation summarizer. Condense the following conversation into a brief summary (2-3 sentences max) that captures the key topics and context. Focus on what was discussed, not exact words.<|eot_id|><|start_header_id|>user<|end_header_id|>
Summarize this conversation:
{messages_text}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
Summary: """
        output = model(summary_prompt, max_tokens=150, temperature=0.3, stop=["<|eot_id|>"], echo=False)
        summary = output["choices"][0]["text"].strip()
        print(f"πŸ“ Summarized {len(request.messages)} messages: {summary[:50]}...")
        return {"summary": summary}
    except Exception as e:
        print(f"❌ Summarization error: {e}")
        return {"summary": "", "error": str(e)}
@app.post("/chat")
async def chat(request: ChatRequest):
    """Main chat endpoint.

    Pipeline: (1) route image requests to pollinations.ai, (2) gather context
    from the college knowledge base, any uploaded document, and optionally the
    web, (3) build a Llama-3-style prompt with recent history, (4) generate.
    """
    user_input = request.message
    session_id = request.session_id or "default"
    # Image generation: trigger phrases delegate to the free pollinations.ai
    # endpoint (no local image model).  NOTE(review): substring matching means
    # words like "drawbacks" also trigger this branch — confirm intended.
    img_triggers = ["generate image", "create image", "draw", "imagine"]
    if any(t in user_input.lower() for t in img_triggers):
        prompt = user_input
        for t in img_triggers:
            prompt = prompt.lower().replace(t, "")
        prompt = prompt.strip()
        if prompt:
            url = f"https://image.pollinations.ai/prompt/{urllib.parse.quote(prompt)}"
            return {"response": f"Here's your image of **{prompt}**:\n\n![{prompt}]({url})"}
    # RAG Search (college knowledge base)
    rag_context = ""
    if rag_model:
        rag_context = search_krce(user_input)
    if rag_context:
        print(f"🧠 RAG Context found: {rag_context[:50]}...")
    # File context (uploaded document for this session, if any)
    file_context = ""
    if session_id in session_file_data:
        file_context = search_file_context(user_input, session_id)
    if file_context:
        print(f"πŸ“„ File Context found: {file_context[:50]}...")
    # Web search: only for query-like messages longer than two words.
    web_context = ""
    search_triggers = ["search", "find", "latest", "news", "who is", "what is", "when", "where", "how"]
    if ddgs and any(t in user_input.lower() for t in search_triggers):
        if len(user_input.split()) > 2:
            print(f"πŸ”Ž Searching web...")
            web_context = search_web(user_input)
    # Build prompt
    now = datetime.now().strftime("%A, %B %d, %Y %I:%M %p")
    sys_prompt = f"""You are Krish Mind, a helpful AI assistant created by Krish CS. Current time: {now}
IMPORTANT STRICT RULES:
1. IDENTITY: You were created by Krish CS. Do NOT claim to be created by anyone mentioned in the context (like faculty, HODs, or staff). If the context mentions a name, that person is a subject of the data, NOT your creator.
2. CONTEXT USAGE: Use the provided context to answer questions. If the context contains a list (e.g., faculty names), make sure to include ALL items found in the context chunks.
3. FORMATTING: Use Markdown. For letters, use **bold** for headers (e.g., **Subject:**) and use DOUBLE LINE BREAKS between sections (Place, Date, From, To, Subject, Body) to create clear distinct paragraphs.
4. AMBIGUITY: 'AID' or 'AIDS' in this context ALWAYS refers to 'Artificial Intelligence and Data Science', NEVER 'Aerospace' or 'Disease'.
5. ACCURACY: If the context contains a name like 'Mrs. C. Rani', she is a faculty member. Do NOT say "I was created by Mrs. C. Rani".
"""
    # Append whichever context sources were found, most specific first.
    if file_context:
        sys_prompt += f"\n\nUploaded Document Context:\n{file_context}"
    if rag_context:
        sys_prompt += f"\n\nKnowledge Base Context:\n{rag_context}"
    if web_context:
        sys_prompt += f"\n\nWeb Results:\n{web_context}"
    if request.summary:
        sys_prompt += f"\n\nPrevious conversation summary:\n{request.summary}"
    # Build history context (last 6 turns, Llama-3 chat template markers).
    history_context = ""
    if request.history:
        for msg in request.history[-6:]:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            if role == "user":
                history_context += f"<|start_header_id|>user<|end_header_id|>\n\n{content}<|eot_id|>"
            else:
                history_context += f"<|start_header_id|>assistant<|end_header_id|>\n\n{content}<|eot_id|>"
    # Build full prompt
    if history_context:
        full_prompt = f"""<|start_header_id|>system<|end_header_id|>
{sys_prompt}<|eot_id|>{history_context}<|start_header_id|>user<|end_header_id|>
{user_input}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
"""
    else:
        full_prompt = f"""<|start_header_id|>system<|end_header_id|>
{sys_prompt}<|eot_id|><|start_header_id|>user<|end_header_id|>
{user_input}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
"""
    try:
        print(f"πŸ’¬ Generating response...")
        output = model(full_prompt, max_tokens=request.max_tokens, temperature=request.temperature, stop=["<|eot_id|>"], echo=False)
        response = output["choices"][0]["text"].strip()
        print(f"βœ… Done")
        return {"response": response}
    except Exception as e:
        return {"response": f"Error: {e}"}
if __name__ == "__main__":
    print("\n" + "=" * 60)
    print("πŸš€ Server running at: http://127.0.0.1:8000")
    print("πŸ“± Open index_local.html in your browser")
    print("=" * 60 + "\n")
    # Binds all interfaces so other devices on the LAN can reach the server.
    uvicorn.run(app, host="0.0.0.0", port=8000)