Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, UploadFile, File, HTTPException, Depends, status | |
| from fastapi.security import HTTPBasic, HTTPBasicCredentials | |
| from fastapi.responses import FileResponse | |
| from typing import List, Dict | |
| import os | |
| import shutil | |
| import uuid | |
| from collections import defaultdict | |
| import secrets | |
| from config import DOCS_USERNAME, DOCS_PASSWORD | |
| import subprocess | |
# Start the companion script as a child process as soon as this module loads.
# NOTE(review): the Popen handle is discarded, so this module never waits on
# or terminates the child - confirm that is intended.
subprocess.Popen(["python", "new.py"])

# HTTP Basic scheme; it guards the documentation routes only.
security = HTTPBasic()
def verify_credentials(credentials: HTTPBasicCredentials = Depends(security)):
    """Check HTTP Basic credentials against the configured docs login.

    Args:
        credentials: Username/password pair supplied by the ``security``
            dependency.

    Returns:
        The same ``credentials`` object when both fields match.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when the
            username or the password does not match.
    """
    # compare_digest() raises TypeError for str arguments containing non-ASCII
    # characters, which would turn a bad login into a 500. Comparing UTF-8
    # bytes accepts any input. DOCS_USERNAME / DOCS_PASSWORD must be str,
    # as the original str-to-str comparison already required.
    is_username_correct = secrets.compare_digest(
        credentials.username.encode("utf-8"),
        DOCS_USERNAME.encode("utf-8"),
    )
    is_password_correct = secrets.compare_digest(
        credentials.password.encode("utf-8"),
        DOCS_PASSWORD.encode("utf-8"),
    )
    # Both comparisons are evaluated before the result is inspected.
    if not (is_username_correct and is_password_correct):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials
# Application instance. All built-in documentation endpoints are disabled so
# that only the credential-protected handlers in this module can serve them.
app = FastAPI(
    title="File Sharing API",
    description="API for file sharing service",
    version="1.0.0",
    docs_url=None,  # Disable default docs
    redoc_url=None,  # Disable default redoc
    # Without this FastAPI keeps serving the schema at /openapi.json with no
    # authentication, bypassing the protected get_openapi handler.
    # NOTE(review): assumes get_openapi is routed at /openapi.json (the URL
    # the Swagger/ReDoc pages request) - confirm.
    openapi_url=None,
)
# Credential-protected documentation handlers
async def get_documentation(credentials: HTTPBasicCredentials = Depends(verify_credentials)):
    """Return the Swagger UI page; the caller must pass ``verify_credentials``."""
    from fastapi.openapi.docs import get_swagger_ui_html as render_swagger_ui

    page = render_swagger_ui(
        title="API Documentation",
        openapi_url="/openapi.json",
    )
    return page
async def get_redoc(credentials: HTTPBasicCredentials = Depends(verify_credentials)):
    """Return the ReDoc page; the caller must pass ``verify_credentials``."""
    from fastapi.openapi.docs import get_redoc_html as render_redoc

    page = render_redoc(
        title="API Documentation",
        openapi_url="/openapi.json",
    )
    return page
async def get_openapi(credentials: HTTPBasicCredentials = Depends(verify_credentials)):
    """Build the OpenAPI schema for ``app``; the caller must pass ``verify_credentials``."""
    # Aliased so the FastAPI helper is not confused with this handler's name.
    from fastapi.openapi.utils import get_openapi as build_openapi_schema

    return build_openapi_schema(
        title="File Sharing API",
        version="1.0.0",
        description="API for file sharing service",
        routes=app.routes,
    )
# Directory (relative to the working directory) that holds uploaded files.
UPLOAD_DIR = "uploads"
# exist_ok=True replaces the exists()/makedirs() pair, which could fail when
# two processes start together and both see the directory as missing.
os.makedirs(UPLOAD_DIR, exist_ok=True)

# In-memory index: {unique_code: {"filename": filename, "user_id": user_id}}
file_codes: Dict[str, dict] = {}
# Reverse in-memory index: {filename: {"code": code, "user_id": user_id}}
filename_codes: Dict[str, dict] = {}
# Per-user activity counters
class UserStats:
    """In-memory upload/download counters keyed by user id.

    Each attribute is a ``defaultdict`` so the ``log_*`` methods can
    increment a counter for a user that has not been seen before.
    """

    def __init__(self):
        self.upload_counts = defaultdict(int)
        self.download_counts = defaultdict(int)
        self.total_bytes_uploaded = defaultdict(int)
        self.total_bytes_downloaded = defaultdict(int)
        self.last_activity = defaultdict(str)

    def log_upload(self, user_id: str, file_size: int, filename: str):
        """Record one upload of ``file_size`` bytes for ``user_id``."""
        self.upload_counts[user_id] += 1
        self.total_bytes_uploaded[user_id] += file_size
        self.last_activity[user_id] = f"Uploaded: {filename}"

    def log_download(self, user_id: str, file_size: int, filename: str):
        """Record one download of ``file_size`` bytes for ``user_id``."""
        self.download_counts[user_id] += 1
        self.total_bytes_downloaded[user_id] += file_size
        self.last_activity[user_id] = f"Downloaded: {filename}"

    def get_user_stats(self, user_id: str) -> dict:
        """Return a snapshot of the counters for ``user_id``.

        Unknown users get zero counters and ``"No activity"``.
        """
        # .get() instead of indexing: indexing a defaultdict inserts the key,
        # so every lookup of an unknown user id would grow all four tables.
        return {
            "uploads": self.upload_counts.get(user_id, 0),
            "downloads": self.download_counts.get(user_id, 0),
            "bytes_uploaded": self.total_bytes_uploaded.get(user_id, 0),
            "bytes_downloaded": self.total_bytes_downloaded.get(user_id, 0),
            "last_activity": self.last_activity.get(user_id, "No activity"),
        }


# Shared stats instance used by the handlers in this module
user_stats = UserStats()
async def upload_file(file: UploadFile = File(...), user_id: str = None):
    """Store one uploaded file and return its access code.

    Args:
        file: The uploaded file.
        user_id: Optional owner id recorded with the file and in the stats.

    Returns:
        Dict with ``filename``, ``access_code`` and ``message``.

    Raises:
        HTTPException: 400 when the upload has no usable filename, 500 on any
            other failure.
    """
    try:
        # The filename is client-controlled. Keep only its last component so
        # values like "../../x" or "/etc/x" cannot escape UPLOAD_DIR.
        safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            raise HTTPException(status_code=400, detail="Invalid filename")

        # Generate unique code
        unique_code = str(uuid.uuid4())[:8]

        # Save the uploaded file
        file_path = os.path.join(UPLOAD_DIR, safe_name)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # The write above replaces any earlier file of the same name. Retire
        # the old access code so it can no longer read or delete the new one.
        previous = filename_codes.get(safe_name)
        if previous is not None:
            file_codes.pop(previous["code"], None)

        # Store the mapping with user info
        file_codes[unique_code] = {"filename": safe_name, "user_id": user_id}
        filename_codes[safe_name] = {"code": unique_code, "user_id": user_id}

        # Log the upload
        file_size = os.path.getsize(file_path)
        user_stats.log_upload(user_id, file_size, safe_name)

        return {
            "filename": safe_name,
            "access_code": unique_code,
            "message": "File uploaded successfully"
        }
    except HTTPException:
        # Keep deliberate status codes instead of rewrapping them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def upload_multiple_files(files: List[UploadFile] = File(...)):
    """Store several uploaded files and return one access code per file.

    Files stored through this handler are recorded with ``user_id`` None and
    are not added to the user statistics.

    Args:
        files: The uploaded files.

    Returns:
        Dict with ``files`` (list of ``filename``/``access_code`` dicts) and
        ``message``.

    Raises:
        HTTPException: 400 when any upload has no usable filename, 500 on any
            other failure.
    """
    try:
        # Filenames are client-controlled. Reduce each to its last component
        # so it cannot escape UPLOAD_DIR, and validate all of them before
        # writing so a bad name does not leave a half-stored batch.
        safe_names = []
        for file in files:
            safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
            if safe_name in ("", ".", ".."):
                raise HTTPException(status_code=400, detail="Invalid filename")
            safe_names.append(safe_name)

        uploaded_files = []
        for file, safe_name in zip(files, safe_names):
            # Generate unique code for each file
            unique_code = str(uuid.uuid4())[:8]

            file_path = os.path.join(UPLOAD_DIR, safe_name)
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)

            # The write replaced any earlier file of this name. Retire the
            # old access code so it cannot reach the new content.
            previous = filename_codes.get(safe_name)
            if previous is not None:
                file_codes.pop(previous["code"], None)

            # Store the mapping
            file_codes[unique_code] = {"filename": safe_name, "user_id": None}
            filename_codes[safe_name] = {"code": unique_code, "user_id": None}

            uploaded_files.append({
                "filename": safe_name,
                "access_code": unique_code
            })
        return {"files": uploaded_files, "message": "Files uploaded successfully"}
    except HTTPException:
        # Keep deliberate status codes instead of rewrapping them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def list_files(user_id: str):
    """Return the files in UPLOAD_DIR whose recorded owner is ``user_id``.

    Files on disk that have no entry in ``filename_codes`` are skipped
    (unless ``user_id`` itself is None). Any failure is reported as a 500
    ``HTTPException``.
    """
    try:
        owned = []
        for name in os.listdir(UPLOAD_DIR):
            info = filename_codes.get(name, {})
            if info.get("user_id") != user_id:
                continue
            owned.append({
                "filename": name,
                "access_code": info["code"]
            })
        return {"files": owned}
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
async def download_file(access_code: str):
    """Return the file registered under ``access_code``.

    Args:
        access_code: Code returned when the file was uploaded.

    Returns:
        A ``FileResponse`` for the stored file.

    Raises:
        HTTPException: 404 when the code is unknown or the file is no longer
            on disk, 500 on any unexpected failure.
    """
    try:
        entry = file_codes.get(access_code)
        if entry is None:
            raise HTTPException(status_code=404, detail="Invalid access code")

        filename = entry["filename"]
        file_path = os.path.join(UPLOAD_DIR, filename)
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="File not found")
        return FileResponse(file_path, filename=filename)
    except HTTPException:
        # Without this clause the generic handler below caught the 404s
        # raised above and reported them to the client as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def delete_file(access_code: str, user_id: str = None):
    """Delete the file registered under ``access_code``.

    Args:
        access_code: Code returned when the file was uploaded.
        user_id: When given, must match the file's recorded owner. When
            omitted or empty, no ownership check is performed.

    Returns:
        Dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the code is unknown or the file is no longer
            on disk, 403 when ``user_id`` is not the owner, 500 on any
            unexpected failure.
    """
    try:
        entry = file_codes.get(access_code)
        if entry is None:
            raise HTTPException(status_code=404, detail="Invalid access code")

        # Check if user owns the file
        if user_id and entry["user_id"] != user_id:
            raise HTTPException(status_code=403, detail="You don't have permission to delete this file")

        filename = entry["filename"]
        file_path = os.path.join(UPLOAD_DIR, filename)
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="File not found")

        os.remove(file_path)
        # Remove from both mappings. pop() tolerates a missing reverse entry,
        # so the request does not fail after the file is already gone.
        filename_codes.pop(filename, None)
        file_codes.pop(access_code, None)
        return {"message": f"File {filename} deleted successfully"}
    except HTTPException:
        # Without this clause the generic handler below caught the 403/404s
        # raised above and reported them to the client as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def direct_download(access_code: str):
    """Direct download route using just the access code in the URL.

    Images, PDFs and text files are served ``inline`` so a browser can
    display them; every other type is served as an ``attachment``.

    Args:
        access_code: Code returned when the file was uploaded.

    Returns:
        A ``FileResponse`` for the stored file.

    Raises:
        HTTPException: 404 when the code is unknown or the file is no longer
            on disk, 500 on any unexpected failure.
    """
    try:
        entry = file_codes.get(access_code)
        if entry is None:
            raise HTTPException(status_code=404, detail="Invalid access code")

        filename = entry["filename"]
        file_path = os.path.join(UPLOAD_DIR, filename)
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="File not found")

        # Extensions that browsers can usually render directly
        viewable = ('.png', '.jpg', '.jpeg', '.gif', '.pdf', '.txt')
        content_disposition = "inline" if filename.lower().endswith(viewable) else "attachment"

        # Let FileResponse build the Content-Disposition header. It quotes
        # and encodes the filename, whereas the hand-built header broke on
        # names with spaces, semicolons or non-Latin-1 characters.
        return FileResponse(
            file_path,
            filename=filename,
            content_disposition_type=content_disposition,
        )
    except HTTPException:
        # Without this clause the generic handler below caught the 404s
        # raised above and reported them to the client as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Per-user statistics handler
async def get_user_stats(user_id: str):
    """Return the counters that ``user_stats`` holds for ``user_id``.

    Any failure is reported as a 500 ``HTTPException``.
    """
    try:
        return user_stats.get_user_stats(user_id)
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
async def log_download(user_id: str, file_size: int, filename: str):
    """Record a download of ``filename`` (``file_size`` bytes) for ``user_id``.

    Returns a confirmation message; any failure is reported as a 500
    ``HTTPException``.
    """
    try:
        user_stats.log_download(user_id, file_size, filename)
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
    return {"message": "Download logged successfully"}
if __name__ == "__main__":
    import uvicorn

    # 0.0.0.0 listens on every network interface; use 127.0.0.1 instead to
    # accept local connections only.
    uvicorn.run(app, port=7860, host="0.0.0.0")