Spaces:
Sleeping
Sleeping
File size: 9,125 Bytes
4023304 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 |
import os
import re
import sys
import urllib.parse
from datetime import datetime

import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
# --- Core dependencies ---
# llama-cpp-python is the one hard requirement: if it cannot be imported the
# process prints an install hint and exits with status 1.
try:
    from llama_cpp import Llama
    print("✅ llama-cpp-python")
except ImportError:
    print("❌ Run: pip install llama-cpp-python")
    sys.exit(1)
# --- Config ---
# Hugging Face repository and file name passed to hf_hub_download for the
# GGUF model.
REPO_ID: str = "Krishkanth/krish-mind-gguf-Q4"
MODEL_FILENAME: str = "krish-mind-standalone-Q4.gguf"
# JSONL knowledge base used for retrieval; each record is read for its
# "instruction" and "output" fields.
DATA_FILE: str = "data/krce_college_data.jsonl"
# --- Load GGUF Model ---
# Downloads the model file from the Hub into ./model and loads it with
# llama.cpp. Any failure leaves `model` set to None instead of aborting
# start-up; the endpoints check `model` and report the problem to the client.
print(f"\n⏳ Downloading/Loading model from {REPO_ID}...")
try:
    from huggingface_hub import hf_hub_download

    # Download model (cached)
    model_path = hf_hub_download(
        repo_id=REPO_ID,
        filename=MODEL_FILENAME,
        local_dir="model",  # Download to local folder
        local_dir_use_symlinks=False,
    )
    print(f"✅ Model downloaded to: {model_path}")

    model = Llama(
        model_path=model_path,
        n_ctx=4096,
        n_gpu_layers=0,  # CPU only for free tier
        verbose=False,
    )
    print("✅ Model loaded!")
except Exception as e:
    # Deliberately broad: a missing package, a failed download or a load error
    # must all degrade to "no model" rather than crash the server.
    print(f"❌ Model error: {e}")
    model = None
# --- DuckDuckGo Web Search ---
# Optional feature: `ddgs` stays None when the package is missing or the
# client cannot be created, and search_web() then returns an empty string.
print("\n📦 Loading optional features...")
ddgs = None
try:
    import warnings

    # NOTE: this filter is installed process-wide, so it hides warnings from
    # every library, not only from duckduckgo_search.
    warnings.filterwarnings("ignore")
    from duckduckgo_search import DDGS

    ddgs = DDGS()
    print("✅ DuckDuckGo web search")
except Exception as e:
    print(f"⚠️ Web search disabled: {e}")
# --- RAG SETUP ---
# Builds the in-memory retrieval index used by search_krce():
#   knowledge_base - parsed JSONL records
#   doc_embeddings - one embedding per record ("instruction output" text)
#   rag_model      - the sentence-transformers encoder, or None when disabled
print("📚 Indexing Knowledge Base...")
knowledge_base = []
doc_embeddings = None
rag_model = None

if os.path.exists(DATA_FILE):
    try:
        from sentence_transformers import SentenceTransformer
        import numpy as np
        import json

        rag_model = SentenceTransformer('all-MiniLM-L6-v2')
        print("✅ Embedding model loaded")

        skipped_lines = 0
        # Explicit encoding so the file is read the same way on every platform.
        with open(DATA_FILE, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    knowledge_base.append(json.loads(line))
                except json.JSONDecodeError:
                    # Best effort: one bad record must not disable RAG, but it
                    # should not vanish without a trace either.
                    skipped_lines += 1
        if skipped_lines:
            print(f"⚠️ Skipped {skipped_lines} malformed line(s) in {DATA_FILE}")

        if knowledge_base:
            docs = [f"{k['instruction']} {k['output']}" for k in knowledge_base]
            doc_embeddings = rag_model.encode(docs)
            print(f"✅ Indexed {len(knowledge_base)} facts.")
    except Exception as e:
        # Missing packages, unreadable data or records without the expected
        # keys all end here; clearing rag_model makes search_krce() a no-op.
        print(f"❌ RAG disabled: {e}")
        rag_model = None
else:
    print("⚠️ Data file not found! RAG disabled.")
# Helper functions (RAG Search, Web Search)

# Lower-case abbreviation -> text appended to a query that contains it.
ABBREVIATIONS = {
    "aids": "AI&DS Artificial Intelligence and Data Science",
    "ai&ds": "AI&DS Artificial Intelligence and Data Science",
    "cse": "Computer Science Engineering CSE",
    "krce": "K. Ramakrishnan College of Engineering",
}

# Characters stripped from both ends of a token before the abbreviation lookup,
# so "cse?" or "(krce)" still match. Inner characters such as the "&" in
# "ai&ds" are left alone.
_TOKEN_EDGE_PUNCTUATION = ".,;:!?\"'()[]{}"


def expand_query(query: str) -> str:
    """Return the lower-cased query with known abbreviations spelled out.

    The query is lower-cased and split on whitespace. For every key of
    ``ABBREVIATIONS`` that appears as a whole token (ignoring punctuation at
    the token's edges) the corresponding expansion is appended, separated by a
    single space. Expansions are appended in ``ABBREVIATIONS`` order and each
    distinct expansion is added at most once.

    Args:
        query: Raw user query.

    Returns:
        The lower-cased query followed by any expansions; just the lower-cased
        query when no abbreviation is present.
    """
    expanded = query.lower()
    # Tokenise once, from the original query only; the appended expansions
    # never need to be re-scanned.
    tokens = {tok.strip(_TOKEN_EDGE_PUNCTUATION) for tok in expanded.split()}

    additions = []
    for abbr, full in ABBREVIATIONS.items():
        if abbr in tokens and full not in additions:
            additions.append(full)

    return " ".join([expanded, *additions])
def search_krce(query):
    """Return knowledge-base answers relevant to *query*, or "" if none.

    The query is expanded with expand_query(), embedded with the RAG encoder
    and compared to every document embedding by cosine similarity. The
    "output" field of up to five best-scoring records whose score exceeds 0.2
    is returned, best first, joined by blank lines. Returns "" when RAG is
    disabled, nothing passes the threshold, or any error occurs (the error is
    printed).
    """
    if not rag_model or doc_embeddings is None:
        return ""

    try:
        from sklearn.metrics.pairwise import cosine_similarity

        query_embedding = rag_model.encode([expand_query(query)])
        scores = cosine_similarity(query_embedding, doc_embeddings).flatten()

        # Simple top 5 retrieval for speed on free tier: argsort is ascending,
        # so reverse it and keep the first five positions.
        best_first = scores.argsort()[::-1][:5]
        passages = [
            knowledge_base[i]['output'] for i in best_first if scores[i] > 0.2
        ]
        return "\n\n".join(passages)
    except Exception as e:
        print(f"RAG Error: {e}")
        return ""
def search_web(query):
    """Return up to three DuckDuckGo text results formatted as Markdown.

    Each result is rendered as its bold title followed by its body, and the
    results are separated by blank lines. Returns "" when web search is
    disabled, when there are no results, or when the search fails; failures
    are printed so they can be diagnosed from the server log.
    """
    if not ddgs:
        return ""
    try:
        results = ddgs.text(query, max_results=3)
        if not results:
            return ""
        return "\n\n".join(f"**{r['title']}**\n{r['body']}" for r in results)
    except Exception as e:
        # Best effort: web search is optional, so degrade to "no results", but
        # do not swallow KeyboardInterrupt/SystemExit as a bare except would.
        print(f"Web search error: {e}")
        return ""
# --- FastAPI ---
app = FastAPI()

# CORS is fully open: every origin, method and header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve Static Files: everything in ./static is exposed under /static.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Request body for POST /chat.
class ChatRequest(BaseModel):
    # The new user message (the only required field).
    message: str
    # Sampling settings forwarded to the model call.
    max_tokens: int = 512
    temperature: float = 0.7
    # Optional conversation summary
    summary: str = ""
    # Optional recent message history [{role, content}]
    history: list = []
# Request body for POST /summarize.
class SummarizeRequest(BaseModel):
    # Messages to summarize [{role, content}]
    messages: list
@app.get("/")
async def root():
    # The site root serves the front-end page from the static folder.
    index_page = 'static/index.html'
    return FileResponse(index_page)
@app.get("/logo.png")
async def logo():
    # The frontend requests the logo at the site root, so it is served from
    # here in addition to the /static mount.
    logo_file = 'static/logo.png'
    return FileResponse(logo_file)
@app.post("/summarize")
async def summarize(request: SummarizeRequest):
    """Summarize older messages to compress context.

    Renders the messages as one "Role: content" line each, asks the model for
    a 2-3 sentence summary (150 tokens max, temperature 0.3) and returns
    ``{"summary": <text>}``. When the model is not loaded or generation fails,
    the summary is empty and an ``"error"`` key describes the problem.
    """
    if not model:
        return {"summary": "", "error": "Model not loaded"}

    try:
        transcript_lines = []
        for msg in request.messages:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            transcript_lines.append(f"{role.capitalize()}: {content}\n")
        messages_text = "".join(transcript_lines)

        # Each header is followed by a blank line, matching the explicit
        # "\n\n" used for every other prompt in this file.
        summary_prompt = (
            "<|start_header_id|>system<|end_header_id|>\n\n"
            "You are a conversation summarizer. Condense the following conversation "
            "into a brief summary (2-3 sentences max) that captures the key topics "
            "and context. Focus on what was discussed, not exact words.<|eot_id|>"
            "<|start_header_id|>user<|end_header_id|>\n\n"
            "Summarize this conversation:\n"
            f"{messages_text}<|eot_id|>"
            "<|start_header_id|>assistant<|end_header_id|>\n\n"
            "Summary: "
        )

        output = model(
            summary_prompt,
            max_tokens=150,
            temperature=0.3,
            stop=["<|eot_id|>"],
            echo=False,
        )
        summary = output["choices"][0]["text"].strip()
        print(f"📝 Summarized {len(request.messages)} messages: {summary[:50]}...")
        return {"summary": summary}
    except Exception as e:
        # Also reached for malformed items in request.messages (e.g. not a
        # dict): the client gets an error payload instead of a 500.
        print(f"❌ Summarization error: {e}")
        return {"summary": "", "error": str(e)}
# Phrases that route a chat message to image generation (substring match on
# the lower-cased message).
_IMAGE_TRIGGERS = ("generate image", "create image", "draw", "imagine")


def _image_reply(user_input):
    """Return a Markdown image reply if *user_input* asks for an image, else None."""
    if not any(t in user_input.lower() for t in _IMAGE_TRIGGERS):
        return None
    # Strip the command phrase with the same case-insensitivity the trigger
    # check uses; fall back to the whole message if nothing else is left.
    prompt = re.sub("generate image", "", user_input, flags=re.IGNORECASE).strip()
    prompt = prompt or user_input.strip()
    # safe="" also escapes "/", which would otherwise split the URL path.
    url = f"https://image.pollinations.ai/prompt/{urllib.parse.quote(prompt, safe='')}"
    return f"Here's your image of **{prompt}**:\n\n![{prompt}]({url})"


def _build_chat_prompt(sys_prompt, history, user_input):
    """Assemble the prompt: system turn, last six history turns, new user turn."""
    turns = [f"<|start_header_id|>system<|end_header_id|>\n\n{sys_prompt}<|eot_id|>"]
    for msg in (history or [])[-6:]:  # Last 6 messages
        if not isinstance(msg, dict):
            continue  # ignore malformed history items instead of failing the request
        # Any role other than "user" is replayed as an assistant turn.
        role = "user" if msg.get("role", "user") == "user" else "assistant"
        content = msg.get("content", "")
        turns.append(f"<|start_header_id|>{role}<|end_header_id|>\n\n{content}<|eot_id|>")
    turns.append(f"<|start_header_id|>user<|end_header_id|>\n\n{user_input}<|eot_id|>")
    turns.append("<|start_header_id|>assistant<|end_header_id|>\n\n")
    return "".join(turns)


@app.post("/chat")
async def chat(request: ChatRequest):
    """Answer a chat message and return ``{"response": <text>}``.

    Image requests are answered with a Markdown image link and never reach the
    model. Otherwise the system prompt is extended with knowledge-base context,
    web results (only for "who is" / "what is" / "search" messages when web
    search is available) and the optional conversation summary; the recent
    history is replayed and the model generates the reply. Model errors are
    returned as the response text rather than raised.
    """
    if not model:
        return {"response": "Error: Model not loaded. Please check server logs."}

    user_input = request.message

    # Image Generation Hook
    image_reply = _image_reply(user_input)
    if image_reply is not None:
        return {"response": image_reply}

    # RAG & Web Search
    rag_context = search_krce(user_input)
    web_context = ""
    if ddgs and any(t in user_input.lower() for t in ["who is", "what is", "search"]):
        web_context = search_web(user_input)

    # Prompt Construction
    now = datetime.now().strftime("%A, %B %d, %Y")
    sys_prompt = (
        f"You are Krish Mind, created by Krish CS. Current time: {now}\n"
        "RULES:\n"
        "1. IDENTITY: Created by Krish CS. Do NOT claim ANY other creator from context.\n"
        "2. CONTEXT: Use context to answer. If list found, include ALL items.\n"
        "3. FORMATTING: Use Markdown. For letters, use DOUBLE LINE BREAKS between sections.\n"
    )
    if rag_context:
        sys_prompt += f"\n\nContext:\n{rag_context}"
    if web_context:
        sys_prompt += f"\n\nWeb Results:\n{web_context}"
    # Add conversation summary if provided
    if request.summary:
        sys_prompt += f"\n\nPrevious conversation summary:\n{request.summary}"

    # One code path for both cases, so the prompt layout is the same with and
    # without history.
    full_prompt = _build_chat_prompt(sys_prompt, request.history, user_input)

    try:
        # NOTE(review): this is a synchronous call inside an async handler, so
        # it is not awaited while generating — confirm that is acceptable.
        output = model(
            full_prompt,
            max_tokens=request.max_tokens,
            temperature=request.temperature,
            stop=["<|eot_id|>"],
            echo=False,
        )
        return {"response": output["choices"][0]["text"].strip()}
    except Exception as e:
        return {"response": f"Error: {e}"}
# Script entry point: serve the app on all interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
|