Spaces:
Sleeping
Sleeping
File size: 10,009 Bytes
7519ed7 f82ffe3 7519ed7 5e9e9cd d30a516 7519ed7 d30a516 7519ed7 d30a516 7519ed7 d30a516 7519ed7 f82ffe3 d30a516 f82ffe3 5fb0252 f82ffe3 7519ed7 f82ffe3 5fb0252 f82ffe3 5fb0252 f82ffe3 7519ed7 f82ffe3 5fb0252 f82ffe3 5fb0252 f82ffe3 5fb0252 f82ffe3 5fb0252 f82ffe3 6490e29 f82ffe3 6490e29 5fb0252 f82ffe3 5fb0252 f82ffe3 7519ed7 f82ffe3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
from fastapi import FastAPI, UploadFile, File, HTTPException, Depends, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.responses import FileResponse
from typing import List, Dict
import os
import shutil
import uuid
from collections import defaultdict
import secrets
from config import DOCS_USERNAME, DOCS_PASSWORD
import subprocess
subprocess.Popen(["python", "new.py"])
# Initialize security for docs only
security = HTTPBasic()
def verify_credentials(credentials: HTTPBasicCredentials = Depends(security)):
"""Verify HTTP Basic Auth credentials for docs only"""
is_username_correct = secrets.compare_digest(credentials.username, DOCS_USERNAME)
is_password_correct = secrets.compare_digest(credentials.password, DOCS_PASSWORD)
if not (is_username_correct and is_password_correct):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Basic"},
)
return credentials
# Initialize FastAPI with auth for docs only
app = FastAPI(
title="File Sharing API",
description="API for file sharing service",
version="1.0.0",
docs_url=None, # Disable default docs
redoc_url=None # Disable default redoc
)
# Create protected docs routes
@app.get("/docs", include_in_schema=False)
async def get_documentation(credentials: HTTPBasicCredentials = Depends(verify_credentials)):
from fastapi.openapi.docs import get_swagger_ui_html
return get_swagger_ui_html(openapi_url="/openapi.json", title="API Documentation")
@app.get("/redoc", include_in_schema=False)
async def get_redoc(credentials: HTTPBasicCredentials = Depends(verify_credentials)):
from fastapi.openapi.docs import get_redoc_html
return get_redoc_html(openapi_url="/openapi.json", title="API Documentation")
@app.get("/openapi.json", include_in_schema=False)
async def get_openapi(credentials: HTTPBasicCredentials = Depends(verify_credentials)):
from fastapi.openapi.utils import get_openapi
openapi_schema = get_openapi(
title="File Sharing API",
version="1.0.0",
description="API for file sharing service",
routes=app.routes
)
return openapi_schema
# Create uploads directory if it doesn't exist
UPLOAD_DIR = "uploads"
if not os.path.exists(UPLOAD_DIR):
os.makedirs(UPLOAD_DIR)
# Store file mappings with user info: {unique_code: {"filename": filename, "user_id": user_id}}
file_codes: Dict[str, dict] = {}
# Reverse mapping: {filename: {"code": code, "user_id": user_id}}
filename_codes: Dict[str, dict] = {}
# Add this class after other class definitions
class UserStats:
def __init__(self):
self.upload_counts = defaultdict(int)
self.download_counts = defaultdict(int)
self.total_bytes_uploaded = defaultdict(int)
self.total_bytes_downloaded = defaultdict(int)
self.last_activity = defaultdict(str)
def log_upload(self, user_id: str, file_size: int, filename: str):
self.upload_counts[user_id] += 1
self.total_bytes_uploaded[user_id] += file_size
self.last_activity[user_id] = f"Uploaded: {filename}"
def log_download(self, user_id: str, file_size: int, filename: str):
self.download_counts[user_id] += 1
self.total_bytes_downloaded[user_id] += file_size
self.last_activity[user_id] = f"Downloaded: {filename}"
def get_user_stats(self, user_id: str) -> dict:
return {
"uploads": self.upload_counts[user_id],
"downloads": self.download_counts[user_id],
"bytes_uploaded": self.total_bytes_uploaded[user_id],
"bytes_downloaded": self.total_bytes_downloaded[user_id],
"last_activity": self.last_activity.get(user_id, "No activity")
}
# Initialize stats
user_stats = UserStats()
@app.post("/upload/")
async def upload_file(file: UploadFile = File(...), user_id: str = None):
try:
# Generate unique code
unique_code = str(uuid.uuid4())[:8]
# Save the uploaded file
file_path = os.path.join(UPLOAD_DIR, file.filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Store the mapping with user info
file_codes[unique_code] = {"filename": file.filename, "user_id": user_id}
filename_codes[file.filename] = {"code": unique_code, "user_id": user_id}
# Log the upload
file_size = os.path.getsize(file_path)
user_stats.log_upload(user_id, file_size, file.filename)
return {
"filename": file.filename,
"access_code": unique_code,
"message": "File uploaded successfully"
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/upload-multiple/")
async def upload_multiple_files(files: List[UploadFile] = File(...)):
try:
uploaded_files = []
for file in files:
# Generate unique code for each file
unique_code = str(uuid.uuid4())[:8]
file_path = os.path.join(UPLOAD_DIR, file.filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Store the mapping
file_codes[unique_code] = {"filename": file.filename, "user_id": None}
filename_codes[file.filename] = {"code": unique_code, "user_id": None}
uploaded_files.append({
"filename": file.filename,
"access_code": unique_code
})
return {"files": uploaded_files, "message": "Files uploaded successfully"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/files/{user_id}")
async def list_files(user_id: str):
"""List files for a specific user"""
try:
files = []
for filename in os.listdir(UPLOAD_DIR):
file_info = filename_codes.get(filename, {})
if file_info.get("user_id") == user_id:
files.append({
"filename": filename,
"access_code": file_info["code"]
})
return {"files": files}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/download/{access_code}")
async def download_file(access_code: str):
try:
if access_code not in file_codes:
raise HTTPException(status_code=404, detail="Invalid access code")
filename = file_codes[access_code]["filename"]
file_path = os.path.join(UPLOAD_DIR, filename)
if os.path.exists(file_path):
return FileResponse(file_path, filename=filename)
else:
raise HTTPException(status_code=404, detail="File not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/delete/{access_code}")
async def delete_file(access_code: str, user_id: str = None):
try:
if access_code not in file_codes:
raise HTTPException(status_code=404, detail="Invalid access code")
# Check if user owns the file
if user_id and file_codes[access_code]["user_id"] != user_id:
raise HTTPException(status_code=403, detail="You don't have permission to delete this file")
filename = file_codes[access_code]["filename"]
file_path = os.path.join(UPLOAD_DIR, filename)
if os.path.exists(file_path):
os.remove(file_path)
# Remove from both mappings
del filename_codes[filename]
del file_codes[access_code]
return {"message": f"File {filename} deleted successfully"}
else:
raise HTTPException(status_code=404, detail="File not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/{access_code}")
async def direct_download(access_code: str):
"""Direct download route using just the access code in the URL"""
try:
if access_code not in file_codes:
# If not a valid access code, return 404 or redirect to home
raise HTTPException(status_code=404, detail="Invalid access code")
filename = file_codes[access_code]["filename"]
file_path = os.path.join(UPLOAD_DIR, filename)
if os.path.exists(file_path):
# Determine content disposition based on file type
content_disposition = "inline" # For viewing in browser
# For certain file types, force download
if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.pdf', '.txt')):
content_disposition = "attachment"
return FileResponse(
file_path,
filename=filename,
headers={"Content-Disposition": f"{content_disposition}; filename={filename}"}
)
else:
raise HTTPException(status_code=404, detail="File not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Add new endpoint for stats
@app.get("/stats/{user_id}")
async def get_user_stats(user_id: str):
try:
stats = user_stats.get_user_stats(user_id)
return stats
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/log_download")
async def log_download(user_id: str, file_size: int, filename: str):
try:
user_stats.log_download(user_id, file_size, filename)
return {"message": "Download logged successfully"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)
#uvicorn.run(app, host="127.0.0.1", port=7860) |