Spaces:
Running
Running
| import gradio as gr | |
| import pandas as pd | |
| import io | |
| import zipfile | |
| from datetime import datetime | |
| import traceback | |
| import tempfile | |
| import os | |
| # ติดตั้ง dependencies ที่จำเป็น | |
| try: | |
| from PyPDF2 import PdfReader, PdfWriter | |
| from reportlab.pdfgen import canvas | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.pdfbase import pdfmetrics | |
| from reportlab.pdfbase.ttfonts import TTFont | |
| except ImportError as e: | |
| print(f"กำลังติดตั้ง dependencies: {e}") | |
| import subprocess | |
| import sys | |
| subprocess.check_call([sys.executable, "-m", "pip", "install", "PyPDF2", "reportlab", "pandas"]) | |
| from PyPDF2 import PdfReader, PdfWriter | |
| from reportlab.pdfgen import canvas | |
| from reportlab.lib.pagesizes import letter | |
| def analyze_pdf_fields(pdf_path): | |
| """วิเคราะห์ฟิลด์ใน PDF""" | |
| try: | |
| reader = PdfReader(pdf_path) | |
| all_fields = {} | |
| # ตรวจสอบจาก AcroForm | |
| if reader.trailer.get("/Root") and reader.trailer["/Root"].get("/AcroForm"): | |
| acro_form = reader.trailer["/Root"]["/AcroForm"] | |
| if "/Fields" in acro_form: | |
| fields = acro_form["/Fields"] | |
| for field in fields: | |
| field_obj = field.get_object() | |
| if "/T" in field_obj: | |
| field_name = str(field_obj["/T"]).strip("()") | |
| field_type = str(field_obj.get("/FT", "Unknown")) | |
| all_fields[field_name] = { | |
| 'type': field_type, | |
| 'method': 'AcroForm' | |
| } | |
| # ตรวจสอบจาก Annotations | |
| for page_num, page in enumerate(reader.pages): | |
| if "/Annots" in page: | |
| try: | |
| annotations = page["/Annots"] | |
| for annotation in annotations: | |
| annot_obj = annotation.get_object() | |
| if annot_obj.get("/Subtype") == "/Widget": | |
| if "/T" in annot_obj: | |
| field_name = str(annot_obj["/T"]).strip("()") | |
| field_type = str(annot_obj.get("/FT", "Widget")) | |
| all_fields[field_name] = { | |
| 'type': field_type, | |
| 'page': page_num + 1, | |
| 'method': 'Annotation' | |
| } | |
| except Exception: | |
| continue | |
| return all_fields | |
| except Exception as e: | |
| return {"error": str(e)} | |
| def fill_pdf_form(pdf_path, field_data): | |
| """เติมข้อมูลในฟอร์ม PDF""" | |
| try: | |
| reader = PdfReader(pdf_path) | |
| writer = PdfWriter() | |
| # คัดลอกหน้าทั้งหมด | |
| for page in reader.pages: | |
| writer.add_page(page) | |
| # เติมข้อมูลในฟอร์ม | |
| if hasattr(writer, 'update_page_form_field_values'): | |
| for page_num, page in enumerate(writer.pages): | |
| try: | |
| writer.update_page_form_field_values(page, field_data) | |
| except Exception: | |
| pass | |
| # ลองวิธีอื่น | |
| elif "/AcroForm" in reader.trailer.get("/Root", {}): | |
| try: | |
| acro_form = reader.trailer["/Root"]["/AcroForm"] | |
| if "/Fields" in acro_form: | |
| fields = acro_form["/Fields"] | |
| for field in fields: | |
| field_obj = field.get_object() | |
| if "/T" in field_obj: | |
| field_name = str(field_obj["/T"]).strip("()") | |
| if field_name in field_data: | |
| try: | |
| field_obj.update({"/V": field_data[field_name]}) | |
| except Exception: | |
| pass | |
| except Exception: | |
| pass | |
| return writer | |
| except Exception as e: | |
| raise Exception(f"ไม่สามารถเติมฟอร์มได้: {str(e)}") | |
| def create_simple_pdf(data_row, filename): | |
| """สร้าง PDF ใหม่แบบง่าย""" | |
| buffer = io.BytesIO() | |
| p = canvas.Canvas(buffer, pagesize=letter) | |
| width, height = letter | |
| # ตั้งค่า font | |
| p.setFont("Helvetica", 12) | |
| # หัวเรื่อง | |
| p.setFont("Helvetica-Bold", 16) | |
| title = f"Document: {filename.replace('.pdf', '')}" | |
| p.drawString(50, height - 50, title) | |
| p.line(50, height - 60, 550, height - 60) | |
| # เนื้อหา | |
| y_position = height - 100 | |
| p.setFont("Helvetica", 12) | |
| for column, value in data_row.items(): | |
| if pd.notna(value) and str(value).strip(): | |
| clean_column = str(column).strip() | |
| clean_value = str(value).strip() | |
| if len(clean_value) > 80: | |
| clean_value = clean_value[:77] + "..." | |
| text = f"{clean_column}: {clean_value}" | |
| try: | |
| p.drawString(50, y_position, text) | |
| except: | |
| safe_text = text.encode('ascii', errors='ignore').decode('ascii') | |
| p.drawString(50, y_position, safe_text) | |
| y_position -= 25 | |
| if y_position < 50: | |
| p.showPage() | |
| p.setFont("Helvetica", 12) | |
| y_position = height - 50 | |
| # เวลาที่สร้าง | |
| p.setFont("Helvetica", 8) | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| p.drawString(50, 30, f"Created: {timestamp}") | |
| p.save() | |
| buffer.seek(0) | |
| return buffer.getvalue() | |
| def process_single_row(pdf_path, row_data, filename, use_form=True): | |
| """ประมวลผลแถวเดียว""" | |
| try: | |
| # เตรียมข้อมูลฟิลด์ | |
| field_data = {} | |
| for column, value in row_data.items(): | |
| if pd.notna(value) and str(value).strip(): | |
| clean_value = str(value).strip() | |
| clean_column = str(column).strip() | |
| # ลองหลายรูปแบบของชื่อฟิลด์ | |
| field_variations = [ | |
| clean_column, | |
| clean_column.lower(), | |
| clean_column.upper(), | |
| clean_column.replace('_', ' '), | |
| clean_column.replace(' ', '_'), | |
| clean_column.replace('-', '_'), | |
| clean_column.replace('_', '') | |
| ] | |
| for variation in field_variations: | |
| field_data[variation] = clean_value | |
| if use_form: | |
| try: | |
| # ลองเติมฟอร์ม | |
| writer = fill_pdf_form(pdf_path, field_data) | |
| output_buffer = io.BytesIO() | |
| writer.write(output_buffer) | |
| output_buffer.seek(0) | |
| return output_buffer.getvalue(), "form_filled" | |
| except Exception as e: | |
| # ถ้าไม่ได้ ให้สร้างใหม่ | |
| pdf_content = create_simple_pdf(row_data, filename) | |
| return pdf_content, f"new_pdf_created: {str(e)}" | |
| else: | |
| # สร้าง PDF ใหม่ | |
| pdf_content = create_simple_pdf(row_data, filename) | |
| return pdf_content, "new_pdf_created" | |
| except Exception as e: | |
| return None, f"error: {str(e)}" | |
| def read_csv_safe(csv_file): | |
| """อ่าน CSV อย่างปลอดภัย""" | |
| encodings = ['utf-8', 'utf-8-sig', 'cp874', 'tis-620', 'iso-8859-1', 'cp1252'] | |
| separators = [',', ';', '\t', '|'] | |
| for encoding in encodings: | |
| for sep in separators: | |
| try: | |
| df = pd.read_csv(csv_file, encoding=encoding, sep=sep, engine='python') | |
| if len(df.columns) > 1 and len(df) > 0: | |
| return df, None | |
| except Exception: | |
| continue | |
| try: | |
| df = pd.read_csv(csv_file) | |
| return df, None | |
| except Exception as e: | |
| return None, str(e) | |
| def process_pdf_csv(pdf_file, csv_file, filename_column, file_prefix, use_form_fields, progress=gr.Progress()): | |
| """ฟังก์ชันหลักสำหรับประมวลผล PDF และ CSV""" | |
| if pdf_file is None or csv_file is None: | |
| return None, "❌ กรุณาอัพโหลดไฟล์ PDF และ CSV" | |
| try: | |
| # อ่าน CSV | |
| df, csv_error = read_csv_safe(csv_file) | |
| if df is None: | |
| return None, f"❌ ไม่สามารถอ่าน CSV ได้: {csv_error}" | |
| # วิเคราะห์ PDF | |
| pdf_fields = analyze_pdf_fields(pdf_file) | |
| has_form_fields = bool(pdf_fields and "error" not in pdf_fields and pdf_fields) | |
| # เก็บ PDF ที่สร้าง | |
| generated_pdfs = {} | |
| success_count = 0 | |
| error_count = 0 | |
| processing_log = [] | |
| # ประมวลผลแต่ละแถว | |
| for index, (_, row) in enumerate(df.iterrows()): | |
| progress((index + 1) / len(df), f"ประมวลผล {index + 1}/{len(df)}") | |
| try: | |
| # สร้างชื่อไฟล์ | |
| if filename_column and filename_column in df.columns and pd.notna(row[filename_column]): | |
| safe_name = str(row[filename_column]).strip() | |
| safe_name = "".join(c for c in safe_name if c.isalnum() or c in (' ', '-', '_')).strip() | |
| filename = f"{file_prefix}_{safe_name}.pdf" | |
| else: | |
| filename = f"{file_prefix}_{index + 1:03d}.pdf" | |
| filename = filename.replace(' ', ' ').replace(' ', '_') | |
| if not filename.endswith('.pdf'): | |
| filename += '.pdf' | |
| # ประมวลผล | |
| pdf_content, status = process_single_row( | |
| pdf_file, | |
| row, | |
| filename, | |
| use_form_fields and has_form_fields | |
| ) | |
| if pdf_content is not None: | |
| generated_pdfs[filename] = pdf_content | |
| success_count += 1 | |
| processing_log.append(f"✅ {filename}: {status}") | |
| else: | |
| error_count += 1 | |
| processing_log.append(f"❌ {filename}: {status}") | |
| except Exception as e: | |
| error_count += 1 | |
| processing_log.append(f"💥 แถว {index + 1}: {str(e)}") | |
| # สร้าง ZIP | |
| if generated_pdfs: | |
| zip_buffer = io.BytesIO() | |
| with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: | |
| for filename, pdf_content in generated_pdfs.items(): | |
| zip_file.writestr(filename, pdf_content) | |
| zip_buffer.seek(0) | |
| # สร้างชื่อไฟล์ ZIP | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| zip_filename = f"generated_pdfs_{timestamp}.zip" | |
| # บันทึกไฟล์ชั่วคราว | |
| temp_zip_path = os.path.join(tempfile.gettempdir(), zip_filename) | |
| with open(temp_zip_path, 'wb') as f: | |
| f.write(zip_buffer.getvalue()) | |
| result_message = f"✅ สร้าง PDF สำเร็จ {success_count} ไฟล์!" | |
| if error_count > 0: | |
| result_message += f"\n⚠️ มีข้อผิดพลาด {error_count} ไฟล์" | |
| result_message += f"\n\n📋 รายละเอียด:\n" + "\n".join(processing_log[:10]) | |
| if len(processing_log) > 10: | |
| result_message += f"\n... และอีก {len(processing_log) - 10} รายการ" | |
| return temp_zip_path, result_message | |
| else: | |
| return None, "❌ ไม่สามารถสร้าง PDF ได้เลย" | |
| except Exception as e: | |
| return None, f"❌ เกิดข้อผิดพลาด: {str(e)}\n{traceback.format_exc()}" | |
| def analyze_pdf_info(pdf_file): | |
| """วิเคราะห์ข้อมูล PDF""" | |
| if pdf_file is None: | |
| return "ไม่มีไฟล์ PDF" | |
| try: | |
| reader = PdfReader(pdf_file) | |
| info = f"📄 **ข้อมูล PDF:**\n" | |
| info += f"- จำนวนหน้า: {len(reader.pages)}\n" | |
| # ตรวจสอบฟิลด์ | |
| pdf_fields = analyze_pdf_fields(pdf_file) | |
| if pdf_fields and "error" not in pdf_fields and pdf_fields: | |
| info += f"- จำนวน Form Fields: {len(pdf_fields)}\n" | |
| info += f"\n🏷️ **รายชื่อ Fields:**\n" | |
| for name, details in list(pdf_fields.items())[:10]: # แสดงแค่ 10 ตัวแรก | |
| info += f" - {name} ({details.get('type', 'Unknown')})\n" | |
| if len(pdf_fields) > 10: | |
| info += f" - ... และอีก {len(pdf_fields) - 10} fields\n" | |
| else: | |
| info += "- Form Fields: ไม่พบหรือไม่สามารถอ่านได้\n" | |
| info += "- หมายเหตุ: จะสร้าง PDF ใหม่แทน\n" | |
| return info | |
| except Exception as e: | |
| return f"❌ ไม่สามารถวิเคราะห์ PDF ได้: {str(e)}" | |
| def analyze_csv_info(csv_file): | |
| """วิเคราะห์ข้อมูล CSV""" | |
| if csv_file is None: | |
| return "ไม่มีไฟล์ CSV" | |
| try: | |
| df, error = read_csv_safe(csv_file) | |
| if df is None: | |
| return f"❌ ไม่สามารถอ่าน CSV ได้: {error}" | |
| info = f"📋 **ข้อมูล CSV:**\n" | |
| info += f"- จำนวนแถว: {len(df)}\n" | |
| info += f"- จำนวนคอลัมน์: {len(df.columns)}\n" | |
| info += f"\n📝 **รายชื่อคอลัมน์:**\n" | |
| for col in df.columns[:15]: # แสดงแค่ 15 คอลัมน์แรก | |
| info += f" - {col}\n" | |
| if len(df.columns) > 15: | |
| info += f" - ... และอีก {len(df.columns) - 15} คอลัมน์\n" | |
| # ตรวจสอบข้อมูลที่ขาด | |
| missing_data = df.isnull().sum() | |
| if missing_data.any(): | |
| missing_cols = missing_data[missing_data > 0] | |
| if len(missing_cols) > 0: | |
| info += f"\n⚠️ **ข้อมูลที่ขาดหาย:**\n" | |
| for col, count in missing_cols.head(5).items(): | |
| info += f" - {col}: {count} แถว\n" | |
| return info | |
| except Exception as e: | |
| return f"❌ ไม่สามารถวิเคราะห์ CSV ได้: {str(e)}" | |
| # สร้าง Gradio Interface | |
| def create_interface(): | |
| with gr.Blocks(title="PDF Form Filler", theme=gr.themes.Soft()) as app: | |
| gr.Markdown(""" | |
| # 📄 เครื่องมือเติมข้อมูล PDF จาก CSV | |
| **เครื่องมือนี้สามารถ:** | |
| - เติมข้อมูลลงในฟอร์ม PDF ที่มี form fields | |
| - สร้าง PDF ใหม่หากไม่มี form fields หรือเติมไม่ได้ | |
| - รองรับ CSV หลาย encoding (UTF-8, TIS-620, CP874, etc.) | |
| - ส่งออกเป็นไฟล์ ZIP | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("## 📁 อัพโหลดไฟล์") | |
| pdf_file = gr.File( | |
| label="PDF Template", | |
| file_types=[".pdf"], | |
| type="filepath" | |
| ) | |
| csv_file = gr.File( | |
| label="CSV Data", | |
| file_types=[".csv"], | |
| type="filepath" | |
| ) | |
| gr.Markdown("## ⚙️ ตั้งค่า") | |
| filename_column = gr.Textbox( | |
| label="คอลัมน์สำหรับชื่อไฟล์ (ถ้ามี)", | |
| placeholder="เช่น name, id, etc.", | |
| value="" | |
| ) | |
| file_prefix = gr.Textbox( | |
| label="คำนำหน้าชื่อไฟล์", | |
| value="document" | |
| ) | |
| use_form_fields = gr.Checkbox( | |
| label="ใช้ Form Fields (ถ้าพบ)", | |
| value=True | |
| ) | |
| process_btn = gr.Button( | |
| "🚀 สร้าง PDF ทั้งหมด", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| with gr.Column(scale=2): | |
| gr.Markdown("## 📊 ข้อมูลไฟล์") | |
| pdf_info = gr.Markdown("ยังไม่มีไฟล์ PDF") | |
| csv_info = gr.Markdown("ยังไม่มีไฟล์ CSV") | |
| gr.Markdown("## 📥 ผลลัพธ์") | |
| result_file = gr.File( | |
| label="ไฟล์ ZIP ที่สร้าง", | |
| visible=False | |
| ) | |
| result_message = gr.Markdown("") | |
| # Event handlers | |
| pdf_file.change( | |
| fn=analyze_pdf_info, | |
| inputs=[pdf_file], | |
| outputs=[pdf_info] | |
| ) | |
| csv_file.change( | |
| fn=analyze_csv_info, | |
| inputs=[csv_file], | |
| outputs=[csv_info] | |
| ) | |
| process_btn.click( | |
| fn=process_pdf_csv, | |
| inputs=[ | |
| pdf_file, | |
| csv_file, | |
| filename_column, | |
| file_prefix, | |
| use_form_fields | |
| ], | |
| outputs=[result_file, result_message] | |
| ).then( | |
| fn=lambda x: gr.update(visible=x is not None), | |
| inputs=[result_file], | |
| outputs=[result_file] | |
| ) | |
| return app | |
| # รันแอป | |
| if __name__ == "__main__": | |
| app = create_interface() | |
| app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=True, # สร้าง public URL | |
| debug=True | |
| ) |