| from threading import Thread |
| from copy import deepcopy |
| from uuid import uuid4 |
| from time import sleep |
| import json |
| import time |
| import re |
| import os |
|
|
|
|
| def syncmethod(func): |
| def function(*args, **kwargs): |
| th = Thread(target=func, args=args, kwargs=kwargs) |
| th.start() |
| return function |
|
|
|
|
| @syncmethod |
| def interval(fc, t): |
| while True: |
| sleep(t) |
| fc() |
|
|
|
|
| class FileManager: |
| def __init__(self, base_url: str, expires_minutes: int = 60): |
| self.data_file = "collector_data.json" |
| self.base_url = base_url.rstrip("/") |
| self.expires_minutes = expires_minutes |
| os.makedirs("fl", exist_ok=True) |
| if not os.path.exists(self.data_file): |
| self._write_json() |
| else: |
| self.check_files() |
|
|
| |
| def _format_path(self, path: str) -> str: |
| p = re.sub(r'\s+', '-', path) |
| p = re.sub(r'[!?#*Ççêwrong_char]', '', p) |
| return p |
|
|
| def _read_json(self) -> list: |
| with open(self.data_file, "r") as f: |
| return json.load(f) |
|
|
| def _write_json(self, data: list = None): |
| with open(self.data_file, "w") as f: |
| json.dump(data or [], f, indent=2) |
|
|
| |
| def start(self, gap_minutes: int = 5): |
| interval(self.check_files, gap_minutes * 60) |
|
|
| def check_files(self): |
| data = self._read_json() |
| now = time.time() |
| keep = [] |
| for entry in data: |
| if entry["expiresAt"] < now: |
| try: |
| os.remove(entry["path"]) |
| except Exception as e: |
| print(f"[cleanup] {e}") |
| else: |
| keep.append(entry) |
| self._write_json(keep) |
|
|
| |
| def _revalidate(self, path: str): |
| data = self._read_json() |
| for entry in data: |
| if entry["path"] == path: |
| entry["expiresAt"] = time.time() + self.expires_minutes * 60 |
| self._write_json(data) |
|
|
| def _add_path(self, path: str) -> dict: |
| new_path = self._format_path(path) |
| if path != new_path: |
| os.rename(path, new_path) |
| data = self._read_json() |
| data = [e for e in data if e["path"] != new_path] |
| entry = { |
| "path": new_path, |
| "filename": new_path.split("/")[-1], |
| "id": str(uuid4()), |
| "expiresAt": time.time() + self.expires_minutes * 60, |
| } |
| data.append(entry) |
| self._write_json(data) |
| return entry |
|
|
| |
| def save_file(self, data: bytes, filename: str) -> dict: |
| filename = self._format_path(filename) |
| file_path = f"fl/{uuid4()}_{filename}" |
| with open(file_path, "wb") as f: |
| f.write(data) |
| return self._add_path(file_path) |
|
|
| def get_url(self, entry: dict) -> str: |
| return f"{self.base_url}/file=./{entry['path']}" |
|
|
| def get_entry_by_id(self, uid: str) -> dict | None: |
| for entry in self._read_json(): |
| if entry["id"] == uid: |
| self._revalidate(entry["path"]) |
| return entry |
| return None |
|
|