"""FaceFusion background-processing web UI.

FastAPI app that queues face-swap jobs (single URL, bulk URL list,
gallery-page scrape, or direct upload), downloads the target media, and
runs the FaceFusion CLI one job at a time behind a CPU semaphore.
"""

import os
import subprocess
import uuid
import requests
import asyncio
import re
import cv2
from fastapi import FastAPI, Request, Response, Form, HTTPException, BackgroundTasks, File, UploadFile
from fastapi.responses import HTMLResponse, FileResponse
from PIL import Image
from io import BytesIO
from pathlib import Path
from bs4 import BeautifulSoup
from urllib.parse import urlparse

from tt_router import router as tt_router

app = FastAPI()
app.include_router(tt_router)

# Configuration
SOURCE_FACE = "source.jpg"
OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True)
EXTENSIONS = ("*.png", "*.jpg", "*.jpeg", "*.webp", "*.mp4")
# Extensions we are willing to write to disk; anything else falls back to jpg.
ALLOWED_EXTS = {"png", "jpg", "jpeg", "webp", "mp4"}

# Created on startup so the semaphore is bound to the running event loop
# (creating it at import time can attach it to the wrong loop).
cpu_semaphore = None


@app.on_event("startup")
async def startup_event():
    """Initialize the single-slot CPU semaphore on the active event loop."""
    global cpu_semaphore
    cpu_semaphore = asyncio.Semaphore(1)
    print("###### Semaphore initialized on the active event loop.")


# Number of jobs currently running or waiting. Only mutated from the event
# loop thread, so a plain int is sufficient (no lock needed).
running_and_queued_count = 0


def get_queue_status() -> str:
    """Return a human-readable summary of the job queue state."""
    global running_and_queued_count
    if running_and_queued_count == 0:
        return "Idle"
    waiting = max(0, running_and_queued_count - 1)
    return f"Processing (plus {waiting} in queue)"


def generate_paths(url: str):
    """Generates unique input and output paths based on the URL extension.

    Falls back to ``jpg`` when the URL's extension is missing or not in the
    whitelist, so an attacker-controlled URL cannot choose an arbitrary
    on-disk extension.
    """
    job_id = str(uuid.uuid4())[:8]
    ext = url.split('.')[-1].split('?')[0]
    if ext.lower() not in ALLOWED_EXTS:
        ext = 'jpg'
    target_path = OUTPUT_DIR / f"input_{job_id}.{ext}"
    output_path = OUTPUT_DIR / f"output_{job_id}.{ext}"
    return target_path, output_path


def _safe_output_path(filename: str) -> Path:
    """Resolve *filename* inside OUTPUT_DIR, rejecting path-traversal attempts.

    Raises:
        HTTPException(404): when the resolved path escapes OUTPUT_DIR.
    """
    candidate = (OUTPUT_DIR / filename).resolve()
    if candidate.parent != OUTPUT_DIR.resolve():
        # Treat traversal the same as a missing file to avoid leaking info.
        raise HTTPException(status_code=404, detail="File not found.")
    return candidate


def run_facefusion(target_path: Path, output_path: Path):
    """The actual CPU-heavy execution (blocking; run via asyncio.to_thread)."""
    cmd = [
        "python", "facefusion.py", "headless-run",
        "--source-paths", SOURCE_FACE,
        "--target-path", str(target_path),
        "--output-path", str(output_path),
    ]
    print(f"###### Executing: {' '.join(cmd)}")
    try:
        # We use subprocess.run here because it's called inside to_thread
        subprocess.run(cmd, check=True)
        # # Optional: Delete the input file to save space after processing
        # if target_path.exists():
        #     target_path.unlink()
    except Exception as e:
        print(f"###### Error during processing: {e}")


def _download_to(url: str, target_path: Path, headers: dict):
    """Stream *url* to *target_path* (blocking; run via asyncio.to_thread)."""
    response = requests.get(url, headers=headers, timeout=30, stream=True)
    response.raise_for_status()
    with open(target_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)


async def background_worker(url: str, target_path: Path, output_path: Path, headers: dict):
    """Handles queuing, downloading, and processing for a remote URL."""
    global running_and_queued_count
    try:
        async with cpu_semaphore:
            print(f"###### Starting processing for {url}")
            try:
                # Blocking HTTP download runs in a worker thread so the
                # event loop stays responsive while waiting.
                await asyncio.to_thread(_download_to, url, target_path, headers)
            except Exception as e:
                print(f"###### Download failed in background: {e}")
                return
            await asyncio.to_thread(run_facefusion, target_path, output_path)
            print(f"###### Done processing for {url}")
    finally:
        # Decrement even on failure so the queue counter never drifts.
        running_and_queued_count -= 1


async def background_worker_local(target_path: Path, output_path: Path):
    """Handles queuing and processing for files already on disk."""
    global running_and_queued_count
    try:
        async with cpu_semaphore:
            print(f"###### Starting processing for local file: {target_path}")
            await asyncio.to_thread(run_facefusion, target_path, output_path)
            print(f"###### Done processing local file")
    finally:
        running_and_queued_count -= 1


@app.get("/", response_class=HTMLResponse)
async def index():
    """Render the dashboard: queue status, job forms, and processed files."""
    status = get_queue_status()
    files = []
    for ext in EXTENSIONS:
        files.extend(OUTPUT_DIR.glob(ext))
    # Newest output first.
    files.sort(key=lambda x: x.stat().st_mtime, reverse=True)

    cards_html = "".join(
        f'<div class="card">'
        f'<a href="/download/{f.name}"><img src="/thumbnail/{f.name}" alt="{f.name}"></a>'
        f'<p>{f.name}</p>'
        f'</div>'
        for f in files
    )

    html_content = f"""<!DOCTYPE html>
<html>
<head><title>FaceFusion Background Worker</title></head>
<body>
  <h1>FaceSwap Background Processor</h1>
  <p>System Status: {status}</p>
  <p><a href="/tt">TT page</a></p>

  <form action="/process" method="post">
    <input type="text" name="url" placeholder="Media URL" required>
    <button type="submit">Process URL</button>
  </form>

  <form action="/process-bulk" method="post">
    <textarea name="urls_text" placeholder="One URL per line"></textarea>
    <button type="submit">Process Bulk</button>
  </form>

  <form action="/process-gallery" method="post">
    <input type="text" name="page_url" placeholder="Gallery page URL" required>
    <button type="submit">Process Gallery</button>
  </form>

  <form action="/upload" method="post" enctype="multipart/form-data">
    <input type="file" name="file" required>
    <button type="submit">Upload</button>
  </form>

  <h2>Processed Files</h2>
  <div class="grid">{cards_html}</div>
</body>
</html>"""
    return HTMLResponse(content=html_content)


@app.get("/thumbnail/{filename}")
async def get_thumbnail(filename: str):
    """Return a 300px JPEG thumbnail for an output image or video."""
    file_path = _safe_output_path(filename)
    if not file_path.exists():
        return Response(status_code=404)

    # 1. Extract Frame/Image
    if file_path.suffix.lower() in ['.mp4', '.mov', '.avi']:
        cap = cv2.VideoCapture(str(file_path))
        success, frame = cap.read()
        cap.release()
        if not success:
            return Response(status_code=404)
        # OpenCV decodes BGR; PIL expects RGB.
        img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    else:
        # Convert to RGB: JPEG cannot encode alpha (RGBA) or palette (P) modes.
        with Image.open(file_path) as src:
            img = src.convert("RGB")

    img.thumbnail((300, 300))
    buffer = BytesIO()
    img.save(buffer, format="JPEG", quality=80)
    return Response(content=buffer.getvalue(), media_type="image/jpeg")


@app.post("/process")
async def process_swap(background_tasks: BackgroundTasks, url: str = Form(...)):
    """Queue a single remote URL for face-swap processing."""
    global running_and_queued_count

    target_path, output_path = generate_paths(url)
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,video/mp4,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Connection": "keep-alive",
    }

    print(f"###### Queueing job {url}")
    running_and_queued_count += 1
    background_tasks.add_task(background_worker, url, target_path, output_path, headers)

    return HTMLResponse(content="""<!DOCTYPE html>
<html><body>
  <h1>Job Submitted!</h1>
  <p>Your file is being processed in the background.</p>
  <p>It will appear in the list in a few moments.</p>
  <a href="/">Back to Home</a>
</body></html>""")


@app.post("/process-bulk")
async def process_bulk(background_tasks: BackgroundTasks, urls_text: str = Form(...)):
    """Queue every non-empty line of *urls_text* as a separate job."""
    global running_and_queued_count

    # Split by newline, strip spaces, and remove empty strings
    urls = [line.strip() for line in urls_text.splitlines() if line.strip()]
    if not urls:
        return HTMLResponse(content="""<!DOCTYPE html>
<html><body><h1>No URLs provided</h1><a href="/">Back</a></body></html>""")

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
    }

    count = 0
    for url in urls:
        target_path, output_path = generate_paths(url)
        print(f"###### Queueing bulk job {url}")
        running_and_queued_count += 1
        background_tasks.add_task(background_worker, url, target_path, output_path, headers)
        count += 1

    return HTMLResponse(content=f"""<!DOCTYPE html>
<html><body>
  <h1>Bulk Jobs Queued!</h1>
  <p>Added {count} URLs to the processing queue.</p>
  <a href="/">Back to Home</a>
</body></html>""")


@app.post("/process-gallery")
async def process_gallery(background_tasks: BackgroundTasks, page_url: str = Form(...)):
    """Scrape a gallery page for image links and queue each one."""
    global running_and_queued_count
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
    }

    # reconstruct url: normalize ".../post-<id>..." links to the canonical
    # "/posts/<id>/show" page before scraping.
    match = re.search(r'post-(\d+)', page_url)
    if match:
        post_id = match.group(1)
        parsed = urlparse(page_url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
        page_url = f"{base_url}/posts/{post_id}/show"

    try:
        # Run the blocking fetch off the event loop.
        response = await asyncio.to_thread(requests.get, page_url, headers=headers, timeout=15)
        response.raise_for_status()
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Could not reach Page: {e}")

    soup = BeautifulSoup(response.text, 'html.parser')
    urls_to_process = set()
    for a in soup.select("a.js-main-image-link"):
        href = a.get('href')
        if href:
            urls_to_process.add(href)
    for img in soup.select("img.img-front"):
        src = img.get('data-src')
        if src:
            urls_to_process.add(src)
    for img in soup.select(".js-lbImage"):
        src = img.get('data-src') or img.get('href')
        if src:
            urls_to_process.add(src)

    if not urls_to_process:
        return HTMLResponse(content="""<!DOCTYPE html>
<html><body>
  <h1>No images found</h1>
  <p>Tried both selectors.</p>
  <a href="/">Back</a>
</body></html>""")

    count = 0
    for image_url in urls_to_process:
        target_path, output_path = generate_paths(image_url)
        print(f"###### Queueing job {image_url}")
        running_and_queued_count += 1
        background_tasks.add_task(background_worker, image_url, target_path, output_path, headers)
        count += 1

    return HTMLResponse(content=f"""<!DOCTYPE html>
<html><body>
  <h1>Batch Started!</h1>
  <p>Found {count} images to process.</p>
  <p>The system will churn through them one by one.</p>
  <a href="/">Go to Home to see progress</a>
</body></html>""")


@app.post("/upload")
async def upload_file(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...)
):
    """Accept a direct file upload and queue it for processing."""
    global running_and_queued_count

    # Generate unique ID for this upload
    job_id = str(uuid.uuid4())[:8]
    # The client controls file.filename: whitelist the extension so it
    # cannot smuggle path separators or odd suffixes into our disk paths.
    ext = file.filename.split('.')[-1].lower() if file.filename else 'jpg'
    if ext not in ALLOWED_EXTS:
        ext = 'jpg'

    # Create paths
    target_path = OUTPUT_DIR / f"input_{job_id}.{ext}"
    output_path = OUTPUT_DIR / f"output_{job_id}.{ext}"

    # Save the uploaded file to disk
    try:
        with open(target_path, "wb") as buffer:
            content = await file.read()
            buffer.write(content)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Could not save file: {e}")

    print(f"###### Received upload: {file.filename}, queueing as {job_id}")

    # Increment queue and add task
    # Note: We pass no headers since we don't need to download anything
    running_and_queued_count += 1
    background_tasks.add_task(background_worker_local, target_path, output_path)

    return HTMLResponse(content=f"""<!DOCTYPE html>
<html><body>
  <h1>Upload Successful!</h1>
  <p>Your file {file.filename} is in the queue.</p>
  <a href="/">Back to Home</a>
</body></html>""")


@app.get("/download/{filename}")
async def download_file(filename: str):
    """Serve a processed file from OUTPUT_DIR."""
    file_path = _safe_output_path(filename)
    print(f"###### Searching for file at: {file_path.absolute()}")
    if file_path.exists():
        return FileResponse(path=file_path)
    raise HTTPException(status_code=404, detail="File not found.")