bigbossmonster commited on
Commit
17cfce0
·
verified ·
1 Parent(s): 77928b9

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +276 -0
app.py ADDED
@@ -0,0 +1,276 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import shutil
3
+ import subprocess
4
+ import requests
5
+ import re
6
+ from urllib.parse import urlparse
7
+ from typing import Optional, List
8
+ from pathlib import Path
9
+
10
+ from fastapi import FastAPI, HTTPException, BackgroundTasks
11
+ from pydantic import BaseModel
12
+ from huggingface_hub import HfApi
13
+ from pySmartDL import SmartDL
14
+
15
+ # --- dependency check ---
16
+ try:
17
+ subprocess.run(["pip", "install", "--upgrade", "gdown"], check=True, capture_output=True, text=True)
18
+ except subprocess.CalledProcessError as e:
19
+ print(f"Error occurred installing gdown: {e.stderr}")
20
+
21
+ # --- Configuration ---
22
+ DIR = "download"
23
+ os.makedirs(DIR, exist_ok=True)
24
+
25
+ # Helper to get token (checks env var first)
26
+ def get_token():
27
+ return os.getenv('HF_token')
28
+
29
+ def get_civit_key():
30
+ return os.getenv('civitai_api')
31
+
32
+ app = FastAPI(title="File Management API")
33
+
34
+ # --- Pydantic Models for Request Validation ---
35
+ class ProcessRequest(BaseModel):
36
+ urls: str # Accepts newline separated string to match original logic, or you could change to List[str]
37
+ hf_path: Optional[str] = "bigbossmonster/output"
38
+ selection: str = "default" # options: default, wget, SmartDl, aria2, curl
39
+ hf_check: bool = False # If True, skip upload
40
+ upload_type: str = "model" # options: model, dataset
41
+ hf_token: Optional[str] = None # Optional override for token
42
+
43
+ # --- Core Logic Functions (Refactored) ---
44
+
45
+ def wipe_folder_logic():
46
+ try:
47
+ if os.path.exists(DIR):
48
+ shutil.rmtree(DIR)
49
+ os.makedirs(DIR, exist_ok=True)
50
+ return f"All files in '{DIR}' have been wiped successfully."
51
+ else:
52
+ return f"The folder '{DIR}' does not exist."
53
+ except Exception as e:
54
+ return f"Error wiping folder: {str(e)}"
55
+
56
+ def extract_file_name(url):
57
+ parsed_url = urlparse(url)
58
+ path_segments = parsed_url.path.split('/')
59
+ return path_segments[-1]
60
+
61
+ def download_file(url, save_path, selection):
62
+ if selection == "default":
63
+ try:
64
+ with requests.get(url, stream=True) as response:
65
+ response.raise_for_status()
66
+ with open(save_path, 'wb') as file:
67
+ for chunk in response.iter_content(chunk_size=8192):
68
+ file.write(chunk)
69
+ print(f"File downloaded to {save_path}")
70
+ except requests.RequestException as e:
71
+ print(f"Failed to download file: {e}")
72
+ elif selection in ["wget", "aria2", "curl"]:
73
+ try:
74
+ os.makedirs(DIR, exist_ok=True)
75
+ command = []
76
+ if selection == "wget":
77
+ command = ["wget", url, "-P", DIR]
78
+ elif selection == "aria2":
79
+ command = ["aria2c", "-x16", "-s16", "-d", DIR, url]
80
+ elif selection == "curl":
81
+ command = ["curl", "-o", save_path, url]
82
+
83
+ subprocess.run(command, check=True)
84
+ print(f"File downloaded to {DIR} using {selection}")
85
+ except subprocess.CalledProcessError as e:
86
+ print(f"Failed to download file using {selection}: {e}")
87
+ else: # SmartDL
88
+ try:
89
+ obj = SmartDL(url, save_path)
90
+ obj.start()
91
+ print(f"File downloaded successfully to: {obj.get_dest()} using SmartDL")
92
+ except Exception as e:
93
+ print(f"Failed to download file using SmartDL: {e}")
94
+
95
+ def extract_google_drive_id(url):
96
+ drive_id_pattern = re.compile(
97
+ r'(?:drive.google.com/.*?id=|drive.google.com/file/d/|drive.google.com/open\?id=|drive.google.com/uc\?id=|drive.google.com/drive/folders/)([a-zA-Z0-9_-]+)'
98
+ )
99
+ match = drive_id_pattern.search(url)
100
+ return match.group(1) if match else None
101
+
102
+ def download_google_drive(url):
103
+ file_id = extract_google_drive_id(url)
104
+ if file_id:
105
+ download_url = f"https://drive.google.com/uc?id={file_id}"
106
+ print(f"Downloading from: {download_url}")
107
+ try:
108
+ # We must be inside the directory for gdown to work as intended with --folder sometimes
109
+ cwd = os.getcwd()
110
+ os.chdir(DIR)
111
+ if "folders" in url:
112
+ subprocess.run(["gdown", "--remaining-ok", "--folder" , url], check=True)
113
+ else:
114
+ subprocess.run(["gdown", download_url], check=True)
115
+ os.chdir(cwd) # Switch back
116
+ return f"Downloaded: {download_url}"
117
+ except subprocess.CalledProcessError as e:
118
+ os.chdir(cwd) # Switch back even on error
119
+ return f"Failed to download: {e}"
120
+ except Exception as e:
121
+ return f"Error: {e}"
122
+ else:
123
+ return f"Invalid Google Drive URL: {url}"
124
+
125
+ def download_hugging_face(url, selection):
126
+ file_name = extract_file_name(url)
127
+ save_path = os.path.join(DIR, file_name)
128
+ download_file(url, save_path, selection)
129
+ return f"Downloaded: {url}"
130
+
131
+ def download_katfile(url, selection):
132
+ apiKey = "699996yph6h88a7rc6c1g8"
133
+ try:
134
+ parts = url.split('/')
135
+ domain = parts[2]
136
+ filecode = parts[3]
137
+ cloneurl = f"https://{domain}/api/file/clone?key={apiKey}&file_code={filecode}"
138
+
139
+ response = requests.get(cloneurl)
140
+ if response.status_code == 200:
141
+ json_data = response.json()
142
+ download_url = json_data.get('result', {}).get('url')
143
+ if not download_url: return "Failed to get Katfile URL"
144
+
145
+ parts_final = download_url.split('/')
146
+ filecodex = parts_final[3]
147
+ final_url = f"https://{domain}/api/file/direct_link?key={apiKey}&file_code={filecodex}"
148
+
149
+ response = requests.get(final_url)
150
+ if response.status_code == 200:
151
+ json_data = response.json()
152
+ real_dl_url = json_data.get('result', {}).get('url')
153
+ save_path = os.path.join(DIR, extract_file_name(real_dl_url))
154
+ download_file(real_dl_url, save_path, selection)
155
+ return f"Downloaded: {real_dl_url}"
156
+ except Exception as e:
157
+ print(f"Error downloading from Katfile: {str(e)}")
158
+ return "Error downloading Katfile"
159
+
160
+ def download_civitai(url, selection):
161
+ civit_key = get_civit_key()
162
+ parsed_url = urlparse(url)
163
+ path_components = parsed_url.path.split("/")
164
+ modelVersionId = path_components[-1]
165
+ save_path = os.path.join(DIR, f"{modelVersionId}.safecheck") # Temp name if unknown, usually CivitAI needs content-disposition
166
+
167
+ # CivitAI specific logic
168
+ if selection == "curl":
169
+ command = [
170
+ "curl", "-L",
171
+ "-H", f"Authorization: Bearer {civit_key}",
172
+ "-o", save_path, # Added output path for curl
173
+ f"{url}"
174
+ ]
175
+ subprocess.run(command)
176
+ else:
177
+ # Construct API URL
178
+ api_url = f"https://civitai.com/api/download/models/{modelVersionId}?token={civit_key}"
179
+ # For standard downloads we generally want the real filename,
180
+ # but sticking to your logic:
181
+ download_file(api_url, os.path.join(DIR, "civitai_model"), selection)
182
+
183
+ return f"Downloaded: {url}"
184
+
185
+ def handle_download(url, selection):
186
+ try:
187
+ if "drive.google.com" in url:
188
+ return download_google_drive(url)
189
+ elif "huggingface.co" in url:
190
+ return download_hugging_face(url, selection)
191
+ elif "civitai.com" in url:
192
+ return download_civitai(url, selection)
193
+ elif "katfile.com" in url:
194
+ return download_katfile(url, selection)
195
+ else:
196
+ save_path = os.path.join(DIR, extract_file_name(url))
197
+ download_file(url, save_path, selection)
198
+ return f"Downloaded: {url}"
199
+ except Exception as e:
200
+ return f"Error occurred: {str(e)}"
201
+
202
+ def upload_to_hf(hf_path, upload_type, token_val):
203
+ repo_id = hf_path if hf_path else "bigbossmonster/output"
204
+ commit_message = "Upload folder with Python script"
205
+
206
+ if os.path.exists(DIR):
207
+ api = HfApi()
208
+ try:
209
+ api.upload_folder(
210
+ folder_path=DIR,
211
+ repo_id=repo_id,
212
+ commit_message=commit_message,
213
+ token=token_val,
214
+ repo_type=upload_type,
215
+ )
216
+ return f"Upload success! Folder is available at https://huggingface.co/{repo_id}"
217
+ except Exception as e:
218
+ return f"Failed to upload folder. Error: {e}"
219
+ else:
220
+ return f"The folder '{DIR}' does not exist."
221
+
222
+ # --- API Endpoints ---
223
+
224
+ @app.get("/")
225
+ def home():
226
+ return {"message": "File Management API is running. Go to /docs for Swagger UI."}
227
+
228
+ @app.post("/process")
229
+ def process_batch(request: ProcessRequest):
230
+ """
231
+ Main endpoint to download files and optionally upload to HF.
232
+ """
233
+ # 1. Process Downloads
234
+ download_results = []
235
+ urls_list = request.urls.splitlines()
236
+
237
+ for url in urls_list:
238
+ url = url.strip()
239
+ if url:
240
+ result = handle_download(url, request.selection)
241
+ download_results.append(result)
242
+
243
+ download_msg = "\n".join(download_results)
244
+
245
+ # 2. Process Upload (if checked)
246
+ upload_msg = "Do Not upload to HF"
247
+ if not request.hf_check:
248
+ # Determine token: Request > Env Var
249
+ active_token = request.hf_token if request.hf_token else get_token()
250
+ upload_msg = upload_to_hf(request.hf_path, request.upload_type, active_token)
251
+
252
+ return {
253
+ "download_status": download_msg,
254
+ "upload_status": upload_msg
255
+ }
256
+
257
+ @app.post("/wipe")
258
+ def wipe_folder():
259
+ """Wipes the download directory."""
260
+ return {"status": wipe_folder_logic()}
261
+
262
+ @app.get("/files")
263
+ def list_files_endpoint():
264
+ """Lists files in the download directory."""
265
+ try:
266
+ if not os.path.exists(DIR):
267
+ return {"files": [], "message": "Directory does not exist."}
268
+
269
+ files = os.listdir(DIR)
270
+ return {"files": files, "count": len(files)}
271
+ except Exception as e:
272
+ raise HTTPException(status_code=500, detail=str(e))
273
+
274
+ if __name__ == "__main__":
275
+ import uvicorn
276
+ uvicorn.run(app, host="0.0.0.0", port=7860)