"""FastAPI backend that lists CSV and video files in Hugging Face folders.

The service resolves Hugging Face "tree" URLs (spaces or datasets) into
lists of downloadable CSV/video files via the Hub tree API, and offers a
small proxy endpoint restricted to ``huggingface.co`` ``/resolve/`` URLs.
"""

import json
import os
import re
from typing import Dict, List
from urllib.error import HTTPError, URLError
from urllib.parse import quote, unquote, urlparse
from urllib.request import Request, urlopen

from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import HTMLResponse, Response

app = FastAPI()

# Folders scanned when the HF_GROUP_LINKS environment variable is unset.
DEFAULT_GROUP_LINKS = [
    "https://huggingface.co/spaces/CyberAl/Traffic-Tracker/tree/main/data",
    "https://huggingface.co/spaces/niangmariame513/traffic-monitor/tree/main/data",
    "https://huggingface.co/spaces/danielle2035/TRAFFIC_ROAD_APP/tree/main/data",
    "https://huggingface.co/spaces/Rafiatou/trafficvision-group10/tree/main/data",
    "https://huggingface.co/spaces/Binta26/computer_vision/tree/main/data",
    "https://huggingface.co/datasets/ccspoet/ProjectCV/tree/main/Data",
    "https://huggingface.co/spaces/AhmedSouley01/traffic-monitoring/tree/main/data",
    "https://huggingface.co/datasets/conde621gmail/dataset/tree/main",
    "https://huggingface.co/datasets/ioget/aims-traffic-cv-data/tree/main",
    "https://huggingface.co/datasets/luclintos/traffic_object_detection/tree/main",
]

# File suffixes (compared case-insensitively) classified as "video".
VIDEO_EXTENSIONS = (".mp4", ".webm", ".mov", ".avi", ".mkv", ".m4v")

# Headers sent with every outbound request to Hugging Face.
_REQUEST_HEADERS = {"User-Agent": "TrafficSense-dashboard"}


def configured_group_links() -> List[str]:
    """Return the list of Hugging Face tree URLs to scan.

    Reads the ``HF_GROUP_LINKS`` environment variable. A value starting with
    ``[`` is parsed as a JSON array; any other non-empty value is split on
    commas and newlines. When the variable is unset or blank, a copy of
    ``DEFAULT_GROUP_LINKS`` is returned.

    Raises:
        HTTPException: 500 when the value looks like JSON but fails to parse.
    """
    raw = os.getenv("HF_GROUP_LINKS", "").strip()
    if not raw:
        # Hand out a copy so callers cannot mutate the module-level default.
        return list(DEFAULT_GROUP_LINKS)

    if raw.startswith("["):
        try:
            links = json.loads(raw)
        except json.JSONDecodeError as exc:
            raise HTTPException(
                status_code=500,
                detail=f"Invalid HF_GROUP_LINKS JSON: {exc}",
            ) from exc
        cleaned = (str(link).strip() for link in links)
        return [link for link in cleaned if link]

    return [link.strip() for link in re.split(r"[\n,]+", raw) if link.strip()]


def parse_hf_tree_url(url: str) -> Dict[str, str]:
    """Split a Hugging Face tree URL into its components.

    The expected path shape is
    ``/<spaces|datasets>/<owner>/<repo>/tree/<revision>[/<folder...>]``.

    Returns:
        A dict with the keys ``repo_kind``, ``owner``, ``repo``, ``revision``,
        ``folder_path`` (empty string for the repository root) and
        ``repo_id`` (``"<owner>/<repo>"``). Path segments are percent-decoded.

    Raises:
        HTTPException: 400 when the host is not ``huggingface.co``, the path
            does not have the tree shape, or the repository kind is neither
            ``spaces`` nor ``datasets``.
    """
    parsed = urlparse(url.strip())
    if parsed.netloc != "huggingface.co":
        raise HTTPException(status_code=400, detail="Only huggingface.co links are supported.")

    parts = [unquote(part) for part in parsed.path.strip("/").split("/") if part]
    if len(parts) < 5 or parts[3] != "tree":
        raise HTTPException(
            status_code=400,
            detail=(
                "Expected a Hugging Face tree URL like "
                "https://huggingface.co/spaces/user/repo/tree/main/data"
            ),
        )

    repo_kind = parts[0]
    if repo_kind not in {"spaces", "datasets"}:
        raise HTTPException(
            status_code=400,
            detail="Only Hugging Face spaces and datasets are supported.",
        )

    owner, repo, revision = parts[1], parts[2], parts[4]
    return {
        "repo_kind": repo_kind,
        "owner": owner,
        "repo": repo,
        "revision": revision,
        "folder_path": "/".join(parts[5:]),
        "repo_id": f"{owner}/{repo}",
    }


def hf_api_prefix(repo_kind: str) -> str:
    """Return the path segment used after ``/api/`` for this repository kind."""
    return "spaces" if repo_kind == "spaces" else "datasets"


def hf_raw_prefix(repo_kind: str) -> str:
    """Return the leading path segment of ``/resolve/`` URLs for this kind."""
    return "spaces" if repo_kind == "spaces" else "datasets"


def fetch_json(url: str):
    """GET ``url`` (20 s timeout) and return the UTF-8 body decoded as JSON.

    ``urllib`` errors such as ``HTTPError`` propagate to the caller.
    """
    req = Request(url, headers=_REQUEST_HEADERS)
    with urlopen(req, timeout=20) as response:
        return json.loads(response.read().decode("utf-8"))


def discover_source(tree_url: str) -> Dict:
    """List the CSV and video files found under a Hugging Face tree URL.

    Returns:
        A dict with the keys ``link`` (the input URL), ``repo``, ``folder``,
        ``warning`` (``None`` unless the listing was missing or malformed),
        ``csv_files`` and ``video_files``. Each file entry has the keys
        ``name``, ``path``, ``size``, ``kind`` and ``url``.

    Raises:
        HTTPException: 400 when ``tree_url`` is not a supported tree URL.
        HTTPError: for any upstream HTTP failure other than 404, which is
            reported through ``warning`` with empty file lists instead.
    """
    info = parse_hf_tree_url(tree_url)
    repo_kind = info["repo_kind"]
    folder_path = info["folder_path"]
    repo_id = quote(info["repo_id"], safe="/")
    revision = quote(info["revision"], safe="")

    # rstrip drops the trailing slash left behind when folder_path is empty.
    api_url = (
        f"https://huggingface.co/api/{hf_api_prefix(repo_kind)}/"
        f"{repo_id}/tree/{revision}/{quote(folder_path, safe='/')}"
    ).rstrip("/") + "?recursive=1"

    warning = None
    try:
        # NOTE(review): only a single request is made; if the API paginates
        # very large folders, later pages are not fetched - confirm.
        items = fetch_json(api_url)
    except HTTPError as exc:
        if exc.code != 404:
            raise
        warning = f"Folder '{folder_path}' was not found."
        items = []

    if not isinstance(items, list):
        # A listing is expected to be a JSON array; anything else is unusable.
        warning = "Unexpected response from the Hugging Face API."
        items = []

    raw_base = (
        f"https://huggingface.co/{hf_raw_prefix(repo_kind)}/{repo_id}"
        f"/resolve/{revision}/"
    )

    files = []
    for item in items:
        if not isinstance(item, dict) or item.get("type") != "file":
            continue
        file_path = item.get("path", "")
        lower_path = file_path.lower()
        if lower_path.endswith(".csv"):
            kind = "csv"
        elif lower_path.endswith(VIDEO_EXTENSIONS):
            kind = "video"
        else:
            kind = "other"
        files.append({
            "name": file_path.split("/")[-1],
            "path": file_path,
            "size": item.get("size"),
            "kind": kind,
            "url": raw_base + quote(file_path, safe="/"),
        })

    return {
        "link": tree_url,
        "repo": info["repo_id"],
        "folder": folder_path,
        "warning": warning,
        "csv_files": [entry for entry in files if entry["kind"] == "csv"],
        "video_files": [entry for entry in files if entry["kind"] == "video"],
    }


@app.get("/", response_class=HTMLResponse)
def read_root():
    """Serve ``index.html`` (resolved against the current working directory)."""
    with open("index.html", "r", encoding="utf-8") as f:
        return f.read()


@app.get("/api/group-sources")
def group_sources():
    """Discover every configured source, collecting per-link failures.

    Returns:
        ``{"sources": [...], "errors": [...]}`` where each error entry holds
        the failing ``link`` and the stringified exception.
    """
    sources = []
    errors = []
    for link in configured_group_links():
        try:
            sources.append(discover_source(link))
        except Exception as exc:
            # Deliberate best-effort boundary: one bad link must not hide
            # the results of the others, so the failure is reported instead.
            errors.append({"link": link, "error": str(exc)})
    return {"sources": sources, "errors": errors}


@app.get("/api/hf-source")
def hf_source(url: str = Query(..., min_length=1)):
    """Discover the files under a single Hugging Face tree URL.

    Raises:
        HTTPException: 400 for an unsupported URL; 502 when Hugging Face
            answers with a non-404 error or cannot be reached.
    """
    try:
        return discover_source(url)
    except HTTPError as exc:
        raise HTTPException(
            status_code=502,
            detail=f"Hugging Face returned HTTP {exc.code} for this source.",
        ) from exc
    except URLError as exc:
        raise HTTPException(
            status_code=502,
            detail=f"Could not reach Hugging Face: {exc.reason}",
        ) from exc


@app.get("/api/proxy")
def proxy(url: str = Query(..., min_length=1)):
    """Fetch a Hugging Face ``/resolve/`` URL and relay its body.

    The complete upstream body is read into memory before it is returned.

    Raises:
        HTTPException: 400 when ``url`` is not an http(s) ``huggingface.co``
            URL containing ``/resolve/``; the upstream status when Hugging
            Face answers with a 4xx/5xx error; 502 when it cannot be reached.
    """
    parsed = urlparse(url)
    if (
        parsed.scheme not in {"http", "https"}  # keep ftp:, file:, data: away from urlopen
        or parsed.netloc != "huggingface.co"
        or "/resolve/" not in parsed.path
    ):
        raise HTTPException(
            status_code=400,
            detail="Only Hugging Face resolve URLs can be proxied.",
        )

    req = Request(url, headers=_REQUEST_HEADERS)
    try:
        with urlopen(req, timeout=30) as response:
            content_type = response.headers.get("content-type", "application/octet-stream")
            body = response.read()
    except HTTPError as exc:
        # Relay client/server errors as-is; anything else becomes a 502.
        status = exc.code if 400 <= exc.code < 600 else 502
        raise HTTPException(
            status_code=status,
            detail=f"Hugging Face returned HTTP {exc.code}.",
        ) from exc
    except URLError as exc:
        raise HTTPException(
            status_code=502,
            detail=f"Could not reach Hugging Face: {exc.reason}",
        ) from exc

    return Response(content=body, media_type=content_type)