# backoffice / main.py
# Jhon Alexander Alvarez Casas
# update logic and new method for auth_cognito
# 02109ec
import os
from typing import Annotated
from fastapi import FastAPI, File, UploadFile, Form
from ttp import ttp
from app.core.cognito import Cognito
from app.core.s3 import S3
from app.core.supabase import supabase_client
from app.core.template import Template
# FastAPI application instance; the route decorators in this module register on it.
app = FastAPI()
@app.post("/data")
def data(
    bucket: str,
    project: str,
    local_date: str,
    env_file: UploadFile = File(...)
):
    """Download every S3 object under ``{project}/{local_date}`` into one local file.

    All objects whose key starts with ``{project}/{local_date}`` in ``bucket``
    are appended, in listing order, to ``data/{project}/{local_date}/data.txt``.

    Args:
        bucket: Name of the S3 bucket to read from.
        project: First path segment of the key prefix and of the local folder.
        local_date: Second path segment of the key prefix and of the local folder.
        env_file: Uploaded file handed to ``S3.client`` to build the client
            (presumably carries credentials/settings -- confirm in app.core.s3).

    Returns:
        ``{"message": "success"}`` once every listed object has been written.
    """
    server_folder = f"{project}/{local_date}"
    # NOTE(review): the local path is built directly from request parameters;
    # values containing ".." would escape the "data/" folder -- consider validating.
    local_folder = f"data/{server_folder}"
    # Assumes S3.client returns a boto3-style S3 client (it is used through
    # list_objects_v2 / download_fileobj) -- confirm in app.core.s3.
    s3 = S3.client(file=env_file)
    response = s3.list_objects_v2(Bucket=bucket, Prefix=server_folder)

    # exist_ok avoids the check-then-create race between concurrent requests.
    os.makedirs(local_folder, exist_ok=True)

    # NOTE(review): the file is opened in append mode, so calling this endpoint
    # twice for the same project/date appends the same objects again.
    with open(f"{local_folder}/data.txt", "ab") as f:
        while True:
            for obj in response.get('Contents', []):
                s3.download_fileobj(bucket, obj['Key'], f)
            # A single listing call returns only one page of keys; follow the
            # continuation token so objects beyond the first page are not dropped.
            if not response.get('IsTruncated'):
                break
            response = s3.list_objects_v2(
                Bucket=bucket,
                Prefix=server_folder,
                ContinuationToken=response['NextContinuationToken'],
            )
    return {"message": "success"}
@app.get("/data")
def data(
    project: str,
    local_date: str,
    process: str
):
    """Parse the local data file with a TTP template and upsert the rows in batches.

    The template obtained from ``Template.read(process)`` is filled in with
    ``project``, ``local_date`` and ``process`` via ``format_map`` and used to
    parse ``data/{project}/{local_date}/data.txt``. The first entry of the
    first result group is then upserted into the ``log_back_office`` table in
    slices of 7777 items, ignoring duplicates.

    Uploading is best-effort: a failing batch is reported with ``print`` and
    the loop moves on to the next batch.

    Args:
        project: Project segment of the local data path; also a template value.
        local_date: Date segment of the local data path; also a template value.
        process: Identifier passed to ``Template.read``; also a template value.

    Returns:
        The parsed result, whether or not every batch was uploaded.
    """
    batch_size = 7777
    source_path = f"data/{project}/{local_date}/data.txt"

    rendered = Template.read(process).format_map(
        {"project": project, "local_date": local_date, "process": process}
    )

    parser = ttp(data=source_path, template=rendered)
    parser.parse()
    rows = parser.result()[0][0]

    total = len(rows)
    offset = 0
    while offset < total:
        try:
            chunk = rows[offset:offset + batch_size]
            table = supabase_client().table("log_back_office")
            response = table.upsert(chunk, ignore_duplicates=True).execute()
            print(f"subiendo {len(response.data)}/{total} desde {offset}")
        except Exception as e:
            # Deliberately broad: one failed batch must not stop the rest.
            print(f"error desde {offset}: {e}")
        offset += batch_size
    return rows
@app.post("/auth")
def auth(
    username: Annotated[str, Form()],
    password: Annotated[str, Form()],
    env_file: UploadFile = File(...)
):
    """Authenticate a user through the project's Cognito wrapper.

    Args:
        username: User name, read from the form body.
        password: Password, read from the form body.
        env_file: Uploaded file handed to ``Cognito.client`` to build the
            client (presumably carries its configuration -- confirm in
            app.core.cognito).

    Returns:
        Whatever ``Cognito.auth`` returns, passed through unchanged.
    """
    return Cognito.auth(
        client=Cognito.client(file=env_file),
        username=username,
        password=password,
    )