from threading import Thread from copy import deepcopy from uuid import uuid4 from time import sleep import json import time import re import os def syncmethod(func): def function(*args, **kwargs): th = Thread(target=func, args=args, kwargs=kwargs) th.start() return function @syncmethod def interval(fc, t): while True: sleep(t) fc() class FileManager: def __init__(self, base_url: str, expires_minutes: int = 60): self.data_file = "collector_data.json" self.base_url = base_url.rstrip("/") self.expires_minutes = expires_minutes os.makedirs("fl", exist_ok=True) if not os.path.exists(self.data_file): self._write_json() else: self.check_files() # ------------------------------------------------------------------ def _format_path(self, path: str) -> str: p = re.sub(r'\s+', '-', path) p = re.sub(r'[!?#*Ççêwrong_char]', '', p) return p def _read_json(self) -> list: with open(self.data_file, "r") as f: return json.load(f) def _write_json(self, data: list = None): with open(self.data_file, "w") as f: json.dump(data or [], f, indent=2) # ------------------------------------------------------------------ def start(self, gap_minutes: int = 5): interval(self.check_files, gap_minutes * 60) def check_files(self): data = self._read_json() now = time.time() keep = [] for entry in data: if entry["expiresAt"] < now: try: os.remove(entry["path"]) except Exception as e: print(f"[cleanup] {e}") else: keep.append(entry) self._write_json(keep) # ------------------------------------------------------------------ def _revalidate(self, path: str): data = self._read_json() for entry in data: if entry["path"] == path: entry["expiresAt"] = time.time() + self.expires_minutes * 60 self._write_json(data) def _add_path(self, path: str) -> dict: new_path = self._format_path(path) if path != new_path: os.rename(path, new_path) data = self._read_json() data = [e for e in data if e["path"] != new_path] entry = { "path": new_path, "filename": new_path.split("/")[-1], "id": str(uuid4()), "expiresAt": time.time() + self.expires_minutes * 60, } data.append(entry) self._write_json(data) return entry # ------------------------------------------------------------------ def save_file(self, data: bytes, filename: str) -> dict: filename = self._format_path(filename) file_path = f"fl/{uuid4()}_{filename}" with open(file_path, "wb") as f: f.write(data) return self._add_path(file_path) def get_url(self, entry: dict) -> str: return f"{self.base_url}/file=./{entry['path']}" def get_entry_by_id(self, uid: str) -> dict | None: for entry in self._read_json(): if entry["id"] == uid: self._revalidate(entry["path"]) return entry return None