"""FastAPI service that loads completed OpenAI batch results into Supabase.

The single endpoint receives a batch job id, checks the job's status with
OpenAI and, when the job has completed, schedules a background task that
downloads the batch output file, parses it and inserts the resulting rows
into Supabase.
"""

import json
import os
from typing import Dict, List, Optional

import uvicorn  # not referenced in the visible code; kept for external launchers
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
from fastapi.concurrency import run_in_threadpool
from openai import Client
# Aliased so the OpenAI ``Client`` imported above is not confused with (and
# does not shadow) the Supabase client class used in the annotation below.
from supabase import Client as SupabaseClient
from supabase import create_client

# Initialize FastAPI app
app = FastAPI()

# Initialize Supabase client
supabase: SupabaseClient = create_client(
    supabase_url=os.getenv("SUPABASE_URL"),
    supabase_key=os.getenv("SUPABASE_KEY"),
)

# Initialize the OpenAI client
client = Client(
    api_key=os.getenv('OPENAI_API_KEY'),
    organization=os.getenv('ORG_ID'),
)


@app.post("/batch_processing_result")
async def batch_processing_result(request: Request, background_tasks: BackgroundTasks):
    """Check an OpenAI batch job and schedule ingestion once it has completed.

    The JSON request body must contain ``batch_job_id``. When the batch is
    completed, ``process_batch_data`` is queued as a background task and the
    matching ``batch_processing_details`` row is updated with
    ``batch_job_status = True``.

    Returns:
        ``{"batch_job_status": <status>}`` where ``<status>`` is the status
        reported by OpenAI for the batch (``"completed"`` when ingestion was
        scheduled).

    Raises:
        HTTPException: 400 when the body is not valid JSON or does not carry
            a ``batch_job_id``.
    """
    try:
        body = await request.json()
    except ValueError as exc:  # json.JSONDecodeError is a ValueError
        raise HTTPException(
            status_code=400, detail="Request body must be valid JSON"
        ) from exc

    batch_id = body.get('batch_job_id') if isinstance(body, dict) else None
    print(batch_id)
    if not batch_id:
        raise HTTPException(status_code=400, detail="'batch_job_id' is required")

    # The OpenAI and Supabase clients used here are synchronous; run them in
    # the threadpool so the event loop is not blocked while they wait on I/O.
    batch_job = await run_in_threadpool(client.batches.retrieve, batch_id)
    if batch_job.status != 'completed':
        # Report the real status instead of claiming the batch is completed.
        return {'batch_job_status': batch_job.status}

    # Add the processing task to background tasks
    background_tasks.add_task(process_batch_data, batch_id)
    await run_in_threadpool(_mark_batch_completed, batch_id)
    return {"batch_job_status": 'completed'}


def _mark_batch_completed(batch_id: str) -> None:
    """Set ``batch_job_status`` to True on the batch's bookkeeping row."""
    (
        supabase.table("batch_processing_details")
        .update({"batch_job_status": True})
        .eq('batch_job_id', batch_id)
        .execute()
    )


async def process_batch_data(batch_id: str):
    """Download a completed batch's output and insert its rows into Supabase.

    Errors are printed rather than raised because this runs as a background
    task after the HTTP response has been sent.

    Args:
        batch_id: Identifier of the OpenAI batch job to ingest.
    """
    try:
        # All of the work is blocking client I/O, so keep it off the event loop.
        await run_in_threadpool(_process_batch_data_sync, batch_id)
    except Exception as e:
        print(f"Error in background processing: {str(e)}")
        # You might want to log this error or handle it in some way


def _process_batch_data_sync(batch_id: str) -> None:
    """Blocking implementation behind ``process_batch_data``."""
    batch_job = client.batches.retrieve(batch_id)
    if batch_job.status != 'completed':
        return

    result_file_id = batch_job.output_file_id
    if not result_file_id:
        print(f"Batch {batch_id} has no output file; nothing to insert")
        return

    result = client.files.content(result_file_id).content
    supa_data = _build_rows(_parse_jsonl(result.decode('utf-8')))

    print("Printing the the data")
    print(supa_data)
    if not supa_data:
        print(f"Batch {batch_id} produced no usable rows; skipping insert")
        return

    (
        supabase.table("receipt_radar_structured_data_duplicate")
        .insert(supa_data)
        .execute()
    )
    print("Completed processing")


def _parse_jsonl(text: str) -> List[Dict]:
    """Parse JSON-lines text, skipping blank lines and reporting bad ones."""
    records = []
    # Split on "\n" only: str.splitlines() also breaks on characters such as
    # U+2028 or U+0085, which may appear unescaped inside a JSON string.
    for line in text.split('\n'):
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON on line: {line}\nError: {e}")
    return records


def _build_rows(records: List[Dict]) -> List[Dict]:
    """Convert parsed batch records into insertable rows, dropping bad ones."""
    rows = []
    for record in records:
        row = _record_to_row(record)
        if row is not None:
            rows.append(row)
    return rows


def _record_to_row(record: Dict) -> Optional[Dict]:
    """Build one row from a batch output record, or None if it is malformed.

    ``custom_id`` is read as ``message_id*user_id*email``; the first choice's
    message content is parsed as a JSON object and extended with those three
    fields.
    """
    try:
        message_id, user_id, email = record['custom_id'].split('*')[:3]
        content = record['response']['body']['choices'][0]['message']['content']
        output = json.loads(content)
    except (AttributeError, IndexError, KeyError, TypeError, ValueError) as e:
        # One bad record must not abort the rest of the batch.
        print(f"Skipping malformed batch record: {e!r}")
        return None

    if not isinstance(output, dict):
        print(f"Skipping batch record with non-object content: {output!r}")
        return None

    output['message_id'] = message_id
    output['user_id'] = user_id
    output['email'] = email
    # Disabled status update kept from the original source:
    # update_status_response = (
    #     supabase.table('receipt_ocr_data')
    #     .update({'status':'processed'})
    #     .eq('email',email)
    #     .eq('user_id',user_id)
    #     .eq('message_id',message_id)
    #     .execute()
    # )
    return output