structura_one / app.py
fudii0921's picture
Update app.py
8f05b7c verified
import os
import pandas as pd
import cohere
import gradio as gr
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor
# Docling: PDF構造解析
from docling.document_converter import DocumentConverter
# LangChain: チャンキング
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 環境変数の読み込み
load_dotenv()
COHERE_API_KEY = os.environ.get("COHERE_API_KEY")
co = cohere.ClientV2(api_key=COHERE_API_KEY)
# Docling Converterの初期化
converter = DocumentConverter()
def cleanse_text_with_cohere(raw_text):
"""Cohereを使用してテキストをクレンジング(並列実行用)"""
system_message = """
あなたは高度なドキュメントエディターです。
提供されたMarkdownテキストを、以下のルールに従ってクレンジングしてください:
1. ページ番号、不自然なリピートヘッダー、システムログを削除。
2. 文の途中で切れている不自然な改行を結合し、自然な文章にする。
3. 表(|---|---|)の形式が崩れている場合は、正しいMarkdownテーブル形式に修正。
4. 階層構造が不明瞭な場合は、適切な見出し(#、##、###)を付与。
5. 出力は純粋なMarkdownテキストのみとし、説明は不要です。
"""
try:
response = co.chat(
model="command-r-plus-08-2024",
messages=[
{"role": "system", "content": system_message},
{"role": "user", "content": f"以下のテキストをクレンジングしてください:\n\n{raw_text}"}
]
)
return response.message.content[0].text
except Exception as e:
return f"Cleansing Error: {e}\nRaw Text: {raw_text}"
def process_pdf_for_rag(files, apply_cleansing):
"""
PDFを解析し、並列クレンジングを実行して結果をストリーミングで返す
"""
if not files:
yield "ファイルがアップロードされていません。", None, "エラー: ファイルがありません"
return
all_markdown_content = ""
excel_tables = []
accumulated_text = ""
# 1. Doclingによる解析工程
for idx, file_info in enumerate(files):
pdf_path = file_info.name
filename = os.path.basename(pdf_path)
status = f"【工程 1/4】解析中 ({idx+1}/{len(files)}): {filename}..."
yield accumulated_text, None, status
result = converter.convert(pdf_path)
markdown_text = result.document.export_to_markdown()
all_markdown_content += f"\n\n{markdown_text}"
for i, table in enumerate(result.document.tables):
try:
df = table.export_to_dataframe()
if not df.empty:
excel_tables.append({
"sheet_name": f"Tab_{i}_{filename[:15]}",
"df": df,
"filename": filename
})
except Exception as e:
print(f"Table export error: {e}")
# 2. チャンキング工程 (チャンクサイズ 2000)
yield accumulated_text, None, "【工程 2/4】セマンティック・チャンキング(2000文字)実行中..."
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=2000,
chunk_overlap=200,
separators=["\n# ", "\n## ", "\n### ", "\n\n", "\n", " "]
)
raw_chunks = text_splitter.split_text(all_markdown_content)
total_chunks = len(raw_chunks)
# 3. 並列クレンジング工程
if apply_cleansing:
status = f"【工程 3/4】並列AIクレンジング実行中 (全 {total_chunks} チャンク)..."
yield accumulated_text, None, status
# 最大5スレッドで並列処理(APIのレート制限に応じて調整可)
with ThreadPoolExecutor(max_workers=5) as executor:
cleansed_results = list(executor.map(cleanse_text_with_cohere, raw_chunks))
for idx, processed_chunk in enumerate(cleansed_results):
accumulated_text += f"### CLEANSED CHUNK {idx+1} ###\n{processed_chunk}\n\n"
yield accumulated_text, None, f"クレンジング結果を表示中 ({idx+1}/{total_chunks})..."
else:
for idx, chunk in enumerate(raw_chunks):
accumulated_text += f"### RAW CHUNK {idx+1} ###\n{chunk}\n\n"
yield accumulated_text, None, f"RAWチャンクを表示中 ({idx+1}/{total_chunks})..."
# 4. Excel内容をテキストに結合 & ファイル生成
yield accumulated_text, None, "【工程 4/4】表データをテキストへ結合中..."
if excel_tables:
accumulated_text += "\n\n" + "="*50 + "\n"
accumulated_text += "📊 抽出された表データプレビュー (Excel出力内容)\n"
accumulated_text += "="*50 + "\n\n"
excel_path = "extracted_financial_data.xlsx"
with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
for item in excel_tables:
df = item["df"]
s_name = item["sheet_name"][:30].replace("[", "").replace("]", "")
df.to_excel(writer, sheet_name=s_name, index=False)
accumulated_text += f"📄 表: {item['sheet_name']} (from {item['filename']})\n"
accumulated_text += df.to_markdown(index=False) + "\n\n"
yield accumulated_text, None, status
else:
excel_path = None
yield accumulated_text, excel_path, f"処理完了: {total_chunks}チャンクを並列処理しました。"
# --- Gradio UI ---
with gr.Blocks(title="STRUCTURA ONE", theme=gr.themes.Ocean()) as demo:
gr.HTML("""
<div style="display: flex; flex-direction: column; justify-content: center; align-items: center; text-align: center; padding: 20px; background-color: lightsteelblue; border-radius: 10px;">
<img src="https://www.ryhintl.com/images/structura_logo.png" alt="Structura ONE" width="100" height="100" />
<h1 style="color: #1a365d; margin-top: 10px; margin-bottom: 0;">🚀 Unleash your Documents</h1>
<p style="color: #4a5568; margin-top: 5px;">Parallelized RAG Parsing Engine | Docling × Cohere</p>
</div>
""")
with gr.Row():
with gr.Column(scale=1):
file_input = gr.File(label="PDFをアップロード", file_count="multiple", file_types=[".pdf"])
cleansing_switch = gr.Checkbox(label="高速並列AIクレンジングを適用", value=True)
run_btn = gr.Button("解析開始", variant="primary")
status_out = gr.Textbox(label="現在のステータス", interactive=False)
excel_out = gr.File(label="Excelダウンロード")
with gr.Column(scale=2):
text_out = gr.Textbox(
label="構造化Markdown & テーブルプレビュー",
lines=30,
max_lines=50
)
# 処理中のボタン無効化はGradioのデフォルト動作で制御
run_btn.click(
fn=process_pdf_for_rag,
inputs=[file_input, cleansing_switch],
outputs=[text_out, excel_out, status_out],
queue=True
)
if __name__ == "__main__":
demo.queue().launch(css="footer {visibility: hidden;}")