Omkar008 commited on
Commit
27de5c0
·
verified ·
1 Parent(s): 1084a65

Create main.py

Browse files
Files changed (1) hide show
  1. main.py +84 -0
main.py ADDED
@@ -0,0 +1,84 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, Request
2
+ from supabase import create_client, Client
3
+ import json
4
+ import uvicorn
5
+ from typing import Dict, List, Optional
6
+ import os
7
+ from dotenv import load_load_dotenv
8
+
9
+ # Load environment variables
10
+ load_dotenv()
11
+
12
+ # Initialize FastAPI app
13
+ app = FastAPI()
14
+
15
+ # Initialize Supabase client
16
+ supabase: Client = create_client(
17
+ supabase_url=os.getenv("SUPABASE_URL"),
18
+ supabase_key=os.getenv("SUPABASE_KEY")
19
+ )
20
+
21
+ # Initialize your open client
22
+ client = Client(api_key=os.getenv('OPENAI_API_KEY'),organization=os.getenv('ORG_ID'))
23
+
24
+ @app.post("/test/batch_processing_result")
25
+ async def batch_processing_result(request: Request, background_tasks: BackgroundTasks):
26
+ body = await request.json()
27
+ batch_id = body.get('batch_job_id')
28
+ batch_job = client.batches.retrieve(batch_id)
29
+ # while batch_job.status == 'in_progress':
30
+ batch_job = client.batches.retrieve(batch_id)
31
+ print(batch_job.status)
32
+ # Add the processing task to background tasks
33
+ if batch_job.status == 'completed':
34
+ background_tasks.add_task(process_batch_data, batch_id)
35
+ return {"batch_job_status":'completed'}
36
+
37
+ # Immediately return success response
38
+ return {'batch_job_status':'notcompleted'}
39
+
40
+
41
+ async def process_batch_data(batch_id: str):
42
+ try:
43
+ batch_job = client.batches.retrieve(batch_id)
44
+ if batch_job.status == 'completed':
45
+ result_file_id = batch_job.output_file_id
46
+ result = client.files.content(result_file_id).content
47
+ json_str = result.decode('utf-8')
48
+ json_lines = json_str.splitlines()
49
+
50
+ res = []
51
+ for line in json_lines:
52
+ if line.strip():
53
+ try:
54
+ json_dict = json.loads(line)
55
+ res.append(json_dict)
56
+ except json.JSONDecodeError as e:
57
+ print(f"Error decoding JSON on line: {line}\nError: {e}")
58
+
59
+ for resp in res:
60
+ id = resp.get('custom_id')
61
+ res_id = id.split('-')[1]
62
+ output = json.loads(resp.get('response').get('body').get('choices')[0].get('message').get('content'))
63
+
64
+ categories = str(output.get('categories'))
65
+ summary = str(output.get('summary'))
66
+
67
+ supabase_resp = supabase.table("imdb_dataset").select("Description").eq("imdb_id", res_id).execute()
68
+ description = supabase_resp.data[0].get('Description')
69
+
70
+ insert_response = (
71
+ supabase.table("imdb_outputs")
72
+ .insert({
73
+ "id": res_id,
74
+ "description": description,
75
+ 'categories': categories,
76
+ 'summary': summary
77
+ })
78
+ .execute()
79
+ )
80
+ print(f"Inserted data for ID: {res_id}")
81
+
82
+ except Exception as e:
83
+ print(f"Error in background processing: {str(e)}")
84
+ # You might want to log this error or handle it in some way