Raiquia's picture
refine handling of unknown questions by improving user follow-up prompts and notification logic
01c339e
Raw
History Blame Contribute Delete
22.7 kB
import json
import logging
import os
import re
import socket
import time
import requests
from datetime import datetime
from openai import OpenAI
from dotenv import load_dotenv
from chromadb import PersistentClient
from litellm import completion
from pydantic import BaseModel, Field
from typing import Literal
from pathlib import Path
from tenacity import retry, wait_exponential, stop_after_attempt
# --- Environment & logging -------------------------------------------------
# Load variables from a .env file at import time (override=True is passed so
# that .env values take precedence over already-set environment variables).
load_dotenv(override=True)
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
# Only surface urllib3 messages at ERROR level or above.
logging.getLogger("urllib3").setLevel(logging.ERROR)

# --- Model selection -------------------------------------------------------
# Model used for answering, re-ranking and query rewriting.
MODEL = os.getenv("ANSWER_MODEL", "openai/gpt-4.1-mini")
# Keep routing as cheap/fast as possible; can be overridden independently.
ROUTER_MODEL = os.getenv("ROUTER_MODEL", "openai/gpt-4.1-nano")

# --- Paths -----------------------------------------------------------------
# Vector store directory, located one level above this file's directory.
DB_NAME = str(Path(__file__).parent.parent / "preprocessed_db")
KNOWLEDGE_BASE_PATH = Path(__file__).parent.parent / "knowledge-base/preprocessed"
# Local JSON logs written by _append_to_json, stored next to this file.
UNKNOWN_QUESTIONS_PATH = Path(__file__).parent / "unknown_questions.json"
USER_DETAILS_PATH = Path(__file__).parent / "user_details.json"

# --- Retrieval settings ----------------------------------------------------
COLLECTION_NAME = "docs"
EMBEDDING_MODEL = "text-embedding-3-large"
# Number of candidates requested from the vector store per query.
RETRIEVAL_K = 20
# Number of re-ranked chunks kept by fetch_context.
FINAL_K = 10

# --- Retry policy shared by the @retry-decorated functions -----------------
RETRY_WAIT = wait_exponential(multiplier=1, min=1, max=10)
RETRY_STOP = stop_after_attempt(2)

# --- Lazy-loaded globals ---
# Populated on first use by get_openai_client() / get_collection().
_openai_client = None
_collection = None
def get_openai_client():
    """Return the shared OpenAI client, building it the first time it is needed."""
    global _openai_client
    if _openai_client is not None:
        # Already constructed on an earlier call.
        return _openai_client
    logger.info("[INIT] Initializing OpenAI client...")
    _openai_client = OpenAI()
    return _openai_client
def get_collection():
    """Return the shared ChromaDB collection, opening it the first time it is needed."""
    global _collection
    if _collection is not None:
        # Already opened on an earlier call.
        return _collection
    logger.info("[INIT] Initializing ChromaDB collection...")
    chroma_client = PersistentClient(path=DB_NAME)
    _collection = chroma_client.get_or_create_collection(COLLECTION_NAME)
    return _collection
# System prompt for the knowledge-base (RAG) path.
# The single `{context}` placeholder is filled via str.format() in
# make_rag_messages, so any literal braces added to this text must be doubled.
# Trailing backslashes join the physical lines into one paragraph.
SYSTEM_PROMPT = """
You are acting as Kharisma Rizki Wijanarko's Assistant. You are answering questions on Rizki's website, Rizclone, \
particularly questions related to Rizki's career, background, skills and experience. \
Your responsibility is to represent Rizki for interactions on the website as faithfully as possible. Answer using only the provided context. \
If missing, say you don't know. \
Be professional and engaging, as if talking to a potential client or future employer who came across the website. \
If you don't know the answer to any question, use your record_unknown_question tool to record the question that you couldn't answer, \
even if it's about something trivial or unrelated to career. Send the notification only if the user provided their user details. \
If the user is engaging in discussion, try to steer them towards getting in touch. Before calling record_user_details, you must have all of: (1) their email OR phone number for contact, (2) whether they are reaching out for company/business reasons or personal use, and (3) a short message they want to leave for Rizki. Ask politely for anything missing; only call record_user_details once you have all three. \
For context, here are specific extracts from the Knowledge Base that might be directly relevant to the user's question: \
{context}
With this context, please answer the user's question. Be accurate, relevant and complete.
"""
# Prompt for the routing model. `{history}` and `{question}` are filled via
# str.format() in route_action; the four action names listed here must stay in
# sync with the Literal values accepted by RouterDecision.action.
ROUTER_PROMPT = """
You are an AI system router.
Decide the best action based on user intent.
Available actions:
- rag → use knowledge base for career/background questions
- contact → user is providing or asking to provide contact details
- unknown → cannot be answered confidently
- direct → casual/simple/general conversation
Prioritize:
- contact if user shows intent to connect
- rag if question is about Rizki
- unknown if outside knowledge scope
Conversation:
{history}
User:
{question}
"""
# System prompt used by answer_question when the router picks "contact".
# The three prerequisites mirror the required parameters of the
# record_user_details tool declared in TOOLS.
CONTACT_SYSTEM_PROMPT = """
You are acting as Kharisma Rizki Wijanarko (Rizki)'s assistant.
Your goal is to help the user connect with Rizki.
Before calling record_user_details, you must have all of:
1) their email OR phone number
2) whether they are reaching out for company/business reasons or personal use
3) a short message they want to leave for Rizki
Ask politely for anything missing; only call record_user_details once you have all three.
Keep replies concise and friendly.
"""
# System prompt used by answer_question for the fall-through "direct" case
# (any router action that is not "rag", "contact" or "unknown").
DIRECT_SYSTEM_PROMPT = """
You are acting as Kharisma Rizki Wijanarko (Rizki)'s assistant.
Handle casual/simple/general conversation naturally and professionally.
If relevant, gently steer the user toward getting in touch.
Keep replies concise.
"""
# System prompt used by answer_question when the router picks "unknown".
# NOTE: _looks_like_unknown_followup_request keys off phrases this prompt is
# expected to elicit ("I don't know", "name", "email", "follow up"); rewording
# it may stop the follow-up detection from triggering.
UNKNOWN_SYSTEM_PROMPT = """
You are acting as Kharisma Rizki Wijanarko (Rizki)'s assistant.
If you cannot answer confidently, say you don't know.
Then ask the user to share their name and email so Rizki can follow up directly.
Do not claim you already notified Rizki until the user provides their details.
Keep replies concise.
"""
# Function-calling tool declarations passed to the answer model by
# run_tool_loop. Each name must match a branch in dispatch_tool_call.
TOOLS = [
    # Logs a question the assistant could not answer.
    {
        "type": "function",
        "function": {
            "name": "record_unknown_question",
            "description": "Record a question that the AI does not have the information to answer.",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {
                        "type": "string",
                        "description": "The user's question that couldn't be answered.",
                    },
                    "user_details": {
                        "type": "string",
                        "description": "Any user details known (e.g., name or email) to associate with this question.",
                    },
                },
                "required": ["question"],
            },
        },
    },
    # Logs a lead once contact, use case and message are all known.
    {
        "type": "function",
        "function": {
            "name": "record_user_details",
            "description": "Record a lead for Rizki after the user has shared contact, use context, and a message. Do not call until contact (email or phone), company vs personal, and their message are all known.",
            "parameters": {
                "type": "object",
                "properties": {
                    "contact": {
                        "type": "string",
                        "description": "Email address or phone number the user gave for follow-up.",
                    },
                    "use_case": {
                        "type": "string",
                        "enum": ["company", "personal"],
                        "description": "company: business, hiring, or organization; personal: individual, networking, or non-corporate.",
                    },
                    "message_to_rizki": {
                        "type": "string",
                        "description": "What the user wants Rizki to know or why they are reaching out.",
                    },
                    "name": {
                        "type": "string",
                        "description": "User's name if they shared it (optional).",
                    },
                },
                "required": ["contact", "use_case", "message_to_rizki"],
            },
        },
    },
]
# --- Data models ---
class Chunk(BaseModel):
    """One retrieved knowledge-base passage plus the metadata stored with it."""

    # Raw text of the passage.
    page_content: str
    # Metadata returned by the vector store; make_rag_messages reads its "source" key.
    metadata: dict
class RankOrder(BaseModel):
    """Structured response expected from the re-ranking model."""

    # 1-based chunk ids, best match first (ids are assigned in rerank()).
    order: list[int] = Field(description="The order of relevance of chunks, from most relevant to least relevant, by chunk id number")
class RouterDecision(BaseModel):
    """Structured response expected from the routing model."""

    # One of the four actions described in ROUTER_PROMPT.
    action: Literal["rag", "contact", "unknown", "direct"]
# --- Notifications & persistence ---
def send_telegram_notification(message: str) -> None:
    """Send ``message`` to Telegram, preferring the relay when one is configured.

    Delivery is best-effort: HTTP failures and request exceptions are logged
    (with the relay secret / bot token redacted) and never raised.

    Transport is chosen from environment variables:

    * ``TELEGRAM_RELAY_URL`` (plus optional ``TELEGRAM_RELAY_SECRET``, sent as
      the ``x-relay-secret`` header): the message is POSTed to the relay as
      ``{"text": message}``. When a relay URL is set, the direct Bot API path
      is never attempted, even if the relay call fails.
    * ``TELEGRAM_BOT_TOKEN`` and ``TELEGRAM_CHAT_ID``: the message is POSTed to
      the Bot API ``sendMessage`` endpoint through a retrying HTTP adapter.

    If neither is configured, a warning is logged and nothing is sent.

    Args:
        message: Plain-text body of the notification.
    """
    relay_url = (os.getenv("TELEGRAM_RELAY_URL") or "").strip()
    relay_secret = os.getenv("TELEGRAM_RELAY_SECRET")

    if relay_url:
        # Allow the relay to be configured as a bare host name.
        if not relay_url.startswith(("http://", "https://")):
            relay_url = f"https://{relay_url}"
        headers = {"x-relay-secret": relay_secret} if relay_secret else {}
        try:
            # The context manager closes the session so pooled connections
            # are released instead of lingering until garbage collection.
            with requests.Session() as session:
                resp = session.post(
                    relay_url,
                    json={"text": message},
                    headers=headers,
                    timeout=10,
                )
                if not resp.ok:
                    logger.error(
                        "Telegram relay notification failed "
                        f"(status={resp.status_code}, body={resp.text[:500]!r})"
                    )
        except requests.exceptions.RequestException as e:
            err = str(e)
            if relay_secret:
                # Never leak the shared secret into logs.
                err = err.replace(relay_secret, "<redacted>")
            logger.error(f"Failed to send Telegram relay notification: {err}")
        return

    token = os.getenv("TELEGRAM_BOT_TOKEN")
    chat_id = os.getenv("TELEGRAM_CHAT_ID")
    if not token or not chat_id:
        logger.warning("Telegram credentials missing. Skipping notification.")
        return

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    try:
        from requests.adapters import HTTPAdapter
        from urllib3.util.retry import Retry

        # Named retry_policy so it does not shadow the module-level tenacity
        # `retry` decorator.
        retry_policy = Retry(
            total=3,
            connect=3,
            read=3,
            backoff_factor=1.0,
            status_forcelist=(429, 500, 502, 503, 504),
            # POST is not retried by default; opt in explicitly.
            allowed_methods=frozenset({"POST"}),
            raise_on_status=False,
        )
        with requests.Session() as session:
            session.mount("https://", HTTPAdapter(max_retries=retry_policy))
            resp = session.post(url, json={"chat_id": chat_id, "text": message}, timeout=10)
            if not resp.ok:
                logger.error(
                    "Telegram notification failed "
                    f"(status={resp.status_code}, body={resp.text[:500]!r})"
                )
    except requests.exceptions.RequestException as e:
        err = str(e)
        if token:
            # The token is part of the request URL and can appear in error text.
            err = err.replace(token, "<redacted>")
        logger.error(f"Failed to send Telegram notification: {err}")
def _append_to_json(path: Path, entry: dict) -> None:
"""Safely append an entry to a local JSON log file."""
data = []
if path.exists():
try:
with open(path, "r") as f:
data = json.load(f)
except Exception:
data = []
data.append(entry)
with open(path, "w") as f:
json.dump(data, f, indent=4)
def record_unknown_question(question: str, user_details: str | None = None) -> str:
    """Log an unanswered question; notify via Telegram only when user details exist.

    Args:
        question: The question that could not be answered.
        user_details: Free-text details about the asker; when empty or None the
            question is logged without any notification.

    Returns:
        A fixed confirmation string.
    """
    record = {"question": question, "timestamp": datetime.now().isoformat()}
    if not user_details:
        # Anonymous question: keep a local record but do not notify.
        _append_to_json(UNKNOWN_QUESTIONS_PATH, record)
        return "Question recorded successfully."

    record["user_details"] = user_details
    _append_to_json(UNKNOWN_QUESTIONS_PATH, record)
    send_telegram_notification(f"New Unknown Question: {question}\nUser Details: {user_details}")
    return "Question recorded successfully."
def _looks_like_unknown_followup_request(history: list[dict]) -> bool:
"""
Detect whether the assistant previously asked for name+email due to an unknown question.
We keep this heuristic intentionally simple and robust to minor prompt changes.
"""
last_assistant = next((m for m in reversed(history) if m.get("role") == "assistant"), None)
if not last_assistant:
return False
text = str(last_assistant.get("content", "")).lower()
return (
("i don't know" in text or "i do not know" in text)
and ("name" in text)
and ("email" in text)
and ("follow up" in text or "follow-up" in text or "follow up directly" in text)
)
def _find_previous_user_question_before_followup(history: list[dict]) -> str | None:
"""
When the assistant asks for name/email after an unknown question, the unknown question is
typically the latest user message before that assistant response.
"""
assistant_idx = None
for i in range(len(history) - 1, -1, -1):
if history[i].get("role") == "assistant":
assistant_idx = i
break
if assistant_idx is None:
return None
for j in range(assistant_idx - 1, -1, -1):
if history[j].get("role") == "user":
q = str(history[j].get("content", "")).strip()
return q or None
return None
def _extract_name_email(history: list[dict]) -> tuple[str | None, str | None]:
"""
Best-effort extraction of user's name/email from conversation text.
This is heuristic and intentionally conservative.
"""
text = "\n".join(str(m.get("content", "")) for m in history if m.get("role") == "user")
email_match = re.search(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", text, flags=re.IGNORECASE)
email = email_match.group(0) if email_match else None
# Simple patterns like "my name is Rizki" / "I'm Rizki"
name_match = re.search(r"\b(?:my name is|i am|i'm)\s+([A-Z][a-zA-Z]{1,30}(?:\s+[A-Z][a-zA-Z]{1,30})?)\b", text)
name = name_match.group(1).strip() if name_match else None
return name, email
def record_user_details(
    contact: str,
    use_case: str,
    message_to_rizki: str,
    name: str | None = None,
) -> str:
    """Persist a lead to the local JSON log and send a Telegram notification.

    Args:
        contact: Email address or phone number supplied by the user.
        use_case: "company" for business enquiries; any other value is
            labelled "Personal" in the notification.
        message_to_rizki: What the user wants to tell Rizki.
        name: Optional user name; ignored when empty or whitespace-only.

    Returns:
        A fixed confirmation string.
    """
    entry = {
        "contact": contact.strip(),
        "use_case": use_case,
        "message_to_rizki": message_to_rizki.strip(),
        "timestamp": datetime.now().isoformat(),
    }
    cleaned_name = name.strip() if name else ""
    if cleaned_name:
        entry["name"] = cleaned_name
    _append_to_json(USER_DETAILS_PATH, entry)

    if use_case == "company":
        use_label = "Company"
    else:
        use_label = "Personal"

    # Assemble the notification top-down; the name line is only present when known.
    lines = ["New lead for Rizki"]
    if cleaned_name:
        lines.append(f"Name: {cleaned_name}")
    lines.append(f"Contact: {entry['contact']}")
    lines.append(f"Use: {use_label}")
    lines.append(f"Message: {entry['message_to_rizki']}")

    send_telegram_notification("\n".join(lines))
    return "User details recorded successfully. Rizki will be in touch if needed."
def dispatch_tool_call(tool_name: str, arguments: dict) -> str:
    """Execute a model-requested tool and return the text fed back to the model.

    The returned string is what the model sees as the tool result, so it must
    describe what actually happened: record_unknown_question only sends a
    notification when user details are supplied, and the response says so.
    Malformed calls (non-dict arguments, missing required fields, unknown tool
    names) are reported back as text instead of raising, so the model can
    recover by asking the user for what is missing.

    Args:
        tool_name: Name of the tool chosen by the model.
        arguments: Decoded JSON arguments for the tool.

    Returns:
        A short status message for the model.
    """
    if not isinstance(arguments, dict):
        logger.warning(f"Tool {tool_name!r} called with non-object arguments: {arguments!r}")
        return f"Invalid arguments for tool {tool_name!r}: expected a JSON object. Nothing was recorded."

    if tool_name == "record_unknown_question":
        question = arguments.get("question")
        if not question:
            return "Missing required argument 'question'. Nothing was recorded."
        user_details = arguments.get("user_details")
        record_unknown_question(question, user_details)
        if user_details:
            return "Question recorded successfully. I will notify Rizki."
        # No details means no notification was sent; do not let the model
        # claim otherwise.
        return (
            "Question recorded successfully. Rizki has NOT been notified yet; "
            "ask the user for their name and email so he can follow up."
        )

    if tool_name == "record_user_details":
        missing = [
            key
            for key in ("contact", "use_case", "message_to_rizki")
            if not arguments.get(key)
        ]
        if missing:
            return f"Missing required argument(s): {', '.join(missing)}. Nothing was recorded."
        return record_user_details(
            arguments["contact"],
            arguments["use_case"],
            arguments["message_to_rizki"],
            arguments.get("name"),
        )

    logger.warning(f"Unknown tool called: {tool_name}")
    return f"Unknown tool {tool_name!r}; nothing was executed."
@retry(wait=RETRY_WAIT, stop=RETRY_STOP)
def route_action(question: str, history: list[dict]) -> str:
    """Ask the routing model which action should handle ``question``.

    Args:
        question: The user's current message.
        history: Prior chat messages; rendered as "role: content" lines.

    Returns:
        One of the action names accepted by RouterDecision.
    """
    transcript_lines = [f"{m.get('role')}: {m.get('content')}" for m in history]
    router_prompt = ROUTER_PROMPT.format(history="\n".join(transcript_lines), question=question)
    router_messages = [{"role": "system", "content": router_prompt}]
    resp = completion(
        model=ROUTER_MODEL,
        messages=router_messages,
        response_format=RouterDecision,
    )
    raw_decision = resp.choices[0].message.content
    return RouterDecision.model_validate_json(raw_decision).action
def run_tool_loop(messages: list[dict]) -> str:
    """Run the tool-call loop until the model produces a final text answer.

    Each round calls the answer model with the TOOLS declarations. If the model
    requests tools, they are executed and their results appended; otherwise the
    model's text is returned. ``messages`` is extended in place with the
    assistant tool-call messages and tool results.

    Tool arguments that are not valid JSON objects, or that lack a required
    field, are reported back to the model as the tool result instead of
    aborting the whole conversation turn.

    Args:
        messages: Conversation to send, starting with the system prompt.

    Returns:
        The final assistant text ("" if the model returned no content).
    """
    max_rounds = 5
    for _ in range(max_rounds):
        response = completion(model=MODEL, messages=messages, tools=TOOLS)
        msg = response.choices[0].message
        if not msg.tool_calls:
            return msg.content or ""
        messages.append(msg)
        for tool_call in msg.tool_calls:
            tool_name = tool_call.function.name
            try:
                args = json.loads(tool_call.function.arguments)
            except (ValueError, TypeError) as exc:
                # ValueError covers json.JSONDecodeError; TypeError covers a
                # missing (None) arguments payload.
                logger.warning(f"Could not decode arguments for tool {tool_name!r}: {exc}")
                tool_response = "Invalid tool arguments: expected a JSON object. Nothing was recorded."
            else:
                try:
                    tool_response = dispatch_tool_call(tool_name, args)
                except KeyError as exc:
                    logger.warning(f"Tool {tool_name!r} is missing argument {exc}")
                    tool_response = f"Missing required argument {exc}. Nothing was recorded."
            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "name": tool_name,
                    "content": tool_response,
                }
            )

    # Round budget exhausted while the model was still calling tools. The last
    # message is a raw tool result, which must not be shown to the user, so ask
    # for a plain-text answer with tool calling disabled.
    response = completion(model=MODEL, messages=messages, tools=TOOLS, tool_choice="none")
    return response.choices[0].message.content or ""
# --- RAG pipeline ---
@retry(wait=RETRY_WAIT, stop=RETRY_STOP)
def rerank(question: str, chunks: list[Chunk]) -> list[Chunk]:
    """Reorder ``chunks`` by relevance to ``question`` using the answer model.

    Chunks are presented to the model with 1-based ids and the model replies
    with an ordered id list. The reply is sanitised so the result is always a
    permutation of the input: out-of-range ids are ignored, repeated ids are
    kept once (first occurrence wins), and any ids the model left out are
    appended afterwards in their original order.

    Args:
        question: The user's question.
        chunks: Candidate passages to rank.

    Returns:
        The same chunks, most relevant first. An empty input returns an empty
        list without calling the model.
    """
    if not chunks:
        return []

    system_prompt = """
You are a document re-ranker.
You are provided with a question and a list of relevant chunks of text from a query of a knowledge base.
Rank order the provided chunks by relevance to the question, with the most relevant chunk first.
Reply only with the list of ranked chunk ids, nothing else. Include all chunk ids provided, reranked.
"""
    chunk_sections = "".join(
        f"# CHUNK ID: {chunk_id}:\n\n{chunk.page_content}\n\n"
        for chunk_id, chunk in enumerate(chunks, start=1)
    )
    user_prompt = (
        f"The user has asked:\n\n{question}\n\n"
        "Order all chunks by relevance, most to least. Include all chunk ids.\n\n"
        "Here are the chunks:\n\n"
        f"{chunk_sections}"
        "Reply only with the list of ranked chunk ids, nothing else."
    )
    response = completion(
        model=MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        response_format=RankOrder,
    )
    order = RankOrder.model_validate_json(response.choices[0].message.content).order

    valid_ids = range(1, len(chunks) + 1)
    # dict.fromkeys de-duplicates while preserving the model's ordering.
    ranked_ids = list(dict.fromkeys(i for i in order if i in valid_ids))
    already_ranked = set(ranked_ids)
    omitted_ids = [i for i in valid_ids if i not in already_ranked]
    return [chunks[i - 1] for i in ranked_ids + omitted_ids]
def make_rag_messages(question: str, history: list[dict], chunks: list[Chunk]) -> list[dict]:
    """Build the message list for the RAG answer call.

    The retrieved chunks are rendered as labelled extracts and injected into
    SYSTEM_PROMPT; the result is followed by the prior history and the new
    user question.

    Args:
        question: The user's current message.
        history: Prior chat messages, inserted unchanged.
        chunks: Retrieved passages; each must carry a "source" metadata key.

    Returns:
        A new list: system message, history, then the user message.
    """
    extracts = [
        f"Extract from {chunk.metadata['source']}:\n{chunk.page_content}"
        for chunk in chunks
    ]
    system_message = {
        "role": "system",
        "content": SYSTEM_PROMPT.format(context="\n\n".join(extracts)),
    }
    user_message = {"role": "user", "content": question}
    return [system_message] + history + [user_message]
@retry(wait=RETRY_WAIT, stop=RETRY_STOP)
def rewrite_query(question: str, history: list[dict] | None = None) -> str:
    """Rewrite the user's question into a concise knowledge base search query.

    Args:
        question: The user's current message.
        history: Optional prior chat messages; rendered as "role: content"
            lines (the same format route_action uses) rather than a raw Python
            repr. None or empty is shown as "(none)".

    Returns:
        The model's search query with surrounding whitespace removed. If the
        model returns nothing, ``question`` is returned unchanged so callers
        never receive an empty query.
    """
    history_text = (
        "\n".join(f"{m.get('role')}: {m.get('content')}" for m in history or [])
        or "(none)"
    )
    message = f"""
You are in a conversation about Kharisma Rizki Wijanarko (Rizki)'s career.
You are about to search a Knowledge Base to answer the user's question.
Conversation history:
{history_text}
User's current question:
{question}
Respond ONLY with a short, precise query to search the Knowledge Base. Nothing else.
"""
    response = completion(model=MODEL, messages=[{"role": "system", "content": message}])
    rewritten = (response.choices[0].message.content or "").strip()
    return rewritten or question
def fetch_context_unranked(question: str) -> list[Chunk]:
    """Embed ``question`` and return the nearest stored chunks in vector-store order.

    Args:
        question: Text to embed and search with.

    Returns:
        Up to RETRIEVAL_K chunks as returned by the collection query.
    """
    client = get_openai_client()
    collection = get_collection()

    embedding_response = client.embeddings.create(model=EMBEDDING_MODEL, input=[question])
    query_embedding = embedding_response.data[0].embedding

    results = collection.query(query_embeddings=[query_embedding], n_results=RETRIEVAL_K)
    # A single query embedding was sent, so only the first result row is used.
    documents = results["documents"][0]
    metadatas = results["metadatas"][0]

    chunks = []
    for doc, meta in zip(documents, metadatas):
        chunks.append(Chunk(page_content=doc, metadata=meta))
    return chunks
def merge_chunks(primary: list[Chunk], secondary: list[Chunk]) -> list[Chunk]:
    """Concatenate two chunk lists, keeping one chunk per distinct ``page_content``.

    Order is preserved: chunks from ``primary`` come first, then chunks from
    ``secondary``. Whenever the same text appears more than once, whether
    across the two lists or inside either one, only the first occurrence is
    kept.

    Args:
        primary: Chunks that take precedence.
        secondary: Additional chunks to merge in.

    Returns:
        A new list; neither input is modified.
    """
    seen: set[str] = set()
    merged = []
    for chunk in [*primary, *secondary]:
        if chunk.page_content in seen:
            continue
        # Record as we go so duplicates within a single list are caught too.
        seen.add(chunk.page_content)
        merged.append(chunk)
    return merged
def fetch_context(original_question: str, history: list[dict] | None = None) -> list[Chunk]:
    """Dual-query retrieval with query rewriting and reranking.

    The knowledge base is searched with the original question and, when it
    differs, with a model-rewritten query. Results are merged, re-ranked
    against the original question and truncated to FINAL_K.

    Args:
        original_question: The user's question as typed.
        history: Optional prior chat messages, forwarded to rewrite_query so
            follow-up questions can be rewritten with their conversational
            context. Omitting it rewrites from the question alone.

    Returns:
        At most FINAL_K chunks, most relevant first; an empty list when
        nothing was retrieved.
    """
    try:
        rewritten_question = rewrite_query(original_question, history or [])
        logger.info(f"[RAG] Rewritten query: {rewritten_question!r}")
    except Exception as e:
        # Rewriting is an optimisation; retrieval still works without it.
        logger.warning(f"[RAG] Query rewrite failed ({e}), falling back to original.")
        rewritten_question = original_question

    rewritten_question = (rewritten_question or "").strip()
    chunks = fetch_context_unranked(original_question)
    # A second search with an empty or identical query would return nothing
    # new, so skip the extra embedding and vector-store round trip.
    if rewritten_question and rewritten_question != original_question.strip():
        chunks = merge_chunks(chunks, fetch_context_unranked(rewritten_question))

    if not chunks:
        # Nothing to rank; avoid a pointless model call.
        return []
    reranked = rerank(original_question, chunks)
    return reranked[:FINAL_K]
# --- Main entry point ---
# Fixed reply used when a follow-up arrives without an email address. It is
# kept as a constant so the next turn can recognise it in the chat history.
_EMAIL_REPROMPT = "Thanks — could you share your email address as well? (And your name, if you haven’t already.)"


def _history_before_email_reprompt(history: list[dict]) -> list[dict] | None:
    """Return the history as it was when the assistant first said it did not know.

    If the most recent assistant message is the email re-prompt, strip that
    message and the user reply that triggered it; otherwise return None.
    """
    last_assistant_pos = next(
        (pos for pos in reversed(range(len(history))) if history[pos].get("role") == "assistant"),
        None,
    )
    if last_assistant_pos is None:
        return None
    if str(history[last_assistant_pos].get("content", "")).strip() != _EMAIL_REPROMPT:
        return None
    earlier = history[:last_assistant_pos]
    # Drop the user reply (the one without an email) that led to the re-prompt.
    while earlier and earlier[-1].get("role") == "user":
        earlier = earlier[:-1]
    return earlier


@retry(wait=RETRY_WAIT, stop=RETRY_STOP)
def answer_question(question: str, history: list[dict] | None = None) -> tuple[str, list[Chunk]]:
    """Answer a user message and return ``(answer, retrieved_chunks)``.

    Flow:

    1. Unknown-question follow-up. If the assistant previously said it did not
       know and asked for name/email, this message is treated as the user's
       details. With an email present, the original question and details are
       recorded (which also triggers the notification) and a confirmation is
       returned. Without one, the user is asked for their email once; the turn
       after that re-prompt is still recognised as a follow-up. If it again
       has no email, the message falls through to normal routing so the user
       is not trapped in a loop.
    2. Otherwise the router picks an action: "rag" (retrieve context, answer
       with tools), "contact" or anything else/"direct" (chat with tools), or
       "unknown" (plain completion asking for name and email).

    NOTE(review): the whole function is wrapped in @retry, so an exception
    raised after a record/notification side effect re-runs the function and
    may repeat that side effect.

    Args:
        question: The user's current message.
        history: Prior chat messages as dicts with "role" and "content" keys.
            None is treated as an empty conversation.

    Returns:
        The answer text and the chunks used for it (empty outside the RAG path).
    """
    history = [] if history is None else history
    t0 = time.time()

    # Work out whether we are waiting for contact details, and from which
    # point in the history the unanswered question should be read.
    followup_history = None
    already_reprompted = False
    if _looks_like_unknown_followup_request(history):
        followup_history = history
    else:
        earlier = _history_before_email_reprompt(history)
        if earlier is not None and _looks_like_unknown_followup_request(earlier):
            followup_history = earlier
            already_reprompted = True

    if followup_history is not None:
        name, email = _extract_name_email(history + [{"role": "user", "content": question}])
        if email:
            prev_unknown_q = _find_previous_user_question_before_followup(followup_history)
            details_lines = []
            if name:
                details_lines.append(f"Name: {name}")
            details_lines.append(f"Email: {email}")
            if question.strip():
                details_lines.append(f"Follow-up: {question.strip()}")
            record_unknown_question(
                prev_unknown_q or "(unknown question not found in history)",
                user_details="\n".join(details_lines),
            )
            return (
                "Thanks — I’ve shared your details with Rizki so he can follow up.",
                [],
            )
        if not already_reprompted:
            return (_EMAIL_REPROMPT, [])
        # Already asked once and still no email: continue with normal routing.

    action = route_action(question, history)
    logger.info(f"[ROUTER] action={action!r}")
    user_turn = [{"role": "user", "content": question}]

    if action == "rag":
        chunks = fetch_context(question)
        logger.info(f"[RAG] Context retrieved in {time.time() - t0:.2f}s ({len(chunks)} chunks)")
        messages = make_rag_messages(question, history, chunks)
        answer = run_tool_loop(messages)
        logger.info(f"[RAG] Total answer_question time: {time.time() - t0:.2f}s")
        return answer, chunks

    if action == "unknown":
        # Respond to the user (ask for name + email so Rizki can follow up).
        # No tools here: nothing is recorded until the user shares details.
        response = completion(
            model=MODEL,
            messages=[{"role": "system", "content": UNKNOWN_SYSTEM_PROMPT}] + history + user_turn,
        )
        answer = response.choices[0].message.content
        logger.info(f"[CHAT] Total answer_question time: {time.time() - t0:.2f}s")
        return answer, []

    # "contact" and the default "direct" path differ only in their system prompt.
    system_prompt = CONTACT_SYSTEM_PROMPT if action == "contact" else DIRECT_SYSTEM_PROMPT
    messages = [{"role": "system", "content": system_prompt}] + history + user_turn
    answer = run_tool_loop(messages)
    logger.info(f"[CHAT] Total answer_question time: {time.time() - t0:.2f}s")
    return answer, []