Spaces:
Runtime error
Runtime error
# ======================================================================
#  CodeMind AI - HuggingFace Spaces app.py (FINAL v3)
#  All bugs fixed | Always online | API key protected
# ======================================================================
#
# ----------------------------------------------------------------------
# HOW TO SET YOUR API KEY IN HUGGINGFACE (do this ONCE):
#
# 1. Open your Space -> click Settings
# 2. Scroll to "Variables and Secrets"
# 3. Click "New Secret"
# 4. Name  -> CODEMIND_API_KEY
# 5. Value -> <your own secret key; never paste the real value into source>
# 6. Click Save
#
# That's it! Your AI is now protected with your API key.
# ----------------------------------------------------------------------
| import os, re, ast, json, time, math, random, hashlib | |
| import warnings; warnings.filterwarnings("ignore") | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from transformers import GPT2TokenizerFast | |
| from dataclasses import dataclass | |
| from typing import List, Dict, Optional, Any, Tuple | |
| from collections import deque | |
| import gradio as gr | |
| from fastapi import FastAPI, HTTPException, Depends | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| from fastapi.security import APIKeyHeader | |
| from pydantic import BaseModel | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # DEVICE | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Pick the compute device once at import time: CUDA when PyTorch reports it
# as available, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"π CodeMind AI | Device: {device.upper()}")
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # API KEY β loaded from HuggingFace Secret CODEMIND_API_KEY | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Shared secret compared against the X-API-Key header and the UI unlock box.
# It is read from the CODEMIND_API_KEY environment variable.
import secrets

API_KEY = os.environ.get("CODEMIND_API_KEY", "").strip()
if API_KEY and "change-me" not in API_KEY:
    print("β API Key ready")
else:
    # Fail closed: falling back to a well-known placeholder would let anyone
    # who has read this file authenticate. Use an unguessable per-process key
    # instead, so nothing is reachable until the secret is configured.
    API_KEY = "cm-" + secrets.token_hex(32)
    print("β οΈ Set CODEMIND_API_KEY in HF Space β Settings β Secrets!")
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CONFIG | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class Config:
    """Model hyper-parameters and sampling defaults.

    All values are class attributes, so ``Config()`` starts from these
    defaults and individual instances may override them by assignment.
    """

    # --- architecture -----------------------------------------------------
    vocab_size: int = 50304      # replaced with len(tokenizer) after tokenizer setup
    n_embd: int = 512            # embedding / residual width
    n_head: int = 8              # number of query heads
    n_kv_head: int = 4           # number of key/value heads
    n_layer: int = 8             # number of transformer blocks
    block_size: int = 512        # maximum number of tokens fed to the model
    dropout: float = 0.1

    # --- sampling ---------------------------------------------------------
    temperature: float = 0.8
    top_k: int = 50
    top_p: float = 0.95
    rep_penalty: float = 1.1
    max_new_tokens: int = 256
# Single configuration instance shared by the rest of the module.
cfg = Config()

# Seed Python's and PyTorch's random number generators with a fixed value.
random.seed(42)
torch.manual_seed(42)
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # TOKENIZER | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
print("π Loading tokenizer...")
tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
# GPT-2 ships without a padding token; reuse end-of-sequence for padding.
tokenizer.pad_token = tokenizer.eos_token

# Task and language markers registered as extra special tokens. The order is
# significant: it is the order in which the tokens are handed to the tokenizer.
_TASK_TAGS = (
    "generate", "complete", "explain", "bugfix",
    "optimize", "translate", "docstring", "unittest",
    "review", "refactor", "security", "complexity",
    "async", "typehint", "format", "endofcode",
)
_LANG_TAGS = (
    "python", "javascript", "java", "cpp",
    "typescript", "go", "rust",
)
_SPECIAL = [f"<|{tag}|>" for tag in _TASK_TAGS + _LANG_TAGS]

tokenizer.add_special_tokens({'additional_special_tokens': _SPECIAL})
# The embedding table must cover the enlarged vocabulary.
cfg.vocab_size = len(tokenizer)
print(f"β Vocab: {cfg.vocab_size:,}")
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MODEL β GQA + RoPE + SwiGLU + RMSNorm + KV-Cache (~70M params) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class RMSNorm(nn.Module):
    """Root-mean-square normalisation over the last dimension with a learnable gain."""

    def __init__(self, d, eps=1e-8):
        super().__init__()
        self.eps = eps
        self.scale = nn.Parameter(torch.ones(d))

    def forward(self, x):
        """Return ``scale * x / sqrt(mean(x**2) + eps)`` computed along the last axis."""
        mean_square = x.pow(2).mean(-1, keepdim=True)
        rms = mean_square.add(self.eps).sqrt()
        return self.scale * x / rms
class RotaryEmbedding(nn.Module):
    """Produces the cosine/sine tables used for rotary position embeddings."""

    def __init__(self, dim):
        super().__init__()
        exponents = torch.arange(0, dim, 2).float() / dim
        # Stored as a buffer so it follows the module across devices.
        self.register_buffer("inv_freq", 1.0 / (10000 ** exponents))

    def forward(self, T, dev):
        """Return ``(cos, sin)`` for positions ``0..T-1`` created on device ``dev``.

        Each table has one row per position; the per-frequency angles are
        repeated twice along the last axis.
        """
        positions = torch.arange(T, device=dev).float()
        angles = torch.outer(positions, self.inv_freq)
        table = torch.cat([angles, angles], dim=-1)
        return table.cos(), table.sin()
def _rot(x):
    """Split the last axis in two chunks ``(a, b)`` and return ``(-b, a)`` concatenated."""
    first, second = torch.chunk(x, 2, dim=-1)
    return torch.cat([torch.neg(second), first], dim=-1)
def apply_rope(q, k, cos, sin):
    """Rotate ``q`` and ``k`` with the given cosine/sine tables.

    ``cos`` and ``sin`` get two leading singleton axes so they broadcast
    against the leading axes of ``q`` and ``k``.
    Returns the rotated ``(q, k)`` pair.
    """
    cos_b = cos[None, None]
    sin_b = sin[None, None]
    q_rot = (q * cos_b) + (_rot(q) * sin_b)
    k_rot = (k * cos_b) + (_rot(k) * sin_b)
    return q_rot, k_rot
class GQA(nn.Module):
    """Grouped-query causal self-attention with rotary embeddings and a KV cache.

    ``cfg.n_head`` query heads share ``cfg.n_kv_head`` key/value heads; each
    key/value head is repeated ``n_head // n_kv_head`` times before attention.
    """

    def __init__(self, cfg):
        super().__init__()
        self.nh = cfg.n_head
        self.nkv = cfg.n_kv_head
        self.hd = cfg.n_embd // cfg.n_head
        self.q = nn.Linear(cfg.n_embd, cfg.n_embd, bias=False)
        self.k = nn.Linear(cfg.n_embd, self.nkv * self.hd, bias=False)
        self.v = nn.Linear(cfg.n_embd, self.nkv * self.hd, bias=False)
        self.o = nn.Linear(cfg.n_embd, cfg.n_embd, bias=False)
        self.rope = RotaryEmbedding(self.hd)

    def forward(self, x, cache=None):
        """Attend over ``x`` (shape ``(B, T, C)``), optionally extending a cache.

        Args:
            x: activations for the ``T`` new positions.
            cache: optional ``(k, v)`` pair returned by an earlier call, each of
                shape ``(B, n_kv_head, past, head_dim)``. When given, ``x`` must
                contain only the positions that follow the cached ones.

        Returns:
            ``(output, new_cache)`` where ``output`` has the shape of ``x`` and
            ``new_cache`` holds the detached keys/values for all positions
            seen so far.
        """
        B, T, C = x.shape
        past = cache[0].size(2) if cache is not None else 0
        total = past + T

        # New tokens sit at absolute positions past..total-1, so they must be
        # rotated with those rows of the table rather than rows 0..T-1.
        cos, sin = self.rope(total, x.device)
        cos, sin = cos[past:], sin[past:]

        q = self.q(x).view(B, T, self.nh, self.hd).transpose(1, 2)
        k = self.k(x).view(B, T, self.nkv, self.hd).transpose(1, 2)
        v = self.v(x).view(B, T, self.nkv, self.hd).transpose(1, 2)
        q, k = apply_rope(q, k, cos, sin)

        if cache is not None:
            k = torch.cat([cache[0], k], dim=2)
            v = torch.cat([cache[1], v], dim=2)
        new_cache = (k.detach(), v.detach())

        groups = self.nh // self.nkv
        k = k.repeat_interleave(groups, dim=1)
        v = v.repeat_interleave(groups, dim=1)

        if past == 0:
            out = F.scaled_dot_product_attention(q, k, v, is_causal=True, dropout_p=0.0)
        else:
            # is_causal=True aligns the triangle to the top-left corner, which is
            # wrong when there are more keys than queries. Build the mask so that
            # query i (absolute position past+i) sees keys 0..past+i.
            mask = torch.ones(T, total, dtype=torch.bool, device=x.device).tril(diagonal=past)
            out = F.scaled_dot_product_attention(q, k, v, attn_mask=mask, dropout_p=0.0)

        merged = out.transpose(1, 2).contiguous().view(B, T, C)
        return self.o(merged), new_cache
class SwiGLU(nn.Module):
    """Gated feed-forward layer: ``w2(silu(w1(x)) * w3(x))``."""

    def __init__(self, cfg):
        super().__init__()
        width = cfg.n_embd
        hidden = int(width * 8 / 3)
        self.w1 = nn.Linear(width, hidden, bias=False)
        self.w2 = nn.Linear(hidden, width, bias=False)
        self.w3 = nn.Linear(width, hidden, bias=False)

    def forward(self, x):
        """Project up through the gate and value branches, multiply, project back."""
        gate = F.silu(self.w1(x))
        value = self.w3(x)
        return self.w2(gate * value)
class Block(nn.Module):
    """Pre-norm transformer block: attention and feed-forward, each with a residual."""

    def __init__(self, cfg):
        super().__init__()
        self.n1 = RMSNorm(cfg.n_embd)
        self.n2 = RMSNorm(cfg.n_embd)
        self.attn = GQA(cfg)
        self.mlp = SwiGLU(cfg)

    def forward(self, x, cache=None):
        """Return ``(hidden, new_cache)``; ``cache`` is handed to the attention layer as-is."""
        attn_out, new_cache = self.attn(self.n1(x), cache)
        hidden = x + attn_out
        hidden = hidden + self.mlp(self.n2(hidden))
        return hidden, new_cache
class CodeMindModel(nn.Module):
    """Decoder-only language model: embedding -> blocks -> RMSNorm -> tied output head."""

    def __init__(self, cfg):
        super().__init__()
        self.emb = nn.Embedding(cfg.vocab_size, cfg.n_embd)
        self.blocks = nn.ModuleList([Block(cfg) for _ in range(cfg.n_layer)])
        self.norm = RMSNorm(cfg.n_embd)
        self.head = nn.Linear(cfg.n_embd, cfg.vocab_size, bias=False)
        self.emb.weight = self.head.weight  # weight tying
        self.apply(lambda m: nn.init.normal_(m.weight, 0, 0.02)
                   if isinstance(m, (nn.Linear, nn.Embedding)) else None)
        n = sum(p.numel() for p in self.parameters())
        print(f"π§ CodeMind: {n/1e6:.1f}M params | GQA+RoPE+SwiGLU+RMSNorm")

    def forward(self, idx, targets=None, caches=None):
        """Run the network on token ids ``idx``.

        Args:
            idx: integer token ids.
            targets: optional target ids; positions equal to ``-1`` are ignored
                by the loss.
            caches: optional list with one attention cache per block.

        Returns:
            ``(logits, loss, new_caches)``; ``loss`` is ``None`` without targets.
        """
        x = self.emb(idx)
        nc = []
        for i, b in enumerate(self.blocks):
            x, c = b(x, caches[i] if caches else None)
            nc.append(c)
        logits = self.head(self.norm(x))
        loss = None
        if targets is not None:
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)),
                                   targets.view(-1), ignore_index=-1)
        return logits, loss, nc

    def generate(self, ids, max_t=256):
        """Sample up to ``max_t`` tokens after the prompt ``ids`` and decode them.

        Sampling uses the module-level ``cfg`` (temperature, top-k, top-p,
        repetition penalty over the last 20 tokens) and stops early when the
        tokenizer's end-of-sequence id is drawn. The EOS check uses
        ``.item()``, so a single-sequence batch is expected.

        Returns:
            The decoded continuation (prompt excluded), stripped, with special
            tokens removed.
        """
        self.eval()
        start = ids.shape[1]
        # Inference only: do not record an autograd graph for every step.
        with torch.no_grad():
            for _ in range(max_t):
                inp = ids[:, -cfg.block_size:]
                # The whole window is re-encoded each step, so no caches are
                # passed: feeding caches together with the full window would
                # append the same keys/values again on every iteration.
                logits, _, _ = self(inp)
                logits = logits[:, -1, :].float() / cfg.temperature
                # top-k: drop everything below the k-th largest logit
                v, _ = torch.topk(logits, min(cfg.top_k, logits.size(-1)))
                logits[logits < v[:, [-1]]] = float('-inf')
                # top-p: zero tokens once the mass before them exceeds top_p
                probs = F.softmax(logits, dim=-1)
                sp, si = torch.sort(probs, descending=True)
                cp = sp.cumsum(-1)
                sp[cp - sp > cfg.top_p] = 0.0
                probs = torch.zeros_like(probs).scatter_(1, si, sp)
                probs /= probs.sum(-1, keepdim=True).clamp(1e-8)
                # repetition penalty: damp every token seen in the last 20 ids
                recent = torch.unique(ids[0, -20:])
                probs[0, recent] /= cfg.rep_penalty
                probs /= probs.sum(-1, keepdim=True).clamp(1e-8)
                nxt = torch.multinomial(probs, 1)
                if nxt.item() == tokenizer.eos_token_id:
                    break
                ids = torch.cat([ids, nxt], dim=1)
        return tokenizer.decode(ids[0, start:].tolist(),
                                skip_special_tokens=True).strip()
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MEMORY | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class Memory:
    """In-process store: recent interactions, full history and a result cache."""

    def __init__(self):
        self.short = deque(maxlen=20)        # only the 20 newest entries are kept
        self.cache: Dict[str, Any] = {}      # "<md5 of code>_<kind>" -> cached value
        self.history: List[Dict] = []        # every pushed entry plus a timestamp

    @staticmethod
    def _key(code, kind):
        """Build the cache key for ``code`` and result ``kind``."""
        digest = hashlib.md5(code.encode()).hexdigest()
        return f"{digest}_{kind}"

    def push(self, e):
        """Record entry ``e``; the history copy additionally gets a ``ts`` field."""
        self.short.append(e)
        stamped = {**e, "ts": time.time()}
        self.history.append(stamped)

    def get(self, code, kind):
        """Return the cached value for ``(code, kind)`` or ``None`` when absent."""
        return self.cache.get(self._key(code, kind))

    def set(self, code, kind, v):
        """Cache ``v`` under ``(code, kind)``."""
        self.cache[self._key(code, kind)] = v

    def stats(self):
        """Return the number of recorded interactions and cached entries."""
        return {"interactions": len(self.history), "cache": len(self.cache)}
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 20 FUNCTIONS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class Functions:
    """Toolbox behind the agents: model-backed generators and static analysers.

    Methods that build a prompt and call ``_gen`` rely on the module-level
    ``tokenizer``, ``cfg`` and ``device``. All other methods are regex/AST
    heuristics over the source text they are given.
    """

    # Simple one-line string literals; format_code leaves their contents alone.
    _STRING_LITERAL = re.compile(r'''("(?:\\.|[^"\\])*"|'(?:\\.|[^'\\])*')''')
    # A lone '=' (not part of ==, !=, <=, >=, :=, +=, ...) with optional padding.
    _ASSIGN = re.compile(r'[ \t]*(?<![=!<>+\-*/%&|^@:~])=(?!=)[ \t]*')
    # A comma not already followed by whitespace or a closing bracket.
    _COMMA = re.compile(r',(?![\s)\]}])')

    def __init__(self, model, mem):
        self.model = model
        self.mem = mem

    def _gen(self, prompt, max_t=128):
        """Encode ``prompt``, keep its last ``cfg.block_size`` tokens and sample a reply."""
        ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
        return self.model.generate(ids[:, -cfg.block_size:], max_t)

    # FN 1 - Generate code
    def generate_code(self, prompt, lang="python", max_t=256):
        """Generate code for ``prompt``; suggested imports are prepended when any apply."""
        lt = f"<|{lang}|>" if f"<|{lang}|>" in _SPECIAL else ""
        out = self._gen(f"{lt}<|generate|># Task: {prompt}\n", max_t)
        imp = self.suggest_imports(out)
        return ("\n".join(imp) + "\n\n" + out) if imp else out

    # FN 2 - Complete code
    def complete_code(self, partial, max_t=128):
        """Ask the model to continue ``partial``."""
        return self._gen(f"<|complete|>\n{partial}", max_t)

    # FN 3 - Explain code
    def explain_code(self, code):
        """Explain ``code`` (first 400 characters); non-empty results are cached."""
        c = self.mem.get(code, "explain")
        if c:
            return c
        r = self._gen(f"<|explain|>\n{code[:400]}\n# Explanation:", 200)
        self.mem.set(code, "explain", r)
        return r

    # FN 4 - Detect bugs
    def detect_bugs(self, code):
        """Syntax-check ``code``, apply line-based lint rules and add a model tip."""
        bugs = []
        try:
            ast.parse(code)
            ok = True
        except SyntaxError as e:
            ok = False
            bugs.append({"type": "SyntaxError", "line": e.lineno, "msg": str(e)})
        rules = [
            (r'== None', "StyleWarning", "Use 'is None'"),
            (r'!= None', "StyleWarning", "Use 'is not None'"),
            (r'except:\s*$', "BestPractice", "Bare except clause"),
            (r'print\s*\(', "DebugCode", "Debug print left in"),
            (r'TODO|FIXME', "Incomplete", "Unresolved TODO/FIXME"),
        ]
        for i, line in enumerate(code.split('\n'), 1):
            for pat, kind, msg in rules:
                if re.search(pat, line):
                    bugs.append({"type": kind, "line": i, "msg": msg})
        return {"syntax_ok": ok, "bugs": bugs, "total": len(bugs),
                "model_tip": self._gen(f"<|bugfix|>\n{code[:300]}\n# Bug:", 80)}

    # FN 5 - Optimize
    def optimize_code(self, code):
        """Ask the model for an optimized version of the first 400 characters."""
        return self._gen(f"<|optimize|>\n# Original:\n{code[:400]}\n# Optimized:", 256)

    # FN 6 - Translate
    def translate_code(self, code, target="javascript"):
        """Ask the model to translate the first 400 characters into ``target``."""
        return self._gen(f"<|translate|>\n# Python:\n{code[:400]}\n# {target}:", 300)

    # FN 7 - Generate docs
    def generate_docs(self, code):
        """Ask the model to add docstrings to the first 400 characters."""
        return self._gen(f"<|docstring|>\n{code[:400]}\n# With docstrings:", 300)

    # FN 8 - Generate tests
    def generate_tests(self, code, fw="pytest"):
        """Ask the model for ``fw`` tests covering the first 350 characters."""
        return self._gen(f"<|unittest|>\n{code[:350]}\n# {fw} tests:", 350)

    # FN 9 - Review code
    def review_code(self, code):
        """Score ``code`` out of 100 with simple textual checks and grade it A-D."""
        lines = [l for l in code.split('\n') if l.strip()]
        score, iss = 100, []
        if '"""' not in code:
            score -= 20
            iss.append("β No docstrings")
        if '->' not in code:
            score -= 10
            iss.append("β οΈ No type hints")
        if not any(l.strip().startswith('#') for l in code.split('\n')):
            score -= 10
            iss.append("β οΈ No comments")
        if len(lines) > 50:
            score -= 15
            iss.append("β οΈ Too long β split it")
        g = "A" if score >= 90 else "B" if score >= 75 else "C" if score >= 60 else "D"
        return {"score": max(score, 0), "grade": g, "issues": iss, "loc": len(lines)}

    # FN 10 - Complexity
    def analyze_complexity(self, code):
        """Estimate time/space complexity from loop nesting and self-recursion.

        Parsable Python is analysed on its AST, so indentation caused by an
        enclosing ``def``/``class``/``if`` no longer counts as loop depth and a
        function is only "recursive" when it calls itself from its own body.
        Text that does not parse falls back to the indentation heuristic.
        """
        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError, RecursionError):
            md, recursive = self._complexity_from_text(code)
        else:
            md = self._max_loop_depth(tree)
            recursive = self._is_recursive(tree)
        tm = {0: "O(1)", 1: "O(n)", 2: "O(nΒ²)", 3: "O(nΒ³)"}.get(md, f"O(n^{md})")
        sp = "O(n)" if re.search(r'\bappend\b|\[\]', code) else "O(1)"
        if recursive:
            tm = "O(2βΏ) β Recursive"
            sp = "O(n) stack"
        return {"time": tm, "space": sp, "loop_depth": md}

    @staticmethod
    def _max_loop_depth(node, depth=0):
        """Return the deepest nesting of for/while loops below ``node``."""
        deepest = depth
        for child in ast.iter_child_nodes(node):
            is_loop = isinstance(child, (ast.For, ast.AsyncFor, ast.While))
            inner = depth + 1 if is_loop else depth
            deepest = max(deepest, Functions._max_loop_depth(child, inner))
        return deepest

    @staticmethod
    def _is_recursive(tree):
        """Return True when some function calls itself (``f()``, ``self.f()`` or ``cls.f()``)."""
        for fn in ast.walk(tree):
            if not isinstance(fn, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue
            for node in ast.walk(fn):
                if not isinstance(node, ast.Call):
                    continue
                callee = node.func
                if isinstance(callee, ast.Name) and callee.id == fn.name:
                    return True
                if (isinstance(callee, ast.Attribute) and callee.attr == fn.name
                        and isinstance(callee.value, ast.Name)
                        and callee.value.id in ('self', 'cls')):
                    return True
        return False

    @staticmethod
    def _complexity_from_text(code):
        """Indentation/substring fallback used when ``code`` is not parsable Python."""
        depth = 0
        for line in code.split('\n'):
            s = line.lstrip()
            if s.startswith(('for ', 'while ')):
                depth = max(depth, (len(line) - len(s)) // 4 + 1)
        recursive = any(f'{fn}(' in code.split(f'def {fn}')[-1]
                        for fn in re.findall(r'def (\w+)\s*\(', code))
        return depth, recursive

    # FN 11 - Suggest imports
    def suggest_imports(self, code):
        """Return import statements for well-known names used but not imported.

        A suggestion is skipped when its *module* already appears after an
        ``import``/``from`` keyword in ``code``.
        """
        MAP = {
            r'\bpd\.': "import pandas as pd",
            r'\bnp\.': "import numpy as np",
            r'\bplt\.': "import matplotlib.pyplot as plt",
            r'\btorch\b': "import torch",
            r'\bos\b': "import os",
            r'\bre\b': "import re",
            r'\bmath\b': "import math",
            r'\bjson\b': "import json",
            r'\brandom\b': "import random",
            r'\bsys\b': "import sys",
            r'\btime\b': "import time",
            r'\btyping\b': "from typing import List, Dict, Any",
            r'\bcollections\b': "from collections import defaultdict, deque",
            r'\bpathlib\b': "from pathlib import Path",
        }
        ex = set(re.findall(r'(?:import|from)\s+(\w+)', code))
        suggestions = []
        for pattern, statement in MAP.items():
            # Second word is the module path in both "import x" and "from x import y".
            module = statement.split()[1].split('.')[0]
            if module not in ex and re.search(pattern, code):
                suggestions.append(statement)
        return suggestions

    # FN 12 - Format code
    def format_code(self, code):
        """Normalise spacing around plain ``=`` and after commas, line by line.

        Augmented/comparison operators are left alone, already-spaced code is
        unchanged, and the contents of simple one-line string literals are not
        touched. Keyword arguments are still spaced like assignments, and
        multi-line strings are not recognised.
        """
        lines = []
        for line in code.split('\n'):
            pieces = self._STRING_LITERAL.split(line)
            # Even indexes are the text outside quotes; odd ones are literals.
            for i in range(0, len(pieces), 2):
                spaced = self._ASSIGN.sub(' = ', pieces[i])
                pieces[i] = self._COMMA.sub(', ', spaced)
            lines.append(''.join(pieces).rstrip())
        return '\n'.join(lines).rstrip() + '\n'

    # FN 13 - Summarize
    def summarize_code(self, code):
        """Return a one-line summary: class names, function names and line count."""
        fns = re.findall(r'def (\w+)', code)
        cls = re.findall(r'class (\w+)', code)
        lns = [l for l in code.split('\n') if l.strip()]
        parts = []
        if cls:
            parts.append(f"Classes: {', '.join(cls)}")
        if fns:
            parts.append(f"Functions: {', '.join(fns)}")
        parts.append(f"{len(lns)} lines")
        return " | ".join(parts)

    # FN 14 - Dead code
    def detect_dead_code(self, code):
        """Report variables assigned but never read and statements after ``return``.

        Unreachable code is located on the AST: a statement is flagged only
        when it follows a ``return`` inside the same statement list, so a
        ``return`` in an ``if`` body does not taint the code after the ``if``.
        Unused names are reported in sorted order.
        """
        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError, RecursionError):
            return [{"type": "ParseError", "msg": "Cannot parse"}]
        assigned, used = set(), set()
        for n in ast.walk(tree):
            if isinstance(n, ast.Assign):
                for t in n.targets:
                    if isinstance(t, ast.Name):
                        assigned.add(t.id)
            elif isinstance(n, ast.Name) and not isinstance(n.ctx, ast.Store):
                used.add(n.id)
        dead = [{"type": "UnusedVariable", "name": v, "msg": f"'{v}' never used"}
                for v in sorted(assigned - used - {'self', '_'})]
        for node in ast.walk(tree):
            for field in ("body", "orelse", "finalbody"):
                stmts = getattr(node, field, None)
                if not isinstance(stmts, list):
                    continue  # e.g. the expression body of a lambda
                for pos, stmt in enumerate(stmts[:-1]):
                    if isinstance(stmt, ast.Return):
                        dead.append({"type": "UnreachableCode",
                                     "line": stmts[pos + 1].lineno,
                                     "msg": "Code after return"})
                        break
        return dead

    # FN 15 - Security scan
    def scan_security(self, code):
        """Scan ``code`` line by line for risky patterns.

        Returns ``(findings, risk)``: findings sorted most severe first and
        ``risk`` equal to the highest severity found, or ``"SAFE"`` when there
        are none.
        """
        checks = [
            (r'\beval\s*\(', "CRITICAL", "eval() is dangerous"),
            (r'\bexec\s*\(', "CRITICAL", "exec() is dangerous"),
            (r'os\.system\s*\(', "HIGH", "os.system() risk"),
            (r'pickle\.loads?\s*\(', "HIGH", "Unsafe pickle"),
            (r'shell\s*=\s*True', "HIGH", "shell=True injection"),
            (r'password\s*=\s*["\']', "HIGH", "Hardcoded password"),
            (r'api_key\s*=\s*["\']', "HIGH", "Hardcoded API key"),
            (r'secret\s*=\s*["\']', "HIGH", "Hardcoded secret"),
            (r'\bmd5\b', "MEDIUM", "MD5 is broken"),
            (r'http://', "LOW", "Use HTTPS not HTTP"),
        ]
        vulns = []
        for i, line in enumerate(code.split('\n'), 1):
            for pat, sev, msg in checks:
                if re.search(pat, line, re.I):
                    vulns.append({"line": i, "severity": sev, "msg": msg,
                                  "snippet": line.strip()[:60]})
        order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}
        # Overall risk is the most severe finding; LOW-only input stays LOW.
        risk = min((v["severity"] for v in vulns), key=order.__getitem__, default="SAFE")
        return sorted(vulns, key=lambda x: order.get(x["severity"], 9)), risk

    # FN 16 - Type hints
    def generate_type_hints(self, code):
        """Add name-based type hints to single-line ``def`` signatures.

        Lines that already contain ``->`` or whose parameter list contains
        brackets (defaults with nested commas) are left untouched. Whatever
        follows the signature's colon is preserved.
        """
        out = []
        for line in code.split('\n'):
            m = re.match(r'(\s*def \w+\()(.*)(\):.*)', line)
            if not m or '->' in line or any(ch in m.group(2) for ch in '([{'):
                out.append(line)
                continue
            typed = [self._annotate_param(p.strip()) for p in m.group(2).split(',')]
            # group(3) starts with "):"; keep the colon and anything after it.
            out.append(f"{m.group(1)}{', '.join(typed)}) -> Any{m.group(3)[1:]}")
        return '\n'.join(out)

    @staticmethod
    def _annotate_param(param):
        """Return ``param`` with a guessed annotation, keeping any default value."""
        if not param or param in ('self', 'cls', '*', '/') or ':' in param:
            return param
        name, has_default, default = param.partition('=')
        name = name.strip()
        bare = name.lstrip('*')
        if any(k in bare for k in ('name', 'text', 'msg', 'key')):
            hint = "str"
        elif any(k in bare for k in ('num', 'count')) or bare in ('n', 'i', 'k'):
            hint = "int"
        elif bare.startswith(('is_', 'has_')) or 'flag' in bare:
            hint = "bool"
        elif any(k in bare for k in ('list', 'arr', 'data', 'items')):
            hint = "list"
        else:
            hint = "Any"
        if has_default:
            return f"{name}: {hint} = {default.strip()}"
        return f"{name}: {hint}"

    # FN 17 - Refactor
    def refactor_code(self, code):
        """Ask the model for a cleaned-up version of the first 400 characters."""
        return self._gen(f"<|refactor|>\n# Messy:\n{code[:400]}\n# Clean:", 300)

    # FN 18 - Extract functions
    def extract_functions(self, code):
        """List every ``def`` in ``code`` with args, line, docstring prefix and return flag.

        Returns ``[{"error": ...}]`` when the code cannot be parsed.
        """
        try:
            tree = ast.parse(code)
        except Exception as e:
            return [{"error": str(e)}]
        return [{"name": n.name,
                 "args": [a.arg for a in n.args.args],
                 "line": n.lineno,
                 "docstring": (ast.get_docstring(n) or "")[:60],
                 "has_return": any(isinstance(x, ast.Return) for x in ast.walk(n))}
                for n in ast.walk(tree) if isinstance(n, ast.FunctionDef)]

    # FN 19 - Convert to async
    def convert_to_async(self, code):
        """Textually convert functions to ``async def`` and await common blocking calls.

        Functions that are already ``async`` and dunder methods such as
        ``__init__`` are not converted.
        """
        out = re.sub(r'(?<!async )\bdef (?!__\w+__\s*\()(\w+)\s*\(',
                     r'async def \1(', code)
        out = re.sub(r'\btime\.sleep\b', 'await asyncio.sleep', out)
        out = re.sub(r'\brequests\.get\b', 'await session.get', out)
        return "import asyncio\nimport aiohttp\n\n" + out

    # FN 20 - Estimate cost
    def estimate_cost(self, n_params=70_000_000, n_tokens=5_000_000, gpu="T4"):
        """Estimate training hours and USD cost from ``6 * params * tokens`` FLOPs.

        Unknown ``gpu`` names use the T4 throughput and price figures.
        """
        flops = 6 * n_params * n_tokens
        tp = {"T4": 65e12, "A100": 312e12, "V100": 112e12}.get(gpu, 65e12)
        pr = {"T4": 0.35, "A100": 3.00, "V100": 1.50}.get(gpu, 0.35)
        h = flops / tp / 3600
        return {"gpu": gpu, "est_hours": round(h, 2), "est_cost_usd": round(h * pr, 2)}
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 17 AGENTS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class Agent:
    """Base class for task agents: holds a display name and the shared toolbox."""

    def __init__(self, name, fn):
        self.name = name
        self.fn = fn

    def run(self, *a, **k):
        """Perform the agent's task; subclasses must override this."""
        raise NotImplementedError

    def ok(self, d):
        """Wrap payload ``d`` in a success envelope."""
        return {"agent": self.name, "status": "ok", "result": d}

    def err(self, m):
        """Wrap message ``m`` in an error envelope."""
        return {"agent": self.name, "status": "error", "msg": m}
class GenAgent(Agent):
    """Agent "CodeGenerator": produces code from a natural-language prompt."""

    def __init__(self, fn):
        super().__init__("CodeGenerator", fn)

    def run(self, prompt, lang="python"):
        """Generate code for ``prompt`` in ``lang`` and echo the language back."""
        generated = self.fn.generate_code(prompt, lang)
        return self.ok({"code": generated, "lang": lang})
class BugAgent(Agent):
    """Agent "BugDetector": combines the bug report with dead-code findings."""

    def __init__(self, fn):
        super().__init__("BugDetector", fn)

    def run(self, code):
        """Return both reports, their combined count and a ``healthy`` flag."""
        report = self.fn.detect_bugs(code)
        dead = self.fn.detect_dead_code(code)
        total = report["total"] + len(dead)
        return self.ok({
            "bugs": report,
            "dead_code": dead,
            "total": total,
            "healthy": total == 0,
        })
class OptAgent(Agent):
    """Agent "Optimizer": model-optimized code plus a complexity estimate."""

    def __init__(self, fn):
        super().__init__("Optimizer", fn)

    def run(self, code):
        """Optimize and format ``code``; the complexity is measured on the input."""
        optimized = self.fn.format_code(self.fn.optimize_code(code))
        complexity = self.fn.analyze_complexity(code)
        return self.ok({"optimized": optimized, "complexity": complexity})
class DocAgent(Agent):
    """Agent "Documentation": docstrings, summary, function list and type hints."""

    def __init__(self, fn):
        super().__init__("Documentation", fn)

    def run(self, code):
        """Collect every documentation artefact the toolbox offers for ``code``."""
        payload = {}
        payload["docstrings"] = self.fn.generate_docs(code)
        payload["summary"] = self.fn.summarize_code(code)
        payload["functions"] = self.fn.extract_functions(code)
        payload["typed"] = self.fn.generate_type_hints(code)
        return self.ok(payload)
class TestAgent(Agent):
    """Agent "TestGenerator": generated tests plus a rough coverage estimate."""

    def __init__(self, fn):
        super().__init__("TestGenerator", fn)

    def run(self, code, fw="pytest"):
        """Generate ``fw`` tests; coverage is 25% per extracted entry, capped at 95%."""
        found = self.fn.extract_functions(code)
        tests = self.fn.generate_tests(code, fw)
        coverage = min(len(found) * 25, 95)
        return self.ok({"tests": tests, "framework": fw,
                        "est_coverage": f"{coverage}%"})
class SecAgent(Agent):
    """Agent "SecurityScanner": reports risky patterns found in the code."""

    def __init__(self, fn):
        super().__init__("SecurityScanner", fn)

    def run(self, code):
        """Return the findings, the overall risk level and an ``is_safe`` flag."""
        findings, risk = self.fn.scan_security(code)
        return self.ok({
            "vulnerabilities": findings,
            "risk_level": risk,
            "is_safe": not findings,
        })
class RefAgent(Agent):
    """Agent "Refactor": model refactoring with before/after review scores."""

    def __init__(self, fn):
        super().__init__("Refactor", fn)

    def run(self, code):
        """Refactor ``code`` and score both the input and the result."""
        refactored = self.fn.refactor_code(code)
        score_before = self.fn.review_code(code)["score"]
        score_after = self.fn.review_code(refactored)["score"]
        return self.ok({
            "refactored": refactored,
            "score_before": score_before,
            "score_after": score_after,
        })
class TrAgent(Agent):
    """Agent "Translator": translates code into one of the supported languages."""

    # Target languages accepted by run(); anything else yields an error envelope.
    SUPPORTED = ["javascript", "java", "cpp", "typescript", "go", "rust", "csharp", "kotlin"]

    def __init__(self, fn):
        super().__init__("Translator", fn)

    def run(self, code, target="javascript"):
        """Translate ``code`` to ``target`` or report the list of valid targets."""
        if target in self.SUPPORTED:
            translated = self.fn.translate_code(code, target)
            return self.ok({"translated": translated, "target": target})
        return self.err(f"Choose: {self.SUPPORTED}")
class RevAgent(Agent):
    """Agent "CodeReviewer": review score adjusted for security and dead-code findings."""

    def __init__(self, fn):
        super().__init__("CodeReviewer", fn)

    def run(self, code):
        """Review ``code`` and return score, grade, issues and a recommendation.

        The review score loses 5 points per security finding and 2 per
        dead-code item (floored at 0). The grade is derived from that final
        score, so the reported score and grade always agree.
        """
        review = self.fn.review_code(code)
        vulns, _ = self.fn.scan_security(code)
        dead = self.fn.detect_dead_code(code)
        score = max(review["score"] - len(vulns) * 5 - len(dead) * 2, 0)
        # Same thresholds as the base review, applied after the penalties.
        if score >= 90:
            grade = "A"
        elif score >= 75:
            grade = "B"
        elif score >= 60:
            grade = "C"
        else:
            grade = "D"
        return self.ok({"score": score, "grade": grade, "issues": review["issues"],
                        "security_flags": len(vulns),
                        "recommendation": "β LGTM!" if score >= 80 else "β Needs work"})
class CpxAgent(Agent):
    """Agent "ComplexityAnalyzer": thin wrapper around the complexity estimate."""

    def __init__(self, fn):
        super().__init__("ComplexityAnalyzer", fn)

    def run(self, code):
        """Return the complexity report for ``code`` in a success envelope."""
        report = self.fn.analyze_complexity(code)
        return self.ok(report)
class ImpAgent(Agent):
    """Agent "ImportManager": suggested imports next to the ones already present."""

    def __init__(self, fn):
        super().__init__("ImportManager", fn)

    def run(self, code):
        """Return suggested import lines and the import/from prefixes found in ``code``."""
        suggested = self.fn.suggest_imports(code)
        existing = re.findall(r'^(?:import|from)\s+\S+', code, re.M)
        return self.ok({"suggested": suggested, "existing": existing})
class FmtAgent(Agent):
    """Agent "Formatter": spacing clean-up followed by type-hint insertion."""

    def __init__(self, fn):
        super().__init__("Formatter", fn)

    def run(self, code):
        """Format ``code`` first, then add type hints to the formatted text."""
        tidy = self.fn.format_code(code)
        hinted = self.fn.generate_type_hints(tidy)
        return self.ok({"formatted": hinted})
class ExpAgent(Agent):
    """Agent "Explainer": model explanation prefixed according to audience level."""

    def __init__(self, fn):
        super().__init__("Explainer", fn)

    def run(self, code, level="beginner"):
        """Explain ``code``; unknown levels get no prefix."""
        prefixes = {"beginner": "Simply: ", "expert": "Technical: "}
        prefix = prefixes.get(level, "")
        explanation = prefix + self.fn.explain_code(code)
        return self.ok({
            "explanation": explanation,
            "summary": self.fn.summarize_code(code),
            "level": level,
        })
class DcdAgent(Agent):
    """Agent "DeadCodeDetector": lists dead-code findings with their count."""

    def __init__(self, fn):
        super().__init__("DeadCodeDetector", fn)

    def run(self, code):
        """Return the dead-code items found in ``code`` and how many there are."""
        items = self.fn.detect_dead_code(code)
        return self.ok({"items": items, "total": len(items)})
class PerfAgent(Agent):
    """Agent "Profiler": pattern-based performance suggestions and a score."""

    def __init__(self, fn):
        super().__init__("Profiler", fn)

    def run(self, code):
        """Return suggestions, a score (100 minus 15 per suggestion, minimum 10) and complexity."""
        # Each entry: (does the pattern apply?, advice to give). Order is kept.
        probes = (
            (re.search(r'for .+ in .+:\n.*\.append\(', code, re.S), "Use list comprehension"),
            (re.search(r'for .* in range\(len\(', code), "Use enumerate()"),
            ('global ' in code, "Remove global variables"),
            (re.search(r'\+= .+str', code), "Use ''.join() for strings"),
        )
        suggestions = [advice for hit, advice in probes if hit]
        score = max(100 - len(suggestions) * 15, 10)
        return self.ok({
            "suggestions": suggestions,
            "perf_score": score,
            "complexity": self.fn.analyze_complexity(code),
        })
class AsynAgent(Agent):
    """Agent "AsyncConverter": rewrites code into an async flavour."""

    def __init__(self, fn):
        super().__init__("AsyncConverter", fn)

    def run(self, code):
        """Return the async conversion of ``code`` in a success envelope."""
        converted = self.fn.convert_to_async(code)
        return self.ok({"async_code": converted})
class Orchestrator:
    """Routes task names to agents and runs the fixed analysis pipeline."""

    def __init__(self, fn):
        self.fn = fn
        self.mem = fn.mem
        # Task name -> agent class; every agent shares the same toolbox.
        registry = (
            ("generate", GenAgent),
            ("bugs", BugAgent),
            ("optimize", OptAgent),
            ("docs", DocAgent),
            ("tests", TestAgent),
            ("security", SecAgent),
            ("refactor", RefAgent),
            ("translate", TrAgent),
            ("review", RevAgent),
            ("complexity", CpxAgent),
            ("imports", ImpAgent),
            ("format", FmtAgent),
            ("explain", ExpAgent),
            ("deadcode", DcdAgent),
            ("performance", PerfAgent),
            ("async", AsynAgent),
        )
        self.agents = {task: agent_cls(fn) for task, agent_cls in registry}
        print(f"β Orchestrator: {len(self.agents)} agents ready")

    def run(self, task, data, **kw):
        """Dispatch ``data`` to the agent registered for ``task``.

        Unknown tasks return an error dict. Known tasks are logged to memory
        (first 60 characters of ``data``) before the agent runs; exceptions
        raised by the agent propagate to the caller.
        """
        agent = self.agents.get(task)
        if not agent:
            return {"status": "error", "msg": f"Unknown task: {task}"}
        self.mem.push({"agent": task, "summary": str(data)[:60]})
        return agent.run(data, **kw)

    def pipeline(self, code):
        """Run the six analysis agents on ``code`` and collect their results.

        A failing step is recorded as ``{"error": message}`` instead of
        aborting the pipeline. ``_meta`` holds the elapsed seconds and the
        memory statistics.
        """
        started = time.time()
        report = {}
        for step in ("bugs", "security", "review", "complexity", "deadcode", "performance"):
            try:
                report[step] = self.agents[step].run(code)
            except Exception as exc:
                report[step] = {"error": str(exc)}
        report["_meta"] = {
            "elapsed_s": round(time.time() - started, 2),
            "memory": self.mem.stats(),
        }
        return report
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # BUILD SYSTEM | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
print("π¨ Building CodeMind system...")
# Wire the pieces together: model -> memory -> toolbox -> orchestrator.
model = CodeMindModel(cfg).to(device)
memory = Memory()
functions = Functions(model, memory)
orc = Orchestrator(functions)

# Auto-load checkpoint if available
import glob as _glob

# Candidates from all three locations are sorted together by path string and
# the last one is loaded.
_ckpts = sorted(
    path
    for pattern in ("/tmp/*.pt", "*.pt", "checkpoints/*.pt")
    for path in _glob.glob(pattern)
)
if not _ckpts:
    print("βΉοΈ No checkpoint found β static agents work perfectly right now!")
else:
    try:
        # NOTE(review): torch.load unpickles the file, and /tmp may be writable
        # by others - confirm that only trusted checkpoints can end up here.
        ck = torch.load(_ckpts[-1], map_location=device)
        model.load_state_dict(ck["model"])
        print(f"β Checkpoint loaded: {_ckpts[-1]}")
    except Exception as e:
        # A broken checkpoint is reported and the freshly initialised model kept.
        print(f"β οΈ Checkpoint error: {e}")
print("β System ready!\n")
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FASTAPI β API Key Protected | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# REST application; interactive docs are served under /docs.
fapi = FastAPI(
    title="CodeMind AI API",
    version="3.0",
    docs_url="/docs",
)
# CORS is fully open: any origin, method and header is accepted.
fapi.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Reads the X-API-Key header; auto_error=False passes None on when it is
# missing so that require_key can answer with its own 401 body.
_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def require_key(key: str = Depends(_key_header)):
    """FastAPI dependency that admits a request only when X-API-Key matches API_KEY.

    Args:
        key: value of the ``X-API-Key`` header, or ``None`` when it is absent.

    Returns:
        The supplied key when it is valid.

    Raises:
        HTTPException: status 401 when the key is missing, empty or wrong.
    """
    import hmac

    # Compare in constant time so response timing does not reveal how many
    # leading characters of a guess were right. Bytes are used because
    # compare_digest rejects non-ASCII str arguments.
    valid = bool(key) and hmac.compare_digest(
        key.encode("utf-8"), API_KEY.encode("utf-8"))
    if not valid:
        raise HTTPException(status_code=401, detail={
            "error": "β Invalid or missing API Key",
            "fix": "Add header X-API-Key: YOUR_KEY to your request",
            "get_key": "Set CODEMIND_API_KEY in HuggingFace Space β Settings β Secrets"
        })
    return key
class Req(BaseModel):
    """Request body shared by the endpoints; each handler reads the fields it needs."""

    # Source code to analyse or transform.
    code: str = ""
    # Natural-language task description (used by code generation).
    prompt: str = ""
    # Language tag for code generation.
    lang: str = "python"
    # Target language for translation.
    target: str = "javascript"
    # Test framework name for test generation.
    framework: str = "pytest"
    # Audience level for explanations.
    level: str = "beginner"
    # Token budget passed to code completion.
    max_tokens: int = 256
def _j(data):
    """Return ``data`` as a JSONResponse; non-dict values are wrapped as ``{"result": data}``."""
    payload = data if isinstance(data, dict) else {"result": data}
    return JSONResponse(content=payload)
| # ββ Public (no key needed) ββββββββββββββββββββββββββββββββββββββββ | |
# NOTE(review): no route decorator is visible on this handler in this view -
# confirm it is registered on `fapi` (the section comment marks it as public).
async def root():
    """Return the service banner: name, version, agent count and auth hint."""
    banner = {
        "name": "CodeMind AI",
        "version": "3.0",
        "status": "online",
        "agents": len(orc.agents),
        "auth": "Add X-API-Key header",
        "docs": "/docs",
    }
    return banner
# NOTE(review): no route decorator is visible on this handler in this view -
# confirm it is registered on `fapi` (the section comment marks it as public).
async def health():
    """Return liveness data: device, agent count, memory stats and current time."""
    status = {
        "status": "online",
        "device": device,
        "agents": len(orc.agents),
        "memory": memory.stats(),
        "ts": time.time(),
    }
    return status
| # ββ Protected (need your API key) βββββββββββββββββββββββββββββββββ | |
# Endpoint handlers: each forwards the request fields it needs to the
# orchestrator (or the toolbox) and returns the result through _j().
# NOTE(review): no route decorators or `require_key` dependencies are visible
# on these handlers in this view - confirm they are registered and protected.
# NOTE(review): orc.run is synchronous and these are `async def`; confirm that
# running it directly inside the coroutine is acceptable.

async def ep_gen(r: Req):
    """Generate code from ``r.prompt`` in ``r.lang``."""
    return _j(orc.run("generate", r.prompt, lang=r.lang))


async def ep_cmp(r: Req):
    """Complete ``r.code`` using at most ``r.max_tokens`` new tokens."""
    completion = functions.complete_code(r.code, r.max_tokens)
    return _j({"completion": completion})


async def ep_bug(r: Req):
    """Run the "bugs" agent on ``r.code``."""
    return _j(orc.run("bugs", r.code))


async def ep_opt(r: Req):
    """Run the "optimize" agent on ``r.code``."""
    return _j(orc.run("optimize", r.code))


async def ep_doc(r: Req):
    """Run the "docs" agent on ``r.code``."""
    return _j(orc.run("docs", r.code))


async def ep_tst(r: Req):
    """Run the "tests" agent on ``r.code`` with ``r.framework``."""
    return _j(orc.run("tests", r.code, fw=r.framework))


async def ep_sec(r: Req):
    """Run the "security" agent on ``r.code``."""
    return _j(orc.run("security", r.code))


async def ep_ref(r: Req):
    """Run the "refactor" agent on ``r.code``."""
    return _j(orc.run("refactor", r.code))


async def ep_tr(r: Req):
    """Run the "translate" agent on ``r.code`` towards ``r.target``."""
    return _j(orc.run("translate", r.code, target=r.target))


async def ep_rev(r: Req):
    """Run the "review" agent on ``r.code``."""
    return _j(orc.run("review", r.code))


async def ep_cpx(r: Req):
    """Run the "complexity" agent on ``r.code``."""
    return _j(orc.run("complexity", r.code))


async def ep_imp(r: Req):
    """Run the "imports" agent on ``r.code``."""
    return _j(orc.run("imports", r.code))


async def ep_fmt(r: Req):
    """Run the "format" agent on ``r.code``."""
    return _j(orc.run("format", r.code))


async def ep_exp(r: Req):
    """Run the "explain" agent on ``r.code`` at ``r.level``."""
    return _j(orc.run("explain", r.code, level=r.level))


async def ep_dcd(r: Req):
    """Run the "deadcode" agent on ``r.code``."""
    return _j(orc.run("deadcode", r.code))


async def ep_prf(r: Req):
    """Run the "performance" agent on ``r.code``."""
    return _j(orc.run("performance", r.code))


async def ep_asn(r: Req):
    """Run the "async" agent on ``r.code``."""
    return _j(orc.run("async", r.code))


async def ep_pip(r: Req):
    """Run the full analysis pipeline on ``r.code``."""
    return _j(orc.pipeline(r.code))
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # GRADIO UI β 14 tabs, API key gate (BUG FIXED) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Custom stylesheet handed to gr.Blocks(css=...): caps the container width,
# switches to a monospace font and hides the footer.
_CSS = """
.gradio-container { max-width:1100px !important; font-family: monospace; }
footer { display: none !important; }
"""
def _str(task, data, key, **kw):
    """Run ``task`` through the orchestrator and return one field as text for the UI.

    On success the value stored under ``key`` in the result is returned
    (empty string when missing); on an error envelope its message is
    returned; any exception is turned into a short error string.
    """
    try:
        reply = orc.run(task, data, **kw)
        if reply.get("status") == "ok":
            return reply["result"].get(key, "")
        return reply.get("msg", "Error")
    except Exception as e:
        return f"β {e}"
| # ββ β FIXED: _check_key now returns to named components key_msg & tabs_col | |
def _check_key(k):
    """Validate the key typed into the UI and decide whether to reveal the tabs.

    Args:
        k: text from the API-key box; ``None`` is treated as empty.

    Returns:
        ``(message, update)`` - a status message and a ``gr.update`` that
        shows the tab column for the correct key and hides it otherwise.
    """
    import hmac

    supplied = (k or "").strip()
    # Constant-time comparison on bytes (compare_digest rejects non-ASCII str);
    # an empty entry is never accepted.
    matches = bool(supplied) and hmac.compare_digest(
        supplied.encode("utf-8"), API_KEY.encode("utf-8"))
    if matches:
        return "β Correct Key! All 14 tabs unlocked π", gr.update(visible=True)
    return "β Wrong API Key β check your key and try again.", gr.update(visible=False)
def _json_result(task, data):
    """Run an orchestrator task and return a payload suitable for gr.JSON.

    Mirrors the status handling of ``_str``: on an ``"ok"`` status the
    ``"result"`` value is returned; otherwise (or if anything raises) an
    ``{"error": ...}`` dict is returned so the JSON panel shows the problem
    instead of the callback failing on a missing ``"result"`` key.
    """
    try:
        response = orc.run(task, data)
        if response.get("status") == "ok":
            return response["result"]
        return {"error": response.get("msg", "Error")}
    except Exception as e:
        return {"error": str(e)}


# Gradio front-end: an API-key gate followed by one tab per agent task.
# NOTE(review): the gate only toggles visibility of the tab column; confirm
# whether the callbacks also need server-side key enforcement.
with gr.Blocks(title="π§ CodeMind AI", theme=gr.themes.Soft(), css=_CSS) as ui:
    gr.Markdown("# π§ CodeMind AI β Always Online\n"
                "**17 Agents Β· 20 Functions Β· GQA + RoPE + SwiGLU Β· REST API**")

    # API key gate: password textbox plus unlock button.
    with gr.Group():
        gr.Markdown("### π Enter Your API Key to Unlock")
        with gr.Row():
            ui_key = gr.Textbox(placeholder="Paste your API key...",
                                label="API Key", type="password", scale=4)
            unlock_btn = gr.Button("π Unlock", variant="primary", scale=1)

    # Named components targeted by _check_key's two return values.
    key_msg = gr.Markdown("*Enter your API key above and click Unlock*")
    tabs_col = gr.Column(visible=False)

    with tabs_col:
        with gr.Tabs():
            with gr.Tab("π Generate"):
                p1 = gr.Textbox(label="Describe what to build", lines=3,
                                placeholder="e.g. binary search function")
                l1 = gr.Dropdown(["python", "javascript", "java", "cpp", "go", "rust"],
                                 value="python", label="Language")
                o1 = gr.Code(label="Generated Code", language="python", lines=18)
                gr.Button("β‘ Generate", variant="primary").click(
                    lambda prompt, lang: _str("generate", prompt, "code", lang=lang),
                    [p1, l1], o1)

            with gr.Tab("π Bug Detect"):
                with gr.Row():
                    i2 = gr.Code(label="Paste Code", language="python", lines=15)
                    o2 = gr.JSON(label="Bug Report")
                gr.Button("π Find Bugs", variant="primary").click(
                    lambda code: _json_result("bugs", code), i2, o2)

            with gr.Tab("π Security"):
                with gr.Row():
                    i3 = gr.Code(label="Paste Code", language="python", lines=15)
                    o3 = gr.JSON(label="Security Report")
                gr.Button("π‘οΈ Scan Security", variant="primary").click(
                    lambda code: _json_result("security", code), i3, o3)

            with gr.Tab("π Review"):
                with gr.Row():
                    i4 = gr.Code(label="Code to Review", language="python", lines=15)
                    o4 = gr.JSON(label="Review Report")
                gr.Button("π Review Code", variant="primary").click(
                    lambda code: _json_result("review", code), i4, o4)

            with gr.Tab("β‘ Optimize"):
                with gr.Row():
                    i5 = gr.Code(label="Original Code", language="python", lines=14)
                    o5 = gr.Code(label="Optimized Code", language="python", lines=14)
                gr.Button("π Optimize", variant="primary").click(
                    lambda code: _str("optimize", code, "optimized"), i5, o5)

            with gr.Tab("π Translate"):
                with gr.Row():
                    with gr.Column():
                        i6 = gr.Code(label="Python Code", language="python", lines=14)
                        t6 = gr.Dropdown(["javascript", "java", "cpp", "typescript", "go", "rust"],
                                         value="javascript", label="Target Language")
                    o6 = gr.Code(label="Translated Code", lines=14)
                gr.Button("π Translate", variant="primary").click(
                    lambda code, target: _str("translate", code, "translated", target=target),
                    [i6, t6], o6)

            with gr.Tab("π§ͺ Tests"):
                with gr.Row():
                    with gr.Column():
                        i7 = gr.Code(label="Code to Test", language="python", lines=12)
                        f7 = gr.Radio(["pytest", "unittest"], value="pytest", label="Framework")
                    o7 = gr.Code(label="Unit Tests", language="python", lines=12)
                gr.Button("π§ͺ Generate Tests", variant="primary").click(
                    lambda code, framework: _str("tests", code, "tests", fw=framework),
                    [i7, f7], o7)

            with gr.Tab("π Docs"):
                with gr.Row():
                    i8 = gr.Code(label="Code", language="python", lines=12)
                    o8 = gr.Code(label="Documented Code", language="python", lines=12)
                gr.Button("π Generate Docs", variant="primary").click(
                    lambda code: _str("docs", code, "docstrings"), i8, o8)

            with gr.Tab("π¬ Explain"):
                with gr.Row():
                    with gr.Column():
                        i9 = gr.Code(label="Code to Explain", language="python", lines=12)
                        l9 = gr.Radio(["beginner", "expert"], value="beginner", label="Level")
                    o9 = gr.Textbox(label="Explanation", lines=12)
                gr.Button("π‘ Explain Code", variant="primary").click(
                    lambda code, level: _str("explain", code, "explanation", level=level),
                    [i9, l9], o9)

            with gr.Tab("π§ Refactor"):
                with gr.Row():
                    i10 = gr.Code(label="Messy Code", language="python", lines=13)
                    o10 = gr.Code(label="Clean Code", language="python", lines=13)
                gr.Button("π§ Refactor", variant="primary").click(
                    lambda code: _str("refactor", code, "refactored"), i10, o10)

            with gr.Tab("π Complexity"):
                with gr.Row():
                    i11 = gr.Code(label="Code", language="python", lines=12)
                    o11 = gr.JSON(label="Complexity Analysis")
                gr.Button("π Analyze", variant="primary").click(
                    lambda code: _json_result("complexity", code), i11, o11)

            with gr.Tab("β‘ Profiler"):
                with gr.Row():
                    i12 = gr.Code(label="Code", language="python", lines=12)
                    o12 = gr.JSON(label="Performance Report")
                gr.Button("β‘ Profile", variant="primary").click(
                    lambda code: _json_result("performance", code), i12, o12)

            with gr.Tab("βοΈ Async"):
                with gr.Row():
                    i13 = gr.Code(label="Sync Code", language="python", lines=12)
                    o13 = gr.Code(label="Async Code", language="python", lines=12)
                gr.Button("βοΈ Convert to Async", variant="primary").click(
                    lambda code: _str("async", code, "async_code"), i13, o13)

            with gr.Tab("π¬ Full Pipeline"):
                with gr.Row():
                    i14 = gr.Code(label="Code β all 17 agents run on this",
                                  language="python", lines=12)
                    o14 = gr.JSON(label="Full Report")
                gr.Button("π¬ Run All Agents", variant="primary").click(
                    orc.pipeline, i14, o14)

        # The string body stays at column 0 so Markdown does not treat the
        # text as an indented code block.
        gr.Markdown("""
---
### π‘ Use via API from anywhere
```python
import requests
response = requests.post(
"https://YOUR_HF_USERNAME-codemindai.hf.space/api/bugs",
headers={"X-API-Key": "YOUR_API_KEY"},
json={"code": "paste your code here"}
)
print(response.json())
```
**All endpoints:** /api/generate Β· /api/bugs Β· /api/security Β· /api/review
Β· /api/optimize Β· /api/translate Β· /api/tests Β· /api/docs Β· /api/pipeline
Β· GET /health Β· GET /docs (Swagger)
""")

    # Both the button click and pressing Enter in the textbox run the key
    # check and update the status message and the tab column.
    unlock_btn.click(_check_key, inputs=ui_key, outputs=[key_msg, tabs_col])
    ui_key.submit(_check_key, inputs=ui_key, outputs=[key_msg, tabs_col])
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MOUNT GRADIO INSIDE FASTAPI & LAUNCH | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Attach the Gradio Blocks UI to the existing `fapi` application at the root
# path; the combined app is what gets served.
app = gr.mount_gradio_app(fapi, ui, path="/")

if __name__ == "__main__":
    # Imported lazily: uvicorn is only needed when this file is run directly.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="warning",
    )