Spaces:
Sleeping
Sleeping
| import os, shutil, subprocess, zipfile, time | |
| from pathlib import Path | |
| import gradio as gr | |
| ROOT = Path(".").resolve() | |
| DATASET = ROOT / "dataset.jsonl" | |
| LOG = ROOT / "train.log" | |
| OUT_DIR = ROOT / "trained_model" | |
| ZIP = ROOT / "trained_model.zip" | |
| PID = ROOT / "TRAIN_PID" | |
| DONE = ROOT / "TRAIN_DONE" | |
| ERRF = ROOT / "TRAIN_ERROR" | |
| def ls_workspace(): | |
| rows = [] | |
| for p in sorted(ROOT.iterdir(), key=lambda x: (x.is_file(), x.name.lower())): | |
| sz = p.stat().st_size if p.exists() else 0 | |
| rows.append(f"{'[DIR]' if p.is_dir() else ' '}\t{sz:>10}\t{p.name}") | |
| return "\n".join(rows) or "(empty)" | |
| def upload_dataset(f): | |
| if not f: return "❌ No file.", ls_workspace() | |
| shutil.copy(f.name, DATASET) | |
| return f"✅ Uploaded → {DATASET.name}", ls_workspace() | |
| def start_training(): # non-blocking | |
| # clean previous | |
| for p in [OUT_DIR, ZIP, DONE, ERRF, PID]: | |
| if isinstance(p, Path) and p.is_dir(): | |
| shutil.rmtree(p, ignore_errors=True) | |
| elif isinstance(p, Path) and p.exists(): | |
| p.unlink(missing_ok=True) | |
| LOG.write_text("🔥 Training started in background…\n", encoding="utf-8") | |
| cmd = [ | |
| "python", "train.py", | |
| "--dataset", str(DATASET), | |
| "--output", str(OUT_DIR), | |
| "--model_name", "Salesforce/codegen-350M-multi", | |
| "--epochs", "1", | |
| "--batch_size", "2", | |
| "--block_size", "256", | |
| "--learning_rate", "5e-5", | |
| "--subset", "0", | |
| ] | |
| with open(LOG, "a", encoding="utf-8") as lf: | |
| proc = subprocess.Popen(cmd, stdout=lf, stderr=subprocess.STDOUT) | |
| PID.write_text(str(proc.pid)) | |
| return "🚀 Training started. Use “Refresh Logs/Download”.", ls_workspace() | |
| def read_logs(): | |
| return LOG.read_text(encoding="utf-8")[-20000:] if LOG.exists() else "⏳ Waiting…" | |
| def _zip_if_ready(): | |
| """Zip only when DONE flag exists and zip not created yet.""" | |
| if DONE.exists() and OUT_DIR.exists() and not ZIP.exists(): | |
| with zipfile.ZipFile(ZIP, "w", compression=zipfile.ZIP_DEFLATED) as z: | |
| for p in OUT_DIR.rglob("*"): | |
| z.write(p, arcname=p.relative_to(OUT_DIR)) | |
| return ZIP.exists() | |
| def refresh_status_and_download(): | |
| status = "⏳ Training…" | |
| if ERRF.exists(): | |
| status = f"❌ Error: {ERRF.read_text(encoding='utf-8')[-500:]}" | |
| elif DONE.exists(): | |
| status = "✅ Training complete." | |
| _zip_if_ready() | |
| files = [str(ZIP)] if ZIP.exists() else [] | |
| return status, gr.Files.update(value=files, visible=bool(files)), ls_workspace() | |
| # ---- Test tab ---- | |
| def list_models(): | |
| out = [] | |
| for p in ROOT.iterdir(): | |
| if p.is_dir() and (p / "config.json").exists() and ( | |
| (p / "tokenizer.json").exists() or (p / "tokenizer_config.json").exists() | |
| ): | |
| out.append(str(p)) | |
| if OUT_DIR.exists() and str(OUT_DIR) not in out: | |
| out.insert(0, str(OUT_DIR)) | |
| return sorted(out) | |
| def import_zip(z): | |
| if not z: return "❌ No zip.", list_models() | |
| dest = ROOT / f"imported_{int(time.time())}" | |
| dest.mkdir(parents=True, exist_ok=True) | |
| with zipfile.ZipFile(z.name, "r") as zp: | |
| zp.extractall(dest) | |
| return f"✅ Imported to {dest.name}", list_models() | |
| def generate(model_path, prompt): | |
| if not model_path: return "❌ Select a model." | |
| if not prompt or not prompt.strip(): return "❌ Enter a prompt." | |
| try: | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
| tok = AutoTokenizer.from_pretrained(model_path, use_fast=True) | |
| if tok.pad_token_id is None and tok.eos_token_id is not None: | |
| tok.pad_token = tok.eos_token | |
| model = AutoModelForCausalLM.from_pretrained(model_path) | |
| gen = pipeline("text-generation", model=model, tokenizer=tok) | |
| out = gen(prompt, max_new_tokens=220, do_sample=True, temperature=0.2, top_p=0.9, | |
| repetition_penalty=1.2, no_repeat_ngram_size=4, | |
| eos_token_id=tok.eos_token_id, pad_token_id=tok.pad_token_id, | |
| truncation=True)[0]["generated_text"] | |
| return out | |
| except Exception as e: | |
| return f"❌ Error: {e}" | |
| with gr.Blocks(title="Python AI — Train & Test") as app: | |
| gr.Markdown("## 🧠 Python AI — Train & Test\nBackground training with reliable zipping.\n") | |
| with gr.Tab("Train"): | |
| with gr.Row(): | |
| ds = gr.File(label="📥 Upload JSONL", file_types=[".jsonl", ".jsonl.gz", ".json"]) | |
| ws = gr.Textbox(label="Workspace", lines=16, value=ls_workspace()) | |
| up_status = gr.Textbox(label="Upload Status", interactive=False) | |
| start = gr.Button("🚀 Start Training", variant="primary") | |
| logs = gr.Textbox(label="📜 Logs (click Refresh)", lines=18) | |
| refresh_logs_btn = gr.Button("Refresh Logs") | |
| status = gr.Textbox(label="Status", interactive=False) | |
| downloads = gr.Files(label="📦 Downloads (zips)", value=[], interactive=False) | |
| refresh_dl_btn = gr.Button("Refresh Status & Download") | |
| ds.change(upload_dataset, inputs=ds, outputs=[up_status, ws]) | |
| start.click(start_training, outputs=[status, ws]) | |
| refresh_logs_btn.click(read_logs, outputs=logs) | |
| refresh_dl_btn.click(refresh_status_and_download, outputs=[status, downloads, ws]) | |
| with gr.Tab("Test"): | |
| refresh_btn = gr.Button("↻ Refresh Model List") | |
| model_list = gr.Dropdown(choices=list_models(), label="Available AIs", interactive=True) | |
| zip_in = gr.File(label="Or upload model .zip", file_types=[".zip"]) | |
| import_status = gr.Textbox(label="Import Status", interactive=False) | |
| prompt = gr.Textbox(label="Prompt", lines=8, placeholder="### Instruction:\nPython: write a function ...\n### Response:\n") | |
| go = gr.Button("Generate") | |
| out = gr.Textbox(label="AI Response", lines=20) | |
| refresh_btn.click(list_models, outputs=model_list) | |
| zip_in.change(import_zip, inputs=zip_in, outputs=[import_status, model_list]) | |
| go.click(generate, inputs=[model_list, prompt], outputs=out) | |
| app.launch() |