# h7-chat-backend / chat_engine.py
# hashan-7's picture
# Update code
# e78d829 verified
import asyncio
import datetime
import re
from typing import Any, Dict, Optional
import requests
from deep_translator import GoogleTranslator
from router_logic import detect_route, check_identity_question
from search_engine import build_search_context, validate_intent_with_llama
from models_loader import MODEL_NAME
# URL that post_to_ollama() POSTs generation payloads to (a localhost endpoint).
OLLAMA_URL = "http://localhost:11434/api/generate"
def translate_to_english(text: str) -> str:
    """Translate ``text`` into English, letting the translator auto-detect the source.

    Best-effort: if the translator raises for any reason, the input is handed
    back unchanged.
    """
    try:
        translator = GoogleTranslator(source="auto", target="en")
        english_text = translator.translate(text)
    except Exception:
        # Deliberate fallback: continue with the untranslated text rather than fail.
        return text
    return english_text
def translate_to_sinhala(text: str) -> str:
    """Translate English ``text`` into Sinhala (target code ``"si"``).

    Best-effort: if the translator raises for any reason, the input is handed
    back unchanged.
    """
    try:
        translator = GoogleTranslator(source="en", target="si")
        sinhala_text = translator.translate(text)
    except Exception:
        # Deliberate fallback: an English reply is better than no reply.
        return text
    return sinhala_text
def is_sinhala(text: str) -> bool:
    """Return True when ``text`` has at least one character in U+0D80..U+0DFF."""
    for ch in text:
        if "\u0D80" <= ch <= "\u0DFF":
            return True
    return False
def clean_text(text: str) -> str:
    """Collapse every whitespace run in ``text`` to one space and trim both ends.

    Falsy input (``None``, ``""``, ``0`` ...) yields an empty string; anything
    else is converted with ``str()`` before normalising.
    """
    if not text:
        return ""
    collapsed = re.sub(r"\s+", " ", str(text))
    return collapsed.strip()
def format_chat_history(history) -> str:
    """Render the last four chat messages as ``"<Speaker>: <content>"`` lines.

    Each message may be an object exposing ``role`` / ``content`` attributes or
    a dict carrying ``"role"`` / ``"content"`` keys. A role equal to ``"user"``
    (case-insensitive) is labelled ``User``; every other role is labelled ``H7``.

    Args:
        history: Iterable of messages, or a falsy value for "no history".

    Returns:
        The rendered lines joined with newlines, or ``""`` when there is no history.
    """
    if not history:
        return ""

    def _field(message, name):
        # Dict-shaped messages are read by key, everything else by attribute;
        # a missing field degrades to an empty string instead of raising.
        if isinstance(message, dict):
            return message.get(name, "")
        return getattr(message, name, "")

    lines = []
    for message in list(history)[-4:]:
        speaker = "User" if str(_field(message, "role")).lower() == "user" else "H7"
        lines.append(f"{speaker}: {_field(message, 'content')}")
    return "\n".join(lines)
# Price vocabulary matched as whole words (optionally ending in "s" or "d").
# The compound phrases the original list carried ("gold price", "exchange
# rate", ...) all contain one of these base words, so they are subsumed.
_PRICE_KEYWORD_RE = re.compile(r"(?<![a-z])(?:price|rate|cost|value)[sd]?(?![a-z])")


def is_price_query(text: str) -> bool:
    """Return True when ``text`` mentions a price-like word.

    Matching is case-insensitive and anchored on letter boundaries, so words
    that merely contain a keyword ("translate", "generate", "evaluate",
    "costume") no longer count as price queries.

    Args:
        text: User message; falsy values are treated as an empty string.
    """
    lowered = str(text).lower() if text else ""
    return _PRICE_KEYWORD_RE.search(lowered) is not None
# Short codes must stand alone as words; longer place names only need to start
# a word, so derived forms such as "indian" or "australian" still match.
_COUNTRY_HINT_RE = re.compile(
    r"(?<![a-z])(?:"
    r"(?:lk|usa|us|uk|uae)(?![a-z])"
    r"|sri\s+lanka|colombo|india|united\s+states|united\s+kingdom"
    r"|dubai|singapore|australia|canada|japan|china|global"
    r")"
)


def has_country_hint(text: str) -> bool:
    """Return True when ``text`` names one of the known countries or markets.

    Matching is case-insensitive and anchored on letter boundaries, so "us"
    inside "bus" / "just", "lk" inside "milk" and "uk" inside "ukraine" are no
    longer treated as country hints.

    Args:
        text: User message; falsy values are treated as an empty string.
    """
    # NOTE(review): the text is lowercased, so the pronoun "us" is still
    # indistinguishable from the country code "US".
    lowered = str(text).lower() if text else ""
    return _COUNTRY_HINT_RE.search(lowered) is not None
# Unit words, optionally plural. Only letters count as a boundary, so a
# quantity glued to the unit ("10kg", "5oz") still matches.
_UNIT_HINT_RE = re.compile(
    r"(?<![a-z])(?:(?:kilo|milli)?grams?|kgs?|ounces?|oz|tolas?)(?![a-z])"
)


def has_unit_hint(text: str) -> bool:
    """Return True when ``text`` mentions a weight unit (gram, kg, ounce, oz, tola).

    Matching is case-insensitive and anchored on letter boundaries, so "gram"
    inside "program" / "telegram" and "oz" inside "dozen" / "frozen" are no
    longer treated as unit hints.

    Args:
        text: User message; falsy values are treated as an empty string.
    """
    lowered = str(text).lower() if text else ""
    return _UNIT_HINT_RE.search(lowered) is not None
# Either a karat figure such as "24k" (not part of a longer number or word) or
# a stand-alone carat/karat/kt token. A digit may precede the word form, so
# "22kt" and "24karat" match through the second alternative.
_PURITY_HINT_RE = re.compile(
    r"(?<![0-9])(?:24|22|21|18)k(?![a-z])"
    r"|(?<![a-z])(?:carats?|karats?|kt)(?![a-z])"
)


def has_purity_hint(text: str) -> bool:
    """Return True when ``text`` mentions a gold purity (24k/22k/21k/18k, carat, karat, kt).

    Matching is case-insensitive and anchored on boundaries, so "kt" inside
    "desktop" or "24k" inside "124k" / "24kb" are no longer treated as purity
    hints.

    Args:
        text: User message; falsy values are treated as an empty string.
    """
    lowered = str(text).lower() if text else ""
    return _PURITY_HINT_RE.search(lowered) is not None
def is_ambiguous_price_query(text: str) -> bool:
    """Return True for a price query that carries no country, unit or purity hint.

    Anything that is not a price query at all is never considered ambiguous.
    """
    if not is_price_query(text):
        return False
    # One qualifying detail of any kind is enough to treat the query as specific.
    has_any_detail = (
        has_country_hint(text)
        or has_unit_hint(text)
        or has_purity_hint(text)
    )
    return not has_any_detail
def build_ambiguous_price_response(text: str) -> str:
    """Pick the clarification prompt matching the topic of an ambiguous price query.

    Topics are checked in order (gold, fuel, stock, exchange/currency) by
    substring on the normalised, lowercased message; the first hit wins and a
    generic prompt is returned when none match.
    """
    query = clean_text(text).lower()
    clarifications = (
        (
            ("gold",),
            "Gold price depends on the country, currency, purity, and unit. "
            "Please specify what you want: Sri Lanka 24K per gram, India 22K per gram, "
            "or global spot price in USD per ounce.",
        ),
        (
            ("fuel",),
            "Fuel price depends on the country and fuel type. "
            "Please specify what you want, for example Sri Lanka petrol 92, "
            "Sri Lanka diesel, or India petrol price.",
        ),
        (
            ("stock",),
            "Stock price depends on the company and exchange. "
            "Please specify the company name or ticker symbol, for example Apple stock price or TSLA price.",
        ),
        (
            ("exchange", "currency"),
            "Exchange rate depends on the currency pair. "
            "Please specify what you want, for example USD to LKR or EUR to USD.",
        ),
    )
    for triggers, clarification in clarifications:
        if any(trigger in query for trigger in triggers):
            return clarification
    return (
        "This price query needs more detail. "
        "Please specify the country, unit, or exact item so I can give the correct live value."
    )
def format_currency(currency: str) -> str:
    """Normalise a currency label to its code (``"Rs."`` -> ``"LKR"``, ``"$"`` -> ``"USD"``).

    The lookup ignores case and surrounding whitespace. Labels without a known
    alias are returned exactly as given.

    Args:
        currency: Currency label or symbol.

    Returns:
        The mapped code, ``"UNKNOWN"`` for ``None`` or blank input, or the
        original value when no alias is known (non-string values pass through).
    """
    aliases = {
        "LKR": "LKR",
        "RS": "LKR",
        "RS.": "LKR",
        "INR": "INR",
        "USD": "USD",
        "$": "USD",
        "EUR": "EUR",
        "GBP": "GBP",
        "UNKNOWN": "UNKNOWN",
    }
    if currency is None:
        return "UNKNOWN"
    if not isinstance(currency, str):
        return currency
    key = currency.strip().upper()
    if not key:
        return "UNKNOWN"
    return aliases.get(key, currency)
def format_unit(unit: str) -> str:
    """Normalise a unit label to its canonical name (``"g"`` -> ``"gram"``, ``"oz"`` -> ``"ounce"``).

    The lookup ignores case and surrounding whitespace and understands plural
    and long forms. Labels without a known alias are returned exactly as given.

    Args:
        unit: Unit label.

    Returns:
        The canonical unit, ``"unknown"`` for ``None`` or blank input, or the
        original value when no alias is known (non-string values pass through).
    """
    aliases = {
        "g": "gram",
        "gram": "gram",
        "grams": "gram",
        "oz": "ounce",
        "ounce": "ounce",
        "ounces": "ounce",
        "kg": "kg",
        "kgs": "kg",
        "kilogram": "kg",
        "kilograms": "kg",
        "tola": "tola",
        "tolas": "tola",
        "unknown": "unknown",
    }
    if unit is None:
        return "unknown"
    if not isinstance(unit, str):
        return unit
    key = unit.strip().lower()
    if not key:
        return "unknown"
    return aliases.get(key, unit)
def detect_price_label(user_msg: str) -> str:
    """Build a short label such as ``"Sri Lanka 24K gold price"`` from the message.

    The label is assembled from up to three parts (location, purity, item),
    each chosen by the first matching substring in the normalised, lowercased
    message. The item falls back to plain ``"price"``.
    """
    text = clean_text(user_msg).lower()

    def first_label(options, fallback):
        # Order matters: the earliest listed needle found in the text wins.
        for needle, label in options:
            if needle in text:
                return label
        return fallback

    location = first_label(
        (("sri lanka", "Sri Lanka "), ("india", "India "), ("global", "global ")),
        "",
    )
    purity = first_label(
        (("24k", "24K "), ("22k", "22K "), ("21k", "21K "), ("18k", "18K ")),
        "",
    )
    item = first_label(
        (
            ("gold", "gold price"),
            ("silver", "silver price"),
            ("fuel", "fuel price"),
            ("bitcoin", "Bitcoin price"),
            ("stock", "stock price"),
            ("president", "president"),
        ),
        "price",
    )
    return f"{location}{purity}{item}".strip()
def build_direct_price_response(user_msg: str, agreed_value: float, currency: str, unit: str) -> str:
    """Compose the one-sentence answer for a price the sources agree on.

    The currency and the unit are each left out of the sentence when they are
    missing or equal to the ``UNKNOWN`` / ``unknown`` sentinel (compared
    case-insensitively), so the sentinel never appears in user-facing text.

    Args:
        user_msg: Message used to derive the price label.
        agreed_value: Numeric value, rendered with thousands separators and two decimals.
        currency: Currency label, normalised through ``format_currency``.
        unit: Unit label, normalised through ``format_unit``.
    """
    label = detect_price_label(user_msg)
    currency_code = format_currency(currency)
    unit_name = format_unit(unit)
    amount = f"{agreed_value:,.2f}"

    has_currency = bool(currency_code) and str(currency_code).strip().upper() != "UNKNOWN"
    has_unit = bool(unit_name) and str(unit_name).strip().lower() != "unknown"

    quoted_amount = f"{currency_code} {amount}" if has_currency else amount
    per_unit = f" per {unit_name}" if has_unit else ""
    return f"The current {label} is around {quoted_amount}{per_unit} based on multiple live sources."
def build_conflict_price_response(user_msg: str) -> str:
    """Explain that live sources disagree, tailored to gold, fuel or stock queries.

    The topic is detected by substring on the normalised, lowercased message in
    that order; other queries get a generic sentence without a follow-up tip.
    """
    query = clean_text(user_msg).lower()
    # (topic word, where the user should double-check the value)
    advice_by_topic = (
        ("gold", "a trusted bullion or financial source"),
        ("fuel", "an official local source"),
        ("stock", "a trusted market source"),
    )
    for topic, authority in advice_by_topic:
        if topic in query:
            return (
                f"I found conflicting live {topic} prices across sources, "
                "so I cannot confirm one exact value reliably right now. "
                f"Please check {authority} for confirmation."
            )
    return (
        "I found conflicting live values across sources, so I cannot confirm one exact value reliably right now."
    )
def build_search_failure_response(user_msg: str) -> str:
    """Return the apology used when search produced nothing reliable.

    The wording is chosen by the first matching topic in the normalised,
    lowercased message: gold, weather, then president/minister; otherwise a
    generic sentence is used.
    """
    query = clean_text(user_msg).lower()
    mentions_gold = "gold" in query
    mentions_weather = "weather" in query
    mentions_office = "president" in query or "minister" in query

    if mentions_gold:
        return (
            "I could not confirm a reliable live gold value from the available sources right now. "
            "Please specify 22K or 24K and the unit if you want a narrower result."
        )
    if mentions_weather:
        return "I could not confirm a reliable live weather update from the available sources right now."
    if mentions_office:
        return "I could not confirm a reliable current public-office answer from the available sources right now."
    return "I could not confirm a reliable live answer from the available sources right now."
def build_codex_redirect_response() -> str:
    """Return the fixed message that points coding requests to Code X."""
    sentences = (
        "This looks like a coding request.",
        "It should be handled by Code X for better results.",
        "Please switch to Code X and send the same request there.",
    )
    return " ".join(sentences)
def build_image_redirect_response() -> str:
    """Return the fixed message that points image requests to the image workflow."""
    sentences = (
        "This looks like an image-related request.",
        "It should be handled by the image workflow rather than the normal chat backend.",
    )
    return " ".join(sentences)
def build_restricted_response() -> str:
    """Return the fixed refusal used for the ``restricted`` route."""
    refusal = "I cannot help with harmful, abusive, or clearly unsafe requests."
    return refusal
def build_base_response(
    response: str,
    route: str,
    intent: str,
    confidence: float,
    search_ok: bool = False,
    source_count: int = 0,
    agreement_ok: bool = False,
    agreed_value=None,
    currency: str = "UNKNOWN",
    unit: str = "unknown",
    model_fallback_used: bool = False,
    chat_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble the response dictionary returned by the chat pipeline.

    Every argument is copied under a key of the same name; ``currency`` and
    ``unit`` are passed through ``format_currency`` / ``format_unit`` first.
    """
    normalized_currency = format_currency(currency)
    normalized_unit = format_unit(unit)
    return dict(
        response=response,
        route=route,
        intent=intent,
        confidence=confidence,
        search_ok=search_ok,
        source_count=source_count,
        agreement_ok=agreement_ok,
        agreed_value=agreed_value,
        currency=normalized_currency,
        unit=normalized_unit,
        model_fallback_used=model_fallback_used,
        chat_id=chat_id,
    )
async def post_to_ollama(payload: dict, timeout: int = 60) -> dict:
    """POST ``payload`` as JSON to ``OLLAMA_URL`` and return the decoded JSON body.

    The blocking ``requests`` call runs in the event loop's default executor.
    Errors raised by the request, by ``raise_for_status()`` or by JSON decoding
    propagate to the caller.
    """

    def _blocking_post():
        http_response = requests.post(OLLAMA_URL, json=payload, timeout=timeout)
        http_response.raise_for_status()
        return http_response.json()

    return await asyncio.get_running_loop().run_in_executor(None, _blocking_post)
async def ask_ollama_async(
    prompt: str,
    history_context: str = "",
    search_context: str = "",
    search_ok: bool = False,
    source_count: int = 0,
) -> str:
    """Build the full H7 prompt, send it via ``post_to_ollama`` and return the reply text.

    The prompt carries a system instruction (with today's date), the search
    status, the search results, the chat history and the user message.

    Returns:
        The stripped ``"response"`` field of the reply, ``"Processing error."``
        when that field is empty, or ``"Error generating response."`` when the
        request or the reply handling raises.
    """
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    guidance = (
        f"Current Date: {today}.",
        "You are H7 Assistant.",
        "Reply naturally for greetings and casual conversation.",
        "Keep answers concise unless the user clearly asks for more detail.",
        "For factual questions, use the provided search results when available.",
        "Do not invent facts, numbers, prices, dates, or statistics when evidence is weak.",
        "For role based factual questions, answer directly and cleanly without unnecessary words like our or my unless the user explicitly asks from a personal perspective.",
        "If search results are available, use them carefully and answer directly.",
    )
    system_instruction = " ".join(guidance)

    availability = "YES" if search_ok else "NO"
    search_status_text = (
        f"Reliable Search Available: {availability}\n"
        f"Unique Source Count: {source_count}\n"
    )
    results_text = search_context or "No strong results found."
    history_block = history_context or "No previous conversation."

    # Sections are joined with single newlines; the trailing "\n" inside a
    # section produces the blank line that separates it from the next one.
    full_prompt = "\n".join(
        [
            f"SYSTEM:\n{system_instruction}\n",
            f"[SEARCH STATUS]\n{search_status_text}",
            f"[SEARCH RESULTS]\n{results_text}\n",
            f"[HISTORY]\n{history_block}\n",
            f"USER: {prompt}",
            "H7:",
        ]
    )

    request_body = {
        "model": MODEL_NAME,
        "prompt": full_prompt,
        "stream": False,
        "options": {
            "temperature": 0,
            "top_p": 0.9,
        },
    }

    try:
        reply = (await post_to_ollama(request_body, timeout=60)).get("response", "").strip()
    except Exception:
        # Any failure (transport, HTTP status, malformed reply) maps to one message.
        return "Error generating response."
    if reply:
        return reply
    return "Processing error."
def cleanup_response(text: str) -> str:
    """Tidy a model reply: fix possessive openers, drop a filler phrase, normalise whitespace.

    The rewrites run in order on the stripped text and the result is passed
    through ``clean_text``.
    """
    # NOTE(review): clean_text() collapses all whitespace (newlines included)
    # to single spaces, so the paragraph-break rewrite below has no visible
    # effect on the final string.
    rewrites = (
        (r"\n{3,}", "\n\n", 0),
        (r"^Our current ", "The current ", 0),
        (r"^My current ", "The current ", 0),
        (r"let me check again for you\.?", "", re.IGNORECASE),
    )
    result = str(text).strip()
    for pattern, replacement, flags in rewrites:
        result = re.sub(pattern, replacement, result, flags=flags)
    return clean_text(result)
def choose_route(raw_msg: str, processed_msg: str):
    """Pick the route for a message from its raw and processed (translated) forms.

    Both forms are classified with ``detect_route``; the candidate whose route
    comes first in the priority order wins, the raw form winning ties. When
    neither route is in the priority list the processed classification is used.

    Returns:
        A ``(route, intent, confidence)`` tuple.
    """
    raw_route, raw_intent, raw_conf = detect_route(raw_msg)
    raw_candidate = (raw_route, raw_intent, raw_conf)

    if processed_msg == raw_msg:
        # No translation happened, so a second classification of the very same
        # text would only repeat the work.
        processed_candidate = raw_candidate
    else:
        processed_route, processed_intent, processed_conf = detect_route(processed_msg)
        processed_candidate = (processed_route, processed_intent, processed_conf)

    priority_order = ["restricted", "image", "identity", "codex", "search", "direct", "smart"]
    for preferred in priority_order:
        for candidate in (raw_candidate, processed_candidate):
            if candidate[0] == preferred:
                return candidate
    return processed_candidate
def is_public_role_query(text: str) -> bool:
    """Return True when the message mentions a public-office or leadership term.

    The check is a plain substring test on the normalised, lowercased message.
    """
    query = clean_text(text).lower()
    role_terms = (
        "president",
        "prime minister",
        "minister",
        "ceo",
        "governor",
        "mayor",
        "current leader",
        "current head",
    )
    for term in role_terms:
        if term in query:
            return True
    return False
def is_vague_follow_up(text: str) -> bool:
    """Return True when the message is, or begins with, a context-free follow-up phrase.

    Examples of such openers are "what about now" and "and today"; the check
    runs on the normalised, lowercased message.
    """
    query = clean_text(text).lower()
    vague_openers = (
        "what about today",
        "what about now",
        "what about it",
        "what about this",
        "today?",
        "now?",
        "and today",
        "and now",
        "what about current",
        "what about the current one",
    )
    # str.startswith accepts a tuple and also covers exact equality.
    return query.startswith(vague_openers)
def extract_direct_answer_from_context(search_context: str) -> str:
    """Return the most authoritative block of a search context.

    The context is split on blank lines and each block is whitespace-normalised.
    Blocks are preferred by leading marker in this order: ``[Direct Answer]``,
    ``[Answer Box]``, ``[News]``, ``[Organic]``. With no marked block the first
    block is returned; an empty context yields ``""``.
    """
    if not search_context:
        return ""

    blocks = []
    for chunk in search_context.split("\n\n"):
        normalized = clean_text(chunk)
        if normalized:
            blocks.append(normalized)
    if not blocks:
        return ""

    for marker in ("[Direct Answer]", "[Answer Box]", "[News]", "[Organic]"):
        hit = next((block for block in blocks if block.startswith(marker)), None)
        if hit is not None:
            return hit
    return blocks[0]
def looks_like_person_name(text: str) -> bool:
    """Heuristically decide whether ``text`` reads like a person's name.

    A name here is two to four whitespace-separated words, each starting with
    an ASCII capital and otherwise made of ASCII letters, ``.`` or ``-``.
    Candidates that open with a date-like or attribution phrase ("As of",
    "According to", "Today", a month name ...) are rejected.

    Lead-ins are compared as whole words, so names that merely start with the
    same letters as a month ("Maya", "Augustine") are accepted.

    Args:
        text: Candidate string; falsy values are never a name.
    """
    words = str(text).split() if text else []
    if not 2 <= len(words) <= 4:
        return False

    invalid_leads = (
        ("as", "of"),
        ("according", "to"),
        ("current", "date"),
        ("today",),
        ("january",),
        ("february",),
        ("march",),
        ("april",),
        ("may",),
        ("june",),
        ("july",),
        ("august",),
        ("september",),
        ("october",),
        ("november",),
        ("december",),
    )
    # A trailing period is dropped for the comparison so abbreviated lead-ins
    # such as "May." are still recognised.
    lowered = [word.lower().rstrip(".") for word in words]
    for lead in invalid_leads:
        if tuple(lowered[: len(lead)]) == lead:
            return False

    # The per-word pattern admits no digits, so numeric candidates fail here.
    return all(re.fullmatch(r"[A-Z][A-Za-z.\-]*", word) for word in words)
def extract_president_name_from_context(search_context: str) -> str:
    """Pull the name of the President of Sri Lanka out of a search context.

    A short list of known names is tried first (case-insensitive substring, in
    list order). Otherwise two patterns look for a capitalised name directly
    after or directly before the phrase "President of Sri Lanka", and the first
    candidate accepted by ``looks_like_person_name`` is returned.

    Returns:
        The name, or ``""`` when nothing suitable is found.
    """
    if not search_context:
        return ""

    known_names = [
        "Anura Kumara Dissanayake",
        "Ranil Wickremesinghe",
        "Maithripala Sirisena",
        "Gotabaya Rajapaksa",
    ]
    lowered = search_context.lower()
    for candidate in known_names:
        if candidate.lower() in lowered:
            return candidate

    # Case-insensitivity is scoped to the literal phrase only. The name group
    # stays case-sensitive so it stops at the first lowercase word instead of
    # swallowing trailing text ("... Rajapaksa after the"), which would then be
    # rejected as a name.
    patterns = [
        r"(?i:President of Sri Lanka(?: is|:)?)\s+([A-Z][A-Za-z.\-]+(?:\s+[A-Z][A-Za-z.\-]+){1,3})",
        r"\b([A-Z][A-Za-z.\-]+(?:\s+[A-Z][A-Za-z.\-]+){1,3})\b(?i:(?:\s+is)?\s+the\s+President of Sri Lanka)",
    ]
    for pattern in patterns:
        for match in re.findall(pattern, search_context):
            candidate = clean_text(match)
            if looks_like_person_name(candidate):
                return candidate
    return ""
def build_grounded_search_response(
    processed_msg: str,
    history_text: str,
    search_context: str,
) -> str:
    """Answer straight from the search context, without calling the model.

    Two strategies are tried in order:

    1. For president questions (the message mentions "president", or it is a
       vague follow-up and the history mentions it) a name extracted from the
       context is wrapped in a fixed sentence. Other public-role questions
       (minister, CEO, mayor ...) skip this step so they are never answered
       with the president's name.
    2. Otherwise the best context block is returned with its marker and its
       trailing ``| Link:`` / ``| Source:`` / ``| Rank:`` metadata removed.

    Returns:
        The grounded answer, or ``""`` when the context offers nothing usable.
    """
    message_lower = clean_text(processed_msg).lower()
    vague_follow_up = is_vague_follow_up(processed_msg)

    # NOTE(review): the shortcut sentence always names Sri Lanka; a president
    # question about another country relies on the context not containing a
    # Sri Lankan president's name.
    asks_about_president = "president" in message_lower or (
        vague_follow_up and "president" in history_text.lower()
    )
    if asks_about_president:
        president_name = extract_president_name_from_context(search_context)
        if president_name and looks_like_person_name(president_name):
            if vague_follow_up or "today" in message_lower or "now" in message_lower:
                current_date_str = datetime.datetime.now().strftime("%Y-%m-%d")
                return f"As of {current_date_str}, the President of Sri Lanka is {president_name}."
            return f"The President of Sri Lanka is {president_name}."

    direct_block = extract_direct_answer_from_context(search_context)
    if direct_block:
        cleaned = re.sub(r"^\[(Direct Answer|Answer Box|News|Organic)\]\s*", "", direct_block)
        cleaned = re.sub(r"\s*\|\s*Link:.*$", "", cleaned)
        cleaned = re.sub(r"\s*\|\s*Source:.*$", "", cleaned)
        cleaned = re.sub(r"\s*\|\s*Rank:.*$", "", cleaned)
        cleaned = clean_text(cleaned)
        if cleaned:
            return cleaned
    return ""
async def _run_blocking(func, *args):
    """Run a blocking callable in the default executor so the event loop stays free."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


async def _gather_search_signals(
    processed_msg: str,
    history_text: str,
    default_is_price: bool,
) -> Dict[str, Any]:
    """Fetch the search context and flatten its payload into the fields the pipeline reads."""
    payload = await build_search_context(processed_msg, history_text)
    return {
        "context": payload.get("context", ""),
        "source_count": payload.get("source_count", 0),
        "search_ok": payload.get("search_ok", False),
        "agreement_ok": payload.get("agreement_ok", False),
        "agreed_value": payload.get("agreed_value"),
        "currency": payload.get("currency", "UNKNOWN"),
        "unit": payload.get("unit", "unknown"),
        "is_price_query": payload.get("is_price_query", default_is_price),
    }


async def process_chat_request(message: str, history, chat_id: Optional[str] = None):
    """Handle one chat message end to end and return the response dictionary.

    Steps, in order:

    1. Normalise the message; reject empty input; answer the Sinhala identity
       question directly.
    2. Translate Sinhala input to English and choose a route.
    3. Short-circuit the ``restricted``, ``image``, identity and ``codex``
       cases, then ask for clarification on ambiguous price queries.
    4. For ``search`` routes (and ``smart`` routes that need it) gather search
       signals, then answer from an agreed price, report a price conflict,
       report a search failure, or answer from the grounded context.
    5. Otherwise ask the model.

    Replies are translated back to Sinhala when the user wrote in Sinhala.
    Translator calls run in an executor because they block.

    Args:
        message: Raw user message.
        history: Previous messages (may be ``None``).
        chat_id: Optional conversation identifier echoed in the response.

    Returns:
        A dictionary built by ``build_base_response``. When step 4 or 5 raises,
        the same shape is returned with ``"Internal Server Error."`` as the
        response and an extra ``"error"`` key holding the exception text.
    """
    raw_msg = clean_text(message)
    history_text = format_chat_history(history or [])
    user_is_sinhala = is_sinhala(raw_msg)

    if not raw_msg:
        return build_base_response(
            response="Please enter a message.",
            route="direct",
            intent="empty_message",
            confidence=1.0,
            chat_id=chat_id,
        )

    if user_is_sinhala and "කවුද" in raw_msg and "ඔයා" in raw_msg:
        return build_base_response(
            response="මම H7 Chat, H7 විසින් නිර්මාණය කරන ලද බුද්ධිමත් සහායකයා.",
            route="identity",
            intent="identity",
            confidence=1.0,
            chat_id=chat_id,
        )

    async def localize(text_en: str) -> str:
        # Replies are authored in English and translated only for Sinhala users.
        if not user_is_sinhala:
            return text_en
        return await _run_blocking(translate_to_sinhala, text_en)

    processed_msg = raw_msg
    if user_is_sinhala:
        processed_msg = clean_text(await _run_blocking(translate_to_english, raw_msg))

    route, intent_label, confidence = choose_route(raw_msg, processed_msg)

    async def reply(text_en: str, reply_route, reply_intent, reply_confidence, **fields):
        return build_base_response(
            response=await localize(text_en),
            route=reply_route,
            intent=reply_intent,
            confidence=reply_confidence,
            chat_id=chat_id,
            **fields,
        )

    if route == "restricted":
        return await reply(build_restricted_response(), route, intent_label, confidence)

    if route == "image":
        return await reply(build_image_redirect_response(), route, intent_label, confidence)

    if check_identity_question(processed_msg):
        return await reply(
            "I am H7 Chat, an intelligent assistant developed by H7.",
            "identity",
            "identity",
            1.0,
        )

    # The codex redirect is decided before the price-ambiguity check so that a
    # coding request mentioning words like "value" or "cost" is not mistaken
    # for an underspecified price question.
    if route == "codex":
        return await reply(build_codex_redirect_response(), route, intent_label, confidence)

    if is_ambiguous_price_query(processed_msg):
        return await reply(
            build_ambiguous_price_response(processed_msg),
            "clarify",
            "ambiguous_price_query",
            1.0,
        )

    try:
        signals = {
            "context": "",
            "source_count": 0,
            "search_ok": False,
            "agreement_ok": False,
            "agreed_value": None,
            "currency": "UNKNOWN",
            "unit": "unknown",
            "is_price_query": is_price_query(processed_msg),
        }

        if route == "search":
            needs_search = True
        elif route == "smart":
            needs_search = await validate_intent_with_llama(processed_msg)
        else:
            needs_search = False

        if needs_search:
            signals = await _gather_search_signals(
                processed_msg, history_text, signals["is_price_query"]
            )

        refined_context = signals["context"]
        source_count = signals["source_count"]
        search_ok = signals["search_ok"]
        agreement_ok = signals["agreement_ok"]
        agreed_value = signals["agreed_value"]
        currency = signals["currency"]
        unit = signals["unit"]
        query_is_price = signals["is_price_query"]

        # NOTE(review): model_fallback_used is reported as False on every path,
        # including the model-generated answer below; confirm that is intended.
        search_fields = dict(
            search_ok=search_ok,
            source_count=source_count,
            agreement_ok=agreement_ok,
            agreed_value=agreed_value,
            currency=currency,
            unit=unit,
            model_fallback_used=False,
        )

        if query_is_price and search_ok and agreement_ok and agreed_value is not None:
            return await reply(
                build_direct_price_response(processed_msg, agreed_value, currency, unit),
                route,
                intent_label,
                confidence,
                **search_fields,
            )

        if query_is_price and search_ok and not agreement_ok:
            return await reply(
                build_conflict_price_response(processed_msg),
                route,
                intent_label,
                confidence,
                **search_fields,
            )

        if route == "search" and not search_ok:
            return await reply(
                build_search_failure_response(processed_msg),
                route,
                intent_label,
                confidence,
                **search_fields,
            )

        if route == "search" and search_ok:
            grounded_response_en = build_grounded_search_response(
                processed_msg=processed_msg,
                history_text=history_text,
                search_context=refined_context,
            )
            if grounded_response_en:
                return await reply(
                    grounded_response_en,
                    route,
                    intent_label,
                    confidence,
                    **search_fields,
                )

        ai_response_en = await ask_ollama_async(
            prompt=processed_msg,
            history_context=history_text,
            search_context=refined_context,
            search_ok=search_ok,
            source_count=source_count,
        )
        return await reply(
            cleanup_response(ai_response_en),
            route,
            intent_label,
            confidence,
            **search_fields,
        )
    except Exception as e:
        # Keep the standard response shape so clients can read the same keys on
        # failure; "error" is preserved for existing consumers.
        failure = build_base_response(
            response="Internal Server Error.",
            route=route,
            intent=intent_label,
            confidence=confidence,
            chat_id=chat_id,
        )
        failure["error"] = str(e)
        return failure