# Hugging Face Space page header: "Spaces: Running on Zero"
| import gradio as gr | |
| import spaces | |
| import torch | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer | |
| from threading import Thread | |
| import re | |
| import json | |
| from datetime import datetime | |
| import math | |
| import os | |
# ═══════════════════════════════════════════════════════════
# Model loading
# ═══════════════════════════════════════════════════════════
# Hugging Face Hub repository that both the tokenizer and the model come from.
MODEL_ID = "zai-org/GLM-4.7-Flash"

# The tokenizer is loaded eagerly, at import time.
print(f"[Init] Loading tokenizer from {MODEL_ID}...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)

# Process-wide model cache; stays None until get_model() is first called.
model = None


def get_model():
    """Return the shared causal-LM instance, loading it on first use.

    The first call loads ``MODEL_ID`` in bfloat16 with ``device_map="auto"``
    and stores the result in the module-level ``model``; later calls return
    that cached object without reloading.

    Returns:
        The loaded model object.
    """
    global model
    if model is not None:
        return model

    print("[Model] Loading model with bfloat16...")
    load_options = dict(
        torch_dtype=torch.bfloat16,
        device_map="auto",
        trust_remote_code=True,
        low_cpu_mem_usage=True,
    )
    model = AutoModelForCausalLM.from_pretrained(MODEL_ID, **load_options)
    print(f"[Model] Model loaded on {model.device}")
    return model
# ═══════════════════════════════════════════════════════════
# File processing functions
# ═══════════════════════════════════════════════════════════
def extract_text_from_pdf(file_path: str) -> str:
    """Extract the text of a PDF file, page by page.

    PyMuPDF (``fitz``) is tried first; if it cannot be imported, ``pypdf`` is
    used instead. Every page that yields non-blank text is prefixed with a
    bracketed page marker and the pages are joined with blank lines.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The extracted text, a bracketed placeholder when no page produced
        text, or a bracketed error message when reading failed. Exceptions
        are reported in the return value rather than raised.
    """
    try:
        try:
            import fitz
        except ImportError:
            # Fallback reader; an ImportError here is reported like any
            # other read failure by the outer handler.
            from pypdf import PdfReader

            page_texts = [page.extract_text() for page in PdfReader(file_path).pages]
        else:
            doc = fitz.open(file_path)
            try:
                page_texts = [page.get_text() for page in doc]
            finally:
                # Release the document even when a page fails to decode.
                doc.close()
    except Exception as e:
        return f"[PDF ์ฝ๊ธฐ ์ค๋ฅ: {str(e)}]"

    text_parts = [
        f"[ํ์ด์ง {page_num}]\n{text}"
        for page_num, text in enumerate(page_texts, 1)
        if text and text.strip()
    ]
    return "\n\n".join(text_parts) if text_parts else "[PDF์์ ํ ์คํธ๋ฅผ ์ถ์ถํ ์ ์์ต๋๋ค]"
def extract_text_from_docx(file_path: str) -> str:
    """Extract paragraph and table text from a DOCX file.

    Non-blank paragraphs come first, followed by one section per table. A
    table section starts with a bracketed, 1-based table marker and lists
    each row with its cell texts joined by " | ".

    Args:
        file_path: Path of the DOCX file to read.

    Returns:
        The extracted text, a bracketed placeholder when nothing was found,
        or a bracketed error message when reading failed (including when
        ``python-docx`` is not installed).
    """
    try:
        from docx import Document

        document = Document(file_path)
        chunks = [p.text for p in document.paragraphs if p.text.strip()]

        for number, table in enumerate(document.tables, 1):
            rows = []
            for row in table.rows:
                line = " | ".join(cell.text.strip() for cell in row.cells)
                if line.strip():
                    rows.append(line)
            # Tables that contributed no rows are left out entirely.
            if rows:
                chunks.append("\n".join([f"\n[ํ {number}]", *rows]))

        if not chunks:
            return "[DOCX์์ ํ ์คํธ๋ฅผ ์ถ์ถํ ์ ์์ต๋๋ค]"
        return "\n\n".join(chunks)
    except Exception as e:
        return f"[DOCX ์ฝ๊ธฐ ์ค๋ฅ: {str(e)}]"
def extract_text_from_txt(file_path: str) -> str:
    """Read a text file, trying several encodings in turn.

    Encodings are attempted in order and the first one that decodes the whole
    file wins. ``utf-8-sig`` is used rather than ``utf-8`` so that a leading
    byte-order mark is stripped instead of ending up in the returned text.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The decoded file content, or a bracketed error message when the file
        could not be read.
    """
    # latin-1 maps every byte to a character, so it acts as the catch-all.
    encodings = ("utf-8-sig", "cp949", "euc-kr", "latin-1")
    try:
        for encoding in encodings:
            try:
                with open(file_path, "r", encoding=encoding) as f:
                    return f.read()
            except UnicodeDecodeError:
                continue
        # Only reachable if the catch-all encoding is removed from the list.
        return "[ํ ์คํธ ํ์ผ ์ธ์ฝ๋ฉ์ ์ธ์ํ ์ ์์ต๋๋ค]"
    except Exception as e:
        return f"[TXT ์ฝ๊ธฐ ์ค๋ฅ: {str(e)}]"
def process_uploaded_file(file) -> tuple:
    """Turn an uploaded file into a ``(file_name, text)`` pair.

    The extractor is chosen from the lower-cased file extension; unsupported
    extensions produce a bracketed message instead of text. Content longer
    than 50,000 characters is cut and a truncation notice is appended.

    Args:
        file: ``None``, an object with a ``name`` attribute holding the path,
            or anything whose ``str()`` is the path.

    Returns:
        ``("", "")`` when ``file`` is ``None``; otherwise the base file name
        and the extracted (possibly truncated) text.
    """
    if file is None:
        return "", ""

    try:
        file_path = file.name
    except AttributeError:
        file_path = str(file)

    file_name = os.path.basename(file_path)
    _, suffix = os.path.splitext(file_name)
    file_ext = suffix.lower()

    plain_text_exts = ['.txt', '.md', '.py', '.js', '.html', '.css', '.json', '.xml', '.csv']
    if file_ext == '.pdf':
        content = extract_text_from_pdf(file_path)
    elif file_ext == '.docx':
        content = extract_text_from_docx(file_path)
    elif file_ext in plain_text_exts:
        content = extract_text_from_txt(file_path)
    else:
        content = f"[์ง์ํ์ง ์๋ ํ์ผ ํ์: {file_ext}]"

    # Cap what is later injected into the system prompt.
    max_chars = 50000
    if len(content) > max_chars:
        notice = f"\n\n... [ํ ์คํธ๊ฐ {max_chars}์๋ก ์๋ ธ์ต๋๋ค]"
        content = content[:max_chars] + notice

    return file_name, content
# ═══════════════════════════════════════════════════════════
# Tool definitions
# ═══════════════════════════════════════════════════════════
def _tool_calculator(arguments: dict) -> str:
    """Evaluate a math expression that may use a fixed set of names."""
    expr = str(arguments.get("expression", ""))
    allowed_names = {
        "abs": abs, "round": round, "min": min, "max": max,
        "sum": sum, "pow": pow, "sqrt": math.sqrt,
        "sin": math.sin, "cos": math.cos, "tan": math.tan,
        "log": math.log, "log10": math.log10, "exp": math.exp,
        "pi": math.pi, "e": math.e,
        "floor": math.floor, "ceil": math.ceil,
    }
    # Commas have to survive sanitising: without them every multi-argument
    # call offered above (pow(2, 10), max(a, b), round(x, 2), ...) turns into
    # a syntax error. '%' is kept so the modulo operator works.
    expr = re.sub(r'[^0-9+\-*/%().,a-zA-Z_ ]', '', expr)
    # SECURITY NOTE(review): this eval() runs model-generated text. The
    # character filter still lets through letters, '.', '_' and parentheses,
    # so attribute access such as ().__class__ is not blocked, and an empty
    # __builtins__ is not a sandbox. Replace with an AST-based evaluator
    # before exposing this to untrusted users.
    result = eval(expr, {"__builtins__": {}}, allowed_names)
    return f"๊ณ์ฐ ๊ฒฐ๊ณผ: {expr} = {result}"


def _tool_current_time(arguments: dict) -> str:
    """Report the current time in the requested timezone (default UTC)."""
    from datetime import timezone

    tz_name = str(arguments.get("timezone") or "UTC")
    if tz_name.upper() == "UTC":
        # No tz database needed for plain UTC.
        tzinfo = timezone.utc
    else:
        from zoneinfo import ZoneInfo

        # Unknown zone names raise; execute_tool reports that as a tool error.
        tzinfo = ZoneInfo(tz_name)
    stamp = datetime.now(tzinfo).strftime('%Y-%m-%d %H:%M:%S')
    return f"ํ์ฌ ์๊ฐ ({tz_name}): {stamp}"


def _tool_unit_converter(arguments: dict) -> str:
    """Convert a value between the supported unit pairs."""
    value = arguments.get("value", 0)
    from_unit = str(arguments.get("from_unit", "")).lower()
    to_unit = str(arguments.get("to_unit", "")).lower()
    conversions = {
        ("km", "m"): lambda x: x * 1000,
        ("m", "km"): lambda x: x / 1000,
        ("kg", "g"): lambda x: x * 1000,
        ("g", "kg"): lambda x: x / 1000,
        ("c", "f"): lambda x: x * 9/5 + 32,
        ("f", "c"): lambda x: (x - 32) * 5/9,
        ("km", "mile"): lambda x: x * 0.621371,
        ("mile", "km"): lambda x: x * 1.60934,
        ("kg", "lb"): lambda x: x * 2.20462,
        ("lb", "kg"): lambda x: x * 0.453592,
    }
    convert = conversions.get((from_unit, to_unit))
    if convert is None:
        return f"์ง์ํ์ง ์๋ ๋จ์ ๋ณํ: {from_unit} -> {to_unit}"
    # JSON arguments may carry the number as a string ("100"); coerce so the
    # arithmetic is numeric rather than string repetition or a TypeError.
    result = convert(float(value))
    return f"๋ณํ ๊ฒฐ๊ณผ: {value} {from_unit} = {result:.4f} {to_unit}"


def _tool_code_executor(arguments: dict) -> str:
    """Run a Python snippet and report its ``result`` variable, if it set one."""
    code = arguments.get("code", "")
    local_vars = {}
    safe_builtins = {"print": print, "range": range, "len": len, "str": str, "int": int, "float": float, "list": list, "dict": dict}
    # SECURITY NOTE(review): exec() of model-generated code. Restricting
    # __builtins__ does not prevent reaching other objects through attribute
    # access, and there is no time or memory limit. Treat as unsafe for
    # untrusted input.
    exec(code, {"__builtins__": safe_builtins}, local_vars)
    if "result" in local_vars:
        return f"์คํ ๊ฒฐ๊ณผ: {local_vars['result']}"
    return "์ฝ๋ ์คํ ์๋ฃ"


def execute_tool(tool_name: str, arguments: dict) -> str:
    """Run one of the built-in tools and describe the outcome as text.

    Supported tools are ``calculator``, ``get_current_time``,
    ``unit_converter`` and ``code_executor``.

    Args:
        tool_name: Name of the tool to run.
        arguments: Tool arguments as parsed from the model's tool call.

    Returns:
        A human-readable result line. Unknown tool names and any exception
        raised while running a tool are reported in the returned string
        instead of being raised.
    """
    handlers = {
        "calculator": _tool_calculator,
        "get_current_time": _tool_current_time,
        "unit_converter": _tool_unit_converter,
        "code_executor": _tool_code_executor,
    }
    try:
        handler = handlers.get(tool_name)
        if handler is None:
            return f"์ ์ ์๋ ๋๊ตฌ: {tool_name}"
        return handler(arguments)
    except Exception as e:
        return f"๋๊ตฌ ์คํ ์ค๋ฅ: {str(e)}"
def parse_tool_calls(response: str) -> list:
    """Extract tool calls from a model response.

    Three notations are recognised, in this order:

    1. ``<|tool_call|>{...}<|/tool_call|>`` tags,
    2. a ```` ```json ```` fenced block containing a ``"name"`` key,
    3. a bare ``{"name": "...", "arguments": {...}}`` object.

    A call written in notation 1 or 2 also matches notation 3, so the text
    span of every accepted call is remembered and later matches that fall
    inside an accepted span are skipped; each call is returned once.

    Args:
        response: Raw text generated by the model.

    Returns:
        A list of dicts as decoded from the JSON (normally with ``"name"``
        and ``"arguments"`` keys). Fragments that are not valid JSON are
        ignored.
    """
    if not response:
        return []

    tagged = r'<\|tool_call\|>(\{.*?\})<\|/tool_call\|>'
    fenced = r'```json\s*(\{[^`]*"name"[^`]*\})\s*```'
    # "[^}]*" (not "+") so that empty arguments, e.g.
    # {"name": "get_current_time", "arguments": {}}, are matched as well.
    bare = r'\{"name":\s*"(\w+)",\s*"arguments":\s*(\{[^}]*\})\}'

    tool_calls = []
    accepted_spans = []

    for pattern, is_bare in ((tagged, False), (fenced, False), (bare, True)):
        for match in re.finditer(pattern, response, re.DOTALL):
            start, end = match.span(0) if is_bare else match.span(1)
            if any(s <= start and end <= e for s, e in accepted_spans):
                continue  # already captured through an earlier notation
            try:
                if is_bare:
                    tool_call = {
                        "name": match.group(1),
                        "arguments": json.loads(match.group(2)),
                    }
                else:
                    tool_call = json.loads(match.group(1))
            except json.JSONDecodeError:
                continue
            if not isinstance(tool_call, dict):
                continue
            accepted_spans.append((start, end))
            tool_calls.append(tool_call)

    return tool_calls
# ═══════════════════════════════════════════════════════════
# Streaming chat function (Gradio 6.0 messages format)
# ═══════════════════════════════════════════════════════════
# Text of the most recently uploaded document, injected into the system prompt.
# NOTE(review): this is module-level state, so every chat session served by
# this process reads and overwrites the same entry. If the app serves more
# than one user, move it into per-session state (e.g. gr.State).
file_context = {"name": "", "content": ""}


def _build_chat_messages(
    message: str,
    history: list,
    system_prompt: str,
    enable_thinking: bool,
    enable_tools: bool,
) -> list:
    """Assemble the role/content message list handed to the chat template."""
    sys_content = system_prompt if (system_prompt or "").strip() else "You are a helpful AI assistant."
    if file_context["content"]:
        sys_content += f"\n\n[์ ๋ก๋๋ ํ์ผ: {file_context['name']}]\nํ์ผ ๋ด์ฉ:\n---\n{file_context['content']}\n---"
    if enable_tools:
        tool_desc = (
            "\n"
            "You have access to these tools:\n"
            '1. calculator: Math calculations - {"name": "calculator", "arguments": {"expression": "..."}}\n'
            '2. get_current_time: Current time - {"name": "get_current_time", "arguments": {}}\n'
            '3. unit_converter: Unit conversion - {"name": "unit_converter", "arguments": {"value": N, "from_unit": "...", "to_unit": "..."}}\n'
            '4. code_executor: Run Python - {"name": "code_executor", "arguments": {"code": "..."}}\n'
        )
        sys_content += f"\n\n{tool_desc}"

    messages = [{"role": "system", "content": sys_content}]

    # History entries may be message dicts or legacy (user, assistant) pairs.
    for h in history or []:
        if isinstance(h, dict):
            messages.append({"role": h["role"], "content": h["content"]})
        elif isinstance(h, (list, tuple)) and len(h) == 2:
            if h[0]:
                messages.append({"role": "user", "content": h[0]})
            if h[1]:
                messages.append({"role": "assistant", "content": h[1]})

    user_content = message
    if enable_thinking:
        user_content = f"<think>\nLet me think step by step.\n</think>\n\n{message}"
    messages.append({"role": "user", "content": user_content})
    return messages


def chat_streaming(
    message: str,
    history: list,
    system_prompt: str,
    max_tokens: int,
    temperature: float,
    top_p: float,
    enable_thinking: bool,
    enable_tools: bool,
):
    """Stream a chat reply as successive chat histories.

    The system prompt is extended with the uploaded file content (if any) and
    the tool description (if tools are enabled). Generation runs in a worker
    thread while this generator yields the history with the assistant message
    growing as text arrives. When tools are enabled, tool calls found in the
    finished reply are executed and their results appended to the reply.

    NOTE(review): ``spaces`` is imported at the top of the file but no
    ``@spaces.GPU`` decorator is applied to this function — confirm whether
    GPU allocation is handled elsewhere.

    Args:
        message: The new user message; a blank message yields ``history``
            unchanged and stops.
        history: Previous conversation, as message dicts or 2-item pairs.
        system_prompt: Base system prompt; a default is used when blank.
        max_tokens: Maximum number of new tokens to generate.
        temperature: Sampling temperature; ``0`` selects greedy decoding.
        top_p: Nucleus-sampling threshold, used only when sampling.
        enable_thinking: Prefix the user message with a ``<think>`` block.
        enable_tools: Advertise the tools and execute detected tool calls.

    Yields:
        The chat history (list of ``{"role", "content"}`` dicts) after each
        streamed chunk, plus one final update when tool results were added.

    Raises:
        Exception: whatever ``model.generate`` raised in the worker thread,
            re-raised here once streaming has stopped.
    """
    if not message or not message.strip():
        yield history
        return

    history = list(history or [])
    llm = get_model()
    messages = _build_chat_messages(message, history, system_prompt, enable_thinking, enable_tools)

    try:
        inputs = tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors="pt",
        ).to(llm.device)
    except Exception as e:
        yield history + [
            {"role": "user", "content": message},
            {"role": "assistant", "content": f"ํ ํฌ๋์ด์ฆ ์ค๋ฅ: {str(e)}"},
        ]
        return

    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)

    from transformers import GenerationConfig

    # Compare against None explicitly: a pad token id of 0 is valid.
    pad_token_id = tokenizer.pad_token_id
    if pad_token_id is None:
        pad_token_id = tokenizer.eos_token_id

    do_sample = temperature > 0
    gen_options = {
        "max_new_tokens": int(max_tokens),
        "do_sample": do_sample,
        "pad_token_id": pad_token_id,
    }
    if do_sample:
        # Sampling-only knobs are passed only when sampling is active.
        gen_options["temperature"] = temperature
        gen_options["top_p"] = top_p
    gen_config = GenerationConfig(**gen_options)

    generation_errors = []

    def _generate():
        try:
            llm.generate(**inputs, streamer=streamer, generation_config=gen_config)
        except Exception as exc:
            generation_errors.append(exc)
            # Close the stream so the consumer loop below does not wait
            # forever for text that will never arrive.
            streamer.end()

    thread = Thread(target=_generate)
    thread.start()

    new_history = history + [
        {"role": "user", "content": message},
        {"role": "assistant", "content": ""},
    ]

    partial_response = ""
    for new_token in streamer:
        partial_response += new_token
        new_history[-1]["content"] = partial_response
        yield new_history

    thread.join()
    if generation_errors:
        raise generation_errors[0]

    if enable_tools:
        tool_calls = parse_tool_calls(partial_response)
        tool_results = [
            execute_tool(tc.get("name", ""), tc.get("arguments", {}))
            for tc in tool_calls
        ]
        if tool_results:
            final_response = partial_response + "\n\n๐ **๋๊ตฌ ์คํ ๊ฒฐ๊ณผ:**\n" + "\n".join(tool_results)
            new_history[-1]["content"] = final_response
            yield new_history
def handle_file_upload(file):
    """Load an uploaded file into the shared file context.

    Args:
        file: The uploaded file object or path, or ``None`` when the upload
            widget was cleared.

    Returns:
        A Markdown status string: a removal notice for ``None``, the
        extraction error when reading failed, or a summary with the
        character count and a preview of at most 500 characters.
    """
    global file_context
    if file is None:
        file_context = {"name": "", "content": ""}
        return "๐ ํ์ผ์ด ์ ๊ฑฐ๋์์ต๋๋ค."

    file_name, content = process_uploaded_file(file)

    # Extraction failures are bracketed messages with the error marker on
    # their first line. Only that line is inspected: extracted PDF text also
    # starts with "[" (its first page marker), so searching the whole body
    # would reject any document that merely contains the marker word.
    first_line = content.split("\n", 1)[0]
    if first_line.startswith("[") and "์ค๋ฅ" in first_line:
        file_context = {"name": "", "content": ""}
        return f"โ {content}"

    file_context = {"name": file_name, "content": content}
    preview = content[:500] + "..." if len(content) > 500 else content
    char_count = len(content)
    return f"โ **ํ์ผ ๋ก๋ ์๋ฃ: {file_name}**\n- ๋ฌธ์ ์: {char_count:,}์\n\n๋ฏธ๋ฆฌ๋ณด๊ธฐ:\n```\n{preview}\n```"
def clear_file():
    """Reset the shared file context.

    Returns:
        A ``(None, status)`` pair: ``None`` for the upload widget and the
        removal notice for the status area.
    """
    global file_context
    file_context = dict(name="", content="")
    status = "๐ ํ์ผ์ด ์ ๊ฑฐ๋์์ต๋๋ค."
    return None, status
def clear_chat():
    """Return a new empty history, which clears the chatbot display."""
    empty_history = list()
    return empty_history
# ═══════════════════════════════════════════════════════════
# Gradio UI (6.0 compatible - messages format)
# ═══════════════════════════════════════════════════════════
# Page layout and event wiring for the chatbot.
# NOTE(review): the source had lost its indentation; the nesting of rows and
# columns below is reconstructed from statement order — confirm the layout.
with gr.Blocks(title="GLM-4.7-Flash Chatbot") as demo:
    gr.Markdown("""
# ๐ค GLM-4.7-Flash Chatbot
**30B-A3B MoE ๋ชจ๋ธ ๊ธฐ๋ฐ ์คํธ๋ฆฌ๋ฐ ์ฑ๋ด** | ๋ฌธ์ ๋ถ์ | Tool Calling
๐ PDF | ๐ DOCX | ๐ TXT | ๐งฎ ๊ณ์ฐ๊ธฐ | ๐ ์๊ฐ์กฐํ | ๐ ๋จ์๋ณํ | ๐ ์ฝ๋์คํ
""")
    with gr.Row():
        # Left column: conversation, message input and document upload.
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(
                label="๋ํ",
                height=500,
            )
            with gr.Row():
                message = gr.Textbox(
                    label="๋ฉ์์ง ์ ๋ ฅ",
                    placeholder="๋ฉ์์ง๋ฅผ ์ ๋ ฅํ์ธ์...",
                    lines=3,
                    scale=4,
                )
                submit_btn = gr.Button("์ ์ก ๐ค", variant="primary", scale=1)
            with gr.Row():
                clear_btn = gr.Button("๋ํ ์ด๊ธฐํ ๐๏ธ")
                stop_btn = gr.Button("์์ฑ ์ค์ง โน๏ธ")
            with gr.Accordion("๐ ๋ฌธ์ ์ ๋ก๋ (PDF / DOCX / TXT)", open=True):
                file_upload = gr.File(
                    label="ํ์ผ ์ ํ",
                    file_types=[".pdf", ".docx", ".txt", ".md", ".py", ".js", ".html", ".css", ".json", ".xml", ".csv"],
                    file_count="single",
                )
                file_status = gr.Markdown("๐ ํ์ผ์ ์ ๋ก๋ํ๋ฉด ๋ด์ฉ์ ๋ถ์ํ ์ ์์ต๋๋ค.")
                clear_file_btn = gr.Button("๐ ํ์ผ ์ ๊ฑฐ", size="sm")
        # Right column: generation settings and example prompts.
        with gr.Column(scale=1):
            gr.Markdown("### โ๏ธ ์ค์ ")
            system_prompt = gr.Textbox(
                label="์์คํ ํ๋กฌํํธ",
                value="You are a helpful AI assistant. Answer in the same language as the user.",
                lines=3,
            )
            max_tokens = gr.Slider(64, 4096, value=1024, step=64, label="์ต๋ ํ ํฐ ์")
            temperature = gr.Slider(0, 2, value=0.7, step=0.1, label="Temperature")
            top_p = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="Top-P")
            enable_thinking = gr.Checkbox(label="๐ง Thinking ๋ชจ๋", value=False)
            enable_tools = gr.Checkbox(label="๐ ๏ธ Tool Calling", value=True)
            gr.Markdown("### ๐ ์์")
            gr.Examples(
                examples=[
                    ["์๋ ํ์ธ์!"],
                    ["์ ๋ก๋ํ ๋ฌธ์๋ฅผ ์์ฝํด์ค"],
                    ["123 * 456์ ๊ณ์ฐํด์ค"],
                    ["ํ์ฌ ์๊ฐ์?"],
                    ["100km๋ ๋ช ๋ง์ผ?"],
                ],
                inputs=message,
            )

    # Events: only the chatbot is an output of the streaming handler.
    chat_inputs = [message, chatbot, system_prompt, max_tokens, temperature, top_p, enable_thinking, enable_tools]

    # Keep a handle on each streaming event itself (not on the chained
    # follow-up step) so the stop button can cancel the running generator.
    submit_event = submit_btn.click(
        fn=chat_streaming,
        inputs=chat_inputs,
        outputs=[chatbot],
    )
    submit_event.then(
        fn=lambda: "",
        outputs=[message],
    )
    enter_event = message.submit(
        fn=chat_streaming,
        inputs=chat_inputs,
        outputs=[chatbot],
    )
    enter_event.then(
        fn=lambda: "",
        outputs=[message],
    )

    clear_btn.click(fn=clear_chat, outputs=[chatbot])
    # Cancel generation regardless of whether it was started by the button
    # or by pressing Enter in the textbox.
    stop_btn.click(fn=None, cancels=[submit_event, enter_event])
    file_upload.change(fn=handle_file_upload, inputs=[file_upload], outputs=[file_status])
    clear_file_btn.click(fn=clear_file, outputs=[file_upload, file_status])

if __name__ == "__main__":
    demo.queue().launch(server_name="0.0.0.0", server_port=7860)