Spaces:
Sleeping
Sleeping
import importlib
import importlib.util
import os
import sys
import tempfile
import traceback
from typing import List, Tuple, Optional, Dict
from urllib.parse import urlparse

# Base required imports
import gradio as gr
# Dictionary of required packages for each file type
REQUIRED_PACKAGES = {
    'url': ['langchain_community', 'requests', 'bs4'],
    'pdf': ['langchain_community', 'pypdf'],
    'docx': ['langchain_community', 'unstructured']
}


def check_and_import_packages(file_type: str) -> Tuple[bool, str, Optional[Exception]]:
    """Verify that the third-party packages needed for *file_type* are usable.

    Args:
        file_type: A key of REQUIRED_PACKAGES ('url', 'pdf' or 'docx').
            Unknown types are treated as having no requirements.

    Returns:
        A ``(success, error_message, exception)`` tuple. ``error_message`` is
        a user-facing message listing missing packages (with a pip hint) or
        the import failure; ``exception`` is only non-None when an installed
        package failed to import.
    """
    # BUGFIX: `import importlib` at module level does not reliably bind the
    # `importlib.util` submodule (AttributeError on some Python versions);
    # import it explicitly before using find_spec().
    import importlib.util

    if file_type not in REQUIRED_PACKAGES:
        return True, "", None

    # find_spec() returns None when the package cannot be located at all.
    missing_packages = [pkg for pkg in REQUIRED_PACKAGES[file_type]
                        if importlib.util.find_spec(pkg) is None]
    if missing_packages:
        error_msg = (f"ERROR: Missing required packages for {file_type} processing:\n"
                     f" - Missing: {', '.join(missing_packages)}\n"
                     f" - Install with: pip install {' '.join(missing_packages)}")
        return False, error_msg, None

    # Packages are present; confirm the loader classes actually import
    # (a package can be installed yet fail to import, e.g. broken deps).
    try:
        if file_type == 'url':
            from langchain_community.document_loaders import WebBaseLoader  # noqa: F401
        elif file_type == 'pdf':
            from langchain_community.document_loaders import PyPDFLoader  # noqa: F401
        elif file_type == 'docx':
            from langchain_community.document_loaders import UnstructuredWordDocumentLoader  # noqa: F401
        return True, "", None
    except Exception as e:
        return False, f"ERROR: Failed to import required modules for {file_type}:\n {str(e)}", e
def count_characters(text: str) -> Dict[str, int]:
    """Tally character statistics for *text*.

    Returns a dict with:
        'total': every character, whitespace included;
        'excluding_spaces': everything except spaces, newlines and tabs;
        'japanese': kanji (CJK unified ideographs), hiragana and katakana.
    """
    stats = {'total': 0, 'excluding_spaces': 0, 'japanese': 0}
    if text:
        stats['total'] = len(text)
        stats['excluding_spaces'] = sum(1 for ch in text if ch not in ' \n\t')
        stats['japanese'] = sum(
            1 for ch in text
            if '\u4e00' <= ch <= '\u9fff'    # CJK unified ideographs (kanji)
            or '\u3040' <= ch <= '\u309f'    # hiragana
            or '\u30a0' <= ch <= '\u30ff'    # katakana
        )
    return stats
def format_char_count(counts: Dict[str, int]) -> str:
    """Render the character-count dict as a three-line Japanese summary."""
    lines = [
        f"文字数(スペース・改行含む): {counts['total']}",
        f"文字数(スペース・改行除く): {counts['excluding_spaces']}",
        f"日本語文字数: {counts['japanese']}",
    ]
    return "\n".join(lines)
def process_raw_text(text: str) -> Tuple[str, List[str]]:
    """Wrap raw text input in a section header.

    Returns ``(formatted_text, errors)``. Empty or whitespace-only input
    yields ``("", [])``. The errors list is kept for interface symmetry
    with the other process_* helpers; the previous try/except here was
    unreachable (the f-string only repeats a ``.strip()`` call that already
    succeeded on the guard line), so it has been removed.
    """
    errors: List[str] = []
    if not text or not text.strip():
        return "", errors
    return f"\n=== Raw Text Input ===\n{text.strip()}\n", errors
def is_valid_url(url: str) -> bool:
    """Return True if *url* parses with both a scheme and a network location.

    Catches only ValueError — which is what ``urlparse`` raises on malformed
    input such as an invalid IPv6 bracket — instead of the previous bare
    ``except:``, which also swallowed KeyboardInterrupt/SystemExit.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        return False
    return bool(parsed.scheme and parsed.netloc)
def process_urls(urls: str) -> Tuple[str, List[str]]:
    """Fetch and concatenate page text for each newline-separated URL.

    Returns ``(combined_text, errors)``; a URL that is malformed or fails
    to load is reported in *errors* and skipped, never aborting the rest.
    """
    errors: List[str] = []
    if not urls.strip():
        return "", errors

    ok, message, _ = check_and_import_packages('url')
    if not ok:
        return "", [message]
    from langchain_community.document_loaders import WebBaseLoader

    chunks: List[str] = []
    for candidate in (line.strip() for line in urls.split('\n')):
        if not candidate:
            continue
        if not is_valid_url(candidate):
            errors.append(f"ERROR: Invalid URL format: {candidate}")
            continue
        try:
            documents = WebBaseLoader(candidate).load()
            # Header goes in before the page bodies so a partial failure
            # mid-document still leaves the section labelled.
            chunks.append(f"\n=== Content from URL: {candidate} ===\n")
            for doc in documents:
                chunks.append(doc.page_content + "\n")
        except Exception as exc:
            errors.append(f"ERROR: Failed to process URL {candidate}:\n {str(exc)}\n {traceback.format_exc()}")
    return "".join(chunks), errors
def process_txt(txt_file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Read a plain-text upload, trying UTF-8 first and Shift-JIS second.

    Returns ``(formatted_text, errors)``; on failure the text is "" and a
    single descriptive error message is appended.
    """
    errors: List[str] = []
    base = os.path.basename(txt_file.name)
    try:
        with open(txt_file.name, 'r', encoding='utf-8') as fh:
            body = fh.read()
    except UnicodeDecodeError:
        # UTF-8 failed — fall back to Shift-JIS for legacy Japanese files.
        try:
            with open(txt_file.name, 'r', encoding='shift-jis') as fh:
                body = fh.read()
        except Exception as exc:
            errors.append(f"ERROR: Failed to process text file {base} (encoding error):\n {str(exc)}\n {traceback.format_exc()}")
            return "", errors
    except Exception as exc:
        errors.append(f"ERROR: Failed to process text file {base}:\n {str(exc)}\n {traceback.format_exc()}")
        return "", errors
    return f"\n=== Content from TXT: {base} ===\n{body}\n", errors
def process_pdf(pdf_file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Extract the text of every page of a PDF via PyPDFLoader.

    Returns ``(formatted_text, errors)``; a missing dependency or a loader
    failure yields "" plus one error message.
    """
    ok, message, _ = check_and_import_packages('pdf')
    if not ok:
        return "", [message]
    from langchain_community.document_loaders import PyPDFLoader

    base = os.path.basename(pdf_file.name)
    try:
        pages = PyPDFLoader(pdf_file.name).load()
        body = "".join(page.page_content + "\n" for page in pages)
        return f"\n=== Content from PDF: {base} ===\n{body}", []
    except Exception as exc:
        return "", [f"ERROR: Failed to process PDF {base}:\n {str(exc)}\n {traceback.format_exc()}"]
def process_docx(docx_file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Extract text from a Word document via UnstructuredWordDocumentLoader.

    Returns ``(formatted_text, errors)``; a missing dependency or a loader
    failure yields "" plus one error message.
    """
    ok, message, _ = check_and_import_packages('docx')
    if not ok:
        return "", [message]
    from langchain_community.document_loaders import UnstructuredWordDocumentLoader

    base = os.path.basename(docx_file.name)
    try:
        sections = UnstructuredWordDocumentLoader(docx_file.name).load()
        body = "".join(part.page_content + "\n" for part in sections)
        return f"\n=== Content from DOCX: {base} ===\n{body}", []
    except Exception as exc:
        return "", [f"ERROR: Failed to process DOCX {base}:\n {str(exc)}\n {traceback.format_exc()}"]
def process_file(file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Dispatch an uploaded file to the handler matching its extension.

    Returns ``(formatted_text, errors)``; a None/falsy file yields
    ``("", [])`` and an unrecognised extension yields an error entry.
    """
    if not file:
        return "", []
    extension = os.path.splitext(file.name)[1].lower()
    # Lambdas delay name resolution, so the table never touches a handler
    # until a matching file actually arrives.
    dispatch = {
        '.txt': lambda f: process_txt(f),
        '.pdf': lambda f: process_pdf(f),
        '.doc': lambda f: process_docx(f),
        '.docx': lambda f: process_docx(f),
    }
    handler = dispatch.get(extension)
    if handler is None:
        return "", [f"ERROR: Unsupported file type: {extension}"]
    return handler(file)
def combine_content(raw_text: str, url_input: str, files: List[tempfile._TemporaryFileWrapper]) -> Tuple[str, str, str, str]:
    """Merge text from all input sources and write the result to a temp file.

    Returns ``(combined_text, download_path, char_count_text, error_text)``
    — the four outputs bound to the Gradio button click.
    """
    pieces: List[str] = []
    issues: List[str] = []

    # Each source contributes (text, errors); order matters and matches the
    # UI layout: raw text, then URLs, then uploaded files.
    if raw_text:
        text, errs = process_raw_text(raw_text)
        pieces.append(text)
        issues.extend(errs)
    if url_input:
        text, errs = process_urls(url_input)
        pieces.append(text)
        issues.extend(errs)
    for upload in files or []:
        text, errs = process_file(upload)
        pieces.append(text)
        issues.extend(errs)

    combined = "".join(pieces)

    # Character statistics are computed on the extracted content, before any
    # placeholder message is substituted in below.
    char_count_text = format_char_count(count_characters(combined))

    if not combined.strip():
        if issues:
            combined = "No content was extracted due to errors. Please check the error messages below."
        else:
            combined = "No content was extracted. Please provide some input (text, URLs, or files)."

    error_text = "\n".join(issues) if issues else "処理は正常に完了しました。"

    # Persist for the gr.File download component; delete=False so the path
    # outlives this handler.
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as tmp_file:
        tmp_file.write(combined)
        download_path = tmp_file.name

    return combined, download_path, char_count_text, error_text
# Create Gradio interface
# Two-column layout: inputs (text / URLs / file uploads) on the left,
# results (errors, character counts, extracted text, download) on the right.
with gr.Blocks(title="Document Content Extractor") as demo:
    gr.Markdown("# Document Content Extractor")
    # Usage notes shown to the user (Japanese): supported file formats and
    # the third-party packages each processing path requires.
    gr.Markdown("""テキスト、URL、各種ドキュメントからテキストを抽出・結合するツールです。
対応ファイル形式:
- テキストファイル (.txt)
- PDFファイル (.pdf) - pypdfが必要
- Wordドキュメント (.doc, .docx) - unstructuredが必要
必要なパッケージ:
- URL処理用: langchain-community, requests, beautifulsoup4
- PDF処理用: langchain-community, pypdf
- DOCX処理用: langchain-community, unstructured""")
    with gr.Row():
        with gr.Column():
            # --- input column ---
            raw_text = gr.Textbox(
                label="テキスト入力",
                placeholder="直接テキストを入力できます...",
                lines=5
            )
            # One URL per line; split/validated in process_urls().
            url_input = gr.Textbox(
                label="URL入力(1行に1つ)",
                placeholder="URLを入力してください...",
                lines=5
            )
            # Extension filter mirrors the dispatch in process_file().
            files = gr.File(
                label="ファイルアップロード",
                file_count="multiple",
                file_types=[".txt", ".pdf", ".doc", ".docx"]
            )
            combine_btn = gr.Button("抽出・結合")
        with gr.Column():
            # --- output column (all read-only) ---
            error_output = gr.Textbox(
                label="エラー・警告",
                lines=3,
                interactive=False,
                show_copy_button=True
            )
            char_count_output = gr.Textbox(
                label="文字数",
                lines=3,
                interactive=False
            )
            text_output = gr.Textbox(
                label="抽出されたテキスト",
                lines=20,
                interactive=False,
                show_copy_button=True
            )
            file_output = gr.File(label="結合テキストをダウンロード")
    # Wire the button to combine_content; output order must match the
    # function's 4-tuple return: text, file path, char counts, errors.
    combine_btn.click(
        fn=combine_content,
        inputs=[raw_text, url_input, files],
        outputs=[text_output, file_output, char_count_output, error_output]
    )

if __name__ == "__main__":
    demo.launch()