Spaces:
Sleeping
Sleeping
| import os, shutil, subprocess, zipfile | |
| from pathlib import Path | |
| from datetime import datetime | |
| import gradio as gr | |
# Workspace layout: every path below is resolved relative to this script's folder.
ROOT = Path(__file__).resolve().parent
DATA = ROOT / "dataset.jsonl"  # destination of the dataset copied by upload_dataset()
LOG = ROOT / "train.log"  # receives the training subprocess's stdout/stderr
RUNS = ROOT / "runs"  # holds one output folder and one zip per training run
RUNS.mkdir(exist_ok=True)  # created at import time so later globbing/iteration is safe
| # ---------- helpers ---------- | |
def ls_workspace() -> str:
    """Return a tab-separated listing of the entries directly under ROOT.

    Entries that are not regular files (i.e. directories) sort first, then
    everything is ordered case-insensitively by name. Each row holds a
    directory marker, the size right-aligned to 10 columns, and the name.
    Returns "(empty)" when ROOT has no entries.
    """

    def _non_files_first(entry):
        # False sorts before True, so anything that is not a file leads.
        return (entry.is_file(), entry.name.lower())

    lines = []
    for entry in sorted(ROOT.iterdir(), key=_non_files_first):
        try:
            nbytes = entry.stat().st_size
        except Exception:
            # Best-effort listing: an unreadable entry is shown with size 0.
            nbytes = 0
        marker = "[DIR]" if entry.is_dir() else " "
        lines.append(f"{marker}\t{nbytes:>10}\t{entry.name}")

    listing = "\n".join(lines)
    return listing if listing else "(empty)"
def list_models():
    """Return the sorted, de-duplicated paths of model folders in ROOT and RUNS.

    A folder counts as a model when it contains ``config.json`` and at least
    one of ``tokenizer.json`` / ``tokenizer_config.json``. Paths are returned
    as strings.
    """

    def _looks_like_model(folder):
        if not folder.is_dir():
            return False
        if not (folder / "config.json").exists():
            return False
        has_fast_tokenizer = (folder / "tokenizer.json").exists()
        return has_fast_tokenizer or (folder / "tokenizer_config.json").exists()

    found = set()
    for base in (ROOT, RUNS):
        if base.exists():
            found.update(
                str(child) for child in base.iterdir() if _looks_like_model(child)
            )
    return sorted(found)
def dropdown_update_safe(models, prefer=None):
    """Build a Gradio update that sets the dropdown choices and a valid value.

    The selected value is ``prefer`` when it is truthy and present in
    ``models``; otherwise the first model; otherwise ``None`` when ``models``
    is empty.
    """
    if prefer and prefer in models:
        selected = prefer
    elif models:
        selected = models[0]
    else:
        selected = None
    return gr.update(choices=models, value=selected)
| # ---------- training ---------- | |
def upload_dataset(file):
    """Copy an uploaded dataset into the workspace as DATA.

    Args:
        file: The value delivered by the upload component. Either an object
            exposing the on-disk path as ``.name`` or a plain path
            (``str`` / ``os.PathLike``) is accepted.

    Returns:
        A ``(status message, workspace listing)`` pair.
    """
    if not file:
        return "❌ No file selected.", ls_workspace()

    # Resolve the on-disk path: prefer `.name` (file-like wrappers), and also
    # accept a bare path so the handler works whichever form the component
    # delivers. Anything else (e.g. a list) is rejected below.
    if hasattr(file, "name"):
        src = file.name
    elif isinstance(file, (str, os.PathLike)):
        src = file
    else:
        src = None

    if src is not None and os.path.isfile(src):
        shutil.copy(src, DATA)
        return f"✅ Uploaded → {DATA.name}", ls_workspace()
    return "⚠ Unexpected item; please upload a .jsonl file.", ls_workspace()
def start_training(run_name):
    """Run ``train.py`` in a subprocess and report the outcome to the UI.

    Args:
        run_name: Optional user-supplied label for the run. It is reduced to a
            single safe path component; when nothing usable remains, a
            timestamped name is generated instead.

    Returns:
        A 5-tuple matching the click handler's outputs: status message,
        update for the download file, workspace listing, log tail, and an
        update for the model dropdown.
    """
    import re
    import sys

    # run_name comes straight from a web form and is joined onto RUNS and then
    # handed to shutil.rmtree below. Without sanitising, a value such as ".."
    # or "../x" would delete folders outside RUNS. Keep a conservative
    # character set and drop leading/trailing dots so "." and ".." can never
    # be produced.
    requested = (run_name or "").strip()
    run_id = re.sub(r"[^A-Za-z0-9._-]+", "_", requested).strip(".")
    if not run_id:
        run_id = datetime.now().strftime("run_%Y%m%d_%H%M%S")

    out_dir = RUNS / run_id
    zip_path = RUNS / f"{run_id}.zip"

    # Clean previous artifacts only for this run
    if out_dir.exists():
        shutil.rmtree(out_dir, ignore_errors=True)
    if zip_path.exists():
        zip_path.unlink()

    # Start a fresh log; the subprocess output is appended after this header.
    LOG.write_text(f"🔥 Training started…\nRun: {run_id}\n", encoding="utf-8")

    cmd = [
        # Use the interpreter running this app so the trainer sees the same
        # environment; fall back to PATH lookup if it is unknown.
        sys.executable or "python", str(ROOT / "train.py"),
        "--dataset", str(DATA),
        "--output", str(out_dir),
        "--zip_path", str(zip_path),
        "--model_name", "Salesforce/codegen-350M-multi",
        "--epochs", "1",
        "--batch_size", "2",
        "--block_size", "256",
        "--learning_rate", "5e-5",
    ]
    with open(LOG, "a", encoding="utf-8") as lf:
        # Blocks until training finishes; stderr is folded into the same log.
        code = subprocess.run(
            cmd, stdout=lf, stderr=subprocess.STDOUT, check=False
        ).returncode

    models = list_models()
    model_update = dropdown_update_safe(
        models, prefer=str(out_dir) if out_dir.exists() else None
    )

    # Success requires both a zero exit code and the zip actually existing.
    if code == 0 and zip_path.exists():
        info = f"✅ Training complete. Saved: {out_dir.name} | Zip: {zip_path.name}"
        dl_update = gr.update(value=str(zip_path), visible=True)
    else:
        info = f"❌ Training failed (exit {code}). Check logs below."
        dl_update = gr.update(value=None, visible=False)
    return info, dl_update, ls_workspace(), read_logs(), model_update
def read_logs():
    """Return the last 20000 characters of the training log.

    When the log file does not exist yet, a placeholder string is returned.
    """
    if not LOG.exists():
        return "⏳ Waiting…"
    contents = LOG.read_text(encoding="utf-8")
    return contents[-20000:]
def refresh_download():
    """Point the download widget at the most recently modified zip in RUNS.

    Returns a 3-tuple: update for the download file (hidden when no zip
    exists), the workspace listing, and an update for the model dropdown.
    """
    # The caller does not say which run it wants, so offer the newest archive.
    newest = max(
        RUNS.glob("*.zip"),
        key=lambda archive: archive.stat().st_mtime,
        default=None,
    )
    available = list_models()

    if newest is None:
        file_update = gr.update(value=None, visible=False)
    else:
        file_update = gr.update(value=str(newest), visible=True)

    return file_update, ls_workspace(), dropdown_update_safe(available)
| # ---------- testing ---------- | |
def import_zip(zfile):
    """Extract an uploaded model archive into ``ROOT / "imported_model"``.

    Args:
        zfile: The value delivered by the upload component; either an object
            exposing the on-disk path as ``.name`` or a plain path.

    Returns:
        A ``(status message, dropdown update)`` pair. The second item is a
        Gradio update that refreshes the dropdown's choices; a bare list
        would be interpreted as the dropdown's value instead.
    """
    if not zfile:
        # Nothing to import: refresh choices only, leave the selection alone.
        return "❌ No zip selected.", gr.update(choices=list_models())

    src = getattr(zfile, "name", zfile)
    dest = ROOT / "imported_model"
    try:
        # Open the archive first so an unreadable upload does not wipe a
        # previously imported model.
        with zipfile.ZipFile(src, "r") as archive:
            if dest.exists():
                shutil.rmtree(dest, ignore_errors=True)
            dest.mkdir(parents=True, exist_ok=True)
            archive.extractall(dest)
    except (zipfile.BadZipFile, OSError) as exc:
        return f"❌ Could not import zip: {exc}", gr.update(choices=list_models())

    # Select the freshly imported folder when it is recognised as a model.
    return (
        f"✅ Imported to {dest.name}",
        dropdown_update_safe(list_models(), prefer=str(dest)),
    )
def generate(model_path, prompt):
    """Generate text from a local model folder for the given prompt.

    Args:
        model_path: Path to a folder loadable by ``from_pretrained``.
        prompt: Prompt text; surrounding whitespace is stripped before use.

    Returns:
        The pipeline's ``generated_text`` on success, otherwise a
        human-readable message starting with "❌". Exceptions raised while
        loading or generating are caught and reported as text rather than
        propagated.
    """
    # 1) Validate inputs
    if not model_path:
        return "❌ Select a model from the dropdown first."
    # The dropdown accepts custom values, so the path may point anywhere.
    # Require an actual directory: a regular file would otherwise pass an
    # existence check and fail later with an opaque loader error.
    if not Path(model_path).is_dir():
        return f"❌ Model folder not found: {model_path}"
    if not prompt or not prompt.strip():
        return "❌ Enter a prompt."

    try:
        # Imported lazily so the UI can start without loading these packages.
        from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
        import torch

        tok = AutoTokenizer.from_pretrained(model_path, use_fast=True)
        # ensure pad token
        if tok.pad_token_id is None:
            if tok.eos_token_id is not None:
                tok.pad_token = tok.eos_token
            else:
                tok.add_special_tokens({"pad_token": "[PAD]"})

        model = AutoModelForCausalLM.from_pretrained(model_path)
        # align embeddings if we added tokens
        if (
            getattr(model, "config", None)
            and getattr(model.config, "vocab_size", None)
            and len(tok) > model.config.vocab_size
        ):
            model.resize_token_embeddings(len(tok))

        pipe = pipeline(
            "text-generation",
            model=model,
            tokenizer=tok,
            device_map="auto" if torch.cuda.is_available() else None,
        )
        out = pipe(
            prompt.strip(),
            max_new_tokens=120,
            do_sample=True,
            temperature=0.4,
            top_p=0.9,
            repetition_penalty=1.15,
            no_repeat_ngram_size=4,
            eos_token_id=tok.eos_token_id,
            pad_token_id=tok.pad_token_id,
            truncation=True,
        )[0]["generated_text"]
        return out
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        import traceback

        return "❌ Error during generation:\n" + "".join(
            traceback.format_exception_only(type(e), e)
        )
| # ---------- UI ---------- | |
# Build the two-tab UI and wire each control to its handler.
# NOTE(review): the nesting below was reconstructed from a flattened copy of
# this file; confirm which components belong inside the Row.
with gr.Blocks(title="Python AI — Train & Test") as app:
    gr.Markdown("## 🧠 Python AI — Train & Test\n• Unique run folders • Safe download • Reliable generation\n")
    # ---- Test tab first so Train can target its dropdown
    with gr.Tab("Test"):
        gr.Markdown("### Pick a model folder or upload a .zip, then prompt it")
        refresh_btn = gr.Button("↻ Refresh Model List")
        # Choices are computed once at build time; the handlers below refresh them.
        model_list = gr.Dropdown(
            choices=list_models(),
            label="Available AIs",
            interactive=True,
            allow_custom_value=True  # no warnings when empty
        )
        zip_in = gr.File(label="Or upload a model .zip", file_types=[".zip"])
        import_status = gr.Textbox(label="Import Status", interactive=False)
        prompt = gr.Textbox(label="Prompt", lines=8, placeholder="### Instruction:\nPython: write a function ...\n### Response:\n")
        go = gr.Button("Generate")
        out = gr.Textbox(label="AI Response", lines=20)
    # ---- Train tab
    with gr.Tab("Train"):
        with gr.Row():
            ds = gr.File(label="📥 Upload JSONL", file_types=[".jsonl"])
            ws = gr.Textbox(label="Workspace", lines=16, value=ls_workspace())
        run_name = gr.Textbox(label="Run name (optional)", placeholder="e.g., python_small_v1")
        up_status = gr.Textbox(label="Upload Status", interactive=False)
        start = gr.Button("🚀 Start Training", variant="primary")
        logs = gr.Textbox(label="📜 Training Logs", lines=18, value=read_logs())
        status = gr.Textbox(label="Status", interactive=False)
        # Hidden until a handler returns an update that makes it visible.
        download_file = gr.File(label="📦 Latest trained zip", visible=False)
        refresh_dl_btn = gr.Button("Refresh Download")
    # wiring
    # Uploading a dataset copies it into the workspace and refreshes the listing.
    ds.change(upload_dataset, inputs=ds, outputs=[up_status, ws])
    # Training returns five values, in the same order as this outputs list.
    start.click(
        start_training,
        inputs=[run_name],
        outputs=[status, download_file, ws, logs, model_list]
    )
    refresh_dl_btn.click(
        refresh_download,
        outputs=[download_file, ws, model_list]
    )
    # Re-scan the model folders and reset the dropdown to a valid choice.
    refresh_btn.click(lambda: dropdown_update_safe(list_models()), outputs=model_list)
    zip_in.change(import_zip, inputs=zip_in, outputs=[import_status, model_list])
    go.click(generate, inputs=[model_list, prompt], outputs=out)
# Queue events with a default concurrency limit of 1, then start the server.
app.queue(default_concurrency_limit=1).launch()