# Source: Admin-Desk2/app/services/analytics.py
# Uploaded by Fred808 in commit 1f8ac0c ("Upload 94 files", verified).
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple

import numpy as np
from sqlalchemy import func, and_, select, extract

from ..db.database import db
from ..db.models import StaffActivity, PerformanceMetric, User, ActivityType
from ..utils.cache import cache
class AnalyticsService:
    """Cached sales, product and customer analytics built from the ``orders`` collection.

    Every public method checks the cache first and, on a miss, runs an
    aggregation pipeline through ``db.db[...]``, stores the result in the
    cache for ``CACHE_TTL_SECONDS`` and returns it.
    """

    # Lifetime of cached analytics results, in seconds (1 hour).
    CACHE_TTL_SECONDS = 3600
    # Number of best-selling products / top customers included in a report.
    TOP_N = 10
    # Products whose ``inventory_count`` is below this value count as low stock.
    LOW_STOCK_THRESHOLD = 10
    # Inclusive bounds of the "medium value" customer segment (total spent);
    # customers above the upper bound are "high value", below the lower "low value".
    MEDIUM_VALUE_MIN_SPEND = 500
    MEDIUM_VALUE_MAX_SPEND = 1000

    @staticmethod
    def _summarize_sales(daily_sales: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Fold per-day sales rows into the sales analytics payload.

        Args:
            daily_sales: Rows with ``total_sales`` and ``order_count`` keys,
                one per day, as produced by the sales aggregation pipeline.

        Returns:
            Dict with ``daily_sales`` (the input rows), ``total_revenue``,
            ``total_orders`` and ``average_order_value``. The average is 0
            when there are no orders.
        """
        total_revenue = sum(day["total_sales"] for day in daily_sales)
        total_orders = sum(day["order_count"] for day in daily_sales)
        return {
            "daily_sales": daily_sales,
            "total_revenue": total_revenue,
            "total_orders": total_orders,
            # ``or 1`` avoids a ZeroDivisionError for an empty date range.
            "average_order_value": total_revenue / (total_orders or 1),
        }

    @staticmethod
    def _attach_product_details(
        top_products: List[Dict[str, Any]],
        details_by_id: Dict[Any, Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Copy ``name`` and ``category`` onto each aggregated product row, in place.

        Rows whose ``_id`` has no entry in ``details_by_id`` are left
        untouched. A product document that lacks ``name`` or ``category``
        yields ``None`` for that field instead of raising ``KeyError``.

        Returns:
            The same ``top_products`` list, for convenience.
        """
        for product in top_products:
            detail = details_by_id.get(product["_id"])
            if detail:
                product["name"] = detail.get("name")
                product["category"] = detail.get("category")
        return top_products

    @staticmethod
    def _summarize_customers(customer_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Build the customer analytics payload from per-customer totals.

        Args:
            customer_data: Rows with a ``total_spent`` key, already sorted by
                ``total_spent`` descending (the first ``TOP_N`` rows are
                reported as the top customers).

        Returns:
            Dict with ``total_customers``, ``top_customers``,
            ``average_customer_value`` and ``customer_segments``.
        """
        segments = {"high_value": 0, "medium_value": 0, "low_value": 0}
        total_spent = 0
        # Single pass: accumulate the grand total and bucket each customer.
        for customer in customer_data:
            spent = customer["total_spent"]
            total_spent += spent
            if spent > AnalyticsService.MEDIUM_VALUE_MAX_SPEND:
                segments["high_value"] += 1
            elif spent >= AnalyticsService.MEDIUM_VALUE_MIN_SPEND:
                segments["medium_value"] += 1
            else:
                segments["low_value"] += 1

        customer_count = len(customer_data)
        return {
            "total_customers": customer_count,
            "top_customers": customer_data[:AnalyticsService.TOP_N],
            # ``or 1`` avoids a ZeroDivisionError when there are no customers.
            "average_customer_value": total_spent / (customer_count or 1),
            "customer_segments": segments,
        }

    @staticmethod
    async def get_sales_analytics(start_date: datetime, end_date: datetime) -> Dict[str, Any]:
        """Return daily and overall sales figures for completed/delivered orders.

        Args:
            start_date: Inclusive lower bound on ``created_at``.
            end_date: Inclusive upper bound on ``created_at``.

        Returns:
            Dict with ``daily_sales`` (one row per calendar day, ascending),
            ``total_revenue``, ``total_orders`` and ``average_order_value``.
        """
        # NOTE(review): the cache key keeps only the date part of each bound,
        # while the query below filters on the full datetimes, so two requests
        # that differ only in time of day share one cache entry.
        cache_key = f"sales_analytics:{start_date.date()}:{end_date.date()}"
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        pipeline = [
            {
                "$match": {
                    "created_at": {
                        "$gte": start_date,
                        "$lte": end_date
                    },
                    "status": {"$in": ["completed", "delivered"]}
                }
            },
            {
                # One bucket per calendar day of ``created_at``.
                "$group": {
                    "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}},
                    "total_sales": {"$sum": "$total_amount"},
                    "order_count": {"$sum": 1}
                }
            },
            {"$sort": {"_id": 1}}
        ]
        sales_data = await db.db["orders"].aggregate(pipeline).to_list(None)

        result = AnalyticsService._summarize_sales(sales_data)
        await cache.set_cache(cache_key, result, expire=AnalyticsService.CACHE_TTL_SECONDS)
        return result

    @staticmethod
    async def get_product_analytics() -> Dict[str, Any]:
        """Return the top-selling products by revenue plus catalogue counts.

        Returns:
            Dict with ``top_products`` (up to ``TOP_N`` rows holding
            ``total_quantity`` and ``total_revenue``, plus ``name`` and
            ``category`` when the product document exists),
            ``total_products`` and ``low_stock_products``.
        """
        cache_key = "product_analytics"
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        pipeline = [
            # One document per ordered line item.
            {"$unwind": "$products"},
            {
                "$group": {
                    "_id": "$products.product_id",
                    "total_quantity": {"$sum": "$products.quantity"},
                    "total_revenue": {
                        "$sum": {
                            "$multiply": ["$products.price", "$products.quantity"]
                        }
                    }
                }
            },
            {"$sort": {"total_revenue": -1}},
            {"$limit": AnalyticsService.TOP_N}
        ]
        top_products = await db.db["orders"].aggregate(pipeline).to_list(None)

        # Fetch all product documents in one round-trip instead of one
        # ``find_one`` per product.
        if top_products:
            product_ids = [product["_id"] for product in top_products]
            product_docs = await db.db["products"].find(
                {"_id": {"$in": product_ids}}
            ).to_list(None)
            details_by_id = {doc["_id"]: doc for doc in product_docs}
            AnalyticsService._attach_product_details(top_products, details_by_id)

        result = {
            "top_products": top_products,
            "total_products": await db.db["products"].count_documents({}),
            "low_stock_products": await db.db["products"].count_documents(
                {"inventory_count": {"$lt": AnalyticsService.LOW_STOCK_THRESHOLD}}
            )
        }
        await cache.set_cache(cache_key, result, expire=AnalyticsService.CACHE_TTL_SECONDS)
        return result

    @staticmethod
    async def get_customer_analytics() -> Dict[str, Any]:
        """Return per-customer spending statistics over all orders.

        Returns:
            Dict with ``total_customers``, ``top_customers`` (the ``TOP_N``
            biggest spenders), ``average_customer_value`` and
            ``customer_segments`` (counts of high/medium/low value customers).
        """
        cache_key = "customer_analytics"
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        pipeline = [
            {
                "$group": {
                    "_id": "$customer_id",
                    "total_orders": {"$sum": 1},
                    "total_spent": {"$sum": "$total_amount"},
                    "last_order": {"$max": "$created_at"}
                }
            },
            # Biggest spenders first, so the head of the list is the top customers.
            {"$sort": {"total_spent": -1}}
        ]
        customer_data = await db.db["orders"].aggregate(pipeline).to_list(None)

        result = AnalyticsService._summarize_customers(customer_data)
        await cache.set_cache(cache_key, result, expire=AnalyticsService.CACHE_TTL_SECONDS)
        return result
class StaffAnalyticsService:
    """Records staff activities and maintains per-user, per-branch daily metrics.

    Activities are stored as ``StaffActivity`` rows; each recorded activity
    also updates (or creates) the ``PerformanceMetric`` row for the current
    UTC day and recomputes its efficiency score.
    """

    # Relative weight of each component in the daily efficiency score.
    EFFICIENCY_WEIGHTS = {
        'sales': 0.4,
        'speed': 0.2,
        'accuracy': 0.2,
        'customer_service': 0.2,
    }
    # ``PerformanceMetric`` attributes that are incremented in place and must
    # therefore hold a number before any arithmetic is done on them.
    _COUNTER_FIELDS = (
        'total_sales',
        'transaction_count',
        'void_count',
        'customer_interaction_count',
        'login_time',
    )

    @staticmethod
    def _clamp_score(value):
        """Limit a component score to the 0-10 range."""
        return max(0, min(value, 10))

    @staticmethod
    def _efficiency_score(
        total_sales,
        transaction_count,
        void_count,
        customer_interaction_count,
        login_time,
    ):
        """Combine the day's counters into a weighted 0-10 efficiency score.

        Each component is scaled to 0-10 and clamped to that range, so an
        extreme counter (e.g. more voids than transactions, or more login
        time than the assumed working day) cannot push the overall score
        below 0.
        """
        clamp = StaffAnalyticsService._clamp_score
        weights = StaffAnalyticsService.EFFICIENCY_WEIGHTS

        # 1000 in sales maps to the full 10 points.
        sales_score = clamp((total_sales / 1000) * 10)
        # Assumes an 8-hour day with login_time in minutes (8 * 60) — TODO confirm unit.
        speed_score = clamp(10 * (1 - (login_time / (8 * 60))))
        # Share of transactions that were not voided.
        accuracy_score = clamp(10 * (1 - (void_count / max(transaction_count, 1))))
        # 10 customer interactions map to the full 10 points.
        cs_score = clamp((customer_interaction_count / 10) * 10)

        return (
            (sales_score * weights['sales']) +
            (speed_score * weights['speed']) +
            (accuracy_score * weights['accuracy']) +
            (cs_score * weights['customer_service'])
        )

    @staticmethod
    def _todays_metric_stmt(user_id: int, branch_id: int):
        """Build the SELECT for a user's ``PerformanceMetric`` row of the current UTC day."""
        today = datetime.utcnow().date()
        return select(PerformanceMetric).where(
            and_(
                PerformanceMetric.user_id == user_id,
                PerformanceMetric.branch_id == branch_id,
                func.date(PerformanceMetric.metric_date) == today
            )
        )

    @staticmethod
    async def record_activity(
        user_id: int,
        branch_id: int,
        activity_type: ActivityType,
        details: dict,
        duration: Optional[float] = None
    ) -> Tuple[StaffActivity, Optional[Dict[str, Any]], Dict[str, Any]]:
        """Record a staff activity, score it and update the day's metrics.

        Args:
            user_id: Staff member performing the activity.
            branch_id: Branch where the activity took place.
            activity_type: Kind of activity; only ``SALE`` and
                ``CUSTOMER_SERVICE`` activities receive a performance score.
            details: Free-form activity data. Keys read here: ``amount`` and
                ``duration`` for sales, ``customer_satisfaction`` and
                ``resolution_time`` for customer service.
            duration: Duration of the activity. For sales,
                ``details['duration']`` takes precedence when present and
                this argument is used as the fallback.

        Returns:
            A tuple ``(activity, prev_metrics, new_metrics)``: the persisted
            activity, today's metrics as they were before this activity
            (``None`` if no row existed yet) and the metrics after the update.
        """
        score = None
        if activity_type == ActivityType.SALE:
            # Score grows with the sale amount and shrinks with the time taken.
            amount = details.get('amount') or 0
            sale_duration = details.get('duration')  # duration in minutes
            if sale_duration is None:
                # Fall back to the explicit argument instead of discarding it.
                sale_duration = duration if duration is not None else 0
            duration = sale_duration
            if duration > 0:
                score = min((amount / 100) * (5 / duration), 10)  # Max score of 10
        elif activity_type == ActivityType.CUSTOMER_SERVICE:
            # Score grows with satisfaction and shrinks with resolution time.
            satisfaction = details.get('customer_satisfaction') or 0
            resolution_time = details.get('resolution_time') or 0
            if resolution_time > 0:
                score = min((satisfaction * 2) * (10 / resolution_time), 10)

        async with db() as session:
            activity = StaffActivity(
                user_id=user_id,
                branch_id=branch_id,
                activity_type=activity_type,
                details=details,
                duration=duration,
                performance_score=score
            )
            session.add(activity)
            await session.commit()
            await session.refresh(activity)

        # Snapshot the metrics before they are updated so callers can compare.
        prev_metrics = await StaffAnalyticsService._get_current_metrics(user_id, branch_id)
        new_metrics = await StaffAnalyticsService._update_performance_metrics(
            user_id, branch_id, activity
        )
        return activity, prev_metrics, new_metrics

    @staticmethod
    async def _get_current_metrics(user_id: int, branch_id: int) -> Optional[Dict[str, Any]]:
        """Return the current UTC day's metrics for a user, or ``None`` if no row exists."""
        async with db() as session:
            stmt = StaffAnalyticsService._todays_metric_stmt(user_id, branch_id)
            metric = (await session.execute(stmt)).scalar_one_or_none()
            if metric is None:
                return None
            return {
                "total_sales": metric.total_sales,
                "transaction_count": metric.transaction_count,
                "void_count": metric.void_count,
                "efficiency_score": metric.efficiency_score,
                "customer_interaction_count": metric.customer_interaction_count
            }

    @staticmethod
    async def _update_performance_metrics(
        user_id: int,
        branch_id: int,
        activity: StaffActivity
    ) -> Dict[str, Any]:
        """Apply one activity to today's ``PerformanceMetric`` row and persist it.

        The row for the current UTC day is created if it does not exist yet.
        The efficiency score is recomputed after every update.

        Returns:
            The metric values after the update.
        """
        async with db() as session:
            # Get or create today's metrics
            stmt = StaffAnalyticsService._todays_metric_stmt(user_id, branch_id)
            metric = (await session.execute(stmt)).scalar_one_or_none()
            if metric is None:
                metric = PerformanceMetric(
                    user_id=user_id,
                    branch_id=branch_id,
                    metric_date=datetime.utcnow()
                )
                session.add(metric)

            # Column-level defaults are applied when the INSERT is executed,
            # so a freshly constructed row (or one with NULL columns) may
            # still hold None here; normalise before doing arithmetic.
            for field in StaffAnalyticsService._COUNTER_FIELDS:
                if getattr(metric, field) is None:
                    setattr(metric, field, 0)

            # Update metrics based on activity type
            if activity.activity_type == ActivityType.SALE:
                metric.total_sales += (activity.details or {}).get('amount') or 0
                metric.transaction_count += 1
                metric.average_transaction_value = metric.total_sales / metric.transaction_count
            elif activity.activity_type == ActivityType.VOID:
                metric.void_count += 1
            elif activity.activity_type == ActivityType.CUSTOMER_SERVICE:
                metric.customer_interaction_count += 1
            elif activity.activity_type in (ActivityType.LOGIN, ActivityType.LOGOUT):
                if activity.duration:
                    metric.login_time += activity.duration

            metric.efficiency_score = StaffAnalyticsService._efficiency_score(
                total_sales=metric.total_sales,
                transaction_count=metric.transaction_count,
                void_count=metric.void_count,
                customer_interaction_count=metric.customer_interaction_count,
                login_time=metric.login_time,
            )

            await session.commit()
            await session.refresh(metric)
            return {
                "total_sales": metric.total_sales,
                "transaction_count": metric.transaction_count,
                "average_transaction_value": metric.average_transaction_value,
                "void_count": metric.void_count,
                "customer_interaction_count": metric.customer_interaction_count,
                "login_time": metric.login_time,
                "efficiency_score": metric.efficiency_score
            }

    @staticmethod
    async def get_staff_performance(
        branch_id: Optional[int] = None,
        user_id: Optional[int] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Get aggregated staff performance metrics for a date range.

        Args:
            branch_id: Restrict to one branch; also enables ``branch_averages``.
            user_id: Restrict to one staff member.
            start_date: Start of the range; defaults to 30 days before now (UTC).
            end_date: End of the range; defaults to now (UTC).

        Returns:
            Dict with ``staff_performance`` (one entry per user that has both
            metric rows in range and a ``User`` row), ``date_range`` and,
            when ``branch_id`` is given, ``branch_averages``. The result is
            cached for one hour.
        """
        now = datetime.utcnow()
        if not start_date:
            start_date = now - timedelta(days=30)
        if not end_date:
            end_date = now

        cache_key = f"staff_performance:{branch_id}:{user_id}:{start_date.date()}:{end_date.date()}"
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        async with db() as session:
            # Base query conditions
            conditions = [
                PerformanceMetric.metric_date.between(start_date, end_date)
            ]
            # ``is not None`` so that an id of 0 is still applied as a filter.
            if branch_id is not None:
                conditions.append(PerformanceMetric.branch_id == branch_id)
            if user_id is not None:
                conditions.append(PerformanceMetric.user_id == user_id)

            # Per-user totals and averages over the daily metric rows.
            metrics_stmt = select(
                PerformanceMetric.user_id,
                func.sum(PerformanceMetric.total_sales).label('total_sales'),
                func.sum(PerformanceMetric.transaction_count).label('transaction_count'),
                func.avg(PerformanceMetric.average_transaction_value).label('avg_transaction_value'),
                func.sum(PerformanceMetric.void_count).label('void_count'),
                func.sum(PerformanceMetric.customer_interaction_count).label('customer_interactions'),
                func.sum(PerformanceMetric.login_time).label('total_login_time'),
                func.avg(PerformanceMetric.efficiency_score).label('avg_efficiency_score')
            ).where(
                and_(*conditions)
            ).group_by(
                PerformanceMetric.user_id
            )
            metrics_data = (await session.execute(metrics_stmt)).all()

            # Load the matching users in one query; skip it when nothing matched.
            users_dict = {}
            if metrics_data:
                user_ids = [row.user_id for row in metrics_data]
                users_stmt = select(User).where(User.id.in_(user_ids))
                users = (await session.execute(users_stmt)).scalars().all()
                users_dict = {u.id: u for u in users}

            # Format response data; metric rows without a User row are dropped.
            performance_data = []
            for row in metrics_data:
                user = users_dict.get(row.user_id)
                if not user:
                    continue
                performance_data.append({
                    "user_id": user.id,
                    "username": user.username,
                    "full_name": user.full_name,
                    "metrics": {
                        "total_sales": row.total_sales,
                        "transaction_count": row.transaction_count,
                        "average_transaction_value": row.avg_transaction_value,
                        "void_count": row.void_count,
                        "customer_interactions": row.customer_interactions,
                        "total_login_time": row.total_login_time,
                        "efficiency_score": row.avg_efficiency_score
                    }
                })

            # Calculate branch averages if branch_id is specified
            branch_averages = None
            if branch_id is not None:
                avg_stmt = select(
                    func.avg(PerformanceMetric.total_sales).label('avg_sales'),
                    func.avg(PerformanceMetric.transaction_count).label('avg_transactions'),
                    func.avg(PerformanceMetric.average_transaction_value).label('avg_transaction_value'),
                    func.avg(PerformanceMetric.efficiency_score).label('avg_efficiency')
                ).where(
                    and_(
                        PerformanceMetric.branch_id == branch_id,
                        PerformanceMetric.metric_date.between(start_date, end_date)
                    )
                )
                branch_avg = (await session.execute(avg_stmt)).one()
                branch_averages = {
                    "average_daily_sales": branch_avg.avg_sales,
                    "average_daily_transactions": branch_avg.avg_transactions,
                    "average_transaction_value": branch_avg.avg_transaction_value,
                    "average_efficiency_score": branch_avg.avg_efficiency
                }

            response = {
                "staff_performance": performance_data,
                "date_range": {
                    "start_date": start_date.isoformat(),
                    "end_date": end_date.isoformat()
                }
            }
            if branch_averages is not None:
                response["branch_averages"] = branch_averages

            # Cache the response for 1 hour
            await cache.set_cache(cache_key, response, expire=3600)
            return response
# Module-level service instances, created once when this module is loaded.
# Every method defined on the two classes above is a @staticmethod, so these
# objects serve as namespaces for the calls.
analytics = AnalyticsService()
staff_analytics = StaffAnalyticsService()