# media2url / fileManager.py
# no-name-here's picture
# Upload 5 files
# 511fc24 verified
import json
import os
import re
import time
import traceback
from copy import deepcopy
from functools import wraps
from threading import RLock, Thread
from time import sleep
from uuid import uuid4
def syncmethod(func):
    """Decorate *func* so that every call runs on a new thread.

    Despite the name, calling the decorated function is asynchronous: it
    starts a ``Thread`` executing ``func(*args, **kwargs)`` and returns
    without waiting for it.  The return value of *func* is discarded.

    Args:
        func: The callable to run on a thread.

    Returns:
        A wrapper with the same name and docstring as *func*.  Calling the
        wrapper returns the already-started ``Thread`` so the caller may
        ``join()`` it when it needs to wait for completion.
    """
    @wraps(func)
    def function(*args, **kwargs):
        th = Thread(target=func, args=args, kwargs=kwargs)
        th.start()
        # Hand the thread back; without it the caller cannot wait on it.
        return th

    return function
@syncmethod
def interval(fc, t):
    """Call ``fc()`` repeatedly, sleeping *t* seconds before each call.

    Because of ``@syncmethod`` the loop runs on its own thread and the call
    to ``interval`` returns immediately.  The loop never terminates.

    Args:
        fc: Zero-argument callable to invoke on every tick.
        t: Delay in seconds before each invocation.
    """
    while True:
        sleep(t)
        try:
            fc()
        except Exception:
            # One failing tick must not end the schedule: an exception
            # escaping here would terminate the thread and no further
            # ticks would ever run.  Report it and keep going.
            traceback.print_exc()
class FileManager:
def __init__(self, base_url: str, expires_minutes: int = 60):
self.data_file = "collector_data.json"
self.base_url = base_url.rstrip("/")
self.expires_minutes = expires_minutes
os.makedirs("fl", exist_ok=True)
if not os.path.exists(self.data_file):
self._write_json()
else:
self.check_files()
# ------------------------------------------------------------------
def _format_path(self, path: str) -> str:
p = re.sub(r'\s+', '-', path)
p = re.sub(r'[!?#*Ççêwrong_char]', '', p)
return p
def _read_json(self) -> list:
with open(self.data_file, "r") as f:
return json.load(f)
def _write_json(self, data: list = None):
with open(self.data_file, "w") as f:
json.dump(data or [], f, indent=2)
# ------------------------------------------------------------------
def start(self, gap_minutes: int = 5):
interval(self.check_files, gap_minutes * 60)
def check_files(self):
data = self._read_json()
now = time.time()
keep = []
for entry in data:
if entry["expiresAt"] < now:
try:
os.remove(entry["path"])
except Exception as e:
print(f"[cleanup] {e}")
else:
keep.append(entry)
self._write_json(keep)
# ------------------------------------------------------------------
def _revalidate(self, path: str):
data = self._read_json()
for entry in data:
if entry["path"] == path:
entry["expiresAt"] = time.time() + self.expires_minutes * 60
self._write_json(data)
def _add_path(self, path: str) -> dict:
new_path = self._format_path(path)
if path != new_path:
os.rename(path, new_path)
data = self._read_json()
data = [e for e in data if e["path"] != new_path]
entry = {
"path": new_path,
"filename": new_path.split("/")[-1],
"id": str(uuid4()),
"expiresAt": time.time() + self.expires_minutes * 60,
}
data.append(entry)
self._write_json(data)
return entry
# ------------------------------------------------------------------
def save_file(self, data: bytes, filename: str) -> dict:
filename = self._format_path(filename)
file_path = f"fl/{uuid4()}_{filename}"
with open(file_path, "wb") as f:
f.write(data)
return self._add_path(file_path)
def get_url(self, entry: dict) -> str:
return f"{self.base_url}/file=./{entry['path']}"
def get_entry_by_id(self, uid: str) -> dict | None:
for entry in self._read_json():
if entry["id"] == uid:
self._revalidate(entry["path"])
return entry
return None