Spaces:
Sleeping
Sleeping
| from flask import Flask, request, jsonify, Response, stream_with_context | |
| from flask_cors import CORS | |
| from flask_compress import Compress | |
| import logging | |
| import os | |
| import threading | |
| import time | |
| import uuid | |
| import datetime | |
| import tempfile | |
| import json | |
| from collections import OrderedDict, defaultdict | |
| # Unified LLM import (router selects Groq if available else local llama) | |
| from ai.llm_router import run_llama, init_model, stream_llama | |
| from ai.code_analyzer import analyze_code | |
| from ai.profile_manager import ( | |
| update_profile, | |
| load_profile, | |
| update_profile_components, | |
| update_personal_info, | |
| get_personal_context | |
| ) | |
| from ai.refactor_engine import suggest_refactor | |
| from ai.prompt_builder import build_persona_prompt | |
| from ai.code_fingerprint import analyze_fingerprint | |
| from ai.code_memory import save_to_memory | |
| # --------------------------------------------- | |
| # Basic setup | |
| # --------------------------------------------- | |
# Log level is configurable via the environment (e.g. DEBUG in development).
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
logging.basicConfig(level=LOG_LEVEL)
# Location of the persisted user-profile JSON; overridable for deployments/tests.
PROFILE_PATH = os.environ.get("USER_PROFILE_PATH", "./user_data/user_profile.json")
app = Flask(__name__)
CORS(app)  # allow cross-origin requests from the frontend
Compress(app)  # transparently gzip/deflate responses
# Lightweight cache for identical queries: LRU map of cache-key -> reply used
# by /ask for the first turn of a session only; bounded by ASK_CACHE_MAX.
ASK_CACHE_MAX = int(os.environ.get("ASK_CACHE_MAX", 50))
ask_cache: "OrderedDict[str, str]" = OrderedDict()
# Conversation history storage: {session_id: {"messages": [...], "last_active": timestamp}}
# NOTE(review): in-memory only — histories are lost on restart and not shared
# across workers; confirm single-process deployment.
conversation_history = defaultdict(lambda: {"messages": [], "last_active": time.time()})
HISTORY_TIMEOUT = int(os.environ.get("HISTORY_TIMEOUT_SECONDS", 3600))  # 1 hour default
MAX_HISTORY_LENGTH = int(os.environ.get("MAX_HISTORY_LENGTH", 20))  # Max messages per session
def _cache_get(key: str):
    """Return the cached reply for *key*, refreshing its LRU slot, or None on miss."""
    try:
        value = ask_cache[key]
    except KeyError:
        return None
    # A hit counts as "recently used" — move it to the MRU end.
    ask_cache.move_to_end(key)
    return value
def _cache_set(key: str, value: str):
    """Insert or refresh *key* -> *value*, evicting LRU entries past ASK_CACHE_MAX."""
    ask_cache[key] = value
    ask_cache.move_to_end(key)
    # Evict from the LRU end until we are back within the size bound.
    while len(ask_cache) > ASK_CACHE_MAX:
        ask_cache.popitem(last=False)
def _cleanup_old_sessions():
    """Drop conversation histories idle for longer than HISTORY_TIMEOUT seconds."""
    cutoff = time.time() - HISTORY_TIMEOUT
    # Collect first, then delete — never mutate a dict while iterating it.
    stale = [sid for sid, entry in conversation_history.items()
             if entry["last_active"] < cutoff]
    for sid in stale:
        del conversation_history[sid]
        logging.info(f"Cleaned up expired session: {sid}")
def _get_or_create_session(session_id=None) -> str:
    """Return a usable session id, minting a fresh UUID4 when none is supplied.

    Side effects: opportunistically evicts expired sessions once the table is
    large, and refreshes the session's last-active timestamp (creating the
    session entry via the defaultdict on first touch).

    Fix: the previous annotation ``session_id: str = None`` claimed ``str``
    while defaulting to ``None``; the misleading annotation is removed
    (behavior unchanged — any falsy id still triggers creation).
    """
    if not session_id:
        session_id = str(uuid.uuid4())
    # Cleanup old sessions only when we hold many, keeping the common path cheap.
    if len(conversation_history) > 100:
        _cleanup_old_sessions()
    # Touching the defaultdict both creates the entry on first use and
    # refreshes the last-active time.
    conversation_history[session_id]["last_active"] = time.time()
    return session_id
def _add_to_history(session_id: str, role: str, content: str):
    """Append a message to a session's history, trimming to MAX_HISTORY_LENGTH.

    System messages are always preserved; only the oldest non-system messages
    are dropped when the history exceeds the limit.
    """
    conversation_history[session_id]["messages"].append({
        "role": role,
        "content": content
    })
    messages = conversation_history[session_id]["messages"]
    if len(messages) > MAX_HISTORY_LENGTH:
        system_msgs = [m for m in messages if m["role"] == "system"]
        other_msgs = [m for m in messages if m["role"] != "system"]
        # Budget of non-system messages we may keep. Fix: guard against a
        # zero/negative budget (when system messages alone reach the cap) —
        # the old slice other_msgs[-(MAX-len(sys)):] degenerates to [-0:],
        # which keeps EVERYTHING and lets the history grow without bound.
        keep = MAX_HISTORY_LENGTH - len(system_msgs)
        recent = other_msgs[-keep:] if keep > 0 else []
        conversation_history[session_id]["messages"] = system_msgs + recent
def _get_messages(session_id: str, system_prompt: str = None) -> list:
    """Return a copy of the session's messages, prepending *system_prompt* if absent."""
    history = list(conversation_history[session_id]["messages"])
    # Only inject the system prompt when one was given and the history does
    # not already start with a system message.
    if system_prompt and (not history or history[0]["role"] != "system"):
        history = [{"role": "system", "content": system_prompt}] + history
    return history
def _warmup_model():
    """Optional background warm-up so the first real request is not slow."""
    try:
        model = init_model()
    except Exception as exc:
        logging.warning("Model warm-up failed: %s", exc)
        return
    try:
        run_llama("Warmup.", max_tokens=4, temperature=0.0, top_p=0.5)
    except Exception as exc:
        logging.warning("Model warm-up failed: %s", exc)
        return
    logging.info("Model warm-up complete (context=%s).", getattr(model, "n_ctx", "n/a"))
# Kick off model warm-up on a daemon thread unless explicitly disabled, so
# server startup is not blocked while the model loads.
if os.environ.get("DISABLE_WARMUP", "0") != "1":
    threading.Thread(target=_warmup_model, daemon=True).start()
| # --------------------------------------------- | |
| # Routes | |
| # --------------------------------------------- | |
def home():
    """Liveness probe: confirms the backend is up."""
    status_payload = {"status": "Backend running ✅"}
    return jsonify(status_payload)
| # ---- /ask ------------------------------------------------------------ | |
def ask():
    """Answer a general question with conversation history and a small LRU cache.

    Expects JSON: {"query": str, "session_id": str?, "metadata": {"skill", "emotion"}?}
    Returns JSON: {"reply": str, "session_id": str} plus "cached": True on a hit.
    Errors: 400 on missing query, 500 (with message) on unexpected failure.
    """
    try:
        data = request.json or {}
        query = data.get("query", "").strip()
        if not query:
            return jsonify({"error": "query is required"}), 400
        # Session management: reuse the caller's session or mint a new one.
        session_id = _get_or_create_session(data.get("session_id"))
        meta = data.get("metadata", {})
        skill = meta.get("skill", "moderate_learner")
        emotion = meta.get("emotion", "neutral")
        # Personal context previously stored via /train_overall, if any.
        personal_context = get_personal_context()
        personal_section = f"\n\n{personal_context}" if personal_context else ""
        system_prompt = (
            "You are CodeMate — an empathetic coding tutor. "
            "Explain or answer the question clearly and helpfully. "
            f"User context: skill={skill}, emotion={emotion}"
            f"{personal_section}"
        )
        _add_to_history(session_id, "user", query)
        messages = _get_messages(session_id, system_prompt)
        # The cache applies only to the first exchange of a session (system +
        # user message); later turns depend on history and are not cacheable.
        # Fix: compute the flag and key ONCE so the lookup and the store below
        # can never disagree — previously cache_key was bound only inside the
        # lookup branch while a second, separately evaluated condition guarded
        # the store, a latent NameError if the two ever diverged.
        is_first_turn = len(messages) <= 2
        cache_key = f"{skill}|{emotion}|{query}"[:4096] if is_first_turn else None
        if is_first_turn:
            cached = _cache_get(cache_key)
            if cached:
                _add_to_history(session_id, "assistant", cached)
                return jsonify({"reply": cached, "cached": True, "session_id": session_id})
        # Get the AI response with full conversation context.
        reply = run_llama(messages=messages, max_tokens=400, temperature=0.1, top_p=0.9)
        _add_to_history(session_id, "assistant", reply)
        if is_first_turn:
            _cache_set(cache_key, reply)
        return jsonify({"reply": reply, "session_id": session_id})
    except Exception as e:
        logging.exception("/ask failed")
        return jsonify({"error": str(e)}), 500
| # ---- /ask_stream ----------------------------------------------------- | |
def ask_stream():
    """Stream LLM responses over Server-Sent Events, with conversation history.

    Emits: an initial ``session`` event carrying the session id, ``data:``
    chunks with reply text, then a ``done`` event — or an ``error`` event on
    mid-stream failure. The full reply is appended to the session history.
    """
    try:
        data = request.json or {}
        query = data.get("query", "").strip()
        if not query:
            return jsonify({"error": "query is required"}), 400
        # Session management
        session_id = _get_or_create_session(data.get("session_id"))
        meta = data.get("metadata", {})
        skill = meta.get("skill", "moderate_learner")
        emotion = meta.get("emotion", "neutral")
        # Personal context from the stored profile, if any.
        personal_context = get_personal_context()
        personal_section = f"\n\n{personal_context}" if personal_context else ""
        system_prompt = (
            "You are CodeMate — an empathetic coding tutor. "
            "Explain or answer the question clearly and helpfully. "
            f"User context: skill={skill}, emotion={emotion}"
            f"{personal_section}"
        )
        _add_to_history(session_id, "user", query)
        messages = _get_messages(session_id, system_prompt)

        def _sse_data(payload: str) -> str:
            # Fix: SSE requires each line of a payload to be its own "data:"
            # line; a raw newline inside one data line breaks the framing.
            # Spec-compliant clients (EventSource) rejoin the lines with "\n",
            # so single-line chunks are delivered exactly as before.
            return "".join(f"data: {line}\n" for line in payload.split("\n")) + "\n"

        def event_stream():
            collected_response = []
            try:
                # Tell the client its session id before any content arrives.
                yield f"event: session\ndata: {session_id}\n\n"
                for chunk in stream_llama(messages=messages, max_tokens=400, temperature=0.1, top_p=0.9):
                    if chunk:
                        collected_response.append(chunk)
                        yield _sse_data(chunk)
                # Persist the complete reply so later turns see it in history.
                full_response = "".join(collected_response)
                if full_response:
                    _add_to_history(session_id, "assistant", full_response)
                yield "event: done\ndata: [DONE]\n\n"
            except Exception as ex:
                logging.exception("/ask_stream failed during streaming")
                yield "event: error\n" + _sse_data(str(ex))

        return Response(event_stream(), mimetype="text/event-stream")
    except Exception as e:
        logging.exception("/ask_stream failed")
        return jsonify({"error": str(e)}), 500
def code_assist():
    """Code-focused assistant endpoint with adaptive persona and self-learning.

    Expects JSON: {"query": str, "session_id": str?, "metadata": {"skill", "emotion"}?}.
    On the first turn of a session the plain query is replaced by a persona
    prompt built from the stored coding style/fingerprint. Any fenced Python
    (or untagged) code blocks in the reply are analyzed to update the profile
    and optionally saved to memory. Returns {"reply", "session_id"}.
    """
    try:
        data = request.json or {}
        query = data.get("query", "").strip()
        if not query:
            return jsonify({"error": "query is required"}), 400
        # Session management
        session_id = data.get("session_id")
        session_id = _get_or_create_session(session_id)
        metadata = data.get("metadata", {})
        emotion = metadata.get("emotion", "neutral")
        skill = metadata.get("skill", "moderate_learner")
        # Load existing profile data (coding style + fingerprint feed the persona).
        profile = load_profile()
        style = profile.get("coding_style", {})
        fingerprint = profile.get("fingerprint", {})
        # Get personal context from profile
        personal_context = get_personal_context()
        personal_section = f"\n\n{personal_context}" if personal_context else ""
        # Create adaptive persona prompt (this creates a complete prompt string)
        prompt_content = build_persona_prompt(query, emotion, skill, style, fingerprint)
        # Add user message to history
        _add_to_history(session_id, "user", query)
        # Build messages array with system context and history
        system_prompt = (
            "You are CodeMate — an empathetic coding assistant. "
            f"User profile: skill={skill}, emotion={emotion}. "
            "Provide helpful code examples and explanations."
            f"{personal_section}"
        )
        messages = _get_messages(session_id, system_prompt)
        # First message of the session (system + user only): swap the plain
        # query for the full persona prompt so the model gets style context.
        if len(messages) == 2:
            messages[-1]["content"] = prompt_content
        # Get AI output with conversation context
        reply = run_llama(messages=messages, max_tokens=400, temperature=0.2)
        # Add assistant response to history
        _add_to_history(session_id, "assistant", reply)
        # Extract fenced code from the reply. NOTE(review): the pattern only
        # matches ```python or bare ``` fences followed by a newline; blocks
        # tagged with other languages are ignored — confirm that is intended.
        import re
        code_blocks = re.findall(r"```(?:python)?\n([\s\S]*?)```", reply)
        if code_blocks:
            combined = "\n".join(code_blocks)
            # Analyze AI-generated code (self-learning)
            style_update = analyze_code(combined)
            fingerprint_update = analyze_fingerprint(combined)
            # Update persistent user profile
            update_profile_components(coding_style=style_update, fingerprint=fingerprint_update)
            # Store in memory unless disabled via env
            if os.environ.get("DISABLE_CODE_MEMORY", "0") != "1":
                save_to_memory(query, combined, emotion, skill)
        return jsonify({"reply": reply, "session_id": session_id})
    except Exception as e:
        logging.exception("/code_assist failed")
        return jsonify({"error": str(e)}), 500
| # ---- /analyze -------------------------------------------------------- | |
def analyze():
    """Analyze submitted code and fold the detected style into the user profile."""
    try:
        payload = request.json or {}
        source = payload.get("code", "")
        if not source:
            return jsonify({"error": "code is required"}), 400
        detected_style = analyze_code(source)
        update_profile(detected_style)
        return jsonify({"message": "Profile updated", "style": detected_style})
    except Exception as e:
        logging.exception("/analyze failed")
        return jsonify({"error": str(e)}), 500
| # ---- /refactor ------------------------------------------------------- | |
def refactor():
    """Return a refactoring suggestion for the submitted code."""
    try:
        payload = request.json or {}
        source = payload.get("code", "")
        if not source:
            return jsonify({"error": "code is required"}), 400
        suggestion = suggest_refactor(source)
        return jsonify({"refactored": suggestion})
    except Exception as e:
        logging.exception("/refactor failed")
        return jsonify({"error": str(e)}), 500
| # ---- /profile -------------------------------------------------------- | |
def profile():
    """Return the persisted user profile as JSON."""
    try:
        current_profile = load_profile()
        return jsonify(current_profile)
    except Exception as e:
        logging.exception("/profile failed")
        return jsonify({"error": str(e)}), 500
| # ---- /train -------------------------------------------------- | |
def train():
    """Train both coding style and fingerprint from a raw code string or files.

    Input JSON:
        code: string (optional) - single code sample to learn from
        files: array of objects (optional) - multiple code files
               Each file object: { "filename": "file.py", "content": "code..." }
    At least one of 'code' or 'files' must be provided.

    Returns JSON with the number of files processed, per-file sizes, and the
    updated coding-style / fingerprint components.
    """
    try:
        req = request.json or {}
        code = req.get("code", "")
        files = req.get("files", [])
        # Fix: validate types before calling .strip() — a JSON number/null in
        # 'code' or 'content' previously raised AttributeError and surfaced as
        # an opaque 500 instead of a clear 400.
        if not isinstance(code, str):
            return jsonify({"error": "'code' must be a string"}), 400
        code = code.strip()
        if not code and not files:
            return jsonify({"error": "Either 'code' or 'files' is required"}), 400
        # Collect all code samples
        code_samples = []
        processed_files = []
        # Add direct code if provided
        if code:
            code_samples.append(code)
            processed_files.append({"filename": "direct_input", "size": len(code)})
        # Process multiple files if provided
        if files:
            if not isinstance(files, list):
                return jsonify({"error": "'files' must be an array"}), 400
            for idx, file_obj in enumerate(files):
                if not isinstance(file_obj, dict):
                    return jsonify({"error": f"File at index {idx} must be an object with 'filename' and 'content'"}), 400
                filename = file_obj.get("filename", f"file_{idx}")
                content = file_obj.get("content", "")
                if not isinstance(content, str):
                    return jsonify({"error": f"File at index {idx}: 'content' must be a string"}), 400
                content = content.strip()
                if not content:
                    continue  # Skip empty files
                code_samples.append(content)
                processed_files.append({"filename": filename, "size": len(content)})
        if not code_samples:
            return jsonify({"error": "No valid code content found"}), 400
        # Combine all samples so the analyzers see one consistent corpus.
        combined_code = "\n\n# --- File Separator ---\n\n".join(code_samples)
        style_update = analyze_code(combined_code)
        fingerprint_update = analyze_fingerprint(combined_code)
        # Persist both components to the user profile.
        update_profile_components(coding_style=style_update, fingerprint=fingerprint_update)
        total_size = sum(f["size"] for f in processed_files)
        return jsonify({
            "message": "Profile trained successfully",
            "files_processed": len(processed_files),
            "total_characters": total_size,
            "files": processed_files,
            "coding_style": style_update,
            "fingerprint": fingerprint_update
        })
    except Exception as e:
        logging.exception("/train failed")
        return jsonify({"error": str(e)}), 500
| # ---- /train_overall ---------------------------------------------- | |
def train_overall():
    """Store the user's personal information for personalized AI responses.

    Input JSON:
        personal_data: string (required) - free text with personal details,
        e.g. name, background, preferences. The AI uses this context in all
        future conversations.

    Returns a success message plus a summary of what is now stored.
    """
    try:
        req = request.json or {}
        personal_data = req.get("personal_data", "").strip()
        if not personal_data:
            return jsonify({"error": "personal_data is required"}), 400
        # Limit size to prevent abuse (max 50KB per entry)
        if len(personal_data) > 50000:
            return jsonify({"error": "personal_data too large (max 50KB)"}), 400
        # Persist and summarize what is stored so the client can show feedback.
        updated_profile = update_personal_info(personal_data)
        info = updated_profile.get("personal_info", {})
        preview_suffix = "..." if len(personal_data) > 200 else ""
        return jsonify({
            "message": "Personal information saved successfully",
            "entry_count": len(info.get("entries", [])),
            "total_characters": len(info.get("raw_data", "")),
            "preview": personal_data[:200] + preview_suffix
        })
    except Exception as e:
        logging.exception("/train_overall failed")
        return jsonify({"error": str(e)}), 500
| # ---- /get_personal_info ------------------------------------------ | |
def get_personal_info():
    """Return all stored personal-information entries plus summary counts."""
    try:
        info = load_profile().get("personal_info", {})
        entries = info.get("entries", [])
        raw = info.get("raw_data", "")
        return jsonify({
            "entries": entries,
            "raw_data": raw,
            "entry_count": len(entries),
            "total_characters": len(raw)
        })
    except Exception as e:
        logging.exception("/get_personal_info failed")
        return jsonify({"error": str(e)}), 500
| # ---- /clear_personal_info ---------------------------------------- | |
def clear_personal_info():
    """Remove all stored personal information and persist the profile atomically.

    The profile is rewritten via a temp file in the same directory followed by
    os.replace, so a crash mid-write cannot leave a corrupt profile on disk.
    Returns JSON with a success message.
    """
    try:
        profile = load_profile()
        if "personal_info" in profile:
            del profile["personal_info"]
            profile["last_updated"] = datetime.datetime.now().isoformat()
        # Save updated profile. Fix: drop the redundant function-local
        # `import tempfile` — the module already imports tempfile at the top.
        os.makedirs(os.path.dirname(PROFILE_PATH) or ".", exist_ok=True)
        fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(PROFILE_PATH) or ".")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as tmp:
                json.dump(profile, tmp, indent=2, ensure_ascii=False)
            os.replace(tmp_path, PROFILE_PATH)
        except Exception:
            # Best-effort cleanup of the temp file, then surface the error.
            try:
                os.remove(tmp_path)
            except Exception:
                pass
            raise
        return jsonify({"message": "Personal information cleared successfully"})
    except Exception as e:
        logging.exception("/clear_personal_info failed")
        return jsonify({"error": str(e)}), 500
| # ---- /get_history ------------------------------------------------ | |
def get_history():
    """Return the message history for one session.

    Input JSON:
        session_id: string (required) - session to retrieve history for
    """
    try:
        data = request.json or {}
        session_id = data.get("session_id")
        if not session_id:
            return jsonify({"error": "session_id is required"}), 400
        # Membership test does not create an entry in the defaultdict.
        if session_id not in conversation_history:
            return jsonify({"error": "Session not found"}), 404
        entry = conversation_history[session_id]
        return jsonify({
            "session_id": session_id,
            "messages": entry["messages"],
            "last_active": entry["last_active"],
            "message_count": len(entry["messages"])
        })
    except Exception as e:
        logging.exception("/get_history failed")
        return jsonify({"error": str(e)}), 500
| # ---- /clear_history ---------------------------------------------- | |
def clear_history():
    """Clear history for one session (when session_id is given) or all sessions."""
    try:
        data = request.json or {}
        session_id = data.get("session_id")
        if not session_id:
            # No specific session requested: wipe everything.
            count = len(conversation_history)
            conversation_history.clear()
            return jsonify({"message": f"Cleared history for {count} sessions"})
        if session_id not in conversation_history:
            return jsonify({"error": "Session not found"}), 404
        del conversation_history[session_id]
        return jsonify({"message": f"History cleared for session {session_id}"})
    except Exception as e:
        logging.exception("/clear_history failed")
        return jsonify({"error": str(e)}), 500
| # ---- /list_sessions ---------------------------------------------- | |
def list_sessions():
    """List active sessions with message counts, most recently active first."""
    try:
        sessions = [
            {
                "session_id": sid,
                "message_count": len(entry["messages"]),
                "last_active": entry["last_active"],
            }
            for sid, entry in conversation_history.items()
        ]
        sessions.sort(key=lambda s: s["last_active"], reverse=True)
        return jsonify({
            "sessions": sessions,
            "total_count": len(sessions)
        })
    except Exception as e:
        logging.exception("/list_sessions failed")
        return jsonify({"error": str(e)}), 500
| # --------------------------------------------- | |
| # Entry point | |
| # --------------------------------------------- | |
if __name__ == "__main__":
    # HOST/PORT are overridable via the environment. NOTE(review): 7860 looks
    # like the Hugging Face Spaces app-port convention — confirm deployment.
    host = os.environ.get("HOST", "0.0.0.0")
    port = int(os.environ.get("PORT", 7860))
    app.run(host=host, port=port)