# fcetool / app.py
# GitHub Action Deploy
# Auto-deploy from api/ folder (61d877d)
# e241afb
import os
import time
import re
from urllib.parse import urlparse
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from asyncio import Semaphore
import asyncio
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from huggingface_hub import HfApi
import requests
import firmware_content_extractor as fce
def get_real_ip(request: Request):
    """Return the client address used as the rate-limit key.

    The first entry of the ``X-Forwarded-For`` header is preferred; when the
    header is absent or its first entry is empty, the address of the direct
    peer is used instead.

    Args:
        request: Incoming request; only ``headers`` and ``client`` are read.

    Returns:
        The chosen address as a string. ``"127.0.0.1"`` is returned when the
        request carries no usable client information at all.
    """
    # NOTE(review): X-Forwarded-For is client-controlled unless a trusted
    # proxy overwrites it -- confirm the deployment sits behind one.
    forwarded_for = request.headers.get("x-forwarded-for")
    if forwarded_for:
        # "a , b" style values would otherwise yield "a " as a distinct key.
        first_hop = forwarded_for.split(",")[0].strip()
        if first_hop:
            return first_hop
    client = request.client
    # ``request.client`` may be None; avoid an AttributeError in that case.
    if client is None or not client.host:
        return "127.0.0.1"
    return client.host
# Application object plus the rate limiter that the route decorators use;
# the limiter keys requests with get_real_ip.
app = FastAPI()
limiter = Limiter(key_func=get_real_ip)
app.state.limiter = limiter

# CORS: every origin and every HTTP method is allowed.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"])
# Image names that /extract accepts as its "target" value.
SUPPORTED_TARGETS = {
    "boot.img", "init_boot.img", "dtbo.img", "super_empty.img",
    "vbmeta.img", "vendor_boot.img", "vendor_kernel_boot.img",
    "preloader.img", "recovery.img", "logo.img", "abl.img", "hyp.img",
    "modem.img", "tz.img", "xbl.img", "lk.img", "tee.img", "md1img.img",
    "preloader_emmc.img", "preloader_raw.img", "preloader_ufs.img",
    "vbmeta_system.img", "vbmeta_vendor.img", "system_dlkm.img",
    "vendor_dlkm.img", "aop.img", "aop_config.img", "bluetooth.img",
    "cpucp.img", "cpucp_dtb.img", "devcfg.img", "dsp.img",
    "featenabler.img", "imagefv.img", "keymaster.img", "qupfw.img",
    "shrm.img", "uefi.img", "uefisecapp.img", "xbl_config.img",
    "xbl_ramdump.img", "scp.img", "spmfw.img", "sspm.img",
}

# At most four extractions may hold the semaphore at once.
extraction_semaphore = Semaphore(value=4)

# Scratch directory for extracted files; created at import time.
TEMP_DIR = "/tmp/extracted"
os.makedirs(TEMP_DIR, exist_ok=True)
# Hugging Face access: the token comes from the environment, and the Hub
# client is only created when a token is present (otherwise hf_api is None).
DATASET_REPO = "offici5l/fcetool"
HF_TOKEN = os.getenv("HF_TOKEN")
hf_api = HfApi(token=HF_TOKEN) if HF_TOKEN else None
def sanitize_path(path: str) -> str:
    """Replace ``< > : " | ? *`` and spaces in *path* with underscores.

    Slashes are left untouched, so the directory structure of the input is
    preserved.
    """
    without_reserved = re.sub(r'[<>:"|?*]', '_', path)
    return without_reserved.replace(' ', '_')
def generate_storage_path(url: str) -> str:
    """Derive the dataset folder for a firmware URL as ``<host>/<path>``.

    The URL path loses its leading slashes and a trailing ``.zip``; host and
    path are then each passed through ``sanitize_path``. Query string and
    fragment are not part of the result.
    """
    parts = urlparse(url)
    relative = parts.path.lstrip('/')
    if relative.endswith('.zip'):
        relative = relative[:-4]
    return f"{sanitize_path(parts.netloc)}/{sanitize_path(relative)}"
def check_file_in_dataset(storage_path: str, target: str) -> bool:
    """Report whether ``<storage_path>/<target>`` already exists in the dataset.

    The Hub client is asked first. If that call raises, the file's
    ``resolve/main`` URL is requested over plain HTTP and a 200 status is
    treated as "exists".

    Args:
        storage_path: Folder inside the dataset repo.
        target: File name inside that folder.

    Returns:
        True when the file was found. False when it was not found, when no
        Hub client is configured, or when both lookups failed.
    """
    if not hf_api:
        return False
    path_in_repo = f"{storage_path}/{target}"
    try:
        return hf_api.file_exists(
            repo_id=DATASET_REPO,
            filename=path_in_repo,
            repo_type="dataset",
        )
    except Exception:
        # Deliberate best-effort: any client failure falls back to an HTTP probe.
        url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{path_in_repo}"
        try:
            # stream=True: only status and headers are needed, not the body.
            # The context manager closes the connection on every path.
            with requests.get(url, stream=True, timeout=5) as response:
                return response.status_code == 200
        except Exception:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt
            # and SystemExit. A failed probe still means "not found".
            return False
async def upload_to_dataset(file_path: str, storage_path: str, target: str) -> str:
    """Upload a local file to the dataset repo and return its download URL.

    The blocking ``hf_api.upload_file`` call runs in the event loop's default
    executor so the loop stays responsive during the transfer.

    Args:
        file_path: Local path of the file to upload.
        storage_path: Destination folder inside the dataset repo.
        target: Destination file name inside that folder.

    Returns:
        The ``resolve/main`` URL of the uploaded file.

    Raises:
        RuntimeError: If no Hub client is configured (HF_TOKEN missing).
        Exception: Anything raised by ``hf_api.upload_file`` is propagated.
    """
    if not hf_api:
        raise RuntimeError("HF_TOKEN not configured")
    path_in_repo = f"{storage_path}/{target}"
    # Inside a coroutine the running loop is the right one to ask for.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        lambda: hf_api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=path_in_repo,
            repo_id=DATASET_REPO,
            repo_type="dataset",
            commit_message=f"Add {target} from {storage_path}",
        ),
    )
    return f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{path_in_repo}"
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    """Answer a tripped rate limit with HTTP 429 and the standard error body."""
    body = {
        "status": "error",
        "message": "Rate limit exceeded. Please try again later.",
    }
    return JSONResponse(content=body, status_code=429)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render an HTTPException as ``{"status": "error", "message": <detail>}``.

    The response keeps the status code carried by the exception.
    """
    body = {"status": "error", "message": exc.detail}
    return JSONResponse(content=body, status_code=exc.status_code)
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Catch-all handler: reply with HTTP 500 and the exception text.

    NOTE(review): the exception message is sent to the client verbatim.
    """
    body = {
        "status": "error",
        "message": f"Internal Server Error: {str(exc)}",
    }
    return JSONResponse(content=body, status_code=500)
def _cleanup_extraction(raw_file_path: str, out_dir: str) -> None:
    """Best-effort removal of one request's extracted file and its temp folder.

    ``os.rmdir`` only succeeds on an empty directory, so a folder that still
    holds other files is left in place.
    """
    for remove, path in ((os.remove, raw_file_path), (os.rmdir, out_dir)):
        try:
            remove(path)
        except OSError:
            # Already gone, never created, or (for the folder) not empty.
            # Cleanup must never replace the response being returned.
            continue


def _extract_reply(status_code: int, status: str, message: str, **extra) -> JSONResponse:
    """Build the JSON envelope used by /extract: status, message, then extras."""
    return JSONResponse(
        status_code=status_code,
        content={"status": status, "message": message, **extra},
    )


@app.post("/extract")
@limiter.limit("3/minute")
async def extract_target(request: Request, payload: dict):
    """Extract one image from a firmware URL and publish it to the dataset.

    Flow: reject when every extraction slot is busy, validate the JSON body,
    return the cached dataset URL if the file is already there, otherwise run
    the extractor, upload the result and return its download URL. The
    extracted file and its temp folder are removed on every exit path.

    Args:
        request: Incoming request (required by the rate limiter decorator).
        payload: JSON body; must contain string values for ``url`` and
            ``target``, with ``target`` one of ``SUPPORTED_TARGETS``.

    Returns:
        A JSONResponse with ``status``/``message`` and, depending on the
        outcome, ``download_url``, ``target`` and ``duration_seconds``:
        200 (``cached`` or ``completed``), 400 (bad input or extraction
        failure), 429 (no free slot) or 500 (upload/configuration/unexpected
        error).
    """
    if extraction_semaphore.locked():
        return _extract_reply(
            429, "error",
            "Server is at full capacity. Please try again in 1-2 minutes.",
        )

    url = payload.get("url")
    target = payload.get("target")
    if not url or not target:
        return _extract_reply(
            400, "error", "Missing 'url' or 'target' parameter in JSON body."
        )
    # Arbitrary JSON may carry lists/objects here; an unhashable ``target``
    # would otherwise raise TypeError in the set lookup below.
    if not isinstance(url, str) or not isinstance(target, str):
        return _extract_reply(400, "error", "'url' and 'target' must be strings.")
    if target not in SUPPORTED_TARGETS:
        # sorted() keeps the listing stable between requests and restarts.
        return _extract_reply(
            400, "error",
            f"Unsupported target type. Supported: {', '.join(sorted(SUPPORTED_TARGETS))}",
        )

    start_time = time.time()

    def elapsed() -> int:
        return int(time.time() - start_time)

    storage_path = generate_storage_path(url)

    # The dataset lookup does blocking network I/O; keep it off the event loop.
    loop = asyncio.get_running_loop()
    if hf_api and await loop.run_in_executor(
        None, check_file_in_dataset, storage_path, target
    ):
        return _extract_reply(
            200, "cached", "File already exists in dataset (from cache)",
            download_url=f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{storage_path}/{target}",
            target=target,
            duration_seconds=elapsed(),
        )

    folder_name = storage_path.replace('/', '_')
    out_dir = os.path.join(TEMP_DIR, folder_name)
    raw_file_path = os.path.normpath(os.path.join(out_dir, target))
    os.makedirs(out_dir, exist_ok=True)

    try:
        async with extraction_semaphore:
            result = await fce.extract_async(url, target, out_dir)

            if not (result.get("success") and os.path.exists(raw_file_path)):
                return _extract_reply(
                    400, "failed", result.get("error", "Extraction failed"),
                    duration_seconds=elapsed(),
                )

            if not hf_api:
                return _extract_reply(
                    500, "failed",
                    "HF_TOKEN not configured. Cannot upload to dataset.",
                    duration_seconds=elapsed(),
                )

            try:
                download_url = await upload_to_dataset(
                    raw_file_path, storage_path, target
                )
            except Exception as upload_error:
                return _extract_reply(
                    500, "failed",
                    f"Upload to dataset failed: {str(upload_error)}",
                    duration_seconds=elapsed(),
                )

            return _extract_reply(
                200, "completed", "Extraction completed and uploaded to dataset",
                download_url=download_url,
                target=target,
                duration_seconds=elapsed(),
            )
    except Exception as e:
        return _extract_reply(500, "error", str(e))
    finally:
        # Single cleanup point: also covers failed or aborted extractions,
        # which previously left their partial file behind.
        _cleanup_extraction(raw_file_path, out_dir)
@app.get("/files/{storage_path:path}/{target}")
async def get_file_info(storage_path: str, target: str):
    """Tell the caller whether ``<storage_path>/<target>`` is in the dataset.

    Args:
        storage_path: Folder inside the dataset repo (may contain slashes).
        target: File name inside that folder.

    Returns:
        200 with the file's ``download_url`` when it exists, otherwise 404
        with an error body.
    """
    # check_file_in_dataset performs blocking network calls; run it in the
    # default executor so other requests are not stalled meanwhile.
    loop = asyncio.get_running_loop()
    found = await loop.run_in_executor(
        None, check_file_in_dataset, storage_path, target
    )
    if not found:
        return JSONResponse(
            status_code=404,
            content={
                "status": "error",
                "message": "File not found in dataset",
            },
        )
    download_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{storage_path}/{target}"
    return JSONResponse(
        status_code=200,
        content={
            "status": "exists",
            "message": "File found in dataset",
            "download_url": download_url,
            "target": target,
        },
    )
@app.head("/health")
async def health_check():
    """Health probe (HEAD only): reply 200 with a fixed "ok" payload."""
    body = {"status": "ok", "message": "Service is healthy"}
    return JSONResponse(content=body, status_code=200)
@app.get("/")
def home():
    """Service banner: static info plus whether the Hub client is configured."""
    if hf_api:
        hf_status = "enabled"
    else:
        hf_status = "disabled"
    body = {
        "status": "online",
        "message": "Service is running",
        "mode": "Direct-Upload-to-Dataset",
        "method": "POST /extract",
        "dataset": DATASET_REPO,
        "hf_integration": hf_status,
    }
    return JSONResponse(content=body, status_code=200)