""" Common utilities. """ import asyncio import base64 import re import httpx from cbh.api.account.dto import AccountType from cbh.api.account.models import AccountModel from cbh.api.scenario.dto import AssigneesType from cbh.core.config import settings def form_additional_scenario_filter(account: AccountModel): filter_ = {"owner.organization.id": account.organization.id} if account.accountType == AccountType.USER: filter_.update( { "$or": [ {"assignees": {"$size": 0}}, { "assignees": { "$elemMatch": { "type": AssigneesType.USER.value, "account.id": account.id, } } }, { "assignees": { "$elemMatch": { "type": AssigneesType.TEAM.value, "team.members": {"$elemMatch": {"id": account.id}}, } } }, ], } ) return filter_ async def convert_document_to_text(file: bytes, filename: str) -> str: filename = re.sub(r"[^\w\s.-]", "", filename) base64_file = base64.b64encode(file).decode("utf-8") headers = {"Content-Type": "application/json"} data = { "apikey": settings.CONVERTIO_API_KEY, "input": "base64", "file": base64_file, "filename": filename, "outputformat": "txt", } async with httpx.AsyncClient(timeout=httpx.Timeout(timeout=120)) as client: response = await client.post( "https://api.convertio.co/convert", json=data, headers=headers ) response = response.json() if response["code"] == 200: conversion_id = response["data"]["id"] status = "" attempt = 0 while status != "finish": if attempt > 50: raise Exception("Please, try again") get_status_response = await client.get( f"https://api.convertio.co/convert/{conversion_id}/status" ) get_status_response = get_status_response.json() if get_status_response["code"] != 200: raise Exception("Please, try again") else: status = get_status_response["data"]["step"] await asyncio.sleep(1) attempt += 1 file_url = get_status_response["data"]["output"]["url"] response = await client.get(file_url) response.raise_for_status() return response.content.decode("utf-8", errors="ignore") else: return ""