import os import json import re import requests import threading import gc from datetime import datetime from bs4 import BeautifulSoup from contextlib import asynccontextmanager from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware from huggingface_hub import hf_hub_download from llama_cpp import Llama # ========================================== # 1. CONFIGURATION & CORE SETUP # ========================================== # IMPORTANT: Set REAPERAI_SECRET as a Secret in your Space Settings SECRET_KEY = os.environ.get("REAPERAI_SECRET", "jan30") # Optimized for 16GB RAM: Smaller, heavily quantized model MODEL_REPO = "bartowski/Qwen2.5-1.5B-Instruct-GGUF" MODEL_FILE = "Qwen2.5-1.5B-Instruct-Q4_K_M.gguf" # Exact filename chat_memory = {} MAX_GLOBAL_USERS = 50 memory_lock = threading.Lock() llm = None # Initialize as None model_semaphore = threading.Semaphore(1) # Only 1 inference at a time # ========================================== # 2. MODEL LOADING (OPTIMIZED FOR SPACES) # ========================================== @asynccontextmanager async def lifespan(app: FastAPI): """ Lifespan handler for FastAPI startup/shutdown. Downloads and loads the model. """ print(f"--- [SYSTEM] Initializing ReaperAI on Hugging Face Space ---") global llm try: # Step 1: Download model to cache (will use /tmp from Dockerfile) print(f"--- [SYSTEM] Downloading model: {MODEL_REPO}/{MODEL_FILE} ---") model_path = hf_hub_download( repo_id=MODEL_REPO, filename=MODEL_FILE, cache_dir=os.getenv("HF_HOME", "/tmp") ) # Step 2: Load with optimized settings for 2 vCPU / 16GB RAM print(f"--- [SYSTEM] Loading model into RAM (this may take a moment) ---") llm = Llama( model_path=model_path, n_ctx=1024, # Reduced for memory efficiency n_threads=2, # Matches your 2 vCPUs n_gpu_layers=0, # CPU only verbose=False ) print(f"--- [SYSTEM] Model loaded successfully. ReaperAI is ready. ---") except Exception as e: print(f"--- [CRITICAL ERROR] Model loading failed: {str(e)} ---") llm = None # Ensure it's None if loading fails yield # App runs here # Cleanup on shutdown (optional) if llm is not None: del llm gc.collect() # ========================================== # 3. FASTAPI APP INITIALIZATION # ========================================== app = FastAPI( title="ReaperAI Secure Core", description="AI Assistant with Web Search Capabilities", version="2.0", lifespan=lifespan ) # CORS configuration app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["POST", "GET"], allow_headers=["*"], ) # ========================================== # 4. AUTONOMOUS TOOLS (REFINED) # ========================================== def ddg_search(query): """Perform a DuckDuckGo search and return top 3 results.""" print(f"--- [TOOL] Searching Web: {query} ---") try: headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"} res = requests.get( "https://html.duckduckgo.com/html/", params={"q": query, "kl": "us-en"}, headers=headers, timeout=8 ) res.raise_for_status() soup = BeautifulSoup(res.text, "html.parser") results = [] for r in soup.select(".result")[:3]: # Limit to 3 results title_elem = r.select_one('.result__a') snippet_elem = r.select_one('.result__snippet') if title_elem and snippet_elem: title = title_elem.get_text(strip=True) snippet = snippet_elem.get_text(strip=True)[:200] # Truncate results.append(f"• {title}: {snippet}") if results: return f"\n[REAL-TIME SEARCH RESULTS]:\n" + "\n".join(results) + "\n" return "" except Exception as e: print(f"--- [TOOL ERROR] Search failed: {e} ---") return "" def jina_read(url): """Fetch and parse content from a URL using Jina Reader.""" print(f"--- [TOOL] Reading Source: {url} ---") try: # Clean the URL url = url.strip() if not url.startswith(('http://', 'https://')): url = 'https://' + url res = requests.get( f"https://r.jina.ai/{url}", headers={"x-respond-with": "text", "User-Agent": "ReaperAI/2.0"}, timeout=10 ) res.raise_for_status() # Extract first 1200 chars for context content = res.text[:1200].strip() if content: return f"\n[SOURCE CONTENT]:\n{content}\n" return "" except Exception as e: print(f"--- [TOOL ERROR] URL read failed: {e} ---") return "" def fast_intent_detection(message): """Detect user intent from message.""" message_lower = message.lower() # Check for URL url_match = re.search(r"(https?://\S+)", message) if url_match: return "URL", url_match.group(1) # Check for search keywords search_keywords = [ "who is", "what is", "how to", "price of", "latest", "current", "news", "today", "weather", "score", "stock", "update", "2024", "define", "explain" ] if any(keyword in message_lower for keyword in search_keywords): return "SEARCH", message return "CHAT", None # ========================================== # 5. SECURITY MIDDLEWARE & ENDPOINTS # ========================================== @app.middleware("http") async def security_guard(request: Request, call_next): """Security middleware for API key validation.""" # Allow root endpoint without auth if request.url.path == "/": return await call_next(request) # Check for API key in headers if request.headers.get("x-reaperai-key") != SECRET_KEY: return JSONResponse( status_code=403, content={"error": "ACCESS_DENIED", "message": "Invalid or missing API key"} ) return await call_next(request) @app.get("/") async def root(): """Root endpoint for health checks.""" status = "ready" if llm is not None else "loading" return { "status": status, "service": "ReaperAI Secure Core", "version": "2.0", "model_loaded": llm is not None, "endpoints": {"/chat": "POST", "/health": "GET"} } @app.get("/health") async def health_check(): """Health check endpoint for monitoring.""" return { "status": "healthy" if llm is not None else "unhealthy", "model": MODEL_REPO if llm is not None else None, "memory_users": len(chat_memory), "timestamp": datetime.now().isoformat() } @app.post("/chat") async def chat_endpoint(request: Request): """Main chat endpoint.""" try: payload = await request.json() except: return JSONResponse( status_code=400, content={"error": "INVALID_JSON", "response": "Request must be valid JSON"} ) user_id = payload.get("userId", "default") message = payload.get("message", "").strip() current_date = datetime.now().strftime("%A, %B %d, %Y") # Validate input if not message: return JSONResponse( status_code=400, content={"error": "EMPTY_MESSAGE", "response": "Message cannot be empty"} ) # Check if model is loaded if llm is None: return JSONResponse( status_code=503, content={ "error": "MODEL_NOT_LOADED", "response": "AI model is still initializing. Please try again in 30 seconds." } ) # Manage conversation history (thread-safe) with memory_lock: # Clean up old users if needed if len(chat_memory) > MAX_GLOBAL_USERS: oldest_user = next(iter(chat_memory)) del chat_memory[oldest_user] # Get user's history (last 5 exchanges) if user_id not in chat_memory: chat_memory[user_id] = [] history = chat_memory[user_id][-5:] # Determine intent and gather context intent, data = fast_intent_detection(message) context = "" if intent == "URL": context = jina_read(data) elif intent == "SEARCH": context = ddg_search(message) # Build conversation messages messages = [ { "role": "system", "content": f"""You are ReaperAI, a helpful and concise AI assistant. Current Date: {current_date} Instructions: 1. Be direct and informative 2. Use provided context when available 3. Keep responses under 300 words 4. If you don't know, say so """ } ] # Add conversation history for h in history: messages.append({"role": "user", "content": h['u']}) messages.append({"role": "assistant", "content": h['a']}) # Add current query with context if context: final_query = f"Context:\n{context}\n\nUser Query: {message}" else: final_query = message messages.append({"role": "user", "content": final_query}) # Generate response (thread-safe) with model_semaphore: try: response = llm.create_chat_completion( messages=messages, max_tokens=400, # Limit response length temperature=0.7, stop=["###", "User:", "Assistant:"] ) ai_response = response["choices"][0]["message"]["content"].strip() except Exception as e: print(f"--- [INFERENCE ERROR] {str(e)} ---") ai_response = f"I encountered an error processing your request. Please try again." # Update conversation history (thread-safe) with memory_lock: chat_memory[user_id].append({"u": message, "a": ai_response}) # Keep last 8 exchanges per user chat_memory[user_id] = chat_memory[user_id][-8:] # Clean up gc.collect() return { "intent": intent, "response": ai_response, "context_used": bool(context), "user_id": user_id, "timestamp": datetime.now().isoformat() } # ========================================== # 6. MAIN EXECUTION # ========================================== if __name__ == "__main__": import uvicorn print("--- [SYSTEM] Starting ReaperAI Server ---") uvicorn.run( "main:app", host="0.0.0.0", port=7860, reload=False, # Disable reload in production timeout_keep_alive=60, access_log=True )