Spaces:
Running
Running
| import os | |
| os.environ["HF_HOME"] = "/tmp/hf_cache" | |
| os.makedirs("/tmp/hf_cache", exist_ok=True) | |
| import uuid | |
| import yaml | |
| import json | |
| import shutil | |
| import torch | |
| from pathlib import Path | |
| from PIL import Image | |
| from fastapi import FastAPI | |
| from fastapi.responses import JSONResponse | |
| from huggingface_hub import hf_hub_download, whoami | |
| from fastapi import FastAPI, Query | |
| from huggingface_hub import list_repo_files, hf_hub_download, upload_file | |
| import io | |
| import requests | |
| from fastapi import BackgroundTasks | |
| from fastapi import FastAPI, UploadFile, File | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import zipfile | |
| import tempfile # ✅ Add this! | |
# Application object; the CORS middleware and the Gradio UI are attached to it
# further down in this file.
app = FastAPI()
print("entry point for the app ")

# CORS setup to allow requests from your frontend.
# NOTE(review): FastAPI's CORS documentation says allow_origins, allow_methods
# and allow_headers must not be ["*"] when allow_credentials=True. Behavior is
# deliberately left unchanged here; before production either list the real
# frontend origin(s) explicitly or drop allow_credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Replace "*" with your frontend domain in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# NOTE(review): no route decorator is visible on this function in this view of
# the file; if it is meant to be served as an endpoint, confirm that it is
# registered on `app`.
def health_check() -> dict:
    """Return a static status payload reporting that the service is up."""
    return {"status": "✅ FastAPI running on Hugging Face Spaces!"}
# NOTE(review): no route decorator is visible on this function in this view of
# the file; if it is meant to be served as an endpoint, confirm that it is
# registered on `app`.
def healthz() -> dict:
    """Return a minimal liveness payload: ``{"ok": True}``."""
    return {"ok": True}
| import gradio as gr | |
def create_gradio_ui():
    """Build and return the Gradio Blocks UI: a single static Markdown panel.

    The panel only displays status text and pointers to the HTTP API; it
    defines no inputs, outputs, or event handlers.
    """
    with gr.Blocks() as demo:
        # NOTE(review): the text advertises a `/train-from-hf` endpoint, but in
        # the visible portion of this file that handler exists only as
        # commented-out code — confirm the endpoint is actually registered.
        gr.Markdown("## ✅ FastAPI + Gradio is running\n\nUse `/train-from-hf` endpoint to start training.\n\nVisit `/docs` for API reference.")
    return demo
# Mount Gradio app at /gradio
# The return value of mount_gradio_app is rebound to `app`, so later code that
# refers to `app` uses the object returned by the mount call.
app = gr.mount_gradio_app(app, create_gradio_ui(), path="/gradio")
| # @app.get("/docs", include_in_schema=False) | |
| # def custom_docs(): | |
| # return JSONResponse(get_openapi(title="LoRA Autorun API", version="1.0.0", routes=app.routes)) | |
| # REPO_ID = "rahul7star/ohamlab" | |
| # FOLDER = "demo" | |
| # BASE_URL = f"https://huggingface.co/{REPO_ID}/resolve/main/" | |
| # # Show all images in a directory for the frontend UI | |
| # @app.get("/images") | |
| # def list_images(): | |
| # try: | |
| # all_files = list_repo_files(REPO_ID) | |
| # folder_prefix = FOLDER.rstrip("/") + "/" | |
| # files_in_folder = [ | |
| # f for f in all_files | |
| # if f.startswith(folder_prefix) | |
| # and "/" not in f[len(folder_prefix):] # no subfolder files | |
| # and f.lower().endswith((".png", ".jpg", ".jpeg", ".webp")) | |
| # ] | |
| # urls = [BASE_URL + f for f in files_in_folder] | |
| # return {"images": urls} | |
| # except Exception as e: | |
| # return {"error": str(e)} | |
| # from datetime import datetime | |
| # import tempfile | |
| # import uuid | |
| # # upload zip from UI | |
| # @app.post("/upload-zip") | |
| # async def upload_zip(file: UploadFile = File(...)): | |
| # if not file.filename.endswith(".zip"): | |
| # return {"error": "Please upload a .zip file"} | |
| # # Save the ZIP to /tmp | |
| # temp_zip_path = f"/tmp/{file.filename}" | |
| # with open(temp_zip_path, "wb") as f: | |
| # f.write(await file.read()) | |
| # # Create a unique subfolder name inside 'demo/' | |
| # timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") | |
| # unique_id = uuid.uuid4().hex[:6] | |
| # folder_name = f"upload_{timestamp}_{unique_id}" | |
| # hf_folder_prefix = f"demo/{folder_name}" | |
| # try: | |
| # with tempfile.TemporaryDirectory() as extract_dir: | |
| # # Extract zip | |
| # with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref: | |
| # zip_ref.extractall(extract_dir) | |
| # uploaded_files = [] | |
| # # Upload all extracted files | |
| # for root_dir, _, files in os.walk(extract_dir): | |
| # for name in files: | |
| # file_path = os.path.join(root_dir, name) | |
| # relative_path = os.path.relpath(file_path, extract_dir) | |
| # repo_path = f"{hf_folder_prefix}/{relative_path}".replace("\\", "/") | |
| # upload_file( | |
| # path_or_fileobj=file_path, | |
| # path_in_repo=repo_path, | |
| # repo_id="rahul7star/ohamlab", | |
| # repo_type="model", | |
| # commit_message=f"Upload {relative_path} to {folder_name}", | |
| # token=True, | |
| # ) | |
| # uploaded_files.append(repo_path) | |
| # return { | |
| # "message": f"✅ Uploaded {len(uploaded_files)} files", | |
| # "folder": folder_name, | |
| # "files": uploaded_files, | |
| # } | |
| # except Exception as e: | |
| # return {"error": f"❌ Failed to process zip: {str(e)}"} | |
| # # upload a single file from UI | |
| # from typing import List | |
| # from fastapi import UploadFile, File, APIRouter | |
| # import os | |
| # from fastapi import UploadFile, File, APIRouter | |
| # from typing import List | |
| # from datetime import datetime | |
| # import uuid, os | |
| # @app.post("/upload") | |
| # async def upload_images( | |
| # background_tasks: BackgroundTasks, | |
| # files: List[UploadFile] = File(...) | |
| # ): | |
| # # Step 1: Generate dynamic folder name | |
| # timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") | |
| # unique_id = uuid.uuid4().hex[:6] | |
| # folder_name = f"upload_{timestamp}_{unique_id}" | |
| # hf_folder_prefix = f"demo/{folder_name}" | |
| # responses = [] | |
| # # Step 2: Save and upload each image | |
| # for file in files: | |
| # filename = file.filename | |
| # contents = await file.read() | |
| # temp_path = f"/tmp/{filename}" | |
| # with open(temp_path, "wb") as f: | |
| # f.write(contents) | |
| # try: | |
| # upload_file( | |
| # path_or_fileobj=temp_path, | |
| # path_in_repo=f"{hf_folder_prefix}/{filename}", | |
| # repo_id=T_REPO_ID, | |
| # repo_type="model", | |
| # commit_message=f"Upload {filename} to {hf_folder_prefix}", | |
| # token=True, | |
| # ) | |
| # responses.append({ | |
| # "filename": filename, | |
| # "status": "✅ uploaded", | |
| # "path": f"{hf_folder_prefix}/{filename}" | |
| # }) | |
| # except Exception as e: | |
| # responses.append({ | |
| # "filename": filename, | |
| # "status": f"❌ failed: {str(e)}" | |
| # }) | |
| # os.remove(temp_path) | |
| # # Step 3: Add filter job to background | |
| # def run_filter(): | |
| # try: | |
| # result = filter_and_rename_images(folder=hf_folder_prefix) | |
| # print(f"🧼 Filter result: {result}") | |
| # except Exception as e: | |
| # print(f"❌ Filter failed: {str(e)}") | |
| # background_tasks.add_task(run_filter) | |
| # return { | |
| # "message": f"{len(files)} file(s) uploaded", | |
| # "upload_folder": hf_folder_prefix, | |
| # "results": responses, | |
| # "note": "Filtering started in background" | |
| # } | |
| # # Training dataset: start filtering data for training | |
| # T_REPO_ID = "rahul7star/ohamlab" | |
| # DESCRIPTION_TEXT = ( | |
| # "Ra3hul is wearing a black jacket over a striped white t-shirt with blue jeans. " | |
| # "He is standing near a lake with his arms spread wide open, with mountains and cloudy skies in the background." | |
| # ) | |
| # def is_image_file(filename: str) -> bool: | |
| # return filename.lower().endswith((".png", ".jpg", ".jpeg", ".webp")) | |
| # @app.post("/filter-images") | |
| # def filter_and_rename_images(folder: str = Query("demo", description="Folder path in repo to scan")): | |
| # try: | |
| # all_files = list_repo_files(T_REPO_ID) | |
| # folder_prefix = folder.rstrip("/") + "/" | |
| # filter_folder = f"filter-{folder.rstrip('/')}" | |
| # filter_prefix = filter_folder + "/" | |
| # # Filter images only directly in the folder (no subfolders) | |
| # image_files = [ | |
| # f for f in all_files | |
| # if f.startswith(folder_prefix) | |
| # and "/" not in f[len(folder_prefix):] # no deeper path | |
| # and is_image_file(f) | |
| # ] | |
| # if not image_files: | |
| # return {"error": f"No images found in folder '{folder}'"} | |
| # uploaded_files = [] | |
| # for idx, orig_path in enumerate(image_files, start=1): | |
| # # Download image content bytes (uses local cache) | |
| # local_path = hf_hub_download(repo_id=T_REPO_ID, filename=orig_path) | |
| # with open(local_path, "rb") as f: | |
| # file_bytes = f.read() | |
| # # Rename images as image1.jpeg, image2.jpeg, ... | |
| # new_image_name = f"image{idx}.jpeg" | |
| # # Upload renamed image from memory | |
| # upload_file( | |
| # path_or_fileobj=io.BytesIO(file_bytes), | |
| # path_in_repo=filter_prefix + new_image_name, | |
| # repo_id=T_REPO_ID, | |
| # repo_type="model", | |
| # commit_message=f"Upload renamed image {new_image_name} to {filter_folder}", | |
| # token=True, | |
| # ) | |
| # uploaded_files.append(filter_prefix + new_image_name) | |
| # # Create and upload text file for each image | |
| # txt_filename = f"image{idx}.txt" | |
| # upload_file( | |
| # path_or_fileobj=io.BytesIO(DESCRIPTION_TEXT.encode("utf-8")), | |
| # path_in_repo=filter_prefix + txt_filename, | |
| # repo_id=T_REPO_ID, | |
| # repo_type="model", | |
| # commit_message=f"Upload text file {txt_filename} to {filter_folder}", | |
| # token=True, | |
| # ) | |
| # uploaded_files.append(filter_prefix + txt_filename) | |
| # return { | |
| # "message": f"Processed and uploaded {len(image_files)} images and text files.", | |
| # "files": uploaded_files, | |
| # } | |
| # except Exception as e: | |
| # return {"error": str(e)} | |
| # # ========== CONFIGURATION ========== | |
| # REPO_ID = "rahul7star/ohamlab" | |
| # FOLDER_IN_REPO = "filter-demo/upload_20250708_041329_9c5c81" | |
| # CONCEPT_SENTENCE = "ohamlab style" | |
| # LORA_NAME = "ohami_filter_autorun" | |
| # # ========== FASTAPI APP ========== | |
| # # ========== HELPERS ========== | |
| # def create_dataset(images, *captions): | |
| # destination_folder = f"datasets_{uuid.uuid4()}" | |
| # os.makedirs(destination_folder, exist_ok=True) | |
| # jsonl_file_path = os.path.join(destination_folder, "metadata.jsonl") | |
| # with open(jsonl_file_path, "a") as jsonl_file: | |
| # for index, image in enumerate(images): | |
| # new_image_path = shutil.copy(str(image), destination_folder) | |
| # caption = captions[index] | |
| # file_name = os.path.basename(new_image_path) | |
| # data = {"file_name": file_name, "prompt": caption} | |
| # jsonl_file.write(json.dumps(data) + "\n") | |
| # return destination_folder | |
| # def recursive_update(d, u): | |
| # for k, v in u.items(): | |
| # if isinstance(v, dict) and v: | |
| # d[k] = recursive_update(d.get(k, {}), v) | |
| # else: | |
| # d[k] = v | |
| # return d | |
| # def start_training( | |
| # lora_name, | |
| # concept_sentence, | |
| # steps, | |
| # lr, | |
| # rank, | |
| # model_to_train, | |
| # low_vram, | |
| # dataset_folder, | |
| # sample_1, | |
| # sample_2, | |
| # sample_3, | |
| # use_more_advanced_options, | |
| # more_advanced_options, | |
| # ): | |
| # try: | |
| # user = whoami() | |
| # username = user.get("name", "anonymous") | |
| # push_to_hub = True | |
| # except: | |
| # username = "anonymous" | |
| # push_to_hub = False | |
| # slugged_lora_name = lora_name.replace(" ", "_").lower() | |
| # # Load base config | |
| # config = { | |
| # "config": { | |
| # "name": slugged_lora_name, | |
| # "process": [ | |
| # { | |
| # "model": { | |
| # "low_vram": low_vram, | |
| # "is_flux": True, | |
| # "quantize": True, | |
| # "name_or_path": "black-forest-labs/FLUX.1-dev" | |
| # }, | |
| # "network": { | |
| # "linear": rank, | |
| # "linear_alpha": rank, | |
| # "type": "lora" | |
| # }, | |
| # "train": { | |
| # "steps": steps, | |
| # "lr": lr, | |
| # "skip_first_sample": True, | |
| # "batch_size": 1, | |
| # "dtype": "bf16", | |
| # "gradient_accumulation_steps": 1, | |
| # "gradient_checkpointing": True, | |
| # "noise_scheduler": "flowmatch", | |
| # "optimizer": "adamw8bit", | |
| # "ema_config": { | |
| # "use_ema": True, | |
| # "ema_decay": 0.99 | |
| # } | |
| # }, | |
| # "datasets": [ | |
| # {"folder_path": dataset_folder} | |
| # ], | |
| # "save": { | |
| # "dtype": "float16", | |
| # "save_every": 10000, | |
| # "push_to_hub": push_to_hub, | |
| # "hf_repo_id": f"{username}/{slugged_lora_name}", | |
| # "hf_private": True, | |
| # "max_step_saves_to_keep": 4 | |
| # }, | |
| # "sample": { | |
| # "guidance_scale": 3.5, | |
| # "sample_every": steps, | |
| # "sample_steps": 28, | |
| # "width": 1024, | |
| # "height": 1024, | |
| # "walk_seed": True, | |
| # "seed": 42, | |
| # "sampler": "flowmatch", | |
| # "prompts": [p for p in [sample_1, sample_2, sample_3] if p] | |
| # }, | |
| # "trigger_word": concept_sentence | |
| # } | |
| # ] | |
| # } | |
| # } | |
| # # Apply advanced YAML overrides if any | |
| # if use_more_advanced_options and more_advanced_options: | |
| # advanced_config = yaml.safe_load(more_advanced_options) | |
| # config["config"]["process"][0] = recursive_update(config["config"]["process"][0], advanced_config) | |
| # # Save YAML config | |
| # os.makedirs("tmp_configs", exist_ok=True) | |
| # config_path = f"tmp_configs/{uuid.uuid4()}_{slugged_lora_name}.yaml" | |
| # with open(config_path, "w") as f: | |
| # yaml.dump(config, f) | |
| # # Simulate training | |
| # print(f"[INFO] Starting training with config: {config_path}") | |
| # print(json.dumps(config, indent=2)) | |
| # return f"Training started successfully with config: {config_path}" | |
| # # ========== MAIN ENDPOINT ========== | |
| # @app.post("/train-from-hf") | |
| # def auto_run_lora_from_repo(): | |
| # try: | |
| # local_dir = Path(f"/tmp/{LORA_NAME}-{uuid.uuid4()}") | |
| # os.makedirs(local_dir, exist_ok=True) | |
| # hf_hub_download( | |
| # repo_id=REPO_ID, | |
| # repo_type="dataset", | |
| # subfolder=FOLDER_IN_REPO, | |
| # local_dir=local_dir, | |
| # local_dir_use_symlinks=False, | |
| # force_download=False, | |
| # etag_timeout=10, | |
| # allow_patterns=["*.jpg", "*.png", "*.jpeg"], | |
| # ) | |
| # image_dir = local_dir / FOLDER_IN_REPO | |
| # image_paths = list(image_dir.rglob("*.jpg")) + list(image_dir.rglob("*.jpeg")) + list(image_dir.rglob("*.png")) | |
| # if not image_paths: | |
| # return JSONResponse(status_code=400, content={"error": "No images found in the HF repo folder."}) | |
| # captions = [ | |
| # f"Autogenerated caption for {img.stem} in the {CONCEPT_SENTENCE} [trigger]" for img in image_paths | |
| # ] | |
| # dataset_path = create_dataset(image_paths, *captions) | |
| # result = start_training( | |
| # lora_name=LORA_NAME, | |
| # concept_sentence=CONCEPT_SENTENCE, | |
| # steps=1000, | |
| # lr=4e-4, | |
| # rank=16, | |
| # model_to_train="dev", | |
| # low_vram=True, | |
| # dataset_folder=dataset_path, | |
| # sample_1=f"A stylized portrait using {CONCEPT_SENTENCE}", | |
| # sample_2=f"A cat in the {CONCEPT_SENTENCE}", | |
| # sample_3=f"A selfie processed in {CONCEPT_SENTENCE}", | |
| # use_more_advanced_options=True, | |
| # more_advanced_options=""" | |
| # training: | |
| # seed: 42 | |
| # precision: bf16 | |
| # batch_size: 2 | |
| # augmentation: | |
| # flip: true | |
| # color_jitter: true | |
| # """ | |
| # ) | |
| # return {"message": result} | |
| # except Exception as e: | |
| # return JSONResponse(status_code=500, content={"error": str(e)}) | |