# Spaces:
# Paused
# Paused
import csv
import io
import json
import os
import random
import shutil
import time
import uuid
import zipfile

import requests
from bs4 import BeautifulSoup
from datasets import Dataset
from PIL import Image
from huggingface_hub import HfApi, HfFolder, Repository, create_repo

import gradio as gr
# Persistent storage root (mounted volume on HF Spaces) and the folder
# where downloaded images are kept.
DATA_DIR = "/data"
IMAGES_DIR = os.path.join(DATA_DIR, "images")

# Desktop-browser User-Agent strings; one is picked at random per request
# to make the scraper look less uniform.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
]
def get_headers(cookies=None):
    """Build browser-like HTTP headers with a randomly rotated User-Agent.

    Args:
        cookies: optional raw ``Cookie`` header value; attached only when
            truthy.

    Returns:
        dict of header name -> value suitable for ``requests.get``.
    """
    base = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Referer": "https://www.google.com/",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }
    # Only add the Cookie header when the caller supplied one.
    return {**base, "Cookie": cookies} if cookies else base
def make_request(url, cookies=None):
    """GET *url* with browser-like headers after a short randomized pause.

    The 1-3 second jitter spaces out successive requests so the scraping
    looks less automated. Returns the ``requests.Response``.
    """
    pause = random.uniform(1, 3)
    time.sleep(pause)
    return requests.get(url, headers=get_headers(cookies), timeout=10)
def extract_image_url(html_content):
    """Extract the main image URL from a booru-style HTML page.

    Strategy: first look for the inline ``image = {...}`` JavaScript object
    and assemble the URL from its fields; otherwise fall back to the first
    ``<img>`` tag that carries an ``alt`` attribute.

    Returns:
        The image URL string, or ``None`` if neither strategy matches.

    Raises:
        Exception: when the JS object is present but cannot be decoded.
    """
    soup = BeautifulSoup(html_content, 'html.parser')

    def _mentions_image(text):
        # Match only script bodies that assign the `image` object.
        return 'image =' in text if text else False

    script = soup.find('script', type='text/javascript', string=_mentions_image)
    if script is not None:
        payload = script.string.split('=', 1)[1].strip().rstrip(';')
        # The page embeds a JS object literal with single quotes; swap them
        # so the stdlib json parser can handle it.
        payload = payload.replace("'", '"')
        try:
            image_data = json.loads(payload)
        except json.JSONDecodeError as e:
            raise Exception(f"Failed to decode JSON: {str(e)}")
        return f"{image_data['domain']}{image_data['base_dir']}/{image_data['dir']}/{image_data['img']}"

    img_tag = soup.find('img', alt=True)
    if img_tag and 'src' in img_tag.attrs:
        return img_tag['src']
    return None
def extract_tags(html_content):
    """Collect general-category tag names from a booru-style page.

    Each ``<li class="tag-type-general">`` is expected to contain at least
    two anchors; the second anchor's text is the tag name. Returns the tag
    names joined by commas (empty string when none are found).
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    names = []
    for entry in soup.find_all('li', class_='tag-type-general'):
        anchors = entry.find_all('a')
        if len(anchors) > 1:
            names.append(anchors[1].text)
    return ','.join(names)
def download_image(url, cookies=None):
    """Download *url* and return it as a PIL ``Image``.

    Network failures are re-raised as a generic ``Exception`` with a
    human-readable message (the app surfaces these strings in the UI).
    """
    try:
        response = make_request(url, cookies)
        response.raise_for_status()
    except requests.RequestException as e:
        raise Exception(f"Failed to download image: {str(e)}")
    return Image.open(io.BytesIO(response.content))
class DatasetBuilder:
    """Builds and manages an image/tags dataset persisted under DATA_DIR.

    Each dataset entry is a dict with keys ``'image'`` (filename relative
    to IMAGES_DIR) and ``'tags'`` (comma-separated tag string); the list of
    entries is persisted as ``<dataset_name>.json``.
    """

    def __init__(self, dataset_name):
        self.dataset_name = dataset_name
        self.dataset = self.load_dataset()
        os.makedirs(IMAGES_DIR, exist_ok=True)
        # Hugging Face token from the environment (non-standard casing is
        # intentional — it matches the Space's secret name).
        self.hf_token = os.getenv("HF_Token")

    def get_dataset_file(self):
        """Return the path of this dataset's JSON metadata file."""
        return os.path.join(DATA_DIR, f"{self.dataset_name}.json")

    def load_dataset(self):
        """Load the persisted dataset list, or return [] when none exists."""
        dataset_file = self.get_dataset_file()
        if os.path.exists(dataset_file):
            with open(dataset_file, 'r') as f:
                return json.load(f)
        return []

    def save_dataset(self):
        """Persist the in-memory dataset list to its JSON file."""
        with open(self.get_dataset_file(), 'w') as f:
            json.dump(self.dataset, f)

    def resize_images(self, min_size=512, max_size=768):
        """Downscale every image in place so neither side exceeds max_size.

        Aspect ratio is preserved via ``Image.thumbnail``. ``min_size`` is
        currently unused; it is kept so existing callers stay valid.
        """
        for item in self.dataset:
            image_path = os.path.join(IMAGES_DIR, item['image'])
            image = Image.open(image_path)
            image.thumbnail((max_size, max_size), resample=Image.BICUBIC)
            image.save(image_path)

    def resize_dataset(self):
        """Create a "<name> (resized)" dataset containing resized copies.

        Fix: the original implementation shared the source dataset's
        filenames, so resizing destructively overwrote the original image
        files. Images are now copied under fresh UUID filenames first and
        only the copies are resized.
        """
        resized_dataset_name = f"{self.dataset_name} (resized)"
        resized_builder = DatasetBuilder(resized_dataset_name)
        resized_builder.dataset = []
        for item in self.dataset:
            src_path = os.path.join(IMAGES_DIR, item['image'])
            copy_name = f"{uuid.uuid4()}.jpg"
            shutil.copy2(src_path, os.path.join(IMAGES_DIR, copy_name))
            resized_builder.dataset.append({'image': copy_name, 'tags': item['tags']})
        resized_builder.resize_images()
        resized_builder.save_dataset()
        return f"Resized dataset '{self.dataset_name}' to '{resized_dataset_name}'."

    def create_downloadable_dataset(self):
        """Zip a CSV manifest plus all images.

        Returns:
            (zip_path, message) — zip_path is None on failure or when the
            dataset is empty.
        """
        if not self.dataset:
            return None, "Dataset is empty. Add some images first."
        try:
            zip_path = os.path.join(DATA_DIR, f"{self.dataset_name}.zip")
            csv_path = os.path.join(DATA_DIR, f"{self.dataset_name}.csv")
            with zipfile.ZipFile(zip_path, 'w') as zipf:
                # Write the manifest CSV, then add it to the archive.
                with open(csv_path, 'w', newline='') as csvfile:
                    writer = csv.writer(csvfile)
                    writer.writerow(['image', 'tags'])
                    for item in self.dataset:
                        writer.writerow([item['image'], item['tags']])
                zipf.write(csv_path, os.path.basename(csv_path))
                # Add every image under an "images/" prefix in the archive.
                for item in self.dataset:
                    image_path = os.path.join(IMAGES_DIR, item['image'])
                    zipf.write(image_path, os.path.join("images", item['image']))
            return zip_path, f"Dataset '{self.dataset_name}' ready for download."
        except Exception as e:
            return None, f"Error creating downloadable dataset: {str(e)}"

    def add_image(self, url, cookies=None):
        """Scrape *url*, download its image, and append it to the dataset.

        Returns a human-readable status string (errors are reported as
        strings rather than raised, for display in the UI).
        """
        try:
            response = make_request(url, cookies)
            response.raise_for_status()
            html_content = response.text
            image_url = extract_image_url(html_content)
            if not image_url:
                raise Exception("Failed to extract image URL")
            tags = extract_tags(html_content)
            image = download_image(image_url, cookies)
            filename = f"{uuid.uuid4()}.jpg"
            image.save(os.path.join(IMAGES_DIR, filename))
            # Fix: store the tag string under 'tags'. The original stored it
            # under 'text', but every reader (CSV export, HF dataset build,
            # previews) looks up item['tags'] and would raise KeyError.
            self.dataset.append({
                'image': filename,
                'tags': tags
            })
            self.save_dataset()
            return f"Added image with tags: {tags}"
        except Exception as e:
            return f"Error: {str(e)}"

    def build_huggingface_dataset(self):
        """Materialize the dataset as a ``datasets.Dataset`` (smoke check).

        The object is built but not retained; this only verifies that the
        conversion succeeds.
        """
        if not self.dataset:
            return "Dataset is empty. Add some images first."
        try:
            Dataset.from_dict({
                'image': [os.path.join(IMAGES_DIR, item['image']) for item in self.dataset],
                'text': [item['tags'] for item in self.dataset]
            })
            return "HuggingFace Dataset created successfully!"
        except Exception as e:
            return f"Error creating HuggingFace Dataset: {str(e)}"

    def get_dataset_info(self):
        """Return a one-line summary of the dataset size."""
        return f"Current dataset size ({self.dataset_name}): {len(self.dataset)} images"

    def get_dataset_preview(self, num_images=5):
        """Return up to *num_images* most recent (image_path, tags) pairs."""
        preview = []
        for item in self.dataset[-num_images:]:
            image_path = os.path.join(IMAGES_DIR, item['image'])
            preview.append((image_path, item['tags']))
        return preview

    def upload_to_huggingface(self, private=True):
        """Push the dataset JSON and images to a Hugging Face repository.

        Args:
            private: whether the created repository is private.

        Returns a human-readable status string.
        """
        if not self.dataset:
            return "Dataset is empty. Add some images first."
        if not self.hf_token:
            return "Error: Hugging Face Token not found. Please make sure the token is correctly set as an environment variable."
        try:
            hf_api = HfApi(token=self.hf_token)
            hf_user = hf_api.whoami()["name"]
            repo_id = f"{hf_user}/{self.dataset_name}"
            # Create (or reuse) the remote repository.
            create_repo(repo_id, token=self.hf_token, private=private, exist_ok=True)
            # Make sure the JSON manifest on disk is current.
            self.save_dataset()
            # NOTE(review): cloning into DATA_DIR, which already contains
            # local files, relies on Repository tolerating a non-empty dir.
            repo = Repository(local_dir=DATA_DIR, clone_from=repo_id, use_auth_token=self.hf_token)
            repo.git_pull(lfs=True)
            os.makedirs(os.path.join(DATA_DIR, "images"), exist_ok=True)
            for item in self.dataset:
                src_image_path = os.path.join(IMAGES_DIR, item['image'])
                dst_image_path = os.path.join(repo.local_dir, "images", item['image'])
                if not os.path.exists(dst_image_path):
                    os.makedirs(os.path.dirname(dst_image_path), exist_ok=True)
                    # Fix: shutil.copy2 replaces os.system("cp ...") — no
                    # shell involved, portable, and safe for odd filenames.
                    shutil.copy2(src_image_path, dst_image_path)
            repo.git_add(pattern=".")
            repo.git_commit("Add dataset and images")
            repo.git_push()
            return f"Dataset '{self.dataset_name}' successfully uploaded to Hugging Face Hub as a {'private' if private else 'public'} repository."
        except Exception as e:
            return f"Error uploading dataset to Hugging Face: {str(e)}"
def add_image_to_dataset(url, cookies, dataset_name):
    """Gradio callback: add one image, then report status, info, and preview."""
    builder = DatasetBuilder(dataset_name)
    status = builder.add_image(url, cookies)
    return status, builder.get_dataset_info(), builder.get_dataset_preview()
def create_huggingface_dataset(dataset_name):
    """Gradio callback: materialize the named dataset as a HF Dataset."""
    return DatasetBuilder(dataset_name).build_huggingface_dataset()
def view_dataset(dataset_name):
    """Gradio callback: preview up to the 60 most recent dataset images."""
    return DatasetBuilder(dataset_name).get_dataset_preview(num_images=60)
def upload_huggingface_dataset(dataset_name, privacy):
    """Gradio callback: upload the named dataset to the Hugging Face Hub.

    Args:
        dataset_name: name of the local dataset to upload.
        privacy: the string "private" or "public" emitted by the gr.Radio.

    Fix: ``privacy`` is a string, so the original ``private=privacy`` was
    always truthy — even "public" produced a private repository. Compare
    against "private" explicitly.
    """
    builder = DatasetBuilder(dataset_name)
    return builder.upload_to_huggingface(private=(privacy == "private"))
def download_dataset(dataset_name):
    """Gradio callback: zip the dataset and return (zip_path, status)."""
    return DatasetBuilder(dataset_name).create_downloadable_dataset()
def resize_dataset(dataset_name):
    """Gradio callback: produce the "(resized)" variant of the dataset."""
    return DatasetBuilder(dataset_name).resize_dataset()
def download_resized_dataset(dataset_name):
    """Gradio callback: zip the "(resized)" dataset variant for download."""
    resized = DatasetBuilder(f"{dataset_name} (resized)")
    return resized.create_downloadable_dataset()
# Create Gradio interface.
# Component creation order below determines the on-page layout; each button
# is wired to one of the module-level callback functions.
with gr.Blocks(theme="huggingface") as iface:
    gr.Markdown("# Image Dataset Builder")
    gr.Markdown("Enter a URL to add an image and its tags to the dataset. Progress is saved automatically.")
    # Inputs for scraping a single image page.
    with gr.Row():
        dataset_name_input = gr.Textbox(lines=1, label="Dataset Name", placeholder="Enter dataset name...", value="default_dataset")
        url_input = gr.Textbox(lines=2, label="URL", placeholder="Enter image URL here...")
        cookies_input = gr.Textbox(lines=2, label="Cookies (optional)", placeholder="Enter cookies")
    add_button = gr.Button("Add Image")
    result_output = gr.Textbox(label="Result")
    dataset_info = gr.Textbox(label="Dataset Info")
    gr.Markdown("## Dataset Preview")
    preview_gallery = gr.Gallery(label="Recent Additions", show_label=False, elem_id="preview_gallery", columns=5, rows=1, height="auto")
    add_button.click(add_image_to_dataset, inputs=[url_input, cookies_input, dataset_name_input], outputs=[result_output, dataset_info, preview_gallery])
    # Build an in-memory HF Dataset as a smoke check of the stored data.
    create_hf_button = gr.Button("Create HuggingFace Dataset")
    hf_result = gr.Textbox(label="Dataset Creation Result")
    create_hf_button.click(create_huggingface_dataset, inputs=[dataset_name_input], outputs=hf_result)
    view_dataset_button = gr.Button("View Dataset")
    dataset_gallery = gr.Gallery(label="Dataset Contents", show_label=False, elem_id="dataset_gallery", columns=5, rows=4, height="auto")
    view_dataset_button.click(view_dataset, inputs=[dataset_name_input], outputs=dataset_gallery)
    gr.Markdown("## Upload Dataset to Hugging Face")
    # NOTE(review): the radio emits the strings "private"/"public"; the
    # callback receives that string as its `privacy` argument.
    privacy_radio = gr.Radio(choices=["private", "public"], value="private", label="Repository Privacy")
    upload_hf_button = gr.Button("Upload to Hugging Face")
    hf_upload_result = gr.Textbox(label="Upload Result")
    upload_hf_button.click(upload_huggingface_dataset, inputs=[dataset_name_input, privacy_radio], outputs=hf_upload_result)
    gr.Markdown("## Download Dataset")
    download_button = gr.Button("Download Dataset")
    download_output = gr.File(label="Download")
    download_message = gr.Textbox(label="Download Status")
    download_button.click(
        download_dataset,
        inputs=[dataset_name_input],
        outputs=[download_output, download_message]
    )
    gr.Markdown("## Resize Dataset")
    resize_button = gr.Button("Resize Dataset")
    resize_result = gr.Textbox(label="Resize Result")
    resize_button.click(
        resize_dataset,
        inputs=[dataset_name_input],
        outputs=resize_result
    )
    gr.Markdown("## Download Resized Dataset")
    download_resized_button = gr.Button("Download Resized Dataset")
    download_resized_output = gr.File(label="Download Resized")
    download_resized_message = gr.Textbox(label="Resized Download Status")
    download_resized_button.click(
        download_resized_dataset,
        inputs=[dataset_name_input],
        outputs=[download_resized_output, download_resized_message]
    )
# Launch the interface
iface.launch()