import pandas as pd
import os
import json
import re
import concurrent.futures
import logging
from dotenv import load_dotenv
from google import genai
from typing import List, Dict, Any, Tuple
from pathlib import Path
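
# Assumed dependencies (not pinned anywhere in this file): pandas,
# python-dotenv, google-genai, and an Excel engine for pd.read_excel
# (xlrd for the .xls sample input below, openpyxl for .xlsx).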

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def setup_environment() -> None:
    """
    Load environment variables from a .env file, if one is present.

    Returns:
        None
    """
    load_dotenv()


def get_gemini_client() -> genai.Client:
    """
    Initialize and return a Gemini API client.

    Returns:
        genai.Client: Configured Gemini client
    """
    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        raise ValueError("GEMINI_API_KEY environment variable not set")
    return genai.Client(api_key=api_key)
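
# Note: a single Client instance is shared across worker threads in
# process_excel_file. This assumes the google-genai Client can be called
# concurrently from threads; if that assumption fails in your SDK version,
# construct one client per worker instead.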


def process_chunk(chunk_info: Tuple[int, pd.DataFrame, int, int], client: genai.Client) -> List[Dict[str, Any]]:
    """
    Process a single chunk of data using the Gemini API.

    Args:
        chunk_info: Tuple containing (chunk_index, dataframe_chunk, start_index, end_index)
        client: Gemini API client

    Returns:
        List of extracted items from the chunk
    """
    i, chunk_df, start_idx, end_idx = chunk_info

    # Create a structured extraction prompt for this specific chunk
    extraction_prompt = f"""
    Extract product information from rows {start_idx} to {end_idx - 1} in this Excel data.

    For each product row, extract:
    1. Product name
    2. Batch number
    3. Expiry date (MM/YY format)
    4. MRP (Maximum Retail Price)
    5. Quantity (as integer)

    Return ONLY a JSON array of objects, one for each product, with these properties:
    [
      {{
        "product_name": "...",
        "batch_number": "...",
        "expiry_date": "...",
        "mrp": "...",
        "quantity": ...
      }},
      ...
    ]

    Use null for any value you cannot extract. Return ONLY the JSON array.
    """
    chunk_items = []

    # Process chunk
    try:
        chunk_response = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=[extraction_prompt, chunk_df.to_string()],
            config={
                'response_mime_type': 'application/json',
                'temperature': 0.1,
                'max_output_tokens': 8192,
            }
        )

        # Extract items; response.text can be None, so fall back to ""
        chunk_text = chunk_response.text or ""

        # Fix common JSON issues: stray control whitespace and trailing commas
        chunk_text = re.sub(r'[\n\r\t]', '', chunk_text)
        chunk_text = re.sub(r',\s*]', ']', chunk_text)
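
        # With response_mime_type set to application/json the model usually
        # returns clean JSON already; the regex repairs above and the bracket
        # search below are defensive fallbacks, not a general-purpose parser.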
        # Extract JSON array
        match = re.search(r'\[(.*)\]', chunk_text, re.DOTALL)
        if match:
            try:
                chunk_items = json.loads('[' + match.group(1) + ']')
                logger.info(f"Successfully processed chunk {i + 1} with {len(chunk_items)} items")
            except json.JSONDecodeError:
                logger.error(f"Error parsing JSON in chunk {i + 1}")
    except Exception as e:
        logger.error(f"Error processing chunk {i + 1}: {e}")

    return chunk_items


def prepare_chunks(df: pd.DataFrame, chunk_size: int) -> List[Tuple[int, pd.DataFrame, int, int]]:
    """
    Prepare dataframe chunks for processing.

    Args:
        df: Input dataframe
        chunk_size: Number of rows per chunk

    Returns:
        List of (chunk_index, dataframe_chunk, start_index, end_index) tuples
    """
    num_chunks = (len(df) + chunk_size - 1) // chunk_size
    chunks_to_process = []
    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, len(df))
        chunk_df = df.iloc[start_idx:end_idx]
        chunks_to_process.append((i, chunk_df, start_idx, end_idx))
    return chunks_to_process
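
# Example: a 45-row dataframe with chunk_size=20 yields three chunks covering
# iloc ranges [0, 20), [20, 40), and [40, 45).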


def process_excel_file(file_path: str, output_path: str, chunk_size: int = 20, max_workers: int = 2) -> Dict[str, Any]:
    """
    Process an Excel file to extract product information using the Gemini API.

    Args:
        file_path: Path to the Excel file
        output_path: Path to save the extracted data
        chunk_size: Number of rows per chunk
        max_workers: Maximum number of parallel workers

    Returns:
        Dict containing the extraction results
    """
    # Set up the environment and API client
    setup_environment()
    client = get_gemini_client()

    # Read Excel file
    logger.info(f"Reading Excel file: {file_path}")
    df = pd.read_excel(file_path)

    # Prepare chunks for processing
    chunks_to_process = prepare_chunks(df, chunk_size)
    num_chunks = len(chunks_to_process)

    # Process chunks in parallel
    logger.info(f"Processing {num_chunks} chunks with {max_workers} workers")
    all_items = []
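
    # A thread pool (rather than processes) fits here: each chunk spends most
    # of its time waiting on the network-bound Gemini call, so threads overlap
    # the I/O, and the conservative max_workers default helps stay under
    # typical API rate limits.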
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Pass the shared client to each process_chunk call
        results = list(executor.map(
            lambda chunk: process_chunk(chunk, client),
            chunks_to_process
        ))

    # Combine results
    for chunk_items in results:
        all_items.extend(chunk_items)

    # Create final result
    final_result = {
        "items": all_items,
        "extraction_status": "COMPLETE" if all_items else "INCOMPLETE",
        "total_items": len(all_items)
    }
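
    # Note: "COMPLETE" only means at least one item was extracted; a chunk
    # that failed is logged above but does not change the status as long as
    # any other chunk succeeded.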

    # Save the final result
    with open(output_path, "w") as f:
        json.dump(final_result, f, indent=2)

    logger.info(f"Extraction complete. Total items extracted: {len(all_items)}")
    return final_result


def main() -> None:
    """
    Main function to run the Excel processing script.
    """
    input_file = 'expiry_invoice/SAC01000975.xls'
    output_file = "extracted_invoice_data.json"

    # Ensure the output directory exists
    output_path = Path(output_file)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Process the Excel file
    result = process_excel_file(
        file_path=input_file,
        output_path=output_file,
        chunk_size=20,
        max_workers=2
    )
    print(f"Extraction complete. Total items extracted: {result['total_items']}")


if __name__ == "__main__":
    main()
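
# Example invocation (the .env file name is the python-dotenv default; the
# script filename is illustrative):
#   $ echo 'GEMINI_API_KEY=your-key-here' > .env
#   $ python extract_invoice.py
# Adjust input_file in main() to point at your own Excel export.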