import json
import re
import time
import logging
from threading import Thread

from transformers import TextIteratorStreamer

from llm_module import get_llm, detect_language
from oracle_module import get_oracle_data

logger = logging.getLogger("app.agent")

# Delimiters the model must emit around a tool-call JSON payload.
# NOTE(review): the original source had these literals stripped to empty
# strings (likely an HTML sanitizer eating "<tool_call>" tags), which made
# the substring checks vacuous and `str.split("")` raise ValueError at the
# first streamed token. Restored here as named constants.
TOOL_OPEN = "<tool_call>"
TOOL_CLOSE = "</tool_call>"


def build_agent_prompt(query, language="English", short_answers=False):
    """Build the system prompt for the Sage 6.5 oracle agent.

    Args:
        query: The raw user query (currently unused in the prompt body;
            kept for interface compatibility with existing callers).
        language: Language the model is instructed to respond in.
        short_answers: When True, appends a "Be concise." stricture.
            (Bug fix: this flag was previously computed but never
            interpolated into the prompt, so it had no effect.)

    Returns:
        The full system-prompt string, including tool-call format
        instructions and few-shot examples.
    """
    style = "Be concise." if short_answers else ""
    today = time.strftime("%Y-%m-%d")
    return f"""You are Sage 6.5, a soulful Oracle Intermediary. Current Date: {today}. Available Tool: 'oracle_consultation' (topic, name, date_str). STRICTURES: 1. Respond in {language}. {style} 2. Provide reasoning before generating the JSON. 3. Utilize the 'oracle_consultation' capability for all databased queries. 4. INTENT DETECTION GUIDELINES: - **NAME**: Isolate the user's name from the greeting (e.g., "Julian"). - **TOPIC**: Identify the core subject matter. For input "Thema: Liebe", the topic is "Liebe". - **DATE**: Default to "today" unless a specific date is provided. EXAMPLES: User: "Ich bin Julian" Assistant: "Greetings Julian. I will consult the Oracle for you." {TOOL_OPEN}{{"name": "oracle_consultation", "arguments": {{ "topic": "General", "date_str": "today", "name": "Julian" }}}}{TOOL_CLOSE} User: "Thema: Liebe" Assistant: "I shall ask the Oracle about Love." {TOOL_OPEN}{{"name": "oracle_consultation", "arguments": {{ "topic": "Liebe", "date_str": "today", "name": "Seeker" }}}}{TOOL_CLOSE} User: "Topic: Future" Assistant: "Consulting the Oracle regarding the Future." {TOOL_OPEN}{{"name": "oracle_consultation", "arguments": {{ "topic": "Future", "date_str": "today", "name": "Seeker" }}}}{TOOL_CLOSE} STRICT FORMAT: To use the Oracle, output this JSON wrapped in {TOOL_OPEN}{TOOL_CLOSE} tags: {TOOL_OPEN}{{"name": "oracle_consultation", "arguments": {{ "topic": "KEYWORD", "date_str": "YYYY-MM-DD", "name": "Name" }}}}{TOOL_CLOSE} """


def compress_history(history, max_turns=5):
    """Trim chat history to the most recent `max_turns` exchanges.

    Each turn is a user/assistant pair, so the window is ``max_turns * 2``
    raw entries. Returns the history unchanged when it already fits.
    """
    window = max_turns * 2
    if len(history) > window:
        return history[-window:]
    return history


def chat_agent_stream(query, history, user_lang=None, short_answers=False):
    """Stream an agent response, transparently executing oracle tool calls.

    Yields progressively longer snapshots of the assistant's visible text
    (tool-call markup is hidden from the stream). When the model emits a
    tool call, the oracle result is fed back as a synthetic user turn, the
    sentinel ``"__TURN_END__"`` is yielded, and generation continues for up
    to 3 model turns total.

    Args:
        query: The new user message.
        history: Prior turns as dicts with "role" and "content" keys.
        user_lang: Response language override; auto-detected when None.
        short_answers: Forwarded to the prompt builder's conciseness flag.
    """
    model, processor = get_llm()
    lang = user_lang or detect_language(query)
    system_instruction = build_agent_prompt(
        query, language=lang, short_answers=short_answers
    )
    clean_history = compress_history(history)

    messages = []
    # The system prompt is prepended to the first user message because the
    # chat template in use apparently has no dedicated system role.
    intro = f"SYSTEM: {system_instruction}\n\n"
    if not clean_history:
        messages.append({"role": "user", "content": f"{intro}{query}"})
    else:
        first_role = (
            "assistant" if clean_history[0].get("role") == "assistant" else "user"
        )
        if first_role == "assistant":
            # Templates require user-first ordering; seed a dummy user turn.
            messages.append({"role": "user", "content": f"{intro}Greetings."})
        for turn in clean_history:
            role = "assistant" if turn.get("role") == "assistant" else "user"
            content = turn.get("content", "")
            if not content:
                continue
            if not messages:
                messages.append({"role": "user", "content": f"{intro}{content}"})
            elif messages[-1]["role"] == role:
                # Merge consecutive same-role turns to keep strict alternation.
                messages[-1]["content"] += f"\n{content}"
            else:
                messages.append({"role": role, "content": content})
        if messages[-1]["role"] == "assistant":
            messages.append({"role": "user", "content": query})
        else:
            if intro not in messages[0]["content"]:
                messages[0]["content"] = f"{intro}{messages[0]['content']}"
            messages[-1]["content"] += f"\n{query}"

    # Agent loop: the model decides whether to call the tool; cap at 3 turns.
    for turn_idx in range(3):
        logger.debug("Messages list for template: %s", json.dumps(messages))
        input_ids = processor.apply_chat_template(
            messages,
            tokenize=True,
            add_generation_prompt=True,
            return_tensors="pt",
        ).to(model.device)
        streamer = TextIteratorStreamer(
            processor, skip_prompt=True, skip_special_tokens=True
        )
        thread = Thread(
            target=model.generate,
            kwargs={
                "input_ids": input_ids,
                "streamer": streamer,
                "max_new_tokens": 1024,
                "do_sample": True,
                "temperature": 0.7,
            },
        )
        thread.start()

        current_text = ""
        for new_text in streamer:
            current_text += new_text
            # Hide everything from the tool-call opening tag onward; the
            # JSON payload is for the agent loop, not the end user.
            display_text = current_text
            if TOOL_OPEN in current_text:
                display_text = current_text.split(TOOL_OPEN)[0]
            yield display_text.strip()
        # Streamer is exhausted once generate() returns; reap the thread.
        thread.join()

        # Post-generation: extract a tool call, if any.
        tool_data = None
        tool_start = current_text.find(TOOL_OPEN)
        tool_end = current_text.find(TOOL_CLOSE)
        if tool_start != -1 and tool_end > tool_start:
            json_str = current_text[tool_start + len(TOOL_OPEN):tool_end]
            try:
                tool_data = json.loads(json_str)
            except json.JSONDecodeError:
                # Best-effort: malformed tool JSON falls through to plain text.
                logger.warning("Ignoring malformed tool-call JSON: %r", json_str)

        if tool_data and "arguments" in tool_data:
            args = tool_data["arguments"]
            res = get_oracle_data(
                name=args.get("name", "Seeker"),
                topic=args.get("topic", ""),
                date_str=args.get("date_str", "today"),
            )
            # Keep the FULL assistant text (including the tool call) in the
            # conversation so the model knows it already invoked the tool.
            if messages[-1]["role"] == "assistant":
                messages[-1]["content"] = current_text
            else:
                messages.append({"role": "assistant", "content": current_text})
            messages.append(
                {
                    "role": "user",
                    "content": (
                        f"SYSTEM: The Oracle has spoken. Wisdom: {json.dumps(res)}\n"
                        "Interpret this soulfully."
                    ),
                }
            )
            # Signal the consumer that a tool turn finished; loop continues.
            yield "__TURN_END__"
        else:
            # No tool call: emit the final clean text and stop.
            yield current_text.split(TOOL_OPEN)[0].strip()
            break