"""Flask backend for CodeMate — an adaptive, empathetic AI coding tutor.

Exposes chat endpoints (/ask, /ask_stream, /code_assist) with per-session
conversation history, plus profile training, personal-info storage, and
history-management routes.  Model access goes through ai.llm_router, which
selects a hosted model (Groq) when available and a local llama otherwise.
"""

from flask import Flask, request, jsonify, Response, stream_with_context
from flask_cors import CORS
from flask_compress import Compress

import datetime
import json
import logging
import os
import re
import tempfile
import threading
import time
import uuid
from collections import OrderedDict, defaultdict

# Unified LLM import (router selects Groq if available else local llama)
from ai.llm_router import run_llama, init_model, stream_llama
from ai.code_analyzer import analyze_code
from ai.profile_manager import (
    update_profile,
    load_profile,
    update_profile_components,
    update_personal_info,
    get_personal_context,
)
from ai.refactor_engine import suggest_refactor
from ai.prompt_builder import build_persona_prompt
from ai.code_fingerprint import analyze_fingerprint
from ai.code_memory import save_to_memory

# ---------------------------------------------
# Basic setup
# ---------------------------------------------
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
logging.basicConfig(level=LOG_LEVEL)

PROFILE_PATH = os.environ.get("USER_PROFILE_PATH", "./user_data/user_profile.json")

app = Flask(__name__)
CORS(app)
Compress(app)

# Lightweight LRU cache for identical first-turn queries (key -> reply).
ASK_CACHE_MAX = int(os.environ.get("ASK_CACHE_MAX", 50))
ask_cache: "OrderedDict[str, str]" = OrderedDict()

# Conversation history storage:
#   {session_id: {"messages": [...], "last_active": timestamp}}
# NOTE(review): this is in-process, unsynchronized state — fine for a single
# worker; a multi-process deployment would need external storage.
conversation_history = defaultdict(lambda: {"messages": [], "last_active": time.time()})
HISTORY_TIMEOUT = int(os.environ.get("HISTORY_TIMEOUT_SECONDS", 3600))  # 1 hour default
MAX_HISTORY_LENGTH = int(os.environ.get("MAX_HISTORY_LENGTH", 20))  # max messages per session


def _cache_get(key: str):
    """Return the cached reply for *key* (refreshing its LRU rank) or None."""
    if key in ask_cache:
        ask_cache.move_to_end(key)
        return ask_cache[key]
    return None


def _cache_set(key: str, value: str):
    """Insert/refresh *key* in the LRU cache, evicting the oldest on overflow."""
    ask_cache[key] = value
    ask_cache.move_to_end(key)
    if len(ask_cache) > ASK_CACHE_MAX:
        ask_cache.popitem(last=False)


def _cleanup_old_sessions():
    """Remove conversation histories older than HISTORY_TIMEOUT."""
    current_time = time.time()
    expired_sessions = [
        sid for sid, data in conversation_history.items()
        if current_time - data["last_active"] > HISTORY_TIMEOUT
    ]
    for sid in expired_sessions:
        del conversation_history[sid]
        logging.info(f"Cleaned up expired session: {sid}")


def _get_or_create_session(session_id: str = None) -> str:
    """Return *session_id*, minting a fresh UUID when none is given.

    Also refreshes the session's last-active timestamp and opportunistically
    sweeps expired sessions once the session table grows large.
    """
    if not session_id:
        session_id = str(uuid.uuid4())

    # Cleanup old sessions periodically (only when we have many sessions,
    # to avoid scanning the table on every request).
    if len(conversation_history) > 100:
        _cleanup_old_sessions()

    # Touching the defaultdict creates the session entry on first use.
    conversation_history[session_id]["last_active"] = time.time()
    return session_id


def _add_to_history(session_id: str, role: str, content: str):
    """Append a message to a session, trimming to MAX_HISTORY_LENGTH.

    System messages are always preserved; only the oldest non-system
    messages are dropped when the session exceeds the limit.
    """
    conversation_history[session_id]["messages"].append({
        "role": role,
        "content": content
    })

    messages = conversation_history[session_id]["messages"]
    if len(messages) > MAX_HISTORY_LENGTH:
        system_msgs = [m for m in messages if m["role"] == "system"]
        other_msgs = [m for m in messages if m["role"] != "system"]
        keep = MAX_HISTORY_LENGTH - len(system_msgs)
        # Guard against a non-positive slice bound: other_msgs[-0:] would
        # keep EVERYTHING instead of trimming.
        recent = other_msgs[-keep:] if keep > 0 else []
        conversation_history[session_id]["messages"] = system_msgs + recent


def _get_messages(session_id: str, system_prompt: str = None) -> list:
    """Return a copy of the session's messages, system prompt first.

    The returned list is a shallow copy — the message dicts themselves are
    shared with the stored history, so callers must not mutate them.
    """
    messages = conversation_history[session_id]["messages"].copy()
    # Ensure a system message leads the conversation.
    if system_prompt and (not messages or messages[0]["role"] != "system"):
        messages.insert(0, {"role": "system", "content": system_prompt})
    return messages


def _warmup_model():
    """Optional background warm-up so the first real request is fast."""
    try:
        llm = init_model()
        run_llama("Warmup.", max_tokens=4, temperature=0.0, top_p=0.5)
        logging.info("Model warm-up complete (context=%s).", getattr(llm, "n_ctx", "n/a"))
    except Exception as e:
        # Warm-up is best-effort; a failure must not prevent startup.
        logging.warning("Model warm-up failed: %s", e)


if os.environ.get("DISABLE_WARMUP", "0") != "1":
    threading.Thread(target=_warmup_model, daemon=True).start()


# ---------------------------------------------
# Routes
# ---------------------------------------------
@app.route("/", methods=["GET"])
def home():
    """Health check."""
    return jsonify({"status": "Backend running ✅"})


# ---- /ask ------------------------------------------------------------
@app.route("/ask", methods=["POST"])
def ask():
    """Answer a free-form question with per-session conversation context.

    JSON body: query (required), session_id (optional), metadata (optional
    dict with "skill" and "emotion").  First-turn replies are cached.
    """
    try:
        data = request.json or {}
        query = data.get("query", "").strip()
        if not query:
            return jsonify({"error": "query is required"}), 400

        # Session management
        session_id = _get_or_create_session(data.get("session_id"))

        meta = data.get("metadata", {})
        skill = meta.get("skill", "moderate_learner")
        emotion = meta.get("emotion", "neutral")

        # Get personal context from profile
        personal_context = get_personal_context()
        personal_section = f"\n\n{personal_context}" if personal_context else ""

        SYSTEM_PROMPT = (
            "You are CodeMate — an empathetic coding tutor. "
            "Explain or answer the question clearly and helpfully. "
            f"User context: skill={skill}, emotion={emotion}"
            f"{personal_section}"
        )

        # Record the user turn, then assemble the full conversation.
        _add_to_history(session_id, "user", query)
        messages = _get_messages(session_id, SYSTEM_PROMPT)

        # The cache applies only to the first turn of a session
        # (system + user == 2 messages).  Compute the key once so the
        # cache-store below can never hit an unbound name.
        is_first_turn = len(messages) <= 2
        cache_key = f"{skill}|{emotion}|{query}"[:4096] if is_first_turn else None

        if is_first_turn:
            cached = _cache_get(cache_key)
            if cached:
                _add_to_history(session_id, "assistant", cached)
                return jsonify({"reply": cached, "cached": True, "session_id": session_id})

        # Get AI response with full conversation context
        reply = run_llama(messages=messages, max_tokens=400, temperature=0.1, top_p=0.9)

        # Add assistant response to history
        _add_to_history(session_id, "assistant", reply)

        if is_first_turn:
            _cache_set(cache_key, reply)

        return jsonify({"reply": reply, "session_id": session_id})

    except Exception as e:
        logging.exception("/ask failed")
        return jsonify({"error": str(e)}), 500


# ---- /ask_stream -----------------------------------------------------
@app.route("/ask_stream", methods=["POST"])
def ask_stream():
    """Stream GPT responses (Server-Sent Events) with conversation history."""
    try:
        data = request.json or {}
        query = data.get("query", "").strip()
        if not query:
            return jsonify({"error": "query is required"}), 400

        # Session management
        session_id = _get_or_create_session(data.get("session_id"))

        meta = data.get("metadata", {})
        skill = meta.get("skill", "moderate_learner")
        emotion = meta.get("emotion", "neutral")

        # Get personal context from profile
        personal_context = get_personal_context()
        personal_section = f"\n\n{personal_context}" if personal_context else ""

        SYSTEM_PROMPT = (
            "You are CodeMate — an empathetic coding tutor. "
            "Explain or answer the question clearly and helpfully. "
            f"User context: skill={skill}, emotion={emotion}"
            f"{personal_section}"
        )

        # Add user message to history, then build the conversation.
        _add_to_history(session_id, "user", query)
        messages = _get_messages(session_id, SYSTEM_PROMPT)

        @stream_with_context
        def event_stream():
            collected_response = []
            try:
                # Send the session id first so the client can correlate.
                yield f"event: session\ndata: {session_id}\n\n"

                for chunk in stream_llama(messages=messages, max_tokens=400,
                                          temperature=0.1, top_p=0.9):
                    if chunk:
                        collected_response.append(chunk)
                        yield f"data: {chunk}\n\n"

                # Persist the complete response once streaming finishes.
                full_response = "".join(collected_response)
                if full_response:
                    _add_to_history(session_id, "assistant", full_response)

                yield "event: done\ndata: [DONE]\n\n"
            except Exception as ex:
                logging.exception("/ask_stream failed during streaming")
                yield f"event: error\ndata: {str(ex)}\n\n"

        return Response(event_stream(), mimetype="text/event-stream")

    except Exception as e:
        logging.exception("/ask_stream failed")
        return jsonify({"error": str(e)}), 500


# ---- /code_assist ----------------------------------------------------
@app.route("/code_assist", methods=["POST"])
def code_assist():
    """Answer a coding request with a persona-adapted prompt.

    Also self-learns: any code blocks in the reply are analyzed to update
    the persistent coding-style profile and fingerprint, and stored in code
    memory unless DISABLE_CODE_MEMORY=1.
    """
    try:
        data = request.json or {}
        query = data.get("query", "").strip()
        if not query:
            return jsonify({"error": "query is required"}), 400

        # Session management
        session_id = _get_or_create_session(data.get("session_id"))

        metadata = data.get("metadata", {})
        emotion = metadata.get("emotion", "neutral")
        skill = metadata.get("skill", "moderate_learner")

        # Load existing profile data
        profile = load_profile()
        style = profile.get("coding_style", {})
        fingerprint = profile.get("fingerprint", {})

        # Get personal context from profile
        personal_context = get_personal_context()
        personal_section = f"\n\n{personal_context}" if personal_context else ""

        # Create adaptive persona prompt (a complete prompt string).
        prompt_content = build_persona_prompt(query, emotion, skill, style, fingerprint)

        # Add the raw user query to history.
        _add_to_history(session_id, "user", query)

        # Build messages array with system context and history.
        system_prompt = (
            "You are CodeMate — an empathetic coding assistant. "
            f"User profile: skill={skill}, emotion={emotion}. "
            "Provide helpful code examples and explanations."
            f"{personal_section}"
        )
        messages = _get_messages(session_id, system_prompt)

        # On the first turn, send the full persona prompt instead of the raw
        # query.  Replace the dict rather than mutating it: _get_messages
        # returns a shallow copy, so mutating messages[-1]["content"] would
        # overwrite the raw query stored in conversation_history.
        if len(messages) == 2:  # system + user only
            messages[-1] = {"role": "user", "content": prompt_content}

        # Get AI output with conversation context
        reply = run_llama(messages=messages, max_tokens=400, temperature=0.2)

        # Add assistant response to history
        _add_to_history(session_id, "assistant", reply)

        # Extract any code returned by the AI (fenced blocks).
        code_blocks = re.findall(r"```(?:python)?\n([\s\S]*?)```", reply)
        if code_blocks:
            combined = "\n".join(code_blocks)

            # Analyze AI-generated code (self-learning)
            style_update = analyze_code(combined)
            fingerprint_update = analyze_fingerprint(combined)

            # Update persistent user profile
            update_profile_components(coding_style=style_update,
                                      fingerprint=fingerprint_update)

            # Store in memory unless disabled via env
            if os.environ.get("DISABLE_CODE_MEMORY", "0") != "1":
                save_to_memory(query, combined, emotion, skill)

        return jsonify({"reply": reply, "session_id": session_id})

    except Exception as e:
        logging.exception("/code_assist failed")
        return jsonify({"error": str(e)}), 500


# ---- /analyze --------------------------------------------------------
@app.route("/analyze", methods=["POST"])
def analyze():
    """Analyze a code sample and fold its style into the user profile."""
    try:
        req = request.json or {}
        code = req.get("code", "")
        if not code:
            return jsonify({"error": "code is required"}), 400
        style = analyze_code(code)
        update_profile(style)
        return jsonify({"message": "Profile updated", "style": style})
    except Exception as e:
        logging.exception("/analyze failed")
        return jsonify({"error": str(e)}), 500


# ---- /refactor -------------------------------------------------------
@app.route("/refactor", methods=["POST"])
def refactor():
    """Return a suggested refactoring for the submitted code."""
    try:
        req = request.json or {}
        code = req.get("code", "")
        if not code:
            return jsonify({"error": "code is required"}), 400
        result = suggest_refactor(code)
        return jsonify({"refactored": result})
    except Exception as e:
        logging.exception("/refactor failed")
        return jsonify({"error": str(e)}), 500


# ---- /profile --------------------------------------------------------
@app.route("/profile", methods=["GET"])
def profile():
    """Return the stored user profile."""
    try:
        return jsonify(load_profile())
    except Exception as e:
        logging.exception("/profile failed")
        return jsonify({"error": str(e)}), 500


# ---- /train ----------------------------------------------------------
@app.route("/train", methods=["POST"])
def train():
    """Train both coding style and fingerprint from provided raw code string or files.

    Input JSON:
        code: string (optional) - single code sample to learn from
        files: array of objects (optional) - multiple code files
            Each file object: { "filename": "file.py", "content": "code..." }

    At least one of 'code' or 'files' must be provided.
    """
    try:
        req = request.json or {}
        code = req.get("code", "").strip()
        files = req.get("files", [])

        if not code and not files:
            return jsonify({"error": "Either 'code' or 'files' is required"}), 400

        # Collect all code samples
        code_samples = []
        processed_files = []

        # Add direct code if provided
        if code:
            code_samples.append(code)
            processed_files.append({"filename": "direct_input", "size": len(code)})

        # Process multiple files if provided
        if files:
            if not isinstance(files, list):
                return jsonify({"error": "'files' must be an array"}), 400

            for idx, file_obj in enumerate(files):
                if not isinstance(file_obj, dict):
                    return jsonify({"error": f"File at index {idx} must be an object with 'filename' and 'content'"}), 400

                filename = file_obj.get("filename", f"file_{idx}")
                content = file_obj.get("content", "").strip()

                if not content:
                    continue  # Skip empty files

                code_samples.append(content)
                processed_files.append({"filename": filename, "size": len(content)})

        if not code_samples:
            return jsonify({"error": "No valid code content found"}), 400

        # Combine all code samples for analysis
        combined_code = "\n\n# --- File Separator ---\n\n".join(code_samples)

        # Analyze the combined code
        style_update = analyze_code(combined_code)
        fingerprint_update = analyze_fingerprint(combined_code)

        # Update profile
        update_profile_components(coding_style=style_update,
                                  fingerprint=fingerprint_update)

        # Calculate total size
        total_size = sum(f["size"] for f in processed_files)

        return jsonify({
            "message": "Profile trained successfully",
            "files_processed": len(processed_files),
            "total_characters": total_size,
            "files": processed_files,
            "coding_style": style_update,
            "fingerprint": fingerprint_update
        })

    except Exception as e:
        logging.exception("/train failed")
        return jsonify({"error": str(e)}), 500


# ---- /train_overall --------------------------------------------------
@app.route("/train_overall", methods=["POST"])
def train_overall():
    """Store user's personal information for personalized AI responses.

    This endpoint stores any personal details, preferences, or background
    information about the user. The AI will use this context in all future
    conversations.

    Input JSON:
        personal_data: string (required) - text containing personal information

    Examples:
        "My name is John. I'm 20 years old. I'm studying CS at MIT."
        "I prefer Python over JavaScript. I work at Google."
        "My hobbies include gaming and reading sci-fi novels."

    Returns:
        JSON with success message and updated personal info summary
    """
    try:
        req = request.json or {}
        personal_data = req.get("personal_data", "").strip()

        if not personal_data:
            return jsonify({"error": "personal_data is required"}), 400

        # Limit size to prevent abuse (max 50KB per entry)
        if len(personal_data) > 50000:
            return jsonify({"error": "personal_data too large (max 50KB)"}), 400

        # Update profile with personal information
        profile = update_personal_info(personal_data)

        # Get summary of stored data
        personal_info = profile.get("personal_info", {})
        entry_count = len(personal_info.get("entries", []))
        total_chars = len(personal_info.get("raw_data", ""))

        return jsonify({
            "message": "Personal information saved successfully",
            "entry_count": entry_count,
            "total_characters": total_chars,
            "preview": personal_data[:200] + ("..." if len(personal_data) > 200 else "")
        })

    except Exception as e:
        logging.exception("/train_overall failed")
        return jsonify({"error": str(e)}), 500


# ---- /get_personal_info ----------------------------------------------
@app.route("/get_personal_info", methods=["GET"])
def get_personal_info():
    """Retrieve all stored personal information.

    Returns:
        JSON with all personal info entries and summary
    """
    try:
        profile = load_profile()
        personal_info = profile.get("personal_info", {})

        return jsonify({
            "entries": personal_info.get("entries", []),
            "raw_data": personal_info.get("raw_data", ""),
            "entry_count": len(personal_info.get("entries", [])),
            "total_characters": len(personal_info.get("raw_data", ""))
        })

    except Exception as e:
        logging.exception("/get_personal_info failed")
        return jsonify({"error": str(e)}), 500


# ---- /clear_personal_info --------------------------------------------
@app.route("/clear_personal_info", methods=["POST"])
def clear_personal_info():
    """Clear all stored personal information.

    Returns:
        JSON with success message
    """
    try:
        profile = load_profile()

        if "personal_info" in profile:
            del profile["personal_info"]
            profile["last_updated"] = datetime.datetime.now().isoformat()

            # Save updated profile atomically: write to a temp file in the
            # same directory, then os.replace over the real path so a crash
            # mid-write never leaves a truncated profile.
            os.makedirs(os.path.dirname(PROFILE_PATH) or ".", exist_ok=True)
            fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(PROFILE_PATH) or ".")
            try:
                with os.fdopen(fd, "w", encoding="utf-8") as tmp:
                    json.dump(profile, tmp, indent=2, ensure_ascii=False)
                os.replace(tmp_path, PROFILE_PATH)
            except Exception:
                # Best-effort cleanup of the orphaned temp file, then re-raise.
                try:
                    os.remove(tmp_path)
                except Exception:
                    pass
                raise

        return jsonify({"message": "Personal information cleared successfully"})

    except Exception as e:
        logging.exception("/clear_personal_info failed")
        return jsonify({"error": str(e)}), 500


# ---- /get_history ----------------------------------------------------
@app.route("/get_history", methods=["POST"])
def get_history():
    """Get conversation history for a session.

    Input JSON:
        session_id: string (required) - session ID to retrieve history for
    """
    try:
        data = request.json or {}
        session_id = data.get("session_id")

        if not session_id:
            return jsonify({"error": "session_id is required"}), 400

        # Membership test does NOT create a defaultdict entry.
        if session_id not in conversation_history:
            return jsonify({"error": "Session not found"}), 404

        messages = conversation_history[session_id]["messages"]
        last_active = conversation_history[session_id]["last_active"]

        return jsonify({
            "session_id": session_id,
            "messages": messages,
            "last_active": last_active,
            "message_count": len(messages)
        })

    except Exception as e:
        logging.exception("/get_history failed")
        return jsonify({"error": str(e)}), 500


# ---- /clear_history --------------------------------------------------
@app.route("/clear_history", methods=["POST"])
def clear_history():
    """Clear conversation history for a session or all sessions.

    Input JSON:
        session_id: string (optional) - specific session to clear,
            or omit to clear all
    """
    try:
        data = request.json or {}
        session_id = data.get("session_id")

        if session_id:
            if session_id in conversation_history:
                del conversation_history[session_id]
                return jsonify({"message": f"History cleared for session {session_id}"})
            else:
                return jsonify({"error": "Session not found"}), 404
        else:
            # Clear all sessions
            count = len(conversation_history)
            conversation_history.clear()
            return jsonify({"message": f"Cleared history for {count} sessions"})

    except Exception as e:
        logging.exception("/clear_history failed")
        return jsonify({"error": str(e)}), 500


# ---- /list_sessions --------------------------------------------------
@app.route("/list_sessions", methods=["GET"])
def list_sessions():
    """List all active sessions with their message counts."""
    try:
        sessions = []
        for sid, data in conversation_history.items():
            sessions.append({
                "session_id": sid,
                "message_count": len(data["messages"]),
                "last_active": data["last_active"]
            })

        # Sort by last active (most recent first)
        sessions.sort(key=lambda x: x["last_active"], reverse=True)

        return jsonify({
            "sessions": sessions,
            "total_count": len(sessions)
        })

    except Exception as e:
        logging.exception("/list_sessions failed")
        return jsonify({"error": str(e)}), 500


# ---------------------------------------------
# Entry point
# ---------------------------------------------
if __name__ == "__main__":
    host = os.environ.get("HOST", "0.0.0.0")
    port = int(os.environ.get("PORT", 7860))
    app.run(host=host, port=port)