Charan5775 commited on
Commit
f82ffe3
·
verified ·
1 Parent(s): 656ea3c

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +148 -119
main.py CHANGED
@@ -1,119 +1,148 @@
1
- from fastapi import FastAPI, UploadFile, File, HTTPException
2
- from fastapi.responses import FileResponse
3
- from typing import List, Dict
4
- import os
5
- import shutil
6
- import uuid
7
-
8
- app = FastAPI()
9
-
10
- # Create uploads directory if it doesn't exist
11
- UPLOAD_DIR = "uploads"
12
- if not os.path.exists(UPLOAD_DIR):
13
- os.makedirs(UPLOAD_DIR)
14
-
15
- # Store file mappings: {unique_code: filename}
16
- file_codes: Dict[str, str] = {}
17
- # Reverse mapping for easy lookup: {filename: unique_code}
18
- filename_codes: Dict[str, str] = {}
19
-
20
- @app.post("/upload/")
21
- async def upload_file(file: UploadFile = File(...)):
22
- try:
23
- # Generate unique code
24
- unique_code = str(uuid.uuid4())[:8]
25
-
26
- # Save the uploaded file
27
- file_path = os.path.join(UPLOAD_DIR, file.filename)
28
- with open(file_path, "wb") as buffer:
29
- shutil.copyfileobj(file.file, buffer)
30
-
31
- # Store the mapping
32
- file_codes[unique_code] = file.filename
33
- filename_codes[file.filename] = unique_code
34
-
35
- return {
36
- "filename": file.filename,
37
- "access_code": unique_code,
38
- "message": "File uploaded successfully"
39
- }
40
- except Exception as e:
41
- raise HTTPException(status_code=500, detail=str(e))
42
-
43
- @app.post("/upload-multiple/")
44
- async def upload_multiple_files(files: List[UploadFile] = File(...)):
45
- try:
46
- uploaded_files = []
47
- for file in files:
48
- # Generate unique code for each file
49
- unique_code = str(uuid.uuid4())[:8]
50
-
51
- file_path = os.path.join(UPLOAD_DIR, file.filename)
52
- with open(file_path, "wb") as buffer:
53
- shutil.copyfileobj(file.file, buffer)
54
-
55
- # Store the mapping
56
- file_codes[unique_code] = file.filename
57
- filename_codes[file.filename] = unique_code
58
-
59
- uploaded_files.append({
60
- "filename": file.filename,
61
- "access_code": unique_code
62
- })
63
- return {"files": uploaded_files, "message": "Files uploaded successfully"}
64
- except Exception as e:
65
- raise HTTPException(status_code=500, detail=str(e))
66
-
67
- @app.get("/files/")
68
- async def list_files():
69
- try:
70
- files = []
71
- for filename in os.listdir(UPLOAD_DIR):
72
- code = filename_codes.get(filename, "no_code")
73
- files.append({
74
- "filename": filename,
75
- "access_code": code
76
- })
77
- return {"files": files}
78
- except Exception as e:
79
- raise HTTPException(status_code=500, detail=str(e))
80
-
81
- @app.get("/download/{access_code}")
82
- async def download_file(access_code: str):
83
- try:
84
- if access_code not in file_codes:
85
- raise HTTPException(status_code=404, detail="Invalid access code")
86
-
87
- filename = file_codes[access_code]
88
- file_path = os.path.join(UPLOAD_DIR, filename)
89
-
90
- if os.path.exists(file_path):
91
- return FileResponse(file_path, filename=filename)
92
- else:
93
- raise HTTPException(status_code=404, detail="File not found")
94
- except Exception as e:
95
- raise HTTPException(status_code=500, detail=str(e))
96
-
97
- @app.delete("/delete/{access_code}")
98
- async def delete_file(access_code: str):
99
- try:
100
- if access_code not in file_codes:
101
- raise HTTPException(status_code=404, detail="Invalid access code")
102
-
103
- filename = file_codes[access_code]
104
- file_path = os.path.join(UPLOAD_DIR, filename)
105
-
106
- if os.path.exists(file_path):
107
- os.remove(file_path)
108
- # Remove from both mappings
109
- del filename_codes[filename]
110
- del file_codes[access_code]
111
- return {"message": f"File {filename} deleted successfully"}
112
- else:
113
- raise HTTPException(status_code=404, detail="File not found")
114
- except Exception as e:
115
- raise HTTPException(status_code=500, detail=str(e))
116
-
117
- if __name__ == "__main__":
118
- import uvicorn
119
- uvicorn.run(app, host="0.0.0.0", port=7860)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, UploadFile, File, HTTPException
2
+ from fastapi.responses import FileResponse
3
+ from typing import List, Dict
4
+ import os
5
+ import shutil
6
+ import uuid
7
+
8
+ app = FastAPI()
9
+
10
+ # Create uploads directory if it doesn't exist
11
+ UPLOAD_DIR = "uploads"
12
+ if not os.path.exists(UPLOAD_DIR):
13
+ os.makedirs(UPLOAD_DIR)
14
+
15
+ # Store file mappings: {unique_code: filename}
16
+ file_codes: Dict[str, str] = {}
17
+ # Reverse mapping for easy lookup: {filename: unique_code}
18
+ filename_codes: Dict[str, str] = {}
19
+
20
+ @app.post("/upload/")
21
+ async def upload_file(file: UploadFile = File(...)):
22
+ try:
23
+ # Generate unique code
24
+ unique_code = str(uuid.uuid4())[:8]
25
+
26
+ # Save the uploaded file
27
+ file_path = os.path.join(UPLOAD_DIR, file.filename)
28
+ with open(file_path, "wb") as buffer:
29
+ shutil.copyfileobj(file.file, buffer)
30
+
31
+ # Store the mapping
32
+ file_codes[unique_code] = file.filename
33
+ filename_codes[file.filename] = unique_code
34
+
35
+ return {
36
+ "filename": file.filename,
37
+ "access_code": unique_code,
38
+ "message": "File uploaded successfully"
39
+ }
40
+ except Exception as e:
41
+ raise HTTPException(status_code=500, detail=str(e))
42
+
43
+ @app.post("/upload-multiple/")
44
+ async def upload_multiple_files(files: List[UploadFile] = File(...)):
45
+ try:
46
+ uploaded_files = []
47
+ for file in files:
48
+ # Generate unique code for each file
49
+ unique_code = str(uuid.uuid4())[:8]
50
+
51
+ file_path = os.path.join(UPLOAD_DIR, file.filename)
52
+ with open(file_path, "wb") as buffer:
53
+ shutil.copyfileobj(file.file, buffer)
54
+
55
+ # Store the mapping
56
+ file_codes[unique_code] = file.filename
57
+ filename_codes[file.filename] = unique_code
58
+
59
+ uploaded_files.append({
60
+ "filename": file.filename,
61
+ "access_code": unique_code
62
+ })
63
+ return {"files": uploaded_files, "message": "Files uploaded successfully"}
64
+ except Exception as e:
65
+ raise HTTPException(status_code=500, detail=str(e))
66
+
67
+ @app.get("/files/")
68
+ async def list_files():
69
+ try:
70
+ files = []
71
+ for filename in os.listdir(UPLOAD_DIR):
72
+ code = filename_codes.get(filename, "no_code")
73
+ files.append({
74
+ "filename": filename,
75
+ "access_code": code
76
+ })
77
+ return {"files": files}
78
+ except Exception as e:
79
+ raise HTTPException(status_code=500, detail=str(e))
80
+
81
+ @app.get("/download/{access_code}")
82
+ async def download_file(access_code: str):
83
+ try:
84
+ if access_code not in file_codes:
85
+ raise HTTPException(status_code=404, detail="Invalid access code")
86
+
87
+ filename = file_codes[access_code]
88
+ file_path = os.path.join(UPLOAD_DIR, filename)
89
+
90
+ if os.path.exists(file_path):
91
+ return FileResponse(file_path, filename=filename)
92
+ else:
93
+ raise HTTPException(status_code=404, detail="File not found")
94
+ except Exception as e:
95
+ raise HTTPException(status_code=500, detail=str(e))
96
+
97
+ @app.delete("/delete/{access_code}")
98
+ async def delete_file(access_code: str):
99
+ try:
100
+ if access_code not in file_codes:
101
+ raise HTTPException(status_code=404, detail="Invalid access code")
102
+
103
+ filename = file_codes[access_code]
104
+ file_path = os.path.join(UPLOAD_DIR, filename)
105
+
106
+ if os.path.exists(file_path):
107
+ os.remove(file_path)
108
+ # Remove from both mappings
109
+ del filename_codes[filename]
110
+ del file_codes[access_code]
111
+ return {"message": f"File {filename} deleted successfully"}
112
+ else:
113
+ raise HTTPException(status_code=404, detail="File not found")
114
+ except Exception as e:
115
+ raise HTTPException(status_code=500, detail=str(e))
116
+
117
+ @app.get("/{access_code}")
118
+ async def direct_download(access_code: str):
119
+ """Direct download route using just the access code in the URL"""
120
+ try:
121
+ if access_code not in file_codes:
122
+ # If not a valid access code, return 404 or redirect to home
123
+ raise HTTPException(status_code=404, detail="Invalid access code")
124
+
125
+ filename = file_codes[access_code]
126
+ file_path = os.path.join(UPLOAD_DIR, filename)
127
+
128
+ if os.path.exists(file_path):
129
+ # Determine content disposition based on file type
130
+ content_disposition = "inline" # For viewing in browser
131
+ # For certain file types, force download
132
+ if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.pdf', '.txt')):
133
+ content_disposition = "attachment"
134
+
135
+ return FileResponse(
136
+ file_path,
137
+ filename=filename,
138
+ headers={"Content-Disposition": f"{content_disposition}; filename={filename}"}
139
+ )
140
+ else:
141
+ raise HTTPException(status_code=404, detail="File not found")
142
+ except Exception as e:
143
+ raise HTTPException(status_code=500, detail=str(e))
144
+
145
+ if __name__ == "__main__":
146
+ import uvicorn
147
+ uvicorn.run(app, host="0.0.0.0", port=7860)
148
+ #uvicorn.run(app, host="127.0.0.1", port=7860)