| |
| |
| |
| |
|
|
| import spaces |
| import gradio as gr |
| import os |
| import json |
| from pathlib import Path |
| from typing import List, Optional, Generator |
|
|
| |
| MODEL_REPO = os.environ.get( |
| "MODEL_REPO", |
| "DavidAU/Qwen3.6-27B-Heretic-Uncensored-FINETUNE-NEO-CODE-Di-IMatrix-MAX-GGUF", |
| ) |
| MODEL_FILE = os.environ.get( |
| "MODEL_FILE", |
| "Qwen3.6-27B-NEO-CODE-HERE-2T-OT-Q4_K_M.gguf", |
| ) |
| N_CTX = int(os.environ.get("N_CTX", "8192")) |
| N_GPU = int(os.environ.get("N_GPU_LAYERS", "-1")) |
|
|
| SYSTEM_PROMPT = """You are codeMax, an uncensored 27B coding assistant. You excel at: |
| - Writing production code in Python, TypeScript, JavaScript, Rust, Go, C/C++, SQL, shell |
| - Debugging, refactoring, reviewing complex codebases |
| - Explaining algorithms, architecture, system design |
| - Reading shared files and answering about them |
| |
| Rules: put code in ```language blocks. Be precise and thorough.""" |
|
|
| |
| _llm = None |
| _model_path: Optional[str] = None |
|
|
|
|
| def _download(): |
| global _model_path |
| if _model_path is not None: |
| return _model_path |
| from huggingface_hub import hf_hub_download |
| print(f"[MODEL] Downloading {MODEL_FILE}...") |
| _model_path = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILE) |
| print(f"[MODEL] Cached -> {_model_path}") |
| return _model_path |
|
|
|
|
| def _load(): |
| global _llm |
| if _llm is not None: |
| return _llm |
| from llama_cpp import Llama |
| path = _download() |
| print(f"[MODEL] Loading (n_gpu_layers={N_GPU})...") |
| _llm = Llama( |
| model_path=path, |
| n_ctx=N_CTX, |
| n_gpu_layers=N_GPU, |
| chat_format="chatml", |
| verbose=False, |
| seed=-1, |
| ) |
| print("[MODEL] Ready.") |
| return _llm |
|
|
|
|
| |
| def _read_text(path, enc="utf-8"): |
| """Safe text reader with encoding fallback.""" |
| for e in [enc, "utf-8-sig", "latin-1", "cp1252", "utf-16"]: |
| try: |
| with open(path, "r", encoding=e) as f: |
| return f.read() |
| except UnicodeError: |
| continue |
| with open(path, "r", encoding="utf-8", errors="replace") as f: |
| return f.read() |
|
|
|
|
| def parse_file(file_path, file_name): |
| """Parse a single uploaded file into context-ready markdown.""" |
| suffix = Path(file_name).suffix.lower() |
| name = Path(file_name).name |
|
|
| |
| if suffix in ( |
| ".txt", ".md", ".py", ".ts", ".tsx", ".js", ".jsx", ".mjs", |
| ".css", ".scss", ".less", ".html", ".htm", ".xml", ".svg", |
| ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf", |
| ".sh", ".bash", ".zsh", ".fish", ".ps1", ".bat", |
| ".sql", ".prisma", |
| ".c", ".cpp", ".cc", ".cxx", ".h", ".hpp", ".hh", |
| ".java", ".kt", ".kts", ".scala", ".groovy", |
| ".go", ".rs", ".rb", ".php", ".pl", ".pm", |
| ".swift", ".r", ".lua", ".zig", ".nim", ".dart", |
| ".env", ".gitignore", ".editorconfig", |
| ".tf", ".tfvars", ".hcl", ".vue", ".svelte", ".astro", |
| ): |
| content = _read_text(file_path) |
| lang = suffix.lstrip(".") |
| if suffix == ".md": |
| lang = "markdown" |
| elif suffix in (".yml",): |
| lang = "yaml" |
| elif suffix in (".tf", ".tfvars"): |
| lang = "hcl" |
| elif suffix in (".htm",): |
| lang = "html" |
| return f"### `{name}`\n```{lang}\n{content}\n```\n\n---\n" |
|
|
| |
| if suffix == ".json": |
| content = _read_text(file_path) |
| try: |
| parsed = json.loads(content) |
| content = json.dumps(parsed, indent=2, ensure_ascii=False) |
| except Exception: |
| pass |
| return f"### `{name}`\n```json\n{content}\n```\n\n---\n" |
|
|
| |
| if suffix == ".csv": |
| import csv |
| import io |
| content = _read_text(file_path) |
| rows = list(csv.reader(io.StringIO(content))) |
| if len(rows) > 51: |
| rows = rows[:50] + [[f"... {len(rows) - 50} rows truncated"]] |
| widths = [max(len(str(c)) for c in col) for col in zip(*rows)] |
| sep = "|-" + "-|-".join("-" * x for x in widths) + "-|" |
| table = "\n".join( |
| "| " + " | ".join(str(c).ljust(widths[i]) for i, c in enumerate(row)) + " |" |
| for row in rows |
| ) |
| table = table.split("\n", 1) |
| table.insert(1, sep) |
| return f"### `{name}`\n" + "\n".join(table) + "\n\n---\n" |
|
|
| |
| if suffix == ".docx": |
| try: |
| import docx |
| doc = docx.Document(file_path) |
| text = "\n\n".join(p.text for p in doc.paragraphs if p.text.strip()) |
| return f"### `{name}`\n{text}\n\n---\n" |
| except Exception as e: |
| return f"### `{name}` (DOCX error: {e})\n\n---\n" |
|
|
| |
| if suffix == ".pdf": |
| try: |
| import pdfplumber |
| with pdfplumber.open(file_path) as pdf: |
| text = "\n\n".join( |
| page.extract_text() or "" for page in pdf.pages |
| ) |
| return f"### `{name}`\n{text}\n\n---\n" |
| except Exception as e: |
| return f"### `{name}` (PDF error: {e})\n\n---\n" |
|
|
| |
| try: |
| content = _read_text(file_path) |
| return f"### `{name}`\n```\n{content[:10000]}\n```\n\n---\n" |
| except Exception as e: |
| return f"### `{name}` (could not read: {e})\n\n---\n" |
|
|
|
|
| def parse_all_files(files): |
| """Parse all uploaded files into a single context string.""" |
| if not files: |
| return "" |
| parts = [] |
| for f in files: |
| if isinstance(f, dict): |
| path = f.get("path") or f.get("name") |
| name = f.get("orig_name") or f.get("name", "unknown") |
| elif hasattr(f, "name"): |
| path = f.name |
| name = getattr(f, "orig_name", Path(path).name) |
| else: |
| path = str(f) |
| name = Path(path).name |
| try: |
| parts.append(parse_file(path, name)) |
| except Exception as e: |
| parts.append(f"### `{name}`\nParse error: {e}\n\n---\n") |
| return "\n".join(parts) |
|
|
|
|
| |
| @spaces.GPU |
| def respond(message, history, uploaded_files): |
| """Generate a streaming response from the model.""" |
| file_context = parse_all_files(uploaded_files) |
| messages = [{"role": "system", "content": SYSTEM_PROMPT}] |
|
|
| |
| if file_context: |
| messages.append({ |
| "role": "user", |
| "content": "[Uploaded files — read carefully]\n\n" + file_context, |
| }) |
| messages.append({ |
| "role": "assistant", |
| "content": "Got it! I've read through all the uploaded files.", |
| }) |
|
|
| |
| for entry in history: |
| role = entry.get("role", "user") |
| content = entry.get("content", "") |
| if content: |
| messages.append({"role": role, "content": content}) |
|
|
| |
| messages.append({"role": "user", "content": message}) |
|
|
| |
| llm = _load() |
| stream = llm.create_chat_completion( |
| messages=messages, |
| temperature=0.6, |
| top_p=0.8, |
| top_k=20, |
| max_tokens=4096, |
| stream=True, |
| ) |
|
|
| partial = "" |
| for chunk in stream: |
| if chunk.get("choices"): |
| delta = chunk["choices"][0].get("delta", {}) |
| content = delta.get("content", "") |
| if content: |
| partial += content |
| yield history + [ |
| {"role": "user", "content": message}, |
| {"role": "assistant", "content": partial}, |
| ] |
|
|
| yield history + [ |
| {"role": "user", "content": message}, |
| {"role": "assistant", "content": partial}, |
| ] |
|
|
|
|
| |
| def create_demo(): |
| with gr.Blocks(title="codeMax — Qwen 3.6 27B Coder") as demo: |
| gr.Markdown( |
| "# codeMax\n" |
| "**Qwen 3.6 27B · Uncensored · Code-optimized · ZeroGPU**\n" |
| "Drop code, docs, JSON, CSVs — chat with a 27B coding LLM." |
| ) |
|
|
| with gr.Row(equal_height=True): |
| |
| with gr.Column(scale=3): |
| chatbot = gr.Chatbot( |
| label="Chat", |
| height=580, |
| avatar_images=( |
| None, |
| "https://huggingface.co/front/assets/huggingface_logo-noborder.svg", |
| ), |
| ) |
| with gr.Row(): |
| msg = gr.Textbox( |
| placeholder="Ask about code, share files, or chat...", |
| scale=8, |
| show_label=False, |
| container=False, |
| ) |
| send = gr.Button(">", scale=1, variant="primary", min_width=48) |
| clear_btn = gr.Button("Clear", size="sm") |
|
|
| |
| with gr.Column(scale=1): |
| gr.Markdown("### Drop Files") |
| files = gr.File( |
| file_count="multiple", |
| label="Code, docs, data...", |
| file_types=[ |
| ".py", ".ts", ".tsx", ".js", ".jsx", ".json", |
| ".md", ".txt", ".csv", ".yaml", ".yml", ".toml", |
| ".html", ".css", ".xml", ".sql", ".sh", ".go", |
| ".rs", ".rb", ".java", ".c", ".cpp", ".h", |
| ".docx", ".pdf", ".env", ".cfg", ".ini", |
| ], |
| ) |
| uploaded_info = gr.Markdown("_No files uploaded._") |
|
|
| |
| def update_info(uploaded): |
| if not uploaded: |
| return "_No files uploaded._" |
| names = [] |
| for f in uploaded: |
| if isinstance(f, dict): |
| names.append(f.get("orig_name", "?")) |
| else: |
| names.append(getattr(f, "orig_name", Path(str(f)).name)) |
| return "**Uploaded:**\n" + "\n".join(f"- `{n}`" for n in names) |
|
|
| def stream_response(message, history, current_files): |
| for h in respond(message, history, current_files): |
| yield h |
|
|
| msg.submit(stream_response, [msg, chatbot, files], [chatbot]).then( |
| lambda: "", None, [msg] |
| ) |
| send.click(stream_response, [msg, chatbot, files], [chatbot]).then( |
| lambda: "", None, [msg] |
| ) |
| clear_btn.click(lambda: [], None, chatbot, queue=False) |
| files.change(update_info, files, uploaded_info) |
|
|
| return demo |
|
|
|
|
| if __name__ == "__main__": |
| demo = create_demo() |
| demo.queue(default_concurrency_limit=1, max_size=4) |
| demo.launch( |
| css=""" |
| footer { display: none !important; } |
| """ |
| ) |