Spaces:
Sleeping
Sleeping
| import os | |
| import tempfile | |
| import datetime | |
| from abc import ABC, abstractmethod | |
| from fpdf import FPDF # <-- Import FPDF | |
| import json | |
| import pandas as pd | |
class PDFDocumentGenerator(ABC):
    """
    Abstract base class for generating a PDF document from data
    using a pure-Python library (fpdf2).

    Subclasses implement :meth:`build_document`; callers invoke
    :meth:`compile_pdf` to render the document into a directory.

    A ``logger`` attribute is optional. When an instance has one, warnings
    are sent to it; when it does not, warnings are silently skipped.
    """

    def __init__(self, data: dict):
        """
        Initializes the generator with user-provided data.

        :param data: A dictionary containing the data for the document.
        """
        self.data = data

    @abstractmethod
    def build_document(self, pdf: "FPDF"):
        """
        Subclasses must implement this method to build the PDF
        by calling methods on the provided fpdf.FPDF object.

        :param pdf: An FPDF object to build upon. ``compile_pdf`` passes it
                    with the margins set and the first page already added.
        """

    def _log_warning(self, message: str) -> None:
        """
        Sends ``message`` to ``self.logger.warning`` when a logger is set.

        ``__init__`` does not create a logger, so every warning in this
        class goes through this guard instead of touching ``self.logger``
        directly.

        :param message: The warning text to log.
        """
        logger = getattr(self, 'logger', None)
        if logger is not None:
            logger.warning(message)

    def _get_day_suffix(self, day):
        """
        Returns the ordinal suffix (st, nd, rd, th) for a given day.

        :param day: A positive integer (typically a day of month, 1-31).
        :return: One of ``'st'``, ``'nd'``, ``'rd'`` or ``'th'``.
        """
        # 11, 12 and 13 take 'th' even though they end in 1, 2 and 3.
        # Checking modulo 100 keeps that correct for 111-113 and beyond.
        if 11 <= day % 100 <= 13:
            return 'th'
        return {1: 'st', 2: 'nd', 3: 'rd'}.get(day % 10, 'th')

    @staticmethod
    def _parse_extraction_result(result_data):
        """
        Converts one raw ``extraction_result`` cell into a dictionary.

        :param result_data: A dict, a JSON string, or anything else.
        :return: The dict itself, the dict decoded from the JSON string, or
                 ``None`` when the value is not (or does not decode to) a dict.
        """
        if isinstance(result_data, dict):
            return result_data
        if isinstance(result_data, str):
            try:
                decoded = json.loads(result_data)
            except (json.JSONDecodeError, TypeError):
                return None
            # Valid JSON that is a list, number, etc. is not usable here.
            return decoded if isinstance(decoded, dict) else None
        return None

    def _get_smart_doc_data(self, smart_df: pd.DataFrame, doc_type: str) -> dict:
        """
        Finds the best 'extraction_result' for ``doc_type`` in the
        smart-upload DataFrame.

        Each matching row is scored as (total keys) - (keys whose value is
        None), i.e. the number of non-None values. The row with the highest
        score wins; on a tie the first such row in ``smart_df`` is used.

        :param smart_df: DataFrame with 'doc_type' and 'extraction_result'
                         columns. ``None`` or an empty frame yields ``{}``.
        :param doc_type: The document type to select rows for.
        :return: The winning extraction result as a dict, or ``{}`` when no
                 row matches or no row has at least one non-None value.
        """
        # A frame with no rows may also have no columns, so return before
        # indexing by column name.
        if smart_df is None or smart_df.empty:
            return {}

        matches = smart_df.loc[smart_df['doc_type'] == doc_type, 'extraction_result']
        if matches.empty:
            return {}

        best_result = {}
        best_score = 0
        for raw_result in matches:
            parsed = self._parse_extraction_result(raw_result)
            if not parsed:
                # Unparseable values, non-dicts and empty dicts score 0.
                continue
            score = sum(1 for value in parsed.values() if value is not None)
            # Strict '>' keeps the earliest row among equal scores.
            if score > best_score:
                best_score = score
                best_result = parsed

        if best_score <= 0:
            # All candidates were invalid JSON, empty, or entirely None.
            self._log_warning(
                f"No valid extraction results with non-null keys found for doc_type: {doc_type}"
            )
            return {}

        return best_result

    def compile_pdf(self, tempdir: str) -> "tuple[str | None, str | None]":
        """
        Generates the PDF document using fpdf2 and saves it in tempdir.

        :param tempdir: The temporary directory to use for compilation.
        :return: A tuple of (pdf_filepath, error_message).
                 On success, (pdf_filepath, None).
                 On failure, (None, error_message).
        """
        try:
            pdf = FPDF()
            # Margins must be set before the first page is added and long
            # before output(); set afterwards they never reach the file.
            pdf.set_margins(25, 25)
            pdf.add_page()

            self.build_document(pdf)

            pdf_filepath = os.path.join(tempdir, "document.pdf")
            pdf.output(pdf_filepath)

            if not os.path.exists(pdf_filepath):
                return None, "PDF file was not created by fpdf2."
            return pdf_filepath, None
        except Exception as e:
            # Boundary handler: callers get the failure as a message, not
            # an exception.
            print(f"Error during PDF generation: {e}")
            return None, str(e)

    def get_prefill_data(self) -> dict:
        """
        Fetches and merges all required data from various DB sources
        based on the 'application_id' in self.data.

        Subclasses should override this to implement their specific
        data-fetching and merging logic.

        :return: In this base implementation, ``self.data`` unchanged.
        """
        # Base implementation just returns the data it was given.
        # This acts as a placeholder for generators that don't need prefill.
        self._log_warning("Base get_prefill_data called. No data merging performed.")
        return self.data