triflix commited on
Commit
42d87fb
·
verified ·
1 Parent(s): ffc33c7

Create main.py

Browse files
Files changed (1) hide show
  1. main.py +212 -0
main.py ADDED
@@ -0,0 +1,212 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import shutil
3
+ import time
4
+ import glob
5
+ import asyncio
6
+ import mimetypes
7
+ import pathlib
8
+ import re
9
+ from datetime import datetime, timedelta
10
+
11
+ from fastapi import FastAPI, UploadFile, File, Form, Request, HTTPException
12
+ from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, RedirectResponse
13
+ from fastapi.staticfiles import StaticFiles
14
+ from fastapi.templating import Jinja2Templates
15
+
16
+ # CONFIG
17
+ API_VERSION = "1.0"
18
+ TMP_DIR = os.environ.get("TMP_DIR", "/tmp") # container /tmp by default
19
+ MAX_BYTES = 2 * 1024 * 1024 * 1024 # 2GB
20
+ CLEANUP_INTERVAL_SECONDS = 600 # run cleanup every 10 minutes
21
+ EXPIRE_SECONDS = 3 * 3600 # 3 hours
22
+ CHUNK_SIZE = 1024 * 1024 # 1MB chunks
23
+
24
+ # Blacklist of extensions (lowercase, without dot)
25
+ DISALLOWED_EXT = {
26
+ "bat", "exe", "cmd", "sh", "msi", "ps1", "com", "scr"
27
+ }
28
+
29
+ # ensure tmp dir exists
30
+ os.makedirs(TMP_DIR, exist_ok=True)
31
+
32
+ app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None) # restrict /docs
33
+
34
+ templates = Jinja2Templates(directory="templates")
35
+ app.mount("/static", StaticFiles(directory="static"), name="static")
36
+
37
+
38
+ def sanitize_slug(s: str) -> str:
39
+ s = re.sub(r"[^\w\-\.]", "", s) # allow letters, numbers, underscore, hyphen, dot
40
+ return s[:128]
41
+
42
+
43
+ def file_exists_for_slug(slug: str):
44
+ pattern = os.path.join(TMP_DIR, f"{slug}.*")
45
+ matches = glob.glob(pattern)
46
+ return matches[0] if matches else None
47
+
48
+
49
+ def make_file_path(slug: str, filename: str):
50
+ _, ext = os.path.splitext(filename)
51
+ ext = ext.lower()
52
+ return os.path.join(TMP_DIR, f"{slug}{ext}")
53
+
54
+
55
+ def gen_slug(length=8):
56
+ import secrets, string
57
+ alphabet = string.ascii_lowercase + string.digits
58
+ return ''.join(secrets.choice(alphabet) for _ in range(length))
59
+
60
+
61
+ async def save_upload_to_tmp(upload_file: UploadFile, dest_path: str):
62
+ total = 0
63
+ # write in chunks
64
+ with open(dest_path, "wb") as f:
65
+ while True:
66
+ chunk = await upload_file.read(CHUNK_SIZE)
67
+ if not chunk:
68
+ break
69
+ f.write(chunk)
70
+ total += len(chunk)
71
+ if total > MAX_BYTES:
72
+ # cleanup partial file
73
+ f.close()
74
+ try:
75
+ os.remove(dest_path)
76
+ except Exception:
77
+ pass
78
+ raise HTTPException(status_code=413, detail="File exceeds max size (2GB).")
79
+ return total
80
+
81
+
82
+ @app.on_event("startup")
83
+ async def startup_event():
84
+ # launch cleanup background task
85
+ loop = asyncio.get_event_loop()
86
+ loop.create_task(cleaner_task())
87
+
88
+
89
+ async def cleaner_task():
90
+ """
91
+ Periodically remove files older than EXPIRE_SECONDS to keep /tmp tidy.
92
+ """
93
+ while True:
94
+ try:
95
+ now = time.time()
96
+ for path in glob.glob(os.path.join(TMP_DIR, "*")):
97
+ try:
98
+ # only remove files (and ignore directories)
99
+ if os.path.isfile(path):
100
+ mtime = os.path.getmtime(path)
101
+ if now - mtime > EXPIRE_SECONDS:
102
+ os.remove(path)
103
+ except Exception:
104
+ continue
105
+ except Exception:
106
+ pass
107
+ await asyncio.sleep(CLEANUP_INTERVAL_SECONDS)
108
+
109
+
110
+ @app.get("/", response_class=HTMLResponse)
111
+ async def index(request: Request):
112
+ return templates.TemplateResponse("index.html", {
113
+ "request": request,
114
+ "api_version": API_VERSION,
115
+ "max_bytes": MAX_BYTES,
116
+ "expire_seconds": EXPIRE_SECONDS
117
+ })
118
+
119
+
120
+ @app.post("/api/upload")
121
+ async def api_upload(file: UploadFile = File(...), custom_slug: str = Form(None)):
122
+ filename = file.filename or "upload"
123
+ _, ext = os.path.splitext(filename)
124
+ ext_l = ext.lower().lstrip(".")
125
+ if ext_l in DISALLOWED_EXT:
126
+ raise HTTPException(status_code=400, detail=f"Disallowed file type: {ext}")
127
+
128
+ # choose slug
129
+ if custom_slug:
130
+ slug = sanitize_slug(custom_slug)
131
+ if not slug:
132
+ raise HTTPException(status_code=400, detail="Invalid custom slug.")
133
+ if file_exists_for_slug(slug):
134
+ raise HTTPException(status_code=409, detail="Slug already exists.")
135
+ else:
136
+ # generate until free
137
+ for _ in range(8):
138
+ slug = gen_slug(8)
139
+ if not file_exists_for_slug(slug):
140
+ break
141
+ else:
142
+ # fallback long slug
143
+ slug = gen_slug(16)
144
+
145
+ dest = make_file_path(slug, filename)
146
+
147
+ # save file (enforces max size)
148
+ try:
149
+ bytes_written = await save_upload_to_tmp(file, dest)
150
+ except HTTPException as e:
151
+ raise e
152
+ except Exception as e:
153
+ # cleanup if any partial file
154
+ try:
155
+ if os.path.exists(dest):
156
+ os.remove(dest)
157
+ except Exception:
158
+ pass
159
+ raise HTTPException(status_code=500, detail="Failed to save file.")
160
+
161
+ # set mtime so cleanup knows created time (already set)
162
+ expires_at = datetime.utcnow() + timedelta(seconds=EXPIRE_SECONDS)
163
+ url = f"/f/{slug}"
164
+ return JSONResponse({
165
+ "slug": slug,
166
+ "url": url,
167
+ "filename": filename,
168
+ "size": bytes_written,
169
+ "expires_at": int(expires_at.timestamp())
170
+ })
171
+
172
+
173
+ @app.get("/f/{slug}")
174
+ async def serve_file(slug: str, dl: int = 0):
175
+ # find file by slug
176
+ path = file_exists_for_slug(slug)
177
+ if not path:
178
+ raise HTTPException(status_code=404, detail="File not found or expired.")
179
+ # serve with correct media_type
180
+ mime_type, _ = mimetypes.guess_type(path)
181
+ headers = {}
182
+ filename = os.path.basename(path)
183
+ if dl:
184
+ # force download
185
+ return FileResponse(path, media_type=mime_type or "application/octet-stream",
186
+ filename=filename)
187
+ # decide inline vs attachment by mime
188
+ inline_media = {"image", "video", "audio", "text", "application/pdf"}
189
+ mt = mime_type or ""
190
+ if any(mt.startswith(p) for p in inline_media) or mt == "application/pdf":
191
+ return FileResponse(path, media_type=mime_type or "application/octet-stream",
192
+ filename=filename)
193
+ else:
194
+ return FileResponse(path, media_type=mime_type or "application/octet-stream",
195
+ filename=filename)
196
+
197
+
198
+ @app.get("/api/info")
199
+ async def api_info():
200
+ curl_example = (
201
+ "curl -X POST -H \"Accept: application/json\" "
202
+ "-F \"file=@/path/to/file\" "
203
+ "https://your-space-url/f --output -"
204
+ )
205
+ return {
206
+ "version": API_VERSION,
207
+ "upload_endpoint": "/api/upload",
208
+ "file_endpoint_example": "/f/<slug>",
209
+ "max_size_bytes": MAX_BYTES,
210
+ "expiry_seconds": EXPIRE_SECONDS,
211
+ "curl_example": curl_example,
212
+ }