import os
import tempfile
import datetime
from abc import ABC, abstractmethod
from fpdf import FPDF # <-- Import FPDF
import json
import pandas as pd
class PDFDocumentGenerator(ABC):
    """
    Abstract base class for generating a PDF document from data
    using a pure-Python library (fpdf2).

    Subclasses implement ``build_document`` to draw the content; callers use
    ``compile_pdf`` to render it to a file. If an instance has a ``logger``
    attribute it is used for warnings; this base class does not create one.
    """

    def __init__(self, data: dict):
        """
        Initializes the generator with user-provided data.

        :param data: A dictionary containing the data for the document.
        """
        self.data = data

    @abstractmethod
    def build_document(self, pdf: "FPDF"):
        """
        Subclasses must implement this method to build the PDF
        by calling methods on the provided fpdf.FPDF object.

        :param pdf: An FPDF object to build upon.
        """
        pass

    def _log_warning(self, message: str) -> None:
        """
        Sends ``message`` to ``self.logger.warning`` when a logger is set.

        ``__init__`` does not create a logger, so this is a silent no-op for
        instances whose subclass never assigned one.

        :param message: The already formatted warning text.
        """
        logger = getattr(self, 'logger', None)
        if logger is not None:
            logger.warning(message)

    def _get_day_suffix(self, day):
        """
        Returns the ordinal suffix (st, nd, rd, th) for a given day.

        :param day: The integer day number.
        :return: One of 'st', 'nd', 'rd' or 'th'.
        """
        # 11, 12 and 13 take 'th' even though they end in 1, 2 and 3. Working
        # on the last two digits keeps that true for 111, 212, ... as well.
        if 11 <= day % 100 <= 13:
            return 'th'
        return {1: 'st', 2: 'nd', 3: 'rd'}.get(day % 10, 'th')

    @staticmethod
    def _parse_extraction_result(result_data) -> dict:
        """
        Normalizes one 'extraction_result' cell into a dictionary.

        :param result_data: A dict, a JSON string, or anything else.
        :return: The dict itself, the decoded JSON object, or ``{}`` when the
                 value is not a dict / does not decode to a dict.
        """
        if isinstance(result_data, dict):
            return result_data
        if isinstance(result_data, str):
            try:
                decoded = json.loads(result_data)
            except json.JSONDecodeError:
                return {}
            # Valid JSON may still be a list, number, string or null.
            if isinstance(decoded, dict):
                return decoded
        return {}

    def _get_smart_doc_data(self, smart_df: pd.DataFrame, doc_type: str) -> dict:
        """
        Finds the most complete 'extraction_result' for a document type.

        Each row whose 'doc_type' equals ``doc_type`` is scored as
        (number of keys) - (number of keys whose value is None); the result
        with the highest score is returned. When several rows share the
        highest score, the first one in DataFrame order is returned.

        :param smart_df: DataFrame with 'doc_type' and 'extraction_result'
                         columns.
        :param doc_type: The document type to look up.
        :return: The best extraction result as a dict, or ``{}`` when the
                 DataFrame is empty, no row matches, or no matching row has
                 at least one non-None value.
        """
        if smart_df is None or smart_df.empty:
            return {}

        matching_rows = smart_df[smart_df['doc_type'] == doc_type]
        if matching_rows.empty:
            return {}

        best_result = {}
        best_score = 0
        for raw_result in matching_rows['extraction_result']:
            # Parse each cell exactly once and keep the parsed dict, so the
            # winning row never has to be decoded a second time.
            parsed = self._parse_extraction_result(raw_result)
            # Counting non-None values equals total keys minus None keys.
            score = sum(1 for value in parsed.values() if value is not None)
            # Strict '>' keeps the earliest row when scores are tied.
            if score > best_score:
                best_result = parsed
                best_score = score

        if best_score == 0:
            # Every candidate was invalid JSON, not a dict, empty, or all-None.
            self._log_warning(f"No valid extraction results with non-null keys found for doc_type: {doc_type}")
        return best_result

    def compile_pdf(self, tempdir: str) -> tuple:
        """
        Generates the PDF document using fpdf2 and saves it in tempdir.

        :param tempdir: The temporary directory to use for compilation.
        :return: A tuple of (pdf_filepath, error_message).
                 On success, (pdf_filepath, None).
                 On failure, (None, error_message).
        """
        try:
            pdf = FPDF()
            # Margins have to be configured before the first page is added.
            # This call used to sit after pdf.output(), where it could no
            # longer influence the file that had already been written.
            pdf.set_margins(25, 25)
            pdf.add_page()
            self.build_document(pdf)

            pdf_filepath = os.path.join(tempdir, "document.pdf")
            pdf.output(pdf_filepath)
            if not os.path.exists(pdf_filepath):
                return None, "PDF file was not created by fpdf2."
            return pdf_filepath, None
        except Exception as e:
            # Deliberate catch-all: the contract is to report any failure as
            # (None, message) instead of raising to the caller.
            print(f"Error during PDF generation: {e}")
            return None, str(e)

    def get_prefill_data(self) -> dict:
        """
        Returns the data used to prefill the document.

        Subclasses should override this to implement their specific
        data-fetching and merging logic. This base implementation performs no
        fetching or merging: it logs a warning (when a logger is set) and
        returns ``self.data`` unchanged, acting as a placeholder for
        generators that don't need prefill.

        :return: The dictionary given to the constructor.
        """
        self._log_warning("Base get_prefill_data called. No data merging performed.")
        return self.data