|
|
import gradio as gr |
|
|
import pandas as pd |
|
|
import io |
|
|
import zipfile |
|
|
from datetime import datetime |
|
|
import traceback |
|
|
import tempfile |
|
|
import os |
|
|
|
|
|
|
|
|
# PDF dependencies: import them, and if any is missing install the packages
# with pip and retry. The retry binds the SAME set of names as the first
# attempt so the module namespace does not depend on which path was taken.
try:
    from PyPDF2 import PdfReader, PdfWriter
    from reportlab.pdfgen import canvas
    from reportlab.lib.pagesizes import letter
    from reportlab.pdfbase import pdfmetrics
    from reportlab.pdfbase.ttfonts import TTFont
except ImportError as e:
    print(f"กำลังติดตั้ง dependencies: {e}")
    import subprocess
    import sys

    # check_call raises CalledProcessError if pip fails, so a broken install
    # surfaces here instead of as a confusing ImportError below.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "PyPDF2", "reportlab", "pandas"])
    from PyPDF2 import PdfReader, PdfWriter
    from reportlab.pdfgen import canvas
    from reportlab.lib.pagesizes import letter
    from reportlab.pdfbase import pdfmetrics
    from reportlab.pdfbase.ttfonts import TTFont
|
|
|
|
|
def analyze_pdf_fields(pdf_path):
    """Collect the form fields of a PDF, keyed by field name.

    Two sources are scanned in order: the document-level AcroForm ``/Fields``
    array, then the ``/Widget`` annotations of every page. A widget that has
    the same name as an AcroForm entry replaces that entry.

    Args:
        pdf_path: Whatever ``PdfReader`` accepts as its source.

    Returns:
        dict: ``{name: {'type': ..., 'method': ...}}``; entries found through
        page annotations also carry a 1-based ``'page'``. If anything fails
        outside the per-page scan, ``{"error": message}`` is returned instead.
    """
    try:
        pdf = PdfReader(pdf_path)
        discovered = {}

        # Pass 1: fields registered in the document catalog.
        trailer = pdf.trailer
        if trailer.get("/Root") and trailer["/Root"].get("/AcroForm"):
            form = trailer["/Root"]["/AcroForm"]
            if "/Fields" in form:
                for reference in form["/Fields"]:
                    entry = reference.get_object()
                    if "/T" not in entry:
                        continue
                    label = str(entry["/T"]).strip("()")
                    kind = str(entry.get("/FT", "Unknown"))
                    discovered[label] = {
                        'type': kind,
                        'method': 'AcroForm',
                    }

        # Pass 2: widget annotations attached to individual pages.
        for page_index, page in enumerate(pdf.pages):
            if "/Annots" not in page:
                continue
            try:
                for reference in page["/Annots"]:
                    widget = reference.get_object()
                    if widget.get("/Subtype") != "/Widget" or "/T" not in widget:
                        continue
                    label = str(widget["/T"]).strip("()")
                    kind = str(widget.get("/FT", "Widget"))
                    discovered[label] = {
                        'type': kind,
                        'page': page_index + 1,
                        'method': 'Annotation',
                    }
            except Exception:
                # A page whose annotations cannot be read is skipped; fields
                # already recorded (including from this page) are kept.
                continue

        return discovered
    except Exception as problem:
        return {"error": str(problem)}
|
|
|
|
|
def fill_pdf_form(pdf_path, field_data):
    """Copy a PDF template into a new writer and fill its form fields.

    Args:
        pdf_path: Source accepted by ``PdfReader`` (the template).
        field_data: Mapping of field name to the value to write.

    Returns:
        The ``PdfWriter`` holding every page of the template. Filling is best
        effort: a page or field that cannot be updated is left unchanged.

    Raises:
        Exception: If the template cannot be read or its pages cannot be
            copied. The underlying error is chained as ``__cause__``.
    """
    try:
        reader = PdfReader(pdf_path)
        writer = PdfWriter()

        for page in reader.pages:
            writer.add_page(page)

        if hasattr(writer, 'update_page_form_field_values'):
            # Preferred path: let the library update the widgets page by page.
            for page in writer.pages:
                try:
                    writer.update_page_form_field_values(page, field_data)
                except Exception:
                    # Best effort: a page that cannot be updated must not
                    # prevent the remaining pages from being filled.
                    continue
        else:
            # Fallback for writers without the helper: set /V on the AcroForm
            # fields directly. Index the trailer so /Root is resolved before
            # the membership test.
            root = reader.trailer["/Root"] if "/Root" in reader.trailer else {}
            if "/AcroForm" in root:
                try:
                    # Local import: only needed on this rarely-taken path.
                    from PyPDF2.generic import NameObject, TextStringObject

                    acro_form = root["/AcroForm"]
                    if "/Fields" in acro_form:
                        for field in acro_form["/Fields"]:
                            field_obj = field.get_object()
                            if "/T" not in field_obj:
                                continue
                            field_name = str(field_obj["/T"]).strip("()")
                            if field_name not in field_data:
                                continue
                            try:
                                # Wrap key and value in PDF object types so
                                # the dictionary can be serialised on write.
                                field_obj[NameObject("/V")] = TextStringObject(
                                    str(field_data[field_name])
                                )
                            except Exception:
                                # Skip just this field; keep filling the rest.
                                continue
                except Exception:
                    # Best effort: return the unfilled copy rather than fail.
                    pass

        return writer
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise Exception(f"ไม่สามารถเติมฟอร์มได้: {str(e)}") from e
|
|
|
|
|
def create_simple_pdf(data_row, filename):
    """Render one data row as a plain "column: value" PDF and return its bytes.

    Args:
        data_row: Mapping-like object with ``.items()`` yielding
            ``(column, value)`` pairs; empty and NaN values are skipped.
        filename: Output file name; shown in the title without a trailing
            ``.pdf`` extension.

    Returns:
        bytes: The generated PDF document.
    """
    buffer = io.BytesIO()
    p = canvas.Canvas(buffer, pagesize=letter)
    _, height = letter

    # Title and separator rule. Only a trailing ".pdf" is removed so a name
    # that merely contains ".pdf" elsewhere is shown intact.
    p.setFont("Helvetica-Bold", 16)
    stem = filename[:-len(".pdf")] if filename.endswith(".pdf") else filename
    title = f"Document: {stem}"
    p.drawString(50, height - 50, title)
    p.line(50, height - 60, 550, height - 60)

    # Body: one "column: value" line per non-empty cell.
    # NOTE(review): the built-in Helvetica face likely has no glyphs for
    # non-Latin text (e.g. Thai); registering a TTF font would be needed to
    # render such values - confirm against real data.
    y_position = height - 100
    p.setFont("Helvetica", 12)

    for column, value in data_row.items():
        if pd.notna(value) and str(value).strip():
            clean_column = str(column).strip()
            clean_value = str(value).strip()

            # Keep each entry on a single line.
            if len(clean_value) > 80:
                clean_value = clean_value[:77] + "..."

            text = f"{clean_column}: {clean_value}"

            try:
                p.drawString(50, y_position, text)
            except Exception:
                # Fall back to the ASCII subset if the text cannot be drawn.
                safe_text = text.encode('ascii', errors='ignore').decode('ascii')
                p.drawString(50, y_position, safe_text)

            y_position -= 25

            # Start a new page when the bottom margin is reached; the font
            # has to be set again after showPage().
            if y_position < 50:
                p.showPage()
                p.setFont("Helvetica", 12)
                y_position = height - 50

    # Footer with the creation time (drawn on the last page only).
    p.setFont("Helvetica", 8)
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    p.drawString(50, 30, f"Created: {timestamp}")

    p.save()
    buffer.seek(0)
    return buffer.getvalue()
|
|
|
|
|
def _build_field_data(row_data):
    """Map a data row to PDF field values, including spelling variations.

    Every non-empty column contributes its exact (stripped) name plus
    lower/upper-case and underscore/space/hyphen variations. A variation never
    replaces the exact name of another column, so e.g. the column
    ``first name`` cannot overwrite the value of a real ``first_name`` column.
    """
    exact = {}
    for column, value in row_data.items():
        if pd.notna(value) and str(value).strip():
            exact[str(column).strip()] = str(value).strip()

    field_data = dict(exact)
    for name, value in exact.items():
        variations = (
            name.lower(),
            name.upper(),
            name.replace('_', ' '),
            name.replace(' ', '_'),
            name.replace('-', '_'),
            name.replace('_', ''),
        )
        for variation in variations:
            if variation not in exact:
                field_data[variation] = value
    return field_data


def process_single_row(pdf_path, row_data, filename, use_form=True):
    """Produce the PDF for one data row.

    Args:
        pdf_path: PDF template passed on to ``fill_pdf_form``.
        row_data: Mapping-like row with ``.items()`` yielding
            ``(column, value)`` pairs.
        filename: Output file name, used as the title of a generated PDF.
        use_form: When true, try to fill the template's form fields first and
            fall back to a newly generated PDF if that fails.

    Returns:
        tuple: ``(pdf_bytes, status)``. ``status`` is ``"form_filled"``,
        ``"new_pdf_created"`` (optionally followed by ``": <reason>"`` when
        form filling failed), or ``"error: <message>"`` with ``pdf_bytes``
        set to ``None``.
    """
    try:
        field_data = _build_field_data(row_data)

        if not use_form:
            pdf_content = create_simple_pdf(row_data, filename)
            return pdf_content, "new_pdf_created"

        try:
            writer = fill_pdf_form(pdf_path, field_data)

            output_buffer = io.BytesIO()
            writer.write(output_buffer)
            output_buffer.seek(0)
            return output_buffer.getvalue(), "form_filled"
        except Exception as e:
            # Filling or serialising the template failed: generate a plain
            # PDF instead and report why in the status.
            pdf_content = create_simple_pdf(row_data, filename)
            return pdf_content, f"new_pdf_created: {str(e)}"

    except Exception as e:
        return None, f"error: {str(e)}"
|
|
|
|
|
def read_csv_safe(csv_file):
    """Read a CSV file, trying several encodings and separators.

    Each encoding/separator combination is tried in order and the first parse
    that yields more than one column and at least one row is returned. If no
    combination does, a default ``pd.read_csv`` is attempted; if that fails
    too, the first parse that succeeded at all (for example a single-column
    file in a non-UTF-8 encoding) is returned instead of an error.

    Args:
        csv_file: Path of the CSV file.

    Returns:
        tuple: ``(DataFrame, None)`` on success, ``(None, error_message)`` if
        the file could not be parsed in any way.
    """
    encodings = ['utf-8', 'utf-8-sig', 'cp874', 'tis-620', 'iso-8859-1', 'cp1252']
    separators = [',', ';', '\t', '|']

    # First parse that succeeded but did not look like a multi-column table.
    first_parsed = None

    for encoding in encodings:
        for sep in separators:
            try:
                df = pd.read_csv(csv_file, encoding=encoding, sep=sep, engine='python')
            except Exception:
                # Wrong encoding or separator for this file: try the next one.
                continue

            if len(df.columns) > 1 and len(df) > 0:
                return df, None
            if first_parsed is None:
                first_parsed = df

    try:
        return pd.read_csv(csv_file), None
    except Exception as e:
        if first_parsed is not None:
            return first_parsed, None
        return None, str(e)
|
|
|
|
|
def _unique_filename(filename, taken):
    """Return *filename*, or a numbered variant of it that is not in *taken*."""
    if filename not in taken:
        return filename
    stem = filename[:-len(".pdf")] if filename.endswith(".pdf") else filename
    counter = 2
    while f"{stem}_{counter}.pdf" in taken:
        counter += 1
    return f"{stem}_{counter}.pdf"


def process_pdf_csv(pdf_file, csv_file, filename_column, file_prefix, use_form_fields, progress=gr.Progress()):
    """Generate one PDF per CSV row and bundle the results into a ZIP file.

    Args:
        pdf_file: PDF template, or ``None`` if nothing was uploaded.
        csv_file: CSV data file, or ``None`` if nothing was uploaded.
        filename_column: Optional CSV column whose value names each output
            file; rows without a usable value get a running number instead.
        file_prefix: Prefix of every output file name.
        use_form_fields: Fill the template's form fields when it has any;
            otherwise a new PDF is generated for each row.
        progress: Gradio progress tracker, called once per row.

    Returns:
        tuple: ``(zip_path, message)``; ``zip_path`` is ``None`` when no PDF
        could be produced.
    """
    if pdf_file is None or csv_file is None:
        return None, "❌ กรุณาอัพโหลดไฟล์ PDF และ CSV"

    try:
        df, csv_error = read_csv_safe(csv_file)
        if df is None:
            return None, f"❌ ไม่สามารถอ่าน CSV ได้: {csv_error}"

        # Form filling is only attempted when the template really has fields.
        pdf_fields = analyze_pdf_fields(pdf_file)
        has_form_fields = bool(pdf_fields) and "error" not in pdf_fields

        # Tolerate stray whitespace around the column name typed by the user.
        filename_column = (filename_column or "").strip()

        generated_pdfs = {}
        success_count = 0
        error_count = 0
        processing_log = []
        total_rows = len(df)

        for index, (_, row) in enumerate(df.iterrows()):
            progress((index + 1) / total_rows, f"ประมวลผล {index + 1}/{total_rows}")

            try:
                # Build the output name from the chosen column, keeping only
                # characters that are safe in a file name.
                safe_name = ""
                if filename_column and filename_column in df.columns and pd.notna(row[filename_column]):
                    raw_name = str(row[filename_column]).strip()
                    safe_name = "".join(c for c in raw_name if c.isalnum() or c in (' ', '-', '_')).strip()

                # A value that sanitises to nothing falls back to the row
                # number instead of producing "prefix_.pdf".
                if safe_name:
                    filename = f"{file_prefix}_{safe_name}.pdf"
                else:
                    filename = f"{file_prefix}_{index + 1:03d}.pdf"

                # Collapse whitespace runs into single underscores.
                filename = "_".join(filename.split())

                # Two rows with the same name must not overwrite each other.
                filename = _unique_filename(filename, generated_pdfs)

                pdf_content, status = process_single_row(
                    pdf_file,
                    row,
                    filename,
                    use_form_fields and has_form_fields
                )

                if pdf_content is not None:
                    generated_pdfs[filename] = pdf_content
                    success_count += 1
                    processing_log.append(f"✅ {filename}: {status}")
                else:
                    error_count += 1
                    processing_log.append(f"❌ {filename}: {status}")

            except Exception as e:
                # One bad row must not abort the whole batch.
                error_count += 1
                processing_log.append(f"💥 แถว {index + 1}: {str(e)}")

        if not generated_pdfs:
            return None, "❌ ไม่สามารถสร้าง PDF ได้เลย"

        # Write the archive into a private temp directory so runs started in
        # the same second cannot overwrite each other's file.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        zip_filename = f"generated_pdfs_{timestamp}.zip"
        temp_zip_path = os.path.join(tempfile.mkdtemp(prefix="pdf_form_filler_"), zip_filename)
        with zipfile.ZipFile(temp_zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for filename, pdf_content in generated_pdfs.items():
                zip_file.writestr(filename, pdf_content)

        result_message = f"✅ สร้าง PDF สำเร็จ {success_count} ไฟล์!"
        if error_count > 0:
            result_message += f"\n⚠️ มีข้อผิดพลาด {error_count} ไฟล์"

        # Show at most the first ten log lines to keep the message readable.
        result_message += f"\n\n📋 รายละเอียด:\n" + "\n".join(processing_log[:10])
        if len(processing_log) > 10:
            result_message += f"\n... และอีก {len(processing_log) - 10} รายการ"

        return temp_zip_path, result_message

    except Exception as e:
        return None, f"❌ เกิดข้อผิดพลาด: {str(e)}\n{traceback.format_exc()}"
|
|
|
|
|
def analyze_pdf_info(pdf_file):
    """Build a Markdown summary of an uploaded PDF.

    The summary lists the page count and up to ten form fields (name and
    type). Returns a fixed placeholder when no file is given and an error
    line when the PDF cannot be analysed.
    """
    if pdf_file is None:
        return "ไม่มีไฟล์ PDF"

    try:
        document = PdfReader(pdf_file)
        parts = [
            "📄 **ข้อมูล PDF:**\n",
            f"- จำนวนหน้า: {len(document.pages)}\n",
        ]

        pdf_fields = analyze_pdf_fields(pdf_file)

        if not pdf_fields or "error" in pdf_fields:
            # No fields, or the field scan reported an error.
            parts.append("- Form Fields: ไม่พบหรือไม่สามารถอ่านได้\n")
            parts.append("- หมายเหตุ: จะสร้าง PDF ใหม่แทน\n")
        else:
            field_total = len(pdf_fields)
            parts.append(f"- จำนวน Form Fields: {field_total}\n")
            parts.append("\n🏷️ **รายชื่อ Fields:**\n")
            for position, (name, details) in enumerate(pdf_fields.items()):
                if position == 10:
                    break
                parts.append(f" - {name} ({details.get('type', 'Unknown')})\n")
            if field_total > 10:
                parts.append(f" - ... และอีก {field_total - 10} fields\n")

        return "".join(parts)
    except Exception as problem:
        return f"❌ ไม่สามารถวิเคราะห์ PDF ได้: {str(problem)}"
|
|
|
|
|
def analyze_csv_info(csv_file):
    """Build a Markdown summary of an uploaded CSV file.

    The summary lists row and column counts, up to fifteen column names and,
    for up to five columns, the number of missing values. Returns a fixed
    placeholder when no file is given and an error line when the CSV cannot
    be read or analysed.
    """
    if csv_file is None:
        return "ไม่มีไฟล์ CSV"

    try:
        frame, problem = read_csv_safe(csv_file)
        if frame is None:
            return f"❌ ไม่สามารถอ่าน CSV ได้: {problem}"

        column_total = len(frame.columns)
        parts = [
            "📋 **ข้อมูล CSV:**\n",
            f"- จำนวนแถว: {len(frame)}\n",
            f"- จำนวนคอลัมน์: {column_total}\n",
            "\n📝 **รายชื่อคอลัมน์:**\n",
        ]

        parts.extend(f" - {col}\n" for col in frame.columns[:15])
        if column_total > 15:
            parts.append(f" - ... และอีก {column_total - 15} คอลัมน์\n")

        # Per-column count of missing cells; only columns with gaps are shown.
        null_counts = frame.isnull().sum()
        with_gaps = null_counts[null_counts > 0]
        if len(with_gaps) > 0:
            parts.append("\n⚠️ **ข้อมูลที่ขาดหาย:**\n")
            parts.extend(
                f" - {col}: {count} แถว\n" for col, count in with_gaps.head(5).items()
            )

        return "".join(parts)
    except Exception as failure:
        return f"❌ ไม่สามารถวิเคราะห์ CSV ได้: {str(failure)}"
|
|
|
|
|
|
|
|
def create_interface():
    """Assemble the Gradio UI and return the ``gr.Blocks`` application.

    Left column: file uploads, settings and the run button. Right column:
    file summaries and the result area. Uploading a file refreshes its
    summary; the button runs ``process_pdf_csv`` and then reveals the
    download component only when a ZIP was produced.
    """

    def toggle_download(zip_path):
        # Show the download component only if there is a file to offer.
        return gr.update(visible=zip_path is not None)

    with gr.Blocks(title="PDF Form Filler", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # 📄 เครื่องมือเติมข้อมูล PDF จาก CSV

        **เครื่องมือนี้สามารถ:**
        - เติมข้อมูลลงในฟอร์ม PDF ที่มี form fields
        - สร้าง PDF ใหม่หากไม่มี form fields หรือเติมไม่ได้
        - รองรับ CSV หลาย encoding (UTF-8, TIS-620, CP874, etc.)
        - ส่งออกเป็นไฟล์ ZIP
        """)

        with gr.Row():
            # Inputs and settings.
            with gr.Column(scale=1):
                gr.Markdown("## 📁 อัพโหลดไฟล์")

                template_upload = gr.File(label="PDF Template", file_types=[".pdf"], type="filepath")
                data_upload = gr.File(label="CSV Data", file_types=[".csv"], type="filepath")

                gr.Markdown("## ⚙️ ตั้งค่า")

                name_column_box = gr.Textbox(
                    label="คอลัมน์สำหรับชื่อไฟล์ (ถ้ามี)",
                    placeholder="เช่น name, id, etc.",
                    value="",
                )
                prefix_box = gr.Textbox(label="คำนำหน้าชื่อไฟล์", value="document")
                form_toggle = gr.Checkbox(label="ใช้ Form Fields (ถ้าพบ)", value=True)
                run_button = gr.Button("🚀 สร้าง PDF ทั้งหมด", variant="primary", size="lg")

            # Summaries and results.
            with gr.Column(scale=2):
                gr.Markdown("## 📊 ข้อมูลไฟล์")

                template_summary = gr.Markdown("ยังไม่มีไฟล์ PDF")
                data_summary = gr.Markdown("ยังไม่มีไฟล์ CSV")

                gr.Markdown("## 📥 ผลลัพธ์")

                download = gr.File(label="ไฟล์ ZIP ที่สร้าง", visible=False)
                status_text = gr.Markdown("")

        # Refresh the summaries whenever a file is uploaded or cleared.
        template_upload.change(fn=analyze_pdf_info, inputs=[template_upload], outputs=[template_summary])
        data_upload.change(fn=analyze_csv_info, inputs=[data_upload], outputs=[data_summary])

        # Run the batch, then adjust the visibility of the download component.
        run_event = run_button.click(
            fn=process_pdf_csv,
            inputs=[template_upload, data_upload, name_column_box, prefix_box, form_toggle],
            outputs=[download, status_text],
        )
        run_event.then(fn=toggle_download, inputs=[download], outputs=[download])

    return demo
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI and serve it on all interfaces,
    # port 7860, with a public share link and debug mode enabled.
    launch_settings = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
        "debug": True,
    }
    app = create_interface()
    app.launch(**launch_settings)