import os import pandas as pd import cohere import gradio as gr from dotenv import load_dotenv from concurrent.futures import ThreadPoolExecutor # Docling: PDF構造解析 from docling.document_converter import DocumentConverter # LangChain: チャンキング from langchain_text_splitters import RecursiveCharacterTextSplitter # 環境変数の読み込み load_dotenv() COHERE_API_KEY = os.environ.get("COHERE_API_KEY") co = cohere.ClientV2(api_key=COHERE_API_KEY) # Docling Converterの初期化 converter = DocumentConverter() def cleanse_text_with_cohere(raw_text): """Cohereを使用してテキストをクレンジング(並列実行用)""" system_message = """ あなたは高度なドキュメントエディターです。 提供されたMarkdownテキストを、以下のルールに従ってクレンジングしてください: 1. ページ番号、不自然なリピートヘッダー、システムログを削除。 2. 文の途中で切れている不自然な改行を結合し、自然な文章にする。 3. 表(|---|---|)の形式が崩れている場合は、正しいMarkdownテーブル形式に修正。 4. 階層構造が不明瞭な場合は、適切な見出し(#、##、###)を付与。 5. 出力は純粋なMarkdownテキストのみとし、説明は不要です。 """ try: response = co.chat( model="command-r-plus-08-2024", messages=[ {"role": "system", "content": system_message}, {"role": "user", "content": f"以下のテキストをクレンジングしてください:\n\n{raw_text}"} ] ) return response.message.content[0].text except Exception as e: return f"Cleansing Error: {e}\nRaw Text: {raw_text}" def process_pdf_for_rag(files, apply_cleansing): """ PDFを解析し、並列クレンジングを実行して結果をストリーミングで返す """ if not files: yield "ファイルがアップロードされていません。", None, "エラー: ファイルがありません" return all_markdown_content = "" excel_tables = [] accumulated_text = "" # 1. Doclingによる解析工程 for idx, file_info in enumerate(files): pdf_path = file_info.name filename = os.path.basename(pdf_path) status = f"【工程 1/4】解析中 ({idx+1}/{len(files)}): {filename}..." yield accumulated_text, None, status result = converter.convert(pdf_path) markdown_text = result.document.export_to_markdown() all_markdown_content += f"\n\n{markdown_text}" for i, table in enumerate(result.document.tables): try: df = table.export_to_dataframe() if not df.empty: excel_tables.append({ "sheet_name": f"Tab_{i}_{filename[:15]}", "df": df, "filename": filename }) except Exception as e: print(f"Table export error: {e}") # 2. チャンキング工程 (チャンクサイズ 2000) yield accumulated_text, None, "【工程 2/4】セマンティック・チャンキング(2000文字)実行中..." text_splitter = RecursiveCharacterTextSplitter( chunk_size=2000, chunk_overlap=200, separators=["\n# ", "\n## ", "\n### ", "\n\n", "\n", " "] ) raw_chunks = text_splitter.split_text(all_markdown_content) total_chunks = len(raw_chunks) # 3. 並列クレンジング工程 if apply_cleansing: status = f"【工程 3/4】並列AIクレンジング実行中 (全 {total_chunks} チャンク)..." yield accumulated_text, None, status # 最大5スレッドで並列処理(APIのレート制限に応じて調整可) with ThreadPoolExecutor(max_workers=5) as executor: cleansed_results = list(executor.map(cleanse_text_with_cohere, raw_chunks)) for idx, processed_chunk in enumerate(cleansed_results): accumulated_text += f"### CLEANSED CHUNK {idx+1} ###\n{processed_chunk}\n\n" yield accumulated_text, None, f"クレンジング結果を表示中 ({idx+1}/{total_chunks})..." else: for idx, chunk in enumerate(raw_chunks): accumulated_text += f"### RAW CHUNK {idx+1} ###\n{chunk}\n\n" yield accumulated_text, None, f"RAWチャンクを表示中 ({idx+1}/{total_chunks})..." # 4. Excel内容をテキストに結合 & ファイル生成 yield accumulated_text, None, "【工程 4/4】表データをテキストへ結合中..." if excel_tables: accumulated_text += "\n\n" + "="*50 + "\n" accumulated_text += "📊 抽出された表データプレビュー (Excel出力内容)\n" accumulated_text += "="*50 + "\n\n" excel_path = "extracted_financial_data.xlsx" with pd.ExcelWriter(excel_path, engine='openpyxl') as writer: for item in excel_tables: df = item["df"] s_name = item["sheet_name"][:30].replace("[", "").replace("]", "") df.to_excel(writer, sheet_name=s_name, index=False) accumulated_text += f"📄 表: {item['sheet_name']} (from {item['filename']})\n" accumulated_text += df.to_markdown(index=False) + "\n\n" yield accumulated_text, None, status else: excel_path = None yield accumulated_text, excel_path, f"処理完了: {total_chunks}チャンクを並列処理しました。" # --- Gradio UI --- with gr.Blocks(title="STRUCTURA ONE", theme=gr.themes.Ocean()) as demo: gr.HTML("""
Parallelized RAG Parsing Engine | Docling × Cohere