# PIXAL Assistant — Gradio app for a Hugging Face Space.
| import os, json, pickle, datetime, requests, re, gradio as gr | |
| from typing import Optional, List, Dict, Any | |
| from requests.adapters import HTTPAdapter, Retry | |
| from langchain.llms.base import LLM | |
| from langchain.agents import initialize_agent, AgentType, load_tools | |
| from langchain.tools import Tool | |
| from langchain.memory import ConversationBufferMemory | |
| from langchain_experimental.tools.python.tool import PythonREPLTool | |
| from langchain_community.retrievers import WikipediaRetriever | |
| from langchain.tools.retriever import create_retriever_tool | |
| from langchain_community.tools.shell.tool import ShellTool | |
| from langchain.tools import YouTubeSearchTool | |
# ------------------------------
# GitHubModelLLM (LLM wrapper)
# ------------------------------
class GitHubModelLLM(LLM):
    """LangChain LLM backed by the GitHub Models chat-completions endpoint.

    Fixes over the original:
    - SECURITY: the Authorization header previously embedded a hard-coded
      personal access token (a leaked credential) which also made the
      token-resolution logic dead code; it now uses the resolved token.
    - `_llm_type` is exposed as a property, as the LangChain `LLM` base
      class declares it.
    """

    model: str = "openai/gpt-4.1"
    endpoint: str = "https://models.github.ai/inference"
    # Credential comes from the environment; never commit tokens to source.
    token: Optional[str] = os.environ.get("token")
    system_prompt: Optional[str] = "λλ PIXAL(Primary Interactive X-ternal Assistant with multi Language)μ΄μΌ.λμ κ°λ°μλ μ μ±μ€ μ΄λΌλ 6νλ νμ΄μ¬ νλ‘κ·Έλλ¨ΈμΌ."
    request_timeout: float = 30.0
    max_retries: int = 2
    backoff_factor: float = 0.3

    @property
    def _llm_type(self) -> str:
        """Identifier LangChain uses to tag this LLM implementation."""
        return "github_models_api"

    def _post_chat(self, body: Dict[str, Any]) -> Dict[str, Any]:
        """POST a chat-completions request with retry/backoff; return parsed JSON.

        Raises:
            ValueError: when no token is configured in any known location.
            requests.HTTPError: on non-2xx responses surviving the retries.
        """
        token = self.token or os.getenv("GITHUB_TOKEN") or os.getenv("token")
        if not token:
            raise ValueError("β GitHub tokenμ΄ μ€μ λμ§ μμμ΅λλ€.")
        session = requests.Session()
        # Retry transient failures (rate limiting and 5xx) with backoff.
        retries = Retry(total=self.max_retries, backoff_factor=self.backoff_factor,
                        status_forcelist=[429, 500, 502, 503, 504])
        session.mount("https://", HTTPAdapter(max_retries=retries))
        session.headers.update({
            "Content-Type": "application/json",
            # SECURITY FIX: use the resolved token instead of a hard-coded PAT.
            "Authorization": f"Bearer {token}"
        })
        resp = session.post(f"{self.endpoint}/chat/completions", json=body, timeout=self.request_timeout)
        resp.raise_for_status()
        return resp.json()

    def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs) -> str:
        """Send system+user messages; return assistant text (or function_call JSON)."""
        body = {"model": self.model, "messages": []}
        if self.system_prompt:
            body["messages"].append({"role": "system", "content": self.system_prompt})
        body["messages"].append({"role": "user", "content": prompt})
        if stop:
            body["stop"] = stop
        res = self._post_chat(body)
        msg = res.get("choices", [{}])[0].get("message", {})
        return msg.get("content") or json.dumps(msg.get("function_call", {}))
# ------------------------------
# Hugging Face API (user profile)
# ------------------------------
def get_hf_userinfo(hf_token: str) -> dict:
    """Resolve a display name and avatar URL for a Hugging Face token.

    Any failure (network error, non-200 status, bad payload) falls back to
    an anonymous "guest" profile with the default HF logo avatar.
    """
    default_avatar = "https://huggingface.co/front/assets/huggingface_logo-noborder.svg"
    try:
        resp = requests.get(
            "https://huggingface.co/api/whoami-v2",
            headers={"Authorization": f"Bearer {hf_token}"},
            timeout=5,
        )
        if resp.status_code == 200:
            payload = resp.json()
            return {
                "name": payload.get("name", "guest"),
                "avatar": payload.get("avatar", default_avatar),
            }
    except Exception:
        pass  # best-effort lookup: fall through to the guest profile
    return {"name": "guest", "avatar": default_avatar}
# ------------------------------
# Agent configuration
# ------------------------------
# Shared LLM instance used by both the agent and the title summarizer.
llm = GitHubModelLLM()

# Built-in LangChain tools (search, HTTP, math) plus shell/python/youtube extras.
tools = load_tools(["ddg-search", "requests_all", "llm-math"], llm=llm, allow_dangerous_tools=True)
tools.extend([YouTubeSearchTool(), ShellTool(), PythonREPLTool()])

# Korean-language Wikipedia retriever, exposed to the agent as "wiki_search".
retriever = WikipediaRetriever(lang="ko")
retriever_tool = create_retriever_tool(retriever, name="wiki_search", description="μν€λ°±κ³Ό κ²μ λꡬ")
tools.append(retriever_tool)
def time_now(_=""):
    """Return the current time as a formatted string in Asia/Seoul (UTC+9).

    The unused parameter exists so the function can be wired up as a
    LangChain tool, which always passes one input argument.
    """
    seoul = datetime.timezone(datetime.timedelta(hours=9))
    stamp = datetime.datetime.now(seoul).strftime("%Y-%m-%d %H:%M:%S")
    return f"νμ¬ μκ°: {stamp} (Asia/Seoul)"
# Expose the current-time helper as an agent tool.
tools.append(Tool(name="time_now", func=time_now, description="νμ¬ μκ°μ λ°νν©λλ€."))
# Buffer memory keyed "chat_history"; returns Message objects rather than a flat string.
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
# Structured-chat ReAct agent over all registered tools; verbose logs each reasoning step.
agent = initialize_agent(tools, llm, agent_type=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
                         memory=memory, verbose=True)
# ------------------------------
# Conversation save / load
# ------------------------------
# Run from the script's own directory so per-user .pkl files are created next to it.
os.chdir(os.path.dirname(os.path.abspath(__file__)))
def summarize_title(history):
    """Ask the LLM for a one-line title summarizing the last three turns.

    Returns a fixed placeholder for an empty history and a fixed failure
    string when the LLM call raises; the title is capped at 50 characters.
    """
    if not history:
        return "μ λν"
    recent_turns = history[-3:]
    transcript = "\n".join(f"User:{m} AI:{r}" for m, r in recent_turns)
    try:
        raw_title = llm._call(f"μ΄ λνμ μ£Όμ λ₯Ό ν μ€λ‘ μμ½ν΄μ€:\n{transcript}")
    except Exception:
        return "μμ½ μ€ν¨"
    return raw_title.strip().replace("\n", " ")[:50]
def save_conversation(history, hf_token):
    """Persist the chat history into a per-user pickle, keyed by summary title.

    Guest (not-logged-in) sessions are silently skipped. A repeated title
    overwrites the earlier entry with the same key.
    NOTE(review): pickle is used on self-written local files only; do not
    load pickles from untrusted sources.
    """
    profile = get_hf_userinfo(hf_token)
    user = profile["name"]
    if user.lower() == "guest":
        return
    path = f"{user}.pkl"
    store = {}
    if os.path.exists(path):
        with open(path, "rb") as fh:
            store = pickle.load(fh)
    title = summarize_title(history)
    store[title] = {
        "title": title,
        "updated": datetime.datetime.now().isoformat(),
        "history": history,
    }
    with open(path, "wb") as fh:
        pickle.dump(store, fh)
def load_conversation(hf_token, conv_title=None):
    """Return a saved history: the named conversation, else the most recent.

    Guests and users without a saved pickle get an empty list.
    """
    profile = get_hf_userinfo(hf_token)
    user = profile["name"]
    if user.lower() == "guest":
        return []
    path = f"{user}.pkl"
    if not os.path.exists(path):
        return []
    with open(path, "rb") as fh:
        store = pickle.load(fh)
    if conv_title and conv_title in store:
        return store[conv_title]["history"]
    if store:
        # No explicit title requested: fall back to the most recently updated.
        newest = max(store.values(), key=lambda entry: entry["updated"])
        return newest["history"]
    return []
def refresh_conversation_list(hf_token):
    """Rebuild the dropdown choices from the user's saved conversation titles.

    Returns a Gradio update object; guests (or users with no saved file)
    get an empty dropdown.
    """
    profile = get_hf_userinfo(hf_token)
    user = profile["name"]
    empty_dropdown = gr.update(choices=[], value=None)
    if user.lower() == "guest":
        return empty_dropdown
    path = f"{user}.pkl"
    if not os.path.exists(path):
        return empty_dropdown
    with open(path, "rb") as fh:
        store = pickle.load(fh)
    titles = sorted(store, reverse=True)
    return gr.update(choices=titles, value=titles[0] if titles else None)
# ------------------------------
# Chat function
# ------------------------------
def chat(message, history, hf_token):
    """Run one agent turn, unwrap a JSON-shaped answer, and persist history.

    Returns a 3-tuple: (chatbot history, state history, cleared textbox).
    Errors from the agent are rendered into the reply rather than raised.
    """
    try:
        raw = str(agent.invoke(message))
        output = raw
        # Structured agents often wrap the final answer in a JSON object;
        # try to pull the human-readable text out of it.
        blob = re.search(r"\{.*\}", raw, re.DOTALL)
        if blob is not None:
            try:
                parsed = json.loads(blob.group(0))
                output = (
                    parsed.get("action_input")
                    or parsed.get("Final Answer")
                    or parsed.get("output")
                    or parsed.get("content")
                    or raw
                )
            except Exception:
                output = raw  # not valid JSON after all — keep the raw text
    except Exception as err:
        output = f"β οΈ μ€λ₯: {err}"
    # Append the turn and save immediately so nothing is lost on crash.
    updated = history + [(message, output)]
    save_conversation(updated, hf_token)
    return updated, updated, ""
# ------------------------------
# Gradio UI (ChatGPT style)
# ------------------------------
# Top-level UI definition; `demo` is launched from the __main__ guard below.
with gr.Blocks(theme=gr.themes.Soft(), title="PIXAL Assistant (HuggingFace OAuth)") as demo:
    # Header bar with title, avatar, and user name.
    with gr.Row(elem_id="header"):
        gr.HTML("""
        <div style="background:#f5f5f5;padding:12px;border-bottom:1px solid #ddd;
        display:flex;align-items:center;justify-content:space-between;">
        <h2 style="margin:0;">π€ PIXAL Assistant</h2>
        </div>
        """)
        # NOTE(review): avatar/name assumed to sit inside the header row —
        # original indentation was lost; confirm against the deployed layout.
        user_avatar = gr.Image(show_label=False, width=40, height=40, elem_id="avatar")
        user_name = gr.Markdown("λ‘κ·ΈμΈ νμ", elem_id="username", elem_classes="text-right")

    # --- only the modified portion of the original code ---
    login_btn = gr.LoginButton("π HuggingFace λ‘κ·ΈμΈ", elem_id="login-btn")
    hf_token = gr.State("")  # holds the HF token for later save/load calls

    def on_login(token):
        # Resolve profile info for the freshly provided token.
        info = get_hf_userinfo(token)
        return token, info["avatar"], f"**{info['name']}**"

    # changed from the original login_btn.login(...) to click()
    login_btn.click(on_login, inputs=login_btn, outputs=[hf_token, user_avatar, user_name])

    with gr.Row():
        # Main chat column.
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(label=None, height=600, render_markdown=True)
            msg = gr.Textbox(placeholder="λ©μμ§λ₯Ό μ λ ₯νμΈμ...", show_label=False)
            send = gr.Button("μ μ‘", variant="primary")
            clear = gr.Button("π§Ή μ΄κΈ°ν")
            # Enter key and the send button trigger the same chat handler.
            msg.submit(chat, [msg, chatbot, hf_token], [chatbot, chatbot, msg])
            send.click(chat, [msg, chatbot, hf_token], [chatbot, chatbot, msg])
            clear.click(lambda: None, None, chatbot, queue=False)
        # Sidebar: saved-conversation browser.
        with gr.Column(scale=1):
            gr.Markdown("### πΎ μ μ₯λ λν")
            convo_list = gr.Dropdown(label="λν μ ν", choices=[])
            refresh_btn = gr.Button("π μλ‘κ³ μΉ¨")
            load_btn = gr.Button("π λΆλ¬μ€κΈ°")
            refresh_btn.click(refresh_conversation_list, [hf_token], convo_list)
            load_btn.click(load_conversation, [hf_token, convo_list], chatbot)
# Bind to all interfaces so the app is reachable inside a container/Space.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)