Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import os | |
| os.environ['HF_HOME'] = '/tmp/hf_cache' | |
| os.makedirs(os.environ['HF_HOME'], exist_ok=True) # Ensure the directory exists | |
| import gradio as gr | |
| import subprocess | |
| import os | |
| import re | |
| import tempfile | |
| import json | |
| import csv | |
| # Removed: from typing import Iterable # Added for Theme | |
| from rag_scraper.scraper import Scraper | |
| from rag_scraper.converter import Converter | |
| from rag_scraper.link_extractor import LinkExtractor, LinkType | |
| from rag_scraper.utils import URLUtils | |
| # Removed: from gradio.themes.base import Base # Added for Theme | |
| # Removed: from gradio.themes.utils import colors, fonts, sizes # Added for Theme | |
| import markdown_pdf # Added for PDF conversion | |
| # --- Custom Theme Definition --- (REMOVED Seafoam class and instance) | |
def is_github_repo(url_or_id):
    """Return True when the input looks like a GitHub repo URL or an owner/repo ID."""
    # Either an explicit github.com URL, or a bare "owner/repo" identifier.
    looks_like_repo_id = re.match(r'^[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+$', url_or_id) is not None
    return "github.com" in url_or_id or looks_like_repo_id
def check_repomix_installed():
    """Return True when the `repomix` CLI is available and runnable."""
    try:
        probe = subprocess.run(
            ["repomix", "--version"],
            capture_output=True,
            text=True,
            check=False,
        )
    except Exception:
        # Binary missing or not executable — treat as "not installed".
        return False
    return probe.returncode == 0
def run_repomix(repo_url_or_id, progress=gr.Progress(track_tqdm=True)):
    """Run Repomix on a GitHub repository and return its Markdown output.

    Args:
        repo_url_or_id: Full GitHub URL, or a bare "owner/repo" ID.
        progress: Gradio progress tracker (injected by Gradio at call time).

    Returns:
        (content, file_path) on success, where file_path is a persistent
        temporary copy of the Repomix output; (error_string, None) on failure.
    """
    progress(0, desc="Starting Repomix processing...")
    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            output_file_name = "repomix-output.md"
            output_file_path = os.path.join(temp_dir, output_file_name)
            # Expand a bare "owner/repo" ID into a full GitHub URL.
            if '/' in repo_url_or_id and not repo_url_or_id.startswith('http'):
                repo_url = f"https://github.com/{repo_url_or_id}"
            else:
                repo_url = repo_url_or_id
            progress(0.2, desc=f"Running Repomix on {repo_url}...")
            cmd = [
                "repomix",
                "--remote", repo_url,
                "--output", output_file_path,
                "--style", "markdown",
                "--compress"
            ]
            process = subprocess.run(cmd, capture_output=True, text=True, check=False, encoding='utf-8')
            progress(0.8, desc="Repomix command executed.")
            if process.returncode != 0:
                error_details = f"Return Code: {process.returncode}\nStderr: {process.stderr}\nStdout: {process.stdout}"
                return f"Error running Repomix:\n{error_details}", None
            if not os.path.exists(output_file_path):
                error_details = f"Return Code: {process.returncode}\nStderr: {process.stderr}\nStdout: {process.stdout}"
                return f"Error: Repomix did not generate an output file at '{output_file_path}'.\nRepomix Output:\n{error_details}", None
            with open(output_file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            # BUG FIX: output_file_path lives inside the TemporaryDirectory and
            # is deleted the moment this `with` block exits, so the original
            # returned a path to a file that no longer existed. Persist the
            # content to a non-deleting temp file and return that path instead.
            with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".md", encoding="utf-8") as persistent:
                persistent.write(content)
                persistent_path = persistent.name
            progress(1, desc="Repomix output processed.")
            return content, persistent_path
    except Exception as e:
        progress(1, desc="Error during Repomix processing.")
        return f"Error processing GitHub repository: {str(e)}", None
def scrape_and_convert_website(url, depth, progress=gr.Progress(track_tqdm=True)):
    """Recursively scrape a site, convert each page to Markdown, and persist the result.

    Returns (markdown_content, temp_file_path).
    """
    progress(0, desc=f"Starting web scrape for {url}...")
    seen = set()

    def _crawl(page_url, remaining_depth, total_links_estimate=1, link_index=0):
        # Guard: skip revisits and stop once the depth budget is spent.
        if page_url in seen or remaining_depth < 0:
            return ""
        seen.add(page_url)
        try:
            fraction = link_index / total_links_estimate if total_links_estimate > 0 else 0
            progress(fraction, desc=f"Scraping: {page_url} (Depth: {depth - remaining_depth})")
            html_content = Scraper.fetch_html(page_url)
        except Exception as e:
            return f"Error fetching {page_url}: {str(e)}\n"
        collected = f"## Extracted from: {page_url}\n\n" + Converter.html_to_markdown(
            html=html_content,
            base_url=page_url,
            parser_features='html.parser',
            ignore_links=True
        ) + "\n\n"
        if remaining_depth > 0:
            try:
                candidates = LinkExtractor.scrape_url(page_url, link_type=LinkType.INTERNAL)
                internal_links = [
                    candidate for candidate in candidates
                    if URLUtils.is_internal(candidate, page_url) and candidate not in seen
                ]
                for idx, child_url in enumerate(internal_links):
                    collected += _crawl(child_url, remaining_depth - 1, len(internal_links), idx)
            except Exception as e:
                collected += f"Error extracting links from {page_url}: {str(e)}\n"
        return collected

    all_markdown_content = _crawl(url, depth)
    progress(1, desc="Web scraping complete.")
    with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".md", encoding="utf-8") as tmp_file:
        tmp_file.write(all_markdown_content)
    return all_markdown_content, tmp_file.name
def convert_to_json(markdown_content, source_url_or_id):
    """Serialize the content plus its source identifier as pretty-printed JSON."""
    payload = {
        "source": source_url_or_id,
        "content": markdown_content,
    }
    return json.dumps(payload, indent=2)
def convert_to_csv(markdown_content, source_url_or_id):
    """Write a two-row CSV (header + data) to a persistent temp file; return its path."""
    with tempfile.NamedTemporaryFile(mode='w+', delete=False, newline='', suffix=".csv", encoding="utf-8") as handle:
        csv.writer(handle).writerows([
            ["source", "content"],
            [source_url_or_id, markdown_content],
        ])
        return handle.name
def save_output_to_file(content, output_format, source_url_or_id):
    """Save `content` to a temporary file in the requested format and return its path.

    Args:
        content: Markdown content to persist.
        output_format: One of "Markdown", "JSON", "CSV", "Text", "PDF".
        source_url_or_id: Source identifier embedded into JSON/CSV output.

    Returns:
        Path to the written file. If PDF conversion fails, the Markdown is
        saved with a ".pdf.md" suffix instead.
    """
    processed_content = content  # Default for Markdown and Text
    if output_format == "JSON":
        suffix = ".json"
        processed_content = convert_to_json(content, source_url_or_id)
    elif output_format == "CSV":
        # convert_to_csv writes its own file and returns the path directly.
        return convert_to_csv(content, source_url_or_id)
    elif output_format == "Text":
        suffix = ".txt"
    elif output_format == "PDF":
        suffix = ".pdf"
        # PDF conversion creates its file directly rather than via the
        # shared write at the bottom of this function.
        pdf_output_path = ""
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf_file:
                pdf_output_path = tmp_pdf_file.name
            md_pdf = markdown_pdf.MarkdownPdf(toc_level=2)
            md_pdf.convert_from_string(content, pdf_output_path)
            return pdf_output_path
        except Exception as e:
            print(f"PDF conversion failed: {e}. Saving as Markdown instead.")
            # BUG FIX: remove the empty placeholder .pdf file that the failed
            # conversion would otherwise leave behind on disk.
            if pdf_output_path and os.path.exists(pdf_output_path):
                try:
                    os.remove(pdf_output_path)
                except OSError:
                    pass
            suffix = ".pdf.md"
            # processed_content is already Markdown; fall through and save it.
    else:  # Default to Markdown
        suffix = ".md"
    with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=suffix, encoding="utf-8") as tmp_file:
        tmp_file.write(processed_content)
    return tmp_file.name
def process_input_updated(url_or_id, source_type, depth, output_format_selection, progress=gr.Progress(track_tqdm=True)):
    """Top-level pipeline wired to the UI: fetch content, convert, and package it.

    Args:
        url_or_id: Webpage URL, or a GitHub URL / "owner/repo" ID.
        source_type: "Webpage" or "GitHub Repository".
        depth: Link-following depth for webpage scraping (ignored for repos).
        output_format_selection: "Markdown", "JSON", "CSV", "Text" or "PDF".
        progress: Gradio progress tracker (injected by Gradio at call time).

    Returns:
        (status_message, preview_content, output_file_path) for the UI outputs.
    """
    progress(0, desc="Initializing...")
    raw_content = ""
    error_message = ""
    output_file_path = None
    if source_type == "GitHub Repository":
        if not check_repomix_installed():
            error_message = "Repomix is not installed or not accessible. Please ensure it's installed globally."
            return error_message, None, None
        raw_content, _ = run_repomix(url_or_id, progress=progress)
        # Error protocol: helpers signal failure by returning a string that
        # contains "Error". NOTE(review): this can false-positive if the
        # scraped content itself contains the word "Error".
        if "Error" in raw_content:
            error_message = raw_content
            raw_content = ""
    elif source_type == "Webpage":
        raw_content, _ = scrape_and_convert_website(url_or_id, depth, progress=progress)
        if "Error" in raw_content:
            error_message = raw_content
            raw_content = ""
    else:
        error_message = "Invalid source type selected."
        return error_message, None, None
    if error_message:
        return error_message, None, None
    try:
        progress(0.9, desc=f"Converting to {output_format_selection}...")
        output_file_path = save_output_to_file(raw_content, output_format_selection, url_or_id)
        preview_content = raw_content
        if output_format_selection == "JSON":
            preview_content = convert_to_json(raw_content, url_or_id)
        elif output_format_selection == "CSV" and output_file_path:
            try:
                # Preview only the first few CSV lines to keep the UI light.
                with open(output_file_path, 'r', encoding='utf-8') as f_csv:
                    csv_preview_lines = [next(f_csv) for _ in range(5)]
                preview_content = "".join(csv_preview_lines)
                if not preview_content:
                    preview_content = "[CSV content is empty or very short]"
            except StopIteration:
                # Fewer than 5 lines in the file: read it whole instead.
                with open(output_file_path, 'r', encoding='utf-8') as f_csv:
                    preview_content = f_csv.read()
                if not preview_content:
                    preview_content = "[CSV content is empty]"
            except Exception as e_csv_preview:
                preview_content = f"[Error reading CSV for preview: {str(e_csv_preview)}]"
        elif output_format_selection == "CSV" and not output_file_path:
            preview_content = "[CSV file path not available for preview]"
        elif output_format_selection == "PDF":
            preview_content = f"[PDF generated. Download to view: {os.path.basename(output_file_path if output_file_path else 'file.pdf')}]"
            # BUG FIX: the original searched for a log message inside the file
            # *path* string ("Saving as Markdown instead" in output_file_path),
            # which can never match. save_output_to_file signals the fallback
            # by saving with a ".pdf.md" suffix, so test for that instead.
            if output_file_path and output_file_path.endswith(".pdf.md"):
                preview_content = raw_content + f"\n\n[Note: PDF conversion failed, showing Markdown. File saved as .pdf.md]"
        progress(1, desc="Processing complete.")
        return f"Successfully processed: {url_or_id}", preview_content, output_file_path
    except Exception as e:
        return f"Error during file conversion/saving: {str(e)}", raw_content, None
# --- Gradio UI: inputs on the left, status/preview/download on the right ---
with gr.Blocks(title="RAG-Ready Content Scraper", theme="CultriX/gradio-theme") as iface:
    gr.Markdown("# RAG-Ready Content Scraper")
    gr.Markdown(
        "Scrape webpage content or GitHub repositories to generate RAG-ready datasets."
    )
    with gr.Row():
        with gr.Column(scale=2):
            # Input controls.
            url_input = gr.Textbox(
                label="Enter URL or GitHub Repository ID",
                placeholder="e.g., https://example.com OR username/repo"
            )
            source_type_input = gr.Radio(
                choices=["Webpage", "GitHub Repository"],
                value="Webpage",
                label="Select Source Type"
            )
            # Depth applies to webpage scraping only; ignored for repos.
            depth_input = gr.Slider(
                minimum=0, maximum=3, step=1, value=0,
                label="Scraping Depth (for Webpages)",
                info="0: Only main page. Ignored for GitHub repos."
            )
            output_format_input = gr.Dropdown(
                choices=["Markdown", "JSON", "CSV", "Text", "PDF"],
                value="Markdown",
                label="Select Output Format"
            )
            submit_button = gr.Button("Process Content", variant="primary")
        with gr.Column(scale=3):
            # Output widgets populated by process_input_updated.
            status_output = gr.Textbox(label="Status", interactive=False)
            preview_output = gr.Code(label="Preview Content", language="markdown", interactive=False)
            file_download_output = gr.File(label="Download Processed File", interactive=False)
    # Clickable example rows; caching disabled because runs hit the network.
    gr.Examples(
        examples=[
            ["https://gradio.app/docs/js", "Webpage", 1, "Markdown"],
            ["gradio-app/gradio", "GitHub Repository", 0, "Text"],
            ["https://en.wikipedia.org/wiki/Retrieval-augmented_generation", "Webpage", 0, "JSON"],
        ],
        inputs=[url_input, source_type_input, depth_input, output_format_input],
        outputs=[status_output, preview_output, file_download_output],
        fn=process_input_updated,
        cache_examples=False
    )
    with gr.Accordion("How it Works & More Info", open=False):
        gr.Markdown(
            """
            **Webpage Scraping:**
            1. Enter a full URL (e.g., `https://example.com`).
            2. Select "Webpage" as the source type.
            3. Set the desired scraping depth.
            4. Choose your output format.
            **GitHub Repository Processing:**
            1. Enter a GitHub repository URL or ID (e.g., `username/repo`).
            2. Select "GitHub Repository". (Depth is ignored).
            3. Choose your output format. Uses **RepoMix**.
            **Output Formats:** Markdown, JSON, CSV, Text, PDF.
            **Note:** PDF generation requires `markdown-pdf` library.
            This app is designed for Docker/HuggingFace Spaces.
            [View Source Code on HuggingFace Spaces](https://huggingface.co/spaces/CultriX/RAG-Scraper)
            """
        )
    # Wire the submit button to the main processing pipeline.
    submit_button.click(
        fn=process_input_updated,
        inputs=[url_input, source_type_input, depth_input, output_format_input],
        outputs=[status_output, preview_output, file_download_output],
    )
if __name__ == "__main__":
    iface.launch()