Spaces:
Sleeping
Sleeping
File size: 5,986 Bytes
e91d828 8ab1f18 e91d828 4587c91 e91d828 4587c91 c9dbeb3 4587c91 c9dbeb3 e91d828 9e89296 e91d828 4aca022 e91d828 9e89296 4aca022 9e89296 4aca022 9e89296 4aca022 9e89296 e91d828 4aca022 e91d828 4aca022 e91d828 57b007b e91d828 57b007b 8ab1f18 388c629 8ab1f18 b7d464c ea729ef c9dbeb3 b15cb21 c9dbeb3 b15cb21 c9dbeb3 f9ef51a c9dbeb3 990d4e5 c9dbeb3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 | import re
from datetime import datetime, timezone
from pymongo import MongoClient
import os
from dotenv import load_dotenv
from langdetect import detect
from deep_translator import GoogleTranslator
# Load Environment Variables
load_dotenv()
MONGODB_URI = os.environ.get("MONGO_URI")
OPENAI_KEY = os.environ.get("OPENAI_API_KEY")
def init_mongodb():
    """Open a MongoDB connection and check that the server answers.

    Returns:
        The ``MongoClient`` instance when ``server_info()`` succeeds.
        On any exception, a human-readable error string is returned instead
        of a client, so callers must check the type of the result.
    """
    try:
        mongo_client = MongoClient(MONGODB_URI, serverSelectionTimeoutMS=5000)
        # Probe the server so connection problems surface here.
        mongo_client.server_info()
    except Exception as e:
        return f"⚠️ Failed connecting to database: {str(e)}"
    return mongo_client
# Connect once at import time and expose the collections used by the helpers
# in this module.
client = init_mongodb()

# init_mongodb() returns an error *string* when the connection fails. A
# non-empty string is truthy, so a plain ``if client:`` would fall through to
# ``client['instant_bot']`` and raise TypeError at import. Test for a real
# MongoClient instead; on failure ``client`` keeps the error message.
if isinstance(client, MongoClient):
    db = client['instant_bot']
    collection = db['instant']
    feedback_collection = db['feedback']
    web_counter_collection = db['web_counter']
else:
    # No database available: helpers that use these handles catch the
    # resulting exception and report the failure themselves.
    db = None
    collection = None
    feedback_collection = None
    web_counter_collection = None
# Prefixes that mark an incoming message as feedback rather than a question.
# The first entry that matches wins, so list order matters.
# NOTE(review): messages are lowercased and stripped of punctuation by
# normalize() before prefix matching, so the capitalised, punctuated and
# quoted variants below can never match; only "feedback", "not helpful" and
# "helpful" are effective. Kept verbatim in case other modules use the list.
FEEDBACK_KEYWORDS = [
    "Feedback",
    "feedback",
    "Feedback",
    "Feedback?",
    "feedback?",
    "feedback!",
    "not helpful",
    "not helpful",
    "not helpful?",
    "not helpful?",
    "not helpful!",
    "helpful",
    "helpful?",
    "helpful!",
    "'helpful'",
    "'not helpful",
    "'feedback'",
]

# In-memory store of the most recent question/answer pair, keyed by user id.
last_qna_map = {}
def store_last_qna(user_id, question, answer):
    """Remember the latest question/answer pair for ``user_id``.

    The same pair is also written under the ``"anonymous"`` key, so a lookup
    using the default user id sees the most recent exchange from any user.
    """
    for key in (user_id, "anonymous"):
        # A fresh dict per key keeps the two entries from aliasing each other.
        last_qna_map[key] = {"question": question, "answer": answer}
def get_last_question(user_id="anonymous"):
    """Return the last question stored for ``user_id``, or ``""`` if none."""
    if user_id not in last_qna_map:
        return ""
    return last_qna_map[user_id].get("question", "")
def get_last_answer(user_id="anonymous"):
    """Return the last answer stored for ``user_id``, or ``""`` if none."""
    if user_id not in last_qna_map:
        return ""
    return last_qna_map[user_id].get("answer", "")
def normalize(text):
    """Return ``text`` without punctuation, trimmed and lowercased.

    Every character that is neither a word character nor whitespace is
    removed; surrounding whitespace is then stripped and the result is
    lowercased.
    """
    without_punctuation = re.sub(r'[^\w\s]', '', text)
    trimmed = without_punctuation.strip()
    return trimmed.lower()
def is_feedback_message(text: str) -> bool:
    """Tell whether ``text`` starts with one of the feedback keywords.

    The message is normalized first, then checked as a prefix match against
    every entry of ``FEEDBACK_KEYWORDS``.
    """
    prefixes = tuple(FEEDBACK_KEYWORDS)
    # str.startswith accepts a tuple and is true if any prefix matches.
    return normalize(text).startswith(prefixes)
def check_question_feedback(query, user_id="anonymous"):
    """Classify ``query`` as feedback or a regular question.

    Args:
        query: The raw incoming message.
        user_id: Key used to look up the user's previous question/answer.

    Returns:
        A dict with keys ``is_feedback``, ``query``, ``feedback_obj`` and
        ``last_qna``. For feedback messages ``feedback_obj`` holds the parsed
        feedback and ``last_qna`` the previously stored exchange; otherwise
        ``feedback_obj`` is ``None`` and both ``last_qna`` fields are ``None``.
    """
    if not is_feedback_message(query):
        return {
            "is_feedback": False,
            "query": query,
            "feedback_obj": None,
            "last_qna": {"question": None, "answer": None},
        }

    return {
        "is_feedback": True,
        "query": query,
        "feedback_obj": extract_feedback_content(query),
        "last_qna": {
            "question": get_last_question(user_id),
            "answer": get_last_answer(user_id),
        },
    }
def extract_feedback_content(raw_message: str) -> dict:
    """Split a feedback message into its keyword and the trailing comment.

    Returns:
        ``{"feedback": keyword, "comment": rest}`` for the first entry of
        ``FEEDBACK_KEYWORDS`` that prefixes the normalized message, where
        ``rest`` is the stripped remainder. If nothing matches, ``feedback``
        is ``None`` and ``comment`` is the whole normalized message.
    """
    message = normalize(raw_message)
    matched = next(
        (kw for kw in FEEDBACK_KEYWORDS if message.startswith(kw)),
        None,
    )
    if matched is None:
        return {"feedback": None, "comment": message}
    return {"feedback": matched, "comment": message[len(matched):].strip()}
def save_feedback(feedback_obj: dict, last_qna: dict) -> str:
    """Insert one feedback document and return a status message.

    Args:
        feedback_obj: Must contain the keys ``"feedback"`` and ``"comment"``.
        last_qna: Mapping read with ``.get`` for ``"question"``/``"answer"``.

    Returns:
        A thank-you message on success, or an error message that includes the
        exception text when the insert fails.

    Raises:
        KeyError: If ``feedback_obj`` lacks ``"feedback"`` or ``"comment"``;
            the document is built before the guarded insert.
    """
    record = dict(
        feedback=feedback_obj["feedback"],
        comment=feedback_obj["comment"],
        question=last_qna.get("question"),
        answer=last_qna.get("answer"),
        # Stored as an ISO-8601 string in UTC.
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
    try:
        feedback_collection.insert_one(record)
    except Exception as e:
        return f"An error occured when saving feedback: {str(e)}"
    return "Thank you for your feedback! We have recorded it successfully"
def translate_answer(question, answer):
    """Translate ``answer`` into the language of ``question`` when they differ.

    Args:
        question: The user's question; its detected language is the target.
        answer: The text to translate.

    Returns:
        ``answer`` unchanged when the question is shorter than 5 characters,
        when both texts are detected as the same language, or when detection
        or translation fails; otherwise the translated answer.
    """
    # Very short questions are returned as-is without attempting detection.
    if len(question) < 5:
        return answer
    try:
        question_lang = detect(question)
        if question_lang == detect(answer):
            return answer
        return GoogleTranslator(source='auto', target=question_lang).translate(answer)
    except Exception:
        # Best effort, like translate_text(): language detection can raise on
        # text without usable features, and the translation call can fail
        # too. An untranslated answer is better than a crash.
        return answer
def translate_list(question, list_of_answers):
    """Translate each answer into the language of ``question``.

    Args:
        question: The user's question; its detected language is the target.
        list_of_answers: Iterable of strings to translate.

    Returns:
        ``list_of_answers`` unchanged when the question is shorter than 3
        characters, is detected as English, or when detection or translation
        fails; otherwise a new list of translated strings in the same order.
    """
    # Very short questions are returned as-is without attempting detection.
    if len(question) < 3:
        return list_of_answers
    try:
        question_lang = detect(question)
        if question_lang == 'en':
            return list_of_answers
        # One translator is enough for the whole list; source and target are
        # the same for every item.
        translator = GoogleTranslator(source='auto', target=question_lang)
        return [translator.translate(item) for item in list_of_answers]
    except Exception:
        # Best effort, like translate_text(): return the untranslated
        # answers instead of propagating a detection or translation error.
        return list_of_answers
def translate_text(lang, text):
    """Translate ``text`` into ``lang`` if that language is on the allow-list.

    Returns:
        The translated text, or ``text`` unchanged when ``lang`` is not in the
        supported list or when anything raises along the way.
    """
    # NOTE(review): confirm that the translator accepts the lowercase
    # 'zh-cn' / 'zh-tw' codes; if not, those two silently return ``text``.
    supported_languages = (
        'en', 'id', 'fr', 'de', 'th', 'es', 'it',
        'pt', 'ja', 'ko', 'zh-cn', 'zh-tw', 'ru',
    )
    try:
        if lang not in supported_languages:
            return text
        return GoogleTranslator(source='auto', target=lang).translate(text)
    except Exception:
        return text
def add_question_ticker(query=None):
    """Increment the web chat counter and record the latest query.

    Upserts the document ``{"type": "web"}``: ``counter`` is incremented by
    one, and ``timestamp`` (current UTC time) and ``query`` are overwritten.

    Args:
        query: The question text to store alongside the counter.

    Returns:
        ``True`` if the update was issued without error, ``False`` otherwise
        (the error is printed).
    """
    try:
        web_counter_collection.update_one(
            {"type": "web"},
            {
                "$inc": {"counter": 1},
                # Both fields must live in ONE "$set": a dict literal with two
                # "$set" keys keeps only the last, which dropped the timestamp.
                "$set": {
                    "timestamp": datetime.now(timezone.utc),
                    "query": query,
                },
            },
            upsert=True,
        )
        return True
    except Exception as e:
        print("Error updating web counter:", e)
        return False
def refresh_web_counter():
    """Reset the web chat counter when the current local hour is 1.

    Outside that hour nothing is written. During it, the ``{"type": "web"}``
    document gets ``counter`` set to 0 and ``timestamp`` set to the current
    UTC time; a failure is printed, not raised.

    Returns:
        Always ``True``.
    """
    # The hour comes from the naive local clock, not UTC.
    if datetime.now().hour != 1:
        return True
    try:
        web_counter_collection.update_one(
            {"type": "web"},
            {"$set": {"counter": 0, "timestamp": datetime.now(timezone.utc)}},
        )
    except Exception as e:
        print("Error refreshing web counter:", e)
    return True
def is_overlimit(limit=10):
    """Report whether the web chat counter has exceeded ``limit``.

    Args:
        limit: Maximum allowed counter value; the default of 10 matches the
            previously hard-coded threshold.

    Returns:
        ``True`` when the stored ``counter`` of the ``{"type": "web"}``
        document is strictly greater than ``limit``. ``False`` when it is
        not, when the document or field is missing, or when the lookup fails
        (the error is printed).
    """
    try:
        counter = web_counter_collection.find_one({"type": "web"})
        total_chats = counter.get("counter", 0) if counter else 0
        return total_chats > limit
    except Exception as e:
        # This is a read, so the message says "reading" instead of the
        # copy-pasted "updating" text.
        print("Error reading web counter:", e)
        return False