import glob
import gzip
import os
import shutil
import subprocess
import sys
import threading
import time
import uuid
import zipfile

import gradio as gr
from transformers import pipeline
# Working-directory-relative paths shared by the training worker and the UI.
LOG_FILE = "train.log"  # written by the worker, shown by read_logs()
MODEL_DIR = "trained_model"  # passed to train.py as --output
ZIP_FILE = "trained_model.zip"  # archive offered by the download button
ZIP_PART = f"{ZIP_FILE}.part"  # temporary name used while the archive is built
| def _human(n): | |
| u=["B","KB","MB","GB"]; i=0; x=float(n) | |
| while x>=1024 and i<len(u)-1: x/=1024; i+=1 | |
| return f"{x:.1f} {u[i]}" | |
| def _read(path, fb="Waiting..."): | |
| try: | |
| with open(path,"r",encoding="utf-8",errors="ignore") as f: return f.read() | |
| except: return fb | |
| def _zip_dir_atomic(src, out_path, tmp_path): | |
| if os.path.exists(tmp_path): os.remove(tmp_path) | |
| with zipfile.ZipFile(tmp_path,"w",zipfile.ZIP_DEFLATED) as z: | |
| for root,_,files in os.walk(src): | |
| for fn in files: | |
| fp = os.path.join(root, fn) | |
| z.write(fp, arcname=os.path.relpath(fp, src)) | |
| if os.path.exists(out_path): os.remove(out_path) | |
| os.replace(tmp_path, out_path) | |
def upload_file(f):
    """Copy an uploaded dataset into ``uploads/`` under a unique name.

    Args:
        f: The uploaded file, either an object exposing the path as ``.name``
            or the path itself as a string; ``None`` when nothing was chosen.

    Returns:
        A ``(status message, stored path)`` tuple; the path is ``""`` when no
        file was provided.
    """
    if f is None:
        return "β No file.", ""
    # Accept both a file wrapper (``.name``) and a plain path string.
    src = getattr(f, "name", f)
    os.makedirs("uploads", exist_ok=True)
    dst = os.path.join("uploads", f"dataset_{uuid.uuid4().hex}.jsonl")
    shutil.copy(src, dst)
    return f"β Uploaded β {dst}", dst
def _train_single(dataset, log, quick, steps, subset):
    """Run ``train.py`` on one dataset file and report whether it succeeded.

    Args:
        dataset: Path of the dataset file passed as ``--dataset``.
        log: Open, writable log file; it is handed to the child process as
            stdout/stderr, so it must be a real file (not an in-memory buffer).
        quick: When truthy, add the quick-mode flags.
        steps: Quick-mode ``--max_steps`` (falls back to 8 when empty/zero).
        subset: Quick-mode ``--subset`` (falls back to 32 when empty/zero).

    Returns:
        True when train.py exits with status 0.
    """
    # Use the interpreter running this app so the child sees the same
    # environment; "python" on PATH may be missing or a different install.
    args = [sys.executable or "python", "train.py",
            "--dataset", dataset, "--output", MODEL_DIR]
    if quick:
        # int() keeps a float such as 8.0 from being passed as "8.0".
        args += ["--quick", "1",
                 "--max_steps", str(int(steps or 8)),
                 "--subset", str(int(subset or 32))]
    # The child writes straight to the file descriptor; flush our buffered
    # text first so earlier log lines are not emitted after its output.
    log.flush()
    returncode = subprocess.run(args, stdout=log, stderr=subprocess.STDOUT).returncode
    log.write(f"\n β³ train.py exited {returncode} for {os.path.basename(dataset)}\n")
    return returncode == 0
def _list_shards(folder):
    """Return ``*.jsonl`` then ``*.jsonl.gz`` files in *folder*, minus manifests."""
    found = sorted(glob.glob(os.path.join(folder, "*.jsonl"))) + \
            sorted(glob.glob(os.path.join(folder, "*.jsonl.gz")))
    return [p for p in found if "manifest" not in os.path.basename(p).lower()]


def _train_shards(paths, log, quick, steps, subset):
    """Train on each shard in order; return False at the first failure."""
    tmp = "tmp_train.jsonl"  # scratch file for decompressed .gz shards
    try:
        for i, pth in enumerate(paths, 1):
            log.write(f"\n[{i}/{len(paths)}] Shard: {os.path.basename(pth)}\n")
            shard = pth
            if pth.endswith(".gz"):
                try:
                    with gzip.open(pth, "rt", encoding="utf-8") as rf, \
                            open(tmp, "w", encoding="utf-8") as wf:
                        shutil.copyfileobj(rf, wf)
                except Exception as e:  # reported in the log, then training stops
                    log.write(f"β GZ read failed: {e}\n")
                    return False
                shard = tmp
            # Flush before the child process writes to the same file.
            log.flush()
            if not _train_single(shard, log, quick, steps, subset):
                return False
        return True
    finally:
        try:
            os.remove(tmp)
        except OSError:
            pass  # nothing was decompressed, or the file is already gone


def _worker(dataset_path, shards_folder, quick, steps, subset):
    """Background job: train, then zip the model; progress goes to LOG_FILE.

    In folder mode every shard found in *shards_folder* is trained in turn;
    otherwise the single uploaded *dataset_path* is used.  On success the
    contents of MODEL_DIR are archived to ZIP_FILE.

    Args:
        dataset_path: Uploaded dataset file (used when no folder is given).
        shards_folder: Folder containing ``*.jsonl`` / ``*.jsonl.gz`` shards.
        quick, steps, subset: Forwarded unchanged to ``_train_single``.
    """
    # The log is always written as UTF-8 because _read() decodes it as UTF-8;
    # the locale default may be unable to encode the status symbols at all.
    with open(LOG_FILE, "w", encoding="utf-8") as log:
        log.write("π₯ Starting training (C# AI)β¦\n")
    with open(LOG_FILE, "a", encoding="utf-8") as log:
        try:
            if shards_folder:
                log.write(f"π Folder mode: {shards_folder}\n")
                paths = _list_shards(shards_folder)
                if paths:
                    ok = _train_shards(paths, log, quick, steps, subset)
                else:
                    log.write("β No shards (*.jsonl / *.jsonl.gz).\n")
                    ok = False
            elif not dataset_path or not os.path.exists(dataset_path):
                log.write("β Upload a valid dataset.\n")
                ok = False
            else:
                log.flush()
                ok = _train_single(dataset_path, log, quick, steps, subset)
        except Exception as e:
            # Thread boundary: without this an unexpected error (for example
            # the interpreter failing to start) would end the thread with no
            # trace in the log the user is watching.
            log.write(f"\nUnexpected error: {e!r}\n")
            ok = False
        if ok and os.path.isdir(MODEL_DIR):
            try:
                _zip_dir_atomic(MODEL_DIR, ZIP_FILE, ZIP_PART)
                sz = _human(os.path.getsize(ZIP_FILE))
                log.write(f"\nβ Model zipped β {ZIP_FILE} ({sz})\n")
            except Exception as e:
                log.write(f"\nβ Zip failed: {e}\n")
        else:
            log.write("\nβ Training failed; no zip.\n")
def start_training(dataset_path, shards_folder, quick, steps, subset):
    """Discard any previous archive and launch training in a daemon thread.

    Args:
        dataset_path, shards_folder, quick, steps, subset: Forwarded unchanged
            to ``_worker``.

    Returns:
        The status message shown in the UI; training continues in the
        background after this returns.
    """
    for stale in (ZIP_FILE, ZIP_PART):
        try:
            os.remove(stale)
        except OSError:
            # Best effort: the file may not exist or may be locked.  Only
            # filesystem errors are ignored, not every exception.
            pass
    threading.Thread(
        target=_worker,
        args=(dataset_path, shards_folder, quick, steps, subset),
        daemon=True,
    ).start()
    return "π Training started. Use Refresh buttons."
def read_logs():
    """Return the current contents of LOG_FILE, or a placeholder if unreadable."""
    placeholder = "Waiting for logs..."
    return _read(LOG_FILE, placeholder)
def refresh_download():
    """Return updates for the download button and its caption.

    When ZIP_FILE exists the button is made visible and pointed at it, and the
    caption shows its size; otherwise the button is hidden.
    """
    if not os.path.exists(ZIP_FILE):
        return gr.update(visible=False, value=None), "No trained model yet."
    size = _human(os.path.getsize(ZIP_FILE))
    button = gr.update(visible=True, value=ZIP_FILE)
    return button, f"*Ready:* {ZIP_FILE} β’ {size}"
def load_test_zip(z):
    """Extract an uploaded model archive into a fresh ``models/test_*`` folder.

    Args:
        z: The uploaded archive, either an object exposing the path as
            ``.name`` or the path itself as a string; ``None`` when nothing
            was chosen.

    Returns:
        A ``(status message, extracted folder)`` tuple; the folder is ``""``
        when nothing was uploaded or extraction failed.
    """
    if z is None:
        return "β No file.", ""
    # Accept both a file wrapper (``.name``) and a plain path string.
    archive_path = getattr(z, "name", z)
    root = os.path.join("models", f"test_{uuid.uuid4().hex}")
    os.makedirs(root, exist_ok=True)
    try:
        # NOTE(review): the archive is user-supplied.  zipfile strips absolute
        # paths and ".." components on extraction, but nothing here limits the
        # uncompressed size.
        with zipfile.ZipFile(archive_path, "r") as zz:
            zz.extractall(root)
    except Exception as e:
        # Do not leave an empty or half-extracted folder behind.
        shutil.rmtree(root, ignore_errors=True)
        return f"β Extract failed: {e}", ""
    return f"β Extracted to {root}", root
def clear_test_model():
    """Return a status message and an empty model path to reset the test state."""
    status = "Cleared. Will use trained_model/ if present."
    return status, ""
def _has_tokenizer(path):
    """Return True when the directory *path* contains saved tokenizer files."""
    markers = ("tokenizer_config.json", "tokenizer.json")
    return os.path.isdir(path) and any(
        os.path.isfile(os.path.join(path, name)) for name in markers
    )


def generate(prompt, model_path):
    """Generate text for *prompt* and report which model produced it.

    Model choice, in order: the uploaded model folder *model_path*, the local
    MODEL_DIR, then the small hosted fallback model.

    Args:
        prompt: User prompt; empty, blank or ``None`` yields a hint instead.
        model_path: Folder of an extracted test model, or ``""``/``None``.

    Returns:
        The generated text followed by a note naming the model source, or an
        error message if loading or generation fails.
    """
    if not prompt or not prompt.strip():
        return "Enter a prompt."
    fallback = "sshleifer/tiny-gpt2"
    try:
        if model_path and os.path.isdir(model_path):
            m, src = model_path, "(uploaded)"
        elif os.path.isdir(MODEL_DIR):
            m, src = MODEL_DIR, "(trained_model/)"
        else:
            m, src = fallback, "(tiny fallback)"
        # Prefer the tokenizer saved with the model so vocabulary and weights
        # match; fall back to the tiny model's tokenizer when none was saved.
        tok = m if _has_tokenizer(m) else fallback
        gen = pipeline("text-generation", model=m, tokenizer=tok)
        out = gen(prompt, max_length=120, do_sample=True, temperature=0.7,
                  truncation=True)[0]["generated_text"]
        return f"{out}\n\nβ using {src}"
    except Exception as e:
        # UI boundary: show the failure in the response box.
        return f"β Error: {e}"
def _choose_folder(p):
    """Return the status message and trimmed path for the "Use Folder" button."""
    folder = p.strip()
    message = "β Using folder." if folder else "β Provide folder path."
    return message, folder


# NOTE(review): the Row/Tab nesting below is reconstructed from the order of
# the components; confirm it against the intended layout.
with gr.Blocks(title="C# AI Trainer (Quick Test Mode)") as app:
    gr.Markdown("## β‘ C# AI Trainer β includes *Quick Test* (few steps on a tiny model).")

    # Values carried between callbacks.
    ds_state = gr.State("")          # path returned by upload_file
    folder_state = gr.State("")      # shards folder confirmed via "Use Folder"
    test_model_state = gr.State("")  # folder returned by load_test_zip

    with gr.Tab("π§ Train"):
        with gr.Row():
            file_in = gr.File(label="Upload dataset (.jsonl)", file_types=[".jsonl"])
            up_btn = gr.Button("π€ Upload")
        with gr.Row():
            shards_folder = gr.Textbox(value="", label="Folder with shards (optional)")
            use_folder = gr.Button("π Use Folder")
        with gr.Row():
            quick = gr.Checkbox(value=True, label="β‘ Quick Test (tiny model, few steps)")
            steps = gr.Number(value=8, label="Max steps (Quick mode)", precision=0)
            subset = gr.Number(value=32, label="Subset rows (Quick mode)", precision=0)
        status = gr.Textbox(label="Status", interactive=False)
        with gr.Row():
            start_btn = gr.Button("π Start Training")
            refresh_logs = gr.Button("π Refresh Logs")
            refresh_dl = gr.Button("π¦ Refresh Download Area")
        logs = gr.Textbox(label="π Logs", lines=18)
        dl_btn = gr.DownloadButton(
            label="π₯ Download Trained Model (.zip)", visible=False, value=None
        )
        dl_info = gr.Markdown("No trained model yet.")

        up_btn.click(fn=upload_file, inputs=file_in, outputs=[status, ds_state])
        use_folder.click(
            fn=_choose_folder, inputs=shards_folder, outputs=[status, folder_state]
        )
        # Start training, then refresh the log view and the download area once.
        started = start_btn.click(
            fn=start_training,
            inputs=[ds_state, folder_state, quick, steps, subset],
            outputs=status,
        )
        logged = started.then(fn=read_logs, outputs=logs)
        logged.then(fn=refresh_download, outputs=[dl_btn, dl_info])
        refresh_logs.click(fn=read_logs, outputs=logs)
        refresh_dl.click(fn=refresh_download, outputs=[dl_btn, dl_info])

    with gr.Tab("π Test"):
        with gr.Row():
            zip_in = gr.File(label="Upload model ZIP", file_types=[".zip"])
            load_btn = gr.Button("π¦ Load ZIP")
            clear_btn = gr.Button("π§Ή Clear")
        test_status = gr.Textbox(label="Test Model Status", interactive=False)
        prompt = gr.Textbox(
            label="Prompt",
            placeholder="e.g., Write a C# method that reverses a string.",
        )
        go = gr.Button("π Generate")
        out = gr.Textbox(label="AI Response", lines=12)

        load_btn.click(
            fn=load_test_zip, inputs=zip_in, outputs=[test_status, test_model_state]
        )
        clear_btn.click(fn=clear_test_model, outputs=[test_status, test_model_state])
        go.click(fn=generate, inputs=[prompt, test_model_state], outputs=out)

app.launch()