| | import os |
| | import subprocess |
| | from sys import argv |
| | from time import strftime, sleep |
| | import shutil |
| | from pathlib import Path |
| | import gradio as gr |
| | from huggingface_hub import snapshot_download, HfApi |
| | from apscheduler.schedulers.background import BackgroundScheduler |
| | from theme import blurple |
| |
|
| | |
# Token for the scheduled Space restart (read from the Space's secrets).
HF_TOKEN = os.environ.get("HF_TOKEN")
# repo_id of the hosting Space itself; factory-rebooted by the scheduler below.
HOST_REPO = "lainlives/ztestzz"
# Local llama.cpp checkout supplying the HF->GGUF converter and the quantizer.
LLAMACPP_DIR = Path("./llama.cpp")
CONVERT_SCRIPT = LLAMACPP_DIR / "convert_hf_to_gguf.py"
QUANTIZE_BIN = LLAMACPP_DIR / "llama-quantize"
# Quantization levels produced from the f16 master, largest to smallest.
TARGET_QUANTS = ["Q8_0", "Q6_K", "Q5_K_M", "Q5_K_S", "Q5_0", "Q4_K_M", "Q4_K_S", "Q4_0"]
| |
|
| |
|
def format_log(msg):
    """Return *msg* prefixed with the current wall-clock time: ``[HH:MM:SS] msg``."""
    timestamp = strftime("%H:%M:%S")
    return f"[{timestamp}] {msg}"
| |
|
| |
|
def setup_ollama_keys(private_key_content):
    """
    Writes the user's private key to ~/.ollama/id_ed25519
    Required for 'ollama push' to work.

    Returns:
        (success: bool, message: str) — message is surfaced in the UI log.
    """
    if not private_key_content:
        return False, "β οΈ No Private Key provided. Pushing will likely fail."

    ollama_dir = Path(os.path.expanduser("~/.ollama"))
    ollama_dir.mkdir(parents=True, exist_ok=True)

    key_path = ollama_dir / "id_ed25519"

    try:
        # Create the file with mode 0600 from the start so the secret is never
        # world-readable, even briefly (plain open() + chmod leaves a window).
        fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            # OpenSSH-format key files conventionally end with a newline;
            # strip() normalizes pasted whitespace, then we re-add exactly one.
            f.write(private_key_content.strip() + "\n")
        # Enforce permissions even if the file already existed with wider mode.
        os.chmod(key_path, 0o600)

        return True, "π Private Key installed successfully."
    except Exception as e:
        return False, f"β Failed to install keys: {e}"
| |
|
| |
|
def push_to_ollama(gguf_path, ollama_repo, tag_suffix):
    """
    Build an Ollama model from *gguf_path*, push it as
    ``{ollama_repo}:{tag_suffix.lower()}``, then remove the local build.

    Args:
        gguf_path: Path to the GGUF file to wrap.
        ollama_repo: Destination Ollama repository.
        tag_suffix: Quant/tag name (lower-cased for the registry tag).

    Returns:
        List of timestamped log lines describing what happened.
    """
    ollama_tag = f"{ollama_repo}:{tag_suffix.lower()}"

    # Minimal Modelfile pointing at the GGUF we just produced.
    modelfile_path = gguf_path.parent / "Modelfile"
    with open(modelfile_path, "w") as f:
        f.write(f"FROM {gguf_path.resolve()}")

    logs = []
    logs.append(format_log(f"π³ Creating Ollama build: {ollama_tag}"))

    try:
        create_cmd = ["ollama", "create", ollama_tag, "-f", str(modelfile_path)]
        # text=True so a CalledProcessError carries str output, not bytes.
        subprocess.run(create_cmd, check=True, capture_output=True, text=True)

        logs.append(format_log(f"β¬οΈ Pushing to registry: {ollama_tag}..."))

        push_cmd = ["ollama", "push", ollama_tag]
        push_result = subprocess.run(push_cmd, capture_output=True, text=True)

        if push_result.returncode == 0:
            logs.append(format_log(f"β Successfully pushed {ollama_tag}"))
        else:
            logs.append(format_log(f"β Push failed: {push_result.stderr}"))

        # Drop the local build to free disk; best effort, output suppressed.
        subprocess.run(["ollama", "rm", ollama_tag],
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    except subprocess.CalledProcessError as e:
        # Include the captured stderr so the UI shows *why* create failed.
        logs.append(format_log(f"β Ollama Create Error: {e.stderr or e}"))
    except Exception as e:
        logs.append(format_log(f"β Error on {tag_suffix}: {str(e)}"))
    finally:
        # Always remove the temporary Modelfile, even on the failure paths
        # (the original leaked it when `ollama create` raised).
        if modelfile_path.exists():
            os.remove(modelfile_path)

    return logs
| |
|
| |
|
def start_ollama_daemon(ollama_key):
    """
    Install the user's push key and launch ``ollama serve`` in the background.

    The previous implementation was a generator that its caller never
    iterated (so the daemon never started) and it appended to the
    module-level ``logs`` Gradio component, which is not a list. This
    version runs eagerly and reports via stdout.

    Returns:
        True if the daemon was spawned, False if key installation failed.
    """
    print(format_log("β³ Starting Ollama daemon in background..."))

    success, auth_msg = setup_ollama_keys(ollama_key)
    print(format_log(auth_msg))
    if not success:
        print(format_log("β Stopping: Authentication setup failed."))
        return False

    # Detach the server; its chatter is not useful in the UI log.
    subprocess.Popen(
        ["ollama", "serve"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        env=os.environ.copy(),
    )
    sleep(5)  # crude readiness wait before the first `ollama create`
    return True
| |
|
| |
|
def stop_ollama_daemon():
    """Best-effort shutdown of the background Ollama server via pkill."""
    print("β³ Stopping Ollama daemon...")
    devnull = subprocess.DEVNULL
    subprocess.Popen(["pkill", "ollama"], stdout=devnull, stderr=devnull)
| |
|
| |
|
def run_conversion(hf_repo, ollama_repo, hf_token, progress=gr.Progress()):
    """
    Streaming generator: download *hf_repo*, convert it to GGUF (bf16 and
    f16), quantize the f16 master into every entry of TARGET_QUANTS, and
    push each build to *ollama_repo* via push_to_ollama().

    Yields the full accumulated log text after each step so the Gradio
    textbox updates live.

    Args:
        hf_repo: Source Hugging Face repo id.
        ollama_repo: Destination Ollama repo name (tags added per quant).
        hf_token: Optional HF token (gated models / faster downloads).
        progress: Gradio progress tracker (not used directly in the body).
    """
    logs = []

    # One scratch tree for downloads and GGUF output; removed in `finally`.
    work_dir = Path("conversion_work_dir")
    download_dir = work_dir / "downloads"
    output_dir = work_dir / "output"

    # Start clean in case a previous job crashed before its cleanup ran.
    if work_dir.exists():
        shutil.rmtree(work_dir)
    os.makedirs(download_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)

    try:
        logs.append(format_log(f"β¬οΈ Downloading {hf_repo}..."))
        yield "\n".join(logs)

        model_path = snapshot_download(
            repo_id=hf_repo,
            local_dir=download_dir,
            token=hf_token if hf_token else None
        )
        logs.append(format_log("β Download complete."))
        yield "\n".join(logs)

        # --- BF16: convert, push, delete (not used as the quantize master) ---
        bf16_path = output_dir / "model-bf16.gguf"
        logs.append(format_log("βοΈ Converting to BF16..."))
        yield "\n".join(logs)

        cmd = ["python", str(CONVERT_SCRIPT), str(model_path), "--outtype", "bf16", "--outfile", str(bf16_path)]
        result = subprocess.run(cmd, capture_output=True, text=True)

        # BF16 failure is non-fatal: log it and continue with FP16 below.
        if result.returncode == 0:
            logs.extend(push_to_ollama(bf16_path, ollama_repo, "bf16"))
            os.remove(bf16_path)
            logs.append(format_log("π§Ή Cleaned up BF16"))
        else:
            logs.append(format_log(f"β οΈ BF16 Conversion failed: {result.stderr}"))
        yield "\n".join(logs)

        # --- FP16 master: every quantization below derives from this file ---
        fp16_path = output_dir / "model-f16.gguf"
        logs.append(format_log("βοΈ Converting to FP16 (Master)..."))
        yield "\n".join(logs)

        cmd = ["python", str(CONVERT_SCRIPT), str(model_path), "--outtype", "f16", "--outfile", str(fp16_path)]
        # check=True: FP16 failure aborts the whole job via the except below.
        subprocess.run(cmd, check=True, capture_output=True)

        logs.extend(push_to_ollama(fp16_path, ollama_repo, "f16"))
        yield "\n".join(logs)

        # --- Quantize the FP16 master into each target level ---
        for quant in TARGET_QUANTS:
            logs.append(format_log(f"--- {quant} ---"))
            yield "\n".join(logs)

            final_gguf = output_dir / f"model-{quant}.gguf"
            q_cmd = [str(QUANTIZE_BIN), str(fp16_path), str(final_gguf), quant]
            q_result = subprocess.run(q_cmd, capture_output=True, text=True)

            # A failed quant level is skipped; the remaining levels still run.
            if q_result.returncode != 0:
                logs.append(format_log(f"β Quantize failed: {q_result.stderr}"))
                continue

            logs.extend(push_to_ollama(final_gguf, ollama_repo, quant))
            os.remove(final_gguf)  # free disk before the next quant level
            logs.append(format_log(f"π§Ή Cleaned up {quant}"))
            yield "\n".join(logs)

        if fp16_path.exists():
            os.remove(fp16_path)
            logs.append(format_log("π§Ή Cleaned up f16"))

    except Exception as e:
        # Catch-all so the UI always gets a final log line, not a traceback.
        logs.append(format_log(f"β CRITICAL ERROR: {str(e)}"))

    finally:
        # Workspace is always cleared, success or failure.
        if work_dir.exists():
            shutil.rmtree(work_dir)
        logs.append(format_log("π Job Done. Workspace cleared."))
        yield "\n".join(logs)
| |
|
| |
|
def run_pipeline(hf_repo, ollama_repo, hf_token, ollama_key, progress=gr.Progress()):
    """
    Top-level Gradio handler: start the Ollama daemon, stream conversion
    log snapshots to the UI, and always stop the daemon afterwards.
    """
    start_ollama_daemon(ollama_key)
    try:
        # Each yielded snapshot streams straight into the log textbox.
        for update in run_conversion(hf_repo, ollama_repo, hf_token, progress):
            yield update
    finally:
        # Kill the daemon even if conversion raised or the client disconnected
        # (the original leaked it on any exception); also drops the stray
        # debug print() the original began with.
        stop_ollama_daemon()
| |
|
| |
|
| | |
# --- Gradio UI definition; `demo` is launched in the __main__ guard below ---
with gr.Blocks(title="HF to Ollama") as demo:
    gr.Markdown("## Convert safetensor HF repos to an Ollama repo. Only creates 4bit to f16+bf16 GGUFs")

    with gr.Row():
        with gr.Column():
            # Source model and an optional token for gated repos.
            hf_input = gr.Textbox(label="Source HF Repo", placeholder="unsloth/Qwen3.5-9B", value="unsloth/Qwen3.5-0.8B")
            hf_token_input = gr.Textbox(label="HF Token (for gated models and faster download)", type="password")

        with gr.Column():
            # Destination repo plus the SSH key `ollama push` authenticates with.
            ollama_input = gr.Textbox(label="Destination Ollama Repo", placeholder="fervent_mcclintock/Qwen3.5-9B", value="fervent_mcclintock/Qwen3.5-9B")
            ollama_key_input = gr.Textbox(label="Ollama Private Key", lines=5, type="password", placeholder="-----BEGIN OPENSSH PRIVATE KEY-----...")
            btn = gr.Button("Start", variant="primary")

    # NOTE(review): this module-level `logs` names a Gradio component; do not
    # confuse it with the local `logs` lists built inside the worker functions.
    logs = gr.TextArea(label="Logs", interactive=False, lines=10, autoscroll=True)

    # run_pipeline is a generator, so each yield streams into the `logs` box.
    btn.click(
        fn=run_pipeline,
        inputs=[hf_input, ollama_input, hf_token_input, ollama_key_input],
        outputs=logs
    )
| |
|
| |
|
def restart_space():
    """Factory-reboot the hosting Space so accumulated state is cleared."""
    api = HfApi()
    api.restart_space(repo_id=HOST_REPO, token=HF_TOKEN, factory_reboot=True)
| |
|
| |
|
# Restart the Space every 21600 s (6 h) to reclaim disk/memory between jobs.
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=21600)
scheduler.start()
| |
|
if __name__ == "__main__":
    # NOTE(review): Blocks.launch() does not accept a `theme` kwarg — theming
    # belongs on the gr.Blocks(...) constructor. Passing theme=blurple here
    # raised TypeError at startup, so it is dropped from launch().
    demo.queue().launch(server_name="0.0.0.0", server_port=7860)