"""Chat request pipeline for the H7 assistant.

Routes an incoming message (restricted / image / identity / codex / search /
smart / direct), optionally gathers live search context, and produces a
response dictionary. Sinhala input is translated to English for processing
and the answer is translated back.
"""

import asyncio
import datetime
import logging
import re
from typing import Any, Dict, Optional

import requests
from deep_translator import GoogleTranslator

from router_logic import detect_route, check_identity_question
from search_engine import build_search_context, validate_intent_with_llama
from models_loader import MODEL_NAME

OLLAMA_URL = "http://localhost:11434/api/generate"

logger = logging.getLogger(__name__)


def translate_to_english(text: str) -> str:
    """Translate *text* to English, returning *text* unchanged on failure.

    Best-effort: any translator error is logged and the input is returned.
    An empty/None translator result also falls back to the input so the
    caller never loses the user's message.
    """
    try:
        translated = GoogleTranslator(source="auto", target="en").translate(text)
    except Exception:
        logger.warning("Translation to English failed; using original text", exc_info=True)
        return text
    return translated or text


def translate_to_sinhala(text: str) -> str:
    """Translate English *text* to Sinhala, returning *text* unchanged on failure."""
    try:
        translated = GoogleTranslator(source="en", target="si").translate(text)
    except Exception:
        logger.warning("Translation to Sinhala failed; using original text", exc_info=True)
        return text
    return translated or text


def is_sinhala(text: str) -> bool:
    """Return True when *text* contains at least one character in U+0D80..U+0DFF."""
    return any("\u0D80" <= char <= "\u0DFF" for char in text)


def clean_text(text: str) -> str:
    """Collapse every whitespace run to one space and trim; falsy input gives ""."""
    return re.sub(r"\s+", " ", str(text)).strip() if text else ""


def _compile_terms(terms, allow_plural: bool = False) -> "re.Pattern[str]":
    """Build a pattern matching any of *terms* as a standalone token.

    A term only matches when it is not glued to another ASCII letter, so
    "rate" no longer fires inside "generate" and "us" no longer fires inside
    "house". Digits may still touch a term ("1kg", "24kt"). The pattern is
    meant for text that has already been lower-cased.
    """
    alternation = "|".join(re.escape(term) for term in terms)
    suffix = "s?" if allow_plural else ""
    return re.compile(rf"(?<![a-z])(?:{alternation}){suffix}(?![a-z])")


def _mentions(pattern: "re.Pattern[str]", text: str) -> bool:
    """Return True when *pattern* occurs in the cleaned, lower-cased *text*."""
    return pattern.search(clean_text(text).lower()) is not None


_PRICE_TERMS = (
    "price", "rate", "cost", "value", "gold price", "fuel price",
    "stock price", "exchange rate", "market price", "bitcoin price",
    "crypto price", "silver price",
)
# NOTE(review): "us" still matches the pronoun ("tell us ..."); removing it
# would drop "US" as a country hint, so it is kept as in the original list.
_COUNTRY_TERMS = (
    "sri lanka", "lk", "colombo", "india", "indian", "usa", "us",
    "united states", "uk", "united kingdom", "dubai", "uae", "singapore",
    "australia", "canada", "japan", "china", "global",
)
_UNIT_TERMS = ("per gram", "gram", "kg", "kilogram", "ounce", "oz", "tola")
_PURITY_TERMS = ("24k", "22k", "21k", "18k", "carat", "karat", "kt")
_ROLE_TERMS = (
    "president", "prime minister", "minister", "ceo", "governor", "mayor",
    "current leader", "current head",
)

_PRICE_RE = _compile_terms(_PRICE_TERMS, allow_plural=True)
_COUNTRY_RE = _compile_terms(_COUNTRY_TERMS)
_UNIT_RE = _compile_terms(_UNIT_TERMS, allow_plural=True)
_PURITY_RE = _compile_terms(_PURITY_TERMS, allow_plural=True)
_ROLE_RE = _compile_terms(_ROLE_TERMS, allow_plural=True)
_PRESIDENT_RE = _compile_terms(("president",), allow_plural=True)


def format_chat_history(history) -> str:
    """Render the last four history messages as "User: ..." / "H7: ..." lines.

    Each item is read through its ``role`` and ``content`` attributes; any
    role other than "user" (case-insensitive) is labelled "H7".
    """
    if not history:
        return ""
    lines = []
    for msg in history[-4:]:
        speaker = "User" if str(msg.role).lower() == "user" else "H7"
        lines.append(f"{speaker}: {msg.content}")
    return "\n".join(lines)


def is_price_query(text: str) -> bool:
    """Return True when *text* mentions a price-like term as a standalone word."""
    return _mentions(_PRICE_RE, text)


def has_country_hint(text: str) -> bool:
    """Return True when *text* names one of the known countries/regions."""
    return _mentions(_COUNTRY_RE, text)


def has_unit_hint(text: str) -> bool:
    """Return True when *text* names a weight unit (gram, kg, ounce, oz, tola)."""
    return _mentions(_UNIT_RE, text)


def has_purity_hint(text: str) -> bool:
    """Return True when *text* names a gold purity (24k/22k/21k/18k, carat, karat, kt)."""
    return _mentions(_PURITY_RE, text)


def is_ambiguous_price_query(text: str) -> bool:
    """Return True for a price query that carries no country, unit or purity hint."""
    if not is_price_query(text):
        return False
    return not (has_country_hint(text) or has_unit_hint(text) or has_purity_hint(text))


def build_ambiguous_price_response(text: str) -> str:
    """Return a clarification request tailored to the item named in *text*."""
    t = clean_text(text).lower()
    if "gold" in t:
        return (
            "Gold price depends on the country, currency, purity, and unit. "
            "Please specify what you want: Sri Lanka 24K per gram, India 22K per gram, "
            "or global spot price in USD per ounce."
        )
    if "fuel" in t:
        return (
            "Fuel price depends on the country and fuel type. "
            "Please specify what you want, for example Sri Lanka petrol 92, "
            "Sri Lanka diesel, or India petrol price."
        )
    if "stock" in t:
        return (
            "Stock price depends on the company and exchange. "
            "Please specify the company name or ticker symbol, for example Apple stock price or TSLA price."
        )
    if "exchange" in t or "currency" in t:
        return (
            "Exchange rate depends on the currency pair. "
            "Please specify what you want, for example USD to LKR or EUR to USD."
        )
    return (
        "This price query needs more detail. "
        "Please specify the country, unit, or exact item so I can give the correct live value."
    )


_CURRENCY_ALIASES = {
    "LKR": "LKR",
    "RS": "LKR",
    "RS.": "LKR",
    "INR": "INR",
    "USD": "USD",
    "$": "USD",
    "EUR": "EUR",
    "GBP": "GBP",
    "UNKNOWN": "UNKNOWN",
}

_UNIT_ALIASES = {
    "g": "gram",
    "gram": "gram",
    "oz": "ounce",
    "ounce": "ounce",
    "kg": "kg",
    "tola": "tola",
    "unknown": "unknown",
}


def format_currency(currency: str) -> str:
    """Map a currency alias to its canonical code.

    The lookup ignores case and surrounding whitespace ("usd", " Rs. ").
    None or blank input yields "UNKNOWN"; an unrecognised value is returned
    unchanged.
    """
    if currency is None:
        return "UNKNOWN"
    key = str(currency).strip().upper()
    if not key:
        return "UNKNOWN"
    return _CURRENCY_ALIASES.get(key, currency)


def format_unit(unit: str) -> str:
    """Map a unit alias to its canonical name.

    The lookup ignores case and surrounding whitespace. None or blank input
    yields "unknown"; an unrecognised value is returned unchanged.
    """
    if unit is None:
        return "unknown"
    key = str(unit).strip().lower()
    if not key:
        return "unknown"
    return _UNIT_ALIASES.get(key, unit)


def detect_price_label(user_msg: str) -> str:
    """Build a short label such as "Sri Lanka 24K gold price" from *user_msg*.

    The label is "<location><purity><item>", where each part is chosen from
    the first matching keyword and missing parts are left out.
    """
    text = clean_text(user_msg).lower()

    location = ""
    if "sri lanka" in text:
        location = "Sri Lanka "
    elif "india" in text:
        location = "India "
    elif "global" in text:
        location = "global "

    purity = ""
    for marker, label in (("24k", "24K "), ("22k", "22K "), ("21k", "21K "), ("18k", "18K ")):
        if marker in text:
            purity = label
            break

    item = "price"
    for marker, label in (
        ("gold", "gold price"),
        ("silver", "silver price"),
        ("fuel", "fuel price"),
        ("bitcoin", "Bitcoin price"),
        ("stock", "stock price"),
        ("president", "president"),
    ):
        if marker in text:
            item = label
            break

    return f"{location}{purity}{item}".strip()


def build_direct_price_response(user_msg: str, agreed_value: float, currency: str, unit: str) -> str:
    """Return the one-sentence answer for a price the sources agreed on.

    The currency is omitted when it is unknown (instead of printing the
    "UNKNOWN" placeholder) and the "per <unit>" part is omitted when the
    unit is unknown.
    """
    label = detect_price_label(user_msg)
    formatted_currency = format_currency(currency)
    formatted_unit = format_unit(unit)
    amount = f"{float(agreed_value):,.2f}"
    money = amount if formatted_currency == "UNKNOWN" else f"{formatted_currency} {amount}"
    if formatted_unit == "unknown":
        return f"The current {label} is around {money} based on multiple live sources."
    return f"The current {label} is around {money} per {formatted_unit} based on multiple live sources."


def build_conflict_price_response(user_msg: str) -> str:
    """Return the message used when live sources disagree on a price."""
    t = clean_text(user_msg).lower()
    if "gold" in t:
        return (
            "I found conflicting live gold prices across sources, so I cannot confirm one exact value reliably right now. "
            "Please check a trusted bullion or financial source for confirmation."
        )
    if "fuel" in t:
        return (
            "I found conflicting live fuel prices across sources, so I cannot confirm one exact value reliably right now. "
            "Please check an official local source for confirmation."
        )
    if "stock" in t:
        return (
            "I found conflicting live stock prices across sources, so I cannot confirm one exact value reliably right now. "
            "Please check a trusted market source for confirmation."
        )
    return (
        "I found conflicting live values across sources, so I cannot confirm one exact value reliably right now."
    )


def build_search_failure_response(user_msg: str) -> str:
    """Return the message used when a search route produced no reliable result."""
    t = clean_text(user_msg).lower()
    if "gold" in t:
        return (
            "I could not confirm a reliable live gold value from the available sources right now. "
            "Please specify 22K or 24K and the unit if you want a narrower result."
        )
    if "weather" in t:
        return "I could not confirm a reliable live weather update from the available sources right now."
    if "president" in t or "minister" in t:
        return "I could not confirm a reliable current public-office answer from the available sources right now."
    return "I could not confirm a reliable live answer from the available sources right now."


def build_codex_redirect_response() -> str:
    """Return the canned redirect for coding requests."""
    return (
        "This looks like a coding request. It should be handled by Code X for better results. "
        "Please switch to Code X and send the same request there."
    )


def build_image_redirect_response() -> str:
    """Return the canned redirect for image-related requests."""
    return (
        "This looks like an image-related request. It should be handled by the image workflow rather than the normal chat backend."
    )


def build_restricted_response() -> str:
    """Return the canned refusal for restricted requests."""
    return "I cannot help with harmful, abusive, or clearly unsafe requests."


def build_base_response(
    response: str,
    route: str,
    intent: str,
    confidence: float,
    search_ok: bool = False,
    source_count: int = 0,
    agreement_ok: bool = False,
    agreed_value=None,
    currency: str = "UNKNOWN",
    unit: str = "unknown",
    model_fallback_used: bool = False,
    chat_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble the response dictionary returned by the chat pipeline.

    ``currency`` and ``unit`` are normalised through ``format_currency`` and
    ``format_unit``; every other argument is stored as given.
    """
    return {
        "response": response,
        "route": route,
        "intent": intent,
        "confidence": confidence,
        "search_ok": search_ok,
        "source_count": source_count,
        "agreement_ok": agreement_ok,
        "agreed_value": agreed_value,
        "currency": format_currency(currency),
        "unit": format_unit(unit),
        "model_fallback_used": model_fallback_used,
        "chat_id": chat_id,
    }


async def _run_blocking(func, *args):
    """Run a blocking callable in the default executor and await its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


async def post_to_ollama(payload: dict, timeout: int = 60) -> dict:
    """POST *payload* to ``OLLAMA_URL`` and return the decoded JSON body.

    The blocking ``requests`` call runs in the default executor. HTTP error
    statuses raise through ``raise_for_status``.
    """

    def _request():
        response = requests.post(OLLAMA_URL, json=payload, timeout=timeout)
        response.raise_for_status()
        return response.json()

    return await _run_blocking(_request)


async def ask_ollama_async(
    prompt: str,
    history_context: str = "",
    search_context: str = "",
    search_ok: bool = False,
    source_count: int = 0,
) -> str:
    """Ask the model for an answer and return its text.

    The prompt combines the system instruction, search status, search
    results, chat history and the user message. Returns "Processing error."
    when the model yields no text and "Error generating response." when the
    request fails (the failure is logged).
    """
    current_time_str = datetime.datetime.now().strftime("%Y-%m-%d")

    system_instruction = (
        f"Current Date: {current_time_str}. "
        "You are H7 Assistant. "
        "Reply naturally for greetings and casual conversation. "
        "Keep answers concise unless the user clearly asks for more detail. "
        "For factual questions, use the provided search results when available. "
        "Do not invent facts, numbers, prices, dates, or statistics when evidence is weak. "
        "For role based factual questions, answer directly and cleanly without unnecessary words like our or my unless the user explicitly asks from a personal perspective. "
        "If search results are available, use them carefully and answer directly."
    )

    search_status_text = (
        f"Reliable Search Available: {'YES' if search_ok else 'NO'}\n"
        f"Unique Source Count: {source_count}\n"
    )

    full_prompt = (
        f"SYSTEM:\n{system_instruction}\n\n"
        f"[SEARCH STATUS]\n{search_status_text}\n"
        f"[SEARCH RESULTS]\n{search_context if search_context else 'No strong results found.'}\n\n"
        f"[HISTORY]\n{history_context if history_context else 'No previous conversation.'}\n\n"
        f"USER: {prompt}\n"
        f"H7:"
    )

    payload = {
        "model": MODEL_NAME,
        "prompt": full_prompt,
        "stream": False,
        "options": {
            "temperature": 0,
            "top_p": 0.9,
        },
    }

    try:
        data = await post_to_ollama(payload, timeout=60)
    except Exception:
        logger.exception("Ollama request failed")
        return "Error generating response."

    try:
        # A missing or None "response" field counts as "no text".
        answer = (data.get("response") or "").strip()
    except AttributeError:
        # Unexpected payload shape (non-dict body or non-string field).
        logger.warning("Unexpected Ollama payload: %r", data)
        return "Error generating response."
    return answer or "Processing error."


def cleanup_response(text: str) -> str:
    """Normalise model output into a single clean line.

    Rewrites a leading "Our current " / "My current " to "The current ",
    removes the filler "let me check again for you", and collapses all
    whitespace.
    """
    # NOTE(review): the original also collapsed 3+ newlines to 2 first, but
    # clean_text() flattens every newline afterwards, so that step had no
    # effect on the result. If paragraph breaks should survive, clean_text()
    # must not be applied here.
    cleaned = str(text).strip()
    cleaned = re.sub(r"^(?:Our|My) current ", "The current ", cleaned)
    cleaned = re.sub(r"let me check again for you\.?", "", cleaned, flags=re.IGNORECASE)
    return clean_text(cleaned)


def choose_route(raw_msg: str, processed_msg: str):
    """Pick the route for a message from its raw and translated forms.

    Both forms are classified with ``detect_route``; the candidate whose
    route ranks highest in the priority order wins, with the raw message
    winning ties. When neither route is in the priority list the processed
    classification is returned.
    """
    raw_route, raw_intent, raw_conf = detect_route(raw_msg)
    if processed_msg == raw_msg:
        # Same text (no translation happened): reuse the first classification.
        processed_route, processed_intent, processed_conf = raw_route, raw_intent, raw_conf
    else:
        processed_route, processed_intent, processed_conf = detect_route(processed_msg)

    priority_order = ["restricted", "image", "identity", "codex", "search", "direct", "smart"]
    candidates = [
        (raw_route, raw_intent, raw_conf),
        (processed_route, processed_intent, processed_conf),
    ]

    for preferred in priority_order:
        for route, intent, conf in candidates:
            if route == preferred:
                return route, intent, conf

    return processed_route, processed_intent, processed_conf


def is_public_role_query(text: str) -> bool:
    """Return True when *text* names a public role (president, minister, CEO, ...)."""
    return _mentions(_ROLE_RE, text)


def is_vague_follow_up(text: str) -> bool:
    """Return True when *text* starts with a context-free follow-up phrase."""
    t = clean_text(text).lower()
    vague_terms = (
        "what about today",
        "what about now",
        "what about it",
        "what about this",
        "today?",
        "now?",
        "and today",
        "and now",
        "what about current",
        "what about the current one",
    )
    return t.startswith(vague_terms)


def extract_direct_answer_from_context(search_context: str) -> str:
    """Return the most authoritative block of *search_context*.

    Blocks are separated by blank lines. The first block starting with
    "[Direct Answer]", "[Answer Box]", "[News]" or "[Organic]" (checked in
    that order) is returned; otherwise the first block. Empty context gives "".
    """
    if not search_context:
        return ""

    blocks = [clean_text(block) for block in search_context.split("\n\n") if clean_text(block)]
    if not blocks:
        return ""

    priority_markers = ["[Direct Answer]", "[Answer Box]", "[News]", "[Organic]"]
    for marker in priority_markers:
        for block in blocks:
            if block.startswith(marker):
                return block

    return blocks[0]


# Leading words that mark a candidate as a date/attribution phrase rather than
# a name. Whole-word matching keeps names such as "Maya ..." or "Marchand ...".
_NON_NAME_PREFIX_RE = re.compile(
    r"^(?:as of|according to|current date|today|january|february|march|april|may|"
    r"june|july|august|september|october|november|december)\b",
    re.IGNORECASE,
)


def looks_like_person_name(text: str) -> bool:
    """Heuristically decide whether *text* looks like a person's name.

    Accepts two to four capitalised words made of letters, dots and hyphens,
    with no digits, that do not start with a date/attribution phrase.
    """
    candidate = clean_text(text)
    if not candidate:
        return False
    if _NON_NAME_PREFIX_RE.match(candidate):
        return False
    if re.search(r"\d", candidate):
        return False

    words = candidate.split()
    if not 2 <= len(words) <= 4:
        return False
    return all(re.fullmatch(r"[A-Z][A-Za-z.\-]*", word) for word in words)


_NAME_GROUP = r"([A-Z][A-Za-z.\-]+(?:\s+[A-Z][A-Za-z.\-]+){1,3})"
# Only the fixed phrase is case-insensitive; the captured name must really be
# capitalised, otherwise the greedy capture swallows following lowercase words.
_PRESIDENT_NAME_PATTERNS = (
    re.compile(r"(?i:President of Sri Lanka)(?:(?i: is)|:)?\s+" + _NAME_GROUP),
    re.compile(r"\b" + _NAME_GROUP + r"\b(?:\s+(?i:is))?\s+(?i:the\s+President of Sri Lanka)"),
)


def extract_president_name_from_context(search_context: str) -> str:
    """Extract the name of the President of Sri Lanka from *search_context*.

    A hard-coded list of known names is checked first (in list order); then
    "President of Sri Lanka is <Name>" / "<Name> is the President of Sri
    Lanka" patterns are tried. Returns "" when nothing plausible is found.
    """
    if not search_context:
        return ""

    known_names = [
        "Anura Kumara Dissanayake",
        "Ranil Wickremesinghe",
        "Maithripala Sirisena",
        "Gotabaya Rajapaksa",
    ]

    lowered = search_context.lower()
    for candidate in known_names:
        if candidate.lower() in lowered:
            return candidate

    for pattern in _PRESIDENT_NAME_PATTERNS:
        for match in pattern.findall(search_context):
            # Drop sentence punctuation captured with the last word so the
            # caller's sentence does not end in "..".
            candidate = clean_text(match).rstrip(".,-")
            if looks_like_person_name(candidate):
                return candidate

    return ""


def build_grounded_search_response(
    processed_msg: str,
    history_text: str,
    search_context: str,
) -> str:
    """Build an answer directly from search results, without the model.

    For a question about the president (or a vague follow-up to one) the
    Sri Lanka president shortcut is tried first. Otherwise the best context
    block is returned with its tag and "| Link / Source / Rank" tail removed.
    Returns "" when nothing usable is found.
    """
    current_date_str = datetime.datetime.now().strftime("%Y-%m-%d")
    direct_block = extract_direct_answer_from_context(search_context)

    message_lower = clean_text(processed_msg).lower()
    follow_up = is_vague_follow_up(processed_msg)

    # The shortcut always answers "the President of Sri Lanka is ...", so it
    # must only run for president questions, not for every public role.
    # NOTE(review): a president question about another country still reaches
    # this branch, as in the original; it only answers if the context
    # mentions a Sri Lankan president.
    asks_about_president = _PRESIDENT_RE.search(message_lower) is not None
    if asks_about_president or (follow_up and "president" in history_text.lower()):
        president_name = extract_president_name_from_context(search_context)
        if president_name and looks_like_person_name(president_name):
            wants_current = follow_up or re.search(r"\b(?:today|now)\b", message_lower)
            if wants_current:
                return f"As of {current_date_str}, the President of Sri Lanka is {president_name}."
            return f"The President of Sri Lanka is {president_name}."

    if direct_block:
        cleaned = re.sub(r"^\[(Direct Answer|Answer Box|News|Organic)\]\s*", "", direct_block)
        cleaned = re.sub(r"\s*\|\s*Link:.*$", "", cleaned)
        cleaned = re.sub(r"\s*\|\s*Source:.*$", "", cleaned)
        cleaned = re.sub(r"\s*\|\s*Rank:.*$", "", cleaned)
        cleaned = clean_text(cleaned)
        if cleaned:
            return cleaned

    return ""


async def _localize(response_en: str, to_sinhala: bool) -> str:
    """Translate *response_en* to Sinhala off the event loop when required."""
    if not to_sinhala:
        return response_en
    return await _run_blocking(translate_to_sinhala, response_en)


async def _gather_search_state(processed_msg: str, history_text: str, default_is_price: bool) -> Dict[str, Any]:
    """Run the search and return its payload fields with defaults applied."""
    payload = await build_search_context(processed_msg, history_text)
    return {
        "context": payload.get("context", ""),
        "source_count": payload.get("source_count", 0),
        "search_ok": payload.get("search_ok", False),
        "agreement_ok": payload.get("agreement_ok", False),
        "agreed_value": payload.get("agreed_value"),
        "currency": payload.get("currency", "UNKNOWN"),
        "unit": payload.get("unit", "unknown"),
        "is_price_query": payload.get("is_price_query", default_is_price),
    }


async def process_chat_request(message: str, history, chat_id: Optional[str] = None):
    """Handle one chat message and return the response dictionary.

    Order of checks: empty message, Sinhala identity question, restricted,
    image, identity, ambiguous price, codex, then search/smart handling
    (agreed price, conflicting price, search failure, grounded answer) and
    finally the model. Any exception raised from the codex check onwards is
    logged and reported as "Internal Server Error." with an ``error`` key.
    """
    raw_msg = clean_text(message)
    history_text = format_chat_history(history or [])
    user_is_sinhala = is_sinhala(raw_msg)

    if not raw_msg:
        return build_base_response(
            response="Please enter a message.",
            route="direct",
            intent="empty_message",
            confidence=1.0,
            chat_id=chat_id,
        )

    if user_is_sinhala and ("කවුද" in raw_msg and "ඔයා" in raw_msg):
        return build_base_response(
            response="මම H7 Chat, H7 විසින් නිර්මාණය කරන ලද බුද්ධිමත් සහායකයා.",
            route="identity",
            intent="identity",
            confidence=1.0,
            chat_id=chat_id,
        )

    if user_is_sinhala:
        # The translator does blocking network I/O; keep it off the event loop.
        processed_msg = clean_text(await _run_blocking(translate_to_english, raw_msg))
    else:
        processed_msg = raw_msg

    route, intent_label, confidence = choose_route(raw_msg, processed_msg)

    async def canned(response_en: str, as_route: str, as_intent: str, as_confidence: float):
        """Localise a fixed reply and wrap it without search metadata."""
        return build_base_response(
            response=await _localize(response_en, user_is_sinhala),
            route=as_route,
            intent=as_intent,
            confidence=as_confidence,
            chat_id=chat_id,
        )

    if route == "restricted":
        return await canned(build_restricted_response(), route, intent_label, confidence)

    if route == "image":
        return await canned(build_image_redirect_response(), route, intent_label, confidence)

    if check_identity_question(processed_msg):
        return await canned(
            "I am H7 Chat, an intelligent assistant developed by H7.",
            "identity",
            "identity",
            1.0,
        )

    if is_ambiguous_price_query(processed_msg):
        return await canned(
            build_ambiguous_price_response(processed_msg),
            "clarify",
            "ambiguous_price_query",
            1.0,
        )

    try:
        if route == "codex":
            return await canned(build_codex_redirect_response(), route, intent_label, confidence)

        state: Dict[str, Any] = {
            "context": "",
            "source_count": 0,
            "search_ok": False,
            "agreement_ok": False,
            "agreed_value": None,
            "currency": "UNKNOWN",
            "unit": "unknown",
            "is_price_query": is_price_query(processed_msg),
        }

        # "search" always searches; "smart" asks the model whether to search.
        if route == "search":
            needs_search = True
        elif route == "smart":
            needs_search = await validate_intent_with_llama(processed_msg)
        else:
            needs_search = False

        if needs_search:
            state = await _gather_search_state(processed_msg, history_text, state["is_price_query"])

        search_ok = state["search_ok"]
        agreement_ok = state["agreement_ok"]
        agreed_value = state["agreed_value"]
        query_is_price = state["is_price_query"]

        async def respond(response_en: str):
            """Localise a reply and wrap it with the current search metadata."""
            return build_base_response(
                response=await _localize(response_en, user_is_sinhala),
                route=route,
                intent=intent_label,
                confidence=confidence,
                search_ok=search_ok,
                source_count=state["source_count"],
                agreement_ok=agreement_ok,
                agreed_value=agreed_value,
                currency=state["currency"],
                unit=state["unit"],
                model_fallback_used=False,
                chat_id=chat_id,
            )

        if query_is_price and search_ok and agreement_ok and agreed_value is not None:
            return await respond(
                build_direct_price_response(processed_msg, agreed_value, state["currency"], state["unit"])
            )

        if query_is_price and search_ok and not agreement_ok:
            return await respond(build_conflict_price_response(processed_msg))

        if route == "search" and not search_ok:
            return await respond(build_search_failure_response(processed_msg))

        if route == "search" and search_ok:
            grounded_response_en = build_grounded_search_response(
                processed_msg=processed_msg,
                history_text=history_text,
                search_context=state["context"],
            )
            if grounded_response_en:
                return await respond(grounded_response_en)

        ai_response_en = await ask_ollama_async(
            prompt=processed_msg,
            history_context=history_text,
            search_context=state["context"],
            search_ok=search_ok,
            source_count=state["source_count"],
        )
        return await respond(cleanup_response(ai_response_en))

    except Exception as e:
        logger.exception("process_chat_request failed")
        # Keep the original "response"/"error"/"chat_id" keys and add the
        # standard ones so callers always receive the same shape.
        failure = build_base_response(
            response="Internal Server Error.",
            route=route,
            intent=intent_label,
            confidence=confidence,
            chat_id=chat_id,
        )
        failure["error"] = str(e)
        return failure