# Krish Mind — FastAPI backend for a Hugging Face Space (GGUF model + RAG + optional web search).
import os
import re
import sys
import urllib.parse
from datetime import datetime

import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
# --- Core dependencies ---
# llama-cpp-python is mandatory: the whole service wraps a local GGUF model,
# so bail out early with an install hint when it is missing.
# (Status markers were mojibake in the source dump; replaced with ASCII tags.)
try:
    from llama_cpp import Llama
    print("[OK] llama-cpp-python")
except ImportError:
    print("[ERROR] Run: pip install llama-cpp-python")
    sys.exit(1)
# --- Config ---
# Model settings
REPO_ID = "Krishkanth/krish-mind-gguf-Q4"         # HF Hub repo hosting the quantized model
MODEL_FILENAME = "krish-mind-standalone-Q4.gguf"  # 4-bit GGUF file inside that repo
DATA_FILE = "data/krce_college_data.jsonl"        # RAG knowledge base: one JSON object per line
# --- Load GGUF Model ---
# Downloads the quantized model from the Hub (cached across restarts) and
# loads it with llama.cpp. On any failure `model` stays None and the API
# endpoints report the error instead of crashing at import time.
print(f"\n[...] Downloading/Loading model from {REPO_ID}...")
try:
    from huggingface_hub import hf_hub_download

    # Download model (cached by huggingface_hub, so repeated startups are fast).
    model_path = hf_hub_download(
        repo_id=REPO_ID,
        filename=MODEL_FILENAME,
        local_dir="model",  # download into a local folder instead of the HF cache
        local_dir_use_symlinks=False,
    )
    print(f"[OK] Model downloaded to: {model_path}")
    model = Llama(
        model_path=model_path,
        n_ctx=4096,       # context window, in tokens
        n_gpu_layers=0,   # CPU only for free tier
        verbose=False,
    )
    print("[OK] Model loaded!")
except Exception as e:
    print(f"[ERROR] Model error: {e}")
    model = None
# --- DuckDuckGo Web Search (optional feature) ---
# Best-effort: when the package is missing or fails to initialize, `ddgs`
# stays None and the chat endpoint simply skips web search.
print("\n[...] Loading optional features...")
ddgs = None
try:
    import warnings
    warnings.filterwarnings("ignore")  # duckduckgo_search emits noisy deprecation warnings
    from duckduckgo_search import DDGS
    ddgs = DDGS()
    print("[OK] DuckDuckGo web search")
except Exception as e:
    print(f"[WARN] Web search disabled: {e}")
# --- RAG SETUP ---
# Builds an in-memory embedding index over the JSONL knowledge base.
# Any failure (missing deps, bad file) disables RAG instead of aborting startup.
print("Indexing Knowledge Base...")
knowledge_base = []    # list of {"instruction": ..., "output": ...} records
doc_embeddings = None  # embedding matrix aligned row-for-row with knowledge_base
rag_model = None       # sentence-transformers encoder, or None when RAG is disabled
if os.path.exists(DATA_FILE):
    try:
        from sentence_transformers import SentenceTransformer
        import numpy as np
        import json

        rag_model = SentenceTransformer('all-MiniLM-L6-v2')
        print("[OK] Embedding model loaded")
        with open(DATA_FILE, 'r') as f:
            for line in f:
                if line.strip():
                    try:
                        knowledge_base.append(json.loads(line))
                    except json.JSONDecodeError:
                        # Skip malformed lines rather than abort the whole index
                        # (was a bare `except:` that also swallowed KeyboardInterrupt).
                        pass
        if knowledge_base:
            # Embed "instruction + output" so queries can match either the
            # question phrasing or the answer text.
            docs = [f"{k['instruction']} {k['output']}" for k in knowledge_base]
            doc_embeddings = rag_model.encode(docs)
            print(f"[OK] Indexed {len(knowledge_base)} facts.")
    except Exception as e:
        print(f"[ERROR] RAG disabled: {e}")
        rag_model = None
else:
    print("[WARN] Data file not found! RAG disabled.")
# Helper functions (RAG Search, Web Search)
# Maps common abbreviations to expanded phrases so short queries still hit
# the right knowledge-base entries during embedding search.
ABBREVIATIONS = {
    "aids": "AI&DS Artificial Intelligence and Data Science",
    "ai&ds": "AI&DS Artificial Intelligence and Data Science",
    "cse": "Computer Science Engineering CSE",
    "krce": "K. Ramakrishnan College of Engineering",
}

def expand_query(query):
    """Append the full expansion of every known abbreviation that appears as
    a whole word in *query* (case-insensitive).

    Returns the lower-cased query with expansions appended, e.g.
    "What is CSE" -> "what is cse Computer Science Engineering CSE".
    """
    expanded = query.lower()
    # Tokenize the original query exactly once. The original re-split the
    # growing string on every iteration, so words introduced by one expansion
    # could falsely trigger another abbreviation.
    tokens = set(expanded.split())
    for abbr, full in ABBREVIATIONS.items():
        if abbr in tokens:
            expanded += " " + full
    return expanded
def search_krce(query):
    """Retrieve up to five relevant knowledge-base answers for *query*.

    Returns the matching 'output' texts joined by blank lines, or "" when
    RAG is unavailable, nothing clears the similarity threshold, or an
    error occurs.
    """
    if not rag_model or doc_embeddings is None:
        return ""
    try:
        from sklearn.metrics.pairwise import cosine_similarity

        query_vec = rag_model.encode([expand_query(query)])
        scores = cosine_similarity(query_vec, doc_embeddings).flatten()
        # Rank the ten best candidates, highest similarity first.
        ranked = scores.argsort()[-10:][::-1]
        # Keep only the top five that clear the 0.2 relevance floor
        # (small context keeps generation fast on the free tier).
        hits = [
            knowledge_base[idx]['output']
            for idx in ranked[:5]
            if scores[idx] > 0.2
        ]
        return "\n\n".join(hits) if hits else ""
    except Exception as e:
        print(f"RAG Error: {e}")
        return ""
def search_web(query):
    """Fetch up to three DuckDuckGo text results as Markdown snippets.

    Returns "" when search is disabled, nothing is found, or the lookup
    fails (network hiccups degrade gracefully to "no web context").
    """
    if not ddgs:
        return ""
    try:
        results = ddgs.text(query, max_results=3)
        if not results:
            return ""
        return "\n\n".join(f"**{r['title']}**\n{r['body']}" for r in results)
    except Exception:
        # Was a bare `except:`, which also swallowed SystemExit/KeyboardInterrupt.
        return ""
# --- FastAPI ---
app = FastAPI()
# Wide-open CORS so the static frontend can call the API from any origin.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Serve Static Files (frontend assets) from ./static under /static.
app.mount("/static", StaticFiles(directory="static"), name="static")
class ChatRequest(BaseModel):
    """Payload for the chat endpoint."""
    message: str
    max_tokens: int = 512     # generation budget passed through to llama.cpp
    temperature: float = 0.7  # sampling temperature passed through to llama.cpp
    summary: str = ""  # Optional conversation summary
    history: list = []  # Optional recent message history [{role, content}]
class SummarizeRequest(BaseModel):
    """Payload for the summarization endpoint."""
    messages: list  # Messages to summarize [{role, content}]
# NOTE(review): the route decorator was absent in the mangled source dump;
# restored from the "serve index.html at root" comment — confirm the path.
@app.get("/")
async def root():
    """Serve the frontend entry page (static/index.html) at the site root."""
    return FileResponse('static/index.html')
# NOTE(review): the route decorator was absent in the mangled source dump;
# restored as /logo.png per "frontend expects it here" — confirm the path.
@app.get("/logo.png")
async def logo():
    """Serve logo.png at the root path, where the frontend expects it."""
    return FileResponse('static/logo.png')
# NOTE(review): the @app.post decorator was absent in the mangled source dump;
# path restored as /summarize — confirm against the frontend.
@app.post("/summarize")
async def summarize(request: SummarizeRequest):
    """Summarize older messages to compress context.

    Returns {"summary": str}; on failure the summary is "" and an "error"
    key explains why, so callers can fall back to sending raw history.
    """
    if not model:
        return {"summary": "", "error": "Model not loaded"}
    try:
        # Flatten the message list into "Role: content" lines for the prompt.
        messages_text = ""
        for msg in request.messages:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            messages_text += f"{role.capitalize()}: {content}\n"
        summary_prompt = f"""<|start_header_id|>system<|end_header_id|>
You are a conversation summarizer. Condense the following conversation into a brief summary (2-3 sentences max) that captures the key topics and context. Focus on what was discussed, not exact words.<|eot_id|><|start_header_id|>user<|end_header_id|>
Summarize this conversation:
{messages_text}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
Summary: """
        # Low temperature + small token budget: summaries should be short and stable.
        output = model(summary_prompt, max_tokens=150, temperature=0.3, stop=["<|eot_id|>"], echo=False)
        summary = output["choices"][0]["text"].strip()
        print(f"[OK] Summarized {len(request.messages)} messages: {summary[:50]}...")
        return {"summary": summary}
    except Exception as e:
        print(f"[ERROR] Summarization error: {e}")
        return {"summary": "", "error": str(e)}
# NOTE(review): the @app.post decorator was absent in the mangled source dump;
# path restored as /chat — confirm against the frontend.
@app.post("/chat")
async def chat(request: ChatRequest):
    """Main chat endpoint: image hook, RAG + web context, then LLM generation.

    Returns {"response": str}; errors are reported inside the response text
    rather than as HTTP errors (the frontend renders them inline).
    """
    if not model:
        return {"response": "Error: Model not loaded. Please check server logs."}
    user_input = request.message

    # Image Generation Hook: delegate to the pollinations.ai image API.
    image_triggers = ["generate image", "create image", "draw", "imagine"]
    if any(t in user_input.lower() for t in image_triggers):
        # Strip every trigger phrase, case-insensitively. The original only
        # removed the literal "generate image", so "draw a cat" kept "draw"
        # in the image prompt.
        trigger_re = "|".join(re.escape(t) for t in image_triggers)
        prompt = re.sub(trigger_re, "", user_input, flags=re.IGNORECASE).strip()
        url = f"https://image.pollinations.ai/prompt/{urllib.parse.quote(prompt)}"
        # BUG FIX: the computed URL was never embedded in the reply, so the
        # user got text with no image.
        return {"response": f"Here's your image of **{prompt}**:\n\n![{prompt}]({url})"}

    # RAG & Web Search context gathering.
    rag_context = search_krce(user_input)
    web_context = ""
    if ddgs and any(t in user_input.lower() for t in ["who is", "what is", "search"]):
        web_context = search_web(user_input)

    # Prompt Construction (Llama-3 chat template).
    now = datetime.now().strftime("%A, %B %d, %Y")
    sys_prompt = f"""You are Krish Mind, created by Krish CS. Current time: {now}
RULES:
1. IDENTITY: Created by Krish CS. Do NOT claim ANY other creator from context.
2. CONTEXT: Use context to answer. If list found, include ALL items.
3. FORMATTING: Use Markdown. For letters, use DOUBLE LINE BREAKS between sections.
"""
    if rag_context:
        sys_prompt += f"\n\nContext:\n{rag_context}"
    if web_context:
        sys_prompt += f"\n\nWeb Results:\n{web_context}"
    # Compressed older context produced by the /summarize endpoint.
    if request.summary:
        sys_prompt += f"\n\nPrevious conversation summary:\n{request.summary}"

    # Recent turns (last 6) rendered in the same header format as the rest
    # of the prompt; empty history yields an empty string.
    history_context = ""
    for msg in request.history[-6:]:
        role = msg.get("role", "user")
        content = msg.get("content", "")
        header = "user" if role == "user" else "assistant"
        history_context += f"<|start_header_id|>{header}<|end_header_id|>\n\n{content}<|eot_id|>"

    # Single assembly path (the original had a second branch using single
    # newlines after the headers, inconsistent with the \n\n form used for
    # history turns and the no-history branch).
    full_prompt = (
        f"<|start_header_id|>system<|end_header_id|>\n\n{sys_prompt}<|eot_id|>"
        f"{history_context}"
        f"<|start_header_id|>user<|end_header_id|>\n\n{user_input}<|eot_id|>"
        f"<|start_header_id|>assistant<|end_header_id|>\n\n"
    )
    try:
        output = model(full_prompt, max_tokens=request.max_tokens, temperature=request.temperature, stop=["<|eot_id|>"], echo=False)
        return {"response": output["choices"][0]["text"].strip()}
    except Exception as e:
        return {"response": f"Error: {e}"}
if __name__ == "__main__":
    # Bind on all interfaces; 7860 is presumably the port the hosting Space
    # expects (it is the Hugging Face Spaces convention) — confirm in the Space config.
    uvicorn.run(app, host="0.0.0.0", port=7860)