Spaces:
Running
on
Zero
Running
on
Zero
| import gradio as gr | |
| import requests | |
| from pathlib import Path | |
| import re | |
| import os | |
| import tempfile | |
| import shutil | |
| import urllib | |
| from huggingface_hub import whoami, HfApi, hf_hub_download, RepoCard | |
| from huggingface_hub.utils import build_hf_headers, hf_raise_for_status | |
| from gradio_huggingfacehub_search import HuggingfaceHubSearch | |
# Base URL of the Hub; used to build the server-side "duplicate" API request.
ENDPOINT = "https://huggingface.co"
# ENDPOINT = "http://localhost:5564"

# Repository kinds selectable in the UI and accepted by the duplicate functions.
REPO_TYPES = ["model", "dataset", "space"]

# Optional defaults for the UI fields, read from the environment.
# An unset or empty variable falls back to the empty string.
HF_REPO = os.environ.get("HF_REPO") or ""  # set your default repo
HF_REPO_PREFIX = os.environ.get("HF_REPO_PREFIX") or ""  # set your default repo prefix
HF_REPO_SUFFIX = os.environ.get("HF_REPO_SUFFIX") or ""  # set your default repo suffix
HF_USER = os.environ.get("HF_USER") or ""  # set your username

# Matches a bare "owner/name" repo id with nothing before or after it.
REGEX_HF_REPO = r'^[\w_\-\.]+/[\w_\-\.]+$'
def remove_repo_tags(repo_id: str, tags: list[str], repo_type: str, hf_token: str):
    """Best-effort removal of ``tags`` from the repo card of ``repo_id``.

    The card is loaded, each listed tag that appears in the card's ``tags``
    metadata is removed, and the card is pushed back only if its rendered
    content differs from what was loaded. Any failure is printed and
    swallowed, so this never raises to the caller.
    """
    try:
        card = RepoCard.load(repo_id, repo_type=repo_type, token=hf_token)
        content_before = card.content
        for unwanted in tags:
            if 'tags' not in card.data:
                continue
            if unwanted in card.data['tags']:
                card.data['tags'].remove(unwanted)
        # Only push when something actually changed.
        if card.content != content_before:
            card.push_to_hub(repo_id=repo_id, repo_type=repo_type, token=hf_token)
    except Exception as e:
        print(f"Failed to remove tags from repocard. {e}")
def duplicate(source_repo, dst_repo, repo_type, private, overwrite, auto_dir, remove_tag, oauth_token: gr.OAuthToken | None, progress=gr.Progress(track_tqdm=True)):
    """Duplicate one Hub repository (or a path inside it) into ``dst_repo``.

    Two strategies are used:

    * When ``overwrite`` is set or a destination subfolder is in effect, the
      matching files are downloaded into a temporary directory one by one and
      re-uploaded to the destination repo.
    * Otherwise the Hub's server-side ``/duplicate`` endpoint is called.

    Args:
        source_repo: ``owner/name``, optionally followed by a path, or a URL
            accepted by the parsing regex below (``datasets/`` / ``spaces/``
            prefixes and ``blob|resolve|tree/main/`` segments are stripped).
        dst_repo: ``owner/name``, optionally followed by ``/subfolder``.
        repo_type: One of ``REPO_TYPES``.
        private: Visibility flag passed when creating/duplicating the repo.
        overwrite: Allow writing into a destination repo that already exists.
        auto_dir: When true, files go under a subfolder named after the
            source repo id (this replaces any subfolder given in ``dst_repo``).
        remove_tag: When true, remove the "not-for-all-audiences" tag from
            the destination repo card afterwards.
        oauth_token: Login token object; may be ``None``.
        progress: Gradio progress tracker (not used directly in the body).

    Returns:
        A ``(html_link, "sp.jpg")`` tuple for the markdown and image outputs.

    Raises:
        gr.Error: If the repo type or login is invalid, or if any step of the
            duplication fails.
    """
    # Local import: the file only does `import urllib`, which by itself does
    # not guarantee that the `urllib.parse` submodule has been loaded.
    from urllib.parse import unquote

    try:
        if not repo_type in REPO_TYPES:
            raise ValueError("need to select valid repo type")
        # A missing token must surface as the login error below, not as an
        # AttributeError from dereferencing None.
        if oauth_token is None:
            raise ValueError("no login token available")
        hf_token = oauth_token.token
        _ = whoami(hf_token)
        # ^ this will throw if token is invalid
    except Exception as e:
        raise gr.Error(f"""Oops, you forgot to login. Please use the loggin button on the top left to migrate your repo {e}""") from e

    api = HfApi(token=hf_token)
    try:
        # --- Parse the source: bare repo id, repo id + path, or URL. ---
        if re.fullmatch(REGEX_HF_REPO, source_repo):
            target = ""
        else:
            # The "datasets/" / "spaces/" prefix must include its slash,
            # otherwise "datasets/user" is captured as the repo id.
            src_match = re.findall(r'^(?:http.+\.co/)?(?:(?:datasets|spaces)/)?([\w_\-\.]+/[\w_\-\.]+)/?(?:(?:blob|resolve|tree)/main/)?(.+)?$', source_repo)
            if not src_match:
                raise ValueError(f"invalid source repo: {source_repo}")
            source_repo, target = src_match[0]
            target = unquote(target.removesuffix("/"))

        # --- Parse the destination: repo id with optional subfolder. ---
        if re.fullmatch(REGEX_HF_REPO, dst_repo):
            subfolder = ""
        else:
            dst_match = re.findall(r'^([\w_\-\.]+/[\w_\-\.]+)/?(.+)?$', dst_repo)
            if not dst_match:
                raise ValueError(f"invalid destination repo: {dst_repo}")
            dst_repo, subfolder = dst_match[0]
            subfolder = subfolder.removesuffix("/")
        if auto_dir:
            subfolder = source_repo

        if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
            raise gr.Error(f"Repo already exists {dst_repo}")

        if overwrite or subfolder:
            api.create_repo(repo_id=dst_repo, repo_type=repo_type, private=private, exist_ok=True, token=hf_token)
            # The context manager removes the scratch directory even when a
            # download or upload raises.
            with tempfile.TemporaryDirectory() as temp_dir:
                for path in api.list_repo_files(repo_id=source_repo, repo_type=repo_type, token=hf_token):
                    # `target` is a substring filter on the repo-relative path.
                    if target and target not in path:
                        continue
                    file = hf_hub_download(repo_id=source_repo, filename=path, repo_type=repo_type, local_dir=temp_dir, token=hf_token)
                    local = Path(file)
                    if not local.exists():
                        continue
                    path_in_repo = f"{subfolder}/{path}" if subfolder else path
                    if local.is_dir():  # unused for now
                        api.upload_folder(repo_id=dst_repo, folder_path=file, path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                    elif local.is_file():
                        api.upload_file(repo_id=dst_repo, path_or_fileobj=file, path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                    # Delete each file right after upload to keep disk usage low.
                    if local.exists():
                        local.unlink()
            if repo_type == "dataset":
                repo_url = f"https://huggingface.co/datasets/{dst_repo}"
            elif repo_type == "space":
                repo_url = f"https://huggingface.co/spaces/{dst_repo}"
            else:
                repo_url = f"https://huggingface.co/{dst_repo}"
        else:
            r = requests.post(
                f"{ENDPOINT}/api/{repo_type}s/{source_repo}/duplicate",
                headers=build_hf_headers(token=hf_token),
                json={"repository": dst_repo, "private": private},
            )
            hf_raise_for_status(r)
            repo_url = r.json().get("url")

        if remove_tag:
            remove_repo_tags(dst_repo, ["not-for-all-audiences"], repo_type, hf_token)
        return (
            f'Find your repo <a href=\'{repo_url}\' target="_blank" style="text-decoration:underline">here</a>',
            "sp.jpg",
        )
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}") from e
def parse_repos(s):
    """Extract ``owner/name`` repo ids from free-form text.

    Anything that looks like an ``http(s)://`` URL is stripped first, then
    every remaining ``owner/name`` token is collected in order of appearance.
    Returns an empty list if the input cannot be processed (e.g. not a string).
    """
    try:
        without_urls = re.sub("https?://[\\w/:%#\\$&\\?\\(\\)~\\.=\\+\\-]+", "", s)
        matches = re.finditer(r'[^\w_\-\.]?([\w_\-\.]+/[\w_\-\.]+)[^\w_\-\.]?', without_urls)
        return [m.group(1) for m in matches]
    except Exception:
        return []
def duplicate_m2o(source_repos_str, dst_repo, repo_type, private, overwrite, oauth_token: gr.OAuthToken | None, progress=gr.Progress(track_tqdm=True)):
    """Copy several source repos into subfolders of a single destination repo.

    Each source repo parsed from ``source_repos_str`` is downloaded file by
    file and uploaded to ``dst_repo`` under
    ``[<subfolder_prefix>/]<source owner>/<source name>/``.

    Args:
        source_repos_str: Free-form text containing ``owner/name`` repo ids
            (parsed with ``parse_repos``).
        dst_repo: ``owner/name``, optionally followed by ``/subfolder`` which
            is used as a prefix for every per-source subfolder.
        repo_type: One of ``REPO_TYPES``.
        private: Visibility flag passed when creating the destination repo.
        overwrite: Allow writing into a destination repo that already exists.
        oauth_token: Login token object; may be ``None``.
        progress: Gradio progress tracker (not used directly in the body).

    Returns:
        A ``(html_link, "sp.jpg")`` tuple for the markdown and image outputs.

    Raises:
        gr.Error: If the repo type or login is invalid, or if any step of the
            copy fails (including when no source repo could be parsed).
    """
    # Local import: the file only does `import urllib`, which by itself does
    # not guarantee that the `urllib.parse` submodule has been loaded.
    from urllib.parse import unquote

    try:
        if not repo_type in REPO_TYPES:
            raise ValueError("need to select valid repo type")
        # A missing token must surface as the login error below, not as an
        # AttributeError from dereferencing None.
        if oauth_token is None:
            raise ValueError("no login token available")
        hf_token = oauth_token.token
        _ = whoami(hf_token)
        # ^ this will throw if token is invalid
    except Exception as e:
        raise gr.Error(f"""Oops, you forgot to login. Please use the loggin button on the top left to migrate your repo {e}""") from e

    api = HfApi(token=hf_token)
    try:
        # --- Parse the destination: repo id with optional subfolder prefix. ---
        if re.fullmatch(REGEX_HF_REPO, dst_repo):
            subfolder_prefix = ""
        else:
            dst_match = re.findall(r'^([\w_\-\.]+/[\w_\-\.]+)/?(.+)?$', dst_repo)
            if not dst_match:
                raise ValueError(f"invalid destination repo: {dst_repo}")
            dst_repo, subfolder_prefix = dst_match[0]
            # Strip the trailing slash from the prefix itself (the previous
            # code referenced a not-yet-assigned `subfolder` variable here).
            subfolder_prefix = subfolder_prefix.removesuffix("/")

        if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
            raise gr.Error(f"Repo already exists {dst_repo}")

        source_repos = parse_repos(source_repos_str)
        if not source_repos:
            # Fail clearly instead of reaching the return with no URL computed.
            raise ValueError("no valid source repos found")

        # Creating the destination does not depend on the source; do it once.
        api.create_repo(repo_id=dst_repo, repo_type=repo_type, private=private, exist_ok=True, token=hf_token)

        for source_repo in source_repos:
            if re.fullmatch(REGEX_HF_REPO, source_repo):
                target = ""
            else:
                # The "datasets/" / "spaces/" prefix must include its slash,
                # otherwise "datasets/user" is captured as the repo id.
                src_match = re.findall(r'^(?:http.+\.co/)?(?:(?:datasets|spaces)/)?([\w_\-\.]+/[\w_\-\.]+)/?(?:(?:blob|resolve|tree)/main/)?(.+)?$', source_repo)
                if not src_match:
                    raise ValueError(f"invalid source repo: {source_repo}")
                source_repo, target = src_match[0]
                target = unquote(target.removesuffix("/"))
            subfolder = subfolder_prefix + "/" + source_repo if subfolder_prefix else source_repo

            # The context manager removes the scratch directory even when a
            # download or upload raises.
            with tempfile.TemporaryDirectory() as temp_dir:
                for path in api.list_repo_files(repo_id=source_repo, repo_type=repo_type, token=hf_token):
                    # `target` is a substring filter on the repo-relative path.
                    if target and target not in path:
                        continue
                    file = hf_hub_download(repo_id=source_repo, filename=path, repo_type=repo_type, local_dir=temp_dir, token=hf_token)
                    local = Path(file)
                    if not local.exists():
                        continue
                    path_in_repo = f"{subfolder}/{path}" if subfolder else path
                    if local.is_dir():  # unused for now
                        api.upload_folder(repo_id=dst_repo, folder_path=file, path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                    elif local.is_file():
                        api.upload_file(repo_id=dst_repo, path_or_fileobj=file, path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                    # Delete each file right after upload to keep disk usage low.
                    if local.exists():
                        local.unlink()

        if repo_type == "dataset":
            repo_url = f"https://huggingface.co/datasets/{dst_repo}"
        elif repo_type == "space":
            repo_url = f"https://huggingface.co/spaces/{dst_repo}"
        else:
            repo_url = f"https://huggingface.co/{dst_repo}"
        return (
            f'Find your repo <a href=\'{repo_url}\' target="_blank" style="text-decoration:underline">here</a>',
            "sp.jpg",
        )
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}") from e
def duplicate_m2m(source_repos_str, hf_user, repo_type, private, overwrite, remove_tag, repo_prefix, repo_suffix, oauth_token: gr.OAuthToken | None, progress=gr.Progress(track_tqdm=True)):
    """Duplicate several source repos, each into its own destination repo.

    For every repo id parsed from ``source_repos_str`` that exists on the
    Hub, the destination id is built as
    ``<hf_user>/<repo_prefix><source name><repo_suffix>`` and the Hub's
    server-side ``/duplicate`` endpoint is called. Sources that do not exist,
    and destinations that are not a valid ``owner/name`` id, are skipped.

    Args:
        source_repos_str: Free-form text containing ``owner/name`` repo ids
            (parsed with ``parse_repos``).
        hf_user: Owner (namespace) of the destination repos.
        repo_type: One of ``REPO_TYPES``.
        private: Visibility flag sent with each duplicate request.
        overwrite: When false, destinations that already exist are skipped
            with an info message.
        remove_tag: When true, remove the "not-for-all-audiences" tag from
            each destination repo card.
        repo_prefix: Text prepended to the source repo name.
        repo_suffix: Text appended to the source repo name.
        oauth_token: Login token object; may be ``None``.
        progress: Gradio progress tracker (not used directly in the body).

    Returns:
        A ``(html_links, "sp.jpg")`` tuple for the markdown and image outputs.

    Raises:
        gr.Error: If the repo type or login is invalid, or if any duplicate
            request fails (remaining repos are then not processed).
    """
    try:
        if not repo_type in REPO_TYPES:
            raise ValueError("need to select valid repo type")
        # A missing token must surface as the login error below, not as an
        # AttributeError from dereferencing None.
        if oauth_token is None:
            raise ValueError("no login token available")
        hf_token = oauth_token.token
        _ = whoami(hf_token)
        # ^ this will throw if token is invalid
    except Exception as e:
        raise gr.Error(f"""Oops, you forgot to login. Please use the loggin button on the top left to migrate your repo {e}""") from e

    api = HfApi(token=hf_token)
    try:
        source_repos = parse_repos(source_repos_str)
        # Request headers depend only on the token; build them once.
        headers = build_hf_headers(token=hf_token)
        repo_url_result = 'Find your repo '
        for source_repo in source_repos:
            if not re.fullmatch(REGEX_HF_REPO, source_repo) or not api.repo_exists(repo_id=source_repo, repo_type=repo_type, token=hf_token):
                continue
            dst_repo = hf_user + "/" + repo_prefix + source_repo.split("/")[-1] + repo_suffix
            if not re.fullmatch(REGEX_HF_REPO, dst_repo):
                continue
            if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
                gr.Info(f"Repo already exists {dst_repo}")
                continue
            r = requests.post(
                f"{ENDPOINT}/api/{repo_type}s/{source_repo}/duplicate",
                headers=headers,
                json={"repository": dst_repo, "private": private},
            )
            hf_raise_for_status(r)
            repo_url = r.json().get("url")
            repo_url_result += f'<a href=\'{repo_url}\' target="_blank" style="text-decoration:underline">{dst_repo}</a><br>\n'
            if remove_tag:
                remove_repo_tags(dst_repo, ["not-for-all-audiences"], repo_type, hf_token)
        return (
            repo_url_result,
            "sp.jpg",
        )
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}") from e
def add_repo_text(repo_id: str, source_repos: str):
    """Append ``repo_id`` as a new line to ``source_repos``.

    When ``source_repos`` is empty (or otherwise falsy), ``repo_id`` alone is
    returned.
    """
    if not source_repos:
        return repo_id
    return source_repos + "\n" + repo_id
def swap_visibilty(profile: gr.OAuthProfile | None):
    """Return a component update selecting the logged-in or logged-out CSS class.

    A truthy ``profile`` yields ``main_ui_logged_in``; otherwise
    ``main_ui_logged_out`` is applied.
    """
    css_class = "main_ui_logged_in" if profile else "main_ui_logged_out"
    return gr.update(elem_classes=[css_class])
# Custom CSS passed to gr.Blocks below.
# `.main_ui_logged_out` dims the main UI and disables pointer events on it;
# `.title` centers the heading. No rule is defined here for
# `.main_ui_logged_in`, the other class that swap_visibilty can apply.
css = '''
.main_ui_logged_out{opacity: 0.3; pointer-events: none}
.title {text-align: center; align-items: center}
'''
# Gradio UI definition and event wiring.
# NOTE(review): the original indentation was lost in this copy of the file;
# the nesting of the layout containers below is reconstructed and should be
# confirmed against the deployed app.
with gr.Blocks(css=css) as demo:
    gr.LoginButton()
    # The main UI starts with the logged-out class; swap_visibilty (wired to
    # demo.load below) switches the class depending on the profile.
    with gr.Column(elem_classes="main_ui_logged_out") as main_ui:
        gr.Markdown("# Duplicate your repo!", elem_classes="title")
        gr.Markdown("Duplicate a Hugging Face repository! This Space is a an experimental demo.")
        # --- Tab 1: one source repo -> one destination repo ---
        with gr.Tab("One to One"):
            with gr.Row():
                with gr.Column():
                    # NOTE(review): `sumbit_on_select` (sic) is kept as spelled;
                    # presumably it matches the component's keyword — verify
                    # against gradio_huggingfacehub_search.
                    search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=False,
                    )
                    with gr.Group():
                        dst_repo = gr.Textbox(label="dst_repo", placeholder="Destination repository (e.g. osanseviero/dst)", value=HF_REPO)
                        repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=True)
                            is_subdir = gr.Checkbox(label="Create subdirectories automatically?", value=True)
                            is_remtag = gr.Checkbox(label="Remove NFAA tag?", value=True)
                    with gr.Row():
                        submit_button = gr.Button("Submit", variant="primary")
                        clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    output_md = gr.Markdown(label="output")
                    output_image = gr.Image(show_label=False)
        # --- Tab 2: many source repos -> subfolders of one destination repo ---
        with gr.Tab("Multi to One"):
            with gr.Row():
                with gr.Column():
                    m2o_search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=True,
                    )
                    m2o_source_repos = gr.Textbox(label="source_repos", placeholder="Source repositories (e.g. osanseviero/src)\n...", value="", lines=10)
                    with gr.Group():
                        m2o_dst_repo = gr.Textbox(label="dst_repo", placeholder="Destination repository (e.g. osanseviero/dst)", value=HF_REPO)
                        m2o_repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            m2o_is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            m2o_is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=True)
                    with gr.Row():
                        m2o_submit_button = gr.Button("Submit", variant="primary")
                        m2o_clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    m2o_output_md = gr.Markdown(label="output")
                    m2o_output_image = gr.Image(show_label=False)
        # --- Tab 3: many source repos -> one new repo each ---
        with gr.Tab("Multi to Multi"):
            with gr.Row():
                with gr.Column():
                    m2m_search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=True,
                    )
                    m2m_source_repos = gr.Textbox(label="source_repos", placeholder="Source repositories (e.g. osanseviero/src)\n...", value="", lines=10)
                    with gr.Group():
                        with gr.Row():
                            m2m_user = gr.Textbox(label="hf_user", placeholder="Your HF username", value=HF_USER)
                            m2m_prefix = gr.Textbox(label="repo_prefix", value=HF_REPO_PREFIX)
                            m2m_suffix = gr.Textbox(label="repo_suffix", value=HF_REPO_SUFFIX)
                        m2m_repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            m2m_is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            m2m_is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=False)
                            m2m_is_remtag = gr.Checkbox(label="Remove NFAA tag?", value=True)
                    with gr.Row():
                        m2m_submit_button = gr.Button("Submit", variant="primary")
                        m2m_clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    m2m_output_md = gr.Markdown(label="output")
                    m2m_output_image = gr.Image(show_label=False)
    # --- Event wiring ---
    # swap_visibilty takes a `profile` argument but no inputs are listed here;
    # presumably Gradio supplies it from the type annotation — TODO confirm.
    demo.load(fn=swap_visibilty, outputs=main_ui)
    # The duplicate* handlers also take `oauth_token` and `progress`, which are
    # not in the inputs lists; presumably injected by Gradio — TODO confirm.
    submit_button.click(duplicate, [search, dst_repo, repo_type, is_private, is_overwrite, is_subdir, is_remtag], [output_md, output_image])
    # Each "Clear" handler returns one value per listed output component, in order.
    clear_button.click(lambda: ("", HF_REPO, "model", True, True, True, True), None, [search, dst_repo, repo_type, is_private, is_overwrite, is_subdir, is_remtag], queue=False)
    # Submitting the search box appends the chosen repo id to the multi-line list.
    m2o_search.submit(add_repo_text, [m2o_search, m2o_source_repos], [m2o_source_repos], queue=False)
    m2o_submit_button.click(duplicate_m2o, [m2o_source_repos, m2o_dst_repo, m2o_repo_type, m2o_is_private, m2o_is_overwrite], [m2o_output_md, m2o_output_image])
    m2o_clear_button.click(lambda: ("", HF_REPO, "model", True, True, ""), None,
        [m2o_search, m2o_dst_repo, m2o_repo_type, m2o_is_private, m2o_is_overwrite, m2o_source_repos], queue=False)
    m2m_search.submit(add_repo_text, [m2m_search, m2m_source_repos], [m2m_source_repos], queue=False)
    m2m_submit_button.click(duplicate_m2m, [m2m_source_repos, m2m_user, m2m_repo_type, m2m_is_private, m2m_is_overwrite, m2m_is_remtag, m2m_prefix, m2m_suffix],
        [m2m_output_md, m2m_output_image])
    m2m_clear_button.click(lambda: ("", HF_USER, "model", True, False, True, "", HF_REPO_PREFIX, HF_REPO_SUFFIX), None,
        [m2m_search, m2m_user, m2m_repo_type, m2m_is_private, m2m_is_overwrite, m2m_is_remtag, m2m_source_repos, m2m_prefix, m2m_suffix], queue=False)
demo.queue()
demo.launch()