# ORCA-ROBOT / app.py
# SalexAI's picture
# Update app.py
# eb8307e verified
# raw
# history blame
# 22 kB
from fastapi import FastAPI, Request, Form, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
import httpx
import json
import logging
import os
import asyncio
import secrets
import shutil
import tempfile
import html as html_escape_lib
from typing import Optional, List
from datetime import datetime, timezone
from huggingface_hub import HfApi, hf_hub_download
# ==================================================
# APP SETUP
# ==================================================
# Configure root logging (INFO and above) for the whole process.
logging.basicConfig(level=logging.INFO)

app = FastAPI()

# Fully permissive CORS: any origin, method and header may call the API.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
# ==================================================
# CONFIG & CONSTANTS
# ==================================================
# Hugging Face dataset that stores index.json, admin_config.json and media.
# The access token is read from the "hf1" environment variable (None if unset).
HF_TOKEN = os.environ.get("hf1")
HF_REPO_ID = "SalexAI/fundata-2.0"
HF_REPO_TYPE = "dataset"

# School divisions are numbered 1..TOTAL_DIVISIONS inclusive.
TOTAL_DIVISIONS = 25
VALID_DIVISIONS = {*range(1, TOTAL_DIVISIONS + 1)}

# Scratch directory for files staged before upload or fetched from the Hub.
TEMP_DIR = tempfile.mkdtemp()
# ==================================================
# ADMIN AUTH
# ==================================================
# Shared secret for the admin panel; admin features are off when it is unset
# or blank.
ADMIN_KEY = os.environ.get("ADMIN_KEY")
ADMIN_COOKIE = "admin_session"
# Session ids issued by /admin/login. Kept only in memory, so a restart logs
# every admin out.
ADMIN_SESSIONS = set()
def admin_enabled() -> bool:
    """Return True when ADMIN_KEY holds something other than whitespace."""
    if not ADMIN_KEY:
        return False
    return str(ADMIN_KEY).strip() != ""
def is_admin(req: Request) -> bool:
    """Return True if the request carries a session id issued by /admin/login."""
    session_id = req.cookies.get(ADMIN_COOKIE)
    return session_id in ADMIN_SESSIONS
def secure_equals(a: str, b: str) -> bool:
    """Compare two strings with secrets.compare_digest (constant-time).

    Both sides are UTF-8 encoded first. Any failure, for example a
    non-string argument, yields False instead of raising.
    """
    try:
        left, right = a.encode("utf-8"), b.encode("utf-8")
        return secrets.compare_digest(left, right)
    except Exception:
        return False
# ==================================================
# DEFAULT MAPS
# ==================================================
# Built-in album registry as (album token, publisher, category) rows. It is
# the fallback config when admin_config.json cannot be pulled.
_DEFAULT_ALBUMS = (
    ("B2c5n8hH4uWRoAW", "Alex Rose", "Fun"),
    ("B2c5yeZFhHXzdFg", "Central Space Program", "Rockets"),
    ("B2cGI9HKKtaAF3T", "Sam Holden", "Sam Content Library"),
    ("B2c59UlCquMMGkJ", "Alex Rose", "Serious"),
    ("B2cJ0DiRHusi12z", "Alex Rose", "Music"),
    ("B2c5ON9t3uz8kT7", "Cole Vandepoll", "Cole Content Creator"),
)
DEFAULT_ALBUM_PUBLISHERS = {token: publisher for token, publisher, _ in _DEFAULT_ALBUMS}
DEFAULT_ALBUM_CATEGORIES = {token: category for token, _, category in _DEFAULT_ALBUMS}
# Live, mutable copies; load_state_from_hf replaces them with the persisted config.
ALBUM_PUBLISHERS = dict(DEFAULT_ALBUM_PUBLISHERS)
ALBUM_CATEGORIES = dict(DEFAULT_ALBUM_CATEGORIES)
# ==================================================
# LOCKS & STATE
# ==================================================
# In-memory copies of index.json and admin_config.json. Reads are served from
# here to avoid calling the Hugging Face API, and changes are pushed back.
INDEX_CACHE = {"videos": []}
CONFIG_CACHE = {}
# Guards the caches above. asyncio.Lock is not re-entrant: never await a helper
# that acquires it while already holding it.
DATA_LOCK = asyncio.Lock()
# ==================================================
# HTTP CLIENT
# ==================================================
async def get_client() -> httpx.AsyncClient:
    """Return the shared httpx.AsyncClient, creating it on first use.

    The client (30 s timeout) is cached on ``app.state.client`` and reused for
    every iCloud request. NOTE(review): no shutdown hook that closes it is
    visible here.
    """
    try:
        return app.state.client
    except AttributeError:
        app.state.client = httpx.AsyncClient(timeout=30.0)
        return app.state.client
# ==================================================
# HUGGING FACE STORAGE HELPERS
# ==================================================
# Shared Hub client used for uploads, authenticated with HF_TOKEN (may be None).
api = HfApi(token=HF_TOKEN)
def get_hf_url(path_in_repo: str) -> str:
    """Build the public ``resolve/main`` download URL for a file in the dataset.

    ``path_in_repo`` is inserted verbatim, without URL quoting.
    """
    repo_root = f"https://huggingface.co/datasets/{HF_REPO_ID}"
    return f"{repo_root}/resolve/main/{path_in_repo}"
async def sync_pull_json(filename: str, default: dict) -> dict:
    """Fetch and parse a JSON file from the dataset repo.

    The blocking hf_hub_download (into TEMP_DIR) runs in the default executor.
    Any failure (missing file, network/auth error, invalid JSON) is logged as
    a warning, and ``default`` itself is returned.

    NOTE(review): a transient error is indistinguishable from "file does not
    exist". If index.json fails to pull at startup, the app starts from an
    empty index, and the next save_index() overwrites the remote copy.
    """
    def _download() -> str:
        return hf_hub_download(
            repo_id=HF_REPO_ID,
            filename=filename,
            repo_type=HF_REPO_TYPE,
            token=HF_TOKEN,
            local_dir=TEMP_DIR,
        )

    try:
        local_path = await asyncio.get_event_loop().run_in_executor(None, _download)
        with open(local_path, "r", encoding="utf-8") as fh:
            parsed = json.load(fh)
    except Exception as e:
        logging.warning(f"Could not pull {filename} from HF (using default): {e}")
        return default
    return parsed
async def sync_push_json(filename: str, data: dict):
    """Write ``data`` to TEMP_DIR/<filename> and upload it to the dataset repo.

    Best effort: without HF_TOKEN it logs an error and returns. Any write or
    upload failure is logged and swallowed, so callers never learn whether
    the save succeeded.
    """
    if not HF_TOKEN:
        logging.error("No HF_TOKEN set, cannot save data.")
        return
    try:
        staged_path = os.path.join(TEMP_DIR, filename)
        with open(staged_path, "w", encoding="utf-8") as fh:
            json.dump(data, fh, indent=2)

        def _upload():
            return api.upload_file(
                path_or_fileobj=staged_path,
                path_in_repo=filename,
                repo_id=HF_REPO_ID,
                repo_type=HF_REPO_TYPE,
                commit_message=f"Update {filename}",
            )

        await asyncio.get_event_loop().run_in_executor(None, _upload)
    except Exception as e:
        logging.error(f"Failed to push {filename} to HF: {e}")
async def sync_push_file(local_path: str, remote_path: str):
    """Upload the media file at ``local_path`` to ``remote_path`` in the dataset repo.

    Best effort, like sync_push_json: without HF_TOKEN nothing is uploaded and
    an error is logged. Upload failures are logged, not raised.
    """
    if not HF_TOKEN:
        # Don't skip silently: poll_album still records the HF URL for this
        # path, so an unexplained skip would only surface as a broken link.
        logging.error(f"No HF_TOKEN set, cannot upload media {remote_path}.")
        return
    try:
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(
            None,
            lambda: api.upload_file(
                path_or_fileobj=local_path,
                path_in_repo=remote_path,
                repo_id=HF_REPO_ID,
                repo_type=HF_REPO_TYPE,
                commit_message=f"Add media {remote_path}",
            ),
        )
    except Exception as e:
        logging.error(f"Failed to push media {remote_path}: {e}")
# ==================================================
# STATE MANAGEMENT
# ==================================================
async def load_state_from_hf():
    """Load index.json and admin_config.json from the dataset into memory.

    Replaces INDEX_CACHE, CONFIG_CACHE, ALBUM_PUBLISHERS and ALBUM_CATEGORIES
    while holding DATA_LOCK. When a pull fails, sync_pull_json returns the
    default, so the app starts from an empty index and the built-in album
    maps. The content is normalised so the rest of the module can rely on:

    * INDEX_CACHE being a dict with a ``"videos"`` list;
    * CONFIG_CACHE being a dict with ``"album_publishers"`` and
      ``"album_categories"`` dicts (admin_album_add indexes them directly).

    ALBUM_PUBLISHERS / ALBUM_CATEGORIES are copies of the config maps.
    """
    global INDEX_CACHE, CONFIG_CACHE, ALBUM_PUBLISHERS, ALBUM_CATEGORIES
    async with DATA_LOCK:
        index = await sync_pull_json("index.json", {"videos": []})
        if not isinstance(index, dict):
            logging.error("index.json is not a JSON object; starting with an empty index")
            index = {"videos": []}
        if not isinstance(index.get("videos"), list):
            if "videos" in index:
                logging.error("index.json 'videos' is not a list; resetting it")
            index["videos"] = []
        INDEX_CACHE = index

        default_config = {
            "album_publishers": dict(DEFAULT_ALBUM_PUBLISHERS),
            "album_categories": dict(DEFAULT_ALBUM_CATEGORIES),
        }
        config = await sync_pull_json("admin_config.json", default_config)
        if not isinstance(config, dict):
            logging.error("admin_config.json is not a JSON object; using defaults")
            config = default_config
        for key in ("album_publishers", "album_categories"):
            if not isinstance(config.get(key), dict):
                # An absent/invalid map means "no albums". Store an empty dict
                # so code that writes config[key][token] never hits KeyError.
                config[key] = {}
        CONFIG_CACHE = config
        ALBUM_PUBLISHERS = dict(CONFIG_CACHE["album_publishers"])
        ALBUM_CATEGORIES = dict(CONFIG_CACHE["album_categories"])
async def save_index():
    """Push the current INDEX_CACHE to index.json in the dataset repo.

    Does not acquire DATA_LOCK. Callers are expected to hold it already;
    asyncio.Lock is not re-entrant, so acquiring it here would deadlock them.
    """
    await sync_push_json(data=INDEX_CACHE, filename="index.json")
async def save_config():
    """Push CONFIG_CACHE to admin_config.json while holding DATA_LOCK.

    Acquires DATA_LOCK itself. asyncio.Lock is not re-entrant, so never await
    this from code that already holds the lock.
    """
    lock = DATA_LOCK
    async with lock:
        await sync_push_json(data=CONFIG_CACHE, filename="admin_config.json")
# ==================================================
# BACKFILL / UTILS
# ==================================================
async def backfill_index_categories():
    """Normalise metadata on every indexed video and persist it if anything changed.

    Each video is matched to its album through ``source_album``:

    * ``publisher`` is filled from ALBUM_PUBLISHERS when it is missing, blank
      or "Unknown" and the album has a known publisher.
    * ``category`` follows ALBUM_CATEGORIES when the album is configured. For
      an unconfigured album, a missing or blank category, or one equal to the
      publisher, becomes "Uncategorized"; any other existing value is kept
      instead of being wiped.
    * ``allowed_divs`` defaults to ``[]`` (visible to every division).

    Only actual value changes mark the index dirty, so an index that is
    already consistent is not re-uploaded.
    """
    async with DATA_LOCK:
        changed = False
        for v in INDEX_CACHE.get("videos", []):
            token = v.get("source_album", "")

            correct_publisher = ALBUM_PUBLISHERS.get(token, "Unknown")
            if (
                v.get("publisher") in (None, "", "Unknown")
                and correct_publisher != "Unknown"
                and v.get("publisher") != correct_publisher
            ):
                v["publisher"] = correct_publisher
                changed = True

            current = v.get("category")
            configured = ALBUM_CATEGORIES.get(token)
            if configured is not None:
                target = configured
            elif current in (None, "", v.get("publisher")):
                # Legacy entries stored the publisher name as the category.
                target = "Uncategorized"
            else:
                target = current
            if current != target:
                v["category"] = target
                changed = True

            # Ensure allowed_divs exists; empty means "all divisions".
            if "allowed_divs" not in v:
                v["allowed_divs"] = []
                changed = True
        if changed:
            await save_index()
            logging.info("Backfilled metadata and division fields")
# ==================================================
# BASE62 & ICLOUD HELPERS
# ==================================================
# Digit value of each base-62 character: 0-9 -> 0-9, A-Z -> 10-35, a-z -> 36-61.
_BASE_62_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
BASE_62_MAP = dict(zip(_BASE_62_ALPHABET, range(len(_BASE_62_ALPHABET))))
def base62_to_int(token: str) -> int:
    """Decode a base-62 string, most-significant digit first.

    An empty string decodes to 0. Any character missing from BASE_62_MAP
    raises KeyError.
    """
    width = len(token)
    return sum(BASE_62_MAP[ch] * 62 ** (width - 1 - pos) for pos, ch in enumerate(token))
# Headers and default body for the iCloud shared-streams web API. The body is
# passed as a raw string via ``data=`` and sent verbatim.
ICLOUD_HEADERS = {
    "Origin": "https://www.icloud.com",
    "Content-Type": "text/plain",
}
ICLOUD_PAYLOAD = '{"streamCtag":null}'
async def get_base_url(token: str) -> str:
    """Return the initial shared-streams base URL for an album token.

    The host number is base-62 encoded in the token: one character after a
    leading "A", otherwise the two characters at positions 1-2. The host may
    still redirect; get_redirected_base_url resolves that.
    """
    host_digits = token[1] if token and token[0] == "A" else token[1:3]
    host_number = base62_to_int(host_digits)
    return f"https://p{host_number:02d}-sharedstreams.icloud.com/{token}/sharedstreams/"
async def get_redirected_base_url(base_url: str, token: str) -> str:
    """Probe ``base_url`` and return the shared-streams base URL to actually use.

    The webstream endpoint is POSTed without following redirects:

    * status 330: the JSON body's ``X-Apple-MMe-Host`` names the host to use,
      and a new base URL on that host is returned;
    * any 2xx: ``base_url`` is already correct and is returned unchanged.

    Never returns None. Raises ValueError for a 330 reply without a host,
    httpx.HTTPStatusError (via raise_for_status) for error statuses, and
    RuntimeError for any other status raise_for_status lets through.
    """
    client = await get_client()
    r = await client.post(
        f"{base_url}webstream",
        headers=ICLOUD_HEADERS,
        data=ICLOUD_PAYLOAD,
        follow_redirects=False,
    )
    if r.status_code == 330:
        host = r.json().get("X-Apple-MMe-Host")
        if not host:
            # Otherwise we would build "https://None/..." and fail later, obscurely.
            raise ValueError(f"iCloud redirect for {token} is missing X-Apple-MMe-Host")
        return f"https://{host}/{token}/sharedstreams/"
    if 200 <= r.status_code < 300:
        return base_url
    r.raise_for_status()
    # Callers build URLs from the result, so an implicit None is never acceptable.
    raise RuntimeError(f"Unexpected status {r.status_code} resolving album {token}")
async def post_json(path: str, base_url: str, payload: str) -> dict:
    """POST ``payload`` to ``base_url + path`` with the iCloud headers and decode the JSON reply.

    Error statuses raise through ``raise_for_status()``.
    """
    client = await get_client()
    endpoint = f"{base_url}{path}"
    response = await client.post(endpoint, headers=ICLOUD_HEADERS, data=payload)
    response.raise_for_status()
    return response.json()
async def get_metadata(base_url: str) -> list:
    """Return the album's media records (the ``photos`` list of the webstream reply, or [])."""
    stream = await post_json("webstream", base_url, ICLOUD_PAYLOAD)
    return stream.get("photos", [])
async def get_asset_urls(base_url: str, guids: list) -> dict:
    """Resolve download locations for the given photo GUIDs.

    Returns the ``items`` mapping of the webasseturls reply (or {}).
    poll_album looks entries up in it by derivative checksum.
    """
    request_body = json.dumps({"photoGuids": guids})
    reply = await post_json("webasseturls", base_url, request_body)
    return reply.get("items", {})
async def download_to_temp(url: str, filename: str) -> str:
    """Stream ``url`` into TEMP_DIR/<filename> and return the local path.

    Error statuses raise through ``raise_for_status()``. On any failure,
    including cancellation, the partially written file is deleted before the
    exception propagates, so TEMP_DIR does not fill with truncated media.
    """
    path = os.path.join(TEMP_DIR, filename)
    client = await get_client()
    try:
        async with client.stream("GET", url) as r:
            r.raise_for_status()
            with open(path, "wb") as f:
                async for chunk in r.aiter_bytes():
                    f.write(chunk)
    except BaseException:
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        raise
    return path
# ==================================================
# POLL + INGEST ALBUM
# ==================================================
async def _download_and_push(url: str, filename: str, remote_path: str) -> None:
    """Download ``url`` to TEMP_DIR/<filename>, upload it to ``remote_path``, then delete the local copy."""
    local_path = await download_to_temp(url, filename)
    try:
        await sync_push_file(local_path, remote_path)
    finally:
        try:
            os.remove(local_path)
        except FileNotFoundError:
            pass


async def _ingest_video(token: str, photo: dict, assets: dict, publisher: str, category: str) -> Optional[dict]:
    """Mirror one iCloud video (and its poster frame, if any) to HF.

    Returns the new index entry, or None when no non-poster derivative with a
    resolvable checksum exists. Raises if the video itself cannot be fetched.
    A failed thumbnail is logged and the entry gets an empty ``thumbnail``.
    """
    vid = photo["photoGuid"]
    derivatives = photo.get("derivatives") or {}
    # Pick the largest non-poster derivative by its declared fileSize.
    best = max(
        (d for k, d in derivatives.items() if k.lower() != "posterframe"),
        key=lambda d: int(d.get("fileSize") or 0),
        default=None,
    )
    if not best or best.get("checksum") not in assets:
        return None
    asset = assets[best["checksum"]]
    video_url = f"https://{asset['url_location']}{asset['url_path']}"
    hf_vid_path = f"videos/{token}/{vid}.mp4"
    await _download_and_push(video_url, f"{vid}.mp4", hf_vid_path)

    hf_thumb_path = ""
    pf = derivatives.get("PosterFrame")
    if pf and pf.get("checksum") in assets:
        pf_asset = assets[pf["checksum"]]
        poster_url = f"https://{pf_asset['url_location']}{pf_asset['url_path']}"
        thumb_path = f"videos/{token}/{vid}.jpg"
        try:
            await _download_and_push(poster_url, f"{vid}.jpg", thumb_path)
            hf_thumb_path = thumb_path
        except Exception as e:
            # The video is already uploaded; don't throw it away over a thumbnail.
            logging.warning(f"Thumbnail for {vid} in {token} failed: {e}")

    return {
        "id": vid,
        "name": photo.get("caption") or "Untitled",
        "video_url": get_hf_url(hf_vid_path),
        "thumbnail": get_hf_url(hf_thumb_path) if hf_thumb_path else "",
        "upload_date": photo.get("creationDate") or datetime.now(timezone.utc).isoformat(),
        "category": category,
        "publisher": publisher,
        "source_album": token,
        "allowed_divs": [],  # Default: visible to all divisions
    }


async def poll_album(token: str):
    """Ingest videos from iCloud shared album ``token`` that are not yet indexed.

    Resolves the album's (possibly redirected) host, lists its media, mirrors
    each new video to HF, then appends the new entries to INDEX_CACHE and
    pushes index.json.

    DATA_LOCK is held only for the snapshot and merge steps, never across
    downloads or uploads, so the feed and admin endpoints are not blocked by
    large transfers. A failure on one video is logged and the rest continue.
    Errors are logged rather than raised, which keeps the polling loop alive.
    """
    try:
        base_url = await get_base_url(token)
        base_url = await get_redirected_base_url(base_url, token)
        metadata = await get_metadata(base_url)
        guids = [p["photoGuid"] for p in metadata]
        assets = await get_asset_urls(base_url, guids)

        async with DATA_LOCK:
            known = {v.get("id") for v in INDEX_CACHE.get("videos", [])}
            publisher = ALBUM_PUBLISHERS.get(token, "Unknown")
            category = ALBUM_CATEGORIES.get(token, "Uncategorized")

        new_entries = []
        for photo in metadata:
            if (photo.get("mediaAssetType") or "").lower() != "video":
                continue
            vid = photo["photoGuid"]
            if vid in known:
                continue
            known.add(vid)  # guard against duplicate GUIDs within one listing
            try:
                entry = await _ingest_video(token, photo, assets, publisher, category)
            except Exception as e:
                logging.error(f"Error ingesting video {vid} from {token}: {e}")
                continue
            if entry is not None:
                new_entries.append(entry)

        if new_entries:
            async with DATA_LOCK:
                videos = INDEX_CACHE.setdefault("videos", [])
                # Re-check under the lock: the index may have changed while
                # the lock was released for the transfers.
                present = {v.get("id") for v in videos}
                added = [e for e in new_entries if e["id"] not in present]
                if added:
                    videos.extend(added)
                    await save_index()
            if added:
                logging.info(f"Added {len(added)} videos from {token}")
    except Exception as e:
        logging.error(f"Error polling {token}: {e}")
# ==================================================
# STARTUP
# ==================================================
@app.on_event("startup")
async def start_polling():
    """Load persisted state, normalise it, then start the background album poller.

    Each cycle the poller polls every token in ALBUM_PUBLISHERS, then sleeps
    60 s. The dict is re-read every cycle, so albums added through the admin
    panel are picked up.
    """
    if not HF_TOKEN:
        logging.error("CRITICAL: 'hf1' environment variable not set. Dataset storage will fail.")
    await load_state_from_hf()
    await backfill_index_categories()

    async def _poll_forever():
        while True:
            for token in list(ALBUM_PUBLISHERS.keys()):
                try:
                    await poll_album(token)
                except Exception as e:
                    # poll_album logs its own errors; this is a last line of
                    # defence so one unexpected failure cannot stop polling.
                    logging.error(f"Unexpected error polling {token}: {e}")
            await asyncio.sleep(60)

    # asyncio keeps only weak references to tasks; hold a strong one so the
    # poller cannot be garbage-collected mid-run.
    app.state.poll_task = asyncio.create_task(_poll_forever())
# ==================================================
# FEED (With Div Support)
# ==================================================
@app.get("/feed/videos")
async def get_video_feed(div: Optional[int] = Query(None, description="School Division (1-25)")):
    """
    Return the video feed, optionally filtered for one school division.

    - No ``div``: every indexed video, including division-restricted ones.
    - ``div`` in VALID_DIVISIONS: videos with an empty or missing
      ``allowed_divs`` (global) plus those whose list contains ``div``.
    - Out-of-range ``div``: only the global videos.
    """
    async with DATA_LOCK:
        catalog = INDEX_CACHE.get("videos", [])
        if div is None:
            return {"videos": catalog}
        div_is_valid = div in VALID_DIVISIONS

        def visible(video) -> bool:
            allowed = video.get("allowed_divs", [])
            if not allowed:
                return True
            return div_is_valid and div in allowed

        return {"videos": [video for video in catalog if visible(video)]}
# ==================================================
# ADMIN: LOGIN
# ==================================================
# Minimal login form; it POSTs the "key" field back to /admin/login.
_ADMIN_LOGIN_FORM = """
<html><body style="background:#111;color:#fff;display:flex;justify-content:center;align-items:center;height:100vh;font-family:sans-serif;">
<form method="post" style="padding:20px;border:1px solid #333;border-radius:10px;background:#1a1a1a;">
<h2>Admin</h2><input type="password" name="key" placeholder="Key" style="padding:10px;width:100%;margin-bottom:10px;">
<button style="padding:10px;width:100%;cursor:pointer;">Login</button>
</form></body></html>
"""
@app.get("/admin/login", response_class=HTMLResponse)
async def admin_login_page():
    """Serve the admin login form, or a 503 page when ADMIN_KEY is not configured."""
    if admin_enabled():
        return _ADMIN_LOGIN_FORM
    return HTMLResponse("Admin disabled (ADMIN_KEY missing)", status_code=503)
@app.post("/admin/login")
async def admin_login(key: str = Form(...)):
    """Check the submitted key against ADMIN_KEY and open an admin session.

    Surrounding whitespace is ignored on both sides, and the comparison goes
    through secure_equals. On success, a random session id (token_hex(16)) is
    remembered in ADMIN_SESSIONS, set as an HttpOnly cookie, and the client is
    redirected (302) to /admin. Otherwise the response is 401.
    """
    if admin_enabled() and secure_equals(key.strip(), str(ADMIN_KEY).strip()):
        session_id = secrets.token_hex(16)
        ADMIN_SESSIONS.add(session_id)
        response = RedirectResponse("/admin", status_code=302)
        response.set_cookie(ADMIN_COOKIE, session_id, httponly=True)
        return response
    return HTMLResponse("Unauthorized", status_code=401)
@app.get("/admin/logout")
async def admin_logout(req: Request):
    """End the admin session: forget its id server-side, clear the cookie and go to the login page."""
    # discard() is a no-op for unknown or missing (None) session ids.
    ADMIN_SESSIONS.discard(req.cookies.get(ADMIN_COOKIE))
    resp = RedirectResponse("/admin/login")
    # Without this, the browser keeps sending a now-invalid session id.
    resp.delete_cookie(ADMIN_COOKIE)
    return resp
# ==================================================
# ADMIN DASHBOARD
# ==================================================
def esc(v) -> str:
    """HTML-escape ``str(v)`` (None becomes ""), including both quote characters.

    The result is safe for element text and for quoted attribute values.
    """
    text = "" if v is None else str(v)
    return html_escape_lib.escape(text, quote=True)
# HTML shell of the admin panel. admin_dash replaces the __ALBUM_ROWS__ and
# __VIDEO_ROWS__ markers with table rows. Inputs that carry data-id POST their
# new value to /admin/update on change. api() reports a failed request (fetch
# error, non-2xx status, or an {"error": ...} body) in an alert before
# reloading, so a rejected change (e.g. an expired session) does not look saved.
ADMIN_TEMPLATE = """
<html>
<head>
<title>Admin</title>
<style>
body{font-family:sans-serif;background:#111;color:#ddd;padding:20px;}
table{width:100%;border-collapse:collapse;margin-top:20px;}
th,td{border-bottom:1px solid #333;padding:8px;text-align:left;}
input{background:#222;border:1px solid #444;color:#fff;padding:5px;border-radius:4px;width:100%;}
button{background:#444;color:#fff;border:none;padding:5px 10px;cursor:pointer;border-radius:4px;}
.pill{padding:4px 8px;background:#004400;border-radius:10px;font-size:0.8em;}
</style>
</head>
<body>
<h1>Admin Panel <a href="/admin/logout" style="font-size:0.5em;color:#888;">Logout</a></h1>
<h3>Albums</h3>
<table>
<tr><th>Token</th><th>Publisher</th><th>Category</th><th>Action</th></tr>
__ALBUM_ROWS__
<tr>
<td><input id="n_t" placeholder="Token"></td>
<td><input id="n_p" placeholder="Publisher"></td>
<td><input id="n_c" placeholder="Category"></td>
<td><button onclick="addAlbum()">Add</button></td>
</tr>
</table>
<h3>Videos</h3>
<p style="font-size:0.8em;color:#888;">Allowed Divs: Comma separated (e.g. <code>1,5,25</code>). Leave empty for ALL.</p>
<table>
<tr><th>ID</th><th>Name</th><th>Divs (1-25)</th><th>Category</th><th>Publisher</th><th>Action</th></tr>
__VIDEO_ROWS__
</table>
<script>
async function api(ep, data) {
  let problem = null;
  try {
    const r = await fetch(ep, {method:'POST', headers:{'Content-Type':'application/json'}, body:JSON.stringify(data)});
    const body = await r.json().catch(() => null);
    if (!r.ok || (body && body.error)) {
      problem = (body && body.error) || ('HTTP ' + r.status);
    }
  } catch (err) {
    problem = String(err);
  }
  if (problem) alert('Request failed: ' + problem);
  location.reload();
}
document.querySelectorAll('input[data-id]').forEach(i => {
i.addEventListener('change', (e) => api('/admin/update', {
id: e.target.dataset.id,
field: e.target.dataset.field,
value: e.target.value
}));
});
function addAlbum(){
api('/admin/albums/add', {
token: document.getElementById('n_t').value,
publisher: document.getElementById('n_p').value,
category: document.getElementById('n_c').value
});
}
</script>
</body>
</html>
"""
def _admin_video_row(v: dict) -> str:
    """Build one editable <tr> for the Videos table; every interpolated value is HTML-escaped."""
    vid = v["id"]
    attr_id = esc(vid)
    divs = ",".join(map(str, v.get("allowed_divs") or []))
    # json.dumps makes a valid JS string literal (quotes escaped); esc then
    # makes it safe inside the double-quoted onclick attribute.
    delete_js = esc("api('/admin/videos/delete', {id:" + json.dumps(str(vid)) + "})")
    return (
        "<tr>\n"
        # Slice before escaping so an HTML entity is never cut in half.
        f"<td>{esc(str(vid)[:8])}...</td>\n"
        f'<td><input data-id="{attr_id}" data-field="name" value="{esc(v.get("name"))}"></td>\n'
        f'<td><input data-id="{attr_id}" data-field="allowed_divs" value="{esc(divs)}" placeholder="All"></td>\n'
        f'<td><input data-id="{attr_id}" data-field="category" value="{esc(v.get("category"))}"></td>\n'
        f'<td><input data-id="{attr_id}" data-field="publisher" value="{esc(v.get("publisher"))}"></td>\n'
        f'<td><button onclick="{delete_js}" style="background:#500;">Del</button></td>\n'
        "</tr>"
    )


def _admin_album_row(token, publisher, category) -> str:
    """Build one read-only <tr> for the Albums table; all values HTML-escaped."""
    return f"<tr><td>{esc(token)}</td><td>{esc(publisher)}</td><td>{esc(category)}</td><td>-</td></tr>"


@app.get("/admin", response_class=HTMLResponse)
async def admin_dash(req: Request):
    """Render the admin panel, or redirect to /admin/login without a valid session.

    Album tokens/names and video ids are escaped just like the other fields,
    so stored values cannot inject markup or script into the page.
    """
    import re

    if not is_admin(req):
        return RedirectResponse("/admin/login")
    async with DATA_LOCK:
        video_rows = "".join(_admin_video_row(v) for v in INDEX_CACHE.get("videos", []))
        album_rows = "".join(
            _admin_album_row(t, p, ALBUM_CATEGORIES.get(t, ""))
            for t, p in ALBUM_PUBLISHERS.items()
        )
    # Fill both markers in a single pass over the template, so inserted rows are
    # never rescanned (a name containing a marker string stays literal).
    fill = {"__ALBUM_ROWS__": album_rows, "__VIDEO_ROWS__": video_rows}
    parts = re.split("(__ALBUM_ROWS__|__VIDEO_ROWS__)", ADMIN_TEMPLATE)
    return "".join(fill.get(part, part) for part in parts)
# ==================================================
# ADMIN ACTIONS
# ==================================================
def _parse_allowed_divs(value) -> Optional[List[int]]:
    """Parse an admin-supplied division list.

    Accepts a comma-separated string ("1, 5,25") or a list/tuple of numbers.
    Tokens that are not plain digits, or that fall outside VALID_DIVISIONS,
    are dropped. Duplicates are removed, keeping first-seen order. A blank
    string yields [] (visible to all divisions). Returns None when ``value``
    has an unsupported type.
    """
    if isinstance(value, str):
        tokens = value.split(",")
    elif isinstance(value, (list, tuple)):
        tokens = value
    else:
        return None
    divs: List[int] = []
    for token in tokens:
        text = str(token).strip()
        if not text.isdigit():
            continue
        try:
            n = int(text)
        except ValueError:
            # str.isdigit() accepts characters such as "²" that int() rejects.
            continue
        if n in VALID_DIVISIONS and n not in divs:
            divs.append(n)
    return divs


@app.post("/admin/update")
async def admin_update(req: Request, payload: dict):
    """Set one field on one indexed video and persist index.json.

    Body: ``{"id": <video id>, "field": <name>, "value": <new value>}``.
    ``allowed_divs`` is parsed with _parse_allowed_divs. Any other field is
    stored as given, except ``id``, which is the lookup key and cannot be
    changed.

    Returns ``{"ok": True}`` on success, ``{"error": ...}`` for invalid input
    or an unknown video, and a 403 without an admin session.
    """
    if not is_admin(req):
        return JSONResponse({}, 403)
    vid_id = payload.get("id")
    field = payload.get("field")
    value = payload.get("value")
    if not isinstance(field, str) or not field or field == "id":
        return {"error": "invalid field"}
    if field == "allowed_divs":
        parsed = _parse_allowed_divs(value)
        if parsed is None:
            # Reject instead of silently keeping the old value and saving.
            return {"error": "invalid allowed_divs"}
        value = parsed
    async with DATA_LOCK:
        for v in INDEX_CACHE["videos"]:
            if v["id"] == vid_id:
                v[field] = value
                await save_index()
                return {"ok": True}
    return {"error": "not found"}
@app.post("/admin/videos/delete")
async def admin_delete(req: Request, payload: dict):
    """Hide a video by removing its entry from the index; its media files stay on HF.

    Only index.json is rewritten, and only if an entry was actually removed.
    Deleting the uploaded files would need extra repo commits. An admin
    always gets ``{"ok": True}``, whether or not the id matched.

    NOTE(review): poll_album skips only ids present in the index, so a video
    still in its iCloud album will be ingested again on a later poll.
    """
    if not is_admin(req):
        return JSONResponse({}, 403)
    target_id = payload.get("id")
    async with DATA_LOCK:
        before = INDEX_CACHE["videos"]
        remaining = [entry for entry in before if entry["id"] != target_id]
        INDEX_CACHE["videos"] = remaining
        if len(remaining) != len(before):
            await save_index()
    return {"ok": True}
@app.post("/admin/albums/add")
async def admin_album_add(req: Request, payload: dict):
    """Register (or update) an iCloud shared album and persist admin_config.json.

    Body: ``{"token": ..., "publisher": ..., "category": ...}``. A blank
    publisher or category falls back to "Unknown" / "Uncategorized". The
    polling loop re-reads ALBUM_PUBLISHERS every cycle, so the album is polled
    from the next cycle on.

    Returns ``{"ok": True}``, ``{"error": ...}`` for a missing or malformed
    token, or a 403 without an admin session.
    """
    global ALBUM_PUBLISHERS, ALBUM_CATEGORIES
    if not is_admin(req):
        return JSONResponse({}, 403)
    t = str(payload.get("token") or "").strip()
    if not t:
        return {"error": "missing token"}
    # Tokens are embedded in iCloud URLs and partly base-62 decoded by
    # get_base_url, so accept only ASCII letters and digits.
    if not (t.isascii() and t.isalnum()):
        return {"error": "invalid token"}
    publisher = str(payload.get("publisher") or "").strip() or "Unknown"
    category = str(payload.get("category") or "").strip() or "Uncategorized"
    async with DATA_LOCK:
        # The pulled config may lack these maps; create them rather than raise KeyError.
        publishers = CONFIG_CACHE.setdefault("album_publishers", {})
        categories = CONFIG_CACHE.setdefault("album_categories", {})
        publishers[t] = publisher
        categories[t] = category
        ALBUM_PUBLISHERS = publishers
        ALBUM_CATEGORIES = categories
    # save_config() acquires DATA_LOCK itself and asyncio.Lock is not
    # re-entrant, so it must be awaited only after the block above has exited.
    await save_config()
    return {"ok": True}