File size: 3,332 Bytes
9d39207
27de5c0
 
 
 
 
8c8fd65
27de5c0
 
 
 
 
 
 
 
 
 
 
77c54c0
27de5c0
 
 
a6cf08a
27de5c0
a50e7bc
124eddf
 
 
9f1ca0f
 
 
a0ab59b
9f1ca0f
 
124eddf
 
 
9f51289
27de5c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
124eddf
 
27de5c0
9a1ec4d
124eddf
 
 
27de5c0
65deffd
 
 
124eddf
2f4763f
 
 
 
 
 
 
 
124eddf
 
 
2f4763f
507f1f1
 
 
 
 
6e60af6
27de5c0
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from fastapi import FastAPI, Request, BackgroundTasks
from supabase import create_client, Client
import json
import uvicorn
from typing import Dict, List, Optional
import os
# BUG FIX: the original `from openai import Client` shadowed supabase's
# `Client`, so the `supabase: Client` annotation below referred to the wrong
# class. Import the OpenAI client under its canonical name instead.
from openai import OpenAI

# Initialize FastAPI app
app = FastAPI()

# Initialize Supabase client (credentials come from the environment)
supabase: Client = create_client(
    supabase_url=os.getenv("SUPABASE_URL"),
    supabase_key=os.getenv("SUPABASE_KEY"),
)

# Initialize the OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), organization=os.getenv("ORG_ID"))

@app.post("/batch_processing_result")
async def batch_processing_result(request: Request, background_tasks: BackgroundTasks):
    """Webhook endpoint: check an OpenAI batch job's status.

    Expects a JSON body containing ``batch_job_id``. If the batch job has
    completed, mark it as done in the ``batch_processing_details`` table and
    schedule :func:`process_batch_data` to ingest the results in the
    background. Returns the job's current status either way.
    """
    body = await request.json()
    batch_id = body.get('batch_job_id')
    if not batch_id:
        # Guard: without a job id there is nothing to look up.
        return {"error": "batch_job_id is required"}
    print(batch_id)
    batch_job = client.batches.retrieve(batch_id)
    if batch_job.status == 'completed':
        # Heavy result ingestion runs after the response is sent.
        background_tasks.add_task(process_batch_data, batch_id)
        (supabase.table("batch_processing_details")
            .update({"batch_job_status": True})
            .eq('batch_job_id', batch_id)
            .execute())
        return {"batch_job_status": 'completed'}

    # BUG FIX: the original unconditionally reported 'completed' here even
    # when the job was still running/failed; report the real status instead.
    return {'batch_job_status': batch_job.status}


async def process_batch_data(batch_id: str):
    """Background task: ingest a completed OpenAI batch job's results.

    Downloads the job's output file (JSONL, one response per line), parses
    each line, unpacks the ``custom_id`` (expected format
    ``<message_id>*<user_id>*<email>`` — TODO confirm with the producer),
    and inserts the structured rows into
    ``receipt_radar_structured_data_duplicate``.

    Errors are printed rather than raised, because this runs detached from
    any request/response cycle.
    """
    try:
        batch_job = client.batches.retrieve(batch_id)
        if batch_job.status != 'completed':
            # Nothing to ingest yet; caller schedules us only on completion,
            # but re-check defensively.
            return

        raw = client.files.content(batch_job.output_file_id).content
        # Parse the JSONL payload, skipping blank and malformed lines.
        parsed = []
        for line in raw.decode('utf-8').splitlines():
            if not line.strip():
                continue
            try:
                parsed.append(json.loads(line))
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON on line: {line}\nError: {e}")

        supa_data = []
        for resp in parsed:
            # custom_id encodes message_id*user_id*email; avoid shadowing
            # the builtin `id` (original bug).
            message_id, user_id, email = resp.get('custom_id').split('*')[:3]
            # The model's answer is itself a JSON document inside the
            # assistant message content.
            output = json.loads(
                resp['response']['body']['choices'][0]['message']['content']
            )
            output['message_id'] = message_id
            output['user_id'] = user_id
            output['email'] = email
            supa_data.append(output)

        print("Printing the data")
        print(supa_data)

        # Guard: inserting an empty list would be a pointless (and
        # potentially erroring) round-trip to Supabase.
        if supa_data:
            (supabase.table("receipt_radar_structured_data_duplicate")
                .insert(supa_data)
                .execute())
        print("Completed processing")
    except Exception as e:
        # Best-effort background job: log and swallow so the worker survives.
        print(f"Error in background processing: {str(e)}")