Rixf123's picture
Create App
94b92ae verified
# ╔══════════════════════════════════════════════════════════════════╗
# ║ CodeMind AI — HuggingFace Spaces app.py FINAL v3                 ║
# ║ ✅ All bugs fixed ✅ Always Online ✅ API Key                      ║
# ╚══════════════════════════════════════════════════════════════════╝
#
# ══════════════════════════════════════════════════════════════════
# HOW TO SET YOUR API KEY IN HUGGINGFACE (do this ONCE):
#
# 1. Open your Space → click Settings
# 2. Scroll to "Variables and Secrets"
# 3. Click "New Secret"
# 4. Name → CODEMIND_API_KEY
# 5. Value → <your-own-secret-key>  (generate one, e.g. "cm-" followed by
#    64 random hex characters; never write the real value in this file —
#    anyone who can read the source can read it. A key that was ever
#    pasted here must be treated as leaked and rotated.)
# 6. Click Save ✅
#
# That's it! Your AI is now protected with your API key.
# ══════════════════════════════════════════════════════════════════
import os, re, ast, json, time, math, random, hashlib
import warnings; warnings.filterwarnings("ignore")
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import GPT2TokenizerFast
from dataclasses import dataclass
from typing import List, Dict, Optional, Any, Tuple
from collections import deque
import gradio as gr
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import APIKeyHeader
from pydantic import BaseModel
# ══════════════════════════════════════════════════════════════════
# DEVICE
# ══════════════════════════════════════════════════════════════════
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🚀 CodeMind AI | Device: {device.upper()}")
# ══════════════════════════════════════════════════════════════════
# API KEY ← loaded from HuggingFace Secret CODEMIND_API_KEY
# ══════════════════════════════════════════════════════════════════
# Shared secret checked by both the REST API and the Gradio key gate.
# NOTE(review): when the environment variable is missing, the service falls
# back to the well-known placeholder below, i.e. it is effectively
# unprotected. The startup message is the only warning.
API_KEY = os.environ.get("CODEMIND_API_KEY", "codemind-change-me")
print("✅ API Key ready" if "change-me" not in API_KEY
      else "⚠️ Set CODEMIND_API_KEY in HF Space → Settings → Secrets!")
# ══════════════════════════════════════════════════════════════════
# CONFIG
# ══════════════════════════════════════════════════════════════════
@dataclass
class Config:
    """Model architecture and sampling hyper-parameters.

    A single module-level instance (``cfg``) is created below and shared by
    the model, the tokenizer setup and the generation helpers.
    """

    # ── architecture ──
    vocab_size: int = 50304    # placeholder; overwritten once the tokenizer is built
    n_embd: int = 512          # embedding / residual width
    n_head: int = 8            # number of query heads
    n_kv_head: int = 4         # number of key/value heads (grouped-query attention)
    n_layer: int = 8           # number of transformer blocks
    block_size: int = 512      # maximum context length in tokens
    dropout: float = 0.1
    # ── sampling ──
    temperature: float = 0.8
    top_k: int = 50
    top_p: float = 0.95
    rep_penalty: float = 1.1
    max_new_tokens: int = 256


cfg = Config()

# Fixed seeds so weight initialisation and sampling are repeatable per process.
random.seed(42)
torch.manual_seed(42)
# ══════════════════════════════════════════════════════════════════
# TOKENIZER
# ══════════════════════════════════════════════════════════════════
print("📝 Loading tokenizer...")
# GPT-2 BPE vocabulary; GPT-2 ships without a pad token, so EOS doubles as padding.
tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token

# Control tokens: one per task (first four rows) and one per language (last two rows).
_SPECIAL = [
    '<|generate|>', '<|complete|>', '<|explain|>', '<|bugfix|>',
    '<|optimize|>', '<|translate|>', '<|docstring|>', '<|unittest|>',
    '<|review|>', '<|refactor|>', '<|security|>', '<|complexity|>',
    '<|async|>', '<|typehint|>', '<|format|>', '<|endofcode|>',
    '<|python|>', '<|javascript|>', '<|java|>', '<|cpp|>',
    '<|typescript|>', '<|go|>', '<|rust|>',
]
tokenizer.add_special_tokens({'additional_special_tokens': _SPECIAL})

# The embedding table must cover the added tokens, so the configured
# vocabulary size is replaced by the real tokenizer length.
cfg.vocab_size = len(tokenizer)
print(f"✅ Vocab: {cfg.vocab_size:,}")
# ══════════════════════════════════════════════════════════════════
# MODEL — GQA + RoPE + SwiGLU + RMSNorm + KV-Cache (~70M params)
# ══════════════════════════════════════════════════════════════════
class RMSNorm(nn.Module):
    """Root-mean-square normalisation over the last dimension with a learned gain."""

    def __init__(self, d, eps=1e-8):
        super().__init__()
        self.scale = nn.Parameter(torch.ones(d))
        self.eps = eps

    def forward(self, x):
        rms = x.pow(2).mean(-1, keepdim=True).add(self.eps).sqrt()
        return self.scale * x / rms


class RotaryEmbedding(nn.Module):
    """Produces the cos/sin tables used by rotary position embeddings."""

    def __init__(self, dim):
        super().__init__()
        self.register_buffer(
            "inv_freq",
            1.0 / (10000 ** (torch.arange(0, dim, 2).float() / dim)))

    def forward(self, T, dev):
        """Return ``(cos, sin)`` tables of shape ``(T, dim)`` for positions ``0..T-1``."""
        t = torch.arange(T, device=dev).float()
        f = torch.outer(t, self.inv_freq)
        e = torch.cat([f, f], dim=-1)
        return e.cos(), e.sin()


def _rot(x):
    """Rotate the two halves of the last dimension: ``(a, b) -> (-b, a)``."""
    a, b = x.chunk(2, dim=-1)
    return torch.cat([-b, a], dim=-1)


def apply_rope(q, k, cos, sin):
    """Apply rotary embeddings to ``q`` and ``k`` (both ``(B, heads, T, head_dim)``)."""
    c, s = cos[None, None], sin[None, None]
    return (q * c) + (_rot(q) * s), (k * c) + (_rot(k) * s)


class GQA(nn.Module):
    """Grouped-query causal self-attention with an optional key/value cache."""

    def __init__(self, cfg):
        super().__init__()
        self.nh = cfg.n_head
        self.nkv = cfg.n_kv_head
        self.hd = cfg.n_embd // cfg.n_head
        self.q = nn.Linear(cfg.n_embd, cfg.n_embd, bias=False)
        self.k = nn.Linear(cfg.n_embd, self.nkv * self.hd, bias=False)
        self.v = nn.Linear(cfg.n_embd, self.nkv * self.hd, bias=False)
        self.o = nn.Linear(cfg.n_embd, cfg.n_embd, bias=False)
        self.rope = RotaryEmbedding(self.hd)

    def forward(self, x, cache=None):
        """Attend over ``x`` (``(B, T, C)``), optionally extending a cache.

        Args:
            x: the NEW tokens only; when ``cache`` is given they are taken to
                follow the cached positions.
            cache: ``(k, v)`` from a previous call, each
                ``(B, n_kv_head, past, head_dim)``, or ``None``.

        Returns:
            ``(output, new_cache)`` where ``new_cache`` covers past + new tokens.
        """
        B, T, C = x.shape
        past = cache[0].size(2) if cache is not None else 0

        # Rotary tables must use absolute positions: new tokens sit at
        # past..past+T-1, not 0..T-1, otherwise cached and fresh keys disagree.
        cos, sin = self.rope(past + T, x.device)
        cos, sin = cos[past:], sin[past:]

        q = self.q(x).view(B, T, self.nh, self.hd).transpose(1, 2)
        k = self.k(x).view(B, T, self.nkv, self.hd).transpose(1, 2)
        v = self.v(x).view(B, T, self.nkv, self.hd).transpose(1, 2)
        q, k = apply_rope(q, k, cos, sin)

        if cache is not None:
            k = torch.cat([cache[0], k], dim=2)
            v = torch.cat([cache[1], v], dim=2)
        new_cache = (k.detach(), v.detach())

        # Each key/value head is shared by nh // nkv query heads.
        groups = self.nh // self.nkv
        k = k.repeat_interleave(groups, dim=1)
        v = v.repeat_interleave(groups, dim=1)

        if past == 0:
            out = F.scaled_dot_product_attention(
                q, k, v, is_causal=True, dropout_p=0.0)
        else:
            # is_causal=True aligns the mask to the top-left corner, which is
            # wrong once keys outnumber queries; build the offset mask instead:
            # query at absolute position p may see keys 0..p.
            q_pos = torch.arange(past, past + T, device=x.device)
            k_pos = torch.arange(past + T, device=x.device)
            mask = k_pos[None, :] <= q_pos[:, None]
            out = F.scaled_dot_product_attention(
                q, k, v, attn_mask=mask, dropout_p=0.0)

        out = out.transpose(1, 2).contiguous().view(B, T, C)
        return self.o(out), new_cache


class SwiGLU(nn.Module):
    """Gated feed-forward layer: ``w2(silu(w1 x) * w3 x)``."""

    def __init__(self, cfg):
        super().__init__()
        h = int(cfg.n_embd * 8 / 3)
        self.w1 = nn.Linear(cfg.n_embd, h, bias=False)
        self.w2 = nn.Linear(h, cfg.n_embd, bias=False)
        self.w3 = nn.Linear(cfg.n_embd, h, bias=False)

    def forward(self, x):
        return self.w2(F.silu(self.w1(x)) * self.w3(x))


class Block(nn.Module):
    """Pre-norm transformer block: attention then feed-forward, both residual."""

    def __init__(self, cfg):
        super().__init__()
        self.n1 = RMSNorm(cfg.n_embd)
        self.n2 = RMSNorm(cfg.n_embd)
        self.attn = GQA(cfg)
        self.mlp = SwiGLU(cfg)

    def forward(self, x, cache=None):
        a, c = self.attn(self.n1(x), cache)
        x = x + a
        x = x + self.mlp(self.n2(x))
        return x, c


class CodeMindModel(nn.Module):
    """Decoder-only language model built from ``Block`` layers with tied embeddings."""

    def __init__(self, cfg):
        super().__init__()
        # Kept so generate() uses this model's own settings.
        self.cfg = cfg
        self.emb = nn.Embedding(cfg.vocab_size, cfg.n_embd)
        self.blocks = nn.ModuleList([Block(cfg) for _ in range(cfg.n_layer)])
        self.norm = RMSNorm(cfg.n_embd)
        self.head = nn.Linear(cfg.n_embd, cfg.vocab_size, bias=False)
        self.emb.weight = self.head.weight  # weight tying
        self.apply(lambda m: nn.init.normal_(m.weight, 0, 0.02)
                   if isinstance(m, (nn.Linear, nn.Embedding)) else None)
        n = sum(p.numel() for p in self.parameters())
        print(f"🧠 CodeMind: {n/1e6:.1f}M params | GQA+RoPE+SwiGLU+RMSNorm")

    def forward(self, idx, targets=None, caches=None):
        """Run the network.

        Args:
            idx: token ids ``(B, T)``. With ``caches`` these must be only the
                tokens that follow the cached ones.
            targets: optional ids ``(B, T)``; ``-1`` entries are ignored.
            caches: per-layer ``(k, v)`` list from a previous call, or ``None``.

        Returns:
            ``(logits, loss, new_caches)``; ``loss`` is ``None`` without targets.
        """
        x = self.emb(idx)
        nc = []
        for i, b in enumerate(self.blocks):
            x, c = b(x, caches[i] if caches else None)
            nc.append(c)
        logits = self.head(self.norm(x))
        loss = (F.cross_entropy(logits.view(-1, logits.size(-1)),
                                targets.view(-1), ignore_index=-1)
                if targets is not None else None)
        return logits, loss, nc

    @torch.no_grad()
    def generate(self, ids, max_t=256):
        """Sample up to ``max_t`` tokens after ``ids`` (``(1, T)``) and decode them.

        Sampling applies temperature, top-k, top-p and a repetition penalty
        over the last 20 tokens, and stops early on the tokenizer's EOS id.
        Returns the decoded continuation (prompt excluded) as a stripped string.
        """
        self.eval()
        conf = self.cfg
        caches = None
        start = ids.shape[1]
        for _ in range(max_t):
            cached = caches[0][0].size(2) if caches else 0
            if caches is None or cached >= conf.block_size:
                # First step, or the context window is full: recompute from the
                # most recent block_size tokens with a fresh cache.
                caches = None
                inp = ids[:, -conf.block_size:]
            else:
                # Cache already covers everything but the newest token.
                inp = ids[:, -1:]
            logits, _, caches = self(inp, caches=caches)
            logits = logits[:, -1, :].float() / conf.temperature
            # top-k
            v, _ = torch.topk(logits, min(conf.top_k, logits.size(-1)))
            logits[logits < v[:, [-1]]] = float('-inf')
            # top-p
            probs = F.softmax(logits, dim=-1)
            sp, si = torch.sort(probs, descending=True)
            cp = sp.cumsum(-1)
            sp[cp - sp > conf.top_p] = 0.0
            probs = torch.zeros_like(probs).scatter_(1, si, sp)
            probs /= probs.sum(-1, keepdim=True).clamp(1e-8)
            # repetition penalty
            for tid in set(ids[0, -20:].tolist()):
                if probs[0, tid] > 0:
                    probs[0, tid] /= conf.rep_penalty
            probs /= probs.sum(-1, keepdim=True).clamp(1e-8)
            nxt = torch.multinomial(probs, 1)
            if nxt.item() == tokenizer.eos_token_id:
                break
            ids = torch.cat([ids, nxt], dim=1)
        return tokenizer.decode(ids[0, start:].tolist(),
                                skip_special_tokens=True).strip()
# ══════════════════════════════════════════════════════════════════
# MEMORY
# ══════════════════════════════════════════════════════════════════
class Memory:
    """In-process interaction log and result cache shared by the agents.

    The server runs indefinitely, so both the history and the cache are
    bounded; the oldest entries are dropped once a limit is exceeded.

    Args:
        max_history: maximum number of entries kept in ``history``.
        max_cache: maximum number of cached results kept in ``cache``.
    """

    def __init__(self, max_history=1000, max_cache=512):
        self.short = deque(maxlen=20)          # most recent interactions
        self.cache: Dict[str, Any] = {}        # "<md5(code)>_<kind>" -> result
        self.history: List[Dict] = []          # timestamped interactions
        self.max_history = max_history
        self.max_cache = max_cache
        self._interactions = 0                 # total pushes, survives trimming

    @staticmethod
    def _key(code, kind):
        """Build the cache key; MD5 is used only as a compact fingerprint."""
        return f"{hashlib.md5(code.encode()).hexdigest()}_{kind}"

    def push(self, e):
        """Record interaction ``e`` (a dict); a ``ts`` timestamp is added to the history copy."""
        self.short.append(e)
        self.history.append({**e, "ts": time.time()})
        self._interactions += 1
        overflow = len(self.history) - self.max_history
        if overflow > 0:
            del self.history[:overflow]

    def get(self, code, kind):
        """Return the cached result for ``(code, kind)`` or ``None``."""
        return self.cache.get(self._key(code, kind))

    def set(self, code, kind, v):
        """Cache ``v`` for ``(code, kind)``, evicting the oldest entries if full."""
        key = self._key(code, kind)
        # Re-inserting moves the key to the end of the dict's insertion order.
        self.cache.pop(key, None)
        self.cache[key] = v
        while len(self.cache) > self.max_cache:
            self.cache.pop(next(iter(self.cache)))

    def stats(self):
        """Return the total number of recorded interactions and the cache size."""
        return {"interactions": self._interactions, "cache": len(self.cache)}
# ══════════════════════════════════════════════════════════════════
# 20 FUNCTIONS
# ══════════════════════════════════════════════════════════════════
class Functions:
def __init__(self, model, mem):
self.model = model
self.mem = mem
def _gen(self, prompt, max_t=128):
ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
return self.model.generate(ids[:, -cfg.block_size:], max_t)
# FN 1 β€” Generate code
def generate_code(self, prompt, lang="python", max_t=256):
lt = f"<|{lang}|>" if f"<|{lang}|>" in _SPECIAL else ""
out = self._gen(f"{lt}<|generate|># Task: {prompt}\n", max_t)
imp = self.suggest_imports(out)
return ("\n".join(imp) + "\n\n" + out) if imp else out
# FN 2 β€” Complete code
def complete_code(self, partial, max_t=128):
return self._gen(f"<|complete|>\n{partial}", max_t)
# FN 3 β€” Explain code
def explain_code(self, code):
c = self.mem.get(code, "explain")
if c: return c
r = self._gen(f"<|explain|>\n{code[:400]}\n# Explanation:", 200)
self.mem.set(code, "explain", r)
return r
# FN 4 β€” Detect bugs
def detect_bugs(self, code):
bugs = []
try: ast.parse(code); ok = True
except SyntaxError as e:
ok = False
bugs.append({"type": "SyntaxError", "line": e.lineno, "msg": str(e)})
rules = [
(r'== None', "StyleWarning", "Use 'is None'"),
(r'!= None', "StyleWarning", "Use 'is not None'"),
(r'except:\s*$', "BestPractice", "Bare except clause"),
(r'print\s*\(', "DebugCode", "Debug print left in"),
(r'TODO|FIXME', "Incomplete", "Unresolved TODO/FIXME"),
]
for i, line in enumerate(code.split('\n'), 1):
for pat, kind, msg in rules:
if re.search(pat, line):
bugs.append({"type": kind, "line": i, "msg": msg})
return {"syntax_ok": ok, "bugs": bugs, "total": len(bugs),
"model_tip": self._gen(f"<|bugfix|>\n{code[:300]}\n# Bug:", 80)}
# FN 5 β€” Optimize
def optimize_code(self, code):
return self._gen(f"<|optimize|>\n# Original:\n{code[:400]}\n# Optimized:", 256)
# FN 6 β€” Translate
def translate_code(self, code, target="javascript"):
return self._gen(f"<|translate|>\n# Python:\n{code[:400]}\n# {target}:", 300)
# FN 7 β€” Generate docs
def generate_docs(self, code):
return self._gen(f"<|docstring|>\n{code[:400]}\n# With docstrings:", 300)
# FN 8 β€” Generate tests
def generate_tests(self, code, fw="pytest"):
return self._gen(f"<|unittest|>\n{code[:350]}\n# {fw} tests:", 350)
# FN 9 β€” Review code
def review_code(self, code):
lines = [l for l in code.split('\n') if l.strip()]
score, iss = 100, []
if '"""' not in code: score -= 20; iss.append("❌ No docstrings")
if '->' not in code: score -= 10; iss.append("⚠️ No type hints")
if not any(l.strip().startswith('#') for l in code.split('\n')):
score -= 10; iss.append("⚠️ No comments")
if len(lines) > 50: score -= 15; iss.append("⚠️ Too long β€” split it")
g = "A" if score>=90 else "B" if score>=75 else "C" if score>=60 else "D"
return {"score": max(score,0), "grade": g, "issues": iss, "loc": len(lines)}
# FN 10 β€” Complexity
def analyze_complexity(self, code):
md = 0
for line in code.split('\n'):
s = line.lstrip()
if s.startswith(('for ','while ')):
md = max(md, (len(line)-len(s))//4+1)
tm = {0:"O(1)",1:"O(n)",2:"O(nΒ²)",3:"O(nΒ³)"}.get(md, f"O(n^{md})")
sp = "O(n)" if re.search(r'\bappend\b|\[\]', code) else "O(1)"
for fn in re.findall(r'def (\w+)\s*\(', code):
if f'{fn}(' in code.split(f'def {fn}')[-1]:
tm = "O(2ⁿ) β€” Recursive"; sp = "O(n) stack"; break
return {"time": tm, "space": sp, "loop_depth": md}
# FN 11 β€” Suggest imports
def suggest_imports(self, code):
MAP = {
r'\bpd\.': "import pandas as pd",
r'\bnp\.': "import numpy as np",
r'\bplt\.': "import matplotlib.pyplot as plt",
r'\btorch\b': "import torch",
r'\bos\b': "import os",
r'\bre\b': "import re",
r'\bmath\b': "import math",
r'\bjson\b': "import json",
r'\brandom\b': "import random",
r'\bsys\b': "import sys",
r'\btime\b': "import time",
r'\btyping\b': "from typing import List, Dict, Any",
r'\bcollections\b': "from collections import defaultdict, deque",
r'\bpathlib\b': "from pathlib import Path",
}
ex = set(re.findall(r'(?:import|from)\s+(\w+)', code))
return [s for p,s in MAP.items()
if re.search(p,code) and s.split()[-1].split('.')[0] not in ex]
# FN 12 β€” Format code
def format_code(self, code):
lines = []
for line in code.split('\n'):
line = re.sub(r'(?<![=!<>])=(?!=)', ' = ', line)
line = re.sub(r'(?<! ),', ', ', line)
lines.append(line.rstrip())
return '\n'.join(lines).rstrip() + '\n'
# FN 13 β€” Summarize
def summarize_code(self, code):
fns = re.findall(r'def (\w+)', code)
cls = re.findall(r'class (\w+)', code)
lns = [l for l in code.split('\n') if l.strip()]
parts = []
if cls: parts.append(f"Classes: {', '.join(cls)}")
if fns: parts.append(f"Functions: {', '.join(fns)}")
parts.append(f"{len(lns)} lines")
return " | ".join(parts)
# FN 14 β€” Dead code
def detect_dead_code(self, code):
dead = []
try: tree = ast.parse(code)
except: return [{"type":"ParseError","msg":"Cannot parse"}]
assigned, used = set(), set()
for n in ast.walk(tree):
if isinstance(n, ast.Assign):
for t in n.targets:
if isinstance(t, ast.Name): assigned.add(t.id)
elif isinstance(n, ast.Name) and not isinstance(n.ctx, ast.Store):
used.add(n.id)
for v in (assigned - used - {'self','_'}):
dead.append({"type":"UnusedVariable","name":v,"msg":f"'{v}' never used"})
for i, line in enumerate(code.split('\n')):
if line.strip().startswith('return'):
nxt = [l.strip() for l in code.split('\n')[i+1:] if l.strip()]
if nxt and not nxt[0].startswith(('#','def ','class ')):
dead.append({"type":"UnreachableCode","line":i+2,"msg":"Code after return"})
break
return dead
# FN 15 β€” Security scan
def scan_security(self, code):
checks = [
(r'\beval\s*\(', "CRITICAL", "eval() is dangerous"),
(r'\bexec\s*\(', "CRITICAL", "exec() is dangerous"),
(r'os\.system\s*\(', "HIGH", "os.system() risk"),
(r'pickle\.loads?\s*\(', "HIGH", "Unsafe pickle"),
(r'shell\s*=\s*True', "HIGH", "shell=True injection"),
(r'password\s*=\s*["\']',"HIGH", "Hardcoded password"),
(r'api_key\s*=\s*["\']', "HIGH", "Hardcoded API key"),
(r'secret\s*=\s*["\']', "HIGH", "Hardcoded secret"),
(r'\bmd5\b', "MEDIUM", "MD5 is broken"),
(r'http://', "LOW", "Use HTTPS not HTTP"),
]
vulns = []
for i, line in enumerate(code.split('\n'), 1):
for pat, sev, msg in checks:
if re.search(pat, line, re.I):
vulns.append({"line":i,"severity":sev,"msg":msg,"snippet":line.strip()[:60]})
order = {"CRITICAL":0,"HIGH":1,"MEDIUM":2,"LOW":3}
risk = ("CRITICAL" if any(v["severity"]=="CRITICAL" for v in vulns)
else "HIGH" if any(v["severity"]=="HIGH" for v in vulns)
else "MEDIUM" if vulns else "SAFE")
return sorted(vulns, key=lambda x: order.get(x["severity"],9)), risk
# FN 16 β€” Type hints
def generate_type_hints(self, code):
lines, out = code.split('\n'), []
for line in lines:
m = re.match(r'(\s*def \w+\()(.*)(\):.*)', line)
if m and '->' not in line:
typed = []
for p in m.group(2).split(','):
p = p.strip()
if not p or p=='self': typed.append(p)
elif any(k in p for k in ('name','text','msg','key')): typed.append(f"{p}: str")
elif any(k in p for k in ('num','count','n','i','k')): typed.append(f"{p}: int")
elif any(k in p for k in ('flag','is_','has_')): typed.append(f"{p}: bool")
elif any(k in p for k in ('list','arr','data','items')):typed.append(f"{p}: list")
else: typed.append(f"{p}: Any")
out.append(f"{m.group(1)}{', '.join(typed)}) -> Any:")
else:
out.append(line)
return '\n'.join(out)
# FN 17 β€” Refactor
def refactor_code(self, code):
return self._gen(f"<|refactor|>\n# Messy:\n{code[:400]}\n# Clean:", 300)
# FN 18 β€” Extract functions
def extract_functions(self, code):
try: tree = ast.parse(code)
except Exception as e: return [{"error": str(e)}]
return [{"name": n.name,
"args": [a.arg for a in n.args.args],
"line": n.lineno,
"docstring": (ast.get_docstring(n) or "")[:60],
"has_return": any(isinstance(x, ast.Return) for x in ast.walk(n))}
for n in ast.walk(tree) if isinstance(n, ast.FunctionDef)]
# FN 19 β€” Convert to async
def convert_to_async(self, code):
out = re.sub(r'\bdef (\w+)\s*\(', r'async def \1(', code)
out = re.sub(r'\btime\.sleep\b', 'await asyncio.sleep', out)
out = re.sub(r'\brequests\.get\b', 'await session.get', out)
return "import asyncio\nimport aiohttp\n\n" + out
# FN 20 β€” Estimate cost
def estimate_cost(self, n_params=70_000_000, n_tokens=5_000_000, gpu="T4"):
flops = 6 * n_params * n_tokens
tp = {"T4":65e12,"A100":312e12,"V100":112e12}.get(gpu, 65e12)
pr = {"T4":0.35, "A100":3.00, "V100":1.50 }.get(gpu, 0.35)
h = flops / tp / 3600
return {"gpu":gpu,"est_hours":round(h,2),"est_cost_usd":round(h*pr,2)}
# ══════════════════════════════════════════════════════════════════
# 17 AGENTS
# ══════════════════════════════════════════════════════════════════
class Agent:
    """Base class: a named wrapper around the shared ``Functions`` object.

    Subclasses implement ``run`` and wrap their payload with ``ok``/``err``
    so every agent answers with the same envelope.
    """

    def __init__(self, name, fn):
        self.name = name
        self.fn = fn

    def run(self, *a, **k):
        raise NotImplementedError

    def ok(self, d):
        """Success envelope carrying payload ``d``."""
        return {"agent": self.name, "status": "ok", "result": d}

    def err(self, m):
        """Error envelope carrying message ``m``."""
        return {"agent": self.name, "status": "error", "msg": m}


class GenAgent(Agent):
    """Generates code from a natural-language prompt."""

    def __init__(self, fn):
        super().__init__("CodeGenerator", fn)

    def run(self, prompt, lang="python"):
        return self.ok({"code": self.fn.generate_code(prompt, lang), "lang": lang})


class BugAgent(Agent):
    """Combines the bug report with the dead-code report."""

    def __init__(self, fn):
        super().__init__("BugDetector", fn)

    def run(self, code):
        b = self.fn.detect_bugs(code)
        d = self.fn.detect_dead_code(code)
        total = b["total"] + len(d)
        return self.ok({"bugs": b, "dead_code": d, "total": total,
                        "healthy": total == 0})


class OptAgent(Agent):
    """Returns formatted, model-optimised code plus the original's complexity."""

    def __init__(self, fn):
        super().__init__("Optimizer", fn)

    def run(self, code):
        return self.ok({"optimized": self.fn.format_code(self.fn.optimize_code(code)),
                        "complexity": self.fn.analyze_complexity(code)})


class DocAgent(Agent):
    """Produces docstrings, a summary, a function listing and type hints."""

    def __init__(self, fn):
        super().__init__("Documentation", fn)

    def run(self, code):
        return self.ok({"docstrings": self.fn.generate_docs(code),
                        "summary": self.fn.summarize_code(code),
                        "functions": self.fn.extract_functions(code),
                        "typed": self.fn.generate_type_hints(code)})


class TestAgent(Agent):
    """Generates unit tests and a rough coverage estimate (25% per function, max 95%)."""

    def __init__(self, fn):
        super().__init__("TestGenerator", fn)

    def run(self, code, fw="pytest"):
        # A parse failure comes back as a single {"error": ...} record; it is
        # not a function and must not add to the estimate.
        fns = [f for f in self.fn.extract_functions(code) if "error" not in f]
        return self.ok({"tests": self.fn.generate_tests(code, fw), "framework": fw,
                        "est_coverage": f"{min(len(fns)*25,95)}%"})


class SecAgent(Agent):
    """Runs the security scan."""

    def __init__(self, fn):
        super().__init__("SecurityScanner", fn)

    def run(self, code):
        v, r = self.fn.scan_security(code)
        return self.ok({"vulnerabilities": v, "risk_level": r, "is_safe": not v})


class RefAgent(Agent):
    """Refactors code and reports the review score before and after."""

    def __init__(self, fn):
        super().__init__("Refactor", fn)

    def run(self, code):
        r = self.fn.refactor_code(code)
        before = self.fn.review_code(code)["score"]
        after = self.fn.review_code(r)["score"]
        return self.ok({"refactored": r, "score_before": before, "score_after": after})


class TrAgent(Agent):
    """Translates Python code to one of the ``SUPPORTED`` languages."""

    SUPPORTED = ["javascript", "java", "cpp", "typescript", "go", "rust", "csharp", "kotlin"]

    def __init__(self, fn):
        super().__init__("Translator", fn)

    def run(self, code, target="javascript"):
        if target not in self.SUPPORTED:
            return self.err(f"Choose: {self.SUPPORTED}")
        return self.ok({"translated": self.fn.translate_code(code, target), "target": target})


class RevAgent(Agent):
    """Reviews code; the style score is reduced by security and dead-code findings."""

    def __init__(self, fn):
        super().__init__("CodeReviewer", fn)

    def run(self, code):
        r = self.fn.review_code(code)
        v, _ = self.fn.scan_security(code)
        d = self.fn.detect_dead_code(code)
        # 5 points per security finding, 2 per dead-code finding.
        s = max(r["score"] - len(v) * 5 - len(d) * 2, 0)
        # Grade the score that is actually reported, using the same
        # thresholds as the style review, so the two cannot contradict.
        g = "A" if s >= 90 else "B" if s >= 75 else "C" if s >= 60 else "D"
        return self.ok({"score": s, "grade": g, "issues": r["issues"],
                        "security_flags": len(v),
                        "recommendation": "✅ LGTM!" if s >= 80 else "❌ Needs work"})


class CpxAgent(Agent):
    """Returns the complexity analysis unchanged."""

    def __init__(self, fn):
        super().__init__("ComplexityAnalyzer", fn)

    def run(self, code):
        return self.ok(self.fn.analyze_complexity(code))


class ImpAgent(Agent):
    """Lists suggested imports next to the import lines already present."""

    def __init__(self, fn):
        super().__init__("ImportManager", fn)

    def run(self, code):
        s = self.fn.suggest_imports(code)
        e = re.findall(r'^(?:import|from)\s+\S+', code, re.M)
        return self.ok({"suggested": s, "existing": e})


class FmtAgent(Agent):
    """Formats code, then adds type hints to the formatted result."""

    def __init__(self, fn):
        super().__init__("Formatter", fn)

    def run(self, code):
        f = self.fn.format_code(code)
        return self.ok({"formatted": self.fn.generate_type_hints(f)})


class ExpAgent(Agent):
    """Explains code; ``level`` only selects the prefix put before the explanation."""

    def __init__(self, fn):
        super().__init__("Explainer", fn)

    def run(self, code, level="beginner"):
        pre = {"beginner": "Simply: ", "expert": "Technical: "}.get(level, "")
        return self.ok({"explanation": pre + self.fn.explain_code(code),
                        "summary": self.fn.summarize_code(code), "level": level})


class DcdAgent(Agent):
    """Returns the dead-code findings and their count."""

    def __init__(self, fn):
        super().__init__("DeadCodeDetector", fn)

    def run(self, code):
        d = self.fn.detect_dead_code(code)
        return self.ok({"items": d, "total": len(d)})


class PerfAgent(Agent):
    """Pattern-based performance hints; each hint costs 15 points (floor 10)."""

    def __init__(self, fn):
        super().__init__("Profiler", fn)

    def run(self, code):
        sug = []
        if re.search(r'for .+ in .+:\n.*\.append\(', code, re.S):
            sug.append("Use list comprehension")
        if re.search(r'for .* in range\(len\(', code):
            sug.append("Use enumerate()")
        if 'global ' in code:
            sug.append("Remove global variables")
        if re.search(r'\+= .+str', code):
            sug.append("Use ''.join() for strings")
        return self.ok({"suggestions": sug, "perf_score": max(100 - len(sug) * 15, 10),
                        "complexity": self.fn.analyze_complexity(code)})


class AsynAgent(Agent):
    """Converts synchronous code to async style."""

    def __init__(self, fn):
        super().__init__("AsyncConverter", fn)

    def run(self, code):
        return self.ok({"async_code": self.fn.convert_to_async(code)})
class Orchestrator:
    """Registry that routes a task name to its agent and runs multi-agent pipelines."""

    def __init__(self, fn):
        self.fn = fn
        self.mem = fn.mem
        # Task name (as used by the API and the UI) -> agent instance.
        self.agents = {
            "generate": GenAgent(fn),
            "bugs": BugAgent(fn),
            "optimize": OptAgent(fn),
            "docs": DocAgent(fn),
            "tests": TestAgent(fn),
            "security": SecAgent(fn),
            "refactor": RefAgent(fn),
            "translate": TrAgent(fn),
            "review": RevAgent(fn),
            "complexity": CpxAgent(fn),
            "imports": ImpAgent(fn),
            "format": FmtAgent(fn),
            "explain": ExpAgent(fn),
            "deadcode": DcdAgent(fn),
            "performance": PerfAgent(fn),
            "async": AsynAgent(fn),
        }
        print(f"✅ Orchestrator: {len(self.agents)} agents ready")

    def run(self, task, data, **kw):
        """Run the agent registered as ``task`` on ``data``.

        Unknown tasks produce an error dict. Known tasks are logged to
        memory (first 60 chars of ``data``) before the agent runs; exceptions
        raised by the agent propagate to the caller.
        """
        a = self.agents.get(task)
        if not a:
            return {"status": "error", "msg": f"Unknown task: {task}"}
        self.mem.push({"agent": task, "summary": str(data)[:60]})
        return a.run(data, **kw)

    def pipeline(self, code):
        """Run the six analysis agents on ``code`` and collect their results.

        A failing step is reported as ``{"error": ...}`` instead of aborting
        the run. ``_meta`` holds the elapsed seconds and memory statistics.
        """
        t0, res = time.time(), {}
        for step in ["bugs", "security", "review", "complexity", "deadcode", "performance"]:
            try:
                res[step] = self.agents[step].run(code)
            except Exception as e:
                res[step] = {"error": str(e)}
        res["_meta"] = {"elapsed_s": round(time.time() - t0, 2), "memory": self.mem.stats()}
        return res
# ══════════════════════════════════════════════════════════════════
# BUILD SYSTEM
# ══════════════════════════════════════════════════════════════════
print("🔨 Building CodeMind system...")
model = CodeMindModel(cfg).to(device)
memory = Memory()
functions = Functions(model, memory)
orc = Orchestrator(functions)

# Auto-load checkpoint if available
import glob as _glob


def _mtime_or_zero(path):
    """Modification time of ``path``; 0.0 if it vanished after globbing."""
    try:
        return os.path.getmtime(path)
    except OSError:
        return 0.0


# Oldest first, so _ckpts[-1] is the most recently written file. Sorting the
# path strings instead would always prefer "checkpoints/..." over "/tmp/...",
# regardless of age.
_ckpts = sorted(_glob.glob("/tmp/*.pt") +
                _glob.glob("*.pt") +
                _glob.glob("checkpoints/*.pt"),
                key=_mtime_or_zero)
if _ckpts:
    try:
        # NOTE(security): torch.load unpickles the file, and unpickling can
        # execute code. /tmp is searched here, so only run this where every
        # process able to write there is trusted — or pass weights_only=True
        # once the checkpoints are known to contain plain tensors only.
        ck = torch.load(_ckpts[-1], map_location=device)
        model.load_state_dict(ck["model"])
        print(f"✅ Checkpoint loaded: {_ckpts[-1]}")
    except Exception as e:
        # Best effort: keep serving with the freshly initialised weights.
        print(f"⚠️ Checkpoint error: {e}")
else:
    print("ℹ️ No checkpoint found — static agents work perfectly right now!")
print("✅ System ready!\n")
# ══════════════════════════════════════════════════════════════════
# FASTAPI — API Key Protected
# ══════════════════════════════════════════════════════════════════
# REST application object; the interactive Swagger UI is served at /docs.
fapi = FastAPI(title="CodeMind AI API", version="3.0", docs_url="/docs")
# CORS is fully open (any origin, method and header); access control is done
# by the X-API-Key header check on the protected routes instead.
fapi.add_middleware(CORSMiddleware,
allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Reads the X-API-Key request header. auto_error=False is set so that the
# dependency below can produce its own 401 body.
_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
async def require_key(key: str = Depends(_key_header)):
    """FastAPI dependency: allow the request only if X-API-Key equals ``API_KEY``.

    Returns the key on success.

    Raises:
        HTTPException: 401 with a structured detail when the header is
            missing or wrong.
    """
    import hmac

    # Compare as bytes in constant time so the response time does not reveal
    # how much of a guessed key is correct. A missing header compares as "".
    supplied = (key or "").encode("utf-8")
    if not hmac.compare_digest(supplied, API_KEY.encode("utf-8")):
        raise HTTPException(status_code=401, detail={
            "error": "❌ Invalid or missing API Key",
            "fix": "Add header X-API-Key: YOUR_KEY to your request",
            "get_key": "Set CODEMIND_API_KEY in HuggingFace Space → Settings → Secrets"
        })
    return key
class Req(BaseModel):
    """Request body shared by every POST endpoint.

    Each endpoint reads only the fields it needs; all fields are optional
    and fall back to the defaults below.
    """

    code: str = ""                 # source code to analyse / transform
    prompt: str = ""               # natural-language task (generate only)
    lang: str = "python"           # output language (generate only)
    target: str = "javascript"     # target language (translate only)
    framework: str = "pytest"      # test framework (tests only)
    level: str = "beginner"        # explanation level (explain only)
    max_tokens: int = 256          # generation budget (complete only)
def _j(data):
    """Wrap ``data`` in a JSONResponse; non-dict values are sent as ``{"result": data}``."""
    payload = data if isinstance(data, dict) else {"result": data}
    return JSONResponse(content=payload)
# ── Public (no key needed) ────────────────────────────────────────
# Served at /api rather than "/": the Gradio UI is mounted at "/" later in
# this file, and a route registered on "/" before that mount is matched
# first, which would answer the Space's landing page with this JSON instead
# of the UI.
@fapi.get("/api")
async def root():
    """Public service descriptor: name, version, agent count and where to find the docs."""
    return {"name": "CodeMind AI", "version": "3.0", "status": "online",
            "agents": len(orc.agents), "auth": "Add X-API-Key header", "docs": "/docs"}
@fapi.get("/health")
async def health():
    """Public liveness probe: device, agent count, memory statistics and server time."""
    return {"status": "online", "device": device,
            "agents": len(orc.agents), "memory": memory.stats(), "ts": time.time()}
# ── Protected (need your API key) ─────────────────────────────────
# All handlers below are plain ``def`` on purpose: they run model inference
# and other CPU-bound work synchronously. FastAPI executes ``def`` handlers
# in its threadpool, whereas an ``async def`` handler doing the same work
# would block the event loop — and with it every other request, /health
# included — until generation finishes.
@fapi.post("/api/generate", dependencies=[Depends(require_key)])
def ep_gen(r: Req):
    """Generate code from ``prompt`` in ``lang``."""
    return _j(orc.run("generate", r.prompt, lang=r.lang))


@fapi.post("/api/complete", dependencies=[Depends(require_key)])
def ep_cmp(r: Req):
    """Complete ``code``; ``max_tokens`` is clamped to 1..cfg.max_new_tokens."""
    # The request value is client-controlled; an unbounded budget would let a
    # single request occupy the model for an arbitrary time.
    budget = max(1, min(r.max_tokens, cfg.max_new_tokens))
    return _j({"completion": functions.complete_code(r.code, budget)})


@fapi.post("/api/bugs", dependencies=[Depends(require_key)])
def ep_bug(r: Req):
    """Bug and dead-code report for ``code``."""
    return _j(orc.run("bugs", r.code))


@fapi.post("/api/optimize", dependencies=[Depends(require_key)])
def ep_opt(r: Req):
    """Optimised version of ``code``."""
    return _j(orc.run("optimize", r.code))


@fapi.post("/api/docs", dependencies=[Depends(require_key)])
def ep_doc(r: Req):
    """Documentation bundle for ``code``."""
    return _j(orc.run("docs", r.code))


@fapi.post("/api/tests", dependencies=[Depends(require_key)])
def ep_tst(r: Req):
    """Unit tests for ``code`` using ``framework``."""
    return _j(orc.run("tests", r.code, fw=r.framework))


@fapi.post("/api/security", dependencies=[Depends(require_key)])
def ep_sec(r: Req):
    """Security scan of ``code``."""
    return _j(orc.run("security", r.code))


@fapi.post("/api/refactor", dependencies=[Depends(require_key)])
def ep_ref(r: Req):
    """Refactored version of ``code``."""
    return _j(orc.run("refactor", r.code))


@fapi.post("/api/translate", dependencies=[Depends(require_key)])
def ep_tr(r: Req):
    """Translate ``code`` to ``target``."""
    return _j(orc.run("translate", r.code, target=r.target))


@fapi.post("/api/review", dependencies=[Depends(require_key)])
def ep_rev(r: Req):
    """Code review of ``code``."""
    return _j(orc.run("review", r.code))


@fapi.post("/api/complexity", dependencies=[Depends(require_key)])
def ep_cpx(r: Req):
    """Complexity estimate for ``code``."""
    return _j(orc.run("complexity", r.code))


@fapi.post("/api/imports", dependencies=[Depends(require_key)])
def ep_imp(r: Req):
    """Suggested and existing imports of ``code``."""
    return _j(orc.run("imports", r.code))


@fapi.post("/api/format", dependencies=[Depends(require_key)])
def ep_fmt(r: Req):
    """Formatted and type-hinted ``code``."""
    return _j(orc.run("format", r.code))


@fapi.post("/api/explain", dependencies=[Depends(require_key)])
def ep_exp(r: Req):
    """Explanation of ``code`` at ``level``."""
    return _j(orc.run("explain", r.code, level=r.level))


@fapi.post("/api/deadcode", dependencies=[Depends(require_key)])
def ep_dcd(r: Req):
    """Dead-code findings for ``code``."""
    return _j(orc.run("deadcode", r.code))


@fapi.post("/api/performance", dependencies=[Depends(require_key)])
def ep_prf(r: Req):
    """Performance hints for ``code``."""
    return _j(orc.run("performance", r.code))


@fapi.post("/api/async", dependencies=[Depends(require_key)])
def ep_asn(r: Req):
    """Async conversion of ``code``."""
    return _j(orc.run("async", r.code))


@fapi.post("/api/pipeline", dependencies=[Depends(require_key)])
def ep_pip(r: Req):
    """Run the analysis pipeline on ``code``."""
    return _j(orc.pipeline(r.code))
# ══════════════════════════════════════════════════════════════════
# GRADIO UI — 14 tabs, API key gate (BUG FIXED)
# ══════════════════════════════════════════════════════════════════
# Stylesheet passed to gr.Blocks: caps the page width, uses a monospace font
# and hides the page footer.
_CSS = """
.gradio-container { max-width:1100px !important; font-family: monospace; }
footer { display: none !important; }
"""
def _str(task, data, key, **kw):
    """Run ``task`` and return one field of its result as text for the UI.

    Returns ``result[key]`` (``""`` if absent) on success, the agent's error
    message on failure, and a ``❌``-prefixed message if the agent raised —
    the UI callbacks never see an exception from here.
    """
    try:
        r = orc.run(task, data, **kw)
        if r.get("status") == "ok":
            return r["result"].get(key, "")
        return r.get("msg", "Error")
    except Exception as e:
        return f"❌ {e}"
# ── βœ… FIXED: _check_key now returns to named components key_msg & tabs_col
def _check_key(k):
    """Validate the key typed into the UI.

    Returns ``(status_markdown, update)`` for the ``key_msg`` and
    ``tabs_col`` components: the tab column is shown only for the right key.
    """
    import hmac

    # An untouched textbox may deliver None; treat it as an empty key.
    supplied = (k or "").strip().encode("utf-8")
    # Constant-time comparison: timing must not leak how close a guess is.
    if hmac.compare_digest(supplied, API_KEY.encode("utf-8")):
        return "✅ Correct Key! All 14 tabs unlocked 🎉", gr.update(visible=True)
    return "❌ Wrong API Key — check your key and try again.", gr.update(visible=False)
with gr.Blocks(title="🧠 CodeMind AI", theme=gr.themes.Soft(), css=_CSS) as ui:
    gr.Markdown("# 🧠 CodeMind AI — Always Online\n"
                "**17 Agents · 20 Functions · GQA + RoPE + SwiGLU · REST API**")

    # ── API Key Gate ─────────────────────────────────────────────
    with gr.Group():
        gr.Markdown("### 🔑 Enter Your API Key to Unlock")
        with gr.Row():
            ui_key = gr.Textbox(placeholder="Paste your API key...",
                                label="API Key", type="password", scale=4)
            unlock_btn = gr.Button("🔓 Unlock", variant="primary", scale=1)
    key_msg = gr.Markdown("*Enter your API key above and click Unlock*")
    tabs_col = gr.Column(visible=False)

    def _guarded(handler):
        """Return ``handler`` wrapped so it only runs with the correct API key.

        Hiding ``tabs_col`` is purely visual: the callbacks registered below
        stay callable through Gradio's own endpoints whether or not the
        column is visible. Every callback therefore receives the key textbox
        as its first input and re-checks it before doing any work.
        """
        import hmac

        def run(key, *args):
            supplied = (key or "").strip().encode("utf-8")
            if not hmac.compare_digest(supplied, API_KEY.encode("utf-8")):
                raise PermissionError("Invalid or missing API key")
            return handler(*args)

        return run

    # NOTE(review): the nesting of rows/columns below was reconstructed from
    # the statement order; confirm the layout against the running app.
    with tabs_col:
        with gr.Tabs():
            with gr.Tab("🚀 Generate"):
                p1 = gr.Textbox(label="Describe what to build", lines=3,
                                placeholder="e.g. binary search function")
                l1 = gr.Dropdown(["python", "javascript", "java", "cpp", "go", "rust"],
                                 value="python", label="Language")
                o1 = gr.Code(label="Generated Code", language="python", lines=18)
                gr.Button("⚡ Generate", variant="primary").click(
                    _guarded(lambda p, l: _str("generate", p, "code", lang=l)),
                    [ui_key, p1, l1], o1)

            with gr.Tab("🐛 Bug Detect"):
                with gr.Row():
                    i2 = gr.Code(label="Paste Code", language="python", lines=15)
                    o2 = gr.JSON(label="Bug Report")
                gr.Button("🔍 Find Bugs", variant="primary").click(
                    _guarded(lambda c: orc.run("bugs", c)["result"]),
                    [ui_key, i2], o2)

            with gr.Tab("🔐 Security"):
                with gr.Row():
                    i3 = gr.Code(label="Paste Code", language="python", lines=15)
                    o3 = gr.JSON(label="Security Report")
                gr.Button("🛡️ Scan Security", variant="primary").click(
                    _guarded(lambda c: orc.run("security", c)["result"]),
                    [ui_key, i3], o3)

            with gr.Tab("📋 Review"):
                with gr.Row():
                    i4 = gr.Code(label="Code to Review", language="python", lines=15)
                    o4 = gr.JSON(label="Review Report")
                gr.Button("📝 Review Code", variant="primary").click(
                    _guarded(lambda c: orc.run("review", c)["result"]),
                    [ui_key, i4], o4)

            with gr.Tab("⚡ Optimize"):
                with gr.Row():
                    i5 = gr.Code(label="Original Code", language="python", lines=14)
                    o5 = gr.Code(label="Optimized Code", language="python", lines=14)
                gr.Button("🚀 Optimize", variant="primary").click(
                    _guarded(lambda c: _str("optimize", c, "optimized")),
                    [ui_key, i5], o5)

            with gr.Tab("🌐 Translate"):
                with gr.Row():
                    with gr.Column():
                        i6 = gr.Code(label="Python Code", language="python", lines=14)
                        t6 = gr.Dropdown(["javascript", "java", "cpp", "typescript", "go", "rust"],
                                         value="javascript", label="Target Language")
                    o6 = gr.Code(label="Translated Code", lines=14)
                gr.Button("🔄 Translate", variant="primary").click(
                    _guarded(lambda c, t: _str("translate", c, "translated", target=t)),
                    [ui_key, i6, t6], o6)

            with gr.Tab("🧪 Tests"):
                with gr.Row():
                    with gr.Column():
                        i7 = gr.Code(label="Code to Test", language="python", lines=12)
                        f7 = gr.Radio(["pytest", "unittest"], value="pytest", label="Framework")
                    o7 = gr.Code(label="Unit Tests", language="python", lines=12)
                gr.Button("🧪 Generate Tests", variant="primary").click(
                    _guarded(lambda c, f: _str("tests", c, "tests", fw=f)),
                    [ui_key, i7, f7], o7)

            with gr.Tab("📝 Docs"):
                with gr.Row():
                    i8 = gr.Code(label="Code", language="python", lines=12)
                    o8 = gr.Code(label="Documented Code", language="python", lines=12)
                gr.Button("📝 Generate Docs", variant="primary").click(
                    _guarded(lambda c: _str("docs", c, "docstrings")),
                    [ui_key, i8], o8)

            with gr.Tab("💬 Explain"):
                with gr.Row():
                    with gr.Column():
                        i9 = gr.Code(label="Code to Explain", language="python", lines=12)
                        l9 = gr.Radio(["beginner", "expert"], value="beginner", label="Level")
                    o9 = gr.Textbox(label="Explanation", lines=12)
                gr.Button("💡 Explain Code", variant="primary").click(
                    _guarded(lambda c, l: _str("explain", c, "explanation", level=l)),
                    [ui_key, i9, l9], o9)

            with gr.Tab("🔧 Refactor"):
                with gr.Row():
                    i10 = gr.Code(label="Messy Code", language="python", lines=13)
                    o10 = gr.Code(label="Clean Code", language="python", lines=13)
                gr.Button("🔧 Refactor", variant="primary").click(
                    _guarded(lambda c: _str("refactor", c, "refactored")),
                    [ui_key, i10], o10)

            with gr.Tab("📐 Complexity"):
                with gr.Row():
                    i11 = gr.Code(label="Code", language="python", lines=12)
                    o11 = gr.JSON(label="Complexity Analysis")
                gr.Button("📐 Analyze", variant="primary").click(
                    _guarded(lambda c: orc.run("complexity", c)["result"]),
                    [ui_key, i11], o11)

            with gr.Tab("⚡ Profiler"):
                with gr.Row():
                    i12 = gr.Code(label="Code", language="python", lines=12)
                    o12 = gr.JSON(label="Performance Report")
                gr.Button("⚡ Profile", variant="primary").click(
                    _guarded(lambda c: orc.run("performance", c)["result"]),
                    [ui_key, i12], o12)

            with gr.Tab("⚙️ Async"):
                with gr.Row():
                    i13 = gr.Code(label="Sync Code", language="python", lines=12)
                    o13 = gr.Code(label="Async Code", language="python", lines=12)
                gr.Button("⚙️ Convert to Async", variant="primary").click(
                    _guarded(lambda c: _str("async", c, "async_code")),
                    [ui_key, i13], o13)

            with gr.Tab("🔬 Full Pipeline"):
                with gr.Row():
                    # The pipeline runs the six analysis agents, not every agent.
                    i14 = gr.Code(label="Code — the 6 analysis agents run on this",
                                  language="python", lines=12)
                    o14 = gr.JSON(label="Full Report")
                gr.Button("🔬 Run Analysis Pipeline", variant="primary").click(
                    _guarded(orc.pipeline), [ui_key, i14], o14)

    gr.Markdown("""
---
### 📡 Use via API from anywhere
```python
import requests
response = requests.post(
"https://YOUR_HF_USERNAME-codemindai.hf.space/api/bugs",
headers={"X-API-Key": "YOUR_API_KEY"},
json={"code": "paste your code here"}
)
print(response.json())
```
**All endpoints:** /api/generate · /api/bugs · /api/security · /api/review
· /api/optimize · /api/translate · /api/tests · /api/docs · /api/pipeline
· GET /health · GET /docs (Swagger)
""")

    # The button and pressing Enter in the textbox both validate the key and
    # update the status line and the visibility of the tab column.
    unlock_btn.click(_check_key, inputs=ui_key, outputs=[key_msg, tabs_col])
    ui_key.submit(_check_key, inputs=ui_key, outputs=[key_msg, tabs_col])
# ══════════════════════════════════════════════════════════════════
# MOUNT GRADIO INSIDE FASTAPI & LAUNCH
# ══════════════════════════════════════════════════════════════════
# One ASGI application serves both the REST API and the Gradio UI.
# NOTE(review): the mount is added after the FastAPI routes, so any route
# declared on exactly "/" is matched before the UI index — confirm none is.
app = gr.mount_gradio_app(fapi, ui, path="/")

if __name__ == "__main__":
    import uvicorn

    # Listens on all interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860, log_level="warning")