# File: instant_chatbot/src/handler.py
# Page header (kept for reference): galuhalifani - "add usage loggers" (commit b15cb21)
import re
from datetime import datetime, timezone
from pymongo import MongoClient
import os
from dotenv import load_dotenv
from langdetect import detect
from deep_translator import GoogleTranslator
# Configuration: load_dotenv() is called first so that values it provides are
# visible through the process environment, then the settings are read from it.
# Either value is None when the corresponding variable is not set.
load_dotenv()
MONGODB_URI = os.getenv("MONGO_URI")
OPENAI_KEY = os.getenv("OPENAI_API_KEY")
def init_mongodb():
    """Open a MongoDB connection and check that the server answers.

    Returns:
        The connected ``MongoClient`` on success. On any failure nothing is
        raised; a human-readable error string is returned instead. That string
        is truthy, so callers must inspect the type of the result rather than
        rely on a plain truthiness test.
    """
    try:
        mongo_client = MongoClient(MONGODB_URI, serverSelectionTimeoutMS=5000)
        # Ask the server for its info so connection problems show up here
        # instead of on the first real query.
        mongo_client.server_info()
    except Exception as exc:
        return f"⚠️ Failed connecting to database: {str(exc)}"
    else:
        return mongo_client
# Connect once at import time and expose the collections used by the helpers
# in this module.
#
# init_mongodb() reports failure by returning an error *string*, which is
# truthy. The result is therefore checked by type as well as truthiness: a
# bare `if client:` would go on to index the error message and raise
# TypeError, making the whole module unimportable when the database is down.
client = init_mongodb()
if client and not isinstance(client, str):
    db = client['instant_bot']
    collection = db['instant']
    feedback_collection = db['feedback']
    web_counter_collection = db['web_counter']
else:
    # Surface the connection problem but keep the module importable; the
    # helpers that touch these collections catch the resulting errors.
    print(client)
    db = None
    collection = None
    feedback_collection = None
    web_counter_collection = None
# Prefixes that mark a message as feedback. The matching helpers in this file
# compare these against normalize()d text (lower-cased, punctuation removed),
# so in that path only the lower-case, punctuation-free entries can match;
# the remaining variants are kept unchanged.
FEEDBACK_KEYWORDS = [
    'Feedback',
    'feedback',
    'Feedback',
    'Feedback?',
    'feedback?',
    'feedback!',
    'not helpful',
    'not helpful',
    'not helpful?',
    'not helpful?',
    'not helpful!',
    'helpful',
    'helpful?',
    'helpful!',
    "'helpful'",
    "'not helpful",
    "'feedback'",
]

# In-memory store: key (user id or "anonymous") -> {"question": ..., "answer": ...}.
last_qna_map = dict()
def store_last_qna(user_id, question, answer):
    """Remember the most recent question/answer pair.

    The pair is written under ``user_id`` and also under the shared
    ``"anonymous"`` key, so the ``"anonymous"`` entry always holds the latest
    exchange recorded for any user.
    """
    for key in (user_id, "anonymous"):
        # A fresh dict per key: the two entries never alias each other.
        last_qna_map[key] = {"question": question, "answer": answer}
def get_last_question(user_id="anonymous"):
    """Return the last question stored for ``user_id``.

    Returns an empty string when nothing is stored for that key or the stored
    entry has no "question" field.
    """
    if user_id not in last_qna_map:
        return ""
    entry = last_qna_map[user_id]
    return entry.get("question", "")
def get_last_answer(user_id="anonymous"):
    """Return the last answer stored for ``user_id``.

    Returns an empty string when nothing is stored for that key or the stored
    entry has no "answer" field.
    """
    if user_id not in last_qna_map:
        return ""
    entry = last_qna_map[user_id]
    return entry.get("answer", "")
def normalize(text):
    """Return ``text`` lower-cased, trimmed and stripped of punctuation.

    Every character that is neither a word character (letters, digits,
    underscore) nor whitespace is removed; whitespace inside the text is kept
    as-is, only the ends are trimmed.
    """
    without_punctuation = re.sub(r'[^\w\s]', '', text)
    trimmed = without_punctuation.strip()
    return trimmed.lower()
def is_feedback_message(text: str) -> bool:
    """Tell whether ``text`` starts with one of the feedback keywords.

    The text is normalized first (see normalize()), then compared as a plain
    prefix against each entry of FEEDBACK_KEYWORDS.
    """
    normalized = normalize(text)
    for prefix in FEEDBACK_KEYWORDS:
        if normalized.startswith(prefix):
            return True
    return False
def check_question_feedback(query, user_id="anonymous"):
    """Classify ``query`` as feedback or as a regular question.

    Returns:
        A dict with the keys "is_feedback", "query", "feedback_obj" and
        "last_qna". For feedback, "feedback_obj" is the parsed feedback and
        "last_qna" the last question/answer stored for ``user_id``. Otherwise
        "feedback_obj" is None and both "last_qna" fields are None.
    """
    if not is_feedback_message(query):
        return {
            "is_feedback": False,
            "query": query,
            "feedback_obj": None,
            "last_qna": {"question": None, "answer": None},
        }

    parsed_feedback = extract_feedback_content(query)
    previous_exchange = {
        "question": get_last_question(user_id),
        "answer": get_last_answer(user_id),
    }
    return {
        "is_feedback": True,
        "query": query,
        "feedback_obj": parsed_feedback,
        "last_qna": previous_exchange,
    }
def extract_feedback_content(raw_message: str) -> dict:
    """Split a feedback message into its keyword and the free-text comment.

    The message is normalized, then the first entry of FEEDBACK_KEYWORDS that
    prefixes it is taken as the feedback label; whatever follows is the
    comment.

    Returns:
        {"feedback": <keyword>, "comment": <rest>} when a keyword matches,
        otherwise {"feedback": None, "comment": <normalized message>}.
    """
    cleaned = normalize(raw_message)
    matched = next(
        (candidate for candidate in FEEDBACK_KEYWORDS if cleaned.startswith(candidate)),
        None,
    )
    if matched is None:
        return {"feedback": None, "comment": cleaned}
    comment = cleaned[len(matched):].strip()
    return {"feedback": matched, "comment": comment}
def save_feedback(feedback_obj: dict, last_qna: dict) -> str:
    """Store one feedback record and return a message for the user.

    Args:
        feedback_obj: Must contain "feedback" and "comment" (a missing key
            raises KeyError before anything is written).
        last_qna: May contain "question" and "answer"; missing values are
            stored as None.

    Returns:
        A thank-you message on success, or an error message describing the
        failure if the insert raises.
    """
    record = dict(
        feedback=feedback_obj["feedback"],
        comment=feedback_obj["comment"],
        question=last_qna.get("question"),
        answer=last_qna.get("answer"),
        # Stored as an ISO-8601 string in UTC.
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    try:
        feedback_collection.insert_one(record)
    except Exception as exc:
        return f"An error occured when saving feedback: {str(exc)}"
    return "Thank you for your feedback! We have recorded it successfully"
def translate_answer(question, answer):
    """Translate ``answer`` into the language of ``question`` when they differ.

    Questions shorter than five characters are returned untranslated (too
    little text to detect a language from). Detection and translation are
    best-effort: if either step raises (for example text with no detectable
    language, or a translation-service error) or the translator returns
    nothing, the original answer is returned instead of propagating the
    failure, in line with translate_text().

    Args:
        question: The user's question; its detected language is the target.
        answer: The text to translate.

    Returns:
        The translated answer, or ``answer`` unchanged when no translation is
        needed or possible.
    """
    if len(question) < 5:
        return answer
    try:
        question_lang = detect(question)
        if question_lang == detect(answer):
            return answer
        translated = GoogleTranslator(source='auto', target=question_lang).translate(answer)
    except Exception as e:
        # An untranslated answer is more useful than a crashed reply.
        print("Error translating answer:", e)
        return answer
    return translated or answer
def translate_list(question, list_of_answers):
    """Translate each answer into the language of ``question``.

    Only the question's language is detected; the answers are translated
    whenever that language is not English ('en'). Questions shorter than
    three characters are not analysed and the list is returned as given.

    Translation is best-effort. If language detection or translator setup
    fails, the original list is returned. If a single item fails to translate
    (or comes back empty), that item is kept in its original form so the
    result always has one entry per input answer.

    Args:
        question: The user's question; its detected language is the target.
        list_of_answers: Texts to translate.

    Returns:
        ``list_of_answers`` itself when no translation is needed or possible,
        otherwise a new list of translated (or, on failure, original) items.
    """
    if len(question) < 3:
        return list_of_answers
    try:
        question_lang = detect(question)
        if question_lang == 'en':
            return list_of_answers
        # Built once: the target language is the same for every item.
        translator = GoogleTranslator(source='auto', target=question_lang)
    except Exception as e:
        print("Error preparing answer translation:", e)
        return list_of_answers

    translated = []
    for item in list_of_answers:
        try:
            result = translator.translate(item)
        except Exception as e:
            print("Error translating answer:", e)
            result = None
        translated.append(result or item)
    return translated
def translate_text(lang, text):
    """Translate ``text`` into ``lang`` if that language code is supported.

    Returns ``text`` unchanged when ``lang`` is not in the supported list or
    when the translation raises for any reason.
    """
    supported_languages = ['en', 'id', 'fr', 'de', 'th', 'es', 'it', 'pt', 'ja', 'ko', 'zh-cn', 'zh-tw', 'ru']
    try:
        if lang not in supported_languages:
            return text
        translator = GoogleTranslator(source='auto', target=lang)
        return translator.translate(text)
    except Exception:
        # Best-effort: fall back to the untranslated text.
        return text
def add_question_ticker(query=None):
    """Count one more web question and record when and what was asked.

    Increments the "counter" field of the {"type": "web"} document (creating
    the document if needed) and stores the current UTC time and the query
    text alongside it.

    Args:
        query: The question text to record; stored as-is (None by default).

    Returns:
        True when the update succeeded, False when it raised (the error is
        printed).
    """
    update = {
        "$inc": {"counter": 1},
        # Both fields must live in ONE "$set": a dict literal with two "$set"
        # keys keeps only the last one, which silently dropped the timestamp.
        "$set": {
            "timestamp": datetime.now(timezone.utc),
            "query": query,
        },
    }
    try:
        web_counter_collection.update_one({"type": "web"}, update, upsert=True)
    except Exception as e:
        print("Error updating web counter:", e)
        return False
    return True
def refresh_web_counter():
    """Reset the web question counter while the current hour is 1.

    The hour comes from ``datetime.now()`` without a timezone, i.e. the
    process's local time. During that hour every call sets "counter" to 0 and
    refreshes "timestamp" on the {"type": "web"} document; no upsert is
    requested. Errors from the update are printed, not raised.

    Returns:
        Always True.
    """
    if datetime.now().hour != 1:
        return True

    reset_fields = {"counter": 0, "timestamp": datetime.now(timezone.utc)}
    try:
        web_counter_collection.update_one({"type": "web"}, {"$set": reset_fields})
    except Exception as exc:
        print("Error refreshing web counter:", exc)
    return True
def is_overlimit(limit=10):
    """Tell whether the web question counter has exceeded ``limit``.

    Reads the "counter" field of the {"type": "web"} document; a missing
    document or field counts as 0.

    Args:
        limit: Number of questions allowed; the result is True only when the
            counter is strictly greater than this. Defaults to 10.

    Returns:
        True when the counter is above ``limit``, otherwise False. False is
        also returned when the counter cannot be read (the error is printed),
        so a database problem never blocks users.
    """
    try:
        counter_doc = web_counter_collection.find_one({"type": "web"})
        total_chats = counter_doc.get("counter", 0) if counter_doc else 0
        return total_chats > limit
    except Exception as e:
        print("Error reading web counter:", e)
        return False