spark / cbh /api /common /utils.py
brestok's picture
init
cd46ce5
"""
Common utilities: scenario access filtering and document-to-text conversion.
"""
import asyncio
import base64
import re
import httpx
from cbh.api.account.dto import AccountType
from cbh.api.account.models import AccountModel
from cbh.api.scenario.dto import AssigneesType
from cbh.core.config import settings
def form_additional_scenario_filter(account: AccountModel):
    """
    Build the extra query filter that limits scenarios to what `account` may see.

    The filter always pins `owner.organization.id` to the account's
    organization. For accounts whose type is `AccountType.USER` an `$or`
    clause is added that matches a scenario when any of these holds:

    * its `assignees` array is empty;
    * it has a USER-type assignee whose `account.id` equals the account id;
    * it has a TEAM-type assignee whose `team.members` contains an element
      with that id.

    Accounts of any other type get only the organization restriction.

    Args:
        account: The account the filter is built for.

    Returns:
        A dict using MongoDB-style query operators (`$or`, `$size`,
        `$elemMatch`).
    """
    query = {"owner.organization.id": account.organization.id}

    is_plain_user = account.accountType == AccountType.USER
    if not is_plain_user:
        return query

    # Scenarios with nobody assigned.
    unassigned = {"assignees": {"$size": 0}}

    # Scenarios assigned to this account directly.
    assigned_directly = {
        "assignees": {
            "$elemMatch": {
                "type": AssigneesType.USER.value,
                "account.id": account.id,
            }
        }
    }

    # Scenarios assigned to a team that lists this account among its members.
    assigned_via_team = {
        "assignees": {
            "$elemMatch": {
                "type": AssigneesType.TEAM.value,
                "team.members": {"$elemMatch": {"id": account.id}},
            }
        }
    }

    query["$or"] = [unassigned, assigned_directly, assigned_via_team]
    return query
async def convert_document_to_text(
    file: bytes,
    filename: str,
    *,
    max_attempts: int = 51,
    poll_interval: float = 1.0,
) -> str:
    """
    Convert a document to plain text through the Convertio HTTP API.

    The file is uploaded base64-encoded, the conversion status is polled
    until its step is `finish`, and the resulting text file is downloaded
    and decoded.

    Args:
        file: Raw bytes of the document to convert.
        filename: Original file name. Every character that is not a word
            character, whitespace, `.` or `-` is removed before upload.
        max_attempts: Maximum number of status polls before giving up.
        poll_interval: Seconds to wait between two consecutive polls.

    Returns:
        The converted text decoded as UTF-8 (undecodable bytes are dropped),
        or an empty string when the API does not accept the conversion
        request (response `code` other than 200).

    Raises:
        RuntimeError: If a status poll reports a `code` other than 200, or
            the conversion has not finished after `max_attempts` polls.
        httpx.HTTPStatusError: If downloading the converted file fails.
    """
    safe_name = re.sub(r"[^\w\s.-]", "", filename)
    payload = {
        "apikey": settings.CONVERTIO_API_KEY,
        "input": "base64",
        "file": base64.b64encode(file).decode("utf-8"),
        "filename": safe_name,
        "outputformat": "txt",
    }

    async with httpx.AsyncClient(timeout=httpx.Timeout(timeout=120)) as client:
        start_response = await client.post(
            "https://api.convertio.co/convert",
            json=payload,
            headers={"Content-Type": "application/json"},
        )
        started = start_response.json()
        # A rejected request is treated as "nothing to return", not an error.
        if started.get("code") != 200:
            return ""

        conversion_id = started["data"]["id"]
        status_url = f"https://api.convertio.co/convert/{conversion_id}/status"

        for attempt in range(max_attempts):
            # Wait only *between* polls: no delay before the first poll and
            # none after the one that reports completion.
            if attempt:
                await asyncio.sleep(poll_interval)
            status_response = await client.get(status_url)
            status = status_response.json()
            if status.get("code") != 200:
                raise RuntimeError("Please, try again")
            if status["data"]["step"] == "finish":
                break
        else:
            # Poll budget exhausted without reaching the "finish" step.
            raise RuntimeError("Please, try again")

        download = await client.get(status["data"]["output"]["url"])
        download.raise_for_status()
        return download.content.decode("utf-8", errors="ignore")