File size: 6,023 Bytes
a204c7a
854e13d
a204c7a
 
854e13d
a204c7a
 
854e13d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf4ecf8
854e13d
a204c7a
854e13d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a204c7a
f05a162
 
 
 
 
854e13d
 
 
a204c7a
 
854e13d
 
 
6d07546
a204c7a
854e13d
 
 
 
 
 
 
 
 
 
 
 
a204c7a
 
 
854e13d
a204c7a
 
 
 
 
854e13d
 
 
 
 
 
a204c7a
 
 
 
854e13d
a204c7a
854e13d
 
a204c7a
 
 
 
 
 
854e13d
 
 
a204c7a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import os
from typing import Any, Dict, List, Tuple

import gradio as gr
from openai import OpenAI


def get_openai_client() -> OpenAI:
    """OpenAI API クライアントを環境変数から初期化して返す。"""
    api_key = os.getenv("OPENAI_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY が設定されていません。環境変数を確認してください。")

    base_url = os.getenv("OPENAI_API_BASE", "").strip() or None
    return OpenAI(api_key=api_key, base_url=base_url)


def get_vector_store_id() -> str:
    """環境変数 VECTOR_STORE_ID から単一のベクターストアIDを取得する。"""
    value = os.getenv("VECTOR_STORE_ID", "").strip()
    if not value:
        raise RuntimeError("VECTOR_STORE_ID が設定されていません。環境変数を確認してください。")
    return value.split(",")[0].strip()


def extract_citations(response_dict: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Responses API レスポンスから file_search の引用情報を抽出する。"""
    citations: List[Dict[str, Any]] = []
    outputs = response_dict.get("output") or []

    for output in outputs:
        for content in output.get("content", []) or []:
            annotations = content.get("annotations") or []
            for annotation in annotations:
                file_citation = annotation.get("file_citation")
                if not file_citation:
                    continue
                entry = {
                    "file_id": file_citation.get("file_id"),
                    "file_name": file_citation.get("file_name"),
                    "vector_store_id": file_citation.get("vector_store_id"),
                    "quote": file_citation.get("quote"),
                }
                if entry not in citations:
                    citations.append(entry)
    return citations


def answer_with_file_search(
    vector_store_id: str,
    query: str,
    top_k: int = 5,
    model: str = "gpt-4o-mini",
) -> Tuple[str, List[Dict[str, Any]]]:
    """
    Responses API + file_search ツールを用いて検索と回答を同時に行い、回答と引用を返す。

    Cookbook の "Integrating search results with LLM in a single API call" を参考に構成。
    """
    if not query.strip():
        return "", []

    client = get_openai_client()
    system_prompt = (
        "あなたは取得したドキュメントから回答するアシスタントです。"
    )

    user_message = f"{query}\n\n検索結果は最大 {top_k} 件まで引用してください。"

    response = client.responses.create(
        model=model,
        input=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ],
        tools=[
            {
                "type": "file_search",
                "vector_store_ids": [vector_store_id],
            }
        ],
    )

    try:
        response_dict = response.model_dump()
    except AttributeError:
        # 旧バージョンSDK互換: model_dump が無ければ辞書化 fallback
        response_dict = getattr(response, "to_dict", lambda: {})()
    answer_text = response_dict.get("output_text") or ""
    citations = extract_citations(response_dict)
    return answer_text.strip(), citations


def format_answer(answer: str, citations: List[Dict[str, Any]]) -> str:
    """回答文と引用リストを UI 向けのテキストに整形する。"""
    if not answer:
        return "該当データがありません!"

    lines = [answer]
    if citations:
        lines.append("")
        lines.append("引用:")
        for citation in citations:
            file_name = citation.get("file_name") or citation.get("file_id") or "unknown"
            quote = citation.get("quote")
            if quote:
                lines.append(f"- {quote} [{file_name}]")
            else:
                lines.append(f"- [{file_name}]")
    return "\n".join(lines)


def produce_answer(question: str, top_k: int) -> str:
    """Gradio UI からの質問を処理し、Responses API の結果を返す。"""
    question = (question or "").strip()
    if not question:
        return "質問が入力されていません。"

    try:
        vector_store_id = get_vector_store_id()
        answer, citations = answer_with_file_search(vector_store_id, question, top_k=top_k)
        return format_answer(answer, citations)
    except RuntimeError as runtime_error:
        return str(runtime_error)
    except Exception:
        return "システムエラーが発生しました。時間をおいて再度お試しください。"


def respond(question: str, top_k: int = 5) -> str:
    """Gradio の UI から呼び出されるハンドラ。"""
    try:
        return produce_answer(question, int(top_k))
    except Exception:
        return "システムエラーが発生しました。時間をおいて再度お試しください。"


with gr.Blocks() as demo:
    gr.Markdown(
        """
        ## MCP Storage レスポンダ
        OpenAI Responses API の file_search ツールを利用し、指定した Vector Store から根拠付きで回答します。
        OPENAI_API_KEY と VECTOR_STORE_ID を環境変数に設定してからご利用ください。
        """
    )

    with gr.Row():
        question_box = gr.Textbox(
            label="質問",
            placeholder="ここに質問を入力してください(例: 製品Aの対応OSは?)",
            lines=4,
        )
    with gr.Row():
        top_k_slider = gr.Slider(
            minimum=1,
            maximum=10,
            value=5,
            step=1,
            label="検索件数 (top_k)",
        )
    with gr.Row():
        output_box = gr.Textbox(label="回答", lines=10)
    with gr.Row():
        submit_button = gr.Button("検索")

    submit_button.click(respond, inputs=[question_box, top_k_slider], outputs=output_box)


if __name__ == "__main__":
    demo.launch()