| """ | |
| Common utilities. | |
| """ | |
| import asyncio | |
| import base64 | |
| import re | |
| import httpx | |
| from cbh.api.account.dto import AccountType | |
| from cbh.api.account.models import AccountModel | |
| from cbh.api.scenario.dto import AssigneesType | |
| from cbh.core.config import settings | |
def form_additional_scenario_filter(account: AccountModel):
    """
    Build the extra scenario query filter for the given account.

    The filter always restricts results to the account's organization
    (``owner.organization.id``). For accounts whose type is
    ``AccountType.USER`` an ``$or`` clause is added so that a scenario
    matches only when one of the following holds:

    * its ``assignees`` list is empty;
    * it has a USER assignee whose ``account.id`` is this account's id;
    * it has a TEAM assignee whose ``team.members`` contains this account's id.

    NOTE(review): the operators used (``$or``, ``$size``, ``$elemMatch``)
    look like a MongoDB-style query -- confirm against the caller.

    Args:
        account: The account the scenarios are being queried for.

    Returns:
        A dict to be merged into / used as the scenario query filter.
    """
    org_scope = {"owner.organization.id": account.organization.id}

    # Every non-USER account type is limited by organization only.
    if account.accountType != AccountType.USER:
        return org_scope

    unassigned = {"assignees": {"$size": 0}}
    assigned_directly = {
        "assignees": {
            "$elemMatch": {
                "type": AssigneesType.USER.value,
                "account.id": account.id,
            }
        }
    }
    assigned_via_team = {
        "assignees": {
            "$elemMatch": {
                "type": AssigneesType.TEAM.value,
                "team.members": {"$elemMatch": {"id": account.id}},
            }
        }
    }

    org_scope["$or"] = [unassigned, assigned_directly, assigned_via_team]
    return org_scope
async def convert_document_to_text(file: bytes, filename: str) -> str:
    """
    Convert a document to plain text through the Convertio HTTP API.

    The file is sent base64-encoded to ``https://api.convertio.co/convert``
    with ``outputformat`` set to ``txt``. The conversion status is then
    polled until its ``step`` is ``"finish"``, and the resulting file is
    downloaded and decoded as UTF-8.

    Args:
        file: Raw bytes of the document to convert.
        filename: Original file name. Every character that is not a word
            character, whitespace, ``.`` or ``-`` is stripped before the
            name is sent to the API.

    Returns:
        The converted text, decoded as UTF-8 with undecodable bytes
        dropped, or an empty string when the API does not accept the
        conversion request (response ``code`` other than 200).

    Raises:
        Exception: If a status poll returns a ``code`` other than 200, or
            the conversion has not finished after the maximum number of
            polls.
        httpx.HTTPStatusError: If downloading the converted file returns
            an error HTTP status.
    """
    # 51 polls matches the previous ``attempt > 50`` guard (attempts 0..50).
    max_polls = 51
    poll_interval_seconds = 1

    safe_filename = re.sub(r"[^\w\s.-]", "", filename)
    payload = {
        "apikey": settings.CONVERTIO_API_KEY,
        "input": "base64",
        "file": base64.b64encode(file).decode("utf-8"),
        "filename": safe_filename,
        "outputformat": "txt",
    }
    headers = {"Content-Type": "application/json"}

    async with httpx.AsyncClient(timeout=httpx.Timeout(timeout=120)) as client:
        start_response = await client.post(
            "https://api.convertio.co/convert", json=payload, headers=headers
        )
        started = start_response.json()
        if started["code"] != 200:
            # Best-effort contract kept from the original: a rejected
            # conversion request yields an empty text, not an exception.
            return ""

        conversion_id = started["data"]["id"]
        status_url = f"https://api.convertio.co/convert/{conversion_id}/status"

        for poll in range(max_polls):
            if poll:
                # Wait only *between* polls, so no time is wasted sleeping
                # after the conversion has already been reported finished.
                await asyncio.sleep(poll_interval_seconds)
            status_response = await client.get(status_url)
            status_payload = status_response.json()
            if status_payload["code"] != 200:
                raise Exception("Please, try again")
            if status_payload["data"]["step"] == "finish":
                break
        else:
            # The loop ran out of polls without seeing "finish".
            raise Exception("Please, try again")

        download = await client.get(status_payload["data"]["output"]["url"])
        download.raise_for_status()
        return download.content.decode("utf-8", errors="ignore")