# testingapi / main.py
# Author: Charan5775
# Commit 5e9e9cd (verified): Update main.py
import os
import secrets
import shutil
import subprocess
import sys
import uuid
from collections import defaultdict
from typing import List, Dict

from fastapi import FastAPI, UploadFile, File, HTTPException, Depends, status
from fastapi.responses import FileResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials

from config import DOCS_USERNAME, DOCS_PASSWORD
# Launch the companion script new.py as a background child process when this
# module is imported. sys.executable is used instead of a bare "python" so the
# child runs under the same interpreter (and virtual environment) as this
# process rather than whatever "python" happens to resolve to on PATH.
# The Popen handle is intentionally not kept; the child is never waited on.
subprocess.Popen([sys.executable, "new.py"])

# Initialize security for docs only
security = HTTPBasic()
def verify_credentials(credentials: HTTPBasicCredentials = Depends(security)):
    """Verify HTTP Basic Auth credentials for the documentation routes.

    Args:
        credentials: Username/password pair extracted by the ``security``
            (HTTP Basic) dependency.

    Returns:
        The same ``credentials`` object when both fields match
        ``DOCS_USERNAME`` / ``DOCS_PASSWORD``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when the
            username or the password does not match.
    """
    # secrets.compare_digest only accepts str arguments that are pure ASCII;
    # a non-ASCII username/password would raise TypeError and surface as a
    # 500. Comparing the UTF-8 encoded bytes works for any input.
    is_username_correct = secrets.compare_digest(
        credentials.username.encode("utf8"), DOCS_USERNAME.encode("utf8")
    )
    # Both comparisons are always evaluated before the result is combined, so
    # a wrong username does not skip the password check.
    is_password_correct = secrets.compare_digest(
        credentials.password.encode("utf8"), DOCS_PASSWORD.encode("utf8")
    )
    if not (is_username_correct and is_password_correct):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials
# Initialize FastAPI with every built-in documentation route disabled so that
# only the credential-protected routes defined in this module serve them.
app = FastAPI(
    title="File Sharing API",
    description="API for file sharing service",
    version="1.0.0",
    docs_url=None,  # Disable default docs
    redoc_url=None,  # Disable default redoc
    # Disable the built-in schema route as well. If left enabled, FastAPI
    # registers its own unauthenticated /openapi.json when the app is created,
    # which takes precedence over any /openapi.json route added afterwards.
    openapi_url=None,
)
# Create protected docs routes
@app.get("/docs", include_in_schema=False)
async def get_documentation(credentials: HTTPBasicCredentials = Depends(verify_credentials)):
    """Return the Swagger UI page; the request must pass ``verify_credentials``."""
    from fastapi.openapi import docs as openapi_docs

    swagger_page = openapi_docs.get_swagger_ui_html(
        openapi_url="/openapi.json",
        title="API Documentation",
    )
    return swagger_page
@app.get("/redoc", include_in_schema=False)
async def get_redoc(credentials: HTTPBasicCredentials = Depends(verify_credentials)):
    """Return the ReDoc page; the request must pass ``verify_credentials``."""
    from fastapi.openapi import docs as openapi_docs

    redoc_page = openapi_docs.get_redoc_html(
        openapi_url="/openapi.json",
        title="API Documentation",
    )
    return redoc_page
@app.get("/openapi.json", include_in_schema=False)
async def get_openapi(credentials: HTTPBasicCredentials = Depends(verify_credentials)):
    """Build and return the OpenAPI schema; requires valid docs credentials."""
    # The FastAPI helper has the same name as this handler, so it is imported
    # under an alias to keep the two apart.
    from fastapi.openapi.utils import get_openapi as build_openapi_schema

    schema_options = {
        "title": "File Sharing API",
        "version": "1.0.0",
        "description": "API for file sharing service",
        "routes": app.routes,
    }
    return build_openapi_schema(**schema_options)
# Create uploads directory if it doesn't exist. exist_ok=True avoids the
# check-then-create race (e.g. two processes starting at the same time), where
# the second os.makedirs call would otherwise raise FileExistsError.
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Store file mappings with user info: {unique_code: {"filename": filename, "user_id": user_id}}
# Both mappings live only in memory for the lifetime of this process.
file_codes: Dict[str, dict] = {}

# Reverse mapping: {filename: {"code": code, "user_id": user_id}}
filename_codes: Dict[str, dict] = {}
class UserStats:
    """In-memory per-user counters for uploads and downloads.

    Every attribute is a ``defaultdict`` keyed by user id:
    ``upload_counts`` / ``download_counts`` hold event counts,
    ``total_bytes_uploaded`` / ``total_bytes_downloaded`` hold byte totals and
    ``last_activity`` holds a short description of the latest event.
    """

    def __init__(self):
        self.upload_counts = defaultdict(int)
        self.download_counts = defaultdict(int)
        self.total_bytes_uploaded = defaultdict(int)
        self.total_bytes_downloaded = defaultdict(int)
        self.last_activity = defaultdict(str)

    def log_upload(self, user_id: str, file_size: int, filename: str):
        """Record one upload of ``file_size`` bytes named ``filename`` for ``user_id``."""
        self.upload_counts[user_id] += 1
        self.total_bytes_uploaded[user_id] += file_size
        self.last_activity[user_id] = f"Uploaded: {filename}"

    def log_download(self, user_id: str, file_size: int, filename: str):
        """Record one download of ``file_size`` bytes named ``filename`` for ``user_id``."""
        self.download_counts[user_id] += 1
        self.total_bytes_downloaded[user_id] += file_size
        self.last_activity[user_id] = f"Downloaded: {filename}"

    def get_user_stats(self, user_id: str) -> dict:
        """Return a snapshot of the counters for ``user_id``.

        Unknown users get zero counters and ``"No activity"``. The lookup uses
        ``.get`` rather than indexing so that querying a user does not insert
        new keys into the defaultdicts (indexing would grow them by one entry
        per distinct id ever queried).
        """
        return {
            "uploads": self.upload_counts.get(user_id, 0),
            "downloads": self.download_counts.get(user_id, 0),
            "bytes_uploaded": self.total_bytes_uploaded.get(user_id, 0),
            "bytes_downloaded": self.total_bytes_downloaded.get(user_id, 0),
            "last_activity": self.last_activity.get(user_id, "No activity"),
        }


# Initialize stats
user_stats = UserStats()
@app.post("/upload/")
async def upload_file(file: UploadFile = File(...), user_id: str = None):
    """Store one uploaded file and return a short access code for it.

    Args:
        file: The uploaded file.
        user_id: Optional owner id recorded alongside the file and used for
            the upload statistics.

    Returns:
        A dict with the stored ``filename``, the generated ``access_code``
        and a confirmation ``message``.

    Raises:
        HTTPException: 400 when the upload has no usable filename; 500 when
            saving or bookkeeping fails.
    """
    # The filename is supplied by the client. Keep only its final path
    # component so names such as "../../x" or "/etc/x" cannot make the file
    # land outside UPLOAD_DIR.
    safe_name = os.path.basename(file.filename or "")
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")

    try:
        # Generate unique code (first 8 hex characters of a random UUID)
        unique_code = str(uuid.uuid4())[:8]

        # Save the uploaded file
        file_path = os.path.join(UPLOAD_DIR, safe_name)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Store the mapping with user info
        file_codes[unique_code] = {"filename": safe_name, "user_id": user_id}
        filename_codes[safe_name] = {"code": unique_code, "user_id": user_id}

        # Log the upload
        file_size = os.path.getsize(file_path)
        user_stats.log_upload(user_id, file_size, safe_name)

        return {
            "filename": safe_name,
            "access_code": unique_code,
            "message": "File uploaded successfully"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/upload-multiple/")
async def upload_multiple_files(files: List[UploadFile] = File(...)):
    """Store several uploaded files and return one access code per file.

    Files stored through this route are recorded with ``user_id`` set to
    ``None``.

    Args:
        files: The uploaded files.

    Returns:
        A dict with ``files`` (a list of ``filename`` / ``access_code``
        entries) and a confirmation ``message``.

    Raises:
        HTTPException: 400 when any upload has no usable filename (nothing is
            written in that case); 500 when saving fails.
    """
    # Filenames are supplied by the client. Reduce each one to its final path
    # component so it cannot escape UPLOAD_DIR, and validate all of them
    # before writing anything.
    safe_names = []
    for file in files:
        safe_name = os.path.basename(file.filename or "")
        if safe_name in ("", ".", ".."):
            raise HTTPException(status_code=400, detail="Invalid filename")
        safe_names.append(safe_name)

    try:
        uploaded_files = []
        for file, safe_name in zip(files, safe_names):
            # Generate unique code for each file
            unique_code = str(uuid.uuid4())[:8]

            file_path = os.path.join(UPLOAD_DIR, safe_name)
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)

            # Store the mapping
            file_codes[unique_code] = {"filename": safe_name, "user_id": None}
            filename_codes[safe_name] = {"code": unique_code, "user_id": None}

            uploaded_files.append({
                "filename": safe_name,
                "access_code": unique_code
            })

        return {"files": uploaded_files, "message": "Files uploaded successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/files/{user_id}")
async def list_files(user_id: str):
    """Return the files in the upload directory that are recorded for ``user_id``."""
    try:
        # Directory entries without a mapping, or mapped to another user,
        # are filtered out.
        owned = [
            {"filename": name, "access_code": filename_codes[name]["code"]}
            for name in os.listdir(UPLOAD_DIR)
            if filename_codes.get(name, {}).get("user_id") == user_id
        ]
        return {"files": owned}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/download/{access_code}")
async def download_file(access_code: str):
    """Send the file registered under ``access_code``.

    Args:
        access_code: Code returned when the file was uploaded.

    Returns:
        A ``FileResponse`` for the stored file.

    Raises:
        HTTPException: 404 when the code is unknown or the file is no longer
            on disk; 500 for any unexpected error.
    """
    try:
        if access_code not in file_codes:
            raise HTTPException(status_code=404, detail="Invalid access code")

        filename = file_codes[access_code]["filename"]
        file_path = os.path.join(UPLOAD_DIR, filename)

        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="File not found")

        return FileResponse(file_path, filename=filename)
    except HTTPException:
        # HTTPException is itself an Exception; without this clause the 404s
        # raised above would be caught below and reported as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.delete("/delete/{access_code}")
async def delete_file(access_code: str, user_id: str = None):
    """Delete the file registered under ``access_code`` and drop its mappings.

    Args:
        access_code: Code returned when the file was uploaded.
        user_id: Optional id of the caller; when given it must match the
            owner recorded for the file.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the code is unknown or the file is missing on
            disk; 403 when ``user_id`` is given and is not the owner; 500 for
            any unexpected error.
    """
    try:
        if access_code not in file_codes:
            raise HTTPException(status_code=404, detail="Invalid access code")

        # Check if user owns the file
        # NOTE(review): the check only runs when user_id is supplied, so a
        # request without user_id can delete any file it has the code for.
        # Left unchanged to stay compatible with existing callers - confirm
        # whether that is intended.
        if user_id and file_codes[access_code]["user_id"] != user_id:
            raise HTTPException(status_code=403, detail="You don't have permission to delete this file")

        filename = file_codes[access_code]["filename"]
        file_path = os.path.join(UPLOAD_DIR, filename)

        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="File not found")

        os.remove(file_path)
        # Remove from both mappings. pop() tolerates a reverse entry that is
        # already gone, so the file is not left deleted with its code still
        # registered.
        filename_codes.pop(filename, None)
        del file_codes[access_code]
        return {"message": f"File {filename} deleted successfully"}
    except HTTPException:
        # HTTPException is itself an Exception; without this clause the
        # 403/404 raised above would be caught below and reported as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/{access_code}")
async def direct_download(access_code: str):
    """Direct download route using just the access code in the URL.

    Images, PDFs and text files are served with an ``inline`` content
    disposition so browsers can display them; every other file type is served
    as an ``attachment``.

    Args:
        access_code: Code returned when the file was uploaded.

    Returns:
        A ``FileResponse`` for the stored file.

    Raises:
        HTTPException: 404 when the code is unknown or the file is no longer
            on disk; 500 for any unexpected error.
    """
    try:
        if access_code not in file_codes:
            raise HTTPException(status_code=404, detail="Invalid access code")

        filename = file_codes[access_code]["filename"]
        file_path = os.path.join(UPLOAD_DIR, filename)

        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="File not found")

        # File types that browsers can render are shown inline; the rest are
        # forced to download.
        inline_suffixes = ('.png', '.jpg', '.jpeg', '.gif', '.pdf', '.txt')
        if filename.lower().endswith(inline_suffixes):
            content_disposition = "inline"
        else:
            content_disposition = "attachment"

        # Let FileResponse build the Content-Disposition header. It quotes the
        # filename and percent-encodes non-ASCII names, whereas a hand-built
        # "filename=<name>" header fails for names that cannot be encoded as
        # latin-1 and is ambiguous for names containing spaces or quotes.
        return FileResponse(
            file_path,
            filename=filename,
            content_disposition_type=content_disposition,
        )
    except HTTPException:
        # HTTPException is itself an Exception; without this clause the 404s
        # raised above would be caught below and reported as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# Add new endpoint for stats
@app.get("/stats/{user_id}")
async def get_user_stats(user_id: str):
    """Return the upload/download statistics recorded for ``user_id``."""
    try:
        return user_stats.get_user_stats(user_id)
    except Exception as exc:
        # Any failure while reading the stats is reported as a 500.
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/log_download")
async def log_download(user_id: str, file_size: int, filename: str):
    """Record a download of ``filename`` (``file_size`` bytes) for ``user_id``."""
    try:
        user_stats.log_download(user_id, file_size, filename)
    except Exception as exc:
        # Any failure while recording is reported as a 500.
        raise HTTPException(status_code=500, detail=str(exc))
    return {"message": "Download logged successfully"}
if __name__ == "__main__":
    # Start the API with uvicorn when this file is executed as a script.
    import uvicorn

    # Listens on all interfaces; switch host to "127.0.0.1" for
    # loopback-only access.
    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)