Spaces:
Sleeping
Sleeping
File size: 3,021 Bytes
9d39207 27de5c0 8c8fd65 27de5c0 77c54c0 27de5c0 a6cf08a 27de5c0 a50e7bc 124eddf 9f1ca0f a0ab59b 9f1ca0f 124eddf 9f51289 27de5c0 124eddf 27de5c0 9a1ec4d 124eddf 27de5c0 65deffd 124eddf 27de5c0 507f1f1 6e60af6 27de5c0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
from fastapi import FastAPI, Request , BackgroundTasks
from supabase import create_client, Client
import json
import uvicorn
from typing import Dict, List, Optional
import os
from openai import Client
# Module-level singletons shared by every request handler and background task
# defined in this file.

# FastAPI application that the route decorators below register against.
app = FastAPI()
# Supabase client, configured from the SUPABASE_URL / SUPABASE_KEY environment
# variables (os.getenv yields None when a variable is unset).
# NOTE(review): the name `Client` is imported from `supabase` and then again
# from `openai`; the later import wins, so this annotation actually refers to
# the OpenAI class. It has no runtime effect, but it is misleading — consider
# aliasing one of the two imports.
supabase: Client = create_client(
supabase_url=os.getenv("SUPABASE_URL"),
supabase_key=os.getenv("SUPABASE_KEY")
)
# OpenAI client, configured from the OPENAI_API_KEY / ORG_ID environment
# variables. `Client` here is the one imported from `openai`.
client = Client(api_key=os.getenv('OPENAI_API_KEY'),organization=os.getenv('ORG_ID'))
@app.post("/batch_processing_result")
async def batch_processing_result(request: Request, background_tasks: BackgroundTasks):
    """Check an OpenAI batch job and, once it has completed, schedule ingestion.

    The request body must be a JSON object containing ``batch_job_id``. The
    batch is looked up through the module-level OpenAI ``client``. When its
    status is ``'completed'``, ``process_batch_data`` is queued as a background
    task and the ``batch_processing_details`` row with the same
    ``batch_job_id`` gets ``batch_job_status`` set to ``True``.

    Args:
        request: Incoming request; its JSON body supplies ``batch_job_id``.
        background_tasks: FastAPI task queue used to defer the ingestion.

    Returns:
        ``{"batch_job_status": <status>}`` where ``<status>`` is the status
        reported for the batch — ``'completed'`` only when it really is.

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, or does not carry a non-empty string ``batch_job_id``.
    """
    # Imported here so the handler is self-contained with respect to the
    # file's existing import list.
    from fastapi import HTTPException

    try:
        body = await request.json()
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        raise HTTPException(status_code=400, detail="Request body must be valid JSON")

    batch_id = body.get('batch_job_id') if isinstance(body, dict) else None
    if not isinstance(batch_id, str) or not batch_id:
        # Fail fast with a client error instead of passing None to the API.
        raise HTTPException(status_code=400, detail="batch_job_id is required")
    print(batch_id)

    # NOTE(review): the OpenAI and Supabase calls below are not awaited, so
    # they occupy the event loop while they run — confirm this is acceptable
    # for the expected request volume.
    batch_job = client.batches.retrieve(batch_id)
    if batch_job.status != 'completed':
        # Report the real status; previously this path also said 'completed'.
        return {'batch_job_status': batch_job.status}

    # Background tasks run after the response is sent, so the status row is
    # updated before ingestion actually starts.
    background_tasks.add_task(process_batch_data, batch_id)
    (
        supabase.table("batch_processing_details")
        .update({"batch_job_status": True})
        .eq('batch_job_id', batch_id)
        .execute()
    )
    return {"batch_job_status": 'completed'}
def _parse_jsonl(text: str) -> List[dict]:
    """Decode every non-blank line of *text* as JSON, skipping lines that fail."""
    records = []
    for line in text.splitlines():
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON on line: {line}\nError: {e}")
    return records


def _build_row(record: dict) -> dict:
    """Convert one batch output record into a row for the structured-data table.

    ``custom_id`` is split on ``'*'`` and its first three fields are used as
    ``message_id``, ``user_id`` and ``email``. The first choice's message
    content is parsed as JSON and those three fields are added to it.

    Raises:
        AttributeError, IndexError, KeyError, TypeError, ValueError: when the
            record does not have the expected structure or the message
            content is not a JSON object.
    """
    # Unpacking a slice raises ValueError when fewer than three fields exist.
    message_id, user_id, email = record['custom_id'].split('*')[:3]
    content = record['response']['body']['choices'][0]['message']['content']
    row = json.loads(content)
    if not isinstance(row, dict):
        raise ValueError("message content is not a JSON object")
    row['message_id'] = message_id
    row['user_id'] = user_id
    row['email'] = email
    return row


async def process_batch_data(batch_id: str):
    """Fetch a completed batch's output and insert it into Supabase.

    Retrieves the batch through the module-level OpenAI ``client``. If it is
    completed and has an output file, the file is decoded as UTF-8 JSON Lines,
    each record is converted with ``_build_row``, and the resulting rows are
    inserted into ``receipt_radar_structured_data_duplicate``.

    Malformed records are logged and skipped rather than aborting the whole
    batch, matching how undecodable JSONL lines are already treated.

    Args:
        batch_id: Identifier of the OpenAI batch job.

    Returns:
        None. Failures are printed and never propagate, because this runs as
        a background task with no caller left to handle them.
    """
    try:
        batch_job = client.batches.retrieve(batch_id)
        if batch_job.status != 'completed':
            return

        result_file_id = batch_job.output_file_id
        if not result_file_id:
            print(f"Batch {batch_id} has no output file; nothing to process")
            return

        raw = client.files.content(result_file_id).content
        records = _parse_jsonl(raw.decode('utf-8'))

        supa_data = []
        for record in records:
            try:
                supa_data.append(_build_row(record))
            except (AttributeError, IndexError, KeyError, TypeError, ValueError) as e:
                print(f"Skipping malformed batch record: {e!r}")

        print("Printing the the data")
        print(supa_data)
        if supa_data:
            # Skip the round trip when every record was rejected.
            (
                supabase.table("receipt_radar_structured_data_duplicate")
                .insert(supa_data)
                .execute()
            )
        print("Completed processing")
    except Exception as e:
        # Top-level boundary of the background task: log and swallow.
        print(f"Error in background processing: {str(e)}")
        # You might want to log this error or handle it in some way
|