Spaces:
Sleeping
Sleeping
File size: 3,332 Bytes
9d39207 27de5c0 8c8fd65 27de5c0 77c54c0 27de5c0 a6cf08a 27de5c0 a50e7bc 124eddf 9f1ca0f a0ab59b 9f1ca0f 124eddf 9f51289 27de5c0 124eddf 27de5c0 9a1ec4d 124eddf 27de5c0 65deffd 124eddf 2f4763f 124eddf 2f4763f 507f1f1 6e60af6 27de5c0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
import json
import os
from typing import Dict, List, Optional

import uvicorn
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
from supabase import Client, create_client
from supabase import Client as SupabaseClient
# Keep the openai import after the supabase one: both packages export `Client`,
# and the bare name must keep resolving to the OpenAI client class.
from openai import Client, OpenAI
# Initialize FastAPI app
app = FastAPI()

# Initialize Supabase client.
# Annotated with the aliased Supabase class: the bare name `Client` is rebound
# by the later `from openai import Client`, so it would name the wrong type here.
supabase: SupabaseClient = create_client(
    supabase_url=os.getenv("SUPABASE_URL"),
    supabase_key=os.getenv("SUPABASE_KEY"),
)

# Initialize the OpenAI client (`OpenAI` is the same class openai exports as `Client`).
client = OpenAI(
    api_key=os.getenv('OPENAI_API_KEY'),
    organization=os.getenv('ORG_ID'),
)
@app.post("/batch_processing_result")
async def batch_processing_result(request: Request, background_tasks: BackgroundTasks):
    """Check an OpenAI batch job and, if it has completed, schedule its processing.

    The request body must be a JSON object with a ``batch_job_id`` key.

    When the job's status is ``'completed'``, ``process_batch_data`` is queued
    as a background task and the matching ``batch_processing_details`` row is
    updated with ``batch_job_status = True``.

    Returns:
        ``{"batch_job_status": <status>}`` where ``<status>`` is the status
        reported for the batch job (``'completed'`` once it is done).

    Raises:
        HTTPException: 400 when the body is not a JSON object or carries no
            ``batch_job_id``.
    """
    body = await request.json()
    batch_id = body.get('batch_job_id') if isinstance(body, dict) else None
    print(batch_id)
    if not batch_id:
        # Reject early instead of handing an empty id to the OpenAI client.
        raise HTTPException(status_code=400, detail="batch_job_id is required")

    batch_job = client.batches.retrieve(batch_id)
    if batch_job.status != 'completed':
        # Report the real state so callers can tell an unfinished job from a finished one.
        return {"batch_job_status": batch_job.status}

    # Add the processing task to background tasks
    background_tasks.add_task(process_batch_data, batch_id)
    (
        supabase.table("batch_processing_details")
        .update({"batch_job_status": True})
        .eq('batch_job_id', batch_id)
        .execute()
    )
    return {"batch_job_status": 'completed'}
def _parse_jsonl(text: str) -> List[dict]:
    """Decode each non-blank line of *text* as JSON, skipping lines that fail to parse."""
    records = []
    for line in text.splitlines():
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON on line: {line}\nError: {e}")
    return records


def _build_row(record: dict) -> dict:
    """Turn one batch output record into a row for the structured-data table.

    ``custom_id`` is read as ``message_id*user_id*email`` and the first choice's
    message content is parsed as a JSON object; the three identifiers are then
    stored on that object. Raises AttributeError, IndexError, TypeError or
    ValueError when the record does not have that shape.
    """
    # Slicing keeps the original tolerance for extra '*'-separated parts while
    # turning a too-short custom_id into a ValueError on unpacking.
    message_id, user_id, email = record.get('custom_id').split('*')[:3]
    content = (
        record.get('response')
        .get('body')
        .get('choices')[0]
        .get('message')
        .get('content')
    )
    row = json.loads(content)
    row['message_id'] = message_id
    row['user_id'] = user_id
    row['email'] = email
    return row


async def process_batch_data(batch_id: str) -> None:
    """Download a completed batch job's output and insert it into Supabase.

    Retrieves the batch job, and if its status is ``'completed'`` reads the
    output file as JSON Lines, converts every record into a row and inserts the
    rows into ``receipt_radar_structured_data_duplicate``.

    Undecodable lines and malformed records are reported and skipped so that one
    bad entry does not discard the rest of the batch. Any other failure is
    caught and printed; nothing is raised to the caller.

    Args:
        batch_id: Identifier of the OpenAI batch job to process.
    """
    try:
        batch_job = client.batches.retrieve(batch_id)
        if batch_job.status != 'completed':
            return

        raw_output = client.files.content(batch_job.output_file_id).content
        records = _parse_jsonl(raw_output.decode('utf-8'))

        supa_data = []
        for record in records:
            try:
                supa_data.append(_build_row(record))
            except (AttributeError, IndexError, TypeError, ValueError) as e:
                # Same policy as for undecodable lines: report and move on.
                print(f"Skipping malformed batch record: {e}")

        print("Printing the the data")
        print(supa_data)
        if supa_data:
            # Nothing to store when every record was skipped or the file was empty.
            (
                supabase.table("receipt_radar_structured_data_duplicate")
                .insert(supa_data)
                .execute()
            )
        print("Completed processing")
    except Exception as e:
        # Top-level boundary of the background task: report instead of propagating.
        print(f"Error in background processing: {str(e)}")
|