testingapi / main.py
Charan5775's picture
Update main.py
5fb0252 verified
raw
history blame
5.71 kB
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import FileResponse
from typing import List, Dict
import os
import shutil
import uuid
app = FastAPI()
# Create uploads directory if it doesn't exist
UPLOAD_DIR = "uploads"
if not os.path.exists(UPLOAD_DIR):
os.makedirs(UPLOAD_DIR)
# Store file mappings with user info: {unique_code: {"filename": filename, "user_id": user_id}}
file_codes: Dict[str, dict] = {}
# Reverse mapping: {filename: {"code": code, "user_id": user_id}}
filename_codes: Dict[str, dict] = {}
@app.post("/upload/")
async def upload_file(file: UploadFile = File(...), user_id: str = None):
try:
# Generate unique code
unique_code = str(uuid.uuid4())[:8]
# Save the uploaded file
file_path = os.path.join(UPLOAD_DIR, file.filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Store the mapping with user info
file_codes[unique_code] = {"filename": file.filename, "user_id": user_id}
filename_codes[file.filename] = {"code": unique_code, "user_id": user_id}
return {
"filename": file.filename,
"access_code": unique_code,
"message": "File uploaded successfully"
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/upload-multiple/")
async def upload_multiple_files(files: List[UploadFile] = File(...)):
try:
uploaded_files = []
for file in files:
# Generate unique code for each file
unique_code = str(uuid.uuid4())[:8]
file_path = os.path.join(UPLOAD_DIR, file.filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Store the mapping
file_codes[unique_code] = {"filename": file.filename, "user_id": None}
filename_codes[file.filename] = {"code": unique_code, "user_id": None}
uploaded_files.append({
"filename": file.filename,
"access_code": unique_code
})
return {"files": uploaded_files, "message": "Files uploaded successfully"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/files/{user_id}")
async def list_files(user_id: str):
"""List files for a specific user"""
try:
files = []
for filename in os.listdir(UPLOAD_DIR):
file_info = filename_codes.get(filename, {})
if file_info.get("user_id") == user_id:
files.append({
"filename": filename,
"access_code": file_info["code"]
})
return {"files": files}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/download/{access_code}")
async def download_file(access_code: str):
try:
if access_code not in file_codes:
raise HTTPException(status_code=404, detail="Invalid access code")
filename = file_codes[access_code]["filename"]
file_path = os.path.join(UPLOAD_DIR, filename)
if os.path.exists(file_path):
return FileResponse(file_path, filename=filename)
else:
raise HTTPException(status_code=404, detail="File not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/delete/{access_code}")
async def delete_file(access_code: str):
try:
if access_code not in file_codes:
raise HTTPException(status_code=404, detail="Invalid access code")
filename = file_codes[access_code]["filename"]
file_path = os.path.join(UPLOAD_DIR, filename)
if os.path.exists(file_path):
os.remove(file_path)
# Remove from both mappings
del filename_codes[filename]
del file_codes[access_code]
return {"message": f"File {filename} deleted successfully"}
else:
raise HTTPException(status_code=404, detail="File not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/{access_code}")
async def direct_download(access_code: str):
"""Direct download route using just the access code in the URL"""
try:
if access_code not in file_codes:
# If not a valid access code, return 404 or redirect to home
raise HTTPException(status_code=404, detail="Invalid access code")
filename = file_codes[access_code]["filename"]
file_path = os.path.join(UPLOAD_DIR, filename)
if os.path.exists(file_path):
# Determine content disposition based on file type
content_disposition = "inline" # For viewing in browser
# For certain file types, force download
if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.pdf', '.txt')):
content_disposition = "attachment"
return FileResponse(
file_path,
filename=filename,
headers={"Content-Disposition": f"{content_disposition}; filename={filename}"}
)
else:
raise HTTPException(status_code=404, detail="File not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)
#uvicorn.run(app, host="127.0.0.1", port=7860)