Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Unified invoice processing script that handles both PDF and Excel files. | |
| """ | |
| import os | |
| import sys | |
| # Add the project root directory to the Python path | |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) | |
| import json | |
| import logging | |
| from typing import Optional | |
| from pathlib import Path | |
| import argparse | |
| import tempfile | |
| from dotenv import load_dotenv | |
| # Import document processing functions | |
| from process.process_pdf_with_headers import process_pdf_with_headers | |
| from process.process_excel import process_excel_file | |
| from src.excel_to_pdf import excel_to_pdf, convert_xls_to_xlsx | |
| from src.docx_to_pdf import docx_to_pdf | |
| from src.txt_to_pdf import txt_to_pdf | |
| # Load environment variables from .env file if it exists | |
| load_dotenv() | |
# Logging setup: INFO and above, each record stamped with time, logger name
# and severity. The module-level ``logger`` is shared by every function below.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def setup_google_client():
    """Set up and return the Google Gen AI client.

    The API key is read from the ``GOOGLE_API_KEY`` environment variable.
    No failure is raised to the caller: each one is logged and reported by
    returning ``None``.

    Returns:
        A ``genai.Client`` built with the configured API key, or ``None`` when
        the SDK cannot be imported, the key is not set, or constructing the
        client fails.
    """
    try:
        # Imported here rather than at module level so that a missing SDK
        # only produces a warning instead of breaking the whole script.
        from google import genai

        api_key = os.environ.get("GOOGLE_API_KEY")
        if not api_key:
            logger.warning("GOOGLE_API_KEY environment variable not set. PDF processing with LLM will not be available.")
            return None
        return genai.Client(api_key=api_key)
    except ImportError:
        # ``from google import genai`` is provided by the ``google-genai``
        # distribution, so that is the package the user has to install.
        logger.warning("google-genai package not installed. PDF processing with LLM will not be available.")
        return None
    except Exception as e:
        logger.error(f"Error setting up Google client: {str(e)}")
        return None
def _serialize_item(item):
    """Return a JSON-serializable mapping for one invoice line item."""
    if isinstance(item, dict):
        # Already plain data; calling .dict() on it would raise AttributeError.
        return item
    if hasattr(item, 'model_dump'):
        return item.model_dump()
    return item.dict()


def save_to_json(invoice_data, input_file_path: str) -> str:
    """
    Save the invoice data to a JSON file in the 'result' directory.

    The output file is named after the input file: its directory and
    extension are dropped and '.json' is appended. The 'result' directory is
    relative to the current working directory and is created if missing.

    Args:
        invoice_data: The invoice data to save. A dictionary is written as-is.
            Any other object is converted to ``{"headers": ..., "items": ...}``
            from its ``headers`` and ``items`` attributes; a missing attribute
            is treated as an empty list. Each item may be a plain dict or an
            object exposing ``model_dump()`` or ``dict()``.
        input_file_path: The path to the input file

    Returns:
        The path to the saved JSON file
    """
    # Create result directory if it doesn't exist
    result_dir = "result"
    os.makedirs(result_dir, exist_ok=True)

    # Derive "<input name without extension>.json" inside the result directory
    base_filename = os.path.splitext(os.path.basename(input_file_path))[0]
    output_file_path = os.path.join(result_dir, f"{base_filename}.json")

    if isinstance(invoice_data, dict):
        # Already a dictionary; written out unchanged
        json_data = invoice_data
    else:
        # Object input: flatten it into plain dicts/lists for json.dump
        json_data = {
            "headers": getattr(invoice_data, 'headers', []),
            "items": [_serialize_item(item) for item in getattr(invoice_data, 'items', [])],
        }

    # ensure_ascii=False keeps non-ASCII text readable in the output file
    with open(output_file_path, 'w', encoding='utf-8') as f:
        json.dump(json_data, f, indent=2, ensure_ascii=False)

    logger.info(f"Saved invoice data to {output_file_path}")
    return output_file_path
def _invoice_to_dict(invoice_data_obj) -> dict:
    """Convert an extracted invoice object into a ``{"headers", "items"}`` dict."""
    return {
        "headers": invoice_data_obj.headers,
        "items": [
            item.model_dump() if hasattr(item, 'model_dump') else item.dict()
            for item in invoice_data_obj.items
        ],
    }


def _extract_from_document(file_path: str, file_ext: str) -> dict:
    """Convert a .txt/.doc/.docx file to a temporary PDF and extract its invoice data."""
    # Reserve a path for the converted PDF. The handle is closed right away so
    # the converter can write to the path itself.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
        reserved_pdf_path = tmp.name
    try:
        if file_ext == '.txt':
            temp_pdf_path = txt_to_pdf(file_path, reserved_pdf_path)
            logger.info(f"Converted text file to PDF: {temp_pdf_path}")
        else:
            temp_pdf_path = docx_to_pdf(file_path, reserved_pdf_path)
            logger.info(f"Converted document file to PDF: {temp_pdf_path}")
        return _invoice_to_dict(process_pdf_with_headers(temp_pdf_path))
    finally:
        # Only the file created here is removed.
        # NOTE(review): if a converter returns a different path than the one
        # it was given, that file is left in place - confirm against the converters.
        try:
            os.remove(reserved_pdf_path)
        except OSError:
            pass


def _print_invoice_summary(invoice_data: dict, file_path: str) -> None:
    """Print the number of extracted items followed by the key fields of each."""
    items = invoice_data.get('items', [])
    items_count = len(items)
    print(f"\nExtracted {items_count} items from {file_path}:")
    for i, item in enumerate(items, 1):
        print(f"\nItem {i}:")
        print(f" Product: {item.get('product_name', 'N/A')}")
        print(f" Batch Number: {item.get('batch_number', 'N/A')}")
        print(f" Expiry: {item.get('expiry_date', 'N/A')}")
        print(f" MRP: {item.get('mrp', 'N/A')}")
        print(f" Quantity: {item.get('quantity', 'N/A')}")


def process_file(file_path: str) -> Optional[str]:
    """
    Process an invoice file (PDF, Excel, or Document) and print the extracted data.

    Excel files (.xlsx, .xls) go through ``process_excel_file``; .xls input is
    first converted with ``convert_xls_to_xlsx``. PDF files go through
    ``process_pdf_with_headers``. Text and Word files (.txt, .doc, .docx) are
    converted to a temporary PDF, processed the same way, and the temporary
    file is removed afterwards. The result is saved with ``save_to_json`` and
    a per-item summary is printed.

    Args:
        file_path: Path to the invoice file

    Returns:
        The path of the JSON file that was written, or ``None`` when the input
        file does not exist or its extension is not supported.

    Raises:
        Exception: Whatever the underlying converter or extractor raises. PDF
            failures are logged and then re-raised.
    """
    file_path = os.path.abspath(file_path)
    if not os.path.exists(file_path):
        logger.error(f"File not found: {file_path}")
        return None

    file_ext = os.path.splitext(file_path)[1].lower()

    # The returned client is not used in this function. The call is kept
    # because it logs a warning when the API key or SDK is missing.
    setup_google_client()

    if file_ext in ('.xlsx', '.xls'):
        # For .xls files, convert to .xlsx format first
        if file_ext == '.xls':
            file_path = convert_xls_to_xlsx(file_path)

        # NOTE(review): it is not visible here whether process_excel_file
        # creates the output directory itself, so make sure it exists.
        os.makedirs("result", exist_ok=True)
        output_json_path = os.path.join("result", f"{os.path.splitext(os.path.basename(file_path))[0]}.json")
        result = process_excel_file(
            file_path=file_path,
            output_path=output_json_path,
            chunk_size=20,
            max_workers=2
        )
        # Create the expected invoice_data format
        invoice_data = {
            "headers": ["Product Name", "Batch Number", "Expiry Date", "MRP", "Quantity"],
            "items": result["items"]
        }
    elif file_ext == '.pdf':
        try:
            logger.info(f"Processing PDF file with header context: {file_path}")
            invoice_data = _invoice_to_dict(process_pdf_with_headers(file_path))
        except Exception as e:
            logger.error(f"Error processing PDF with headers: {str(e)}")
            # Re-raise the real error. Swallowing it would leave invoice_data
            # unbound and fail later with a misleading UnboundLocalError.
            raise
    elif file_ext in ('.doc', '.docx', '.txt'):
        invoice_data = _extract_from_document(file_path, file_ext)
    else:
        logger.error(f"Unsupported file format: {file_ext}")
        logger.error("Supported formats: .pdf, .xlsx, .xls, .doc, .docx, .txt")
        return None

    json_path = save_to_json(invoice_data, file_path)
    print(f"Results saved to: {json_path}")

    _print_invoice_summary(invoice_data, file_path)
    return json_path
def main():
    """Main function to parse arguments and process files.

    Expects a single ``--file_path`` option. When it is missing, argparse
    prints a usage message and exits with status 2. Any exception raised
    while processing the file is logged and the process exits with status 1.
    """
    parser = argparse.ArgumentParser(description="Process invoice files (PDF, Excel, XLS)")
    # Required: without a path there is nothing to process, and passing None
    # on would only fail later with an unhelpful type error.
    parser.add_argument("--file_path", required=True, help="Path to the invoice file")
    args = parser.parse_args()
    try:
        process_file(args.file_path)
    except Exception as e:
        logger.error(f"Error processing file: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()