from fastapi import FastAPI, Request, BackgroundTasks
from supabase import create_client, Client
from openai import OpenAI
import json
import os
import uvicorn

# Initialize FastAPI app
app = FastAPI()

# Initialize Supabase client
supabase: Client = create_client(
    supabase_url=os.getenv("SUPABASE_URL"),
    supabase_key=os.getenv("SUPABASE_KEY")
)

# Initialize the OpenAI client (import OpenAI rather than Client, so it does not
# shadow the supabase Client type imported above)
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), organization=os.getenv("ORG_ID"))
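
# Required environment variables (taken from the calls above):
#   SUPABASE_URL, SUPABASE_KEY, OPENAI_API_KEY, ORG_ID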

@app.post("/batch_processing_result")
async def batch_processing_result(request: Request, background_tasks: BackgroundTasks):
    body = await request.json()
    batch_id = body.get("batch_job_id")
    print(batch_id)
    if not batch_id:
        return {"error": "missing batch_job_id"}

    batch_job = client.batches.retrieve(batch_id)
    if batch_job.status == "completed":
        # Process the batch output in the background so the webhook returns quickly
        background_tasks.add_task(process_batch_data, batch_id)
        # Mark the batch job as completed in Supabase
        update_response = (
            supabase.table("batch_processing_details")
            .update({"batch_job_status": True})
            .eq("batch_job_id", batch_id)
            .execute()
        )
        return {"batch_job_status": "completed"}

    # Not finished yet: report the batch's actual status instead of "completed"
    return {"batch_job_status": batch_job.status}
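
# Example webhook call (a sketch; assumes the caller POSTs the batch id as JSON):
#   curl -X POST http://localhost:8000/batch_processing_result \
#        -H "Content-Type: application/json" \
#        -d '{"batch_job_id": "batch_abc123"}'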


async def process_batch_data(batch_id: str):
    try:
        batch_job = client.batches.retrieve(batch_id)
        if batch_job.status == 'completed':
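            # Download the batch output file; it is JSONL (one API response per line)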
            result_file_id = batch_job.output_file_id
            result = client.files.content(result_file_id).content
            json_str = result.decode('utf-8')
            json_lines = json_str.splitlines()
            
            res = []
            for line in json_lines:
                if line.strip():
                    try:
                        json_dict = json.loads(line)
                        res.append(json_dict)
                    except json.JSONDecodeError as e:
                        print(f"Error decoding JSON on line: {line}\nError: {e}")

            # Each output line looks roughly like (abbreviated sketch):
            #   {"custom_id": "<message_id>*<user_id>*<email>",
            #    "response": {"body": {"choices": [{"message": {"content": "{...}"}}]}}}
            supa_data = []
            for resp in res:
                # custom_id encodes "<message_id>*<user_id>*<email>", joined with '*'
                id_parts = resp.get("custom_id").split("*")
                message_id = id_parts[0]
                user_id = id_parts[1]
                email = id_parts[2]
                # The model's reply is itself a JSON string; parse it into a dict
                output = json.loads(resp["response"]["body"]["choices"][0]["message"]["content"])
                output["message_id"] = message_id
                output["user_id"] = user_id
                output["email"] = email
                supa_data.append(output)

            print("Printing the data")
            print(supa_data)

            # Bulk-insert the structured rows into Supabase
            insert_response = (
                supabase.table("receipt_radar_structured_data_duplicate")
                .insert(supa_data)
                .execute()
            )
            print("Completed processing")
    except Exception as e:
        print(f"Error in background processing: {str(e)}")
        # You might want to log this error or handle it in some way
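

# Entry point for running the server directly (host/port are assumptions;
# adjust them, or launch with `uvicorn <module>:app` in deployment)
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)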