# Hugging Face Spaces page status banner: Sleeping
import json
import os
import re
from typing import Any, Dict, List
from urllib.error import HTTPError
from urllib.parse import quote, unquote, urlparse
from urllib.request import Request, urlopen

from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import HTMLResponse, Response
# ASGI application object for this module.
app = FastAPI()
# Hugging Face "tree" URLs scanned when the HF_GROUP_LINKS environment
# variable is unset or empty (see configured_group_links).
DEFAULT_GROUP_LINKS = [
    "https://huggingface.co/spaces/CyberAl/Traffic-Tracker/tree/main/data",
    "https://huggingface.co/spaces/niangmariame513/traffic-monitor/tree/main/data",
    "https://huggingface.co/spaces/danielle2035/TRAFFIC_ROAD_APP/tree/main/data",
    "https://huggingface.co/spaces/Rafiatou/trafficvision-group10/tree/main/data",
    "https://huggingface.co/spaces/Binta26/computer_vision/tree/main/data",
    "https://huggingface.co/datasets/ccspoet/ProjectCV/tree/main/Data",
    "https://huggingface.co/spaces/AhmedSouley01/traffic-monitoring/tree/main/data",
    "https://huggingface.co/datasets/conde621gmail/dataset/tree/main",
    "https://huggingface.co/datasets/ioget/aims-traffic-cv-data/tree/main",
    "https://huggingface.co/datasets/luclintos/traffic_object_detection/tree/main",
]

# Lower-case file suffixes classified as video. Kept as a tuple so it can be
# passed directly to str.endswith().
VIDEO_EXTENSIONS = (".mp4", ".webm", ".mov", ".avi", ".mkv", ".m4v")
def configured_group_links() -> List[str]:
    """Return the Hugging Face tree URLs that should be scanned.

    The ``HF_GROUP_LINKS`` environment variable overrides the built-in
    ``DEFAULT_GROUP_LINKS``. It may hold either a JSON array (detected by a
    leading ``[``) or a plain list separated by commas and/or newlines.

    Returns:
        A new list of non-empty, whitespace-stripped URLs.

    Raises:
        HTTPException: status 500 when the value starts with ``[`` but is
            not valid JSON.
    """
    raw = os.getenv("HF_GROUP_LINKS", "").strip()
    if not raw:
        # Return a copy so callers cannot mutate the module-level default.
        return list(DEFAULT_GROUP_LINKS)

    if raw.startswith("["):
        try:
            links = json.loads(raw)
        except json.JSONDecodeError as exc:
            raise HTTPException(
                status_code=500, detail=f"Invalid HF_GROUP_LINKS JSON: {exc}"
            ) from exc
        # Skip JSON nulls instead of turning them into the string "None".
        cleaned = (str(link).strip() for link in links if link is not None)
        return [link for link in cleaned if link]

    return [link.strip() for link in re.split(r"[\n,]+", raw) if link.strip()]
def parse_hf_tree_url(url: str) -> Dict[str, str]:
    """Split a Hugging Face ``tree`` URL into its repository components.

    Expected shape:
    ``https://huggingface.co/<spaces|datasets>/<owner>/<repo>/tree/<revision>[/<folder>...]``

    Args:
        url: The tree URL; surrounding whitespace is ignored.

    Returns:
        A dict with the keys ``repo_kind``, ``owner``, ``repo``,
        ``revision``, ``folder_path`` (``""`` for the repository root) and
        ``repo_id`` (``"<owner>/<repo>"``). Path segments are
        percent-decoded.

    Raises:
        HTTPException: status 400 when the URL is malformed, is not on
            huggingface.co, is not a tree URL, or is neither a space nor a
            dataset.
    """
    try:
        parsed = urlparse(url.strip())
    except ValueError as exc:
        # urlparse rejects some malformed authorities (e.g. a broken IPv6
        # literal); report that as a client error, not a server crash.
        raise HTTPException(
            status_code=400, detail="Only huggingface.co links are supported."
        ) from exc

    # hostname is lower-cased and excludes any port, so "HuggingFace.co" and
    # "huggingface.co:443" are accepted as well.
    if parsed.hostname != "huggingface.co":
        raise HTTPException(status_code=400, detail="Only huggingface.co links are supported.")

    parts = [unquote(part) for part in parsed.path.strip("/").split("/") if part]
    if len(parts) < 5 or parts[3] != "tree":
        raise HTTPException(
            status_code=400,
            detail="Expected a Hugging Face tree URL like https://huggingface.co/spaces/user/repo/tree/main/data",
        )

    repo_kind, owner, repo, _, revision = parts[:5]
    if repo_kind not in {"spaces", "datasets"}:
        raise HTTPException(status_code=400, detail="Only Hugging Face spaces and datasets are supported.")

    return {
        "repo_kind": repo_kind,
        "owner": owner,
        "repo": repo,
        "revision": revision,
        # Everything after the revision is the folder; empty means repo root.
        "folder_path": "/".join(parts[5:]),
        "repo_id": f"{owner}/{repo}",
    }
def hf_api_prefix(repo_kind: str) -> str:
    """Return the path segment used after ``/api/`` for this repository kind.

    Any value other than ``"spaces"`` maps to ``"datasets"``.
    """
    return "spaces" if repo_kind == "spaces" else "datasets"
def hf_raw_prefix(repo_kind: str) -> str:
    """Return the leading path segment of file download URLs for this kind.

    Any value other than ``"spaces"`` maps to ``"datasets"``.
    """
    return "spaces" if repo_kind == "spaces" else "datasets"
def fetch_json(url: str) -> Any:
    """Fetch *url* and return its body decoded as JSON.

    The request carries a ``TrafficSense-dashboard`` User-Agent and uses a
    20-second timeout. The body is decoded as UTF-8 before parsing.

    Raises:
        urllib.error.HTTPError: for HTTP error statuses (propagated from
            ``urlopen``).
        json.JSONDecodeError: when the body is not valid JSON.
    """
    req = Request(url, headers={"User-Agent": "TrafficSense-dashboard"})
    with urlopen(req, timeout=20) as response:
        payload = response.read()
    return json.loads(payload.decode("utf-8"))
def discover_source(tree_url: str) -> Dict:
    """List the CSV and video files found under a Hugging Face tree URL.

    The URL is parsed with ``parse_hf_tree_url``, the matching
    ``/api/.../tree/...?recursive=1`` endpoint is queried with
    ``fetch_json``, and every entry of type ``"file"`` is classified by its
    lower-cased suffix (``.csv`` or one of ``VIDEO_EXTENSIONS``).

    Args:
        tree_url: A Hugging Face spaces/datasets tree URL.

    Returns:
        A dict with ``link`` (the input URL), ``repo`` (``owner/repo``),
        ``folder``, ``warning`` (``None``, or a message when the listing
        returned 404), ``csv_files`` and ``video_files``. Each file entry
        has ``name``, ``path``, ``size``, ``kind`` and a ``/resolve/``
        download ``url``.

    Raises:
        HTTPException: propagated from ``parse_hf_tree_url`` for bad URLs.
        urllib.error.HTTPError: for listing failures other than 404.
    """
    info = parse_hf_tree_url(tree_url)

    # Percent-encode each component once; the same encoded repo id and
    # revision are used for both the API URL and the download URLs.
    repo_id = quote(info["repo_id"], safe="/")
    revision = quote(info["revision"], safe="")
    api_path = quote(info["folder_path"], safe="/")

    # rstrip drops the trailing slash left behind when the folder is empty.
    api_url = (
        f"https://huggingface.co/api/{hf_api_prefix(info['repo_kind'])}/"
        f"{repo_id}/tree/{revision}/{api_path}"
    ).rstrip("/") + "?recursive=1"
    raw_base = (
        f"https://huggingface.co/{hf_raw_prefix(info['repo_kind'])}/{repo_id}"
        f"/resolve/{revision}/"
    )

    warning = None
    try:
        items = fetch_json(api_url)
    except HTTPError as exc:
        if exc.code != 404:
            raise
        # A missing folder is reported as a warning, not as a failure.
        warning = f"Folder '{info['folder_path']}' was not found."
        items = []

    csv_files = []
    video_files = []
    for item in items:
        if item.get("type") != "file":
            continue
        file_path = item.get("path", "")
        lower_path = file_path.lower()
        if lower_path.endswith(".csv"):
            kind, bucket = "csv", csv_files
        elif lower_path.endswith(VIDEO_EXTENSIONS):
            kind, bucket = "video", video_files
        else:
            # Files that are neither CSV nor video are not reported.
            continue
        bucket.append({
            "name": file_path.split("/")[-1],
            "path": file_path,
            "size": item.get("size"),
            "kind": kind,
            "url": raw_base + quote(file_path, safe="/"),
        })

    return {
        "link": tree_url,
        "repo": info["repo_id"],
        "folder": info["folder_path"],
        "warning": warning,
        "csv_files": csv_files,
        "video_files": video_files,
    }
def read_root():
    """Return the text of ``index.html`` from the current working directory.

    The file is read as UTF-8 on every call.

    NOTE(review): no route decorator is visible on this function in this
    view; presumably it is registered on ``app`` elsewhere -- confirm.

    Raises:
        OSError: if ``index.html`` cannot be opened.
    """
    with open("index.html", "r", encoding="utf-8") as page:
        return page.read()
def group_sources():
    """Scan every configured group link and collect the results.

    Each link returned by ``configured_group_links()`` is passed to
    ``discover_source``. A failure on one link is recorded and does not
    stop the remaining links from being scanned.

    NOTE(review): no route decorator is visible on this function in this
    view; presumably it is registered on ``app`` elsewhere -- confirm.

    Returns:
        A dict with ``sources`` (the successful ``discover_source`` results,
        in link order) and ``errors`` (one ``{"link", "error"}`` entry per
        link that raised).
    """
    sources = []
    errors = []
    for link in configured_group_links():
        try:
            source = discover_source(link)
        except Exception as exc:
            # Deliberately broad: this is a best-effort aggregate, so any
            # per-link failure is reported to the caller instead of raised.
            errors.append({"link": link, "error": str(exc)})
        else:
            sources.append(source)
    return {"sources": sources, "errors": errors}
def hf_source(url: str = Query(..., min_length=1)):
    """Return the ``discover_source`` result for a single tree URL.

    NOTE(review): no route decorator is visible on this function in this
    view; presumably it is registered on ``app`` elsewhere -- confirm.

    Args:
        url: Required, non-empty query parameter holding the Hugging Face
            tree URL.
    """
    return discover_source(url)
def proxy(url: str = Query(..., min_length=1)):
    """Fetch a Hugging Face ``/resolve/`` URL and relay its body.

    Only ``http``/``https`` URLs whose authority is exactly
    ``huggingface.co`` and whose path contains ``/resolve/`` are fetched,
    so this endpoint cannot be pointed at arbitrary hosts or schemes.
    The whole upstream body is read into memory before it is returned.

    NOTE(review): no route decorator is visible on this function in this
    view; presumably it is registered on ``app`` elsewhere -- confirm.

    Args:
        url: Required, non-empty query parameter holding the URL to fetch.

    Returns:
        A ``Response`` carrying the upstream bytes and content type
        (``application/octet-stream`` when upstream sends none).

    Raises:
        HTTPException: 400 for a disallowed or malformed URL, the upstream
            status code when huggingface.co answers with an HTTP error,
            and 502 when the upstream request fails at the network level.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        # Malformed authority; handled as "not allowed" below.
        parsed = None

    allowed = (
        parsed is not None
        # Keep non-HTTP schemes (file:, ftp:, ...) away from urlopen.
        and parsed.scheme in ("http", "https")
        # Strict netloc match: no userinfo, port or alternate host.
        and parsed.netloc == "huggingface.co"
        and "/resolve/" in parsed.path
    )
    if not allowed:
        raise HTTPException(status_code=400, detail="Only Hugging Face resolve URLs can be proxied.")

    req = Request(url, headers={"User-Agent": "TrafficSense-dashboard"})
    try:
        with urlopen(req, timeout=30) as response:
            content_type = response.headers.get("content-type", "application/octet-stream")
            body = response.read()
    except HTTPError as exc:
        # Relay the upstream status instead of failing with a generic 500.
        raise HTTPException(
            status_code=exc.code, detail=f"Upstream request failed: {exc.reason}"
        ) from exc
    except OSError as exc:
        # Covers URLError (DNS/connection failures) and socket timeouts.
        raise HTTPException(
            status_code=502, detail=f"Upstream request failed: {exc}"
        ) from exc
    return Response(content=body, media_type=content_type)