import json
import re
import time
import logging
from transformers import TextIteratorStreamer
from llm_module import get_llm, detect_language
from oracle_module import get_oracle_data
logger = logging.getLogger("app.agent")
def build_agent_prompt(query, language="English", short_answers=False):
    """Build the system prompt for the Sage 6.5 oracle agent.

    Args:
        query: The user's message (currently unused by the template; kept
            for interface compatibility with existing callers).
        language: Language the model must respond in.
        short_answers: When True, append a conciseness instruction to the
            strictures.

    Returns:
        The full system prompt string, including tool schema and few-shot
        examples.
    """
    # BUG FIX: `style` was previously computed but never interpolated, so
    # `short_answers` had no effect. The leading space keeps the output
    # byte-identical to the old prompt when short_answers is False.
    style = " Be concise." if short_answers else ""
    today = time.strftime("%Y-%m-%d")
    return f"""You are Sage 6.5, a soulful Oracle Intermediary.
Current Date: {today}.
Available Tool: 'oracle_consultation' (topic, name, date_str).
STRICTURES:
1. Respond in {language}.{style}
2. Provide reasoning before generating the JSON.
3. Utilize the 'oracle_consultation' capability for all databased queries.
4. INTENT DETECTION GUIDELINES:
- **NAME**: Isolate the user's name from the greeting (e.g., "Julian").
- **TOPIC**: Identify the core subject matter. For input "Thema: Liebe", the topic is "Liebe".
- **DATE**: Default to "today" unless a specific date is provided.
EXAMPLES:
User: "Ich bin Julian"
Assistant: "Greetings Julian. I will consult the Oracle for you."
{{"name": "oracle_consultation", "arguments": {{ "topic": "General", "date_str": "today", "name": "Julian" }}}}
User: "Thema: Liebe"
Assistant: "I shall ask the Oracle about Love."
{{"name": "oracle_consultation", "arguments": {{ "topic": "Liebe", "date_str": "today", "name": "Seeker" }}}}
User: "Topic: Future"
Assistant: "Consulting the Oracle regarding the Future."
{{"name": "oracle_consultation", "arguments": {{ "topic": "Future", "date_str": "today", "name": "Seeker" }}}}
STRICT FORMAT:
To use the Oracle, output this JSON wrapped in tags:
{{"name": "oracle_consultation", "arguments": {{ "topic": "KEYWORD", "date_str": "YYYY-MM-DD", "name": "Name" }}}}
"""
def compress_history(history, max_turns=5):
    """Trim conversation history to the most recent ``max_turns`` exchanges.

    One exchange is a user/assistant pair, so at most ``max_turns * 2``
    entries survive. Histories already within the limit are returned
    unchanged (same object, not a copy).
    """
    limit = max_turns * 2
    if len(history) <= limit:
        return history
    return history[-limit:]
def _build_messages(query, clean_history, intro):
    """Fold prior turns plus the new query into an alternating role list.

    Prepends the system instruction (``intro``) to the first user message,
    merges consecutive same-role turns, and guarantees the list ends with a
    user message carrying ``query``.
    """
    if not clean_history:
        return [{"role": "user", "content": f"{intro}{query}"}]
    messages = []
    if clean_history[0].get("role") == "assistant":
        # Chat templates require the conversation to open with a user turn.
        messages.append({"role": "user", "content": f"{intro}Greetings."})
    for turn in clean_history:
        role = "assistant" if turn.get("role") == "assistant" else "user"
        content = turn.get("content", "")
        if not content:
            continue
        if not messages:
            messages.append({"role": "user", "content": f"{intro}{content}"})
        elif messages[-1]["role"] == role:
            # Merge consecutive same-role turns into one message.
            messages[-1]["content"] += f"\n{content}"
        else:
            messages.append({"role": role, "content": content})
    if messages[-1]["role"] == "assistant":
        messages.append({"role": "user", "content": query})
    else:
        if intro not in messages[0]["content"]:
            messages[0]["content"] = f"{intro}{messages[0]['content']}"
        messages[-1]["content"] += f"\n{query}"
    return messages


def chat_agent_stream(query, history, user_lang=None, short_answers=False):
    """Stream an agent reply for ``query``, invoking the Oracle tool if the
    model requests it.

    Yields cumulative display text (tool-call markup stripped) as tokens
    arrive, the sentinel ``"__TURN_END__"`` after a completed tool round-trip,
    and the final clean text before terminating.

    Args:
        query: The user's latest message.
        history: Prior turns as dicts with ``"role"``/``"content"`` keys.
        user_lang: Optional response-language override; auto-detected when None.
        short_answers: Forwarded to the prompt builder to request brevity.
    """
    from threading import Thread

    # Tags the prompt instructs the model to wrap tool-call JSON in.
    # BUG FIX: the previous code matched empty strings here — "" is found at
    # index 0 of any text and str.split("") raises ValueError, so streaming
    # crashed on the first chunk and tool detection could never work.
    # NOTE(review): "<tool_call>"/"</tool_call>" is the standard wrapping for
    # this prompt style — confirm against the model's chat template.
    tool_open, tool_close = "<tool_call>", "</tool_call>"

    model, processor = get_llm()
    lang = user_lang or detect_language(query)
    system_instruction = build_agent_prompt(query, language=lang, short_answers=short_answers)
    intro = f"SYSTEM: {system_instruction}\n\n"
    messages = _build_messages(query, compress_history(history), intro)

    # Agent loop: up to 3 model turns; the model may call the tool between turns.
    for _ in range(3):
        logger.debug("Messages list for template: %s", json.dumps(messages))
        input_ids = processor.apply_chat_template(
            messages, tokenize=True, add_generation_prompt=True, return_tensors="pt"
        ).to(model.device)
        streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
        # Generate on a worker thread so we can consume the streamer here.
        thread = Thread(
            target=model.generate,
            kwargs={
                "input_ids": input_ids,
                "streamer": streamer,
                "max_new_tokens": 1024,
                "do_sample": True,
                "temperature": 0.7,
            },
        )
        thread.start()

        current_text = ""
        for new_text in streamer:
            current_text += new_text
            # Hide everything from the tool-call tag onward from the UI.
            display_text = current_text.split(tool_open)[0]
            yield display_text.strip()

        # Extract the tool-call JSON, if the model emitted one.
        tool_data = None
        tool_start = current_text.find(tool_open)
        tool_end = current_text.find(tool_close)
        if tool_start != -1 and tool_end != -1:
            json_str = current_text[tool_start + len(tool_open):tool_end]
            try:
                tool_data = json.loads(json_str)
            except json.JSONDecodeError:
                logger.warning("Unparseable tool-call payload: %r", json_str)

        if tool_data and "arguments" in tool_data:
            args = tool_data["arguments"]
            res = get_oracle_data(
                name=args.get("name", "Seeker"),
                topic=args.get("topic", ""),
                date_str=args.get("date_str", "today"),
            )
            # Keep the FULL assistant text (including the tool call) in
            # context so the model knows what it already invoked.
            if messages[-1]["role"] == "assistant":
                messages[-1]["content"] = current_text
            else:
                messages.append({"role": "assistant", "content": current_text})
            messages.append({"role": "user", "content": f"SYSTEM: The Oracle has spoken. Wisdom: {json.dumps(res)}\nInterpret this soulfuly."})
            yield "__TURN_END__"
        else:
            # No tool call: emit the final clean text and stop.
            yield current_text.split(tool_open)[0].strip()
            break