Spaces:
Sleeping
Sleeping
| from threading import Thread | |
| from copy import deepcopy | |
| from uuid import uuid4 | |
| from time import sleep | |
| import json | |
| import time | |
| import re | |
| import os | |
def syncmethod(func):
    """Decorate ``func`` so that each call runs in a new background thread.

    Despite the name, the decorated callable does not wait for ``func`` to
    finish: it starts a ``Thread`` targeting ``func`` with the given
    arguments and returns immediately. The return value of ``func`` itself
    is not available to the caller.

    Args:
        func: The callable to run in a separate thread.

    Returns:
        A wrapper that starts the thread and returns the started ``Thread``
        object, so callers that need to wait can ``join()`` it.
    """
    # Imported locally so this block does not depend on a file-level import.
    from functools import wraps

    @wraps(func)  # keep the name/docstring of the wrapped function
    def function(*args, **kwargs):
        th = Thread(target=func, args=args, kwargs=kwargs)
        th.start()
        return th

    return function
def interval(fc, t):
    """Call ``fc()`` repeatedly, sleeping ``t`` seconds before every call.

    The loop has no exit condition, so this function blocks the calling
    thread and only stops if ``sleep`` or ``fc`` raises.

    Args:
        fc: Zero-argument callable to invoke on every tick.
        t: Delay in seconds passed to ``sleep`` before each call.
    """
    while True:
        # The delay comes first, so the first call happens after ``t`` seconds.
        sleep(t)
        fc()
| class FileManager: | |
| def __init__(self, base_url: str, expires_minutes: int = 60): | |
| self.data_file = "collector_data.json" | |
| self.base_url = base_url.rstrip("/") | |
| self.expires_minutes = expires_minutes | |
| os.makedirs("fl", exist_ok=True) | |
| if not os.path.exists(self.data_file): | |
| self._write_json() | |
| else: | |
| self.check_files() | |
| # ------------------------------------------------------------------ | |
| def _format_path(self, path: str) -> str: | |
| p = re.sub(r'\s+', '-', path) | |
| p = re.sub(r'[!?#*Ççêwrong_char]', '', p) | |
| return p | |
| def _read_json(self) -> list: | |
| with open(self.data_file, "r") as f: | |
| return json.load(f) | |
| def _write_json(self, data: list = None): | |
| with open(self.data_file, "w") as f: | |
| json.dump(data or [], f, indent=2) | |
| # ------------------------------------------------------------------ | |
| def start(self, gap_minutes: int = 5): | |
| interval(self.check_files, gap_minutes * 60) | |
| def check_files(self): | |
| data = self._read_json() | |
| now = time.time() | |
| keep = [] | |
| for entry in data: | |
| if entry["expiresAt"] < now: | |
| try: | |
| os.remove(entry["path"]) | |
| except Exception as e: | |
| print(f"[cleanup] {e}") | |
| else: | |
| keep.append(entry) | |
| self._write_json(keep) | |
| # ------------------------------------------------------------------ | |
| def _revalidate(self, path: str): | |
| data = self._read_json() | |
| for entry in data: | |
| if entry["path"] == path: | |
| entry["expiresAt"] = time.time() + self.expires_minutes * 60 | |
| self._write_json(data) | |
| def _add_path(self, path: str) -> dict: | |
| new_path = self._format_path(path) | |
| if path != new_path: | |
| os.rename(path, new_path) | |
| data = self._read_json() | |
| data = [e for e in data if e["path"] != new_path] | |
| entry = { | |
| "path": new_path, | |
| "filename": new_path.split("/")[-1], | |
| "id": str(uuid4()), | |
| "expiresAt": time.time() + self.expires_minutes * 60, | |
| } | |
| data.append(entry) | |
| self._write_json(data) | |
| return entry | |
| # ------------------------------------------------------------------ | |
| def save_file(self, data: bytes, filename: str) -> dict: | |
| filename = self._format_path(filename) | |
| file_path = f"fl/{uuid4()}_{filename}" | |
| with open(file_path, "wb") as f: | |
| f.write(data) | |
| return self._add_path(file_path) | |
| def get_url(self, entry: dict) -> str: | |
| return f"{self.base_url}/file=./{entry['path']}" | |
| def get_entry_by_id(self, uid: str) -> dict | None: | |
| for entry in self._read_json(): | |
| if entry["id"] == uid: | |
| self._revalidate(entry["path"]) | |
| return entry | |
| return None | |