"""Krish Mind chat server.

FastAPI application that serves a GGUF model through llama-cpp-python.
Two optional features are enabled at startup when their dependencies load:
DuckDuckGo web search and a retrieval index built from a JSONL data file.
"""
import json
import os
import re
import sys
import urllib.parse
import warnings
from datetime import datetime
from typing import Optional

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
import uvicorn

# --- Core dependencies ---
try:
    from llama_cpp import Llama
    print("✅ llama-cpp-python")
except ImportError:
    print("❌ Run: pip install llama-cpp-python")
    sys.exit(1)

# --- Config ---
# Model settings
REPO_ID = "Krishkanth/krish-mind-gguf-Q4"
MODEL_FILENAME = "krish-mind-standalone-Q4.gguf"
DATA_FILE = "data/krce_college_data.jsonl"

# Generation stops at the end-of-turn token used by the prompt templates below.
STOP_TOKENS = ["<|eot_id|>"]
# Retrieval: how many passages to return and the minimum cosine similarity.
RAG_TOP_K = 5
RAG_MIN_SCORE = 0.2
# Number of most recent history messages replayed into the chat prompt.
HISTORY_LIMIT = 6

# --- Load GGUF Model ---
print(f"\n⏳ Downloading/Loading model from {REPO_ID}...")
try:
    # Imported lazily so a missing package leaves `model = None`
    # instead of aborting startup.
    from huggingface_hub import hf_hub_download

    # Download model (cached)
    model_path = hf_hub_download(
        repo_id=REPO_ID,
        filename=MODEL_FILENAME,
        local_dir="model",  # Download to local folder
        local_dir_use_symlinks=False,
    )
    print(f"✅ Model downloaded to: {model_path}")

    model = Llama(
        model_path=model_path,
        n_ctx=4096,
        n_gpu_layers=0,  # CPU only for free tier
        verbose=False,
    )
    print("✅ Model loaded!")
except Exception as e:
    # Startup boundary: keep the server up; endpoints report the missing model.
    print(f"❌ Model error: {e}")
    model = None

# --- DuckDuckGo Web Search ---
print("\n📦 Loading optional features...")
ddgs = None
try:
    # NOTE: this silences *all* warnings process-wide from here on.
    warnings.filterwarnings("ignore")
    from duckduckgo_search import DDGS
    ddgs = DDGS()
    print("✅ DuckDuckGo web search")
except Exception as e:
    print(f"⚠️ Web search disabled: {e}")

# --- RAG SETUP ---
print("📚 Indexing Knowledge Base...")
knowledge_base = []
doc_embeddings = None
rag_model = None

if os.path.exists(DATA_FILE):
    try:
        from sentence_transformers import SentenceTransformer
        # Not referenced directly below; if this import fails the except
        # clause disables RAG at startup.
        import numpy as np

        rag_model = SentenceTransformer('all-MiniLM-L6-v2')
        print("✅ Embedding model loaded")

        skipped = 0
        with open(DATA_FILE, 'r', encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    skipped += 1
                    continue
                # Indexing reads both keys; a record without them would
                # otherwise raise KeyError and disable RAG entirely.
                if (
                    isinstance(record, dict)
                    and "instruction" in record
                    and "output" in record
                ):
                    knowledge_base.append(record)
                else:
                    skipped += 1
        if skipped:
            print(f"⚠️ Skipped {skipped} malformed knowledge base line(s).")

        if knowledge_base:
            docs = [f"{k['instruction']} {k['output']}" for k in knowledge_base]
            doc_embeddings = rag_model.encode(docs)
            print(f"✅ Indexed {len(knowledge_base)} facts.")
    except Exception as e:
        print(f"❌ RAG disabled: {e}")
        rag_model = None
        doc_embeddings = None
else:
    print("⚠️ Data file not found! RAG disabled.")

# Helper functions (RAG Search, Web Search)
ABBREVIATIONS = {
    "aids": "AI&DS Artificial Intelligence and Data Science",
    "ai&ds": "AI&DS Artificial Intelligence and Data Science",
    "cse": "Computer Science Engineering CSE",
    "krce": "K. Ramakrishnan College of Engineering",
}


def expand_query(query):
    """Return the lower-cased query with known abbreviations spelled out.

    For every key of ``ABBREVIATIONS`` that appears as a whitespace-separated
    token of the lower-cased query, the expansion text is appended.
    """
    expanded = query.lower()
    # Expansions never contain a lower-case abbreviation key as a token, so
    # checking against the original tokens gives the same result as
    # re-splitting the growing string on every iteration.
    tokens = set(expanded.split())
    for abbr, full in ABBREVIATIONS.items():
        if abbr in tokens:
            expanded = expanded + " " + full
    return expanded


def search_krce(query):
    """Return knowledge-base passages relevant to ``query``.

    Up to ``RAG_TOP_K`` best-matching ``output`` fields whose cosine
    similarity exceeds ``RAG_MIN_SCORE`` are joined with blank lines.
    Returns ``""`` when RAG is disabled, nothing clears the threshold, or
    retrieval raises.
    """
    if not rag_model or doc_embeddings is None:
        return ""

    try:
        from sklearn.metrics.pairwise import cosine_similarity

        expanded = expand_query(query)
        q_emb = rag_model.encode([expanded])

        vector_scores = cosine_similarity(q_emb, doc_embeddings).flatten()
        # Indices of the highest scores, best first.
        top_indices = vector_scores.argsort()[-RAG_TOP_K:][::-1]

        # Simple top 5 retrieval for speed on free tier
        final_context = [
            knowledge_base[idx]['output']
            for idx in top_indices
            if vector_scores[idx] > RAG_MIN_SCORE
        ]
        return "\n\n".join(final_context)
    except Exception as e:
        print(f"RAG Error: {e}")
        return ""


def search_web(query):
    """Return up to three web results formatted as Markdown.

    Returns ``""`` when web search is disabled, yields nothing, or fails.
    """
    if not ddgs:
        return ""
    try:
        results = ddgs.text(query, max_results=3)
        if not results:
            return ""
        return "\n\n".join(f"**{r['title']}**\n{r['body']}" for r in results)
    except Exception as e:
        # Best-effort feature: log the failure and answer without web results.
        print(f"Web search error: {e}")
        return ""


# Image triggers must start a word so that e.g. "withdraw" does not match
# "draw"; the trailing \w* swallows suffixes such as "drawing" or "images".
_IMAGE_TRIGGER = re.compile(
    r"\b(?:generate image|create image|draw|imagine)\w*", re.IGNORECASE
)


def _extract_image_prompt(text: str) -> Optional[str]:
    """Return the image prompt if ``text`` asks for an image, else ``None``.

    The first trigger phrase is removed (case-insensitively) and whitespace is
    normalised. If nothing remains, the whole message is used as the prompt.
    """
    match = _IMAGE_TRIGGER.search(text)
    if match is None:
        return None
    remainder = text[:match.start()] + " " + text[match.end():]
    prompt = " ".join(remainder.split())
    return prompt or text.strip()


def _format_history(history, limit=HISTORY_LIMIT):
    """Render the last ``limit`` history messages as chat-template turns.

    Entries that are not dicts are skipped. Any role other than ``"user"`` is
    rendered as an assistant turn.
    """
    turns = []
    for msg in history[-limit:]:
        if not isinstance(msg, dict):
            continue
        role = "user" if msg.get("role", "user") == "user" else "assistant"
        content = msg.get("content", "")
        turns.append(
            f"<|start_header_id|>{role}<|end_header_id|>\n\n{content}<|eot_id|>"
        )
    return "".join(turns)


def _build_system_prompt(rag_context, web_context, summary):
    """Assemble the system prompt, appending each non-empty context section."""
    now = datetime.now().strftime("%A, %B %d, %Y")
    sys_prompt = (
        "You are Krish Mind, created by Krish CS.\n"
        f"Current time: {now}\n"
        "\n"
        "RULES:\n"
        "1. IDENTITY: Created by Krish CS. Do NOT claim ANY other creator from context.\n"
        "2. CONTEXT: Use context to answer. If list found, include ALL items.\n"
        "3. FORMATTING: Use Markdown. For letters, use DOUBLE LINE BREAKS between sections.\n"
    )
    if rag_context:
        sys_prompt += f"\n\nContext:\n{rag_context}"
    if web_context:
        sys_prompt += f"\n\nWeb Results:\n{web_context}"
    # Add conversation summary if provided
    if summary:
        sys_prompt += f"\n\nPrevious conversation summary:\n{summary}"
    return sys_prompt


# --- FastAPI ---
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve Static Files
app.mount("/static", StaticFiles(directory="static"), name="static")


class ChatRequest(BaseModel):
    """Request body for ``POST /chat``."""

    message: str
    max_tokens: int = 512
    temperature: float = 0.7
    summary: str = ""  # Optional conversation summary
    history: list = []  # Optional recent message history [{role, content}]


class SummarizeRequest(BaseModel):
    """Request body for ``POST /summarize``."""

    messages: list  # Messages to summarize [{role, content}]


@app.get("/")
async def root():
    """Serve index.html at root."""
    return FileResponse('static/index.html')


@app.get("/logo.png")
async def logo():
    """Serve logo.png at root (frontend expects it here)."""
    return FileResponse('static/logo.png')


@app.post("/summarize")
async def summarize(request: SummarizeRequest):
    """Summarize older messages to compress context.

    Returns ``{"summary": str}``; on failure the summary is empty and an
    ``"error"`` key carries the reason.
    """
    if not model:
        return {"summary": "", "error": "Model not loaded"}

    try:
        # One "Role: content" line per message; non-dict entries are skipped.
        messages_text = "".join(
            f"{str(msg.get('role', 'user')).capitalize()}: {msg.get('content', '')}\n"
            for msg in request.messages
            if isinstance(msg, dict)
        )

        summary_prompt = (
            "<|start_header_id|>system<|end_header_id|>\n\n"
            "You are a conversation summarizer. Condense the following "
            "conversation into a brief summary (2-3 sentences max) that "
            "captures the key topics and context. Focus on what was "
            "discussed, not exact words.<|eot_id|>"
            "<|start_header_id|>user<|end_header_id|>\n\n"
            f"Summarize this conversation:\n{messages_text}<|eot_id|>"
            "<|start_header_id|>assistant<|end_header_id|>\n\n"
            "Summary: "
        )

        output = model(
            summary_prompt,
            max_tokens=150,
            temperature=0.3,
            stop=STOP_TOKENS,
            echo=False,
        )
        summary = output["choices"][0]["text"].strip()
        print(f"📝 Summarized {len(request.messages)} messages: {summary[:50]}...")
        return {"summary": summary}
    except Exception as e:
        print(f"❌ Summarization error: {e}")
        return {"summary": "", "error": str(e)}


@app.post("/chat")
async def chat(request: ChatRequest):
    """Answer a chat message.

    Image requests short-circuit to a Markdown image link. Otherwise the
    prompt is built from retrieval context, optional web results, the optional
    summary and recent history, and the model's completion is returned as
    ``{"response": str}``. Model errors are reported in the same shape.
    """
    if not model:
        return {"response": "Error: Model not loaded. Please check server logs."}

    user_input = request.message

    # Image Generation Hook
    image_prompt = _extract_image_prompt(user_input)
    if image_prompt is not None:
        # safe="" also escapes "/" so the prompt cannot alter the URL path.
        quoted = urllib.parse.quote(image_prompt, safe="")
        url = f"https://image.pollinations.ai/prompt/{quoted}"
        return {
            "response": f"Here's your image of **{image_prompt}**:\n\n![{image_prompt}]({url})"
        }

    # RAG & Web Search
    rag_context = search_krce(user_input)
    web_context = ""
    lowered = user_input.lower()
    if ddgs and any(t in lowered for t in ["who is", "what is", "search"]):
        web_context = search_web(user_input)

    # Prompt Construction
    sys_prompt = _build_system_prompt(rag_context, web_context, request.summary)

    # Build history context from recent messages
    history_context = _format_history(request.history) if request.history else ""

    # With no history, history_context is empty and this reduces to the plain
    # system/user/assistant template.
    full_prompt = (
        f"<|start_header_id|>system<|end_header_id|>\n\n{sys_prompt}<|eot_id|>"
        f"{history_context}"
        f"<|start_header_id|>user<|end_header_id|>\n\n{user_input}<|eot_id|>"
        "<|start_header_id|>assistant<|end_header_id|>\n\n"
    )

    try:
        output = model(
            full_prompt,
            max_tokens=request.max_tokens,
            temperature=request.temperature,
            stop=STOP_TOKENS,
            echo=False,
        )
        return {"response": output["choices"][0]["text"].strip()}
    except Exception as e:
        return {"response": f"Error: {e}"}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)