| import json |
| import logging |
| import os |
| import re |
| import socket |
| import time |
| import requests |
| from datetime import datetime |
| from openai import OpenAI |
| from dotenv import load_dotenv |
| from chromadb import PersistentClient |
| from litellm import completion |
| from pydantic import BaseModel, Field |
| from typing import Literal |
| from pathlib import Path |
| from tenacity import retry, wait_exponential, stop_after_attempt |
|
|
# Populate os.environ from a .env file before any os.getenv() call below.
load_dotenv(override=True)

# Root logging setup: INFO and above, timestamped "[LEVEL] message" lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)

# Only surface urllib3 records at ERROR and above.
logging.getLogger("urllib3").setLevel(logging.ERROR)

# LLM used for answering, query rewriting and reranking (env-overridable).
MODEL = os.getenv("ANSWER_MODEL", "openai/gpt-4.1-mini")

# LLM used by route_action to classify the user's intent (env-overridable).
ROUTER_MODEL = os.getenv("ROUTER_MODEL", "openai/gpt-4.1-nano")

# Filesystem layout, resolved relative to this source file.
_MODULE_DIR = Path(__file__).parent
_PROJECT_DIR = _MODULE_DIR.parent
DB_NAME = str(_PROJECT_DIR / "preprocessed_db")
KNOWLEDGE_BASE_PATH = _PROJECT_DIR / "knowledge-base/preprocessed"
UNKNOWN_QUESTIONS_PATH = _MODULE_DIR / "unknown_questions.json"
USER_DETAILS_PATH = _MODULE_DIR / "user_details.json"

# Retrieval settings: RETRIEVAL_K results are requested per vector query and
# the reranked list is cut down to FINAL_K before prompting.
COLLECTION_NAME = "docs"
EMBEDDING_MODEL = "text-embedding-3-large"
RETRIEVAL_K = 20
FINAL_K = 10

# Shared tenacity policy for the @retry-decorated LLM helpers.
RETRY_WAIT = wait_exponential(multiplier=1, min=1, max=10)
RETRY_STOP = stop_after_attempt(2)

# Lazily created singletons; see get_openai_client() / get_collection().
_openai_client = None
_collection = None
|
|
|
|
def get_openai_client():
    """Return the module-wide OpenAI client, creating it on the first call."""
    global _openai_client
    if _openai_client is not None:
        return _openai_client

    # First use: build the client once and cache it in the module global.
    logger.info("[INIT] Initializing OpenAI client...")
    _openai_client = OpenAI()
    return _openai_client
|
|
|
|
def get_collection():
    """Return the cached ChromaDB collection, opening it on the first call."""
    global _collection
    if _collection is not None:
        return _collection

    # First use: open the persistent store at DB_NAME and cache the collection.
    logger.info("[INIT] Initializing ChromaDB collection...")
    _collection = PersistentClient(path=DB_NAME).get_or_create_collection(COLLECTION_NAME)
    return _collection
|
|
# System prompt for the RAG answer path. The `{context}` placeholder is filled
# with the retrieved knowledge-base extracts via str.format in
# make_rag_messages. A trailing backslash inside the literal joins the next
# source line, so those sentences reach the model as one paragraph.
SYSTEM_PROMPT = """
You are acting as Kharisma Rizki Wijanarko's Assistant. You are answering questions on Rizki's website, Rizclone, \
particularly questions related to Rizki's career, background, skills and experience. \
Your responsibility is to represent Rizki for interactions on the website as faithfully as possible. Answer using only the provided context. \
If missing, say you don't know. \
Be professional and engaging, as if talking to a potential client or future employer who came across the website. \
If you don't know the answer to any question, use your record_unknown_question tool to record the question that you couldn't answer, \
even if it's about something trivial or unrelated to career. Send the notification only if the user provided their user details. \
If the user is engaging in discussion, try to steer them towards getting in touch. Before calling record_user_details, you must have all of: (1) their email OR phone number for contact, (2) whether they are reaching out for company/business reasons or personal use, and (3) a short message they want to leave for Rizki. Ask politely for anything missing; only call record_user_details once you have all three. \
For context, here are specific extracts from the Knowledge Base that might be directly relevant to the user's question: \
{context}

With this context, please answer the user's question. Be accurate, relevant and complete.
"""


# Prompt for route_action. `{history}` and `{question}` are filled via
# str.format; the listed action names match the RouterDecision literals.
ROUTER_PROMPT = """
You are an AI system router.

Decide the best action based on user intent.

Available actions:
- rag → use knowledge base for career/background questions
- contact → user is providing or asking to provide contact details
- unknown → cannot be answered confidently
- direct → casual/simple/general conversation

Prioritize:
- contact if user shows intent to connect
- rag if question is about Rizki
- unknown if outside knowledge scope

Conversation:
{history}

User:
{question}
"""


# System prompt used by answer_question when the router picks "contact".
CONTACT_SYSTEM_PROMPT = """
You are acting as Kharisma Rizki Wijanarko (Rizki)'s assistant.
Your goal is to help the user connect with Rizki.

Before calling record_user_details, you must have all of:
1) their email OR phone number
2) whether they are reaching out for company/business reasons or personal use
3) a short message they want to leave for Rizki

Ask politely for anything missing; only call record_user_details once you have all three.
Keep replies concise and friendly.
"""


# System prompt used by answer_question for the fall-through "direct" route.
DIRECT_SYSTEM_PROMPT = """
You are acting as Kharisma Rizki Wijanarko (Rizki)'s assistant.
Handle casual/simple/general conversation naturally and professionally.
If relevant, gently steer the user toward getting in touch.
Keep replies concise.
"""


# System prompt used by answer_question when the router picks "unknown".
# _looks_like_unknown_followup_request later scans the assistant's reply for
# the phrasing this prompt asks for ("don't know", name, email, follow up).
UNKNOWN_SYSTEM_PROMPT = """
You are acting as Kharisma Rizki Wijanarko (Rizki)'s assistant.
If you cannot answer confidently, say you don't know.
Then ask the user to share their name and email so Rizki can follow up directly.
Do not claim you already notified Rizki until the user provides their details.
Keep replies concise.
"""
|
|
# Function-calling tool schemas passed as `tools=` to completion() in
# run_tool_loop. The function names and argument keys here must stay in sync
# with dispatch_tool_call, which looks them up by name.
TOOLS = [
    {
        # Logs a question the assistant could not answer.
        "type": "function",
        "function": {
            "name": "record_unknown_question",
            "description": "Record a question that the AI does not have the information to answer.",
            "parameters": {
                "type": "object",
                "properties": {
                    "question": {
                        "type": "string",
                        "description": "The user's question that couldn't be answered."
                    },
                    "user_details": {
                        "type": "string",
                        "description": "Any user details known (e.g., name or email) to associate with this question."
                    }
                },
                "required": ["question"]
            }
        }
    },
    {
        # Records a lead once contact, use case and message are all known.
        "type": "function",
        "function": {
            "name": "record_user_details",
            "description": "Record a lead for Rizki after the user has shared contact, use context, and a message. Do not call until contact (email or phone), company vs personal, and their message are all known.",
            "parameters": {
                "type": "object",
                "properties": {
                    "contact": {
                        "type": "string",
                        "description": "Email address or phone number the user gave for follow-up."
                    },
                    "use_case": {
                        "type": "string",
                        "enum": ["company", "personal"],
                        "description": "company: business, hiring, or organization; personal: individual, networking, or non-corporate."
                    },
                    "message_to_rizki": {
                        "type": "string",
                        "description": "What the user wants Rizki to know or why they are reaching out."
                    },
                    "name": {
                        "type": "string",
                        "description": "User's name if they shared it (optional)."
                    }
                },
                "required": ["contact", "use_case", "message_to_rizki"]
            }
        }
    }
]
|
|
|
|
| |
|
|
# One retrieved knowledge-base passage. Instances are built in
# fetch_context_unranked from the ChromaDB query results.
class Chunk(BaseModel):
    # Text of the passage as stored in the collection.
    page_content: str
    # Metadata stored alongside the passage; make_rag_messages reads the
    # "source" key from it.
    metadata: dict
|
|
|
|
# Structured-output schema for rerank(): passed as `response_format` and used
# to validate the model's JSON reply. Ids are 1-based positions in the chunk
# list that rerank() sent to the model.
class RankOrder(BaseModel):
    order: list[int] = Field(
        description="The order of relevance of chunks, from most relevant to least relevant, by chunk id number"
    )
|
|
|
|
# Structured-output schema for route_action(): the single routing label that
# answer_question branches on.
class RouterDecision(BaseModel):
    action: Literal["rag", "contact", "unknown", "direct"]
|
|
|
|
| |
|
|
def send_telegram_notification(message: str) -> None:
    """Send ``message`` to Telegram, best-effort.

    Delivery route, in order of preference:

    1. If ``TELEGRAM_RELAY_URL`` is set, POST ``{"text": message}`` to it
       (``https://`` is assumed when no scheme is given). The optional
       ``TELEGRAM_RELAY_SECRET`` is sent in the ``x-relay-secret`` header.
       The direct Bot API is *not* tried afterwards, even if the relay fails.
    2. Otherwise POST to the Telegram Bot API using ``TELEGRAM_BOT_TOKEN`` and
       ``TELEGRAM_CHAT_ID``, with urllib3-level retries on connection errors
       and on 429/5xx responses.

    Request failures and non-2xx responses are logged, never raised; secrets
    are redacted from logged exception text. Each HTTP session is closed
    before returning.
    """
    relay_url = (os.getenv("TELEGRAM_RELAY_URL") or "").strip()
    relay_secret = os.getenv("TELEGRAM_RELAY_SECRET")

    if relay_url:
        if not relay_url.startswith(("http://", "https://")):
            relay_url = f"https://{relay_url}"
        headers = {"x-relay-secret": relay_secret} if relay_secret else {}
        try:
            # Context manager releases the session's pooled connections.
            with requests.Session() as session:
                resp = session.post(
                    relay_url,
                    json={"text": message},
                    headers=headers,
                    timeout=10,
                )
                if not resp.ok:
                    logger.error(
                        "Telegram relay notification failed "
                        f"(status={resp.status_code}, body={resp.text[:500]!r})"
                    )
        except requests.exceptions.RequestException as e:
            err = str(e)
            if relay_secret:
                # Keep the shared secret out of the logs.
                err = err.replace(relay_secret, "<redacted>")
            logger.error(f"Failed to send Telegram relay notification: {err}")
        return

    token = os.getenv("TELEGRAM_BOT_TOKEN")
    chat_id = os.getenv("TELEGRAM_CHAT_ID")
    if not token or not chat_id:
        logger.warning("Telegram credentials missing. Skipping notification.")
        return

    # Imported lazily: only the direct Bot API path needs the retry adapter.
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    # Named retry_policy so it cannot be confused with tenacity's `retry`.
    retry_policy = Retry(
        total=3,
        connect=3,
        read=3,
        backoff_factor=1.0,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=frozenset({"POST"}),
        raise_on_status=False,
    )
    try:
        with requests.Session() as session:
            session.mount("https://", HTTPAdapter(max_retries=retry_policy))
            resp = session.post(url, json={"chat_id": chat_id, "text": message}, timeout=10)
            if not resp.ok:
                logger.error(
                    "Telegram notification failed "
                    f"(status={resp.status_code}, body={resp.text[:500]!r})"
                )
    except requests.exceptions.RequestException as e:
        # The bot token is part of the URL and can appear in error text.
        err = str(e).replace(token, "<redacted>")
        logger.error(f"Failed to send Telegram notification: {err}")
|
|
|
|
def _append_to_json(path: Path, entry: dict) -> None:
    """Append ``entry`` to the JSON array stored in the file at ``path``.

    The file is created when it does not exist. Existing content is handled
    as follows:

    * a JSON list: ``entry`` is appended to it;
    * any other JSON value: kept as the first element of a new list, so
      nothing already on disk is discarded;
    * unreadable or invalid JSON: a warning is logged and the file is
      rewritten with ``[entry]`` only.

    Raises:
        OSError: if the file cannot be written.
    """
    data: list = []
    try:
        with open(path, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # First write: start from an empty log.
        pass
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning(f"Could not read {path} ({e}); starting a new log.")
    else:
        if isinstance(loaded, list):
            data = loaded
        else:
            logger.warning(f"{path} did not contain a JSON list; wrapping existing content.")
            data = [loaded]

    data.append(entry)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)
|
|
|
|
def record_unknown_question(question: str, user_details: str | None = None) -> str:
    """Log an unanswered question and, when user details exist, notify Rizki.

    The question is always appended to UNKNOWN_QUESTIONS_PATH with a
    timestamp. A Telegram notification is sent only if ``user_details`` is
    non-empty.
    """
    has_details = bool(user_details)

    record = {"question": question, "timestamp": datetime.now().isoformat()}
    if has_details:
        record["user_details"] = user_details
    _append_to_json(UNKNOWN_QUESTIONS_PATH, record)

    if has_details:
        notification = f"New Unknown Question: {question}\nUser Details: {user_details}"
        send_telegram_notification(notification)
    return "Question recorded successfully."
|
|
|
|
def _looks_like_unknown_followup_request(history: list[dict]) -> bool:
    """Tell whether the latest assistant turn asked for name + email.

    Heuristic keyword check on the most recent assistant message: it must
    admit not knowing, mention both "name" and "email", and mention a
    follow-up. Kept deliberately loose so small prompt changes do not break it.
    """
    for message in reversed(history):
        if message.get("role") == "assistant":
            break
    else:
        # No assistant turn in the history at all.
        return False

    lowered = str(message.get("content", "")).lower()
    admits_unknown = any(p in lowered for p in ("i don't know", "i do not know"))
    if not admits_unknown:
        return False
    if "name" not in lowered or "email" not in lowered:
        return False
    # "follow up directly" is already covered by the plain "follow up" check.
    return "follow up" in lowered or "follow-up" in lowered
|
|
|
|
| def _find_previous_user_question_before_followup(history: list[dict]) -> str | None: |
| """ |
| When the assistant asks for name/email after an unknown question, the unknown question is |
| typically the latest user message before that assistant response. |
| """ |
| assistant_idx = None |
| for i in range(len(history) - 1, -1, -1): |
| if history[i].get("role") == "assistant": |
| assistant_idx = i |
| break |
| if assistant_idx is None: |
| return None |
| for j in range(assistant_idx - 1, -1, -1): |
| if history[j].get("role") == "user": |
| q = str(history[j].get("content", "")).strip() |
| return q or None |
| return None |
|
|
|
|
| def _extract_name_email(history: list[dict]) -> tuple[str | None, str | None]: |
| """ |
| Best-effort extraction of user's name/email from conversation text. |
| This is heuristic and intentionally conservative. |
| """ |
| text = "\n".join(str(m.get("content", "")) for m in history if m.get("role") == "user") |
|
|
| email_match = re.search(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", text, flags=re.IGNORECASE) |
| email = email_match.group(0) if email_match else None |
|
|
| |
| name_match = re.search(r"\b(?:my name is|i am|i'm)\s+([A-Z][a-zA-Z]{1,30}(?:\s+[A-Z][a-zA-Z]{1,30})?)\b", text) |
| name = name_match.group(1).strip() if name_match else None |
|
|
| return name, email |
|
|
|
|
def record_user_details(
    contact: str,
    use_case: str,
    message_to_rizki: str,
    name: str | None = None,
) -> str:
    """Persist a lead to USER_DETAILS_PATH and send it to Rizki via Telegram.

    ``contact`` and ``message_to_rizki`` are stored stripped; ``name`` is
    stored (and shown in the notification) only when non-blank. Any
    ``use_case`` other than "company" is labelled "Personal" in the message.
    """
    lead = {
        "contact": contact.strip(),
        "use_case": use_case,
        "message_to_rizki": message_to_rizki.strip(),
        "timestamp": datetime.now().isoformat(),
    }
    cleaned_name = name.strip() if name else ""
    if cleaned_name:
        lead["name"] = cleaned_name
    _append_to_json(USER_DETAILS_PATH, lead)

    # Notification layout: title, optional name, contact, use, message.
    summary = ["New lead for Rizki"]
    if cleaned_name:
        summary.append(f"Name: {cleaned_name}")
    summary.append(f"Contact: {lead['contact']}")
    summary.append("Use: Company" if use_case == "company" else "Use: Personal")
    summary.append(f"Message: {lead['message_to_rizki']}")
    send_telegram_notification("\n".join(summary))

    return "User details recorded successfully. Rizki will be in touch if needed."
|
|
|
|
# Arguments each known tool cannot run without (mirrors "required" in TOOLS).
_REQUIRED_TOOL_ARGS = {
    "record_unknown_question": ("question",),
    "record_user_details": ("contact", "use_case", "message_to_rizki"),
}


def dispatch_tool_call(tool_name: str, arguments: dict) -> str:
    """Run the tool named ``tool_name`` and return the text to feed the model.

    Args:
        tool_name: Function name chosen by the model.
        arguments: Decoded JSON arguments for that function.

    Returns:
        A status string for the tool message. If required arguments are
        missing or empty, an error string is returned (instead of raising
        ``KeyError``) so the model can ask the user and retry. Unknown tool
        names are logged and answered with "Tool executed.".
    """
    required = _REQUIRED_TOOL_ARGS.get(tool_name)
    if required is None:
        logger.warning(f"Unknown tool called: {tool_name}")
        return "Tool executed."

    if not isinstance(arguments, dict):
        # Valid JSON that is not an object (e.g. a list) carries no arguments.
        arguments = {}
    missing = [key for key in required if not arguments.get(key)]
    if missing:
        return (
            f"Error: missing required argument(s) for {tool_name}: {', '.join(missing)}. "
            "Ask the user for them and call the tool again."
        )

    if tool_name == "record_unknown_question":
        user_details = arguments.get("user_details")
        result = record_unknown_question(arguments["question"], user_details)
        # A notification is only sent when user details were supplied, so only
        # then may the model be told that Rizki will be notified.
        if user_details:
            return "Question recorded successfully. I will notify Rizki."
        return result

    return record_user_details(
        arguments["contact"],
        arguments["use_case"],
        arguments["message_to_rizki"],
        arguments.get("name"),
    )
|
|
|
|
@retry(wait=RETRY_WAIT, stop=RETRY_STOP)
def route_action(question: str, history: list[dict]) -> str:
    """Ask the router model which action fits the user's latest message.

    The conversation is flattened to "role: content" lines and inserted into
    ROUTER_PROMPT together with ``question``. The reply is validated against
    RouterDecision and its ``action`` label is returned.
    """
    transcript_lines = [f"{turn.get('role')}: {turn.get('content')}" for turn in history]
    router_message = {
        "role": "system",
        "content": ROUTER_PROMPT.format(
            history="\n".join(transcript_lines),
            question=question,
        ),
    }
    resp = completion(
        model=ROUTER_MODEL,
        messages=[router_message],
        response_format=RouterDecision,
    )
    raw_decision = resp.choices[0].message.content
    return RouterDecision.model_validate_json(raw_decision).action
|
|
|
|
def run_tool_loop(messages: list[dict]) -> str:
    """Call the answer model, executing requested tools, until it replies.

    ``messages`` is extended in place with each assistant tool-call message
    and the matching tool results. At most five model calls are made.

    Returns:
        The model's final text (``""`` if it produced none). If the model is
        still requesting tools after the last round, the content of the last
        tool result is returned instead.
    """
    for _ in range(5):
        response = completion(model=MODEL, messages=messages, tools=TOOLS)
        msg = response.choices[0].message

        if not msg.tool_calls:
            # content can be None; honour the declared str return type.
            return msg.content or ""

        messages.append(msg)
        for tool_call in msg.tool_calls:
            tool_name = tool_call.function.name
            raw_args = tool_call.function.arguments
            try:
                # An empty argument string means "no arguments".
                args = json.loads(raw_args) if raw_args else {}
            except (TypeError, ValueError) as e:
                # Report the problem to the model instead of aborting the chat.
                logger.warning(f"Invalid arguments for tool {tool_name}: {e}")
                tool_response = (
                    "Error: tool arguments were not valid JSON. "
                    "Call the tool again with a valid JSON object."
                )
            else:
                tool_response = dispatch_tool_call(tool_name, args)
            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "name": tool_name,
                    "content": tool_response,
                }
            )

    # Rounds exhausted: the last appended message is a tool-result dict.
    return messages[-1].get("content", "") or ""
|
|
|
|
| |
|
|
@retry(wait=RETRY_WAIT, stop=RETRY_STOP)
def rerank(question: str, chunks: list[Chunk]) -> list[Chunk]:
    """Reorder ``chunks`` by relevance to ``question`` using the answer model.

    The model receives every chunk labelled with a 1-based id and replies
    with an ordering validated against RankOrder. The reply is sanitised:
    out-of-range ids are ignored, repeated ids count once, and chunks the
    model left out are appended in their original order, so the result is
    always a permutation of ``chunks``.

    With zero or one chunk there is nothing to order and no model call is made.
    """
    if len(chunks) <= 1:
        return list(chunks)

    system_prompt = """
You are a document re-ranker.
You are provided with a question and a list of relevant chunks of text from a query of a knowledge base.
Rank order the provided chunks by relevance to the question, with the most relevant chunk first.
Reply only with the list of ranked chunk ids, nothing else. Include all chunk ids provided, reranked.
"""
    chunk_sections = "".join(
        f"# CHUNK ID: {chunk_id}:\n\n{chunk.page_content}\n\n"
        for chunk_id, chunk in enumerate(chunks, start=1)
    )
    user_prompt = (
        f"The user has asked:\n\n{question}\n\n"
        "Order all chunks by relevance, most to least. Include all chunk ids.\n\n"
        "Here are the chunks:\n\n"
        f"{chunk_sections}"
        "Reply only with the list of ranked chunk ids, nothing else."
    )

    response = completion(
        model=MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        response_format=RankOrder,
    )
    order = RankOrder.model_validate_json(response.choices[0].message.content).order

    # dict.fromkeys drops repeated ids while keeping first-seen order.
    positions = list(dict.fromkeys(i - 1 for i in order if 1 <= i <= len(chunks)))
    ranked = set(positions)
    positions.extend(i for i in range(len(chunks)) if i not in ranked)
    return [chunks[i] for i in positions]
|
|
|
|
def make_rag_messages(question: str, history: list[dict], chunks: list[Chunk]) -> list[dict]:
    """Build the chat messages for a RAG answer.

    The result is: a system message (SYSTEM_PROMPT with the chunk extracts
    substituted for ``{context}``), then the prior ``history``, then the new
    user ``question``. Each chunk's metadata must contain a "source" key.
    """
    extracts = []
    for chunk in chunks:
        extracts.append(f"Extract from {chunk.metadata['source']}:\n{chunk.page_content}")

    system_message = {
        "role": "system",
        "content": SYSTEM_PROMPT.format(context="\n\n".join(extracts)),
    }
    user_message = {"role": "user", "content": question}
    return [system_message, *history, user_message]
|
|
|
|
@retry(wait=RETRY_WAIT, stop=RETRY_STOP)
def rewrite_query(question: str, history: list[dict] | None = None) -> str:
    """Rewrite the user's question into a concise knowledge base search query.

    Args:
        question: The user's current question.
        history: Prior conversation turns; omitted or ``None`` means none.
            The list is interpolated into the prompt as-is (its Python repr).

    Returns:
        The model's query with surrounding whitespace removed, or
        ``question`` itself when the model returns nothing usable.
    """
    # A None default avoids sharing one mutable list across calls.
    history = [] if history is None else history
    message = f"""
You are in a conversation about Kharisma Rizki Wijanarko (Rizki)'s career.
You are about to search a Knowledge Base to answer the user's question.

Conversation history:
{history}

User's current question:
{question}

Respond ONLY with a short, precise query to search the Knowledge Base. Nothing else.
"""
    response = completion(model=MODEL, messages=[{"role": "system", "content": message}])
    # content may be None or padded; never hand an empty query to the embedder.
    rewritten = (response.choices[0].message.content or "").strip()
    return rewritten or question
|
|
|
|
def fetch_context_unranked(question: str) -> list[Chunk]:
    """Embed ``question`` and return the nearest chunks from the collection.

    Requests RETRIEVAL_K results for a single query embedding and wraps each
    returned document/metadata pair in a Chunk, in the order returned.
    """
    client = get_openai_client()
    store = get_collection()

    embedding_response = client.embeddings.create(model=EMBEDDING_MODEL, input=[question])
    query_vector = embedding_response.data[0].embedding

    hits = store.query(query_embeddings=[query_vector], n_results=RETRIEVAL_K)
    # One query was sent, so its results sit at index 0 of each field.
    documents = hits["documents"][0]
    metadatas = hits["metadatas"][0]
    return [
        Chunk(page_content=text, metadata=meta)
        for text, meta in zip(documents, metadatas)
    ]
|
|
|
|
def merge_chunks(primary: list[Chunk], secondary: list[Chunk]) -> list[Chunk]:
    """Concatenate two chunk lists, dropping chunks with repeated text.

    Order is preserved (``primary`` first, then ``secondary``). A chunk is
    skipped when an earlier chunk from either list has the same
    ``page_content``, so the merged context never repeats a passage.
    """
    seen: set[str] = set()
    merged: list[Chunk] = []
    for chunk in (*primary, *secondary):
        if chunk.page_content in seen:
            continue
        seen.add(chunk.page_content)
        merged.append(chunk)
    return merged
|
|
|
|
def fetch_context(original_question: str) -> list[Chunk]:
    """Retrieve the FINAL_K most relevant chunks for ``original_question``.

    Pipeline: rewrite the question into a search query, retrieve with both
    the original and the rewritten text, merge without duplicates, rerank
    against the original question, and truncate to FINAL_K.

    Both LLM-assisted steps are best-effort: if the rewrite fails, only the
    original question is used; if reranking fails, the merged retrieval
    order is used. Retrieval errors themselves still propagate.
    """
    try:
        rewritten_question = rewrite_query(original_question)
        logger.info(f"[RAG] Rewritten query: {rewritten_question!r}")
    except Exception as e:
        logger.warning(f"[RAG] Query rewrite failed ({e}), falling back to original.")
        rewritten_question = original_question

    chunks = fetch_context_unranked(original_question)
    # A second lookup with an empty or identical query would add nothing.
    if rewritten_question and rewritten_question.strip() != original_question.strip():
        chunks = merge_chunks(chunks, fetch_context_unranked(rewritten_question))

    try:
        ranked = rerank(original_question, chunks)
    except Exception as e:
        # Mirror the rewrite fallback: degrade to retrieval order, don't fail.
        logger.warning(f"[RAG] Rerank failed ({e}), using retrieval order.")
        ranked = chunks
    return ranked[:FINAL_K]
|
|
|
|
| |
|
|
@retry(wait=RETRY_WAIT, stop=RETRY_STOP)
def answer_question(question: str, history: list[dict] | None = None) -> tuple[str, list[Chunk]]:
    """Produce the assistant's reply to ``question`` given the prior ``history``.

    Flow:

    1. If the previous assistant turn asked for name/email after an unknown
       question, treat this message as the reply to that request: ask again
       when no email can be found, otherwise record the earlier question with
       the user's details and confirm.
    2. Otherwise ask the router for an action:
       "rag" answers from retrieved knowledge-base chunks (tools enabled);
       "contact" and "direct" chat with their own system prompt (tools
       enabled); "unknown" replies without tools.

    Args:
        question: The user's latest message.
        history: Earlier chat messages; omitted or ``None`` means none.

    Returns:
        ``(answer, chunks)`` where ``chunks`` is the retrieved context for
        the "rag" route and ``[]`` for every other path.

    NOTE(review): the whole function is wrapped in ``@retry``, so a failure
    after a tool call or after record_unknown_question has run can repeat
    that side effect on the second attempt.
    """
    # A None default avoids sharing one mutable list across calls.
    history = [] if history is None else history
    t0 = time.time()
    user_turn = {"role": "user", "content": question}

    if _looks_like_unknown_followup_request(history):
        prev_unknown_q = _find_previous_user_question_before_followup(history)
        name, email = _extract_name_email([*history, user_turn])
        if not email:
            return (
                "Thanks — could you share your email address as well? (And your name, if you haven’t already.)",
                [],
            )

        details_lines = []
        if name:
            details_lines.append(f"Name: {name}")
        details_lines.append(f"Email: {email}")
        if question.strip():
            details_lines.append(f"Follow-up: {question.strip()}")

        record_unknown_question(
            prev_unknown_q or "(unknown question not found in history)",
            user_details="\n".join(details_lines),
        )
        return (
            "Thanks — I’ve shared your details with Rizki so he can follow up.",
            [],
        )

    action = route_action(question, history)
    logger.info(f"[ROUTER] action={action!r}")

    if action == "rag":
        chunks = fetch_context(question)
        logger.info(f"[RAG] Context retrieved in {time.time() - t0:.2f}s ({len(chunks)} chunks)")
        messages = make_rag_messages(question, history, chunks)
        answer = run_tool_loop(messages)
        logger.info(f"[RAG] Total answer_question time: {time.time() - t0:.2f}s")
        return answer, chunks

    if action == "unknown":
        # No tools here: the prompt only asks the user for follow-up details.
        response = completion(
            model=MODEL,
            messages=[{"role": "system", "content": UNKNOWN_SYSTEM_PROMPT}, *history, user_turn],
        )
        answer = response.choices[0].message.content or ""
    else:
        # "contact" gets its own prompt; anything else is handled as "direct".
        system_prompt = CONTACT_SYSTEM_PROMPT if action == "contact" else DIRECT_SYSTEM_PROMPT
        answer = run_tool_loop([{"role": "system", "content": system_prompt}, *history, user_turn])

    logger.info(f"[CHAT] Total answer_question time: {time.time() - t0:.2f}s")
    return answer, []