# Hosting-page header residue ("Spaces: Sleeping"); not part of the program.
import json
import os
import sys
import time

from dotenv import load_dotenv
from openai import OpenAI

load_dotenv()
class BatchProcessor:
    """Run an OpenAI batch job that screens text files for racial content.

    Intended call order: ``prepare_batch`` -> ``upload_batch_file`` ->
    ``create_batch`` -> poll ``check_batch_status`` -> ``retrieve_results``.
    """

    # System message attached to every request unless the caller overrides it.
    SYSTEM_PROMPT = (
        "You are a helpful assistant designed to check if there's any racial content. "
        "Please review this document for any racial or discriminatory expressions. "
        "If yes, return 'Yes'; if there's none, please return 'No racial content found'. "
        "If there is any doubt or ambiguity, assume the text contains racial content and respond 'Yes'."
    )

    # Statuses after which a batch can no longer reach "completed".
    FAILED_STATUSES = frozenset({"failed", "expired", "cancelled"})

    def __init__(self, client=None):
        """Create the processor.

        Args:
            client: Optional ready-made OpenAI client. When omitted, one is
                built from the ``OPENAI_API_KEY`` and ``OPENAI_ORG_ID``
                environment variables.
        """
        if client is None:
            client = OpenAI(
                api_key=os.getenv("OPENAI_API_KEY"),
                organization=os.getenv("OPENAI_ORG_ID"),
            )
        self.client = client

    def prepare_batch(
        self,
        folder_path,
        output_file,
        *,
        model="gpt-4o-mini",
        max_tokens=1000,
        system_prompt=None,
    ):
        """Prepare a batch input file from a folder of text files.

        Writes one JSON line per ``*.txt`` file found directly in
        ``folder_path``; the file name is used as the request's ``custom_id``.

        Args:
            folder_path: Directory containing the ``.txt`` documents.
            output_file: Path of the JSONL file to write (overwritten).
            model: Model name placed in each request body.
            max_tokens: ``max_tokens`` value placed in each request body.
            system_prompt: System message text; defaults to ``SYSTEM_PROMPT``.

        Raises:
            OSError: If ``folder_path`` cannot be listed or a file cannot be
                read or written.
        """
        prompt = self.SYSTEM_PROMPT if system_prompt is None else system_prompt

        # List the folder BEFORE opening the output file so that a bad folder
        # path does not leave an empty output file behind. Sorting makes the
        # request order reproducible (os.listdir order is arbitrary).
        filenames = sorted(
            name for name in os.listdir(folder_path) if name.endswith(".txt")
        )

        with open(output_file, "w", encoding="utf-8") as out_file:
            for filename in filenames:
                file_path = os.path.join(folder_path, filename)
                # A directory that happens to be named "*.txt" is not a document.
                if not os.path.isfile(file_path):
                    continue
                # Decode explicitly instead of relying on the platform default;
                # undecodable bytes are replaced so that a single odd file does
                # not abort the whole batch.
                with open(
                    file_path, "r", encoding="utf-8", errors="replace"
                ) as file:
                    text = file.read()
                batch_entry = {
                    "custom_id": filename,
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": {
                        "model": model,
                        "messages": [
                            {"role": "system", "content": prompt},
                            {"role": "user", "content": text},
                        ],
                        "max_tokens": max_tokens,
                    },
                }
                out_file.write(json.dumps(batch_entry) + "\n")
        print(f"Batch file created: {output_file}")

    def upload_batch_file(self, batch_file_path):
        """Upload the prepared batch input file.

        Args:
            batch_file_path: Path of the JSONL file to upload.

        Returns:
            The ID of the uploaded file.
        """
        with open(batch_file_path, "rb") as f:
            batch_input_file = self.client.files.create(file=f, purpose="batch")
        print(f"Batch input file uploaded. File ID: {batch_input_file.id}")
        return batch_input_file.id

    def create_batch(self, file_id, *, description="Deed analysis batch"):
        """Create a batch job with the uploaded input file.

        Args:
            file_id: ID returned by ``upload_batch_file``.
            description: Free-text description stored in the batch metadata.

        Returns:
            The ID of the created batch.
        """
        batch = self.client.batches.create(
            input_file_id=file_id,
            endpoint="/v1/chat/completions",
            completion_window="24h",
            metadata={"description": description},
        )
        print(f"Batch created. Batch ID: {batch.id}")
        return batch.id

    def check_batch_status(self, batch_id):
        """Check the status of a batch job.

        Args:
            batch_id: ID returned by ``create_batch``.

        Returns:
            The output file ID once the batch has completed, or ``None`` while
            it is still in progress.

        Raises:
            RuntimeError: If the batch ended in one of ``FAILED_STATUSES``, or
                completed without producing an output file. Returning ``None``
                in these cases would make a polling caller wait forever.
        """
        batch_status = self.client.batches.retrieve(batch_id)
        status = batch_status.status
        print(f"Batch Status: {status}")

        output_file_id = getattr(batch_status, "output_file_id", None)
        error_file_id = getattr(batch_status, "error_file_id", None)

        if status in self.FAILED_STATUSES:
            raise RuntimeError(
                f"Batch {batch_id} ended with status {status!r} "
                f"(output_file_id={output_file_id}, error_file_id={error_file_id})"
            )
        if status != "completed":
            return None
        if not output_file_id:
            raise RuntimeError(
                f"Batch {batch_id} completed without an output file "
                f"(error_file_id={error_file_id})"
            )
        print(f"Output File ID: {output_file_id}")
        return output_file_id

    def retrieve_results(self, output_file_id, output_path):
        """Retrieve the results of a completed batch job.

        Args:
            output_file_id: ID returned by ``check_batch_status``.
            output_path: Local path the downloaded bytes are written to.
        """
        file_response = self.client.files.content(output_file_id)
        with open(output_path, "wb") as out_file:
            out_file.write(file_response.read())
        print(f"Batch results downloaded to {output_path}")
if __name__ == "__main__":
    # Seconds between status checks (matches the "30 minutes" message below).
    POLL_INTERVAL_SECONDS = 30 * 60
    # Stop polling a little after the 24h completion window requested by
    # create_batch, so the script can never wait indefinitely.
    MAX_WAIT_SECONDS = 25 * 60 * 60

    # Folder of .txt documents: first command-line argument, if given.
    folder_path = sys.argv[1] if len(sys.argv) > 1 else ""
    batch_input_file = "batch_input.jsonl"
    batch_output_file = "batch_output.jsonl"

    # Fail early with a clear message instead of crashing inside os.listdir.
    if not os.path.isdir(folder_path):
        raise SystemExit(
            f"Input folder {folder_path!r} is not a directory; "
            "pass the folder of .txt files as the first argument."
        )

    processor = BatchProcessor()

    # Step 1: Prepare the batch input file
    processor.prepare_batch(folder_path, batch_input_file)
    if os.path.getsize(batch_input_file) == 0:
        # Nothing was written, so there is nothing worth uploading.
        raise SystemExit(f"No .txt files found in {folder_path!r}; nothing to submit.")

    # Step 2: Upload the batch input file
    file_id = processor.upload_batch_file(batch_input_file)

    # Step 3: Create a batch job
    batch_id = processor.create_batch(file_id)

    # Step 4: Poll for batch status until an output file is available
    deadline = time.monotonic() + MAX_WAIT_SECONDS
    while True:
        output_file_id = processor.check_batch_status(batch_id)
        if output_file_id:
            break
        if time.monotonic() >= deadline:
            raise SystemExit(
                f"Batch {batch_id} produced no output within "
                f"{MAX_WAIT_SECONDS // 3600} hours; giving up."
            )
        print("Batch not complete. Retrying in 30 minutes...")
        time.sleep(POLL_INTERVAL_SECONDS)

    # Step 5: Retrieve the results
    processor.retrieve_results(output_file_id, batch_output_file)