# Document Content Extractor — Gradio app (app.py)
# Published by tregu0458, commit c6a2c26 (verified).
import sys
import os
import tempfile
from typing import List, Tuple, Optional, Dict
from urllib.parse import urlparse
import importlib
import traceback
# Base required imports
import gradio as gr
# Dictionary of required packages for each file type.  Keys of the inner
# lists are importable module names (hence 'bs4', not the PyPI name
# 'beautifulsoup4').
REQUIRED_PACKAGES = {
    'url': ['langchain_community', 'requests', 'bs4'],
    'pdf': ['langchain_community', 'pypdf'],
    'docx': ['langchain_community', 'unstructured']
}


def check_and_import_packages(file_type: str) -> Tuple[bool, str, Optional[Exception]]:
    """
    Check and import required packages for a specific file type.

    Returns (success, error_message, exception).  Unknown file types have no
    requirements and succeed trivially.
    """
    # Fix: importlib.util is a submodule and is NOT guaranteed to be bound
    # after a bare `import importlib`; import it explicitly before use.
    import importlib.util

    if file_type not in REQUIRED_PACKAGES:
        return True, "", None

    # A package is "missing" when no import spec can be found for it.
    missing_packages = [
        package for package in REQUIRED_PACKAGES[file_type]
        if importlib.util.find_spec(package) is None
    ]
    if missing_packages:
        error_msg = (f"ERROR: Missing required packages for {file_type} processing:\n"
                     f"  - Missing: {', '.join(missing_packages)}\n"
                     f"  - Install with: pip install {' '.join(missing_packages)}")
        return False, error_msg, None

    try:
        # Probe the actual loader imports so later per-request imports
        # cannot fail; the names themselves are intentionally unused here.
        if file_type == 'url':
            from langchain_community.document_loaders import WebBaseLoader  # noqa: F401
        elif file_type == 'pdf':
            from langchain_community.document_loaders import PyPDFLoader  # noqa: F401
        elif file_type == 'docx':
            from langchain_community.document_loaders import UnstructuredWordDocumentLoader  # noqa: F401
        return True, "", None
    except Exception as e:
        return False, f"ERROR: Failed to import required modules for {file_type}:\n  {str(e)}", e
def count_characters(text: str) -> Dict[str, int]:
    """Count characters in text.

    Returns a dict with: 'total' (every character), 'excluding_spaces'
    (spaces, newlines and tabs removed), and 'japanese' (kanji, hiragana
    and katakana).
    """
    if not text:
        return {'total': 0, 'excluding_spaces': 0, 'japanese': 0}

    # Strip spaces, newlines and tabs in a single translate() pass.
    without_whitespace = text.translate(str.maketrans('', '', ' \n\t'))

    def _is_japanese(ch: str) -> bool:
        # CJK unified ideographs, hiragana, katakana — same ranges as before.
        return ('\u4e00' <= ch <= '\u9fff'
                or '\u3040' <= ch <= '\u309f'
                or '\u30a0' <= ch <= '\u30ff')

    return {
        'total': len(text),
        'excluding_spaces': len(without_whitespace),
        'japanese': sum(1 for ch in text if _is_japanese(ch)),
    }
def format_char_count(counts: Dict[str, int]) -> str:
    """Render the character-count dict as a three-line Japanese summary."""
    lines = [
        f"文字数(スペース・改行含む): {counts['total']}",
        f"文字数(スペース・改行除く): {counts['excluding_spaces']}",
        f"日本語文字数: {counts['japanese']}",
    ]
    return "\n".join(lines)
def process_raw_text(text: str) -> Tuple[str, List[str]]:
    """Wrap raw user text in a labelled section.

    Returns (formatted_text, errors).  The errors list is always empty:
    the previous try/except around the f-string was dead code, since
    interpolating an already-validated str cannot raise.
    """
    if not text or not text.strip():
        # Nothing usable was entered.
        return "", []
    return f"\n=== Raw Text Input ===\n{text.strip()}\n", []
def is_valid_url(url: str) -> bool:
    """Validate if the given string is a valid URL (scheme and host present)."""
    try:
        result = urlparse(url)
    except (ValueError, TypeError, AttributeError):
        # Narrowed from a bare `except:`: only parsing/type failures mean
        # "not a URL"; anything else should surface as a real error.
        return False
    return bool(result.scheme and result.netloc)
def process_urls(urls: str) -> Tuple[str, List[str]]:
    """Extract text from URLs using WebBaseLoader.

    *urls* is a newline-separated list; invalid or failing URLs are
    reported in the errors list and skipped.
    """
    errors: List[str] = []
    if not urls.strip():
        return "", errors

    ok, message, _ = check_and_import_packages('url')
    if not ok:
        errors.append(message)
        return "", errors

    from langchain_community.document_loaders import WebBaseLoader

    pieces: List[str] = []
    for line in urls.split('\n'):
        url = line.strip()
        if not url:
            continue
        if not is_valid_url(url):
            errors.append(f"ERROR: Invalid URL format: {url}")
            continue
        try:
            documents = WebBaseLoader(url).load()
            pieces.append(f"\n=== Content from URL: {url} ===\n")
            pieces.extend(doc.page_content + "\n" for doc in documents)
        except Exception as e:
            errors.append(f"ERROR: Failed to process URL {url}:\n  {str(e)}\n  {traceback.format_exc()}")
    return "".join(pieces), errors
def process_txt(txt_file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Read a plain-text file, trying UTF-8 first and Shift-JIS as fallback.

    Returns (labelled_content, errors).  The duplicated read/format code of
    the original is collapsed into a single success path.
    """
    errors: List[str] = []
    filename = os.path.basename(txt_file.name)
    try:
        try:
            with open(txt_file.name, 'r', encoding='utf-8') as f:
                content = f.read()
        except UnicodeDecodeError:
            # UTF-8 failed; retry with Shift-JIS (common for Japanese text).
            try:
                with open(txt_file.name, 'r', encoding='shift-jis') as f:
                    content = f.read()
            except Exception as e:
                errors.append(f"ERROR: Failed to process text file {filename} (encoding error):\n  {str(e)}\n  {traceback.format_exc()}")
                return "", errors
    except Exception as e:
        # Non-decoding failures on the first attempt (missing file, etc.).
        errors.append(f"ERROR: Failed to process text file {filename}:\n  {str(e)}\n  {traceback.format_exc()}")
        return "", errors
    return f"\n=== Content from TXT: {filename} ===\n{content}\n", errors
def process_pdf(pdf_file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Extract text from a PDF file using PyPDFLoader."""
    errors: List[str] = []
    ok, message, _ = check_and_import_packages('pdf')
    if not ok:
        errors.append(message)
        return "", errors

    from langchain_community.document_loaders import PyPDFLoader

    try:
        pages = PyPDFLoader(pdf_file.name).load()
        # One newline-terminated chunk per loaded page/document.
        body = "".join(page.page_content + "\n" for page in pages)
        return f"\n=== Content from PDF: {os.path.basename(pdf_file.name)} ===\n{body}", errors
    except Exception as e:
        errors.append(f"ERROR: Failed to process PDF {os.path.basename(pdf_file.name)}:\n  {str(e)}\n  {traceback.format_exc()}")
        return "", errors
def process_docx(docx_file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Extract text from a DOCX file using UnstructuredWordDocumentLoader."""
    errors: List[str] = []
    ok, message, _ = check_and_import_packages('docx')
    if not ok:
        errors.append(message)
        return "", errors

    from langchain_community.document_loaders import UnstructuredWordDocumentLoader

    try:
        sections = UnstructuredWordDocumentLoader(docx_file.name).load()
        # One newline-terminated chunk per loaded document section.
        body = "".join(section.page_content + "\n" for section in sections)
        return f"\n=== Content from DOCX: {os.path.basename(docx_file.name)} ===\n{body}", errors
    except Exception as e:
        errors.append(f"ERROR: Failed to process DOCX {os.path.basename(docx_file.name)}:\n  {str(e)}\n  {traceback.format_exc()}")
        return "", errors
def process_file(file: tempfile._TemporaryFileWrapper) -> Tuple[str, List[str]]:
    """Route an uploaded file to the handler matching its extension."""
    if not file:
        # Nothing uploaded: no content, no errors.
        return "", []

    extension = os.path.splitext(file.name)[1].lower()
    if extension == '.txt':
        return process_txt(file)
    if extension == '.pdf':
        return process_pdf(file)
    if extension in ('.doc', '.docx'):
        return process_docx(file)
    return "", [f"ERROR: Unsupported file type: {extension}"]
def combine_content(raw_text: str, url_input: str, files: List[tempfile._TemporaryFileWrapper]) -> Tuple[str, str, str, str]:
    """Combine content from all sources into a single text file.

    Returns (combined_text, download_path, char_count_text, error_text)
    matching the Gradio output widgets.  Individual source failures are
    collected into error_text instead of aborting the run.
    """
    sections: List[str] = []
    all_errors: List[str] = []

    # Each processor returns (text, errors); accumulate both.
    if raw_text:
        text, errs = process_raw_text(raw_text)
        sections.append(text)
        all_errors.extend(errs)
    if url_input:
        text, errs = process_urls(url_input)
        sections.append(text)
        all_errors.extend(errs)
    for uploaded in files or []:
        text, errs = process_file(uploaded)
        sections.append(text)
        all_errors.extend(errs)

    combined_text = "".join(sections)

    # Character statistics are computed on the real extracted text, before
    # any placeholder message is substituted below.
    char_count_text = format_char_count(count_characters(combined_text))

    if not combined_text.strip():
        if all_errors:
            combined_text = "No content was extracted due to errors. Please check the error messages below."
        else:
            combined_text = "No content was extracted. Please provide some input (text, URLs, or files)."

    error_text = "\n".join(all_errors) if all_errors else "処理は正常に完了しました。"

    # Save to a temporary file for the download widget.  Fix: force UTF-8 —
    # the platform-default encoding can raise UnicodeEncodeError for the
    # Japanese content on non-UTF-8 locales.
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', encoding='utf-8') as tmp_file:
        tmp_file.write(combined_text)
        output_path = tmp_file.name

    return combined_text, output_path, char_count_text, error_text
# Create Gradio interface: two-column layout with inputs on the left and
# results on the right, wired to combine_content by a single button.
with gr.Blocks(title="Document Content Extractor") as demo:
    # Header and usage notes (user-facing text is intentionally Japanese).
    gr.Markdown("# Document Content Extractor")
    gr.Markdown("""テキスト、URL、各種ドキュメントからテキストを抽出・結合するツールです。
対応ファイル形式:
- テキストファイル (.txt)
- PDFファイル (.pdf) - pypdfが必要
- Wordドキュメント (.doc, .docx) - unstructuredが必要
必要なパッケージ:
- URL処理用: langchain-community, requests, beautifulsoup4
- PDF処理用: langchain-community, pypdf
- DOCX処理用: langchain-community, unstructured""")
    with gr.Row():
        # Left column: the three input sources plus the action button.
        with gr.Column():
            raw_text = gr.Textbox(
                label="テキスト入力",
                placeholder="直接テキストを入力できます...",
                lines=5
            )
            url_input = gr.Textbox(
                label="URL入力(1行に1つ)",
                placeholder="URLを入力してください...",
                lines=5
            )
            files = gr.File(
                label="ファイルアップロード",
                file_count="multiple",
                file_types=[".txt", ".pdf", ".doc", ".docx"]
            )
            combine_btn = gr.Button("抽出・結合")
        # Right column: errors/warnings, character counts, extracted text,
        # and a download link for the combined output file.
        with gr.Column():
            error_output = gr.Textbox(
                label="エラー・警告",
                lines=3,
                interactive=False,
                show_copy_button=True
            )
            char_count_output = gr.Textbox(
                label="文字数",
                lines=3,
                interactive=False
            )
            text_output = gr.Textbox(
                label="抽出されたテキスト",
                lines=20,
                interactive=False,
                show_copy_button=True
            )
            file_output = gr.File(label="結合テキストをダウンロード")
    # combine_content returns (text, download_path, char_counts, errors);
    # the outputs list below must stay in that order.
    combine_btn.click(
        fn=combine_content,
        inputs=[raw_text, url_input, files],
        outputs=[text_output, file_output, char_count_output, error_output]
    )

if __name__ == "__main__":
    demo.launch()