Spaces:
Sleeping
Sleeping
| import re | |
| from datetime import datetime, timezone | |
| from pymongo import MongoClient | |
| import os | |
| from dotenv import load_dotenv | |
| from langdetect import detect | |
| from deep_translator import GoogleTranslator | |
# Configuration is read from the process environment. load_dotenv() runs
# first so that values supplied through a .env file are visible to the
# lookups below; both settings are None when the variable is not set.
load_dotenv()
MONGODB_URI = os.getenv("MONGO_URI")
OPENAI_KEY = os.getenv("OPENAI_API_KEY")
def init_mongodb():
    """Open a MongoDB connection and confirm that the server answers.

    Returns:
        The connected ``MongoClient`` on success. On failure, a
        human-readable error message (``str``) instead.

    Note:
        The failure value is a non-empty string and therefore truthy, so
        callers must check the type of the result, not its truthiness.
    """
    client = None
    try:
        # Server selection is capped at 5 seconds so a dead server fails fast.
        client = MongoClient(MONGODB_URI, serverSelectionTimeoutMS=5000)
        # Issue a real command so an unreachable server surfaces here as an
        # exception instead of on the first query made later.
        client.server_info()
        return client
    except Exception as e:
        # Release the half-initialised client so it does not linger after a
        # failed connectivity check.
        if client is not None:
            client.close()
        error_msg = f"⚠️ Failed connecting to database: {str(e)}"
        return error_msg
client = init_mongodb()
# init_mongodb() reports failure by returning an error *string*, which is
# truthy. Test the type rather than truthiness, otherwise the string would be
# indexed like a client and raise TypeError at import time.
if isinstance(client, MongoClient):
    db = client['instant_bot']
    collection = db['instant']
    feedback_collection = db['feedback']
    web_counter_collection = db['web_counter']
else:
    # Surface the reason for the failure instead of dropping it silently.
    if client:
        print(client)
    # Without a connection every handle is None; the helpers below that use
    # these handles wrap their database calls in try/except.
    db = None
    collection = None
    feedback_collection = None
    web_counter_collection = None
# Prefixes that mark an incoming message as feedback. They are compared, in
# list order, against the output of normalize() (lower-cased, punctuation
# removed), so only the lower-case, punctuation-free entries can match.
FEEDBACK_KEYWORDS = [
    'Feedback',
    'feedback',
    'Feedback',
    'Feedback?',
    'feedback?',
    'feedback!',
    'not helpful',
    'not helpful',
    'not helpful?',
    'not helpful?',
    'not helpful!',
    'helpful',
    'helpful?',
    'helpful!',
    "'helpful'",
    "'not helpful",
    "'feedback'",
]

# In-memory store of the most recent question/answer pair, keyed by user id.
last_qna_map = {}
def store_last_qna(user_id, question, answer):
    """Remember the latest question/answer pair for ``user_id``.

    The same pair is also written under the ``"anonymous"`` key, so lookups
    made without a user id see the most recent pair stored by any user.
    """
    for key in (user_id, "anonymous"):
        # A fresh dict per key: the two entries never share one object.
        last_qna_map[key] = {"question": question, "answer": answer}
def get_last_question(user_id="anonymous"):
    """Return the last question stored for ``user_id``, or ``""`` if none."""
    record = last_qna_map.get(user_id, {})
    return record.get("question", "")
def get_last_answer(user_id="anonymous"):
    """Return the last answer stored for ``user_id``, or ``""`` if none."""
    record = last_qna_map.get(user_id, {})
    return record.get("answer", "")
def normalize(text):
    """Return ``text`` without punctuation, trimmed and lower-cased.

    Every character that is neither a word character (letters, digits,
    underscore) nor whitespace is removed before trimming.
    """
    without_punctuation = re.sub(r'[^\w\s]', '', text)
    trimmed = without_punctuation.strip()
    return trimmed.lower()
def is_feedback_message(text: str) -> bool:
    """Tell whether ``text`` begins with one of the feedback keywords.

    The comparison is made on the normalized form of ``text``.
    """
    # str.startswith accepts a tuple of prefixes and is True if any matches.
    prefixes = tuple(FEEDBACK_KEYWORDS)
    return normalize(text).startswith(prefixes)
def check_question_feedback(query, user_id="anonymous"):
    """Classify ``query`` as feedback or as a regular question.

    Returns:
        A dict with the keys ``is_feedback``, ``query``, ``feedback_obj`` and
        ``last_qna``. For feedback, ``feedback_obj`` holds the parsed feedback
        and ``last_qna`` the last question/answer stored for ``user_id``.
        Otherwise ``feedback_obj`` is None and both ``last_qna`` values are
        None.
    """
    if not is_feedback_message(query):
        return {
            "is_feedback": False,
            "query": query,
            "feedback_obj": None,
            "last_qna": {"question": None, "answer": None},
        }

    parsed_feedback = extract_feedback_content(query)
    previous_exchange = {
        "question": get_last_question(user_id),
        "answer": get_last_answer(user_id),
    }
    return {
        "is_feedback": True,
        "query": query,
        "feedback_obj": parsed_feedback,
        "last_qna": previous_exchange,
    }
def extract_feedback_content(raw_message: str) -> dict:
    """Split a feedback message into its keyword and the trailing comment.

    Returns:
        ``{"feedback": <keyword>, "comment": <rest>}`` for the first keyword
        in ``FEEDBACK_KEYWORDS`` that prefixes the normalized message, or
        ``{"feedback": None, "comment": <normalized message>}`` when no
        keyword matches.
    """
    cleaned = normalize(raw_message)
    # The first keyword in list order wins, so the list order matters.
    matched = next(
        (kw for kw in FEEDBACK_KEYWORDS if cleaned.startswith(kw)),
        None,
    )
    if matched is None:
        return {"feedback": None, "comment": cleaned}
    comment = cleaned[len(matched):].strip()
    return {"feedback": matched, "comment": comment}
def save_feedback(feedback_obj: dict, last_qna: dict) -> str:
    """Persist one feedback record with the question/answer it refers to.

    Args:
        feedback_obj: Dict with the keys ``"feedback"`` and ``"comment"``.
        last_qna: Dict that may carry ``"question"`` and ``"answer"``.

    Returns:
        A confirmation message on success, or an error message when the
        record could not be stored.
    """
    # The collection handle is None when no database connection exists.
    # Compare with None explicitly: PyMongo collections do not support
    # truth-value testing.
    if feedback_collection is None:
        return "An error occured when saving feedback: database is not available"

    feedback_data = {
        "feedback": feedback_obj["feedback"],
        "comment": feedback_obj["comment"],
        "question": last_qna.get("question"),
        "answer": last_qna.get("answer"),
        # Stored as an ISO-8601 string in UTC.
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    try:
        feedback_collection.insert_one(feedback_data)
    except Exception as e:
        return f"An error occured when saving feedback: {str(e)}"
    return "Thank you for your feedback! We have recorded it successfully"
def translate_answer(question, answer):
    """Translate ``answer`` into the language of ``question`` when they differ.

    Questions shorter than 5 characters are not analysed and the answer is
    returned unchanged. Translation is best effort: if language detection or
    the translation call fails, the original answer is returned, the same
    fallback translate_text() uses.

    Returns:
        The translated answer, or ``answer`` itself when no translation is
        needed or possible.
    """
    if len(question) < 5:
        return answer
    try:
        question_lang = detect(question)
        if question_lang == detect(answer):
            return answer
        translator = GoogleTranslator(source='auto', target=question_lang)
        return translator.translate(answer)
    except Exception as e:
        # detect() raises on text without usable features (e.g. digits only)
        # and the translator depends on a network call; neither should take
        # the caller down.
        print("Error translating answer:", e)
        return answer
def translate_list(question, list_of_answers):
    """Translate every answer into the language of ``question``.

    Nothing is translated when the question is shorter than 3 characters or
    is detected as English. Translation is best effort: if detection or any
    translation call fails, the original list is returned untouched.

    Returns:
        A new list of translated answers, or ``list_of_answers`` itself when
        no translation is needed or possible.
    """
    if len(question) < 3:
        return list_of_answers
    try:
        question_lang = detect(question)
        if question_lang == 'en':
            return list_of_answers
        # One translator serves all answers; the target language is the same
        # for every item.
        translator = GoogleTranslator(source='auto', target=question_lang)
        return [translator.translate(item) for item in list_of_answers]
    except Exception as e:
        print("Error translating answers:", e)
        return list_of_answers
def translate_text(lang, text):
    """Translate ``text`` into ``lang`` when ``lang`` is a supported code.

    Returns ``text`` unchanged for an unsupported ``lang`` and whenever the
    translation raises.
    """
    supported_languages = (
        'en', 'id', 'fr', 'de', 'th', 'es', 'it',
        'pt', 'ja', 'ko', 'zh-cn', 'zh-tw', 'ru',
    )
    try:
        if lang not in supported_languages:
            return text
        translator = GoogleTranslator(source='auto', target=lang)
        return translator.translate(text)
    except Exception:
        # Best effort: hand back the untranslated text on any failure.
        return text
def add_question_ticker(query=None):
    """Count one more web question and record when and what was asked.

    Increments ``counter`` on the document matching ``{"type": "web"}``
    (created if missing) and stores the current UTC time and ``query``.

    Returns:
        True when the update succeeded, False when it raised.
    """
    try:
        web_counter_collection.update_one(
            {"type": "web"},
            {
                "$inc": {"counter": 1},
                # Both fields go into a single "$set". Two "$set" keys in one
                # dict literal collapse to the last one, which would drop the
                # timestamp.
                "$set": {
                    "timestamp": datetime.now(timezone.utc),
                    "query": query,
                },
            },
            upsert=True,
        )
        return True
    except Exception as e:
        print("Error updating web counter:", e)
        return False
def refresh_web_counter():
    """Reset the web counter to zero when the local clock hour is 1.

    Outside that hour nothing is written. Errors from the update are printed
    and swallowed.

    Returns:
        Always True.
    """
    # Naive local time: the reset window follows the host clock.
    if datetime.now().hour != 1:
        return True

    try:
        web_counter_collection.update_one(
            {"type": "web"},
            {"$set": {"counter": 0, "timestamp": datetime.now(timezone.utc)}},
        )
    except Exception as e:
        print("Error refreshing web counter:", e)
    return True
def is_overlimit(limit=10):
    """Tell whether the web question counter exceeds ``limit``.

    Args:
        limit: Largest counter value still allowed. Defaults to 10.

    Returns:
        True when the stored counter is greater than ``limit``. False
        otherwise, including when the counter document is missing or the
        lookup raises.
    """
    try:
        counter = web_counter_collection.find_one({"type": "web"})
        total_chats = counter.get("counter", 0) if counter else 0
        return total_chats > limit
    except Exception as e:
        # This path only reads the counter; the message says so.
        print("Error reading web counter:", e)
        return False