Spaces:
Sleeping
Sleeping
| #pip3 install cohere google-genai python-dotenv tqdm psycopg2-binary gradio | |
| import os | |
| import cohere | |
| import base64 | |
| from dotenv import load_dotenv | |
| from pathlib import Path | |
| import fitz # PyMuPDF | |
| from google import genai | |
| from google.genai import types | |
| import numpy as np | |
| import psycopg2 | |
| import gradio as gr | |
| import shutil # To clean up images directory | |
| # 環境変数をロードします | |
| load_dotenv() | |
| COHERE_API_KEY = os.getenv("COHERE_API_KEY") | |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") | |
| # Cohereクライアントを初期化します | |
| # APIキーが設定されていない場合は、エラーを処理できるようにNoneに設定します | |
| try: | |
| co = cohere.ClientV2(api_key=COHERE_API_KEY) | |
| except Exception as e: | |
| print(f"Cohereクライアントの初期化に失敗しました: {e}。COHERE_API_KEYが正しく設定されているか確認してください。") | |
| co = None # エラーを後で処理するためにNoneに設定 | |
| # データベース接続情報 (ユーザーの指示に従いハードコード) | |
| '''DB_CONFIG = { | |
| "dbname": "smair", | |
| "user": "smairuser", | |
| "password": "smairuser", | |
| "host": "www.ryhintl.com", | |
| "port": 10629 | |
| }''' | |
| # 処理済みファイルを保存するディレクトリ | |
| # アプリケーション起動時に存在確認し、クリーンアップします | |
| IMAGES_DIR = Path("./pdfimages") | |
| if IMAGES_DIR.exists(): | |
| print(IMAGES_DIR) | |
| #shutil.rmtree(IMAGES_DIR) # 以前の実行からの画像をクリア | |
| IMAGES_DIR.mkdir(exist_ok=True) | |
| def convert_pdf_to_images(pdf_path, output_dir_str, zoom=2.0): | |
| """ | |
| 指定されたPDFファイルをスライド単位で画像(PNG)に変換する関数です。 | |
| Parameters: | |
| ---------- | |
| pdf_path : str or Path | |
| 入力PDFファイルのパス。 | |
| output_dir_str : str | |
| 変換後のPNG画像を保存するディレクトリのパス(文字列)。 | |
| zoom : float, optional (default=2.0) | |
| 出力画像の拡大率。 | |
| Returns: | |
| ------- | |
| image_paths : list of str | |
| 変換された各スライド画像(PNG)のファイルパス一覧。 | |
| """ | |
| output_dir = Path(output_dir_str) | |
| output_dir.mkdir(parents=True, exist_ok=True) # ディレクトリが存在しない場合は作成 | |
| pdf_name = Path(pdf_path).stem # PDFファイル名から拡張子を除去 | |
| doc = fitz.open(pdf_path) # PyMuPDFでPDFを開く | |
| image_paths = [] | |
| for i, page in enumerate(doc): | |
| mat = fitz.Matrix(zoom, zoom) # 拡大行列を作成 | |
| pix = page.get_pixmap(matrix=mat) # ピクセルマップを取得 | |
| image_path = output_dir / f"{pdf_name}_{i+1}.png" # 出力画像パスを生成 | |
| pix.save(str(image_path)) # 画像を保存 | |
| image_paths.append(str(image_path)) # パスリストに追加 | |
| doc.close() # PDFドキュメントを閉じる | |
| return image_paths | |
| def get_image_embedding(image_path): | |
| """ | |
| 指定した画像ファイル(PNGなど)を Cohere Embed 4 を使ってベクトル化する関数です。 | |
| Parameters: | |
| ---------- | |
| image_path : str or Path | |
| ベクトル化したい画像ファイルのパス。 | |
| Returns: | |
| ------- | |
| response : cohere.EmbedByTypeResponse | |
| Cohere API から返されるレスポンスオブジェクト。 | |
| """ | |
| if co is None: | |
| raise ValueError("Cohereクライアントが初期化されていません。COHERE_API_KEYを確認してください。") | |
| with open(image_path, "rb") as f: | |
| image_data = f.read() # 画像ファイルをバイナリで読み込み | |
| stringified_buffer = base64.b64encode(image_data).decode("utf-8") # base64エンコード | |
| content_type = "image/png" # MIMEタイプ | |
| image_base64 = f"data:{content_type};base64,{stringified_buffer}" # データURI形式 | |
| response = co.embed( | |
| model="embed-v4.0", # 使用するCohereモデル | |
| input_type="image", # 入力タイプは画像 | |
| embedding_types=["float"], # 浮動小数点ベクトルとして埋め込みを要求 | |
| images=[image_base64], # エンコードされた画像リスト | |
| ) | |
| return response | |
| def generate_answer_with_gemini(image_paths, text_prompt): | |
| """ | |
| 複数の画像とテキストプロンプトを使って、Google Gemini API によるマルチモーダル応答を生成する関数です。 | |
| Parameters: | |
| ---------- | |
| image_paths : list of str or Path | |
| 入力画像ファイルのパスリスト。 | |
| text_prompt : str | |
| Geminiモデルに渡す自然言語のプロンプト(質問・指示文)。 | |
| Returns: | |
| ------- | |
| response_text : str | |
| Gemini APIから返された自然言語応答。API呼び出しに失敗した場合はエラーメッセージを返します。 | |
| """ | |
| if not GEMINI_API_KEY: | |
| return "[Gemini APIエラー] GEMINI_API_KEYが環境変数に設定されていません。" | |
| client = genai.Client(api_key=GEMINI_API_KEY) # Geminiクライアントを初期化 | |
| contents = [text_prompt] # プロンプトをコンテンツリストに追加 | |
| # 画像パスを検証し、存在するファイルのみを処理します | |
| valid_image_paths = [] | |
| for path_str in image_paths: | |
| path = Path(path_str) | |
| if path.exists() and path.is_file(): | |
| valid_image_paths.append(path) | |
| else: | |
| print(f"警告: 画像ファイルが見つかりません: {path_str}") | |
| if not valid_image_paths: | |
| return "[Geminiエラー] 有効な画像がGeminiに渡されませんでした。検索結果を確認してください。" | |
| for path in valid_image_paths: | |
| mime_type = "image/png" if path.suffix.lower() == ".png" else "image/jpeg" | |
| try: | |
| with open(path, "rb") as f: | |
| image_bytes = f.read() # 画像をバイトデータとして読み込み | |
| part = types.Part.from_bytes(data=image_bytes, mime_type=mime_type) # Gemini Partオブジェクトに変換 | |
| contents.append(part) # コンテンツリストに画像を追加 | |
| except Exception as e: | |
| print(f"警告: Geminiへの画像読み込み中にエラーが発生しました: {path}, エラー: {e}") | |
| continue # エラーが発生しても続行 | |
| try: | |
| response = client.models.generate_content( | |
| model="gemini-2.5-flash", # 使用するGeminiモデル | |
| contents=contents, # プロンプトと画像を含むコンテンツ | |
| config=types.GenerateContentConfig( | |
| system_instruction="あなたはスライド画像に関する質問に答えるAIアシスタントです。スライド画像の内容を理解し、質問に答えてください。回答にはスライド画像の内容のみを使用してください。", | |
| ), | |
| ) | |
| return response.text # Geminiの回答テキストを返す | |
| except Exception as e: | |
| return f"[Gemini APIエラー] {str(e)}" # エラーメッセージを返す | |
| def build_pgvector_index_gradio(pdf_file_path, progress=gr.Progress()): | |
| """ | |
| PDFを画像に変換し、画像から埋め込みを生成し、pgvectorに保存する関数。 | |
| Gradioからの呼び出し用にラップし、プログレスバーに対応します。 | |
| Parameters: | |
| ---------- | |
| pdf_file_path : str | |
| アップロードされたPDFファイルのパス。 | |
| progress : gr.Progress | |
| Gradioのプログレスバーオブジェクト。 | |
| Returns: | |
| ------- | |
| tuple: (str, list) | |
| 処理ステータスメッセージと、処理された画像ファイルのパスリスト。 | |
| """ | |
| if not COHERE_API_KEY: | |
| yield "エラー: COHERE_API_KEYが設定されていません。", [] | |
| return | |
| if not pdf_file_path: | |
| yield "エラー: PDFファイルが提供されていません。", [] | |
| return | |
| # 新しいPDFアップロードのために、以前の画像とパスをクリアします | |
| if IMAGES_DIR.exists(): | |
| shutil.rmtree(IMAGES_DIR) | |
| IMAGES_DIR.mkdir(exist_ok=True) | |
| try: | |
| # 1. PDFを画像に変換 | |
| processing_message = "PDFから画像を抽出中..." | |
| progress(0.1, desc=processing_message) # 進捗を更新 | |
| yield processing_message, [] # UIを更新 | |
| image_paths = convert_pdf_to_images(pdf_file_path, str(IMAGES_DIR)) | |
| if not image_paths: | |
| yield "警告: PDFから画像が抽出されませんでした。ファイルを確認してください。", [] | |
| return | |
| conn = None | |
| try: | |
| #conn = psycopg2.connect(**DB_CONFIG) # データベースに接続 | |
| conn = psycopg2.connect( | |
| dbname="smair", | |
| user="smairuser", | |
| password="smairuser", | |
| host="www.ryhintl.com", | |
| port=10629 | |
| ) | |
| conn.autocommit = True # 個別のトランザクション処理のために自動コミットを有効化 | |
| cursor = conn.cursor() | |
| # imgpdf_embeddings テーブルが存在するかどうかを確認 | |
| cursor.execute(""" | |
| SELECT EXISTS ( | |
| SELECT FROM | |
| pg_tables | |
| WHERE | |
| schemaname = 'public' AND | |
| tablename = 'imgpdf_embeddings' | |
| ); | |
| """) | |
| table_exists = cursor.fetchone()[0] | |
| if not table_exists: | |
| table_create_message = "テーブル 'imgpdf_embeddings' が存在しません。新しく作成します。" | |
| progress(0.2, desc=table_create_message) | |
| yield table_create_message, [] | |
| cursor.execute(""" | |
| CREATE TABLE imgpdf_embeddings ( | |
| id SERIAL PRIMARY KEY, | |
| image_path VARCHAR(255) UNIQUE NOT NULL, | |
| embedding VECTOR(1536) -- Cohere Embed v4.0 (Image)の次元に合わせる | |
| ); | |
| """) | |
| else: | |
| table_exists_message = "テーブル 'imgpdf_embeddings' は既に存在します。" | |
| progress(0.2, desc=table_exists_message) | |
| yield table_exists_message, [] | |
| processing_message = f"{len(image_paths)} 枚のスライド画像を生成しました。画像を埋め込み、pgvectorデータベースに保存中..." | |
| yield processing_message, image_paths # 処理された画像パスをすぐに表示 | |
| # 埋め込みを繰り返し挿入し、進捗を更新 | |
| for i, path in enumerate(image_paths): | |
| try: | |
| res = get_image_embedding(path) | |
| embedding = res.embeddings.float_[0] | |
| cursor.execute( | |
| "INSERT INTO imgpdf_embeddings (image_path, embedding) VALUES (%s, %s) ON CONFLICT (image_path) DO NOTHING;", | |
| (str(path), str(embedding)) # リストをそのままstr()で文字列化 | |
| ) | |
| # プログレスバーを更新 | |
| progress((i + 1) / len(image_paths), desc=f"{Path(path).name} の埋め込みを保存しました。") | |
| except cohere.errors.TooManyRequestsError as e: | |
| final_message = f"エラー: Cohere APIのレート制限に達しました。{e.message}。残りの処理を中断します。" | |
| yield final_message, image_paths | |
| return # レート制限に達した場合は処理を停止 | |
| except Exception as e: | |
| # 個別の挿入が失敗しても続行 | |
| warning_message = f"警告: {Path(path).name} の処理中にエラーが発生しました。スキップします。エラー: {e}" | |
| print(warning_message) # デバッグのためにコンソールに出力 | |
| yield warning_message, image_paths # UIを警告で更新 | |
| continue | |
| final_message = "pgvectorへの保存が完了しました。" | |
| yield final_message, image_paths | |
| except psycopg2.Error as e: | |
| yield f"データベース接続または操作エラー: {e}", [] | |
| except Exception as e: | |
| yield f"予期せぬエラーが発生しました: {e}", [] | |
| finally: | |
| if conn: | |
| conn.close() # データベース接続を閉じる | |
| except Exception as e: | |
| yield f"PDF処理中にエラーが発生しました: {e}", [] | |
| def search_pgvector_with_text_gradio(text_query_str): | |
| """ | |
| テキストクエリに基づいて、pgvectorで類似画像を検索し、Geminiで回答を生成する関数。 | |
| Gradioからの呼び出し用にラップします。 | |
| Parameters: | |
| ---------- | |
| text_query_str : str | |
| 検索するテキストクエリ。 | |
| Returns: | |
| ------- | |
| tuple: (str, list, str) | |
| 検索ステータスメッセージ、検索された画像ファイルのパスリスト、Geminiによる回答。 | |
| """ | |
| if not text_query_str: | |
| return "検索クエリを入力してください。", [], "" | |
| if co is None: | |
| return "エラー: Cohereクライアントが初期化されていません。COHERE_API_KEYを確認してください。", [], "" | |
| conn = None | |
| try: | |
| #conn = psycopg2.connect(**DB_CONFIG) # データベースに接続 | |
| conn = psycopg2.connect( | |
| dbname="smair", | |
| user="smairuser", | |
| password="smairuser", | |
| host="www.ryhintl.com", | |
| port=10629 | |
| ) | |
| cursor = conn.cursor() | |
| response = co.embed( | |
| model="embed-v4.0", # テキスト埋め込みに使用するCohereモデル | |
| input_type="text", # 入力タイプはテキスト | |
| embedding_types=["float"], | |
| texts=[text_query_str], | |
| ) | |
| query_vec = response.embeddings.float_[0] # ここで query_vec は既にリストです | |
| # pgvectorのL2距離演算子 `<->` を使用して類似検索 | |
| # LIMITはk=3に固定 | |
| cursor.execute( | |
| "SELECT image_path FROM imgpdf_embeddings ORDER BY embedding <-> %s LIMIT %s", | |
| (str(query_vec), 3) # query_vec は既にリストなので、そのままstr()で文字列化 | |
| ) | |
| results = cursor.fetchall() # 結果を取得 | |
| cursor.close() | |
| conn.close() # データベース接続を閉じる | |
| retrieved_image_paths = [row[0] for row in results] # 検索された画像パスを抽出 | |
| if not retrieved_image_paths: | |
| return "テキストクエリに一致する画像が見つかりませんでした。", [], "一致する画像が見つかりませんでした。" | |
| # 検索された画像パスを表示し、Geminiで回答を生成 | |
| gemini_answer = generate_answer_with_gemini(retrieved_image_paths, text_query_str) | |
| return "類似画像の検索とGeminiによる回答が完了しました。", retrieved_image_paths, gemini_answer | |
| except psycopg2.Error as e: | |
| return f"データベース検索エラー: {e}", [], "" | |
| except cohere.errors.TooManyRequestsError as e: | |
| return f"エラー: Cohere APIのレート制限に達しました。{e.message}", [], "" | |
| except Exception as e: | |
| return f"予期せぬエラーが発生しました: {e}", [], "" | |
| finally: | |
| if conn: | |
| conn.close() | |
| # Gradio UI Blocks | |
| with gr.Blocks(title="PDF to RAG with Cohere, PGVector, and Gemini", css="footer {visibility: hidden;}") as imgpdf: | |
| gr.Markdown( | |
| """ | |
| <div style="font-size: 12px;"> | |
| <h1>PDFからのRAG(Retrieval-Augmented Generation)アプリ</h1> | |
| <p>PDFファイルをアップロードし、その内容をCohere EmbeddingsとPGVectorを使用して検索可能な形式でデータベースに保存します。その後、テキストクエリに基づいて関連するスライド画像を検索し、Google Geminiを使って回答を生成します。</p> | |
| <hr> | |
| </div> | |
| """ | |
| ) | |
| with gr.Tab("PDF処理"): | |
| pdf_upload_input = gr.File(label="PDFファイルをアップロード", type="filepath", file_types=[".pdf"]) | |
| process_button = gr.Button("PDFを処理してデータベースに保存") | |
| processing_status = gr.Markdown(label="処理ステータス", value="PDFをアップロードして「処理」ボタンを押してください。") | |
| # 処理された画像を表示するためのギャラリー | |
| processed_image_gallery = gr.Gallery(label="処理された画像 (データベースに保存済)", preview=True, height="auto", object_fit="contain") | |
| with gr.Tab("RAGによる質問"): | |
| query_text_input = gr.Textbox(label="質問を入力してください", placeholder="PDFの内容について質問を入力...", info="Mainstream領域でつなげている領域は何ですか?") | |
| search_button = gr.Button("質問を検索してGeminiで回答") | |
| search_status = gr.Markdown(label="検索ステータス") | |
| retrieved_images_gallery = gr.Gallery(label="検索された関連画像", preview=True, height="auto", object_fit="contain") | |
| gemini_answer_output = gr.Markdown(label="Geminiによる回答") | |
| # イベントハンドラー | |
| # `process_button`がクリックされたときに`build_pgvector_index_gradio`を実行 | |
| process_button.click( | |
| fn=build_pgvector_index_gradio, | |
| inputs=[pdf_upload_input], | |
| outputs=[processing_status, processed_image_gallery] | |
| ) | |
| # `search_button`がクリックされたときに`search_pgvector_with_text_gradio`を実行 | |
| search_button.click( | |
| fn=search_pgvector_with_text_gradio, | |
| inputs=[query_text_input], | |
| outputs=[search_status, retrieved_images_gallery, gemini_answer_output] | |
| ) | |
| # Gradioアプリを起動 | |
| if __name__ == "__main__": | |
| imgpdf.launch(share=False) # share=Trueで外部共有可能に | |