# Unity_ai / app.py
# Percy3822's picture
# Update app.py
# d400db2 verified
import glob
import gzip
import os
import shutil
import subprocess
import sys
import threading
import time
import uuid
import zipfile

import gradio as gr
from transformers import pipeline
# File-system locations shared by the training worker and the UI callbacks.
# All of them are relative paths, so they resolve against the working directory.
LOG_FILE = "train.log"            # training log, rewritten on every run
MODEL_DIR = "trained_model"       # directory passed to train.py as --output
ZIP_FILE = f"{MODEL_DIR}.zip"     # finished, downloadable archive
ZIP_PART = f"{ZIP_FILE}.part"     # scratch file used while the archive is built
def _human(n):
    """Format a byte count as a short string such as ``"1.5 KB"``.

    The value is divided by 1024 while it is at least 1024 and a larger unit
    is still available; GB is the largest unit, so bigger values are shown
    as a GB figure above 1024. One decimal place is always printed.
    """
    value = float(n)
    *smaller_units, largest_unit = ("B", "KB", "MB", "GB")
    for unit in smaller_units:
        if value >= 1024:
            value /= 1024  # too large for this unit; move one step up
        else:
            return f"{value:.1f} {unit}"
    return f"{value:.1f} {largest_unit}"
def _read(path, fb="Waiting..."):
    """Return the text stored at *path*, or *fb* when it cannot be read.

    The file is decoded as UTF-8 and undecodable bytes are dropped, so a
    partially written log never raises a decoding error.

    Args:
        path: File to read.
        fb: Fallback text returned when the file is missing or unreadable.

    Returns:
        The file content as a string, or *fb*.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()
    except (OSError, ValueError, TypeError):
        # Missing/unreadable file or an unusable path value: stay best-effort.
        # Unlike a bare ``except`` this no longer swallows KeyboardInterrupt
        # or SystemExit.
        return fb
def _zip_dir_atomic(src, out_path, tmp_path):
    """Zip the files under *src* into *out_path* without exposing a partial file.

    The archive is first written to *tmp_path* and then moved over *out_path*
    with ``os.replace``. Entry names are stored relative to *src*.

    Args:
        src: Directory whose files (recursively) are archived.
        out_path: Final location of the archive.
        tmp_path: Scratch file used while the archive is being written.

    Raises:
        OSError: If the archive cannot be written or moved into place. The
            scratch file is removed before the error propagates.
    """
    # Drop any stale scratch file left by an earlier, interrupted run.
    try:
        os.remove(tmp_path)
    except FileNotFoundError:
        pass
    try:
        with zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as z:
            for root, _, files in os.walk(src):
                for fn in files:
                    fp = os.path.join(root, fn)
                    z.write(fp, arcname=os.path.relpath(fp, src))
        # os.replace overwrites an existing target in one step. Deleting the
        # old archive first would open a window with no archive at all.
        os.replace(tmp_path, out_path)
    except BaseException:
        # Do not leave a half-written archive behind; then re-raise.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def upload_file(f):
    """Copy an uploaded dataset into ``uploads/`` under a unique name.

    Args:
        f: The uploaded file. Either a path (``str`` / ``os.PathLike``) or an
            object whose ``name`` attribute holds the path; ``None`` when
            nothing was uploaded.

    Returns:
        A ``(status_message, stored_path)`` tuple; ``stored_path`` is ``""``
        when no file was given.
    """
    if f is None:
        return "❌ No file.", ""
    # A path-like value is the source path itself; other upload objects carry
    # it in ``.name``. (``Path.name`` is only the basename, hence the check.)
    src = f if isinstance(f, (str, os.PathLike)) else f.name
    os.makedirs("uploads", exist_ok=True)
    dst = os.path.join("uploads", f"dataset_{uuid.uuid4().hex}.jsonl")
    shutil.copy(src, dst)
    return f"βœ… Uploaded β†’ {dst}", dst
def _train_single(dataset, log, quick, steps, subset):
    """Run ``train.py`` on one dataset file and report whether it succeeded.

    Args:
        dataset: Path of the dataset file, passed as ``--dataset``.
        log: Open, writable text file. The child's stdout and stderr are
            redirected into it and a one-line exit summary is appended.
        quick: When truthy, ``--quick 1`` and the step/subset limits are added.
        steps: Value for ``--max_steps`` in quick mode (8 when falsy).
        subset: Value for ``--subset`` in quick mode (32 when falsy).

    Returns:
        True if the child process exited with status 0, otherwise False.
    """
    # Use the interpreter that runs this app rather than whichever "python"
    # happens to be first on PATH.
    args = [sys.executable or "python", "train.py", "--dataset", dataset, "--output", MODEL_DIR]
    if quick:
        args += ["--quick", "1", "--max_steps", str(steps or 8), "--subset", str(subset or 32)]
    # The child writes straight to the log's file descriptor, bypassing this
    # process's text buffer. Flush first so earlier lines land before the
    # child's output instead of after it.
    log.flush()
    p = subprocess.Popen(args, stdout=log, stderr=subprocess.STDOUT)
    p.wait()
    log.write(f"\n ↳ train.py exited {p.returncode} for {os.path.basename(dataset)}\n")
    log.flush()  # make the exit summary visible to log readers right away
    return p.returncode == 0
def _train_shards(shards_folder, log, quick, steps, subset):
    """Train on every shard in *shards_folder*, in order; True if all succeeded."""
    log.write(f"πŸ“‚ Folder mode: {shards_folder}\n")
    paths = sorted(glob.glob(os.path.join(shards_folder, "*.jsonl"))) + \
        sorted(glob.glob(os.path.join(shards_folder, "*.jsonl.gz")))
    # Files with "manifest" in their name are skipped, not treated as shards.
    paths = [p for p in paths if "manifest" not in os.path.basename(p).lower()]
    if not paths:
        log.write("❌ No shards (*.jsonl / *.jsonl.gz).\n")
        return False

    tmp = "tmp_train.jsonl"  # scratch copy of the shard currently decompressed
    try:
        for i, pth in enumerate(paths, 1):
            log.write(f"\n[{i}/{len(paths)}] Shard: {os.path.basename(pth)}\n")
            shard = pth
            if pth.endswith(".gz"):
                try:
                    with gzip.open(pth, "rt", encoding="utf-8") as rf, \
                            open(tmp, "w", encoding="utf-8") as wf:
                        for line in rf:
                            wf.write(line)
                except Exception as e:
                    # Corrupt archives surface as several unrelated exception
                    # types; all of them abort the run with a log entry.
                    log.write(f"❌ GZ read failed: {e}\n")
                    return False
                shard = tmp
            if not _train_single(shard, log, quick, steps, subset):
                return False  # stop at the first failing shard
        return True
    finally:
        try:
            os.remove(tmp)
        except OSError:
            pass  # nothing to clean up, or the file is not removable


def _worker(dataset_path, shards_folder, quick, steps, subset):
    """Background training job: train, log progress, then zip the model.

    ``LOG_FILE`` is truncated and receives all progress messages. When
    *shards_folder* is set, every ``*.jsonl`` / ``*.jsonl.gz`` file in it is
    trained on in turn; otherwise *dataset_path* is used. If training succeeds
    and ``MODEL_DIR`` exists, it is archived to ``ZIP_FILE``.

    Args:
        dataset_path: Path of a single dataset file (used when no folder is set).
        shards_folder: Folder containing dataset shards, or a falsy value.
        quick: Forwarded to ``_train_single``.
        steps: Forwarded to ``_train_single``.
        subset: Forwarded to ``_train_single``.
    """
    # Explicit UTF-8: the messages contain non-ASCII characters and the log is
    # read back as UTF-8 by ``_read``.
    with open(LOG_FILE, "w", encoding="utf-8") as log:
        log.write("πŸ”₯ Starting training (C# AI)…\n")
    # Append mode keeps this process's writes and the child's writes at the
    # end of the file. Line buffering pushes each message out as soon as it is
    # written, so the UI sees progress and the ordering with child output holds.
    with open(LOG_FILE, "a", encoding="utf-8", buffering=1) as log:
        try:
            if shards_folder:
                ok = _train_shards(shards_folder, log, quick, steps, subset)
            elif not dataset_path or not os.path.exists(dataset_path):
                log.write("❌ Upload a valid dataset.\n")
                ok = False
            else:
                ok = _train_single(dataset_path, log, quick, steps, subset)
        except Exception as e:
            # Top of the worker thread: record the failure rather than letting
            # the thread die with nothing in the log.
            log.write(f"\n❌ Training crashed: {e}\n")
            ok = False

        if ok and os.path.isdir(MODEL_DIR):
            try:
                _zip_dir_atomic(MODEL_DIR, ZIP_FILE, ZIP_PART)
                sz = _human(os.path.getsize(ZIP_FILE))
                log.write(f"\nβœ… Model zipped β†’ {ZIP_FILE} ({sz})\n")
            except Exception as e:
                log.write(f"\n❌ Zip failed: {e}\n")
        else:
            log.write("\n❌ Training failed; no zip.\n")
def start_training(dataset_path, shards_folder, quick, steps, subset):
    """Remove old archives and start training in a background thread.

    Args:
        dataset_path: Path of the uploaded dataset, forwarded to ``_worker``.
        shards_folder: Folder with dataset shards, forwarded to ``_worker``.
        quick: Quick-mode flag, forwarded to ``_worker``.
        steps: Quick-mode step limit, forwarded to ``_worker``.
        subset: Quick-mode subset size, forwarded to ``_worker``.

    Returns:
        A status message for the UI; the call does not wait for training.
    """
    # Best-effort removal of a previous archive and its scratch file, so the
    # download area cannot offer a model from an earlier run.
    for stale in (ZIP_FILE, ZIP_PART):
        try:
            os.remove(stale)
        except OSError:
            pass  # missing or not removable; training proceeds regardless
    threading.Thread(
        target=_worker,
        args=(dataset_path, shards_folder, quick, steps, subset),
        daemon=True,
    ).start()
    return "πŸš€ Training started. Use Refresh buttons."
def read_logs():
    """Return the current content of the training log, or a placeholder."""
    placeholder = "Waiting for logs..."
    return _read(LOG_FILE, placeholder)
def refresh_download():
    """Build the download-button update and info text for the current state.

    Returns:
        A ``(button_update, info_text)`` tuple: the button is shown and
        pointed at ``ZIP_FILE`` when the archive exists, hidden otherwise.
    """
    if not os.path.exists(ZIP_FILE):
        return gr.update(visible=False, value=None), "No trained model yet."

    size = _human(os.path.getsize(ZIP_FILE))
    button_update = gr.update(visible=True, value=ZIP_FILE)
    info_text = f"*Ready:* {ZIP_FILE} β€’ {size}"
    return button_update, info_text
def load_test_zip(z):
    """Extract an uploaded model archive into a fresh ``models/test_*`` folder.

    Args:
        z: The uploaded ZIP. Either a path (``str`` / ``os.PathLike``) or an
            object whose ``name`` attribute holds the path; ``None`` when
            nothing was uploaded.

    Returns:
        A ``(status_message, model_dir)`` tuple; ``model_dir`` is ``""`` when
        no file was given or extraction failed.
    """
    if z is None:
        return "❌ No file.", ""
    # A path-like value is the archive path itself; other upload objects carry
    # it in ``.name``. (``Path.name`` is only the basename, hence the check.)
    archive = z if isinstance(z, (str, os.PathLike)) else z.name
    root = os.path.join("models", f"test_{uuid.uuid4().hex}")
    os.makedirs(root, exist_ok=True)
    try:
        with zipfile.ZipFile(archive, "r") as zz:
            zz.extractall(root)
    except Exception as e:
        # UI boundary: report the problem instead of raising, and remove the
        # empty or partially filled target folder.
        shutil.rmtree(root, ignore_errors=True)
        return f"❌ Extract failed: {e}", ""
    return f"βœ… Extracted to {root}", root
def clear_test_model():
    """Reset the test-model selection.

    Returns:
        A ``(status_message, model_path)`` tuple whose path is the empty
        string.
    """
    message = "Cleared. Will use trained_model/ if present."
    return message, ""
def generate(prompt, model_path):
    """Generate text for *prompt* with the best available model.

    The model is chosen in this order: *model_path* if it is an existing
    directory, then ``MODEL_DIR`` if it exists, then the hub model
    ``sshleifer/tiny-gpt2``.

    Args:
        prompt: Text to continue; ``None`` or blank input is rejected.
        model_path: Directory of an uploaded model, or a falsy value.

    Returns:
        The generated text followed by a note naming the model source, or an
        error/help message string.
    """
    if not prompt or not prompt.strip():
        return "Enter a prompt."
    try:
        if model_path and os.path.isdir(model_path):
            m, src = model_path, "(uploaded)"
        elif os.path.isdir(MODEL_DIR):
            m, src = MODEL_DIR, "(trained_model/)"
        else:
            m, src = "sshleifer/tiny-gpt2", "(tiny fallback)"
        # Use the tokenizer stored with the model when there is one; fall back
        # to the tiny GPT-2 tokenizer only for directories without tokenizer
        # files (and for the hub fallback itself).
        tokenizer_files = ("tokenizer_config.json", "tokenizer.json", "vocab.json")
        has_own_tokenizer = any(os.path.isfile(os.path.join(m, fn)) for fn in tokenizer_files)
        tok = m if has_own_tokenizer else "sshleifer/tiny-gpt2"
        gen = pipeline("text-generation", model=m, tokenizer=tok)
        out = gen(prompt, max_length=120, do_sample=True, temperature=0.7, truncation=True)[0]["generated_text"]
        return f"{out}\n\nβ€” using {src}"
    except Exception as e:
        # UI boundary: show the failure in the response box instead of raising.
        return f"❌ Error: {e}"
# ---------------------------------------------------------------------------
# UI definition: a "Train" tab and a "Test" tab, then launch the app.
# ---------------------------------------------------------------------------
with gr.Blocks(title="C# AI Trainer (Quick Test Mode)") as app:
    gr.Markdown("## ⚑ C# AI Trainer β€” includes *Quick Test* (few steps on a tiny model).")

    # Per-session values handed from one callback to the next.
    ds_state = gr.State("")
    folder_state = gr.State("")
    test_model_state = gr.State("")

    with gr.Tab("🧠 Train"):
        # Dataset source: a single uploaded file ...
        with gr.Row():
            file_in = gr.File(label="Upload dataset (.jsonl)", file_types=[".jsonl"])
            up_btn = gr.Button("πŸ“€ Upload")
        # ... or a folder of shards.
        with gr.Row():
            shards_folder = gr.Textbox(value="", label="Folder with shards (optional)")
            use_folder = gr.Button("πŸ“‚ Use Folder")
        # Quick-mode options.
        with gr.Row():
            quick = gr.Checkbox(value=True, label="⚑ Quick Test (tiny model, few steps)")
            steps = gr.Number(value=8, label="Max steps (Quick mode)", precision=0)
            subset = gr.Number(value=32, label="Subset rows (Quick mode)", precision=0)
        status = gr.Textbox(label="Status", interactive=False)
        with gr.Row():
            start_btn = gr.Button("πŸš€ Start Training")
            refresh_logs = gr.Button("πŸ” Refresh Logs")
            refresh_dl = gr.Button("πŸ“¦ Refresh Download Area")
        logs = gr.Textbox(label="πŸ“œ Logs", lines=18)
        dl_btn = gr.DownloadButton(label="πŸ“₯ Download Trained Model (.zip)", visible=False, value=None)
        dl_info = gr.Markdown("No trained model yet.")

        def _pick_folder(path):
            """Return a status message and the stripped folder path."""
            folder = path.strip()
            message = "βœ… Using folder." if folder else "❌ Provide folder path."
            return message, folder

        train_inputs = [ds_state, folder_state, quick, steps, subset]
        download_outputs = [dl_btn, dl_info]

        up_btn.click(fn=upload_file, inputs=file_in, outputs=[status, ds_state])
        use_folder.click(fn=_pick_folder, inputs=shards_folder, outputs=[status, folder_state])

        # Start training, then refresh the log view, then the download area.
        started = start_btn.click(fn=start_training, inputs=train_inputs, outputs=status)
        logged = started.then(fn=read_logs, outputs=logs)
        logged.then(fn=refresh_download, outputs=download_outputs)

        refresh_logs.click(fn=read_logs, outputs=logs)
        refresh_dl.click(fn=refresh_download, outputs=download_outputs)

    with gr.Tab("πŸš€ Test"):
        with gr.Row():
            zip_in = gr.File(label="Upload model ZIP", file_types=[".zip"])
            load_btn = gr.Button("πŸ“¦ Load ZIP")
            clear_btn = gr.Button("🧹 Clear")
        test_status = gr.Textbox(label="Test Model Status", interactive=False)
        prompt = gr.Textbox(label="Prompt", placeholder="e.g., Write a C# method that reverses a string.")
        go = gr.Button("πŸ” Generate")
        out = gr.Textbox(label="AI Response", lines=12)

        test_outputs = [test_status, test_model_state]
        load_btn.click(fn=load_test_zip, inputs=zip_in, outputs=test_outputs)
        clear_btn.click(fn=clear_test_model, outputs=test_outputs)
        go.click(fn=generate, inputs=[prompt, test_model_state], outputs=out)

app.launch()