# Human-AII / app.py
# Author: swayamshetkar
# Commit 30544e0 — file acceptance by route /train
from flask import Flask, request, jsonify, Response, stream_with_context
from flask_cors import CORS
from flask_compress import Compress
import logging
import os
import threading
import time
import uuid
import datetime
import tempfile
import json
from collections import OrderedDict, defaultdict
# Unified LLM import (router selects Groq if available else local llama)
from ai.llm_router import run_llama, init_model, stream_llama
from ai.code_analyzer import analyze_code
from ai.profile_manager import (
update_profile,
load_profile,
update_profile_components,
update_personal_info,
get_personal_context
)
from ai.refactor_engine import suggest_refactor
from ai.prompt_builder import build_persona_prompt
from ai.code_fingerprint import analyze_fingerprint
from ai.code_memory import save_to_memory
# ---------------------------------------------
# Basic setup
# ---------------------------------------------
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
logging.basicConfig(level=LOG_LEVEL)
PROFILE_PATH = os.environ.get("USER_PROFILE_PATH", "./user_data/user_profile.json")
app = Flask(__name__)
CORS(app)
Compress(app)
# Lightweight cache for identical queries
ASK_CACHE_MAX = int(os.environ.get("ASK_CACHE_MAX", 50))
ask_cache: "OrderedDict[str, str]" = OrderedDict()
# Conversation history storage: {session_id: {"messages": [...], "last_active": timestamp}}
conversation_history = defaultdict(lambda: {"messages": [], "last_active": time.time()})
HISTORY_TIMEOUT = int(os.environ.get("HISTORY_TIMEOUT_SECONDS", 3600)) # 1 hour default
MAX_HISTORY_LENGTH = int(os.environ.get("MAX_HISTORY_LENGTH", 20)) # Max messages per session
def _cache_get(key: str):
    """Return the cached reply for *key*, refreshing its LRU position, or None."""
    if key not in ask_cache:
        return None
    # Touch the entry so it becomes most-recently-used.
    ask_cache.move_to_end(key)
    return ask_cache[key]
def _cache_set(key: str, value: str):
    """Insert or refresh *key* in the LRU cache, evicting the oldest past the cap."""
    ask_cache[key] = value
    ask_cache.move_to_end(key)
    # Evict from the least-recently-used end until within ASK_CACHE_MAX.
    while len(ask_cache) > ASK_CACHE_MAX:
        ask_cache.popitem(last=False)
def _cleanup_old_sessions():
    """Drop conversation histories idle for longer than HISTORY_TIMEOUT seconds."""
    now = time.time()
    # Collect first, then delete — never mutate the dict while iterating it.
    stale = [
        sid
        for sid, info in conversation_history.items()
        if now - info["last_active"] > HISTORY_TIMEOUT
    ]
    for sid in stale:
        del conversation_history[sid]
        logging.info(f"Cleaned up expired session: {sid}")
def _get_or_create_session(session_id: str = None) -> str:
    """Return *session_id* if provided, otherwise mint a fresh UUID session.

    Also opportunistically prunes expired sessions and stamps the session's
    last-active time (creating its history entry via the defaultdict).
    """
    session_id = session_id or str(uuid.uuid4())
    # Opportunistic cleanup: only bother once many sessions have accumulated.
    if len(conversation_history) > 100:
        _cleanup_old_sessions()
    conversation_history[session_id]["last_active"] = time.time()
    return session_id
def _add_to_history(session_id: str, role: str, content: str):
    """Append a message to a session's history, trimming to MAX_HISTORY_LENGTH.

    System messages are always preserved; only the oldest non-system messages
    are dropped once the cap is exceeded.

    Args:
        session_id: Session whose history receives the message.
        role: Message role ("system", "user" or "assistant").
        content: Message text.
    """
    conversation_history[session_id]["messages"].append({
        "role": role,
        "content": content
    })
    messages = conversation_history[session_id]["messages"]
    if len(messages) > MAX_HISTORY_LENGTH:
        system_msgs = [m for m in messages if m["role"] == "system"]
        other_msgs = [m for m in messages if m["role"] != "system"]
        # Fix: clamp the keep-count at 0.  The old slice
        # other_msgs[-(MAX_HISTORY_LENGTH - len(system_msgs)):] kept the WHOLE
        # list when the index was 0 or negative (since [-0:] == [0:]), so a
        # session with many system messages grew without bound.
        keep = max(MAX_HISTORY_LENGTH - len(system_msgs), 0)
        recent = other_msgs[-keep:] if keep > 0 else []
        conversation_history[session_id]["messages"] = system_msgs + recent
def _get_messages(session_id: str, system_prompt: str = None) -> list:
    """Return a copy of the session's messages, prepending *system_prompt* if absent."""
    history = list(conversation_history[session_id]["messages"])
    # Prepend the system prompt only when the history does not already start
    # with one.
    missing_system = not history or history[0]["role"] != "system"
    if system_prompt and missing_system:
        history = [{"role": "system", "content": system_prompt}] + history
    return history
def _warmup_model():
    """Warm the model in the background so the first real request is fast."""
    try:
        model = init_model()
        # A tiny deterministic generation forces model/context initialization.
        run_llama("Warmup.", max_tokens=4, temperature=0.0, top_p=0.5)
        logging.info("Model warm-up complete (context=%s).", getattr(model, "n_ctx", "n/a"))
    except Exception as exc:
        # Warm-up is best-effort; a failure must not take the server down.
        logging.warning("Model warm-up failed: %s", exc)
# Kick off a daemon warm-up thread at import time unless disabled via env
# (DISABLE_WARMUP=1); daemon=True so it never blocks interpreter shutdown.
if os.environ.get("DISABLE_WARMUP", "0") != "1":
    threading.Thread(target=_warmup_model, daemon=True).start()
# ---------------------------------------------
# Routes
# ---------------------------------------------
@app.route("/", methods=["GET"])
def home():
    """Health-check endpoint confirming the backend is up."""
    status_payload = {"status": "Backend running ✅"}
    return jsonify(status_payload)
# ---- /ask ------------------------------------------------------------
@app.route("/ask", methods=["POST"])
def ask():
    """Answer a question with conversation history and first-turn caching.

    Input JSON:
        query: string (required) - the user's question
        session_id: string (optional) - continue an existing conversation
        metadata: object (optional) - {"skill": ..., "emotion": ...}

    Returns JSON: {"reply": ..., "session_id": ...} plus "cached": true
    when served from the LRU cache.
    """
    try:
        # Fix: request.json raises 400/415 on a missing or non-JSON body
        # BEFORE the `or {}` fallback can apply; get_json(silent=True)
        # returns None instead, so bad bodies yield our own 400 message.
        data = request.get_json(silent=True) or {}
        query = data.get("query", "").strip()
        if not query:
            return jsonify({"error": "query is required"}), 400

        # Session management: reuse the provided session or mint a new one.
        session_id = _get_or_create_session(data.get("session_id"))

        meta = data.get("metadata", {})
        skill = meta.get("skill", "moderate_learner")
        emotion = meta.get("emotion", "neutral")

        # Personal context stored via /train_overall, if any.
        personal_context = get_personal_context()
        personal_section = f"\n\n{personal_context}" if personal_context else ""
        SYSTEM_PROMPT = (
            "You are CodeMate — an empathetic coding tutor. "
            "Explain or answer the question clearly and helpfully. "
            f"User context: skill={skill}, emotion={emotion}"
            f"{personal_section}"
        )

        _add_to_history(session_id, "user", query)
        messages = _get_messages(session_id, SYSTEM_PROMPT)

        # The cache only applies to the first turn (system + user only),
        # where the reply depends solely on (skill, emotion, query).
        is_first_turn = len(messages) <= 2
        if is_first_turn:
            cache_key = f"{skill}|{emotion}|{query}"[:4096]
            cached = _cache_get(cache_key)
            if cached:
                _add_to_history(session_id, "assistant", cached)
                return jsonify({"reply": cached, "cached": True, "session_id": session_id})

        reply = run_llama(messages=messages, max_tokens=400, temperature=0.1, top_p=0.9)
        _add_to_history(session_id, "assistant", reply)

        if is_first_turn:
            _cache_set(cache_key, reply)
        return jsonify({"reply": reply, "session_id": session_id})
    except Exception as e:
        logging.exception("/ask failed")
        return jsonify({"error": str(e)}), 500
# ---- /ask_stream -----------------------------------------------------
@app.route("/ask_stream", methods=["POST"])
def ask_stream():
    """Stream GPT responses (Server-Sent Events) with conversation history.

    Input JSON mirrors /ask: query (required), session_id (optional),
    metadata (optional {"skill", "emotion"}).  The response is an SSE
    stream: a `session` event carrying the session id, `data:` chunks of
    the reply, then a `done` event — or an `error` event on mid-stream
    failure, since the HTTP status has already been sent by then.
    """
    try:
        data = request.json or {}
        query = data.get("query", "").strip()
        if not query:
            return jsonify({"error": "query is required"}), 400
        # Session management: reuse the provided session or mint a new one.
        session_id = data.get("session_id")
        session_id = _get_or_create_session(session_id)
        meta = data.get("metadata", {})
        skill = meta.get("skill", "moderate_learner")
        emotion = meta.get("emotion", "neutral")
        # Personal context stored via /train_overall, if any.
        personal_context = get_personal_context()
        personal_section = f"\n\n{personal_context}" if personal_context else ""
        SYSTEM_PROMPT = (
            "You are CodeMate — an empathetic coding tutor. "
            "Explain or answer the question clearly and helpfully. "
            f"User context: skill={skill}, emotion={emotion}"
            f"{personal_section}"
        )
        # Record the user message, then build the full prompt history.
        _add_to_history(session_id, "user", query)
        messages = _get_messages(session_id, SYSTEM_PROMPT)
        # stream_with_context keeps the Flask request context alive for the
        # lifetime of the generator.
        @stream_with_context
        def event_stream():
            # Accumulate chunks so the complete reply can be stored in history.
            collected_response = []
            try:
                # Emit session_id first so the client can persist it.
                yield f"event: session\ndata: {session_id}\n\n"
                for chunk in stream_llama(messages=messages, max_tokens=400, temperature=0.1, top_p=0.9):
                    if chunk:
                        collected_response.append(chunk)
                        yield f"data: {chunk}\n\n"
                # Persist the assembled assistant reply to history.
                full_response = "".join(collected_response)
                if full_response:
                    _add_to_history(session_id, "assistant", full_response)
                yield "event: done\ndata: [DONE]\n\n"
            except Exception as ex:
                # Mid-stream failures surface as an SSE `error` event.
                logging.exception("/ask_stream failed during streaming")
                yield f"event: error\ndata: {str(ex)}\n\n"
        return Response(event_stream(), mimetype="text/event-stream")
    except Exception as e:
        logging.exception("/ask_stream failed")
        return jsonify({"error": str(e)}), 500
@app.route("/code_assist", methods=["POST"])
def code_assist():
    """Coding-assistant endpoint that also learns the user's coding style.

    Input JSON:
        query: string (required) - the coding question/request
        session_id: string (optional) - continue an existing conversation
        metadata: object (optional) - {"skill": ..., "emotion": ...}

    Any fenced code in the model's reply is analyzed to update the stored
    coding-style and fingerprint profile, and optionally saved to memory.

    Returns JSON: {"reply": ..., "session_id": ...}
    """
    try:
        # Fix: request.json raises 400/415 on a missing or non-JSON body;
        # get_json(silent=True) lets us fall back to {} and answer with our
        # own 400 instead.
        data = request.get_json(silent=True) or {}
        query = data.get("query", "").strip()
        if not query:
            return jsonify({"error": "query is required"}), 400

        session_id = _get_or_create_session(data.get("session_id"))

        metadata = data.get("metadata", {})
        emotion = metadata.get("emotion", "neutral")
        skill = metadata.get("skill", "moderate_learner")

        # Load existing learned style/fingerprint for the persona prompt.
        profile = load_profile()
        style = profile.get("coding_style", {})
        fingerprint = profile.get("fingerprint", {})

        # Personal context stored via /train_overall, if any.
        personal_context = get_personal_context()
        personal_section = f"\n\n{personal_context}" if personal_context else ""

        # Adaptive persona prompt (a complete, self-contained prompt string).
        prompt_content = build_persona_prompt(query, emotion, skill, style, fingerprint)

        _add_to_history(session_id, "user", query)

        system_prompt = (
            "You are CodeMate — an empathetic coding assistant. "
            f"User profile: skill={skill}, emotion={emotion}. "
            "Provide helpful code examples and explanations."
            f"{personal_section}"
        )
        messages = _get_messages(session_id, system_prompt)

        # First turn (system + user only): swap the plain query for the richer
        # persona prompt so the model gets full style/fingerprint context.
        if len(messages) == 2:
            messages[-1]["content"] = prompt_content

        reply = run_llama(messages=messages, max_tokens=400, temperature=0.2)
        _add_to_history(session_id, "assistant", reply)

        # Self-learning: extract fenced code blocks from the reply.
        import re
        code_blocks = re.findall(r"```(?:python)?\n([\s\S]*?)```", reply)
        if code_blocks:
            combined = "\n".join(code_blocks)
            style_update = analyze_code(combined)
            fingerprint_update = analyze_fingerprint(combined)
            # Persist what was learned from the AI-generated code.
            update_profile_components(coding_style=style_update, fingerprint=fingerprint_update)
            # Store in memory unless disabled via env.
            if os.environ.get("DISABLE_CODE_MEMORY", "0") != "1":
                save_to_memory(query, combined, emotion, skill)

        return jsonify({"reply": reply, "session_id": session_id})
    except Exception as e:
        logging.exception("/code_assist failed")
        return jsonify({"error": str(e)}), 500
# ---- /analyze --------------------------------------------------------
@app.route("/analyze", methods=["POST"])
def analyze():
    """Analyze a code snippet and fold the detected style into the profile."""
    try:
        payload = request.json or {}
        snippet = payload.get("code", "")
        if not snippet:
            return jsonify({"error": "code is required"}), 400
        detected_style = analyze_code(snippet)
        update_profile(detected_style)
        return jsonify({"message": "Profile updated", "style": detected_style})
    except Exception as e:
        logging.exception("/analyze failed")
        return jsonify({"error": str(e)}), 500
# ---- /refactor -------------------------------------------------------
@app.route("/refactor", methods=["POST"])
def refactor():
    """Suggest a refactored version of the submitted code."""
    try:
        payload = request.json or {}
        source = payload.get("code", "")
        if not source:
            return jsonify({"error": "code is required"}), 400
        suggestion = suggest_refactor(source)
        return jsonify({"refactored": suggestion})
    except Exception as e:
        logging.exception("/refactor failed")
        return jsonify({"error": str(e)}), 500
# ---- /profile --------------------------------------------------------
@app.route("/profile", methods=["GET"])
def profile():
    """Return the full persisted user profile as JSON."""
    try:
        current_profile = load_profile()
        return jsonify(current_profile)
    except Exception as e:
        logging.exception("/profile failed")
        return jsonify({"error": str(e)}), 500
# ---- /train --------------------------------------------------
@app.route("/train", methods=["POST"])
def train():
    """Train both coding style and fingerprint from a raw code string or files.

    Input JSON:
        code: string (optional) - single code sample to learn from
        files: array of objects (optional) - multiple code files
            Each file object: { "filename": "file.py", "content": "code..." }

    At least one of 'code' or 'files' must be provided.

    Returns JSON summarizing the processed files and the updated
    coding-style / fingerprint components.
    """
    try:
        # silent=True: tolerate a missing/non-JSON body instead of raising.
        req = request.get_json(silent=True) or {}
        code = req.get("code", "")
        # Fix: validate types before .strip() — the old unconditional strip
        # turned a non-string 'code' into an AttributeError and a 500.
        if not isinstance(code, str):
            return jsonify({"error": "'code' must be a string"}), 400
        code = code.strip()
        files = req.get("files", [])
        if not code and not files:
            return jsonify({"error": "Either 'code' or 'files' is required"}), 400

        # Collect code samples plus per-file bookkeeping for the response.
        code_samples = []
        processed_files = []
        if code:
            code_samples.append(code)
            processed_files.append({"filename": "direct_input", "size": len(code)})

        if files:
            if not isinstance(files, list):
                return jsonify({"error": "'files' must be an array"}), 400
            for idx, file_obj in enumerate(files):
                if not isinstance(file_obj, dict):
                    return jsonify({"error": f"File at index {idx} must be an object with 'filename' and 'content'"}), 400
                filename = file_obj.get("filename", f"file_{idx}")
                content = file_obj.get("content", "")
                # Same type guard as 'code': bad content yields 400, not 500.
                if not isinstance(content, str):
                    return jsonify({"error": f"'content' of file at index {idx} must be a string"}), 400
                content = content.strip()
                if not content:
                    continue  # Skip empty files
                code_samples.append(content)
                processed_files.append({"filename": filename, "size": len(content)})

        if not code_samples:
            return jsonify({"error": "No valid code content found"}), 400

        # Analyze all samples together; the separator keeps file boundaries
        # visible to the analyzers.
        combined_code = "\n\n# --- File Separator ---\n\n".join(code_samples)
        style_update = analyze_code(combined_code)
        fingerprint_update = analyze_fingerprint(combined_code)
        update_profile_components(coding_style=style_update, fingerprint=fingerprint_update)

        total_size = sum(f["size"] for f in processed_files)
        return jsonify({
            "message": "Profile trained successfully",
            "files_processed": len(processed_files),
            "total_characters": total_size,
            "files": processed_files,
            "coding_style": style_update,
            "fingerprint": fingerprint_update
        })
    except Exception as e:
        logging.exception("/train failed")
        return jsonify({"error": str(e)}), 500
# ---- /train_overall ----------------------------------------------
@app.route("/train_overall", methods=["POST"])
def train_overall():
    """Store the user's personal information for personalized AI responses.

    The stored details (name, background, preferences, hobbies, ...) become
    part of the context used in all future conversations.

    Input JSON:
        personal_data: string (required) - free-form personal information,
            capped at 50KB per entry.

    Returns:
        JSON with a success message and a summary of the stored data.
    """
    try:
        payload = request.json or {}
        personal_data = payload.get("personal_data", "").strip()
        if not personal_data:
            return jsonify({"error": "personal_data is required"}), 400
        # Size cap to prevent abuse (max 50KB per entry).
        if len(personal_data) > 50000:
            return jsonify({"error": "personal_data too large (max 50KB)"}), 400

        updated_profile = update_personal_info(personal_data)
        info = updated_profile.get("personal_info", {})

        preview = personal_data[:200]
        if len(personal_data) > 200:
            preview += "..."
        return jsonify({
            "message": "Personal information saved successfully",
            "entry_count": len(info.get("entries", [])),
            "total_characters": len(info.get("raw_data", "")),
            "preview": preview
        })
    except Exception as e:
        logging.exception("/train_overall failed")
        return jsonify({"error": str(e)}), 500
# ---- /get_personal_info ------------------------------------------
@app.route("/get_personal_info", methods=["GET"])
def get_personal_info():
    """Return all stored personal-information entries with a summary."""
    try:
        info = load_profile().get("personal_info", {})
        entries = info.get("entries", [])
        raw = info.get("raw_data", "")
        return jsonify({
            "entries": entries,
            "raw_data": raw,
            "entry_count": len(entries),
            "total_characters": len(raw)
        })
    except Exception as e:
        logging.exception("/get_personal_info failed")
        return jsonify({"error": str(e)}), 500
# ---- /clear_personal_info ----------------------------------------
@app.route("/clear_personal_info", methods=["POST"])
def clear_personal_info():
    """Clear all stored personal information and persist the updated profile.

    Returns:
        JSON with a success message.
    """
    try:
        profile = load_profile()
        if "personal_info" in profile:
            del profile["personal_info"]
        profile["last_updated"] = datetime.datetime.now().isoformat()
        # Atomic save: write to a temp file in the same directory, then
        # os.replace so readers never observe a half-written profile.
        os.makedirs(os.path.dirname(PROFILE_PATH) or ".", exist_ok=True)
        # Fix: removed the redundant function-local `import tempfile` —
        # tempfile is already imported at module level.
        fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(PROFILE_PATH) or ".")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as tmp:
                json.dump(profile, tmp, indent=2, ensure_ascii=False)
            os.replace(tmp_path, PROFILE_PATH)
        except Exception:
            # Best-effort cleanup of the temp file, then re-raise the
            # original error for the outer handler.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
        return jsonify({"message": "Personal information cleared successfully"})
    except Exception as e:
        logging.exception("/clear_personal_info failed")
        return jsonify({"error": str(e)}), 500
# ---- /get_history ------------------------------------------------
@app.route("/get_history", methods=["POST"])
def get_history():
    """Return the stored conversation history for one session.

    Input JSON:
        session_id: string (required) - session ID to retrieve history for
    """
    try:
        payload = request.json or {}
        sid = payload.get("session_id")
        if not sid:
            return jsonify({"error": "session_id is required"}), 400
        # Membership test does not create an entry, even on a defaultdict.
        if sid not in conversation_history:
            return jsonify({"error": "Session not found"}), 404
        record = conversation_history[sid]
        return jsonify({
            "session_id": sid,
            "messages": record["messages"],
            "last_active": record["last_active"],
            "message_count": len(record["messages"])
        })
    except Exception as e:
        logging.exception("/get_history failed")
        return jsonify({"error": str(e)}), 500
# ---- /clear_history ----------------------------------------------
@app.route("/clear_history", methods=["POST"])
def clear_history():
    """Clear conversation history for one session, or all sessions.

    Input JSON:
        session_id: string (optional) - specific session to clear; omit to
            clear every session.
    """
    try:
        payload = request.json or {}
        sid = payload.get("session_id")
        if not sid:
            # No session specified: wipe everything.
            count = len(conversation_history)
            conversation_history.clear()
            return jsonify({"message": f"Cleared history for {count} sessions"})
        if sid not in conversation_history:
            return jsonify({"error": "Session not found"}), 404
        del conversation_history[sid]
        return jsonify({"message": f"History cleared for session {sid}"})
    except Exception as e:
        logging.exception("/clear_history failed")
        return jsonify({"error": str(e)}), 500
# ---- /list_sessions ----------------------------------------------
@app.route("/list_sessions", methods=["GET"])
def list_sessions():
    """List all active sessions with message counts, most recent first."""
    try:
        sessions = [
            {
                "session_id": sid,
                "message_count": len(info["messages"]),
                "last_active": info["last_active"]
            }
            for sid, info in conversation_history.items()
        ]
        sessions.sort(key=lambda s: s["last_active"], reverse=True)
        return jsonify({
            "sessions": sessions,
            "total_count": len(sessions)
        })
    except Exception as e:
        logging.exception("/list_sessions failed")
        return jsonify({"error": str(e)}), 500
# ---------------------------------------------
# Entry point
# ---------------------------------------------
if __name__ == "__main__":
    # Bind address/port are configurable via env; defaults (0.0.0.0:7860)
    # suit containerized deployments such as Hugging Face Spaces.
    host = os.environ.get("HOST", "0.0.0.0")
    port = int(os.environ.get("PORT", 7860))
    app.run(host=host, port=port)