""" Common utilities. """ import asyncio import base64 import re from typing import Callable, TypeVar import httpx from cbh.api.account.dto import AccountType from cbh.api.account.models import AccountModel from cbh.api.common.dto import ( LeaderboardStatisticsPosition, Scores, OrderType, SkillStatistics, IDNamePicture, ) from cbh.api.common.schemas import SkillsStatisticsResponse from cbh.core.config import settings T = TypeVar("T") def calculate_avg_scores(reports: list) -> Scores: if not reports: return Scores( communication=0, activeListening=0, conversation=0, objection=0, empathy=0, final=0, ) return Scores( communication=round(sum(report.scores.communication for report in reports) / len(reports)), activeListening=round( sum(report.scores.activeListening for report in reports) / len(reports) ), conversation=round(sum(report.scores.conversation for report in reports) / len(reports)), objection=round(sum(report.scores.objection for report in reports) / len(reports)), empathy=round(sum(report.scores.empathy for report in reports) / len(reports)), final=round(sum(report.scores.final for report in reports) / len(reports)), ) def form_user_stats(session_reports: list) -> dict[str, dict]: user_stats = {} for report in session_reports: user_id = report.account.id if user_id not in user_stats: user_stats[user_id] = { "account": report.account, "reports": [], "attempts": 0, } user_stats[user_id]["reports"].append(report) user_stats[user_id]["attempts"] += 1 return user_stats def leaderboard_sort_key(item: tuple, reverse: bool = False) -> tuple: user_data, score = item if reverse: return -score, user_data["attempts"], user_data["account"].name.lower() return score, -user_data["attempts"], user_data["account"].name.lower() def build_leaderboard( session_reports: list, position_builder: Callable[[dict, Scores], T], order: OrderType | None = None, ) -> list[T]: user_stats = form_user_stats(session_reports) user_scores = [ (user_data, calculate_avg_scores(user_data["reports"]).final) for user_data in user_stats.values() ] is_descending = order is None or order == OrderType.DESCENDING sorted_users = sorted( user_scores, key=lambda item: leaderboard_sort_key(item, reverse=is_descending) ) leaderboard = [] for user_data, _ in sorted_users: avg_scores = calculate_avg_scores(user_data["reports"]) leaderboard.append(position_builder(user_data, avg_scores)) return leaderboard def build_leaderboard_simple( session_reports: list, pageSize: int | None = None, pageIndex: int | None = None, order: OrderType | None = None, ) -> list[LeaderboardStatisticsPosition]: def position_builder(user_data: dict, avg_scores: Scores) -> LeaderboardStatisticsPosition: return LeaderboardStatisticsPosition( account=IDNamePicture( id=user_data["account"].id, name=user_data["account"].name, pictureUrl=user_data["account"].pictureUrl, ), scores=avg_scores, ) leaderboard = build_leaderboard(session_reports, position_builder, order) return paginate_list(leaderboard, pageSize, pageIndex) def paginate_list( items: list[T], page_size: int | None = None, page_index: int | None = None ) -> list[T]: if page_size is None or page_index is None: return items return items[page_index * page_size : (page_index + 1) * page_size] def form_additional_scenario_filter(account: AccountModel, allow_demo: bool = False): from cbh.api.scenario.dto import AssigneesType, ScenarioStatus filter_ = {"owner.organization.id": account.organization.id} if account.accountType == AccountType.USER: filter_.update( { "$or": [ {"assignees": {"$size": 0}}, { "assignees": { "$elemMatch": { "type": AssigneesType.USER.value, "account.id": account.id, } } }, { "assignees": { "$elemMatch": { "type": AssigneesType.TEAM.value, "team.members": {"$elemMatch": {"id": account.id}}, } } }, ], "isTemplate": False, "status": ScenarioStatus.ACTIVE.value, } ) if not allow_demo or account.accountType != AccountType.USER: filter_.update( { "isDemo": False, } ) return filter_ async def convert_document_to_text(file: bytes, filename: str) -> str: if filename.endswith(".txt"): return file.decode("utf-8", errors="ignore") filename = re.sub(r"[^\w\s.-]", "", filename) base64_file = base64.b64encode(file).decode("utf-8") headers = {"Content-Type": "application/json"} data = { "apikey": settings.CONVERTIO_API_KEY, "input": "base64", "file": base64_file, "filename": filename, "outputformat": "txt", } async with httpx.AsyncClient(timeout=httpx.Timeout(timeout=120)) as client: response = await client.post("https://api.convertio.co/convert", json=data, headers=headers) response = response.json() if response["code"] == 200: conversion_id = response["data"]["id"] status = "" attempt = 0 while status != "finish": if attempt > 50: raise Exception("Please, try again") get_status_response = await client.get( f"https://api.convertio.co/convert/{conversion_id}/status" ) get_status_response = get_status_response.json() if get_status_response["code"] != 200: raise Exception("Please, try again") else: status = get_status_response["data"]["step"] await asyncio.sleep(1) attempt += 1 file_url = get_status_response["data"]["output"]["url"] response = await client.get(file_url) response.raise_for_status() return response.content.decode("utf-8", errors="ignore") else: return "" def calculate_skills_statistics(session_reports: list) -> SkillsStatisticsResponse: """ Calculate team skills statistics. """ if not session_reports: empty_skill = SkillStatistics(score=0, bestAccount=None) return SkillsStatisticsResponse( communication=empty_skill, activeListening=empty_skill, conversation=empty_skill, objection=empty_skill, empathy=empty_skill, final=0, ) avg_scores = calculate_avg_scores(session_reports) skills = ["communication", "activeListening", "conversation", "objection", "empathy"] skill_stats = {} for skill in skills: best_report = sorted( session_reports, key=lambda r, s=skill: (getattr(r.scores, s), r.datetimeInserted.timestamp()), reverse=True, )[0] skill_stats[skill] = SkillStatistics( score=getattr(avg_scores, skill), bestAccount=IDNamePicture( id=best_report.account.id, name=best_report.account.name, pictureUrl=best_report.account.pictureUrl, ), ) return SkillsStatisticsResponse( communication=skill_stats["communication"], activeListening=skill_stats["activeListening"], conversation=skill_stats["conversation"], objection=skill_stats["objection"], empathy=skill_stats["empathy"], final=avg_scores.final, )