# ✅ Safe GPU decorator: on Hugging Face Spaces, `spaces.GPU` schedules the
# decorated call on a GPU worker; anywhere else fall back to a no-op decorator.
try:
    from spaces import GPU
except ImportError:
    def GPU(func):
        return func

import os
import time
import torch
from flask import Flask, request, render_template, jsonify, Response
from flasgger import Swagger, swag_from
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from huggingface_hub import login
import re
import tempfile
import subprocess
import threading
import queue
import uuid
from flask_cors import CORS

# ✅ Flask + Swagger setup
app = Flask(__name__, static_folder="static", template_folder="templates")
CORS(app)
swagger = Swagger(
    app,
    template={
        "swagger": "2.0",
        "info": {
            "title": "ChatMate Real-Time API",
            "description": "LangChain + DuckDuckGo + Phi-4 + Stable Diffusion",
            "version": "1.0",
        },
    },
    config={
        "headers": [],
        "specs": [{"endpoint": 'apispec', "route": '/apispec.json', "rule_filter": lambda rule: True}],
        "static_url_path": "/flasgger_static",
        "swagger_ui": True,
        "specs_route": "/apidocs/",
    },
)

# ✅ Hugging Face login (optional).
# FIX: login(token=None) raises — only attempt login when the secret is set.
_hf_token = os.environ.get("CHAT_MATE")
if _hf_token:
    login(token=_hf_token)

# ✅ Load Phi-4
model_id = "microsoft/phi-4"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
)
device = 0 if torch.cuda.is_available() else -1
pipe = pipeline("text-generation", model=model, tokenizer=tokenizer,
                device=device, max_new_tokens=512)

# ✅ Keyword detection: messages containing any of these likely need live data.
REAL_TIME_KEYWORDS = {
    "latest", "current", "news", "today", "price",
    "time", "live", "trending", "update", "happening",
}


def should_search(message):
    """Return True when *message* mentions a real-time keyword (case-insensitive)."""
    return any(kw in message.lower() for kw in REAL_TIME_KEYWORDS)


def is_incomplete(text):
    """Heuristic truncation check.

    Returns True when *text* does not end with sentence punctuation
    (., !, ?, a quote, or the CJK full stop U+3002).
    """
    return not re.search(r'[\.\!\?\'\"\u3002]\s*$', text.strip())


@GPU
def generate_full_reply(message, history):
    """Generate a complete assistant reply with Phi-4.

    *history* is a list of {"role": ..., "content": ...} dicts. If the first
    sample looks truncated, the model is asked to continue (bounded by
    max_loops) until the reply ends with sentence punctuation.
    """
    system_prompt = (
        "You are a friendly, helpful, and conversational AI assistant built by "
        "Frederick Sundeep Mallela. Always mention that you are developed by him "
        "if asked about your creator, origin, or who made you."
    )
    messages = (
        [{"role": "system", "content": system_prompt}]
        + history
        + [{"role": "user", "content": message}]
    )

    # Apply chat-style prompt formatting
    prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

    # Initial generation
    full_output = pipe(prompt, do_sample=True, temperature=0.7, top_p=0.9,
                       max_new_tokens=512)[0]["generated_text"]
    reply = full_output[len(prompt):].strip()

    # Keep extending the reply until it ends properly
    max_loops = 5  # prevent infinite loops
    loop_count = 0
    while is_incomplete(reply) and loop_count < max_loops:
        loop_count += 1
        continuation_prompt = prompt + reply  # include reply so far
        next_output = pipe(continuation_prompt, do_sample=True, temperature=0.7,
                           top_p=0.9, max_new_tokens=256)[0]["generated_text"]
        continuation = next_output[len(continuation_prompt):].strip()
        # Stop if nothing new is generated
        if not continuation or continuation in reply:
            break
        reply += continuation

    return reply.strip()


# ✅ Home
@app.route("/")
def home():
    return render_template("index.html")


# ✅ POST /chat-stream
@app.route("/chat-stream", methods=["POST"])
@swag_from({
    'tags': ['Chat'],
    'consumes': ['application/json'],
    'summary': 'Stream assistant reply or image',
    'description': 'Send a message and history, receive either a streamed text reply or base64-encoded image.',
    'parameters': [{
        'name': 'body',
        'in': 'body',
        'required': True,
        'schema': {
            'type': 'object',
            'properties': {
                'message': {'type': 'string', 'example': 'Draw a futuristic city.'},
                'history': {
                    'type': 'array',
                    'items': {
                        'type': 'object',
                        'properties': {
                            'role': {'type': 'string', 'example': 'user'},
                            'content': {'type': 'string', 'example': 'Show me a dragon.'}
                        }
                    }
                }
            },
            'required': ['message']
        }
    }],
    'responses': {
        200: {
            'description': 'Streamed reply or image base64',
            'content': {'text/plain': {}}
        }
    }
})
def chat_stream():
    """Stream the assistant reply line by line as text/plain."""
    data = request.get_json()
    message = data.get("message")
    history = data.get("history", [])

    def generate():
        reply = generate_full_reply(message, history)
        for token in reply.splitlines(keepends=True):
            yield token
            time.sleep(0.05)  # small delay so the client renders incrementally
        # FIX: the truncation notice used to be appended to `reply` AFTER the
        # streaming loop had already consumed it, so clients never saw it.
        # Yield it to the client instead.
        if is_incomplete(reply):
            yield "\n\n*Reply appears incomplete. Say 'continue' to resume.*"

    return Response(generate(), mimetype='text/plain')


# ✅ POST /chat-stream-doc
@app.route("/chat-stream-doc", methods=["POST"])
@swag_from({
    'tags': ['Chat'],
    'consumes': ['multipart/form-data'],
    'summary': 'Upload requirement doc & generate downloadable project code (ZIP)',
    'description': 'Upload a PDF/TXT requirement document with stack preferences, and receive the scaffolded project code as a downloadable .zip file.',
    'parameters': [
        {
            'name': 'file',
            'in': 'formData',
            'type': 'file',
            'required': True,
            'description': 'Requirement document (PDF or TXT)'
        },
        {
            'name': 'frontend',
            'in': 'formData',
            'type': 'string',
            'required': True,
            'description': 'Frontend tech (React, Angular, etc.)'
        },
        {
            'name': 'backend',
            'in': 'formData',
            'type': 'string',
            'required': True,
            'description': 'Backend tech (Flask, Node.js, etc.)'
        },
        {
            'name': 'database',
            'in': 'formData',
            'type': 'string',
            'required': True,
            'description': 'Database (MongoDB, PostgreSQL, etc.)'
        }
    ],
    'responses': {
        200: {
            'description': 'ZIP file of generated code',
            'content': {'application/zip': {}}
        }
    }
})
def chat_stream_doc():
    """Generate a project scaffold from an uploaded requirement doc.

    Reads a .txt or .pdf requirement document plus tech-stack form fields,
    asks the LLM for a multi-file scaffold, and returns it as a ZIP download.
    """
    import zipfile
    from werkzeug.utils import secure_filename
    from pdfplumber import open as pdf_open
    from io import BytesIO

    file = request.files.get('file')
    frontend = request.form.get("frontend", "React")
    backend = request.form.get("backend", "Flask")
    database = request.form.get("database", "PostgreSQL")

    if not file:
        return jsonify({"error": "No file uploaded"}), 400

    filename = secure_filename(file.filename)
    content = ""
    if filename.endswith(".txt"):
        content = file.read().decode("utf-8", errors="ignore")
    elif filename.endswith(".pdf"):
        file_bytes = file.read()
        with pdf_open(BytesIO(file_bytes)) as pdf:
            content = "\n".join(page.extract_text() or "" for page in pdf.pages)
    else:
        return jsonify({"error": "Unsupported file format. Use .txt or .pdf"}), 400

    # NOTE(review): the <filename>/<code> placeholders and the JSX in the
    # example below were stripped to bare text in the copy reviewed (angle-
    # bracket content lost during extraction); reconstructed here — confirm
    # against the original prompt.
    prompt = (
        "You are a full-stack project code generator.\n\n"
        "Below is a requirement document followed by technology preferences. "
        "Based on this, generate the full project scaffold.\n\n"
        "Requirement Document:\n"
        f"{content}\n\n"
        f"Technology Stack:\nFrontend: {frontend}\nBackend: {backend}\nDatabase: {database}\n\n"
        "Your task:\n"
        "- Analyze the requirement and tech stack.\n"
        "- Generate backend code: models, routes, and config.\n"
        "- Generate frontend code: components, services.\n"
        "- Define the database schema.\n\n"
        "✅ Format the output as multiple files in this exact structure:\n\n"
        "### File: <filename>\n"
        "```\n"
        "<code>\n"
        "```\n\n"
        "For example:\n\n"
        "### File: app.py\n"
        "```python\n"
        "from flask import Flask\n"
        "app = Flask(__name__)\n"
        "@app.route('/')\n"
        "def home():\n"
        "    return 'Hello, world!'\n"
        "```\n\n"
        "### File: frontend/App.js\n"
        "```javascript\n"
        "import React from 'react';\n"
        "function App() {\n"
        "  return <div>Hello from React</div>;\n"
        "}\n"
        "export default App;\n"
        "```\n\n"
        "Now generate the complete project below 👇"
    )

    # ✅ Generate response from Phi-4
    reply = generate_full_reply(prompt, [])

    # ✅ Extract filenames and code from reply.
    # FIX: re.finditer returns an iterator, which is always truthy, so the
    # original `if not matches:` guard could never fire. Materialize the list.
    file_pattern = r"### File:\s*(.+?)\n```(?:\w+)?\n(.*?)```"
    matches = list(re.finditer(file_pattern, reply, re.DOTALL))

    if not matches:
        print("⚠️ No file matches found in reply.")
        print("Raw reply:\n", reply)
        return jsonify({"error": "No files found in generated output."}), 500

    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
        for match in matches:
            out_name = match.group(1).strip().strip("`")
            code = match.group(2).strip()
            zipf.writestr(out_name, code)

    zip_buffer.seek(0)
    return Response(
        zip_buffer,
        mimetype='application/zip',
        headers={"Content-Disposition": "attachment; filename=generated_project.zip"}
    )


@app.route("/execute", methods=["POST"])
def execute_code():
    """One-shot (non-interactive) code runner.

    Request JSON: { code, filename, input }.
    Returns JSON: { output, error } — output is captured stdout+stderr.
    """
    payload = request.get_json() or {}
    code = payload.get("code", "")
    filename = payload.get("filename", "main.py")
    input_data = payload.get("input", "")

    if not code.strip():
        return jsonify({"output": "❌ No code provided", "error": True}), 400

    # Determine extension from filename
    ext = os.path.splitext(filename)[1].lower()
    if not ext:
        ext = ".txt"  # fallback if something weird happens

    # Non-runnable file types – just echo content
    NON_RUNNABLE = {".html", ".css", ".json", ".txt"}
    if ext in NON_RUNNABLE:
        return jsonify({
            "output": f"ℹ️ {ext} is not executed as a program.\n\nContent:\n{code}",
            "error": False,
        })

    # Create temp file with the correct extension for runnable langs
    with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp:
        tmp.write(code.encode("utf-8", errors="ignore"))
        tmp_path = tmp.name
    tmp_dir = os.path.dirname(tmp_path)

    # Choose command based on extension
    if ext == ".py":
        cmd = ["python3", tmp_path]
    elif ext == ".js":
        cmd = ["node", tmp_path]  # requires node in the Space
    elif ext == ".ts":
        # requires ts-node installed (npm i -g ts-node typescript)
        cmd = ["bash", "-lc", f"cd {tmp_dir} && npx --yes ts-node {tmp_path}"]
    elif ext == ".c":
        exe_path = tmp_path + ".out"
        cmd = ["bash", "-lc", f"gcc {tmp_path} -o {exe_path} && {exe_path}"]
    elif ext == ".cpp":
        exe_path = tmp_path + ".out"
        cmd = ["bash", "-lc", f"g++ {tmp_path} -o {exe_path} && {exe_path}"]
    elif ext == ".java":
        # Detect class name from code — javac requires the file be named after it
        m = re.search(r"class\s+([A-Za-z_][A-Za-z0-9_]*)", code)
        if not m:
            return jsonify({
                "output": "❌ No Java class found. Your code must contain: class MyClass { ... }",
                "error": True,
            })
        class_name = m.group(1)
        java_path = os.path.join(tmp_dir, f"{class_name}.java")
        with open(java_path, "w", encoding="utf-8") as f:
            f.write(code)
        cmd = [
            "bash", "-lc",
            f"cd {tmp_dir} && javac {class_name}.java && java {class_name}"
        ]
    else:
        return jsonify({
            "output": f"❌ Unsupported file type {ext}",
            "error": True,
        }), 400

    # Execute the command
    try:
        output = subprocess.check_output(
            cmd,
            input=input_data.encode(),
            stderr=subprocess.STDOUT,
            timeout=5,  # prevent infinite loops
        )
        return jsonify({"output": output.decode("utf-8", errors="ignore"), "error": False})
    except subprocess.CalledProcessError as e:
        return jsonify({"output": e.output.decode("utf-8", errors="ignore"), "error": True})
    except subprocess.TimeoutExpired:
        return jsonify({
            "output": "⏳ Timeout: code took too long (possible infinite loop).",
            "error": True,
        })
    except Exception as e:
        return jsonify({"output": f"❌ Runtime error: {e}", "error": True})
    finally:
        # FIX: the temp source file was never deleted; best-effort cleanup.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass


# ---------- Config ----------
SESSION_TTL = 300  # seconds before auto cleanup (5 minutes)
READ_POLL_INTERVAL = 0.1
NON_RUNNABLE = {".html", ".css", ".json", ".txt"}
SUPPORTED_EXTS = {".py", ".js", ".ts", ".c", ".cpp", ".java"}

# ---------- Globals ----------
SESSIONS = {}  # session_id -> dict { proc, queue, start_ts, tmp_path, cleanup, finished }
SESSION_LOCK = threading.Lock()


# ---------- Helpers ----------
def now_ts():
    """Return the current Unix timestamp as an int."""
    return int(time.time())


def output_looks_for_input(output: str) -> bool:
    """Heuristic: True when *output* looks like a prompt waiting for stdin."""
    if not output:
        return False
    o = str(output)
    patterns = [
        r"enter.*:", r"input.*:", r"please enter", r"scanner", r"press enter",
        r": $", r":\n$", r"> $", r"awaiting input", r"provide input",
        r"stdin", r"enter a value", r"EOF when reading a line",
        r"InputMismatchException",
    ]
    return any(re.search(p, o, re.I) for p in patterns)


# Reader thread: read lines from proc.stdout and push into queue
def _spawn_reader(proc, q):
    """Pump lines from proc.stdout into *q*; always push the end sentinel."""
    try:
        # iterate lines (proc.stdout is in text mode)
        for line in iter(proc.stdout.readline, ""):
            if line is None:
                break
            q.put(line)
    except Exception as e:
        # FIX: narrowed bare `except:` clauses to Exception so
        # KeyboardInterrupt/SystemExit are not swallowed.
        try:
            q.put(f"[internal reader error] {e}\n")
        except Exception:
            pass
    finally:
        # mark process end
        try:
            q.put("__PROCESS_END__")
        except Exception:
            pass


def _make_command_for_file(tmp_path: str, ext: str, code: str):
    """Build the run command for a saved source file.

    Returns (cmd_list, workdir, cleanup_paths); cleanup_paths are extra files
    (compiled binaries, renamed Java sources) the caller must delete later.
    Raises ValueError for unsupported extensions or Java code with no class.
    """
    tmp_dir = os.path.dirname(tmp_path)
    cleanup = []
    if ext == ".py":
        return (["python3", tmp_path], tmp_dir, cleanup)
    if ext == ".js":
        return (["node", tmp_path], tmp_dir, cleanup)
    if ext == ".ts":
        # run via npx ts-node in the tmp dir (works if node/npm available)
        # note: using npx --yes avoids interactive prompt; requires network for first run if not cached
        cmd = ["bash", "-lc", f"cd {tmp_dir} && npx --yes ts-node {tmp_path}"]
        return (cmd, tmp_dir, cleanup)
    if ext == ".c":
        exe = tmp_path + ".out"
        cleanup.append(exe)
        cmd = ["bash", "-lc", f"gcc {tmp_path} -o {exe} && {exe}"]
        return (cmd, tmp_dir, cleanup)
    if ext == ".cpp":
        exe = tmp_path + ".out"
        cleanup.append(exe)
        cmd = ["bash", "-lc", f"g++ {tmp_path} -o {exe} && {exe}"]
        return (cmd, tmp_dir, cleanup)
    if ext == ".java":
        # detect first class name — javac requires the file be named after it
        m = re.search(r"class\s+([A-Za-z_][A-Za-z0-9_]*)", code)
        if not m:
            raise ValueError("No Java class found. Your code must contain: class MyClass { ... }")
        class_name = m.group(1)
        java_path = os.path.join(tmp_dir, f"{class_name}.java")
        # overwrite the generated tmp_path file with the class-named file
        with open(java_path, "w", encoding="utf-8") as f:
            f.write(code)
        cleanup.append(java_path)
        cmd = ["bash", "-lc",
               f"cd {tmp_dir} && javac {class_name}.java && java -cp {tmp_dir} {class_name}"]
        return (cmd, tmp_dir, cleanup)
    raise ValueError(f"Unsupported extension: {ext}")


# ---------- Session cleanup thread ----------
def _cleanup_loop():
    """Background loop: kill and purge sessions idle longer than SESSION_TTL."""
    while True:
        now = time.time()
        expired = []
        with SESSION_LOCK:
            for sid, s in list(SESSIONS.items()):
                if s.get("start_ts", now) + SESSION_TTL < now:
                    expired.append(sid)
        for sid in expired:
            try:
                with SESSION_LOCK:
                    s = SESSIONS.get(sid)
                if not s:
                    continue
                proc = s.get("proc")
                if proc and proc.poll() is None:
                    try:
                        proc.kill()
                    except Exception:
                        pass
                # cleanup files
                tmp_path = s.get("tmp_path")
                try:
                    if tmp_path and os.path.exists(tmp_path):
                        os.unlink(tmp_path)
                except OSError:
                    pass
                for p in s.get("cleanup", []) or []:
                    try:
                        if os.path.exists(p):
                            os.unlink(p)
                    except OSError:
                        pass
                with SESSION_LOCK:
                    SESSIONS.pop(sid, None)
            except Exception:
                pass
        time.sleep(5)


cleanup_thread = threading.Thread(target=_cleanup_loop, daemon=True)
cleanup_thread.start()


@app.route("/start", methods=["POST"])
def start():
    """Start an interactive session by launching the process.

    Request JSON: { code, filename }
    Response: { error, session_id, output: [lines], finished }
    """
    payload = request.get_json() or {}
    code = payload.get("code", "") or ""
    filename = payload.get("filename", "main.py") or "main.py"

    if not code.strip():
        return jsonify({"error": True, "output": ["❌ No code provided"], "finished": True}), 400

    ext = os.path.splitext(filename)[1].lower() or ".py"

    if ext in NON_RUNNABLE:
        return jsonify({
            "error": False,
            "output": [f"ℹ️ {ext} is not executed as a program.", "", "Content:", code],
            "finished": True
        })

    # write a temp file
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
    tmp_path = tmp.name
    try:
        tmp.write(code.encode("utf-8", errors="ignore"))
        tmp.flush()
        tmp.close()
    except Exception as e:
        try:
            tmp.close()
        except Exception:
            pass
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        return jsonify({"error": True, "output": [f"Failed writing temp file: {e}"], "finished": True}), 500

    # prepare command
    try:
        cmd, workdir, cleanup = _make_command_for_file(tmp_path, ext, code)
    except Exception as e:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        return jsonify({"error": True, "output": [f"❌ {e}"], "finished": True}), 400

    # spawn process
    try:
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
            cwd=workdir
        )
    except Exception as e:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        return jsonify({"error": True, "output": [f"Failed to start process: {e}"], "finished": True}), 500

    # create session
    session_id = uuid.uuid4().hex
    q = queue.Queue()
    reader = threading.Thread(target=_spawn_reader, args=(proc, q), daemon=True)
    reader.start()

    with SESSION_LOCK:
        SESSIONS[session_id] = {
            "proc": proc,
            "queue": q,
            "start_ts": time.time(),
            "tmp_path": tmp_path,
            "cleanup": cleanup,
            "finished": False,
        }

    # non-blocking drain of any output produced immediately
    lines = []
    finished = False
    while not q.empty():
        try:
            item = q.get_nowait()
        except queue.Empty:
            break
        if item == "__PROCESS_END__":
            # FIX: the response previously hard-coded finished=False even when
            # the process had already exited during startup; report the truth.
            finished = True
            lines.append("[process ended]")
        else:
            lines.append(item)

    if finished:
        with SESSION_LOCK:
            if session_id in SESSIONS:
                SESSIONS[session_id]["finished"] = True

    return jsonify({"error": False, "session_id": session_id,
                    "output": lines, "finished": finished}), 200


@app.route("/write", methods=["POST"])
def write():
    """Write input to a running session.

    Request JSON: { session_id, text }
    """
    data = request.get_json() or {}
    sid = data.get("session_id")
    text = data.get("text", "")
    if not sid:
        return jsonify({"error": True, "output": ["session_id required"]}), 400

    with SESSION_LOCK:
        s = SESSIONS.get(sid)
    if not s:
        return jsonify({"error": True, "output": ["Invalid session"]}), 404

    proc = s.get("proc")
    if not proc:
        return jsonify({"error": True, "output": ["Process missing"]}), 500
    if proc.poll() is not None:
        return jsonify({"error": True, "output": ["Process already finished"]}), 400

    try:
        # ensure a trailing newline so blocking reads (input(), Scanner) unblock
        write_text = text if text.endswith("\n") else text + "\n"
        proc.stdin.write(write_text)
        proc.stdin.flush()
    except Exception as e:
        return jsonify({"error": True, "output": [f"Failed to write to process stdin: {e}"]}), 500

    # update timestamp (refresh the TTL clock on activity)
    with SESSION_LOCK:
        s["start_ts"] = time.time()

    return jsonify({"error": False}), 200


@app.route("/read", methods=["POST"])
def read():
    """Read accumulated output lines from session queue.

    Request JSON: { session_id }
    Returns: { error, output: [lines], finished: bool }
    """
    data = request.get_json() or {}
    sid = data.get("session_id")
    if not sid:
        return jsonify({"error": True, "output": ["session_id required"], "finished": True}), 400

    with SESSION_LOCK:
        s = SESSIONS.get(sid)
    if not s:
        return jsonify({"error": True, "output": [], "finished": True}), 404

    q = s["queue"]
    lines = []
    finished = False

    # drain queue without blocking
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            break
        if item == "__PROCESS_END__":
            finished = True
            with SESSION_LOCK:
                if sid in SESSIONS:
                    SESSIONS[sid]["finished"] = True
        else:
            lines.append(item)

    # if finished, cleanup files and remove session
    if finished:
        try:
            proc = s.get("proc")
            if proc and proc.poll() is None:
                try:
                    proc.wait(timeout=0.1)
                except Exception:
                    pass
        except Exception:
            pass
        try:
            tmp_path = s.get("tmp_path")
            if tmp_path and os.path.exists(tmp_path):
                os.unlink(tmp_path)
        except OSError:
            pass
        for p in s.get("cleanup", []) or []:
            try:
                if os.path.exists(p):
                    os.unlink(p)
            except OSError:
                pass
        with SESSION_LOCK:
            SESSIONS.pop(sid, None)

    return jsonify({"error": False, "output": lines, "finished": finished}), 200


@app.route("/stop", methods=["POST"])
def stop():
    """Force-stop a running session.

    Request JSON: { session_id }
    """
    data = request.get_json() or {}
    sid = data.get("session_id")
    if not sid:
        return jsonify({"error": True, "message": "session_id required"}), 400

    with SESSION_LOCK:
        s = SESSIONS.get(sid)
    if not s:
        return jsonify({"error": True, "message": "invalid session"}), 404

    proc = s.get("proc")
    try:
        if proc and proc.poll() is None:
            proc.kill()
    except Exception:
        pass

    # cleanup files
    try:
        tmp_path = s.get("tmp_path")
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
    except OSError:
        pass
    for p in s.get("cleanup", []) or []:
        try:
            if os.path.exists(p):
                os.unlink(p)
        except OSError:
            pass

    with SESSION_LOCK:
        SESSIONS.pop(sid, None)

    return jsonify({"error": False, "message": "stopped"}), 200


# ✅ Warm-up
if __name__ == "__main__":
    print("🔧 Warming up...")
    _ = generate_full_reply("Hello", [])
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))