Translsis commited on
Commit
cabb42d
·
verified ·
1 Parent(s): c954dee

Delete app(3).py

Browse files
Files changed (1) hide show
  1. app(3).py +0 -646
app(3).py DELETED
@@ -1,646 +0,0 @@
1
- import gradio as gr
2
- import os
3
- import logging
4
- import asyncio
5
- import tempfile
6
- import git
7
- import shutil
8
- import zipfile
9
- import secrets
10
- import string
11
- from pathlib import Path
12
- from typing import Tuple, Optional, Dict
13
- from huggingface_hub import login, HfApi, create_repo, upload_file
14
- from urllib.parse import urlparse
15
- from functools import partial
16
- from datetime import datetime
17
-
18
- # Setup logging
19
- logging.basicConfig(
20
- level=logging.INFO,
21
- format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
22
- )
23
- logger = logging.getLogger(__name__)
24
-
25
- class ZipHandler:
26
- @staticmethod
27
- def generate_password(length: int = 16) -> str:
28
- """Generate a random password for ZIP encryption"""
29
- alphabet = string.ascii_letters + string.digits + string.punctuation
30
- return ''.join(secrets.choice(alphabet) for _ in range(length))
31
-
32
- @staticmethod
33
- async def create_zip(
34
- source_path: Path,
35
- use_password: bool = False,
36
- password: Optional[str] = None,
37
- progress_callback: Optional[callable] = None
38
- ) -> Tuple[Path, Optional[str]]:
39
- """Create a ZIP file from the source path"""
40
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
41
- zip_path = Path(tempfile.gettempdir()) / f"upload_{timestamp}.zip"
42
-
43
- if use_password and not password:
44
- password = ZipHandler.generate_password()
45
-
46
- try:
47
- total_files = sum(1 for _ in Path(source_path).rglob('*') if _.is_file())
48
- processed_files = 0
49
-
50
- with zipfile.ZipFile(
51
- zip_path,
52
- 'w',
53
- zipfile.ZIP_DEFLATED,
54
- compresslevel=9
55
- ) as zipf:
56
- for root, _, files in os.walk(source_path):
57
- for file in files:
58
- file_path = Path(root) / file
59
- arc_name = str(file_path.relative_to(source_path))
60
-
61
- if use_password:
62
- zipf.write(
63
- file_path,
64
- arc_name,
65
- zipfile.ZIP_DEFLATED,
66
- pwd=password.encode() if password else None
67
- )
68
- else:
69
- zipf.write(file_path, arc_name)
70
-
71
- processed_files += 1
72
- if progress_callback:
73
- progress_callback(processed_files / total_files)
74
-
75
- return zip_path, password
76
- except Exception as e:
77
- logger.error(f"ZIP creation failed: {e}")
78
- if zip_path.exists():
79
- zip_path.unlink()
80
- raise
81
-
82
- class HuggingFaceManager:
83
- def __init__(self):
84
- self.api = HfApi()
85
- self.token: Optional[str] = None
86
- self.temp_dir = Path(tempfile.mkdtemp())
87
-
88
- def cleanup(self):
89
- """Clean up temporary directory"""
90
- if self.temp_dir.exists():
91
- shutil.rmtree(self.temp_dir, ignore_errors=True)
92
-
93
- def validate_repo_name(self, repo_name: str) -> bool:
94
- """Validate repository name format"""
95
- if not repo_name or '/' not in repo_name:
96
- raise ValueError("Repository name must be in format 'username/repo-name'")
97
- username, repo = repo_name.split('/', 1)
98
- return all(name.strip() for name in [username, repo])
99
-
100
- def validate_url(self, url: str) -> bool:
101
- """Validate URL format"""
102
- try:
103
- result = urlparse(url)
104
- return all([result.scheme, result.netloc])
105
- except Exception:
106
- return False
107
-
108
- async def login_and_validate(self, token: str) -> str:
109
- """Login to Hugging Face and validate token"""
110
- if not token.strip():
111
- return "Error: Token cannot be empty"
112
-
113
- try:
114
- loop = asyncio.get_event_loop()
115
- await loop.run_in_executor(None, login, token)
116
- await loop.run_in_executor(None, self.api.whoami)
117
-
118
- self.token = token
119
- return "Login successful!"
120
- except Exception as e:
121
- logger.error(f"Login failed: {e}")
122
- return f"Login failed: {str(e)}"
123
-
124
- async def download_from_url(
125
- self,
126
- url: str,
127
- download_type: str,
128
- progress: Optional[gr.Progress] = None
129
- ) -> Tuple[str, Optional[str]]:
130
- """Download content from URL using wget or git"""
131
- if not self.validate_url(url):
132
- return "Invalid URL format!", None
133
-
134
- output_path = self.temp_dir / "downloaded_content"
135
- if output_path.exists():
136
- shutil.rmtree(output_path)
137
- output_path.mkdir(parents=True)
138
-
139
- try:
140
- if download_type == "wget":
141
- if progress:
142
- progress(0, desc="Downloading with wget...")
143
-
144
- process = await asyncio.create_subprocess_exec(
145
- 'wget', '-P', str(output_path), url,
146
- stdout=asyncio.subprocess.PIPE,
147
- stderr=asyncio.subprocess.PIPE
148
- )
149
-
150
- try:
151
- stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=300)
152
- if process.returncode != 0:
153
- raise Exception(f"wget failed: {stderr.decode()}")
154
- except asyncio.TimeoutError:
155
- raise Exception("Download timed out after 5 minutes")
156
-
157
- elif download_type == "git":
158
- if progress:
159
- progress(0, desc="Cloning git repository...")
160
-
161
- await asyncio.get_event_loop().run_in_executor(
162
- None,
163
- partial(git.Repo.clone_from, url, str(output_path), depth=1)
164
- )
165
-
166
- if progress:
167
- progress(1, desc="Download complete")
168
-
169
- return "Download successful!", str(output_path)
170
-
171
- except Exception as e:
172
- logger.error(f"Download failed: {e}")
173
- if output_path.exists():
174
- shutil.rmtree(output_path)
175
- return f"Download failed: {str(e)}", None
176
-
177
- async def create_repository(
178
- self,
179
- repo_name: str,
180
- repo_type: str,
181
- is_private: bool
182
- ) -> str:
183
- """Create a new repository on Hugging Face"""
184
- if not self.token:
185
- return "Please login first!"
186
-
187
- try:
188
- self.validate_repo_name(repo_name)
189
-
190
- await asyncio.get_event_loop().run_in_executor(
191
- None,
192
- partial(
193
- create_repo,
194
- repo_name,
195
- private=is_private,
196
- repo_type=repo_type,
197
- exist_ok=True
198
- )
199
- )
200
-
201
- return f"Repository '{repo_name}' created successfully!"
202
- except Exception as e:
203
- logger.error(f"Failed to create repository: {e}")
204
- return f"Failed to create repository: {str(e)}"
205
-
206
- async def upload_file_worker(
207
- self,
208
- file_path: Path,
209
- relative_path: Path,
210
- repo_name: str,
211
- repo_type: str,
212
- progress_callback=None
213
- ) -> Optional[str]:
214
- """Upload a single file to repository"""
215
- try:
216
- await asyncio.get_event_loop().run_in_executor(
217
- None,
218
- partial(
219
- upload_file,
220
- path_or_fileobj=str(file_path),
221
- path_in_repo=str(relative_path),
222
- repo_id=repo_name,
223
- repo_type=repo_type,
224
- commit_message=f"Upload: {relative_path}"
225
- )
226
- )
227
- if progress_callback:
228
- progress_callback()
229
- return str(relative_path)
230
- except Exception as e:
231
- logger.error(f"Error uploading {relative_path}: {e}")
232
- return None
233
-
234
- async def upload_folder(
235
- self,
236
- folder_path: str,
237
- repo_name: str,
238
- repo_type: str,
239
- use_zip: bool = False,
240
- use_password: bool = False,
241
- custom_password: Optional[str] = None,
242
- progress: Optional[gr.Progress] = None
243
- ) -> Dict[str, str]:
244
- """Upload entire folder to repository with optional ZIP compression"""
245
- if not self.token:
246
- return {"status": "Please login first!", "password": None}
247
-
248
- folder_path = Path(folder_path)
249
- if not folder_path.exists():
250
- return {"status": f"Folder {folder_path} does not exist!", "password": None}
251
-
252
- try:
253
- self.validate_repo_name(repo_name)
254
-
255
- if use_zip:
256
- if progress:
257
- progress(0, desc="Creating ZIP archive...")
258
-
259
- zip_path, password = await ZipHandler.create_zip(
260
- folder_path,
261
- use_password,
262
- custom_password,
263
- lambda p: progress(p * 0.5) if progress else None
264
- )
265
-
266
- if progress:
267
- progress(0.5, desc="Uploading ZIP file...")
268
-
269
- # Upload single ZIP file
270
- await self.upload_file_worker(
271
- zip_path,
272
- zip_path.name,
273
- repo_name,
274
- repo_type,
275
- lambda: progress(1.0) if progress else None
276
- )
277
-
278
- # Clean up ZIP file
279
- zip_path.unlink()
280
-
281
- status = "Upload completed successfully!"
282
- if password:
283
- status += f"\nZIP password: {password}"
284
- return {"status": status, "password": password}
285
-
286
- else:
287
- # Original folder upload logic
288
- files_to_upload = []
289
- for root, dirs, files in os.walk(folder_path):
290
- if '.git' in dirs:
291
- dirs.remove('.git')
292
-
293
- for file in files:
294
- file_path = Path(root) / file
295
- relative_path = file_path.relative_to(folder_path)
296
- files_to_upload.append((file_path, relative_path))
297
-
298
- if not files_to_upload:
299
- return {"status": "No files found to upload", "password": None}
300
-
301
- if progress:
302
- progress(0, desc=f"Uploading {len(files_to_upload)} files...")
303
-
304
- uploaded_count = 0
305
- def update_progress():
306
- nonlocal uploaded_count
307
- uploaded_count += 1
308
- if progress:
309
- progress(uploaded_count / len(files_to_upload))
310
-
311
- tasks = [
312
- self.upload_file_worker(
313
- file_path,
314
- relative_path,
315
- repo_name,
316
- repo_type,
317
- update_progress
318
- )
319
- for file_path, relative_path in files_to_upload
320
- ]
321
-
322
- results = await asyncio.gather(*tasks, return_exceptions=True)
323
-
324
- successful = sum(1 for r in results if r is not None)
325
- failed = len(files_to_upload) - successful
326
-
327
- return {
328
- "status": f"Uploaded {successful} files successfully. Failed: {failed}",
329
- "password": None
330
- }
331
-
332
- except Exception as e:
333
- logger.error(f"Upload failed: {e}")
334
- return {"status": f"Upload failed: {str(e)}", "password": None}
335
-
336
- async def delete_files(
337
- self,
338
- folder_path: str,
339
- repo_name: str,
340
- repo_type: str,
341
- dry_run: bool = True,
342
- progress: Optional[gr.Progress] = None
343
- ) -> str:
344
- """Delete files from repository based on local folder structure"""
345
- if not self.token:
346
- return "Please login first!"
347
-
348
- try:
349
- self.validate_repo_name(repo_name)
350
-
351
- folder_path = Path(folder_path)
352
- if not folder_path.exists():
353
- return f"Folder {folder_path} does not exist!"
354
-
355
- # Get repository files
356
- existing_files = await asyncio.get_event_loop().run_in_executor(
357
- None,
358
- partial(self.api.list_repo_files, repo_id=repo_name, repo_type=repo_type)
359
- )
360
-
361
- # Find files to delete
362
- files_to_delete = []
363
- for root, dirs, files in os.walk(folder_path):
364
- if '.git' in dirs:
365
- dirs.remove('.git')
366
-
367
- for file in files:
368
- file_path = Path(root) / file
369
- relative_path = str(file_path.relative_to(folder_path))
370
- if relative_path in existing_files:
371
- files_to_delete.append(relative_path)
372
-
373
- if not files_to_delete:
374
- return "No matching files found to delete"
375
-
376
- if dry_run:
377
- return f"Dry run: Would delete {len(files_to_delete)} files"
378
-
379
- # Perform deletion
380
- if progress:
381
- progress(0, desc=f"Deleting {len(files_to_delete)} files...")
382
-
383
- deleted_count = 0
384
- for file_path in files_to_delete:
385
- try:
386
- await asyncio.get_event_loop().run_in_executor(
387
- None,
388
- partial(
389
- self.api.delete_file,
390
- repo_id=repo_name,
391
- path_in_repo=file_path,
392
- repo_type=repo_type
393
- )
394
- )
395
- deleted_count += 1
396
- if progress:
397
- progress(deleted_count / len(files_to_delete))
398
- except Exception as e:
399
- logger.error(f"Error deleting {file_path}: {e}")
400
-
401
- return f"Successfully deleted {deleted_count} out of {len(files_to_delete)} files"
402
-
403
- except Exception as e:
404
- logger.error(f"Delete operation failed: {e}")
405
- return f"Delete operation failed: {str(e)}"
406
-
407
- def create_interface() -> gr.Blocks:
408
- """Create Gradio interface"""
409
- manager = HuggingFaceManager()
410
-
411
- with gr.Blocks(title="Hugging Face Repository Manager") as app:
412
- gr.Markdown("# Hugging Face Repository Manager")
413
-
414
- # Login Tab
415
- with gr.Tab("Login"):
416
- token_input = gr.Textbox(
417
- label="Hugging Face Token",
418
- type="password",
419
- placeholder="Enter your Hugging Face token"
420
- )
421
- login_btn = gr.Button("Login", variant="primary")
422
- login_output = gr.Textbox(label="Login Status", interactive=False)
423
-
424
- login_btn.click(
425
- fn=lambda x: asyncio.run(manager.login_and_validate(x)),
426
- inputs=[token_input],
427
- outputs=[login_output]
428
- )
429
-
430
- # Download Tab
431
- with gr.Tab("Download"):
432
- download_url = gr.Textbox(
433
- label="URL",
434
- placeholder="Enter URL to download"
435
- )
436
- download_type = gr.Radio(
437
- choices=["wget", "git"],
438
- label="Download Type",
439
- value="wget"
440
- )
441
- download_btn = gr.Button("Download", variant="primary")
442
- download_output = gr.Textbox(label="Status", interactive=False)
443
- download_path = gr.download_path = gr.Textbox(
444
- label="Download Path",
445
- interactive=False,
446
- visible=False
447
- )
448
-
449
- download_btn.click(
450
- fn=lambda x, y: asyncio.run(manager.download_from_url(x, y)),
451
- inputs=[download_url, download_type],
452
- outputs=[download_output, download_path]
453
- )
454
-
455
- # Create Repository Tab
456
- with gr.Tab("Create Repository"):
457
- repo_name = gr.Textbox(
458
- label="Repository Name",
459
- placeholder="username/repo-name"
460
- )
461
- repo_type = gr.Radio(
462
- choices=["model", "dataset", "space"],
463
- label="Repository Type",
464
- value="model"
465
- )
466
- is_private = gr.Checkbox(label="Private Repository", value=True)
467
- create_btn = gr.Button("Create Repository", variant="primary")
468
- create_output = gr.Textbox(label="Status", interactive=False)
469
-
470
- create_btn.click(
471
- fn=lambda x, y, z: asyncio.run(manager.create_repository(x, y, z)),
472
- inputs=[repo_name, repo_type, is_private],
473
- outputs=[create_output]
474
- )
475
-
476
- # Upload Tab
477
- with gr.Tab("Upload"):
478
- with gr.Row():
479
- upload_folder_path = gr.Textbox(
480
- label="Local Folder Path",
481
- placeholder="Path to local folder"
482
- )
483
- use_downloaded = gr.Checkbox(
484
- label="Use Downloaded Content",
485
- value=False
486
- )
487
-
488
- upload_repo_name = gr.Textbox(
489
- label="Repository Name",
490
- placeholder="username/repo-name"
491
- )
492
- upload_repo_type = gr.Radio(
493
- choices=["model", "dataset", "space"],
494
- label="Repository Type",
495
- value="model"
496
- )
497
-
498
- with gr.Row():
499
- use_zip = gr.Checkbox(label="Create ZIP Archive", value=False)
500
- use_password = gr.Checkbox(
501
- label="Password Protect ZIP",
502
- value=False,
503
- interactive=True
504
- )
505
-
506
- with gr.Row():
507
- custom_password = gr.Textbox(
508
- label="Custom Password (optional)",
509
- placeholder="Leave empty for auto-generated password",
510
- visible=False
511
- )
512
-
513
- # Show/hide password input based on checkbox
514
- use_password.change(
515
- fn=lambda x: gr.update(visible=x),
516
- inputs=[use_password],
517
- outputs=[custom_password]
518
- )
519
-
520
- upload_btn = gr.Button("Upload Files", variant="primary")
521
- upload_status = gr.Textbox(label="Status", interactive=False)
522
- password_output = gr.Textbox(
523
- label="ZIP Password",
524
- interactive=False,
525
- visible=False
526
- )
527
-
528
- def prepare_upload_path(folder_path, use_downloaded, downloaded_path):
529
- return downloaded_path if use_downloaded and downloaded_path else folder_path
530
-
531
- def process_upload_result(result):
532
- """Process the upload result and split it into status and password"""
533
- if isinstance(result, dict):
534
- return result["status"], result.get("password", "")
535
- return str(result), ""
536
-
537
- def update_password_visibility(status, password):
538
- """Update password field visibility based on whether a password exists"""
539
- return gr.update(visible=bool(password))
540
-
541
- upload_btn.click(
542
- fn=lambda *args: asyncio.run(
543
- manager.upload_folder(
544
- prepare_upload_path(args[0], args[1], args[2]),
545
- args[3],
546
- args[4],
547
- args[5],
548
- args[6],
549
- args[7]
550
- )
551
- ),
552
- inputs=[
553
- upload_folder_path,
554
- use_downloaded,
555
- download_path,
556
- upload_repo_name,
557
- upload_repo_type,
558
- use_zip,
559
- use_password,
560
- custom_password
561
- ],
562
- outputs=[upload_status, password_output]
563
- ).then(
564
- fn=update_password_visibility,
565
- inputs=[upload_status, password_output],
566
- outputs=[password_output]
567
- )
568
-
569
- # Delete Tab
570
- with gr.Tab("Delete"):
571
- delete_folder_path = gr.Textbox(
572
- label="Local Folder Path",
573
- placeholder="Path to local folder for reference"
574
- )
575
- delete_repo_name = gr.Textbox(
576
- label="Repository Name",
577
- placeholder="username/repo-name"
578
- )
579
- delete_repo_type = gr.Radio(
580
- choices=["model", "dataset", "space"],
581
- label="Repository Type",
582
- value="model"
583
- )
584
- dry_run = gr.Checkbox(
585
- label="Dry Run",
586
- value=True,
587
- info="Preview changes without making them"
588
- )
589
- delete_btn = gr.Button("Delete Files", variant="secondary")
590
- delete_output = gr.Textbox(label="Status", interactive=False)
591
-
592
- delete_btn.click(
593
- fn=lambda *args: asyncio.run(manager.delete_files(*args)),
594
- inputs=[
595
- delete_folder_path,
596
- delete_repo_name,
597
- delete_repo_type,
598
- dry_run
599
- ],
600
- outputs=[delete_output]
601
- )
602
-
603
- # Error handling
604
- gr.Error()
605
-
606
- # Footer information
607
- gr.Markdown("""
608
- ### Information
609
- - Get your token from [Hugging Face Settings](https://huggingface.co/settings/tokens)
610
- - Repository names must be in format: username/repository-name
611
- - For files larger than 5GB, use Git LFS
612
- - Repository types:
613
- - Model: For ML models and weights
614
- - Dataset: For datasets and data files
615
- - Space: For demos and applications
616
- """)
617
-
618
- return app
619
-
620
- def main():
621
- """Application entry point"""
622
- try:
623
- logger.info("Starting Hugging Face Repository Manager")
624
- app = create_interface()
625
-
626
- # Configure and launch the application
627
- app.queue()
628
- app.launch(
629
- server_name="0.0.0.0",
630
- server_port=7860,
631
- share=True,
632
- max_threads=2,
633
- # Security configurations
634
- auth=None,
635
- ssl_keyfile=None,
636
- ssl_certfile=None,
637
- ssl_verify=True
638
- )
639
- except Exception as e:
640
- logger.error(f"Application failed to start: {e}")
641
- raise
642
- finally:
643
- logger.info("Shutting down Hugging Face Repository Manager")
644
-
645
- if __name__ == "__main__":
646
- main()