Spaces:
Sleeping
Sleeping
| from flask import Flask, request, jsonify | |
| import os | |
| import json | |
| import time | |
| import base64 | |
| import uuid | |
| from flask_cors import CORS | |
| from google import genai | |
| from PIL import Image | |
| import io | |
| from typing import List, Dict, Any | |
| import logging | |
| app = Flask(__name__) | |
| CORS(app) | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Configure GenAI | |
| GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY') | |
| if not GOOGLE_API_KEY: | |
| raise ValueError("GOOGLE_API_KEY environment variable is required") | |
| client = genai.Client(api_key=GOOGLE_API_KEY) | |
| # In-memory storage for multi-part receipts (use Redis/database in production) | |
| receipt_sessions = {} | |
| RECEIPT_ANALYSIS_PROMPT = """ | |
| Analyze this receipt image and extract the following information in JSON format: | |
| - items: List of items with their details | |
| - receipt_date: Date from the receipt (YYYY-MM-DD format) | |
| - total_amount: Total amount from receipt | |
| - store_name: Name of the store/merchant | |
| For each item, provide: | |
| - name: Item name/description | |
| - quantity: Quantity purchased (default to 1 if not specified) | |
| - unit_price: Price per unit | |
| - total_price: Total price for this item | |
| - category: Categorize as either "stock" (inventory items, products for resale, raw materials) or "expense" (office supplies, utilities, services, consumables) | |
| Use your best judgment to categorize items: | |
| - "stock": Products intended for sale, raw materials, inventory items | |
| - "expense": Office supplies, utilities, services, maintenance, consumables | |
| Return only valid JSON without any markdown formatting or code blocks. | |
| """ | |
| MULTI_PART_ANALYSIS_PROMPT = """ | |
| Analyze these multiple images of the same receipt and extract all information in JSON format: | |
| - items: Complete list of all items from all images | |
| - receipt_date: Date from the receipt (YYYY-MM-DD format) | |
| - total_amount: Total amount from receipt | |
| - store_name: Name of the store/merchant | |
| For each item, provide: | |
| - name: Item name/description | |
| - quantity: Quantity purchased (default to 1 if not specified) | |
| - unit_price: Price per unit | |
| - total_price: Total price for this item | |
| - category: Categorize as either "stock" (inventory items, products for resale, raw materials) or "expense" (office supplies, utilities, services, consumables) | |
| Combine information from all images to create a complete receipt analysis. | |
| Return only valid JSON without any markdown formatting or code blocks. | |
| """ | |
| def encode_image_to_base64(image_data): | |
| """Convert image data to base64 string.""" | |
| try: | |
| if isinstance(image_data, str): | |
| # If it's already base64, return as is | |
| return image_data | |
| # Convert bytes to base64 | |
| return base64.b64encode(image_data).decode('utf-8') | |
| except Exception as e: | |
| logger.error(f"Error encoding image: {str(e)}") | |
| raise | |
| def process_single_receipt(image_data, content_type="image/jpeg"): | |
| """Process a single receipt image.""" | |
| try: | |
| base64_image = encode_image_to_base64(image_data) | |
| # Create the request with the image | |
| response = client.models.generate_content( | |
| model='gemini-2.0-flash', | |
| contents=[ | |
| { | |
| 'parts': [ | |
| {'text': RECEIPT_ANALYSIS_PROMPT}, | |
| { | |
| 'inline_data': { | |
| 'mime_type': content_type, | |
| 'data': base64_image | |
| } | |
| } | |
| ] | |
| } | |
| ] | |
| ) | |
| # Extract and parse the response | |
| result_text = response.text.strip() | |
| # Remove any markdown code block formatting | |
| if result_text.startswith('```json'): | |
| result_text = result_text[7:] | |
| if result_text.endswith('```'): | |
| result_text = result_text[:-3] | |
| result_json = json.loads(result_text.strip()) | |
| return result_json | |
| except json.JSONDecodeError as e: | |
| logger.error(f"JSON parsing error: {str(e)}") | |
| raise ValueError(f"Failed to parse AI response as JSON: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Error processing receipt: {str(e)}") | |
| raise | |
| def process_multi_part_receipt(images_data, content_types): | |
| """Process multiple images of the same receipt.""" | |
| try: | |
| parts = [{'text': MULTI_PART_ANALYSIS_PROMPT}] | |
| # Add each image to the request | |
| for i, (image_data, content_type) in enumerate(zip(images_data, content_types)): | |
| base64_image = encode_image_to_base64(image_data) | |
| parts.append({ | |
| 'inline_data': { | |
| 'mime_type': content_type, | |
| 'data': base64_image | |
| } | |
| }) | |
| response = client.models.generate_content( | |
| model='gemini-1.5-flash', | |
| contents=[{'parts': parts}] | |
| ) | |
| # Extract and parse the response | |
| result_text = response.text.strip() | |
| # Remove any markdown code block formatting | |
| if result_text.startswith('```json'): | |
| result_text = result_text[7:] | |
| if result_text.endswith('```'): | |
| result_text = result_text[:-3] | |
| result_json = json.loads(result_text.strip()) | |
| return result_json | |
| except json.JSONDecodeError as e: | |
| logger.error(f"JSON parsing error: {str(e)}") | |
| raise ValueError(f"Failed to parse AI response as JSON: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Error processing multi-part receipt: {str(e)}") | |
| raise | |
| def process_receipt(): | |
| """Process a single receipt image.""" | |
| try: | |
| if 'image' not in request.files: | |
| return jsonify({'error': 'No image file provided'}), 400 | |
| file = request.files['image'] | |
| if file.filename == '': | |
| return jsonify({'error': 'No image file selected'}), 400 | |
| # Read image data | |
| image_data = file.read() | |
| content_type = file.content_type or 'image/jpeg' | |
| # Process the receipt | |
| result = process_single_receipt(image_data, content_type) | |
| return jsonify({ | |
| 'success': True, | |
| 'data': result, | |
| 'message': 'Receipt processed successfully' | |
| }) | |
| except ValueError as e: | |
| return jsonify({'error': str(e)}), 400 | |
| except Exception as e: | |
| logger.error(f"Unexpected error: {str(e)}") | |
| return jsonify({'error': 'Internal server error'}), 500 | |
| def start_receipt_session(): | |
| """Start a new multi-part receipt session.""" | |
| session_id = str(uuid.uuid4()) | |
| receipt_sessions[session_id] = { | |
| 'images': [], | |
| 'content_types': [], | |
| 'created_at': time.time() | |
| } | |
| return jsonify({ | |
| 'success': True, | |
| 'session_id': session_id, | |
| 'message': 'Receipt session started' | |
| }) | |
| def add_receipt_part(session_id): | |
| """Add an image part to an existing receipt session.""" | |
| try: | |
| if session_id not in receipt_sessions: | |
| return jsonify({'error': 'Invalid session ID'}), 404 | |
| if 'image' not in request.files: | |
| return jsonify({'error': 'No image file provided'}), 400 | |
| file = request.files['image'] | |
| if file.filename == '': | |
| return jsonify({'error': 'No image file selected'}), 400 | |
| # Read and store image data | |
| image_data = file.read() | |
| content_type = file.content_type or 'image/jpeg' | |
| receipt_sessions[session_id]['images'].append(image_data) | |
| receipt_sessions[session_id]['content_types'].append(content_type) | |
| return jsonify({ | |
| 'success': True, | |
| 'parts_count': len(receipt_sessions[session_id]['images']), | |
| 'message': 'Receipt part added successfully' | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error adding receipt part: {str(e)}") | |
| return jsonify({'error': 'Internal server error'}), 500 | |
| def process_receipt_session(session_id): | |
| """Process all parts of a multi-part receipt.""" | |
| try: | |
| if session_id not in receipt_sessions: | |
| return jsonify({'error': 'Invalid session ID'}), 404 | |
| session_data = receipt_sessions[session_id] | |
| if not session_data['images']: | |
| return jsonify({'error': 'No images in session'}), 400 | |
| # Process the multi-part receipt | |
| result = process_multi_part_receipt( | |
| session_data['images'], | |
| session_data['content_types'] | |
| ) | |
| # Clean up session | |
| del receipt_sessions[session_id] | |
| return jsonify({ | |
| 'success': True, | |
| 'data': result, | |
| 'message': 'Multi-part receipt processed successfully' | |
| }) | |
| except ValueError as e: | |
| return jsonify({'error': str(e)}), 400 | |
| except Exception as e: | |
| logger.error(f"Error processing receipt session: {str(e)}") | |
| return jsonify({'error': 'Internal server error'}), 500 | |
| def bulk_process_receipts(): | |
| """Process multiple individual receipts in bulk.""" | |
| try: | |
| if 'images' not in request.files: | |
| return jsonify({'error': 'No image files provided'}), 400 | |
| files = request.files.getlist('images') | |
| if not files: | |
| return jsonify({'error': 'No image files selected'}), 400 | |
| results = [] | |
| errors = [] | |
| for i, file in enumerate(files): | |
| try: | |
| if file.filename == '': | |
| errors.append(f"File {i+1}: No filename") | |
| continue | |
| # Read image data | |
| image_data = file.read() | |
| content_type = file.content_type or 'image/jpeg' | |
| # Process the receipt | |
| result = process_single_receipt(image_data, content_type) | |
| results.append({ | |
| 'file_index': i + 1, | |
| 'filename': file.filename, | |
| 'data': result | |
| }) | |
| except Exception as e: | |
| errors.append(f"File {i+1} ({file.filename}): {str(e)}") | |
| return jsonify({ | |
| 'success': True, | |
| 'processed_count': len(results), | |
| 'error_count': len(errors), | |
| 'results': results, | |
| 'errors': errors, | |
| 'message': f'Bulk processing completed. {len(results)} successful, {len(errors)} errors.' | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error in bulk processing: {str(e)}") | |
| return jsonify({'error': 'Internal server error'}), 500 | |
| def health_check(): | |
| """Health check endpoint.""" | |
| return jsonify({ | |
| 'status': 'healthy', | |
| 'timestamp': time.time(), | |
| 'active_sessions': len(receipt_sessions) | |
| }) | |
| def cleanup_old_sessions(): | |
| """Clean up old receipt sessions (older than 1 hour).""" | |
| current_time = time.time() | |
| cutoff_time = current_time - 3600 # 1 hour | |
| old_sessions = [ | |
| session_id for session_id, data in receipt_sessions.items() | |
| if data['created_at'] < cutoff_time | |
| ] | |
| for session_id in old_sessions: | |
| del receipt_sessions[session_id] | |
| return jsonify({ | |
| 'success': True, | |
| 'cleaned_sessions': len(old_sessions), | |
| 'remaining_sessions': len(receipt_sessions) | |
| }) | |
| if __name__ == "__main__": | |
| app.run(debug=True, host="0.0.0.0", port=7860) |