|
|
import os |
|
|
import shutil |
|
|
from typing import Optional |
|
|
from fastapi import FastAPI, Request, Form, Query, HTTPException |
|
|
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from fastapi.templating import Jinja2Templates |
|
|
import instaloader |
|
|
from instaloader import Post |
|
|
import uvicorn |
|
|
|
|
|
|
|
|
DOWNLOAD_DIR = "downloads" |
|
|
TEMPLATES_DIR = "templates" |
|
|
CHROMA_ENABLED = os.getenv("CHROMA_ENABLED", "0") == "1" |
|
|
|
|
|
os.makedirs(DOWNLOAD_DIR, exist_ok=True) |
|
|
app = FastAPI() |
|
|
templates = Jinja2Templates(directory=TEMPLATES_DIR) |
|
|
|
|
|
|
|
|
app.mount("/static", StaticFiles(directory="static"), name="static") |
|
|
|
|
|
loader = instaloader.Instaloader(dirname_pattern=DOWNLOAD_DIR, download_comments=False, save_metadata=False) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def shortcode_from_url(url: str) -> str: |
|
|
if not url: |
|
|
raise ValueError("empty url") |
|
|
s = url.rstrip("/").split("/")[-1] |
|
|
if "?" in s: |
|
|
s = s.split("?")[0] |
|
|
return s |
|
|
|
|
|
def find_downloaded_video(shortcode: str) -> Optional[str]: |
|
|
|
|
|
for root, _, files in os.walk(DOWNLOAD_DIR): |
|
|
for f in files: |
|
|
if shortcode in f and f.lower().endswith((".mp4",)): |
|
|
return os.path.join(root, f) |
|
|
return None |
|
|
|
|
|
@app.get("/", response_class=HTMLResponse) |
|
|
def index(request: Request): |
|
|
return templates.TemplateResponse("index.html", {"request": request}) |
|
|
|
|
|
@app.post("/api/download") |
|
|
def download_post_form(reel_url: str = Form(...)): |
|
|
return _download_and_respond(reel_url) |
|
|
|
|
|
@app.get("/api/download") |
|
|
def download_post_get(url: str = Query(...)): |
|
|
return _download_and_respond(url) |
|
|
|
|
|
def _download_and_respond(reel_url: str): |
|
|
try: |
|
|
shortcode = shortcode_from_url(reel_url) |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=400, detail=f"Invalid URL: {e}") |
|
|
|
|
|
try: |
|
|
post = Post.from_shortcode(loader.context, shortcode) |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=404, detail=f"Failed to fetch post: {e}") |
|
|
|
|
|
if not post.is_video: |
|
|
raise HTTPException(status_code=400, detail="Provided URL is not a video (Reel).") |
|
|
|
|
|
|
|
|
|
|
|
for root, _, files in os.walk(DOWNLOAD_DIR): |
|
|
for f in files: |
|
|
if shortcode in f: |
|
|
try: |
|
|
os.remove(os.path.join(root, f)) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
try: |
|
|
loader.download_post(post, target=DOWNLOAD_DIR) |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=f"Download failed: {e}") |
|
|
|
|
|
video_path = find_downloaded_video(shortcode) |
|
|
if not video_path or not os.path.exists(video_path): |
|
|
raise HTTPException(status_code=500, detail="Video file not found after download.") |
|
|
|
|
|
filename = os.path.basename(video_path) |
|
|
return FileResponse(path=video_path, filename=filename, media_type="video/mp4") |
|
|
|
|
|
@app.get("/api/health") |
|
|
def health(): |
|
|
return JSONResponse({"status": "ok"}) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
uvicorn.run("app:app", host="0.0.0.0", port=7860, log_level="info") |
|
|
|