File size: 9,926 Bytes
0870bc8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
"""
Created By: ishwor subedi
Date: 2024-08-28
"""
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from dateutil.parser import isoparse
from fastapi.routing import APIRouter
from src.pipeline.conversai_analytic_pipeline import ConversAIAnalyticPipeline
from fastapi import Request
from src.utils.error_handling import create_success_response, raise_http_exception, \
success_response_user_management
from src.models.apis_models import FeedbackRequest, DailyActiveEndUserRequest, AverageSessionInteractionRequest, \
TokenUsageRequest, UserSatisfactionRateRequest
from src import logging as logger
analytic_endpoints_router = APIRouter(tags=["Analytics Endpoints"])
conversai_analytic_pipeline = ConversAIAnalyticPipeline()
@analytic_endpoints_router.post("/daily_chat_count")
async def daily_chat_count(
request: DailyActiveEndUserRequest):
start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
logger.info(f">>> daily_chat_count API Triggered by {vectorstore} <<<")
try:
if not start_date or not end_date:
end_date = datetime.now().astimezone().date()
start_date = end_date - timedelta(days=7)
else:
start_date = isoparse(start_date).date()
end_date = isoparse(end_date).date()
response = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)
dates = [
isoparse(i["timestamp"]).date()
for i in response
if start_date <= isoparse(i["timestamp"]).date() <= end_date
]
date_count = Counter(dates)
data = [{"date": date.isoformat(), "count": count} for date, count in date_count.items()]
response = create_success_response(code=200, data=dict(output=data))
logger.info(f">>> daily_chat_count API Response Success for {vectorstore} <<<")
return response
except Exception as e:
logger.error(f">>> daily_chat_count API Response Failed for {vectorstore} {e}<<<")
raise_http_exception(500, "Internal Server Error")
@analytic_endpoints_router.post("/daily_active_end_user")
async def daily_active_end_user(
request: DailyActiveEndUserRequest
):
start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
logger.info(f">>> daily_active_end_user API Triggered by {vectorstore} <<<")
try:
if not start_date or not end_date:
end_date = datetime.now().astimezone().date()
start_date = end_date - timedelta(days=7)
else:
start_date = isoparse(start_date).date()
end_date = isoparse(end_date).date()
response = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)
ip_by_date = defaultdict(set)
for i in response:
timestamp = isoparse(i["timestamp"])
ip_address = i["IpAddress"]
if start_date <= timestamp.date() <= end_date:
date = timestamp.date()
ip_by_date[date].add(ip_address)
data = [{"date": date.isoformat(), "terminal": len(ips)} for date, ips in ip_by_date.items() if len(ips) > 1]
response = create_success_response(code=200, data=dict(output=data))
logger.info(f">>> daily_active_end_user API Response Success for {vectorstore} <<<")
return response
except Exception as e:
logger.error(f">>> daily_active_end_user API Response Failed for {vectorstore} {e}<<<")
raise_http_exception(500, "Internal Server Error")
@analytic_endpoints_router.post("/average_session_interaction")
async def average_session_interaction(
request: AverageSessionInteractionRequest
):
start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
logger.info(f">>> average_session_interaction API Triggered by {vectorstore} <<<")
try:
if not start_date or not end_date:
end_date = datetime.now().astimezone().date()
start_date = end_date - timedelta(days=7)
else:
start_date = isoparse(start_date).date()
end_date = isoparse(end_date).date()
response = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)
total_messages_by_date = defaultdict(int)
unique_ips_by_date = defaultdict(set)
for i in response:
timestamp = isoparse(i["timestamp"])
ip_address = i["IpAddress"]
if start_date <= timestamp.date() <= end_date:
date = timestamp.date()
total_messages_by_date[date] += 1
unique_ips_by_date[date].add(ip_address)
data = []
for date in sorted(total_messages_by_date.keys()):
total_messages = total_messages_by_date[date]
unique_ips = len(unique_ips_by_date[date])
average_interactions = total_messages / unique_ips if unique_ips > 0 else 0
data.append({"date": date.isoformat(), "interactions": average_interactions})
response = create_success_response(code=200, data=dict(data=data))
logger.info(f">>> average_session_interaction API Response Success for {vectorstore} <<<")
return response
except Exception as e:
logger.error(f">>> average_session_interaction API Response Failed for {vectorstore} {e}<<<")
raise_http_exception(500, "Internal Server Error")
@analytic_endpoints_router.post("/token_usages")
async def token_usages(request: TokenUsageRequest):
start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
logger.info(f">>> token_usages API Triggered by {vectorstore} <<<")
try:
if not start_date or not end_date:
end_date = datetime.now().astimezone().date()
start_date = end_date - timedelta(days=7)
else:
start_date = isoparse(start_date).date()
end_date = isoparse(end_date).date()
response = conversai_analytic_pipeline.chat_history_table_(vectorstore=vectorstore)
token_usage_by_date = defaultdict(int)
for i in response:
timestamp = isoparse(i["timestamp"])
if start_date <= timestamp.date() <= end_date:
date = timestamp.date()
response_token_count = i.get("ResponseTokenCount")
if response_token_count is not None:
token_usage_by_date[date] += response_token_count
data = [{"date": date.isoformat(), "total_tokens": total_tokens} for date, total_tokens in
token_usage_by_date.items()]
response = create_success_response(code=200, data=dict(output=data))
logger.info(f">>> token_usages API Response Success for {vectorstore} <<<")
return response
except Exception as e:
logger.error(f">>> token_usages API Response Failed for {vectorstore} {e}<<<")
raise_http_exception(500, "Internal Server Error")
@analytic_endpoints_router.post("/add_feedback")
async def add_feedback(req: Request, request: FeedbackRequest):
feedback, user_id, vectorstore = request.feedback, request.user_id, request.vectorstore
try:
logger.info(f">>> add_feedback API Triggered by {request.vectorstore} <<<")
client_ip = req.client.host
city = conversai_analytic_pipeline.get_ip_info(client_ip)
conversai_analytic_pipeline.add_feedback_(feedback, user_id, city, client_ip, vectorstore)
response = success_response_user_management(code=200, message="Add Feedback Sucess")
logger.info(f">>> add_feedback API Response Success for {vectorstore} <<<")
return response
except Exception as e:
logger.error(f">>> add_feedback API Response Failed for {vectorstore} {e}<<<")
raise_http_exception(500, "Internal Server Error")
@analytic_endpoints_router.post("/user_satisfaction_rate")
async def user_satisfaction_rate(
request: UserSatisfactionRateRequest
):
start_date, end_date, vectorstore = request.start_date, request.end_date, request.vectorstore
logger.info(f">>> user_satisfaction_rate API Triggered by {vectorstore} <<<")
try:
if not start_date or not end_date:
end_date = datetime.now().astimezone().date()
start_date = end_date - timedelta(days=7)
else:
start_date = isoparse(start_date).date()
end_date = isoparse(end_date).date()
feedback_counts = defaultdict(lambda: {"like": 0, "dislike": 0})
response = conversai_analytic_pipeline.feedback_table_(vectorstore)
for i in response:
timestamp = isoparse(i["timestamp"])
if start_date <= timestamp.date() <= end_date:
date = timestamp.date()
feedback = i.get("feedback")
if feedback == "like":
feedback_counts[date]["like"] += 1
elif feedback == "dislike":
feedback_counts[date]["dislike"] += 1
data = []
for date in sorted(feedback_counts.keys()):
like_count = feedback_counts[date]["like"]
dislike_count = feedback_counts[date]["dislike"]
total_feedback = like_count + dislike_count
satisfaction_rate = (like_count / total_feedback * 100) if total_feedback > 0 else 0
data.append({"date": date.isoformat(), "rate": satisfaction_rate})
response = create_success_response(code=200, data=dict(output=data))
logger.info(f">>> user_satisfaction_rate API Response Success for {vectorstore} <<<")
return response
except Exception as e:
logger.info(f">>> user_satisfaction_rate API Response Failed for {vectorstore} {e}<<<")
raise_http_exception(500, "Internal Server Error")
|