File size: 10,290 Bytes
f00fd2c
dcf4ed6
 
 
f00fd2c
 
 
 
 
 
 
 
dcf4ed6
f00fd2c
 
 
1d22c32
 
 
f00fd2c
1d22c32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5d93a81
1d22c32
 
 
 
 
 
f00fd2c
1d22c32
 
 
 
 
 
 
 
3fefcc3
c7188b5
f00fd2c
c7188b5
f00fd2c
 
 
 
 
 
 
 
 
 
 
 
 
c7188b5
 
f00fd2c
c7188b5
dcf4ed6
f00fd2c
 
 
 
 
c7188b5
d60e9d2
f00fd2c
 
 
ce3c490
f00fd2c
 
 
c7188b5
 
f00fd2c
60a7f01
f00fd2c
 
60a7f01
f00fd2c
 
 
60a7f01
f00fd2c
 
60a7f01
 
 
f00fd2c
 
 
 
8f6e2f3
 
 
 
f00fd2c
 
60a7f01
f00fd2c
cc8a692
8f6e2f3
cc8a692
f00fd2c
 
 
 
8f6e2f3
 
 
cc8a692
8f6e2f3
cc8a692
8f6e2f3
 
 
 
 
 
 
f00fd2c
 
 
 
8f6e2f3
 
 
 
f00fd2c
 
 
 
 
 
 
 
 
8f6e2f3
f00fd2c
295f49c
 
cc8a692
873bb2e
295f49c
 
 
 
873bb2e
 
 
 
 
 
 
295f49c
 
 
 
 
cc8a692
295f49c
f00fd2c
295f49c
cc8a692
 
f00fd2c
cc8a692
f00fd2c
951cab2
17d0c32
703f693
 
 
 
 
f00fd2c
 
4db1006
 
 
f00fd2c
f4d296a
f00fd2c
 
 
f4d296a
4db1006
 
61404fd
 
f00fd2c
 
 
 
 
61404fd
 
 
 
 
 
f00fd2c
 
 
 
61404fd
f00fd2c
 
61404fd
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import os, json, pickle, datetime, requests, re, gradio as gr
from typing import Optional, List, Dict, Any
from requests.adapters import HTTPAdapter, Retry
from langchain.llms.base import LLM
from langchain.agents import initialize_agent, AgentType, load_tools
from langchain.tools import Tool
from langchain.memory import ConversationBufferMemory
from langchain_experimental.tools.python.tool import PythonREPLTool
from langchain_community.retrievers import WikipediaRetriever
from langchain.tools.retriever import create_retriever_tool
from langchain_community.tools.shell.tool import ShellTool
from langchain.tools import YouTubeSearchTool

# ──────────────────────────────
# ✅ GitHubModelLLM (kept as-is)
# ──────────────────────────────
class GitHubModelLLM(LLM):
    """LangChain ``LLM`` that forwards prompts to a chat-completions endpoint.

    Each call sends an optional system message plus the prompt as a single
    user message to ``{endpoint}/chat/completions`` and returns the text of
    the first choice.

    The bearer token is taken from the ``token`` field, then the
    ``GITHUB_TOKEN`` environment variable, then the ``token`` environment
    variable. It is never embedded in the source.
    """

    model: str = "openai/gpt-4.1"
    endpoint: str = "https://models.github.ai/inference"
    # Evaluated once at import time; _post_chat re-checks the environment.
    token: Optional[str] = os.environ.get("token")
    system_prompt: Optional[str] = "λ„ˆλŠ” PIXAL(Primary Interactive X-ternal Assistant with multi Language)이야.λ„ˆμ˜ κ°œλ°œμžλŠ” μ •μ„±μœ€ μ΄λΌλŠ” 6ν•™λ…„ 파이썬 ν”„λ‘œκ·Έλž˜λ¨Έμ•Ό."
    request_timeout: float = 30.0
    max_retries: int = 2
    backoff_factor: float = 0.3

    @property
    def _llm_type(self) -> str:
        """Return the identifier LangChain uses for this LLM implementation."""
        return "github_models_api"

    def _post_chat(self, body: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``body`` to the chat-completions endpoint and return the JSON reply.

        Args:
            body: JSON-serialisable request payload.

        Returns:
            The decoded JSON response.

        Raises:
            ValueError: If no token is configured on the instance or in the
                environment.
            requests.HTTPError: If the server answers with an error status.
        """
        token = self.token or os.getenv("GITHUB_TOKEN") or os.getenv("token")
        if not token:
            raise ValueError("❌ GitHub token이 μ„€μ •λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€.")
        # NOTE(review): urllib3's Retry only applies status_forcelist to
        # idempotent methods by default, so these status retries may not
        # cover POST - confirm against the installed urllib3 version.
        retries = Retry(total=self.max_retries, backoff_factor=self.backoff_factor,
                        status_forcelist=[429, 500, 502, 503, 504])
        # The context manager closes the session's pooled connections.
        with requests.Session() as session:
            session.mount("https://", HTTPAdapter(max_retries=retries))
            session.headers.update({
                "Content-Type": "application/json",
                # Use the resolved token rather than a credential in source.
                "Authorization": f"Bearer {token}",
            })
            resp = session.post(f"{self.endpoint}/chat/completions", json=body,
                                timeout=self.request_timeout)
            resp.raise_for_status()
            return resp.json()

    def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs) -> str:
        """Send ``prompt`` as one user message and return the reply text.

        Args:
            prompt: The text sent as the user message.
            stop: Optional stop sequences forwarded in the request body.
            **kwargs: Accepted for LangChain compatibility; unused here.

        Returns:
            The first choice's message content. When that is empty, the
            JSON-encoded ``function_call`` of the message (``"{}"`` if absent).
        """
        messages = []
        if self.system_prompt:
            messages.append({"role": "system", "content": self.system_prompt})
        messages.append({"role": "user", "content": prompt})
        body = {"model": self.model, "messages": messages}
        if stop:
            body["stop"] = stop
        res = self._post_chat(body)
        # A missing, null or empty "choices" list all degrade to an empty
        # message instead of raising IndexError.
        choices = res.get("choices") or [{}]
        msg = choices[0].get("message") or {}
        return msg.get("content") or json.dumps(msg.get("function_call", {}))

# ──────────────────────────────
# ✅ HuggingFace API (profile)
# ──────────────────────────────
def get_hf_userinfo(hf_token: str) -> dict:
    """Look up the Hugging Face profile that belongs to ``hf_token``.

    Args:
        hf_token: Bearer token sent to the ``whoami-v2`` endpoint. An empty
            or ``None`` token returns the guest profile without a request.

    Returns:
        A new dict with ``"name"`` and ``"avatar"`` keys. The guest profile
        is returned when the token is missing, the request fails, the status
        is not 200, or the response is not a JSON object.
    """
    guest = {
        "name": "guest",
        "avatar": "https://huggingface.co/front/assets/huggingface_logo-noborder.svg",
    }
    # No token means no identity: skip the network round-trip entirely.
    if not hf_token:
        return guest
    try:
        r = requests.get("https://huggingface.co/api/whoami-v2",
                         headers={"Authorization": f"Bearer {hf_token}"}, timeout=5)
        if r.status_code == 200:
            j = r.json()
            if isinstance(j, dict):
                # NOTE(review): the avatar may be published under a different
                # key than "avatar" - confirm against the whoami-v2 schema.
                return {
                    # "or" also covers null/empty values, so callers can
                    # always call .lower() on the name.
                    "name": j.get("name") or guest["name"],
                    "avatar": j.get("avatar") or guest["avatar"],
                }
    except (requests.RequestException, ValueError):
        # Best-effort lookup: network errors and undecodable JSON both fall
        # back to the guest profile.
        pass
    return guest

# ──────────────────────────────
# ✅ Agent configuration
# ──────────────────────────────
# Single module-wide LLM instance, shared by the agent and by title summaries.
llm = GitHubModelLLM()
# Built-in LangChain tools: DuckDuckGo search, HTTP requests and an LLM-backed calculator.
tools = load_tools(["ddg-search", "requests_all", "llm-math"], llm=llm, allow_dangerous_tools=True)
# NOTE(review): ShellTool and PythonREPLTool let the agent run shell commands
# and Python code on the host, and the app is launched on 0.0.0.0 - confirm
# that exposing this to every chat user is intended.
tools += [YouTubeSearchTool(), ShellTool(), PythonREPLTool()]
# Korean-language Wikipedia retriever, exposed to the agent as "wiki_search".
retriever = WikipediaRetriever(lang="ko")
retriever_tool = create_retriever_tool(retriever, name="wiki_search", description="μœ„ν‚€λ°±κ³Ό 검색 도ꡬ")
tools.append(retriever_tool)

def time_now(_=""):
    """Return the current time at a fixed UTC+9 offset, labelled Asia/Seoul.

    The single argument is ignored; it exists so the function can be called
    with one input string.
    """
    kst = datetime.timezone(datetime.timedelta(hours=9))
    stamp = datetime.datetime.now(kst).strftime('%Y-%m-%d %H:%M:%S')
    return f"ν˜„μž¬ μ‹œκ°: {stamp} (Asia/Seoul)"
# Expose time_now to the agent as a named tool.
tools.append(Tool(name="time_now", func=time_now, description="ν˜„μž¬ μ‹œκ°„μ„ λ°˜ν™˜ν•©λ‹ˆλ‹€."))

# Conversation memory handed to the agent; turns are stored under "chat_history".
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
# initialize_agent selects the agent type through its `agent` parameter.
# An `agent_type=` keyword is not a parameter of that function, so the
# structured-chat type requested here was never applied.
agent = initialize_agent(tools, llm, agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
                         memory=memory, verbose=True)

# ──────────────────────────────
# ✅ Conversation save / load
# ──────────────────────────────
# Work from this file's directory so the relative "<username>.pkl" paths used
# by the save/load helpers resolve next to the script.
os.chdir(os.path.dirname(os.path.abspath(__file__)))

def summarize_title(history):
    """Ask the LLM for a one-line title covering the last three turns.

    Args:
        history: Sequence of ``(user_message, ai_reply)`` pairs.

    Returns:
        A fixed placeholder when ``history`` is empty; otherwise the LLM
        answer stripped, flattened to one line and cut to 50 characters.
        A fixed fallback string is returned if the LLM call or the
        post-processing raises.
    """
    if not history:
        return "μƒˆ λŒ€ν™”"

    turns = []
    for user_msg, ai_msg in history[-3:]:
        turns.append(f"User:{user_msg} AI:{ai_msg}")
    text = "\n".join(turns)

    try:
        answer = llm._call(f"이 λŒ€ν™”μ˜ 주제λ₯Ό ν•œ μ€„λ‘œ μš”μ•½ν•΄μ€˜:\n{text}")
        single_line = answer.strip().replace("\n", " ")
        return single_line[:50]
    except Exception:
        return "μš”μ•½ μ‹€νŒ¨"

def save_conversation(history, hf_token):
    """Store ``history`` in the per-user pickle file under a summarised title.

    Nothing is written when the token resolves to the guest profile.

    Args:
        history: Sequence of ``(user_message, ai_reply)`` pairs to store.
        hf_token: Hugging Face token used to resolve the username.

    Raises:
        Exception: Whatever reading or writing the pickle file raises. The
            previously saved file is left intact in that case.
    """
    info = get_hf_userinfo(hf_token)
    username = info["name"]
    if username.lower() == "guest":
        return
    fname = f"{username}.pkl"
    data = {}
    if os.path.exists(fname):
        # NOTE(security): pickle.load can execute code embedded in the file,
        # so this is only safe while the .pkl files are written solely by
        # this application.
        with open(fname, "rb") as f:
            data = pickle.load(f)
    title = summarize_title(history)
    data[title] = {"title": title, "updated": datetime.datetime.now().isoformat(), "history": history}
    # Write to a sibling temp file first. Opening fname with "wb" truncates
    # it, so a failed dump would wipe every saved conversation for this user.
    tmp_name = f"{fname}.tmp"
    try:
        with open(tmp_name, "wb") as f:
            pickle.dump(data, f)
    except Exception:
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
    # os.replace swaps the complete file in over the old one.
    os.replace(tmp_name, fname)

def load_conversation(hf_token, conv_title=None):
    """Return a saved conversation history for the user behind ``hf_token``.

    Args:
        hf_token: Hugging Face token used to resolve the username.
        conv_title: Title of the conversation to load. When it is falsy or
            not stored, the entry with the greatest ``"updated"`` value is
            used instead.

    Returns:
        The stored history, or ``[]`` for guests, for users without a save
        file, and for an empty save file.
    """
    username = get_hf_userinfo(hf_token)["name"]
    if username.lower() == "guest":
        return []

    fname = f"{username}.pkl"
    if not os.path.exists(fname):
        return []

    with open(fname, "rb") as f:
        data = pickle.load(f)

    if conv_title and conv_title in data:
        return data[conv_title]["history"]
    if not data:
        return []
    newest = max(data.values(), key=lambda entry: entry["updated"])
    return newest["history"]

def refresh_conversation_list(hf_token):
    """Build a Gradio update listing the user's saved conversation titles.

    Args:
        hf_token: Hugging Face token used to resolve the username.

    Returns:
        ``gr.update`` with the titles in descending sort order and the first
        one selected. The choices are empty and the value is ``None`` for
        guests, for users without a save file, and for an empty save file.
    """
    username = get_hf_userinfo(hf_token)["name"]
    fname = f"{username}.pkl"
    if username.lower() == "guest" or not os.path.exists(fname):
        return gr.update(choices=[], value=None)

    with open(fname, "rb") as f:
        data = pickle.load(f)

    titles = list(data.keys())
    titles.sort(reverse=True)
    selected = titles[0] if titles else None
    return gr.update(choices=titles, value=selected)

# ──────────────────────────────
# ✅ Chat function
# ──────────────────────────────
def _extract_answer(raw_response):
    """Return the user-facing text contained in an agent result."""
    if isinstance(raw_response, dict):
        # Prefer the "output" entry of a result mapping; stringifying the
        # whole mapping would show its repr to the user.
        answer = raw_response.get("output")
        text = str(raw_response if answer is None else answer)
    else:
        text = str(raw_response)

    # The reply may itself embed a JSON object; unwrap it when possible.
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if not match:
        return text
    try:
        obj = json.loads(match.group(0))
    except ValueError:
        return text
    if not isinstance(obj, dict):
        return text
    return (
        obj.get("action_input")
        or obj.get("Final Answer")
        or obj.get("output")
        or obj.get("content")
        or text
    )


def chat(message, history, hf_token):
    """Run one chat turn through the agent and persist the conversation.

    Args:
        message: The user's message.
        history: Prior ``(user_message, ai_reply)`` pairs. ``None`` is
            treated as an empty history.
        hf_token: Hugging Face token identifying whose history to save.

    Returns:
        ``(history, history, "")``: the extended history twice, plus an
        empty string. A new list is built; the input list is not mutated.
    """
    try:
        output = _extract_answer(agent.invoke(message))
    except Exception as e:
        # Show agent failures in the chat instead of raising.
        output = f"⚠️ 였λ₯˜: {e}"

    # Append the turn and save immediately. The clear handler sets the
    # chatbot to None, so a missing history must be tolerated.
    history = (history or []) + [(message, output)]
    try:
        save_conversation(history, hf_token)
    except Exception as e:
        # Persistence is best-effort: a storage failure must not discard a
        # reply that has already been generated.
        print(f"[chat] failed to save conversation: {e}")
    return history, history, ""

# ──────────────────────────────
# ✅ Gradio UI (ChatGPT style)
# ──────────────────────────────
# Build the whole UI: a header row with login widgets, then a chat column
# next to a column for saved conversations.
with gr.Blocks(theme=gr.themes.Soft(), title="PIXAL Assistant (HuggingFace OAuth)") as demo:
    with gr.Row(elem_id="header"):
        gr.HTML("""
    <div style="background:#f5f5f5;padding:12px;border-bottom:1px solid #ddd;
                display:flex;align-items:center;justify-content:space-between;">
        <h2 style="margin:0;">πŸ€– PIXAL Assistant</h2>
    </div>
    """)
        user_avatar = gr.Image(show_label=False, width=40, height=40, elem_id="avatar")
        user_name = gr.Markdown("둜그인 ν•„μš”", elem_id="username", elem_classes="text-right")
        # --- only the modified part of the existing code ---

        login_btn = gr.LoginButton("πŸ” HuggingFace 둜그인", elem_id="login-btn")
        # Per-session token, read by the chat, refresh and load handlers.
        hf_token = gr.State("")

        def on_login(token):
            """Resolve ``token`` to a profile; return the token, avatar URL and bold name."""
            info = get_hf_userinfo(token)
            return token, info["avatar"], f"**{info['name']}**"

# Changed from the earlier login_btn.login(...) to click().
        # NOTE(review): inputs=login_btn hands the button component's value
        # to on_login, which treats it as the HF token - confirm that this
        # is really an OAuth token.
        login_btn.click(on_login, inputs=login_btn, outputs=[hf_token, user_avatar, user_name])

    with gr.Row():
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(label=None, height=600, render_markdown=True)
            msg = gr.Textbox(placeholder="λ©”μ‹œμ§€λ₯Ό μž…λ ₯ν•˜μ„Έμš”...", show_label=False)
            send = gr.Button("전솑", variant="primary")
            clear = gr.Button("🧹 μ΄ˆκΈ°ν™”")

            # Submitting the textbox and clicking send both run chat. Its
            # three return values go to the chatbot (listed twice) and the
            # textbox, which receives "".
            msg.submit(chat, [msg, chatbot, hf_token], [chatbot, chatbot, msg])
            send.click(chat, [msg, chatbot, hf_token], [chatbot, chatbot, msg])
            # Clearing writes None into the chatbot; saved files are untouched.
            clear.click(lambda: None, None, chatbot, queue=False)

        with gr.Column(scale=1):
            gr.Markdown("### πŸ’Ύ μ €μž₯된 λŒ€ν™”")
            convo_list = gr.Dropdown(label="λŒ€ν™” 선택", choices=[])
            refresh_btn = gr.Button("πŸ”„ μƒˆλ‘œκ³ μΉ¨")
            load_btn = gr.Button("πŸ“‚ 뢈러였기")

            # Refresh repopulates the dropdown; load puts the selected
            # conversation's history into the chatbot.
            refresh_btn.click(refresh_conversation_list, [hf_token], convo_list)
            load_btn.click(load_conversation, [hf_token, convo_list], chatbot)

# Start the web server only when run as a script: all interfaces, port 7860.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)