File size: 5,986 Bytes
e91d828
 
 
 
 
8ab1f18
 
e91d828
 
 
 
 
 
 
 
 
 
4587c91
e91d828
 
 
 
4587c91
 
 
 
 
c9dbeb3
4587c91
 
 
c9dbeb3
e91d828
 
 
9e89296
 
 
 
 
 
 
 
 
 
 
 
e91d828
 
 
 
 
4aca022
e91d828
9e89296
 
 
 
4aca022
 
9e89296
 
 
 
4aca022
9e89296
4aca022
9e89296
e91d828
 
 
 
4aca022
 
e91d828
 
4aca022
e91d828
 
 
 
 
 
 
 
 
 
 
57b007b
e91d828
57b007b
8ab1f18
 
388c629
 
 
8ab1f18
 
 
 
 
 
 
 
b7d464c
 
 
 
 
 
 
 
 
 
 
 
 
 
ea729ef
 
 
 
 
 
 
 
 
 
 
 
c9dbeb3
 
b15cb21
c9dbeb3
 
 
 
 
b15cb21
 
c9dbeb3
 
 
 
 
 
 
 
f9ef51a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c9dbeb3
 
 
 
990d4e5
c9dbeb3
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import re
from datetime import datetime, timezone
from pymongo import MongoClient
import os
from dotenv import load_dotenv
from langdetect import detect
from deep_translator import GoogleTranslator

# Load environment variables (via python-dotenv) before reading the settings
# this module needs. Each setting is None when its variable is not defined.
load_dotenv()
MONGODB_URI = os.getenv("MONGO_URI")
OPENAI_KEY = os.getenv("OPENAI_API_KEY")

def init_mongodb():
    """Open a MongoDB connection and check that the server answers.

    Returns:
        The ``MongoClient`` instance on success. On any failure no exception
        is raised; a human-readable error string is returned instead, so
        callers must check the type of the result before using it as a
        client.
    """
    try:
        mongo = MongoClient(MONGODB_URI, serverSelectionTimeoutMS=5000)
        # Query the server right away so connection problems surface here
        # rather than on first use.
        mongo.server_info()
    except Exception as exc:
        return f"⚠️ Failed connecting to database: {exc!s}"
    else:
        return mongo

# init_mongodb() returns an error *string* when the connection fails. A
# non-empty string is truthy, so the result has to be type-checked (not
# truth-tested) before it is indexed like a client; otherwise the failure
# path crashes with a TypeError at import time.
client = init_mongodb()
if isinstance(client, MongoClient):
    db = client['instant_bot']
    collection = db['instant']
    feedback_collection = db['feedback']
    web_counter_collection = db['web_counter']
else:
    # Database unavailable: surface the error message and leave every handle
    # as None so the module can still be imported.
    print(client)
    db = None
    collection = None
    feedback_collection = None
    web_counter_collection = None

# Prefixes that mark a message as feedback. Within this file they are only
# compared against normalize()d text (lower-cased, punctuation removed), so
# only the lower-case, punctuation-free entries can actually match there.
# Order matters for prefix matching: 'not helpful' precedes 'helpful'.
FEEDBACK_KEYWORDS = [
    'Feedback',
    'feedback',
    'Feedback',
    'Feedback?',
    'feedback?',
    'feedback!',
    'not helpful',
    'not helpful',
    'not helpful?',
    'not helpful?',
    'not helpful!',
    'helpful',
    'helpful?',
    'helpful!',
    "'helpful'",
    "'not helpful",
    "'feedback'",
]

# In-memory store of the latest question/answer pair, keyed by user id.
last_qna_map = dict()

def store_last_qna(user_id, question, answer):
    """Remember the latest question/answer pair for ``user_id``.

    The same pair is also stored under the ``"anonymous"`` key, so that key
    always holds the most recent exchange recorded for any user.
    """
    for key in (user_id, "anonymous"):
        # A fresh dict per key keeps the two entries independent objects.
        last_qna_map[key] = {"question": question, "answer": answer}

def get_last_question(user_id="anonymous"):
    """Return the last question stored for ``user_id``, or ``""`` if none."""
    try:
        entry = last_qna_map[user_id]
    except KeyError:
        return ""
    return entry.get("question", "")

def get_last_answer(user_id="anonymous"):
    """Return the last answer stored for ``user_id``, or ``""`` if none."""
    try:
        entry = last_qna_map[user_id]
    except KeyError:
        return ""
    return entry.get("answer", "")
    
def normalize(text):
    """Return ``text`` without punctuation, trimmed and lower-cased.

    Every character that is neither a regex word character nor whitespace is
    removed; inner whitespace is left as it is.
    """
    without_punctuation = re.sub(r'[^\w\s]', '', text)
    trimmed = without_punctuation.strip()
    return trimmed.lower()

def is_feedback_message(text: str) -> bool:
    """Tell whether the normalized ``text`` begins with a feedback keyword."""
    # str.startswith accepts a tuple of candidate prefixes.
    prefixes = tuple(FEEDBACK_KEYWORDS)
    return normalize(text).startswith(prefixes)

def check_question_feedback(query, user_id="anonymous"):
    """Classify ``query`` as feedback or as a regular question.

    Returns:
        A dict with the keys ``is_feedback``, ``query``, ``feedback_obj`` and
        ``last_qna``. For feedback, ``feedback_obj`` is the parsed feedback
        and ``last_qna`` holds the last question/answer stored for
        ``user_id``. Otherwise ``feedback_obj`` is None and both ``last_qna``
        values are None.
    """
    if not is_feedback_message(query):
        return {
            "is_feedback": False,
            "query": query,
            "feedback_obj": None,
            "last_qna": {"question": None, "answer": None},
        }

    parsed = extract_feedback_content(query)
    previous = {
        "question": get_last_question(user_id),
        "answer": get_last_answer(user_id),
    }
    return {
        "is_feedback": True,
        "query": query,
        "feedback_obj": parsed,
        "last_qna": previous,
    }
    
def extract_feedback_content(raw_message: str) -> dict:
    """Split a feedback message into its keyword and the trailing comment.

    The message is normalized first. The first entry of FEEDBACK_KEYWORDS
    that prefixes it becomes ``feedback``, and the rest of the text (trimmed)
    becomes ``comment``. When no keyword matches, ``feedback`` is None and
    ``comment`` is the whole normalized message.
    """
    text = normalize(raw_message)
    matched = next(
        (keyword for keyword in FEEDBACK_KEYWORDS if text.startswith(keyword)),
        None,
    )
    if matched is None:
        return {"feedback": None, "comment": text}
    return {"feedback": matched, "comment": text[len(matched):].strip()}

def save_feedback(feedback_obj: dict, last_qna: dict) -> str:
    """Persist one feedback record and return a user-facing status message.

    Args:
        feedback_obj: Must provide the keys ``"feedback"`` and ``"comment"``.
        last_qna: Question/answer the feedback refers to; missing keys are
            stored as None.

    Returns:
        A thank-you message on success, or an error message when the insert
        fails for any reason (the exception is not propagated).
    """
    record = {
        "feedback": feedback_obj["feedback"],
        "comment": feedback_obj["comment"],
        "question": last_qna.get("question"),
        "answer": last_qna.get("answer"),
        # Stored as an ISO-8601 string in UTC.
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    try:
        feedback_collection.insert_one(record)
    except Exception as exc:
        return f"An error occured when saving feedback: {exc!s}"
    return "Thank you for your feedback! We have recorded it successfully"
    
def translate_answer(question, answer):
    """Translate ``answer`` into the language of ``question`` when they differ.

    Args:
        question: The user's question; its detected language is the target.
        answer: The text to translate.

    Returns:
        ``answer`` unchanged when the question has fewer than five characters
        (presumably too short for reliable detection), when both texts are
        detected as the same language, or when detection or translation
        fails. Otherwise the translated answer.
    """
    if len(question) < 5:
        return answer

    try:
        question_lang = detect(question)
        if question_lang == detect(answer):
            return answer
        translated = GoogleTranslator(source='auto', target=question_lang).translate(answer)
    except Exception:
        # Best effort, consistent with translate_text(): language detection
        # raises on text without usable features and translation can fail
        # (network, unsupported language code). An untranslated answer is
        # better than an exception reaching the caller.
        return answer

    # Guard against an empty or None translation result.
    return translated or answer

def translate_list(question, list_of_answers):
    """Translate each answer into the language detected for ``question``.

    Args:
        question: The user's question; its detected language is the target.
        list_of_answers: Strings to translate.

    Returns:
        ``list_of_answers`` itself when the question has fewer than three
        characters, when the question is detected as English (``'en'``), or
        when detection or translation fails. Otherwise a new list with the
        translated strings, in the same order.
    """
    if len(question) < 3:
        return list_of_answers

    try:
        question_lang = detect(question)
        if question_lang == 'en':
            return list_of_answers
        # One translator instance serves the whole list.
        translator = GoogleTranslator(source='auto', target=question_lang)
        # Keep the original item if the translator returns nothing for it.
        return [translator.translate(item) or item for item in list_of_answers]
    except Exception:
        # Best effort, consistent with translate_text(): fall back to the
        # untranslated answers instead of propagating the failure.
        return list_of_answers


def translate_text(lang, text):
    """Translate ``text`` into ``lang`` if that language code is supported.

    Returns ``text`` unchanged when ``lang`` is not in the supported list or
    when the translation raises for any reason.
    """
    supported = (
        'en', 'id', 'fr', 'de', 'th', 'es', 'it',
        'pt', 'ja', 'ko', 'zh-cn', 'zh-tw', 'ru',
    )
    try:
        if lang not in supported:
            return text
        return GoogleTranslator(source='auto', target=lang).translate(text)
    except Exception:
        # Best effort: hand back the untranslated text on failure.
        return text
    
def add_question_ticker(query=None):
    """Increment the web chat counter and record the latest query.

    Updates (or creates, via upsert) the ``{"type": "web"}`` counter
    document: ``counter`` is increased by one, and ``timestamp`` (current UTC
    time) and ``query`` are overwritten.

    Args:
        query: The question text to store alongside the counter.

    Returns:
        True when the update succeeded, False when it raised; the error is
        printed rather than propagated.
    """
    try:
        web_counter_collection.update_one(
            {"type": "web"},
            {
                "$inc": {"counter": 1},
                # Both fields must share ONE "$set": a dict literal keeps only
                # the last value of a repeated key, which previously dropped
                # the timestamp update.
                "$set": {
                    "timestamp": datetime.now(timezone.utc),
                    "query": query,
                },
            },
            upsert=True,
        )
        return True
    except Exception as e:
        print("Error updating web counter:", e)
        return False

def refresh_web_counter():
    """Reset the web chat counter to zero when called during hour 1.

    The hour check uses the naive local clock (``datetime.now()`` without a
    timezone), while the stored ``timestamp`` is in UTC. Every call made
    while the local hour equals 1 performs the reset. No upsert is requested,
    so a missing counter document is not created.

    Returns:
        Always True; a failed update is only printed.
    """
    if datetime.now().hour != 1:
        return True

    try:
        web_counter_collection.update_one(
            {"type": "web"},
            {"$set": {"counter": 0, "timestamp": datetime.now(timezone.utc)}},
        )
    except Exception as exc:
        print("Error refreshing web counter:", exc)
    return True

def is_overlimit(limit=10):
    """Tell whether the web chat counter has exceeded ``limit``.

    Args:
        limit: Number of chats allowed; the result is True only when the
            stored counter is strictly greater than this value. Defaults
            to 10.

    Returns:
        True when the ``{"type": "web"}`` counter exceeds ``limit``. False
        when it does not, when no counter document exists, or when the
        lookup fails; the error is printed and the check fails open.
    """
    try:
        counter = web_counter_collection.find_one({"type": "web"})
        total_chats = counter.get("counter", 0) if counter else 0
        return total_chats > limit
    except Exception as e:
        # This function only reads the counter, so say so in the message.
        print("Error reading web counter:", e)
        return False