codeMax / app.py
kirikir13's picture
Update app.py
d3330ee verified
Raw
History Blame Contribute Delete
11.5 kB
# =============================================================================
# codeMax — Qwen 3.6 27B CoderBot on ZeroGPU (Gradio 6)
# Model: DavidAU/Qwen3.6-27B-Heretic-Uncensored-FINETUNE-NEO-CODE-Di-IMatrix-MAX-GGUF
# =============================================================================
import spaces
import gradio as gr
import os
import json
from pathlib import Path
from typing import List, Optional, Generator
# --- Config ----------------------------------------------------------------
MODEL_REPO = os.environ.get(
"MODEL_REPO",
"DavidAU/Qwen3.6-27B-Heretic-Uncensored-FINETUNE-NEO-CODE-Di-IMatrix-MAX-GGUF",
)
MODEL_FILE = os.environ.get(
"MODEL_FILE",
"Qwen3.6-27B-NEO-CODE-HERE-2T-OT-Q4_K_M.gguf",
)
N_CTX = int(os.environ.get("N_CTX", "8192"))
N_GPU = int(os.environ.get("N_GPU_LAYERS", "-1"))
SYSTEM_PROMPT = """You are codeMax, an uncensored 27B coding assistant. You excel at:
- Writing production code in Python, TypeScript, JavaScript, Rust, Go, C/C++, SQL, shell
- Debugging, refactoring, reviewing complex codebases
- Explaining algorithms, architecture, system design
- Reading shared files and answering about them
Rules: put code in ```language blocks. Be precise and thorough."""
# --- Model singleton -------------------------------------------------------
_llm = None
_model_path: Optional[str] = None
def _download():
global _model_path
if _model_path is not None:
return _model_path
from huggingface_hub import hf_hub_download
print(f"[MODEL] Downloading {MODEL_FILE}...")
_model_path = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILE)
print(f"[MODEL] Cached -> {_model_path}")
return _model_path
def _load():
global _llm
if _llm is not None:
return _llm
from llama_cpp import Llama
path = _download()
print(f"[MODEL] Loading (n_gpu_layers={N_GPU})...")
_llm = Llama(
model_path=path,
n_ctx=N_CTX,
n_gpu_layers=N_GPU,
chat_format="chatml",
verbose=False,
seed=-1,
)
print("[MODEL] Ready.")
return _llm
# --- File readers ----------------------------------------------------------
def _read_text(path, enc="utf-8"):
"""Safe text reader with encoding fallback."""
for e in [enc, "utf-8-sig", "latin-1", "cp1252", "utf-16"]:
try:
with open(path, "r", encoding=e) as f:
return f.read()
except UnicodeError:
continue
with open(path, "r", encoding="utf-8", errors="replace") as f:
return f.read()
def parse_file(file_path, file_name):
"""Parse a single uploaded file into context-ready markdown."""
suffix = Path(file_name).suffix.lower()
name = Path(file_name).name
# ---- Code / text files ----
if suffix in (
".txt", ".md", ".py", ".ts", ".tsx", ".js", ".jsx", ".mjs",
".css", ".scss", ".less", ".html", ".htm", ".xml", ".svg",
".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf",
".sh", ".bash", ".zsh", ".fish", ".ps1", ".bat",
".sql", ".prisma",
".c", ".cpp", ".cc", ".cxx", ".h", ".hpp", ".hh",
".java", ".kt", ".kts", ".scala", ".groovy",
".go", ".rs", ".rb", ".php", ".pl", ".pm",
".swift", ".r", ".lua", ".zig", ".nim", ".dart",
".env", ".gitignore", ".editorconfig",
".tf", ".tfvars", ".hcl", ".vue", ".svelte", ".astro",
):
content = _read_text(file_path)
lang = suffix.lstrip(".")
if suffix == ".md":
lang = "markdown"
elif suffix in (".yml",):
lang = "yaml"
elif suffix in (".tf", ".tfvars"):
lang = "hcl"
elif suffix in (".htm",):
lang = "html"
return f"### `{name}`\n```{lang}\n{content}\n```\n\n---\n"
# ---- JSON ----
if suffix == ".json":
content = _read_text(file_path)
try:
parsed = json.loads(content)
content = json.dumps(parsed, indent=2, ensure_ascii=False)
except Exception:
pass
return f"### `{name}`\n```json\n{content}\n```\n\n---\n"
# ---- CSV ----
if suffix == ".csv":
import csv
import io
content = _read_text(file_path)
rows = list(csv.reader(io.StringIO(content)))
if len(rows) > 51:
rows = rows[:50] + [[f"... {len(rows) - 50} rows truncated"]]
widths = [max(len(str(c)) for c in col) for col in zip(*rows)]
sep = "|-" + "-|-".join("-" * x for x in widths) + "-|"
table = "\n".join(
"| " + " | ".join(str(c).ljust(widths[i]) for i, c in enumerate(row)) + " |"
for row in rows
)
table = table.split("\n", 1)
table.insert(1, sep)
return f"### `{name}`\n" + "\n".join(table) + "\n\n---\n"
# ---- Word documents ----
if suffix == ".docx":
try:
import docx
doc = docx.Document(file_path)
text = "\n\n".join(p.text for p in doc.paragraphs if p.text.strip())
return f"### `{name}`\n{text}\n\n---\n"
except Exception as e:
return f"### `{name}` (DOCX error: {e})\n\n---\n"
# ---- PDF ----
if suffix == ".pdf":
try:
import pdfplumber
with pdfplumber.open(file_path) as pdf:
text = "\n\n".join(
page.extract_text() or "" for page in pdf.pages
)
return f"### `{name}`\n{text}\n\n---\n"
except Exception as e:
return f"### `{name}` (PDF error: {e})\n\n---\n"
# ---- Fallback ----
try:
content = _read_text(file_path)
return f"### `{name}`\n```\n{content[:10000]}\n```\n\n---\n"
except Exception as e:
return f"### `{name}` (could not read: {e})\n\n---\n"
def parse_all_files(files):
"""Parse all uploaded files into a single context string."""
if not files:
return ""
parts = []
for f in files:
if isinstance(f, dict):
path = f.get("path") or f.get("name")
name = f.get("orig_name") or f.get("name", "unknown")
elif hasattr(f, "name"):
path = f.name
name = getattr(f, "orig_name", Path(path).name)
else:
path = str(f)
name = Path(path).name
try:
parts.append(parse_file(path, name))
except Exception as e:
parts.append(f"### `{name}`\nParse error: {e}\n\n---\n")
return "\n".join(parts)
# --- Generation (wrapped with @spaces.GPU for ZeroGPU) ---------------------
@spaces.GPU
def respond(message, history, uploaded_files):
"""Generate a streaming response from the model."""
file_context = parse_all_files(uploaded_files)
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
# Inject uploaded files as context
if file_context:
messages.append({
"role": "user",
"content": "[Uploaded files — read carefully]\n\n" + file_context,
})
messages.append({
"role": "assistant",
"content": "Got it! I've read through all the uploaded files.",
})
# Append conversation history (Gradio 6 "messages" format)
for entry in history:
role = entry.get("role", "user")
content = entry.get("content", "")
if content:
messages.append({"role": role, "content": content})
# Append the current user message
messages.append({"role": "user", "content": message})
# Stream from the model
llm = _load()
stream = llm.create_chat_completion(
messages=messages,
temperature=0.6,
top_p=0.8,
top_k=20,
max_tokens=4096,
stream=True,
)
partial = ""
for chunk in stream:
if chunk.get("choices"):
delta = chunk["choices"][0].get("delta", {})
content = delta.get("content", "")
if content:
partial += content
yield history + [
{"role": "user", "content": message},
{"role": "assistant", "content": partial},
]
yield history + [
{"role": "user", "content": message},
{"role": "assistant", "content": partial},
]
# --- Gradio UI ------------------------------------------------------------
def create_demo():
with gr.Blocks(title="codeMax — Qwen 3.6 27B Coder") as demo:
gr.Markdown(
"# codeMax\n"
"**Qwen 3.6 27B · Uncensored · Code-optimized · ZeroGPU**\n"
"Drop code, docs, JSON, CSVs — chat with a 27B coding LLM."
)
with gr.Row(equal_height=True):
# ---- Left column: chat ----
with gr.Column(scale=3):
chatbot = gr.Chatbot(
label="Chat",
height=580,
avatar_images=(
None,
"https://huggingface.co/front/assets/huggingface_logo-noborder.svg",
),
)
with gr.Row():
msg = gr.Textbox(
placeholder="Ask about code, share files, or chat...",
scale=8,
show_label=False,
container=False,
)
send = gr.Button(">", scale=1, variant="primary", min_width=48)
clear_btn = gr.Button("Clear", size="sm")
# ---- Right column: file upload ----
with gr.Column(scale=1):
gr.Markdown("### Drop Files")
files = gr.File(
file_count="multiple",
label="Code, docs, data...",
file_types=[
".py", ".ts", ".tsx", ".js", ".jsx", ".json",
".md", ".txt", ".csv", ".yaml", ".yml", ".toml",
".html", ".css", ".xml", ".sql", ".sh", ".go",
".rs", ".rb", ".java", ".c", ".cpp", ".h",
".docx", ".pdf", ".env", ".cfg", ".ini",
],
)
uploaded_info = gr.Markdown("_No files uploaded._")
# ---- Event handlers ----
def update_info(uploaded):
if not uploaded:
return "_No files uploaded._"
names = []
for f in uploaded:
if isinstance(f, dict):
names.append(f.get("orig_name", "?"))
else:
names.append(getattr(f, "orig_name", Path(str(f)).name))
return "**Uploaded:**\n" + "\n".join(f"- `{n}`" for n in names)
def stream_response(message, history, current_files):
for h in respond(message, history, current_files):
yield h
msg.submit(stream_response, [msg, chatbot, files], [chatbot]).then(
lambda: "", None, [msg]
)
send.click(stream_response, [msg, chatbot, files], [chatbot]).then(
lambda: "", None, [msg]
)
clear_btn.click(lambda: [], None, chatbot, queue=False)
files.change(update_info, files, uploaded_info)
return demo
if __name__ == "__main__":
demo = create_demo()
demo.queue(default_concurrency_limit=1, max_size=4)
demo.launch(
css="""
footer { display: none !important; }
"""
)