import os
import tempfile
import traceback
import importlib.util
from typing import List, Tuple, Optional, Dict
from urllib.parse import urlparse

# Base required imports
import gradio as gr

# Importable module names required for each input type
REQUIRED_PACKAGES = {
    'url': ['langchain_community', 'requests', 'bs4'],
    'pdf': ['langchain_community', 'pypdf'],
    'docx': ['langchain_community', 'unstructured']
}


def check_and_import_packages(file_type: str) -> Tuple[bool, str, Optional[Exception]]:
    """
    Check and import required packages for a specific file type.
    Returns (success, error_message, exception).
    """
    if file_type not in REQUIRED_PACKAGES:
        return True, "", None

    missing_packages = []
    for package in REQUIRED_PACKAGES[file_type]:
        if importlib.util.find_spec(package) is None:
            missing_packages.append(package)

    if missing_packages:
        error_msg = (f"ERROR: Missing required packages for {file_type} processing:\n"
                     f"  - Missing: {', '.join(missing_packages)}\n"
                     f"  - Install with: pip install {' '.join(missing_packages)}")
        return False, error_msg, None

    try:
        if file_type == 'url':
            from langchain_community.document_loaders import WebBaseLoader  # noqa: F401
        elif file_type == 'pdf':
            from langchain_community.document_loaders import PyPDFLoader  # noqa: F401
        elif file_type == 'docx':
            from langchain_community.document_loaders import UnstructuredWordDocumentLoader  # noqa: F401
        return True, "", None
    except Exception as e:
        return False, f"ERROR: Failed to import required modules for {file_type}:\n  {str(e)}", e


def count_characters(text: str) -> Dict[str, int]:
    """Count characters in text."""
    if not text:
        return {
            'total': 0,
            'excluding_spaces': 0,
            'japanese': 0
        }
    return {
        'total': len(text),
        'excluding_spaces': len(text.replace(' ', '').replace('\n', '').replace('\t', '')),
        # CJK ideographs, hiragana and katakana ranges
        'japanese': len([c for c in text
                         if '\u4e00' <= c <= '\u9fff'
                         or '\u3040' <= c <= '\u309f'
                         or '\u30a0' <= c <= '\u30ff'])
    }


def format_char_count(counts: Dict[str, int]) -> str:
    """Format character count information."""
    return (f"文字数(スペース・改行含む): {counts['total']}\n"
            f"文字数(スペース・改行除く): {counts['excluding_spaces']}\n"
            f"日本語文字数: {counts['japanese']}")


def process_raw_text(text: str) -> Tuple[str, List[str]]:
    """Process raw text input."""
    errors = []
    if not text or not text.strip():
        return "", errors
    try:
        return f"\n=== Raw Text Input ===\n{text.strip()}\n", errors
    except Exception as e:
        errors.append(f"ERROR: Failed to process raw text input:\n  {str(e)}")
        return "", errors


def is_valid_url(url: str) -> bool:
    """Validate if the given string is a valid URL."""
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc])
    except Exception:
        return False


def process_urls(urls: str) -> Tuple[str, List[str]]:
    """Extract text from URLs using WebBaseLoader."""
    errors = []
    if not urls.strip():
        return "", errors

    success, error_msg, _ = check_and_import_packages('url')
    if not success:
        errors.append(error_msg)
        return "", errors

    from langchain_community.document_loaders import WebBaseLoader

    combined_text = ""
    url_list = [url.strip() for url in urls.split('\n') if url.strip()]

    for url in url_list:
        if not is_valid_url(url):
            errors.append(f"ERROR: Invalid URL format: {url}")
            continue
        try:
            loader = WebBaseLoader(url)
            documents = loader.load()
            combined_text += f"\n=== Content from URL: {url} ===\n"
            for doc in documents:
                combined_text += doc.page_content + "\n"
        except Exception as e:
            errors.append(f"ERROR: Failed to process URL {url}:\n  {str(e)}\n  {traceback.format_exc()}")

    return combined_text, errors
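
# The loaders used throughout this script follow the same langchain_community
# pattern: `.load()` returns a list of Document objects whose text lives in
# `.page_content`. A minimal sketch (illustrative only; "example.pdf" is a
# hypothetical path):
#
#   from langchain_community.document_loaders import PyPDFLoader
#   docs = PyPDFLoader("example.pdf").load()
#   text = "\n".join(doc.page_content for doc in docs)
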

def process_txt(txt_file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Process text files directly."""
    errors = []
    try:
        with open(txt_file.name, 'r', encoding='utf-8') as f:
            content = f.read()
        return f"\n=== Content from TXT: {os.path.basename(txt_file.name)} ===\n{content}\n", errors
    except UnicodeDecodeError:
        try:
            # Try with a different encoding if UTF-8 fails
            with open(txt_file.name, 'r', encoding='shift-jis') as f:
                content = f.read()
            return f"\n=== Content from TXT: {os.path.basename(txt_file.name)} ===\n{content}\n", errors
        except Exception as e:
            errors.append(f"ERROR: Failed to process text file {os.path.basename(txt_file.name)} (encoding error):\n"
                          f"  {str(e)}\n  {traceback.format_exc()}")
            return "", errors
    except Exception as e:
        errors.append(f"ERROR: Failed to process text file {os.path.basename(txt_file.name)}:\n"
                      f"  {str(e)}\n  {traceback.format_exc()}")
        return "", errors


def process_pdf(pdf_file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Extract text from a PDF file using PyPDFLoader."""
    errors = []
    success, error_msg, _ = check_and_import_packages('pdf')
    if not success:
        errors.append(error_msg)
        return "", errors

    from langchain_community.document_loaders import PyPDFLoader

    try:
        loader = PyPDFLoader(pdf_file.name)
        documents = loader.load()
        content = ""
        for doc in documents:
            content += doc.page_content + "\n"
        return f"\n=== Content from PDF: {os.path.basename(pdf_file.name)} ===\n{content}", errors
    except Exception as e:
        errors.append(f"ERROR: Failed to process PDF {os.path.basename(pdf_file.name)}:\n"
                      f"  {str(e)}\n  {traceback.format_exc()}")
        return "", errors


def process_docx(docx_file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Extract text from a DOCX file using UnstructuredWordDocumentLoader."""
    errors = []
    success, error_msg, _ = check_and_import_packages('docx')
    if not success:
        errors.append(error_msg)
        return "", errors

    from langchain_community.document_loaders import UnstructuredWordDocumentLoader

    try:
        loader = UnstructuredWordDocumentLoader(docx_file.name)
        documents = loader.load()
        content = ""
        for doc in documents:
            content += doc.page_content + "\n"
        return f"\n=== Content from DOCX: {os.path.basename(docx_file.name)} ===\n{content}", errors
    except Exception as e:
        errors.append(f"ERROR: Failed to process DOCX {os.path.basename(docx_file.name)}:\n"
                      f"  {str(e)}\n  {traceback.format_exc()}")
        return "", errors


def process_file(file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Process a file based on its extension."""
    errors = []
    if not file:
        return "", errors

    file_ext = os.path.splitext(file.name)[1].lower()

    # Process based on file extension
    if file_ext == '.txt':
        return process_txt(file)
    elif file_ext == '.pdf':
        return process_pdf(file)
    elif file_ext in ['.doc', '.docx']:
        return process_docx(file)
    else:
        return "", [f"ERROR: Unsupported file type: {file_ext}"]


def combine_content(raw_text: str, url_input: str,
                    files: List[tempfile._TemporaryFileWrapper]) -> Tuple[str, str, str, str]:
    """Combine content from all sources into a single text file."""
    combined_text = ""
    all_errors = []

    # Process raw text if provided
    if raw_text:
        text_content, text_errors = process_raw_text(raw_text)
        combined_text += text_content
        all_errors.extend(text_errors)

    # Process URLs if provided
    if url_input:
        url_text, url_errors = process_urls(url_input)
        combined_text += url_text
        all_errors.extend(url_errors)

    # Process each uploaded file
    if files:
        for file in files:
            file_text, file_errors = process_file(file)
            combined_text += file_text
            all_errors.extend(file_errors)

    # Calculate character count
    char_counts = count_characters(combined_text)
    char_count_text = format_char_count(char_counts)

    if not combined_text.strip():
        if all_errors:
            combined_text = "No content was extracted due to errors. Please check the error messages below."
        else:
            combined_text = "No content was extracted. Please provide some input (text, URLs, or files)."

    # Format error messages
    error_text = "\n".join(all_errors) if all_errors else "処理は正常に完了しました。"

    # Save to a temporary file for download
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', encoding='utf-8') as tmp_file:
        tmp_file.write(combined_text)
        output_path = tmp_file.name

    return combined_text, output_path, char_count_text, error_text
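
# combine_content can also be exercised without the UI. Gradio passes file objects
# that expose a `.name` path, so any object with that attribute works. A minimal
# sketch (illustrative only; the paths and inputs are hypothetical):
#
#   from types import SimpleNamespace
#   text, download_path, counts, errors = combine_content(
#       raw_text="メモ",
#       url_input="https://example.com",
#       files=[SimpleNamespace(name="notes.txt")],
#   )
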

# Create Gradio interface
with gr.Blocks(title="Document Content Extractor") as demo:
    gr.Markdown("# Document Content Extractor")
    gr.Markdown("""テキスト、URL、各種ドキュメントからテキストを抽出・結合するツールです。

対応ファイル形式:
- テキストファイル (.txt)
- PDFファイル (.pdf) - pypdfが必要
- Wordドキュメント (.doc, .docx) - unstructuredが必要

必要なパッケージ:
- URL処理用: langchain-community, requests, beautifulsoup4
- PDF処理用: langchain-community, pypdf
- DOCX処理用: langchain-community, unstructured""")

    with gr.Row():
        with gr.Column():
            raw_text = gr.Textbox(
                label="テキスト入力",
                placeholder="直接テキストを入力できます...",
                lines=5
            )
            url_input = gr.Textbox(
                label="URL入力(1行に1つ)",
                placeholder="URLを入力してください...",
                lines=5
            )
            files = gr.File(
                label="ファイルアップロード",
                file_count="multiple",
                file_types=[".txt", ".pdf", ".doc", ".docx"]
            )
            combine_btn = gr.Button("抽出・結合")

        with gr.Column():
            error_output = gr.Textbox(
                label="エラー・警告",
                lines=3,
                interactive=False,
                show_copy_button=True
            )
            char_count_output = gr.Textbox(
                label="文字数",
                lines=3,
                interactive=False
            )
            text_output = gr.Textbox(
                label="抽出されたテキスト",
                lines=20,
                interactive=False,
                show_copy_button=True
            )
            file_output = gr.File(label="結合テキストをダウンロード")

    combine_btn.click(
        fn=combine_content,
        inputs=[raw_text, url_input, files],
        outputs=[text_output, file_output, char_count_output, error_output]
    )

if __name__ == "__main__":
    demo.launch()
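
# Note: demo.launch() starts a local-only server by default. If remote access is
# needed, Gradio's launch() also accepts options such as server_name="0.0.0.0",
# server_port=7860, or share=True (assumption: standard Gradio launch parameters
# for your installed version).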