File size: 9,926 Bytes
0870bc8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
"""
Created By: ishwor subedi
Date: 2024-08-28
"""
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from dateutil.parser import isoparse
from fastapi.routing import APIRouter
from src.pipeline.conversai_analytic_pipeline import ConversAIAnalyticPipeline
from fastapi import Request
from src.utils.error_handling import create_success_response, raise_http_exception, \
    success_response_user_management
from src.models.apis_models import FeedbackRequest, DailyActiveEndUserRequest, AverageSessionInteractionRequest, \
    TokenUsageRequest, UserSatisfactionRateRequest
from src import logging as logger

# Router exposing all analytics endpoints; included by the main FastAPI app.
analytic_endpoints_router = APIRouter(tags=["Analytics Endpoints"])

# Single shared pipeline instance used by every endpoint below to read the
# chat-history and feedback tables for a given vectorstore.
conversai_analytic_pipeline = ConversAIAnalyticPipeline()


@analytic_endpoints_router.post("/daily_chat_count")
async def daily_chat_count(
        request: DailyActiveEndUserRequest):
    """Return the number of chat messages per calendar day for a vectorstore.

    Defaults to the 7 days ending today when either date bound is missing.
    Response payload: {"output": [{"date": ISO-date, "count": int}, ...]},
    sorted by date ascending (consistent with /average_session_interaction).

    Raises an HTTP 500 on any internal failure.
    """
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> daily_chat_count API Triggered by {vectorstore} <<<")
    try:
        if not start_date or not end_date:
            # Default window: last 7 days in the server's local timezone.
            end_date = datetime.now().astimezone().date()
            start_date = end_date - timedelta(days=7)
        else:
            start_date = isoparse(start_date).date()
            end_date = isoparse(end_date).date()

        rows = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)

        # Parse each timestamp exactly once (the original parsed every row
        # twice: once in the filter and once for the value).
        date_count = Counter()
        for row in rows:
            row_date = isoparse(row["timestamp"]).date()
            if start_date <= row_date <= end_date:
                date_count[row_date] += 1

        # Sort chronologically so the payload is deterministic and
        # chart-friendly, matching the sibling endpoints.
        data = [{"date": d.isoformat(), "count": c} for d, c in sorted(date_count.items())]

        response = create_success_response(code=200, data=dict(output=data))
        logger.info(f">>> daily_chat_count API Response Success  for {vectorstore} <<<")

        return response

    except Exception as e:
        logger.error(f">>> daily_chat_count API Response Failed for {vectorstore} {e}<<<")

        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/daily_active_end_user")
async def daily_active_end_user(
        request: DailyActiveEndUserRequest
):
    """Report, per calendar day, how many distinct end-user IPs were active.

    Defaults to the 7 days ending today when either date bound is missing.
    Response payload: {"output": [{"date": ISO-date, "terminal": int}, ...]}.

    NOTE(review): days with exactly one distinct IP are silently dropped by
    the `> 1` filter below — confirm this is intentional and not an
    off-by-one (`>= 1`).
    """
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> daily_active_end_user API Triggered by {vectorstore} <<<")
    try:
        if not start_date or not end_date:
            # Default window: last 7 days in the server's local timezone.
            end_date = datetime.now().astimezone().date()
            start_date = end_date - timedelta(days=7)
        else:
            start_date = isoparse(start_date).date()
            end_date = isoparse(end_date).date()

        chat_rows = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)

        # Bucket the distinct client IPs under the day they appeared on.
        daily_ips = defaultdict(set)
        for row in chat_rows:
            ts = isoparse(row["timestamp"])
            client_ip = row["IpAddress"]
            if start_date <= ts.date() <= end_date:
                daily_ips[ts.date()].add(client_ip)

        data = [
            {"date": day.isoformat(), "terminal": len(ips)}
            for day, ips in daily_ips.items()
            if len(ips) > 1
        ]

        response = create_success_response(code=200, data=dict(output=data))
        logger.info(f">>> daily_active_end_user API Response Success for {vectorstore} <<<")

        return response
    except Exception as e:
        logger.error(f">>> daily_active_end_user API Response Failed for {vectorstore} {e}<<<")

        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/average_session_interaction")
async def average_session_interaction(
        request: AverageSessionInteractionRequest
):
    """Compute, per day, the average number of messages per distinct IP.

    Defaults to the 7 days ending today when either date bound is missing.
    Response payload: {"data": [{"date": ISO-date, "interactions": float}, ...]},
    sorted by date ascending.

    NOTE(review): the payload key here is "data" while sibling endpoints use
    "output" — presumably the frontend depends on this; verify before
    unifying.
    """
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> average_session_interaction API Triggered by {vectorstore} <<<")
    try:
        if not start_date or not end_date:
            # Default window: last 7 days in the server's local timezone.
            end_date = datetime.now().astimezone().date()
            start_date = end_date - timedelta(days=7)
        else:
            start_date = isoparse(start_date).date()
            end_date = isoparse(end_date).date()

        chat_rows = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)

        # Per-day tallies: message count and the set of distinct client IPs.
        message_totals = defaultdict(int)
        ip_sets = defaultdict(set)

        for row in chat_rows:
            ts = isoparse(row["timestamp"])
            client_ip = row["IpAddress"]
            if start_date <= ts.date() <= end_date:
                day = ts.date()
                message_totals[day] += 1
                ip_sets[day].add(client_ip)

        data = []
        for day in sorted(message_totals):
            ip_count = len(ip_sets[day])
            # Guard against division by zero, though a counted day always
            # has at least one IP recorded.
            avg = message_totals[day] / ip_count if ip_count > 0 else 0
            data.append({"date": day.isoformat(), "interactions": avg})

        response = create_success_response(code=200, data=dict(data=data))
        logger.info(f">>> average_session_interaction API Response Success for {vectorstore} <<<")

        return response
    except Exception as e:
        logger.error(f">>> average_session_interaction API Response Failed for {vectorstore} {e}<<<")
        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/token_usages")
async def token_usages(request: TokenUsageRequest):
    """Sum the response-token counts per calendar day for a vectorstore.

    Defaults to the 7 days ending today when either date bound is missing.
    Rows whose "ResponseTokenCount" is missing or None are skipped.
    Response payload: {"output": [{"date": ISO-date, "total_tokens": int}, ...]}.
    """
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> token_usages API Triggered by {vectorstore} <<<")
    try:
        if not start_date or not end_date:
            # Default window: last 7 days in the server's local timezone.
            end_date = datetime.now().astimezone().date()
            start_date = end_date - timedelta(days=7)
        else:
            start_date = isoparse(start_date).date()
            end_date = isoparse(end_date).date()

        chat_rows = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)

        # Accumulate token totals keyed by calendar day.
        tokens_per_day = defaultdict(int)

        for row in chat_rows:
            ts = isoparse(row["timestamp"])
            if start_date <= ts.date() <= end_date:
                token_count = row.get("ResponseTokenCount")
                if token_count is not None:
                    tokens_per_day[ts.date()] += token_count

        data = [
            {"date": day.isoformat(), "total_tokens": total}
            for day, total in tokens_per_day.items()
        ]

        response = create_success_response(code=200, data=dict(output=data))
        logger.info(f">>> token_usages API Response Success for {vectorstore} <<<")

        return response
    except Exception as e:
        logger.error(f">>> token_usages API Response Failed for {vectorstore} {e}<<<")
        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/add_feedback")
async def add_feedback(req: Request, request: FeedbackRequest):
    """Persist a like/dislike feedback entry along with the caller's IP/city.

    The client IP is taken from the raw request; its city is resolved via
    the pipeline's IP-info lookup before storing.
    Returns a user-management success envelope; raises HTTP 500 on failure.
    """
    feedback, user_id, vectorstore = request.feedback, request.user_id, request.vectorstore
    try:
        logger.info(f">>> add_feedback API Triggered by {request.vectorstore} <<<")

        # NOTE(review): req.client can be None behind some ASGI servers /
        # proxies — confirm deployment always populates it.
        client_ip = req.client.host
        city = conversai_analytic_pipeline.get_ip_info(client_ip)

        conversai_analytic_pipeline.add_feedback_(feedback, user_id, city, client_ip, vectorstore)

        # Fixed typo in the user-facing message ("Sucess" -> "Success").
        response = success_response_user_management(code=200, message="Add Feedback Success")
        logger.info(f">>> add_feedback API Response Success for {vectorstore} <<<")

        return response

    except Exception as e:
        logger.error(f">>> add_feedback API Response Failed for {vectorstore} {e}<<<")
        raise_http_exception(500, "Internal Server Error")


@analytic_endpoints_router.post("/user_satisfaction_rate")
async def user_satisfaction_rate(
        request: UserSatisfactionRateRequest
):
    """Compute the daily satisfaction rate (likes / total feedback * 100).

    Defaults to the 7 days ending today when either date bound is missing.
    Only rows whose "feedback" is exactly "like" or "dislike" are counted.
    Response payload: {"output": [{"date": ISO-date, "rate": float}, ...]},
    sorted by date ascending. Raises HTTP 500 on failure.
    """
    start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
    logger.info(f">>> user_satisfaction_rate API Triggered by {vectorstore} <<<")
    try:
        if not start_date or not end_date:
            # Default window: last 7 days in the server's local timezone.
            end_date = datetime.now().astimezone().date()
            start_date = end_date - timedelta(days=7)
        else:
            start_date = isoparse(start_date).date()
            end_date = isoparse(end_date).date()

        feedback_counts = defaultdict(lambda: {"like": 0, "dislike": 0})
        response = conversai_analytic_pipeline.feedback_table_(vectorstore)
        for i in response:
            timestamp = isoparse(i["timestamp"])
            if start_date <= timestamp.date() <= end_date:
                date = timestamp.date()
                feedback = i.get("feedback")
                if feedback == "like":
                    feedback_counts[date]["like"] += 1
                elif feedback == "dislike":
                    feedback_counts[date]["dislike"] += 1

        data = []
        for date in sorted(feedback_counts.keys()):
            like_count = feedback_counts[date]["like"]
            dislike_count = feedback_counts[date]["dislike"]
            total_feedback = like_count + dislike_count
            # Avoid ZeroDivisionError if a date somehow has no counted rows.
            satisfaction_rate = (like_count / total_feedback * 100) if total_feedback > 0 else 0
            data.append({"date": date.isoformat(), "rate": satisfaction_rate})

        response = create_success_response(code=200, data=dict(output=data))
        logger.info(f">>> user_satisfaction_rate API Response Success for {vectorstore} <<<")

        return response
    except Exception as e:
        # Fixed log level: failures were logged at INFO, unlike every
        # sibling endpoint which logs failures at ERROR.
        logger.error(f">>> user_satisfaction_rate API Response Failed for {vectorstore} {e}<<<")
        raise_http_exception(500, "Internal Server Error")