import os
import tempfile
from typing import List, Tuple, Optional, Dict
from urllib.parse import urlparse
import importlib.util  # imported explicitly: `import importlib` alone does not guarantee importlib.util is available
import traceback

# Base required imports
import gradio as gr

# Import names of the packages required for each input type
REQUIRED_PACKAGES = {
    'url': ['langchain_community', 'requests', 'bs4'],
    'pdf': ['langchain_community', 'pypdf'],
    'docx': ['langchain_community', 'unstructured']
}

# Import names and pip package names differ for some dependencies;
# map them so the install hint in error messages is copy-pasteable.
PIP_NAMES = {'bs4': 'beautifulsoup4', 'langchain_community': 'langchain-community'}

def check_and_import_packages(file_type: str) -> Tuple[bool, str, Optional[Exception]]:
    """
    Check and import required packages for a specific file type.
    Returns (success, error_message, exception)
    """
    if file_type not in REQUIRED_PACKAGES:
        return True, "", None
        
    missing_packages = []
    for package in REQUIRED_PACKAGES[file_type]:
        if importlib.util.find_spec(package) is None:  # find_spec returns None if the module is not installed
            missing_packages.append(package)

    if missing_packages:
        pip_packages = [PIP_NAMES.get(p, p) for p in missing_packages]
        error_msg = (f"ERROR: Missing required packages for {file_type} processing:\n"
                    f"  - Missing: {', '.join(missing_packages)}\n"
                    f"  - Install with: pip install {' '.join(pip_packages)}")
        return False, error_msg, None
        
    try:
        if file_type == 'url':
            from langchain_community.document_loaders import WebBaseLoader
        elif file_type == 'pdf':
            from langchain_community.document_loaders import PyPDFLoader
        elif file_type == 'docx':
            from langchain_community.document_loaders import UnstructuredWordDocumentLoader
        return True, "", None
    except Exception as e:
        return False, f"ERROR: Failed to import required modules for {file_type}:\n  {str(e)}", e

def count_characters(text: str) -> Dict[str, int]:
    """Count characters: total, excluding whitespace, and Japanese characters."""
    if not text:
        return {'total': 0, 'excluding_spaces': 0, 'japanese': 0}
    return {
        'total': len(text),
        'excluding_spaces': len(text.replace(' ', '').replace('\n', '').replace('\t', '')),
        'japanese': sum(1 for c in text
                        if '\u4e00' <= c <= '\u9fff'    # CJK unified ideographs (kanji)
                        or '\u3040' <= c <= '\u309f'    # hiragana
                        or '\u30a0' <= c <= '\u30ff'),  # katakana
    }

def format_char_count(counts: Dict[str, int]) -> str:
    """Format character count information."""
    return (f"Characters (including spaces/newlines): {counts['total']}\n"
            f"Characters (excluding spaces/newlines): {counts['excluding_spaces']}\n"
            f"Japanese characters: {counts['japanese']}")

def process_raw_text(text: str) -> Tuple[str, List[str]]:
    """Process raw text input."""
    errors: List[str] = []
    if not text or not text.strip():
        return "", errors
    # Formatting a plain string cannot fail, so no try/except is needed here.
    return f"\n=== Raw Text Input ===\n{text.strip()}\n", errors

def is_valid_url(url: str) -> bool:
    """Validate that the given string is a well-formed absolute URL."""
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc])
    except ValueError:  # urlparse raises ValueError for malformed input (e.g. invalid ports)
        return False
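
# Example (illustrative): is_valid_url("https://example.com") -> True;
# is_valid_url("example.com") -> False (no scheme), so it is reported as invalid.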

def process_urls(urls: str) -> Tuple[str, List[str]]:
    """Extract text from URLs using WebBaseLoader."""
    errors = []
    if not urls.strip():
        return "", errors

    success, error_msg, _ = check_and_import_packages('url')
    if not success:
        errors.append(error_msg)
        return "", errors

    from langchain_community.document_loaders import WebBaseLoader
    
    combined_text = ""
    url_list = [url.strip() for url in urls.split('\n') if url.strip()]

    for url in url_list:
        if not is_valid_url(url):
            errors.append(f"ERROR: Invalid URL format: {url}")
            continue

        try:
            loader = WebBaseLoader(url)
            documents = loader.load()
            combined_text += f"\n=== Content from URL: {url} ===\n"
            for doc in documents:
                combined_text += doc.page_content + "\n"
        except Exception as e:
            errors.append(f"ERROR: Failed to process URL {url}:\n  {str(e)}\n  {traceback.format_exc()}")

    return combined_text, errors
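
# Example (illustrative): process_urls("https://example.com\nnot-a-url") fetches
# the first URL with WebBaseLoader and records an "Invalid URL format" error
# for the second, so one bad line does not abort the whole batch.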

def process_txt(txt_file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Process text files directly."""
    errors = []
    try:
        with open(txt_file.name, 'r', encoding='utf-8') as f:
            content = f.read()
        return f"\n=== Content from TXT: {os.path.basename(txt_file.name)} ===\n{content}\n", errors
    except UnicodeDecodeError:
        try:
            # Fall back to Shift-JIS, a common encoding for Japanese text files
            with open(txt_file.name, 'r', encoding='shift-jis') as f:
                content = f.read()
            return f"\n=== Content from TXT: {os.path.basename(txt_file.name)} ===\n{content}\n", errors
        except Exception as e:
            errors.append(f"ERROR: Failed to process text file {os.path.basename(txt_file.name)} (encoding error):\n  {str(e)}\n  {traceback.format_exc()}")
            return "", errors
    except Exception as e:
        errors.append(f"ERROR: Failed to process text file {os.path.basename(txt_file.name)}:\n  {str(e)}\n  {traceback.format_exc()}")
        return "", errors

def process_pdf(pdf_file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Extract text from a PDF file using PyPDFLoader."""
    errors = []
    success, error_msg, _ = check_and_import_packages('pdf')
    if not success:
        errors.append(error_msg)
        return "", errors

    from langchain_community.document_loaders import PyPDFLoader

    try:
        loader = PyPDFLoader(pdf_file.name)
        documents = loader.load()
        
        content = ""
        for doc in documents:
            content += doc.page_content + "\n"
            
        return f"\n=== Content from PDF: {os.path.basename(pdf_file.name)} ===\n{content}", errors
    except Exception as e:
        errors.append(f"ERROR: Failed to process PDF {os.path.basename(pdf_file.name)}:\n  {str(e)}\n  {traceback.format_exc()}")
        return "", errors

def process_docx(docx_file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Extract text from a DOCX file using UnstructuredWordDocumentLoader."""
    errors = []
    success, error_msg, _ = check_and_import_packages('docx')
    if not success:
        errors.append(error_msg)
        return "", errors

    from langchain_community.document_loaders import UnstructuredWordDocumentLoader

    try:
        loader = UnstructuredWordDocumentLoader(docx_file.name)
        documents = loader.load()
        
        content = ""
        for doc in documents:
            content += doc.page_content + "\n"
            
        return f"\n=== Content from DOCX: {os.path.basename(docx_file.name)} ===\n{content}", errors
    except Exception as e:
        errors.append(f"ERROR: Failed to process DOCX {os.path.basename(docx_file.name)}:\n  {str(e)}\n  {traceback.format_exc()}")
        return "", errors

def process_file(file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Process a file based on its extension."""
    errors = []
    if not file:
        return "", errors
        
    file_ext = os.path.splitext(file.name)[1].lower()
    
    # Process based on file extension
    if file_ext == '.txt':
        return process_txt(file)
    elif file_ext == '.pdf':
        return process_pdf(file)
    elif file_ext in ['.doc', '.docx']:
        return process_docx(file)
    else:
        return "", [f"ERROR: Unsupported file type: {file_ext}"]

def combine_content(raw_text: str, url_input: str, files: List[tempfile._TemporaryFileWrapper]) -> Tuple[str, str, str, str]:
    """Combine content from all sources into a single text file."""
    combined_text = ""
    all_errors = []
    
    # Process raw text if provided
    if raw_text:
        text_content, text_errors = process_raw_text(raw_text)
        combined_text += text_content
        all_errors.extend(text_errors)
    
    # Process URLs if provided
    if url_input:
        url_text, url_errors = process_urls(url_input)
        combined_text += url_text
        all_errors.extend(url_errors)
    
    # Process each uploaded file
    if files:
        for file in files:
            file_text, file_errors = process_file(file)
            combined_text += file_text
            all_errors.extend(file_errors)

    # Calculate character count
    char_counts = count_characters(combined_text)
    char_count_text = format_char_count(char_counts)

    if not combined_text.strip():
        if all_errors:
            combined_text = "No content was extracted due to errors. Please check the error messages below."
        else:
            combined_text = "No content was extracted. Please provide some input (text, URLs, or files)."

    # Format error messages
    error_text = "\n".join(all_errors) if all_errors else "処理は正常に完了しました。"

    # Save to a temporary file for download; force UTF-8 so Japanese text is written correctly on any platform
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', encoding='utf-8') as tmp_file:
        tmp_file.write(combined_text)
        output_path = tmp_file.name

    return combined_text, output_path, char_count_text, error_text
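
# Example (illustrative): combine_content("hello", "", None) returns the combined
# text, a path to a temporary .txt file holding it, the character-count summary,
# and "Processing completed successfully." as the status text.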

# Create Gradio interface
with gr.Blocks(title="Document Content Extractor") as demo:
    gr.Markdown("# Document Content Extractor")
    gr.Markdown("""テキスト、URL、各種ドキュメントからテキストを抽出・結合するツールです。
                
対応ファイル形式:
- テキストファイル (.txt)
- PDFファイル (.pdf) - pypdfが必要
- Wordドキュメント (.doc, .docx) - unstructuredが必要
                
必要なパッケージ:
- URL処理用: langchain-community, requests, beautifulsoup4
- PDF処理用: langchain-community, pypdf
- DOCX処理用: langchain-community, unstructured""")

    with gr.Row():
        with gr.Column():
            raw_text = gr.Textbox(
                label="テキスト入力",
                placeholder="直接テキストを入力できます...",
                lines=5
            )
            url_input = gr.Textbox(
                label="URL入力(1行に1つ)",
                placeholder="URLを入力してください...",
                lines=5
            )
            files = gr.File(
                label="ファイルアップロード",
                file_count="multiple",
                file_types=[".txt", ".pdf", ".doc", ".docx"]
            )
            combine_btn = gr.Button("Extract & Combine")

        with gr.Column():
            error_output = gr.Textbox(
                label="エラー・警告",
                lines=3,
                interactive=False,
                show_copy_button=True
            )
            char_count_output = gr.Textbox(
                label="文字数",
                lines=3,
                interactive=False
            )
            text_output = gr.Textbox(
                label="抽出されたテキスト",
                lines=20,
                interactive=False,
                show_copy_button=True
            )
            file_output = gr.File(label="Download combined text")

    combine_btn.click(
        fn=combine_content,
        inputs=[raw_text, url_input, files],
        outputs=[text_output, file_output, char_count_output, error_output]
    )

if __name__ == "__main__":
    demo.launch()
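    # Optional (standard Gradio feature): pass share=True to expose the app on a
    # temporary public URL, e.g. demo.launch(share=True).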