# tt-ff / app.py
# Arnold Manzano
# Attempt to run downloader
# 8ac8653
import os
import subprocess
import uuid
import requests
import asyncio
import re
import cv2
from fastapi import FastAPI, Request, Response, Form, HTTPException, BackgroundTasks, File, UploadFile
from fastapi.responses import HTMLResponse, FileResponse
from PIL import Image
from io import BytesIO
from pathlib import Path
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from tt_router import router as tt_router
# Application instance; the routes from tt_router are mounted next to the ones defined in this file.
app = FastAPI()
app.include_router(tt_router)
# Placeholder until startup_event() replaces it with an asyncio.Semaphore(1).
cpu_semaphore = None
@app.on_event("startup")
async def startup_event():
    """Create the module-level semaphore when the application starts.

    The semaphore is built with a single permit, so the workers that
    acquire it run one at a time.
    """
    global cpu_semaphore
    single_slot = asyncio.Semaphore(1)
    cpu_semaphore = single_slot
    print("###### Semaphore initialized on the active event loop.")
# Number of jobs that are queued or running: incremented by the request
# handlers and decremented by the background workers when they finish.
running_and_queued_count = 0
def get_queue_status():
    """Return a human-readable summary of the job counter.

    Returns "Idle" when no job is counted; otherwise reports how many jobs
    are counted beyond the first one (never less than zero).
    """
    total = running_and_queued_count
    if total == 0:
        return "Idle"
    queued = total - 1
    if queued < 0:
        queued = 0
    return f"Processing (plus {queued} in queue)"
def generate_paths(url: str):
    """Build unique input and output file paths for a job from its URL.

    The extension is read from the *path* component of the URL only, so
    dots inside the query string or fragment (signed URLs, version
    parameters, ...) cannot be mistaken for a file extension. It is
    lower-cased so the resulting files match the lower-case glob patterns
    used to list OUTPUT_DIR. Anything outside the supported set falls back
    to ``jpg``.

    Args:
        url: Source URL of the image or video.

    Returns:
        A ``(target_path, output_path)`` tuple inside OUTPUT_DIR, named
        ``input_<id>.<ext>`` and ``output_<id>.<ext>`` with a shared
        8-character hexadecimal id.
    """
    supported = ("png", "jpg", "jpeg", "webp", "mp4")
    job_id = uuid.uuid4().hex[:8]
    try:
        url_path = urlparse(url).path
    except ValueError:
        # Malformed URL (e.g. a broken IPv6 literal): use the default extension.
        url_path = ""
    ext = Path(url_path).suffix.lstrip(".").lower()
    if ext not in supported:
        ext = "jpg"
    target_path = OUTPUT_DIR / f"input_{job_id}.{ext}"
    output_path = OUTPUT_DIR / f"output_{job_id}.{ext}"
    return target_path, output_path
# Configuration
# Face image handed to FaceFusion through --source-paths.
SOURCE_FACE = "source.jpg"
# Directory that receives both the job inputs and the generated outputs;
# it is created at import time if it does not exist yet.
OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True)
# Glob patterns the index page uses to list files in OUTPUT_DIR.
EXTENSIONS = ("*.png", "*.jpg", "*.jpeg", "*.webp", "*.mp4")
def run_facefusion(target_path: Path, output_path: Path):
    """Run the FaceFusion headless command for one file (blocking).

    Launches ``python facefusion.py headless-run`` with SOURCE_FACE as the
    source, *target_path* as the target and *output_path* as the output.
    Any failure is printed and swallowed; nothing is returned.
    """
    command = ["python", "facefusion.py", "headless-run"]
    command += ["--source-paths", SOURCE_FACE]
    command += ["--target-path", str(target_path)]
    command += ["--output-path", str(output_path)]
    print(f"###### Executing: {' '.join(command)}")
    try:
        # Blocking on purpose: the callers run this function via asyncio.to_thread.
        subprocess.run(command, check=True)
    except Exception as err:
        print(f"###### Error during processing: {err}")
    # NOTE: the input file is left on disk after processing.
def _download_to_file(url: str, target_path: Path, headers: dict) -> None:
    """Stream *url* into *target_path* (blocking; meant for a worker thread)."""
    # The context manager releases the connection even if writing fails.
    with requests.get(url, headers=headers, timeout=30, stream=True) as response:
        response.raise_for_status()
        with open(target_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)


async def background_worker(url: str, target_path: Path, output_path: Path, headers: dict):
    """Download *url* and run FaceFusion on it, one job at a time.

    The job waits for ``cpu_semaphore``, downloads the file to
    *target_path*, then runs ``run_facefusion`` to produce *output_path*.
    Both the download and the processing run in worker threads so the event
    loop keeps serving requests in the meantime. A failed download is
    logged, its partial file is removed, and the job ends without
    processing. The queue counter is decremented in every case.

    Args:
        url: Address of the image or video to fetch.
        target_path: Where the downloaded file is stored.
        output_path: Where FaceFusion writes its result.
        headers: HTTP headers sent with the download request.
    """
    global running_and_queued_count
    try:
        async with cpu_semaphore:
            print(f"###### Starting processing for {url}")
            try:
                # requests is synchronous; calling it here directly would
                # freeze every other request for the whole download.
                await asyncio.to_thread(_download_to_file, url, target_path, headers)
            except Exception as e:
                print(f"###### Download failed in background: {e}")
                # Do not leave a truncated input file behind in OUTPUT_DIR.
                try:
                    target_path.unlink(missing_ok=True)
                except OSError:
                    pass
                return
            await asyncio.to_thread(run_facefusion, target_path, output_path)
            print(f"###### Done processing for {url}")
    finally:
        running_and_queued_count -= 1
@app.get("/", response_class=HTMLResponse)
async def index():
    """Render the home page: submission forms, queue status and a file grid.

    Every file in OUTPUT_DIR matching one of the EXTENSIONS patterns gets a
    card that links to ``/download/<name>`` and previews
    ``/thumbnail/<name>``. Cards are ordered newest first by modification
    time.
    """
    status = get_queue_status()
    newest_first = sorted(
        (path for pattern in EXTENSIONS for path in OUTPUT_DIR.glob(pattern)),
        key=lambda path: path.stat().st_mtime,
        reverse=True,
    )
    # The markup below is kept flush-left because its whitespace is part of the response.
    cards_html = [
        f"""
<div class="card">
<a href="/download/{entry.name}" target="_blank">
<img src="/thumbnail/{entry.name}" loading="lazy" alt="{entry.name}">
<div class="filename">{entry.name}</div>
</a>
</div>
"""
        for entry in newest_first
    ]
    cards_markup = "".join(cards_html)
    page = f"""
<html>
<head>
<title>FaceFusion Background Worker</title>
<style>
body {{ font-family: 'Segoe UI', sans-serif; margin: 40px; background: #f8f9fa; color: #333; }}
.status-bar {{ background: white; padding: 15px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.05); margin-bottom: 20px; }}
.grid {{
display: grid;
grid-template-columns: repeat(auto-fill, minmax(180px, 1fr));
gap: 20px;
}}
.card {{
background: white; border-radius: 10px; overflow: hidden;
box-shadow: 0 4px 6px rgba(0,0,0,0.1); transition: transform 0.2s;
}}
.card:hover {{ transform: translateY(-5px); }}
.card img {{ width: 100%; height: 140px; object-fit: cover; display: block; }}
.card a {{ text-decoration: none; color: inherit; }}
.filename {{ padding: 10px; font-size: 12px; font-weight: bold; text-align: center; word-break: break-all; }}
button {{ background: #007bff; color: white; border: none; padding: 10px 20px; border-radius: 5px; cursor: pointer; }}
</style>
</head>
<body>
<h1>FaceSwap Background Processor</h1>
<form action="/process" method="post">
<input type="text" name="url" placeholder="Paste Image/Video URL..." style="width:300px;" required>
<button type="submit">Start Swapping</button>
</form>
<form action="/process-bulk" method="post">
<textarea name="urls_text" rows="8" placeholder="Paste multiple links here (one per line)..." style="width:100%; max-width:500px; padding:10px; border-radius:8px; border:1.5px solid #ccc;"></textarea>
<br><br>
<button type="submit">Queue All Links</button>
</form>
<form action="/process-gallery" method="post">
<input type="text" name="page_url" placeholder="Paste Gallery Page URL..." required>
<button type="submit">Bulk Swap Gallery</button>
</form>
<form action="/upload" method="post" enctype="multipart/form-data">
<input type="file" name="file" accept="video/mp4,image/*" required>
<button type="submit">Upload & Swap</button>
</form>
<p><strong>System Status:</strong> {status}</p>
<hr>
<p><a href="/tt">TT page</a></p>
<hr>
<h2>Processed Files</h2>
<p><button onclick="location.reload()">Refresh List</button></p>
<div class="grid">
{cards_markup}
</div>
</body>
</html>
"""
    return HTMLResponse(content=page)
@app.get("/thumbnail/{filename}")
async def get_thumbnail(filename: str):
    """Return a JPEG thumbnail (at most 300x300) for a file in OUTPUT_DIR.

    Videos (.mp4/.mov/.avi) are represented by their first frame; any other
    file is opened as an image. The picture is always converted to RGB
    before encoding, because JPEG cannot store alpha or palette modes and
    saving such an image directly raises an error.

    Args:
        filename: Bare name of a file inside OUTPUT_DIR.

    Returns:
        An ``image/jpeg`` response, or an empty 404 response when the name
        is not a plain file name, the file is missing, or it cannot be
        decoded.
    """
    # Only bare names are served; anything containing a directory part is refused.
    if Path(filename).name != filename:
        return Response(status_code=404)
    file_path = OUTPUT_DIR / filename
    # is_file() instead of exists(): a directory such as ".." exists but is not an image.
    if not file_path.is_file():
        return Response(status_code=404)

    # 1. Extract Frame/Image
    if file_path.suffix.lower() in ('.mp4', '.mov', '.avi'):
        cap = cv2.VideoCapture(str(file_path))
        try:
            success, frame = cap.read()
        finally:
            cap.release()
        if not success:
            return Response(status_code=404)
        img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        img.thumbnail((300, 300))
    else:
        try:
            # The context manager closes the underlying file handle.
            with Image.open(file_path) as opened:
                # Shrink first so the colour conversion runs on the small image.
                opened.thumbnail((300, 300))
                img = opened.convert("RGB")
        except OSError:
            # Unreadable, truncated or non-image file.
            return Response(status_code=404)

    # 2. Encode as JPEG
    buffer = BytesIO()
    img.save(buffer, format="JPEG", quality=80)
    return Response(content=buffer.getvalue(), media_type="image/jpeg")
@app.post("/process")
async def process_swap(background_tasks: BackgroundTasks, url: str = Form(...)):
    """Queue a single URL for download and face swapping.

    Generates the job's file paths, bumps the queue counter, schedules
    ``background_worker`` with browser-like request headers and returns a
    confirmation page.
    """
    global running_and_queued_count
    target_path, output_path = generate_paths(url)
    browser_headers = {}
    browser_headers["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36"
    browser_headers["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,video/mp4,*/*;q=0.8"
    browser_headers["Accept-Language"] = "en-US,en;q=0.9"
    browser_headers["Connection"] = "keep-alive"
    print(f"###### Queueing job {url}")
    running_and_queued_count = running_and_queued_count + 1
    background_tasks.add_task(
        background_worker, url, target_path, output_path, browser_headers
    )
    # Kept flush-left: the whitespace is part of the response body.
    confirmation = """
<h1>Job Submitted!</h1>
<p>Your file is being processed in the background.</p>
<p>It will appear in the <a href="/">list</a> in a few moments.</p>
<a href="/">Back to Home</a>
"""
    return HTMLResponse(content=confirmation)
@app.post("/process-bulk")
async def process_bulk(background_tasks: BackgroundTasks, urls_text: str = Form(...)):
    """Queue every non-empty line of *urls_text* as its own job.

    Lines are stripped of surrounding whitespace and blank lines are
    ignored. Returns a page reporting how many URLs were queued, or a
    "No URLs provided" page when nothing usable was submitted.
    """
    global running_and_queued_count
    # One URL per line; drop whatever is empty after trimming.
    trimmed = (line.strip() for line in urls_text.splitlines())
    urls = [candidate for candidate in trimmed if candidate]
    if len(urls) == 0:
        return HTMLResponse(content="<h1>No URLs provided</h1><a href='/'>Back</a>")
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
    }
    for url in urls:
        target_path, output_path = generate_paths(url)
        print(f"###### Queueing bulk job {url}")
        running_and_queued_count = running_and_queued_count + 1
        background_tasks.add_task(background_worker, url, target_path, output_path, headers)
    count = len(urls)
    # Kept flush-left: the whitespace is part of the response body.
    summary = f"""
<h1>Bulk Jobs Queued!</h1>
<p>Added <strong>{count}</strong> URLs to the processing queue.</p>
<a href="/">Back to Home</a>
"""
    return HTMLResponse(content=summary)
@app.post("/process-gallery")
async def process_gallery(background_tasks: BackgroundTasks, page_url: str = Form(...)):
    """Scrape a gallery page and queue every image link found on it.

    A URL containing ``post-<digits>`` is first rewritten to
    ``<scheme>://<host>/posts/<digits>/show``. The page is fetched in a
    worker thread (``requests`` is blocking and would otherwise stall the
    event loop), then image links are collected from three selectors:
    ``a.js-main-image-link`` (href), ``img.img-front`` (data-src) and
    ``.js-lbImage`` (data-src, else href). Links are resolved against the
    fetched page so relative and protocol-relative URLs can be downloaded,
    de-duplicated, and queued in the order they appear.

    Args:
        page_url: Address of the gallery page.

    Returns:
        A page reporting how many images were queued, or a "No images
        found" page.

    Raises:
        HTTPException: 400 when the page cannot be fetched.
    """
    global running_and_queued_count
    from urllib.parse import urljoin  # only urlparse is imported at module level

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
    }
    # reconstruct url
    match = re.search(r'post-(\d+)', page_url)
    if match:
        post_id = match.group(1)
        parsed = urlparse(page_url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
        page_url = f"{base_url}/posts/{post_id}/show"
    try:
        response = await asyncio.to_thread(
            requests.get, page_url, headers=headers, timeout=15
        )
        response.raise_for_status()
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Could not reach Page: {e}") from e
    soup = BeautifulSoup(response.text, 'html.parser')

    # (CSS selector, attributes to try in order) for each known gallery layout.
    sources = (
        ("a.js-main-image-link", ("href",)),
        ("img.img-front", ("data-src",)),
        (".js-lbImage", ("data-src", "href")),
    )
    # A dict keeps insertion order, so duplicates are dropped while the
    # page order is preserved.
    urls_to_process = {}
    for selector, attributes in sources:
        for tag in soup.select(selector):
            link = next((tag.get(name) for name in attributes if tag.get(name)), None)
            if link:
                # response.url is the final address after redirects; absolute
                # links pass through urljoin unchanged.
                urls_to_process[urljoin(response.url, link)] = None

    if not urls_to_process:
        return HTMLResponse(content="<h1>No images found</h1><p>Tried both selectors.</p><a href='/'>Back</a>")
    count = 0
    for image_url in urls_to_process:
        target_path, output_path = generate_paths(image_url)
        print(f"###### Queueing job {image_url}")
        running_and_queued_count += 1
        background_tasks.add_task(background_worker, image_url, target_path, output_path, headers)
        count += 1
    return HTMLResponse(content=f"""
<h1>Batch Started!</h1>
<p>Found <strong>{count}</strong> images to process.</p>
<p>The system will churn through them one by one.</p>
<a href="/">Go to Home to see progress</a>
""")
async def background_worker_local(target_path: Path, output_path: Path):
    """Process a file that is already on disk, one job at a time.

    Waits for ``cpu_semaphore``, runs ``run_facefusion`` in a worker
    thread, and decrements the queue counter whether or not the job
    succeeded.
    """
    global running_and_queued_count
    try:
        async with cpu_semaphore:
            print(f"###### Starting processing for local file: {target_path}")
            job = asyncio.to_thread(run_facefusion, target_path, output_path)
            await job
            print("###### Done processing local file")
    finally:
        running_and_queued_count = running_and_queued_count - 1
@app.post("/upload")
async def upload_file(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...)
):
    """Store an uploaded file in OUTPUT_DIR and queue it for face swapping.

    The stored name is ``input_<id>.<ext>``. The extension is taken from
    the client-supplied file name, lower-cased and reduced to letters and
    digits so it cannot carry path separators or markup; when nothing
    usable remains it falls back to ``jpg``. The upload is copied to disk
    in chunks instead of being read into memory at once, and the file name
    is HTML-escaped before it is echoed back.

    Args:
        background_tasks: FastAPI task queue used to schedule the job.
        file: The uploaded image or video.

    Returns:
        A confirmation page naming the uploaded file.

    Raises:
        HTTPException: 500 when the upload cannot be written to disk.
    """
    global running_and_queued_count
    from html import escape  # local import: html is not imported at module level

    # Generate unique ID for this upload
    job_id = uuid.uuid4().hex[:8]
    original_name = file.filename or ""  # the client may omit the file name
    ext = re.sub(r"[^a-z0-9]", "", Path(original_name).suffix.lower()) or "jpg"
    # Create paths
    target_path = OUTPUT_DIR / f"input_{job_id}.{ext}"
    output_path = OUTPUT_DIR / f"output_{job_id}.{ext}"
    # Save the uploaded file to disk
    try:
        with open(target_path, "wb") as buffer:
            while chunk := await file.read(1024 * 1024):
                buffer.write(chunk)
    except Exception as e:
        # Do not leave a truncated input file behind in OUTPUT_DIR.
        try:
            target_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise HTTPException(status_code=500, detail=f"Could not save file: {e}") from e
    print(f"###### Received upload: {original_name}, queueing as {job_id}")
    # Increment queue and add task
    running_and_queued_count += 1
    background_tasks.add_task(background_worker_local, target_path, output_path)
    safe_name = escape(original_name)
    return HTMLResponse(content=f"""
<h1>Upload Successful!</h1>
<p>Your file <strong>{safe_name}</strong> is in the queue.</p>
<a href="/">Back to Home</a>
""")
@app.get("/download/{filename}")
async def download_file(filename: str):
    """Serve a file from OUTPUT_DIR.

    Args:
        filename: Bare name of a file inside OUTPUT_DIR.

    Returns:
        A ``FileResponse`` for the requested file.

    Raises:
        HTTPException: 404 when the name has a directory part or does not
            refer to a regular file.
    """
    file_path = OUTPUT_DIR / filename
    print(f"###### Searching for file at: {file_path.absolute()}")
    # is_file() rather than exists(): a directory such as ".." exists, but
    # FileResponse cannot serve it and would fail while sending the response.
    if Path(filename).name == filename and file_path.is_file():
        return FileResponse(path=file_path)
    raise HTTPException(status_code=404, detail="File not found.")