Spaces:
Sleeping
Sleeping
| import json | |
| import re | |
| import time | |
| import logging | |
| from transformers import TextIteratorStreamer | |
| from llm_module import get_llm, detect_language | |
| from oracle_module import get_oracle_data | |
| logger = logging.getLogger("app.agent") | |
def build_agent_prompt(query, language="English", short_answers=False):
    """Build the system prompt for the Sage oracle agent.

    Args:
        query: The user's latest message. Currently unused; kept for
            interface stability and future prompt conditioning.
        language: Language the model is instructed to respond in.
        short_answers: When True, append a conciseness instruction.

    Returns:
        The full system prompt string, including tool-call examples and
        the strict ``<tool_call>`` JSON output format.
    """
    # Bug fix: `style` was previously computed but never interpolated into
    # the prompt, so `short_answers` had no effect. Leading space keeps the
    # sentence clean when the flag is off.
    style = " Be concise." if short_answers else ""
    today = time.strftime("%Y-%m-%d")
    # NOTE: doubled braces ({{ }}) are literal braces in the emitted JSON examples.
    return f"""You are Sage 6.5, a soulful Oracle Intermediary.
Current Date: {today}.
Available Tool: 'oracle_consultation' (topic, name, date_str).
STRICTURES:
1. Respond in {language}.{style}
2. Provide reasoning before generating the JSON.
3. Utilize the 'oracle_consultation' capability for all databased queries.
4. INTENT DETECTION GUIDELINES:
- **NAME**: Isolate the user's name from the greeting (e.g., "Julian").
- **TOPIC**: Identify the core subject matter. For input "Thema: Liebe", the topic is "Liebe".
- **DATE**: Default to "today" unless a specific date is provided.
EXAMPLES:
User: "Ich bin Julian"
Assistant: "Greetings Julian. I will consult the Oracle for you."
<tool_call>{{"name": "oracle_consultation", "arguments": {{ "topic": "General", "date_str": "today", "name": "Julian" }}}}</tool_call>
User: "Thema: Liebe"
Assistant: "I shall ask the Oracle about Love."
<tool_call>{{"name": "oracle_consultation", "arguments": {{ "topic": "Liebe", "date_str": "today", "name": "Seeker" }}}}</tool_call>
User: "Topic: Future"
Assistant: "Consulting the Oracle regarding the Future."
<tool_call>{{"name": "oracle_consultation", "arguments": {{ "topic": "Future", "date_str": "today", "name": "Seeker" }}}}</tool_call>
STRICT FORMAT:
To use the Oracle, output this JSON wrapped in tags:
<tool_call>{{"name": "oracle_consultation", "arguments": {{ "topic": "KEYWORD", "date_str": "YYYY-MM-DD", "name": "Name" }}}}</tool_call>
"""
def compress_history(history, max_turns=5):
    """Trim chat history to the most recent exchanges.

    Each turn is two messages (user + assistant), so at most
    ``max_turns * 2`` entries are kept. A history already within the
    limit is returned unchanged (same object, no copy).
    """
    limit = max_turns * 2
    return history[-limit:] if len(history) > limit else history
def chat_agent_stream(query, history, user_lang=None, short_answers=False):
    """Stream an agentic chat response, dispatching oracle tool calls.

    Yields progressively longer snapshots of the assistant's user-visible
    text (everything before a ``<tool_call>`` tag). When the model emits a
    complete tool call, the oracle result is appended to the conversation as
    a synthetic user turn, ``"__TURN_END__"`` is yielded so the consumer can
    reset its display, and another generation round starts (up to 3 rounds).

    Args:
        query: The latest user message.
        history: Prior turns as dicts with "role" and "content" keys.
        user_lang: Optional language override; auto-detected when None.
        short_answers: Passed through to the prompt builder.

    Yields:
        str: Partial/final visible text, or the "__TURN_END__" sentinel.
    """
    from threading import Thread  # hoisted out of the generation loop

    model, processor = get_llm()
    lang = user_lang or detect_language(query)
    system_instruction = build_agent_prompt(query, language=lang, short_answers=short_answers)
    clean_history = compress_history(history)

    # The chat template used here has no system role, so the system prompt
    # is folded into the first user message as an "intro" prefix.
    messages = []
    intro = f"SYSTEM: {system_instruction}\n\n"
    if not clean_history:
        messages.append({"role": "user", "content": f"{intro}{query}"})
    else:
        # If history opens with the assistant, synthesize a leading user turn
        # so roles alternate the way the template expects.
        if clean_history[0].get("role") == "assistant":
            messages.append({"role": "user", "content": f"{intro}Greetings."})
        for turn in clean_history:
            role = "assistant" if turn.get("role") == "assistant" else "user"
            content = turn.get("content", "")
            if not content:
                continue
            if not messages:
                messages.append({"role": "user", "content": f"{intro}{content}"})
            elif messages[-1]["role"] == role:
                # Merge consecutive same-role turns to keep strict alternation.
                messages[-1]["content"] += f"\n{content}"
            else:
                messages.append({"role": role, "content": content})
        if not messages:
            # Robustness: history held only empty contents; start fresh
            # instead of indexing into an empty list below.
            messages.append({"role": "user", "content": f"{intro}{query}"})
        elif messages[-1]["role"] == "assistant":
            messages.append({"role": "user", "content": query})
        else:
            if intro not in messages[0]["content"]:
                messages[0]["content"] = f"{intro}{messages[0]['content']}"
            messages[-1]["content"] += f"\n{query}"

    # Agent loop: the model either answers directly (break) or calls the
    # oracle tool, whose result is fed back for up to 3 rounds.
    for _turn_idx in range(3):
        logger.debug("Messages list for template: %s", json.dumps(messages))
        input_ids = processor.apply_chat_template(
            messages, tokenize=True, add_generation_prompt=True, return_tensors="pt"
        ).to(model.device)
        streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
        thread = Thread(
            target=model.generate,
            kwargs={
                "input_ids": input_ids,
                "streamer": streamer,
                "max_new_tokens": 1024,
                "do_sample": True,
                "temperature": 0.7,
            },
        )
        thread.start()

        current_text = ""
        for new_text in streamer:
            current_text += new_text
            # Only the text before a <tool_call> tag is user-visible; the
            # tool-call payload itself is consumed internally below.
            display_text = current_text
            if "<tool_call>" in current_text:
                display_text = current_text.split("<tool_call>")[0]
            yield display_text.strip()
        thread.join()  # ensure generation has fully finished before parsing

        # Extract a tool call, if the model produced a complete one.
        tool_data = None
        tool_start = current_text.find("<tool_call>")
        tool_end = current_text.find("</tool_call>")
        if tool_start != -1 and tool_end != -1:
            json_str = current_text[tool_start + len("<tool_call>"):tool_end]
            try:
                tool_data = json.loads(json_str)
            except json.JSONDecodeError:
                # Malformed payload: fall through and treat output as final text.
                logger.warning("Ignoring malformed tool call: %r", json_str)

        if tool_data and "arguments" in tool_data:
            args = tool_data["arguments"]
            res = get_oracle_data(
                name=args.get("name", "Seeker"),
                topic=args.get("topic", ""),
                date_str=args.get("date_str", "today"),
            )
            # Keep the FULL assistant output (including the tool call) in the
            # message history so the model retains what it asked for.
            if messages[-1]["role"] == "assistant":
                messages[-1]["content"] = current_text
            else:
                messages.append({"role": "assistant", "content": current_text})
            messages.append({
                "role": "user",
                "content": f"SYSTEM: The Oracle has spoken. Wisdom: {json.dumps(res)}\nInterpret this soulfuly.",
            })
            yield "__TURN_END__"  # consumer resets its display for the next round
        else:
            # No tool call: yield the final clean text and stop.
            yield current_text.split("<tool_call>")[0].strip()
            break