File size: 2,914 Bytes
cd46ce5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | """
Common utilities.
"""
import asyncio
import base64
import re
import httpx
from cbh.api.account.dto import AccountType
from cbh.api.account.models import AccountModel
from cbh.api.scenario.dto import AssigneesType
from cbh.core.config import settings
def form_additional_scenario_filter(account: AccountModel):
    """Build the extra scenario query filter that applies to *account*.

    The returned dict always pins ``owner.organization.id`` to the id of the
    account's organization. When the account type equals
    ``AccountType.USER`` an ``$or`` clause is added that matches a scenario
    if any of the following holds:

    * its ``assignees`` array is empty;
    * ``assignees`` has an entry of type ``AssigneesType.USER`` whose
      ``account.id`` equals the account's id;
    * ``assignees`` has an entry of type ``AssigneesType.TEAM`` whose
      ``team.members`` contains an element with that id.

    For any other account type only the organization constraint is returned.
    """
    scenario_filter = {"owner.organization.id": account.organization.id}

    is_user_account = account.accountType == AccountType.USER
    if not is_user_account:
        return scenario_filter

    # Scenario has no assignees at all.
    unassigned = {"assignees": {"$size": 0}}
    # Scenario lists this account directly as an assignee.
    assigned_directly = {
        "assignees": {
            "$elemMatch": {
                "type": AssigneesType.USER.value,
                "account.id": account.id,
            }
        }
    }
    # Scenario lists a team assignee whose members include this account.
    assigned_via_team = {
        "assignees": {
            "$elemMatch": {
                "type": AssigneesType.TEAM.value,
                "team.members": {"$elemMatch": {"id": account.id}},
            }
        }
    }

    scenario_filter["$or"] = [unassigned, assigned_directly, assigned_via_team]
    return scenario_filter
async def convert_document_to_text(file: bytes, filename: str) -> str:
    """Convert a document to plain text through the Convertio HTTP API.

    The file is sent base64-encoded with ``outputformat`` set to ``txt``;
    the conversion status endpoint is then polled (one second apart) until
    its ``step`` is ``"finish"``, and the produced file is downloaded.

    Args:
        file: Raw bytes of the document to convert.
        filename: Original file name. Every character that is not a word
            character, whitespace, ``.`` or ``-`` is removed before the name
            is sent to the API.

    Returns:
        The downloaded output decoded as UTF-8 (undecodable bytes are
        dropped), or an empty string when the start-conversion request does
        not report ``code == 200``.

    Raises:
        Exception: ``"Please, try again"`` when a status poll reports a
            non-200 ``code`` or the conversion has not finished after the
            maximum number of polls.
        httpx.HTTPStatusError: When downloading the converted file fails.
    """
    # Same budget as before: up to 51 status polls, one second apart.
    max_polls = 51

    safe_filename = re.sub(r"[^\w\s.-]", "", filename)
    payload = {
        "apikey": settings.CONVERTIO_API_KEY,
        "input": "base64",
        "file": base64.b64encode(file).decode("utf-8"),
        "filename": safe_filename,
        "outputformat": "txt",
    }
    headers = {"Content-Type": "application/json"}

    async with httpx.AsyncClient(timeout=httpx.Timeout(timeout=120)) as client:
        start_response = await client.post(
            "https://api.convertio.co/convert", json=payload, headers=headers
        )
        started = start_response.json()
        if started["code"] != 200:
            # The API rejected the conversion request; callers get an empty text.
            return ""

        conversion_id = started["data"]["id"]
        status_url = f"https://api.convertio.co/convert/{conversion_id}/status"

        for _ in range(max_polls):
            status_response = await client.get(status_url)
            status_payload = status_response.json()
            if status_payload["code"] != 200:
                raise Exception("Please, try again")
            if status_payload["data"]["step"] == "finish":
                # Leave immediately: no need to sleep once the result is ready.
                break
            await asyncio.sleep(1)
        else:
            # Poll budget exhausted without reaching the "finish" step.
            raise Exception("Please, try again")

        file_url = status_payload["data"]["output"]["url"]
        download = await client.get(file_url)
        download.raise_for_status()
        return download.content.decode("utf-8", errors="ignore")
|