import os from typing import Annotated from fastapi import FastAPI, File, UploadFile, Form from ttp import ttp from app.core.cognito import Cognito from app.core.s3 import S3 from app.core.supabase import supabase_client from app.core.template import Template app = FastAPI() @app.post("/data") def data( bucket: str, project: str, local_date: str, env_file: UploadFile = File(...) ): server_folder = f"{project}/{local_date}" local_folder = f"data/{server_folder}" s3 = S3.client(file=env_file) response = s3.list_objects_v2(Bucket=bucket, Prefix=server_folder) if not os.path.exists(local_folder): os.makedirs(local_folder) with open(f"{local_folder}/data.txt", "ab") as f: for obj in response.get('Contents', []): s3.download_fileobj(bucket, obj['Key'], f) return {"message": "success"} @app.get("/data") def data( project: str, local_date: str, process: str ): local_folder = f"data/{project}/{local_date}" template = Template.read(process) values = {"project": project, "local_date": local_date, "process": process} template = template.format_map(values) parser = ttp(data=f"{local_folder}/data.txt", template=template) parser.parse() result = parser.result()[0][0] for i in range(0, len(result), 7777): try: lote = result[i:i + 7777] response = supabase_client().table("log_back_office").upsert(lote, ignore_duplicates=True).execute() print(f"subiendo {len(response.data)}/{len(result)} desde {i}") pass except Exception as e: print(f"error desde {i}: {e}") return result @app.post("/auth") def auth( username: Annotated[str, Form()], password: Annotated[str, Form()], env_file: UploadFile = File(...) ): cognito = Cognito.client(file=env_file) response = Cognito.auth( client=cognito, username=username, password=password ) return response