"""Helpers for feedback capture, answer translation and web chat rate limiting.

Importing this module loads environment variables, attempts a MongoDB
connection and exposes the resulting collections as module-level names.
When the connection fails, ``client``, ``db`` and every collection are
``None`` and the database-backed helpers report failure instead of raising.
"""
import os
import re
from datetime import datetime, timezone

from deep_translator import GoogleTranslator
from dotenv import load_dotenv
from langdetect import detect
from pymongo import MongoClient

# Load Environment Variables
load_dotenv()
MONGODB_URI = os.environ.get("MONGO_URI")
OPENAI_KEY = os.environ.get("OPENAI_API_KEY")


def init_mongodb():
    """Open a MongoDB connection and verify that the server is reachable.

    Returns:
        A connected ``MongoClient`` on success, otherwise a human-readable
        error message string describing why the connection failed.
    """
    try:
        client = MongoClient(MONGODB_URI, serverSelectionTimeoutMS=5000)
        # Force a round trip so connection problems surface here, not later.
        client.server_info()
        return client
    except Exception as e:
        error_msg = f"⚠️ Failed connecting to database: {str(e)}"
        return error_msg


# init_mongodb() signals failure with a (truthy) error string, so a plain
# truthiness test cannot tell success from failure; check the type instead.
_connection = init_mongodb()
if isinstance(_connection, str):
    print(_connection)
    client = None
    db = None
    collection = None
    feedback_collection = None
    web_counter_collection = None
else:
    client = _connection
    db = client['instant_bot']
    collection = db['instant']
    feedback_collection = db['feedback']
    web_counter_collection = db['web_counter']

FEEDBACK_KEYWORDS = ['Feedback', 'feedback', 'Feedback', 'Feedback?', 'feedback?', 'feedback!', 'not helpful', 'not helpful', 'not helpful?', 'not helpful?', 'not helpful!', 'helpful', 'helpful?', 'helpful!', "'helpful'", "'not helpful", "'feedback'"]

# Most recent question/answer pair per user id, kept in process memory only.
last_qna_map = {}


def store_last_qna(user_id, question, answer):
    """Remember the latest question/answer pair for ``user_id``.

    The same pair is also stored under the ``"anonymous"`` key, so lookups
    that do not pass a user id return the most recently stored pair.
    """
    last_qna_map[user_id] = {"question": question, "answer": answer}
    last_qna_map["anonymous"] = {"question": question, "answer": answer}


def get_last_question(user_id="anonymous"):
    """Return the last stored question for ``user_id``, or ``""`` if none."""
    return last_qna_map.get(user_id, {}).get("question", "")


def get_last_answer(user_id="anonymous"):
    """Return the last stored answer for ``user_id``, or ``""`` if none."""
    return last_qna_map.get(user_id, {}).get("answer", "")


def normalize(text):
    """Strip punctuation and surrounding whitespace from ``text``, lowercased."""
    return re.sub(r'[^\w\s]', '', text).strip().lower()


# Messages are normalized before matching, so only the normalized form of each
# keyword can ever match. Deduplicate while preserving first-seen order.
_NORMALIZED_FEEDBACK_KEYWORDS = tuple(
    keyword
    for keyword in dict.fromkeys(normalize(raw) for raw in FEEDBACK_KEYWORDS)
    if keyword
)


def is_feedback_message(text: str) -> bool:
    """Return True when the normalized ``text`` starts with a feedback keyword."""
    return normalize(text).startswith(_NORMALIZED_FEEDBACK_KEYWORDS)


def check_question_feedback(query, user_id="anonymous"):
    """Classify ``query`` as feedback or a regular question.

    Args:
        query: Raw user message.
        user_id: Key used to look up the user's previous question/answer.

    Returns:
        A dict with keys ``is_feedback``, ``query``, ``feedback_obj`` and
        ``last_qna``. For non-feedback messages ``feedback_obj`` is ``None``
        and ``last_qna`` holds ``None`` for both question and answer.
    """
    if not is_feedback_message(query):
        return {
            "is_feedback": False,
            "query": query,
            "feedback_obj": None,
            "last_qna": {"question": None, "answer": None},
        }

    return {
        "is_feedback": True,
        "query": query,
        "feedback_obj": extract_feedback_content(query),
        "last_qna": {
            "question": get_last_question(user_id),
            "answer": get_last_answer(user_id),
        },
    }


def extract_feedback_content(raw_message: str) -> dict:
    """Split a feedback message into its keyword and the trailing comment.

    Returns:
        ``{"feedback": <matched keyword>, "comment": <rest of message>}``;
        when no keyword matches, ``feedback`` is ``None`` and ``comment`` is
        the whole normalized message.
    """
    cleaned = normalize(raw_message)
    for keyword in _NORMALIZED_FEEDBACK_KEYWORDS:
        if cleaned.startswith(keyword):
            remaining = cleaned[len(keyword):].strip()
            return {"feedback": keyword, "comment": remaining}
    return {"feedback": None, "comment": cleaned}


def save_feedback(feedback_obj: dict, last_qna: dict) -> str:
    """Persist a feedback entry together with the question/answer it refers to.

    Args:
        feedback_obj: Dict with ``feedback`` and ``comment`` keys.
        last_qna: Dict that may hold ``question`` and ``answer`` keys.

    Returns:
        A user-facing status message; database errors are reported in the
        message rather than raised.
    """
    feedback_data = {
        "feedback": feedback_obj["feedback"],
        "comment": feedback_obj["comment"],
        "question": last_qna.get("question"),
        "answer": last_qna.get("answer"),
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    if feedback_collection is None:
        # The database connection failed at import time.
        return "An error occured when saving feedback: database is not connected"
    try:
        feedback_collection.insert_one(feedback_data)
        return "Thank you for your feedback! We have recorded it successfully"
    except Exception as e:
        return f"An error occured when saving feedback: {str(e)}"


def translate_answer(question, answer):
    """Translate ``answer`` into the language detected for ``question``.

    Questions shorter than 5 characters are not language-detected. If
    detection or translation fails, the original ``answer`` is returned,
    matching the fallback behaviour of ``translate_text``.
    """
    if len(question) < 5:
        return answer
    try:
        question_lang = detect(question)
        answer_lang = detect(answer)
        if question_lang == answer_lang:
            return answer
        return GoogleTranslator(source='auto', target=question_lang).translate(answer)
    except Exception as e:
        print("Error translating answer:", e)
        return answer


def translate_list(question, list_of_answers):
    """Translate every item of ``list_of_answers`` into the question's language.

    Questions shorter than 3 characters, or detected as English, leave the
    list untouched. If detection or translation fails, the original list is
    returned unchanged.
    """
    if len(question) < 3:
        return list_of_answers
    try:
        question_lang = detect(question)
        if question_lang == 'en':
            return list_of_answers
        translator = GoogleTranslator(source='auto', target=question_lang)
        return [translator.translate(item) for item in list_of_answers]
    except Exception as e:
        print("Error translating answers:", e)
        return list_of_answers


def translate_text(lang, text):
    """Translate ``text`` into ``lang`` when it is a supported language code.

    Returns ``text`` unchanged for unsupported codes or when translation fails.
    """
    supported_languages = ['en', 'id', 'fr', 'de', 'th', 'es', 'it', 'pt', 'ja', 'ko', 'zh-cn', 'zh-tw', 'ru']
    if lang not in supported_languages:
        return text
    try:
        return GoogleTranslator(source='auto', target=lang).translate(text)
    except Exception:
        return text


def add_question_ticker(query=None):
    """Increment the web chat counter and record the latest query.

    Returns:
        True when the counter document was updated (or created), False when
        the update failed.
    """
    try:
        web_counter_collection.update_one(
            {"type": "web"},
            {
                "$inc": {"counter": 1},
                # A dict literal keeps only the last value of a repeated key,
                # so both fields must live in a single "$set" document.
                "$set": {
                    "timestamp": datetime.now(timezone.utc),
                    "query": query,
                },
            },
            upsert=True
        )
        return True
    except Exception as e:
        print("Error updating web counter:", e)
        return False


def refresh_web_counter():
    """Reset the web chat counter to zero when called during hour 1.

    Always returns True, whether or not a reset was attempted or succeeded.
    """
    # NOTE(review): the hour comes from naive local time while stored
    # timestamps are UTC - confirm which zone the reset is meant to follow.
    current_hour = datetime.now().hour
    if current_hour == 1:
        try:
            web_counter_collection.update_one(
                {"type": "web"},
                {
                    "$set": {"counter": 0, "timestamp": datetime.now(timezone.utc)}
                }
            )
        except Exception as e:
            print("Error refreshing web counter:", e)
    return True


def is_overlimit():
    """Return True when the stored web chat counter exceeds 10.

    A missing counter document counts as 0; read errors are logged and
    treated as "not over the limit".
    """
    try:
        counter = web_counter_collection.find_one({"type": "web"})
        total_chats = counter.get("counter", 0) if counter else 0
        return total_chats > 10
    except Exception as e:
        print("Error reading web counter:", e)
        return False