| """ |
| Common utilities. |
| """ |
|
|
| import asyncio |
| import base64 |
| import re |
| from typing import Callable, TypeVar |
|
|
| import httpx |
|
|
| from cbh.api.account.dto import AccountType |
| from cbh.api.account.models import AccountModel |
| from cbh.api.common.dto import ( |
| LeaderboardStatisticsPosition, |
| Scores, |
| OrderType, |
| SkillStatistics, |
| IDNamePicture, |
| ) |
| from cbh.api.common.schemas import SkillsStatisticsResponse |
| from cbh.core.config import settings |
|
|
# Generic placeholder type: the value produced by a leaderboard position
# builder and the element type that paginate_list passes through unchanged.
T = TypeVar("T")
|
|
|
|
def calculate_avg_scores(reports: list) -> Scores:
    """
    Average every score field across the given reports.

    Each field of the result is the mean of that field over ``report.scores``
    for all reports, passed through the built-in ``round``. An empty
    ``reports`` list yields a ``Scores`` whose fields are all 0.
    """
    score_fields = (
        "communication",
        "activeListening",
        "conversation",
        "objection",
        "empathy",
        "final",
    )
    if not reports:
        return Scores(**dict.fromkeys(score_fields, 0))

    total_reports = len(reports)
    averages = {
        field: round(sum(getattr(report.scores, field) for report in reports) / total_reports)
        for field in score_fields
    }
    return Scores(**averages)
|
|
|
|
def form_user_stats(session_reports: list) -> dict[str, dict]:
    """
    Group session reports by the id of the account attached to each report.

    The result is keyed by ``report.account.id``. Every value is a dict with:
    ``"account"`` - the account object of the first report seen for that id,
    ``"reports"`` - all reports for that id, in input order,
    ``"attempts"`` - the number of reports collected for that id.
    """
    grouped = {}
    for entry in session_reports:
        owner = entry.account
        bucket = grouped.get(owner.id)
        if bucket is None:
            # First report for this account: open a new bucket.
            bucket = grouped[owner.id] = {"account": owner, "reports": [], "attempts": 0}
        bucket["reports"].append(entry)
        bucket["attempts"] += 1
    return grouped
|
|
|
|
def leaderboard_sort_key(item: tuple, reverse: bool = False) -> tuple:
    """
    Build the sort key for one ``(user_data, score)`` leaderboard entry.

    The key is meant for an ascending sort. With ``reverse=True`` the score is
    negated, so higher scores come first and ties go to the user with fewer
    attempts. With ``reverse=False`` lower scores come first and ties go to
    the user with more attempts. The lower-cased account name is the final
    tie-breaker in both modes.
    """
    stats, final_score = item
    attempts = stats["attempts"]
    name_key = stats["account"].name.lower()
    return (-final_score, attempts, name_key) if reverse else (final_score, -attempts, name_key)
|
|
|
|
def build_leaderboard(
    session_reports: list,
    position_builder: Callable[[dict, Scores], T],
    order: OrderType | None = None,
) -> list[T]:
    """
    Rank users by their average final score and build one entry per user.

    Reports are grouped per account with ``form_user_stats`` and averaged with
    ``calculate_avg_scores``. Users are ordered by ``leaderboard_sort_key``
    applied to the averaged ``final`` score. ``position_builder`` then
    receives each user's stats dict and averaged ``Scores`` and returns the
    entry to place on the leaderboard.

    ``order`` of ``None`` or ``OrderType.DESCENDING`` ranks the highest score
    first; any other value ranks the lowest score first.
    """
    descending = order is None or order == OrderType.DESCENDING

    # Average each user's reports exactly once; the same Scores object is used
    # both for ranking and for building the position.
    scored_users = [
        (user_data, calculate_avg_scores(user_data["reports"]))
        for user_data in form_user_stats(session_reports).values()
    ]
    scored_users.sort(
        key=lambda pair: leaderboard_sort_key((pair[0], pair[1].final), reverse=descending)
    )
    return [position_builder(user_data, avg_scores) for user_data, avg_scores in scored_users]
|
|
|
|
def build_leaderboard_simple(
    session_reports: list,
    pageSize: int | None = None,
    pageIndex: int | None = None,
    order: OrderType | None = None,
) -> list[LeaderboardStatisticsPosition]:
    """
    Build a leaderboard of ``LeaderboardStatisticsPosition`` entries.

    Each entry carries the account's id, name and picture URL together with
    its averaged scores. The ranked list comes from ``build_leaderboard`` and
    is then passed through ``paginate_list`` with ``pageSize``/``pageIndex``.
    """

    def to_position(stats: dict, averages: Scores) -> LeaderboardStatisticsPosition:
        summary = IDNamePicture(
            id=stats["account"].id,
            name=stats["account"].name,
            pictureUrl=stats["account"].pictureUrl,
        )
        return LeaderboardStatisticsPosition(account=summary, scores=averages)

    ranked = build_leaderboard(session_reports, to_position, order)
    return paginate_list(ranked, pageSize, pageIndex)
|
|
|
|
def paginate_list(
    items: list[T], page_size: int | None = None, page_index: int | None = None
) -> list[T]:
    """
    Return one page of ``items``.

    Pages are zero-indexed: page ``page_index`` starts at
    ``page_index * page_size`` and holds at most ``page_size`` elements.
    When either ``page_size`` or ``page_index`` is ``None`` the original
    list object is returned unsliced.
    """
    if page_size is not None and page_index is not None:
        start = page_index * page_size
        stop = start + page_size
        return items[start:stop]
    return items
|
|
|
|
def form_additional_scenario_filter(account: AccountModel, allow_demo: bool = False):
    """
    Build the extra query filter that limits which scenarios an account sees.

    The filter always restricts to scenarios owned by the account's
    organization. For ``AccountType.USER`` accounts it additionally requires
    an active, non-template scenario that either has no assignees, is
    assigned to the account directly, or is assigned to a team whose members
    include the account. Demo scenarios are excluded (``isDemo: False``)
    unless ``allow_demo`` is true and the account is a ``USER``.
    """
    # NOTE(review): imported inside the function, presumably to avoid a
    # circular import with the scenario package - confirm before moving it.
    from cbh.api.scenario.dto import AssigneesType, ScenarioStatus

    query = {"owner.organization.id": account.organization.id}
    is_regular_user = account.accountType == AccountType.USER

    if is_regular_user:
        unassigned = {"assignees": {"$size": 0}}
        assigned_directly = {
            "assignees": {
                "$elemMatch": {
                    "type": AssigneesType.USER.value,
                    "account.id": account.id,
                }
            }
        }
        assigned_via_team = {
            "assignees": {
                "$elemMatch": {
                    "type": AssigneesType.TEAM.value,
                    "team.members": {"$elemMatch": {"id": account.id}},
                }
            }
        }
        query["$or"] = [unassigned, assigned_directly, assigned_via_team]
        query["isTemplate"] = False
        query["status"] = ScenarioStatus.ACTIVE.value

    # Demo scenarios stay visible only to USER accounts that opted in.
    if not (allow_demo and is_regular_user):
        query["isDemo"] = False
    return query
|
|
|
|
async def convert_document_to_text(file: bytes, filename: str) -> str:
    """
    Extract plain text from a document.

    ``.txt`` files (extension matched case-insensitively) are decoded locally
    as UTF-8 with undecodable bytes dropped. Any other file is base64-encoded
    and submitted to the Convertio API for conversion to ``txt``. The job
    status is then polled, with a one-second pause between polls, until it
    reports ``finish``, and the converted file is downloaded and decoded.

    Args:
        file: Raw bytes of the document.
        filename: Name of the document. Characters other than word
            characters, whitespace, dots and hyphens are stripped before the
            name is sent to Convertio.

    Returns:
        The document text, or an empty string when the conversion request is
        answered with a ``code`` other than 200.

    Raises:
        Exception: If a status poll answers with a ``code`` other than 200, or
            the job has not reached ``finish`` after 51 polls.
        httpx.HTTPStatusError: If downloading the converted file returns an
            error status.
    """
    # Match the extension case-insensitively so "NOTES.TXT" is decoded locally
    # instead of being sent to the external converter.
    if filename.lower().endswith(".txt"):
        return file.decode("utf-8", errors="ignore")

    max_polls = 51
    payload = {
        "apikey": settings.CONVERTIO_API_KEY,
        "input": "base64",
        "file": base64.b64encode(file).decode("utf-8"),
        "filename": re.sub(r"[^\w\s.-]", "", filename),
        "outputformat": "txt",
    }
    async with httpx.AsyncClient(timeout=httpx.Timeout(timeout=120)) as client:
        start_response = await client.post(
            "https://api.convertio.co/convert",
            json=payload,
            headers={"Content-Type": "application/json"},
        )
        started = start_response.json()
        if started["code"] != 200:
            return ""

        conversion_id = started["data"]["id"]
        status_url = f"https://api.convertio.co/convert/{conversion_id}/status"
        for _ in range(max_polls):
            job = (await client.get(status_url)).json()
            if job["code"] != 200:
                raise Exception("Please, try again")
            if job["data"]["step"] == "finish":
                # Leave immediately; no extra pause once the job is done.
                break
            await asyncio.sleep(1)
        else:
            # Poll budget exhausted without the job finishing.
            raise Exception("Please, try again")

        download = await client.get(job["data"]["output"]["url"])
        download.raise_for_status()
        return download.content.decode("utf-8", errors="ignore")
|
|
|
|
def calculate_skills_statistics(session_reports: list) -> SkillsStatisticsResponse:
    """
    Calculate team skills statistics.

    For every skill the response holds the average score across all reports
    (from ``calculate_avg_scores``) and the account of the single report with
    the highest score for that skill. Ties on score go to the report with the
    later ``datetimeInserted``, then to the earlier one in the input list.
    ``final`` is the averaged final score.

    With no reports, every skill has score 0 and no best account, and
    ``final`` is 0.
    """
    skills = ("communication", "activeListening", "conversation", "objection", "empathy")

    if not session_reports:
        # One fresh SkillStatistics per skill so the fields do not share a
        # single object.
        return SkillsStatisticsResponse(
            **{skill: SkillStatistics(score=0, bestAccount=None) for skill in skills},
            final=0,
        )

    avg_scores = calculate_avg_scores(session_reports)

    skill_stats = {}
    for skill in skills:
        # max() returns the first maximal element, which is the same element a
        # stable descending sort would place at index 0, without sorting.
        top_report = max(
            session_reports,
            key=lambda report, name=skill: (
                getattr(report.scores, name),
                report.datetimeInserted.timestamp(),
            ),
        )
        best = top_report.account
        skill_stats[skill] = SkillStatistics(
            score=getattr(avg_scores, skill),
            bestAccount=IDNamePicture(id=best.id, name=best.name, pictureUrl=best.pictureUrl),
        )

    return SkillsStatisticsResponse(**skill_stats, final=avg_scores.final)
|
|