Bayhaqy committed on
Commit
50311a7
·
verified ·
1 Parent(s): ff94440

Update invoice_extraction.py

Browse files
Files changed (1) hide show
  1. invoice_extraction.py +91 -115
invoice_extraction.py CHANGED
@@ -4,143 +4,119 @@ import fitz # PyMuPDF
4
  import json
5
  import re
6
  import os
7
- import textwrap
8
- from concurrent.futures import ThreadPoolExecutor, as_completed
9
 
10
- # --- INISIALISASI MODEL GEMINI ---
11
- def initialize_gemini(api_key):
12
- """Initializes the Gemini model with the provided API key."""
13
  try:
14
  genai.configure(api_key=api_key)
15
- model = genai.GenerativeModel('gemini-1.5-flash-latest')
16
- print("✅ Model Gemini berhasil diinisialisasi.")
17
  return model
18
  except Exception as e:
19
  print(f"❌ Gagal mengkonfigurasi Gemini: {e}")
20
  return None
21
 
22
  def extract_text_from_pdf(file_path):
23
- """Mengekstrak seluruh teks dari file PDF menggunakan PyMuPDF."""
24
- if not os.path.exists(file_path):
25
- print(f"❌ File not found at: {file_path}")
26
- return None
27
  try:
28
- doc = fitz.open(file_path)
29
- combined_text = ""
30
- for page_num in range(len(doc)):
31
- page = doc.load_page(page_num)
32
- pymupdf_text = page.get_text()
33
- combined_text += f"\n--- Page {page_num + 1} ---\n{pymupdf_text}"
34
- return combined_text
35
  except Exception as e:
36
- print(f"Gagal mengekstrak teks dari PDF '{os.path.basename(file_path)}': {e}")
37
- return None
38
 
39
- def get_invoice_data_with_gemini(pdf_text, model):
40
- """Mengirim teks ke Gemini dan meminta ekstraksi data dalam format JSON."""
41
  if not model:
42
- print("❌ Model Gemini tidak diinisialisasi. Melewatkan ekstraksi data.")
43
- return None
44
- prompt = """
45
- You are a highly accurate data parser. Based on the text of the following invoice, extract the requested information.
46
-
47
- Invoice Text:
48
- ---
49
- {text_input}
50
- ---
51
-
52
- Your Task:
53
- 1. Identify global information such as invoice number, ETA date, vendor name, and currency.
54
- 2. Identify each line item in the invoice.
55
- 3. Extract the following data and return it ONLY in a single JSON object format. Do not add ```json``` or any other text outside the JSON.
56
-
57
- Desired JSON Format:
58
- {{
59
- "vendor_name": "INVOICE SENDER (USUALLY IN THE TOP LEFT CORNER)",
60
- "invoice_number": "INVOICE NUMBER",
61
- "estimate_time_arrival": "YYYY-MM-DD",
62
- "currency": "APPROXIMATELY WHAT CURRENCY (e.g., USD)",
63
- "items": [
64
- {{
65
- "invoice_po_number": "FULL PO NUMBER",
66
- "style_code": "ITEM CODE/STYLE/BUYER ITEM",
67
- "size": "ITEM SIZE",
68
- "item_qty": QUANTITY_OF_ITEMS_INTEGER,
69
- "pack_qty": QUANTITY_PER_PACK_INTEGER,
70
- "total_pack_qty": TOTAL_CARTON_INTEGER,
71
- "unit_price": FLOAT_UNIT_PRICE,
72
- "amount_price": TOTAL_PRICE_PER_ITEM_FLOAT (IF VALUE NOT FOUND, USE item_qty * unit_price)
73
- }}
74
- ]
75
- }}
76
-
77
- Make sure all numeric values (qty, price) are integers or floats, not strings.
78
- If you are unsure about a value, return null or 0 for numbers.
79
- """
80
  try:
81
- chunk_size = 15000
82
- text_chunks = textwrap.wrap(pdf_text, chunk_size, break_long_words=False, replace_whitespace=False)
83
- if not text_chunks:
84
- return None
85
-
86
- response = model.generate_content(prompt.format(text_input=text_chunks[0]))
87
  clean_response = re.sub(r'```json\n?|```', '', response.text.strip())
88
- invoice_data = json.loads(clean_response)
89
-
90
- if 'items' not in invoice_data:
91
- invoice_data['items'] = [] # Ensure items key exists
92
-
93
- return invoice_data
94
- except (json.JSONDecodeError, Exception) as e:
95
- print(f"❌ Terjadi kesalahan saat berkomunikasi dengan Gemini: {e}")
96
- return None
97
 
98
- def process_single_invoice(pdf_file_path, model):
99
- """Memproses satu file PDF invoice dan mengembalikan DataFrame."""
 
 
100
  file_name = os.path.basename(pdf_file_path)
101
- print(f"\n--- Memproses file: {file_name} ---")
102
-
103
- brand_match = re.match(r'^(\S+)', file_name)
104
- brand_name = brand_match.group(1) if brand_match else "Unknown"
105
-
106
- invoice_text = extract_text_from_pdf(pdf_file_path)
107
- if not invoice_text:
108
- return pd.DataFrame([{"file_name": file_name, "error": "Gagal mengekstrak teks dari PDF"}])
109
-
110
- invoice_data = get_invoice_data_with_gemini(invoice_text, model)
111
- if not invoice_data:
112
- return pd.DataFrame([{"file_name": file_name, "error": "Gagal mendapatkan data dari Gemini"}])
113
 
114
- items_list = invoice_data.get('items', [])
115
- if not items_list:
116
- # Jika tidak ada item, proses data global saja
117
- global_data = {
118
- "brand": brand_name,
119
- "vendor_name": invoice_data.get('vendor_name'),
120
- "invoice_number": invoice_data.get('invoice_number'),
121
- "estimate_time_arrival": invoice_data.get('estimate_time_arrival'),
122
- "currency": invoice_data.get('currency'),
123
- "file_name": file_name
124
- }
125
- items_list.append(global_data)
126
-
127
-
128
- for item in items_list:
129
- item['brand'] = brand_name
130
- item['vendor_name'] = invoice_data.get('vendor_name')
131
- item['invoice_number'] = invoice_data.get('invoice_number')
132
- item['estimate_time_arrival'] = invoice_data.get('estimate_time_arrival')
133
- item['currency'] = invoice_data.get('currency')
134
- item['file_name'] = file_name
135
-
136
- df = pd.DataFrame(items_list)
137
  desired_columns = [
138
  "brand", "vendor_name", "invoice_number", "estimate_time_arrival", "currency",
139
  "invoice_po_number", "style_code", "size", "item_qty", "pack_qty",
140
- "total_pack_qty", "unit_price", "amount_price", "file_name"
141
  ]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
142
  for col in desired_columns:
143
  if col not in df.columns:
144
  df[col] = None
145
-
146
- return df.reindex(columns=desired_columns)
 
 
4
  import json
5
  import re
6
  import os
 
 
7
 
def initialize_gemini(api_key, model_name='gemini-1.5-flash-latest'):
    """Configure the Gemini client and build a generative model.

    Args:
        api_key: API key passed to ``genai.configure``.
        model_name: Name of the model to instantiate. Defaults to
            ``'gemini-1.5-flash-latest'``, the model this function used
            before the name became a parameter, so callers that pass only
            the API key keep working.

    Returns:
        The ``genai.GenerativeModel`` instance, or ``None`` when
        configuration or model construction raises.
    """
    try:
        genai.configure(api_key=api_key)
        return genai.GenerativeModel(model_name)
    except Exception as e:
        # Best-effort initialisation: report the failure and let the caller
        # detect it through the None return value.
        print(f"❌ Gagal mengkonfigurasi Gemini: {e}")
        return None
17
 
def extract_text_from_pdf(file_path):
    """Extract the text of every page of a PDF.

    Args:
        file_path: Path handed to ``fitz.open``.

    Returns:
        ``(text, None)`` with the page texts concatenated in page order, or
        ``(None, message)`` when opening or reading the document raises.
    """
    try:
        with fitz.open(file_path) as document:
            page_texts = []
            for page in document:
                page_texts.append(page.get_text())
            text = "".join(page_texts)
    except Exception as exc:
        return None, f"Gagal membaca file PDF: {exc}"
    return text, None
 
25
 
def get_invoice_data_with_gemini(pdf_text, model, prompt):
    """Send the invoice text to Gemini and parse the JSON reply.

    Args:
        pdf_text: Invoice text substituted for ``{text_input}`` in ``prompt``.
        model: Object exposing ``generate_content(prompt, request_options=...)``
            whose result has a ``text`` attribute. A falsy value means the
            model was never initialised.
        prompt: ``str.format`` template containing a ``{text_input}``
            placeholder. Literal braces in the template must be doubled
            (``{{`` / ``}}``).

    Returns:
        ``(data, None)`` on success, where ``data`` is whatever
        ``json.loads`` produced, or ``(None, message)`` on failure.
        Template, API and parsing errors are reported through the message
        rather than raised.
    """
    if not model:
        return None, "Model Gemini tidak diinisialisasi."

    # A template with stray braces or unknown fields must not crash the
    # caller; report it through the same (None, error) channel.
    try:
        full_prompt = prompt.format(text_input=pdf_text)
    except (KeyError, IndexError, ValueError, AttributeError, TypeError) as e:
        return None, f"Gagal memformat prompt: {e!r}"

    # Placeholder shown when a JSON error is raised before any reply text
    # has been received.
    raw_response = "Tidak ada respons"
    try:
        response = model.generate_content(full_prompt, request_options={'timeout': 120})
        raw_response = response.text
        # Drop Markdown code fences the model may wrap around the JSON.
        clean_response = re.sub(r'```json\n?|```', '', raw_response.strip())
        return json.loads(clean_response), None
    except json.JSONDecodeError:
        # Include the start of the raw reply to help debug malformed JSON.
        return None, f"Gagal mem-parsing JSON dari respons Gemini. Respons mentah: {raw_response[:200]}..."
    except Exception as e:
        return None, f"Error saat berkomunikasi dengan Gemini: {e}"
45
 
def process_single_invoice(pdf_file_path, model, prompt):
    """Process one invoice PDF and always return a consistently shaped DataFrame.

    The PDF text is extracted with ``extract_text_from_pdf``, sent to Gemini
    through ``get_invoice_data_with_gemini`` and flattened into one row per
    line item. Failures in either step are reported as a single row whose
    ``error`` column is filled.

    Args:
        pdf_file_path: Path of the invoice PDF. The first whitespace-delimited
            token of its base name is used as the ``brand`` value
            (``"Unknown"`` when there is none).
        model: Model object forwarded to ``get_invoice_data_with_gemini``.
        prompt: Prompt template forwarded to ``get_invoice_data_with_gemini``.

    Returns:
        A ``pandas.DataFrame`` containing exactly the columns listed in
        ``desired_columns`` below, in that order.
    """
    file_name = os.path.basename(pdf_file_path)
    brand_match = re.match(r'^(\S+)', file_name)
    brand_name = brand_match.group(1) if brand_match else "Unknown"

    # Fixed output schema shared by success rows and error rows.
    desired_columns = [
        "brand", "vendor_name", "invoice_number", "estimate_time_arrival", "currency",
        "invoice_po_number", "style_code", "size", "item_qty", "pack_qty",
        "total_pack_qty", "unit_price", "amount_price", "file_name", "error"
    ]

    def create_error_row(error_message):
        """Build a one-row DataFrame that carries only the error details."""
        data = dict.fromkeys(desired_columns)
        data.update({"file_name": file_name, "error": error_message, "brand": brand_name})
        return pd.DataFrame([data])

    # 1. Extract the text.
    invoice_text, err = extract_text_from_pdf(pdf_file_path)
    if err:
        return create_error_row(err)
    if not invoice_text or not invoice_text.strip():
        # A PDF without extractable text gives the model nothing to parse,
        # so stop here instead of sending an empty prompt.
        return create_error_row("Gagal mengekstrak teks dari PDF")

    # 2. Extract structured data through Gemini.
    invoice_data, err = get_invoice_data_with_gemini(invoice_text, model, prompt)
    if err:
        return create_error_row(err)
    if not isinstance(invoice_data, dict):
        return create_error_row("Respons dari Gemini bukan format JSON (dict) yang valid.")

    # 3. Shape the rows.
    # Invoice-level values reported by the model; an item may refine them.
    header_fields = {
        'vendor_name': invoice_data.get('vendor_name'),
        'invoice_number': invoice_data.get('invoice_number'),
        'estimate_time_arrival': invoice_data.get('estimate_time_arrival'),
        'currency': invoice_data.get('currency'),
    }
    # Locally derived values; model output must never overwrite these.
    local_fields = {'brand': brand_name, 'file_name': file_name}

    items_list = invoice_data.get('items', [])
    if not items_list or not isinstance(items_list, list):
        # No usable item list: keep the invoice-level data in a single row
        # and flag it so the missing items are visible downstream.
        processed_rows = [{
            **header_fields,
            **local_fields,
            'error': "Tidak ada 'items' yang ditemukan dalam respons",
        }]
    else:
        # One row per well-formed item; entries that are not dicts are skipped.
        processed_rows = [
            {**header_fields, **item, **local_fields, 'error': None}
            for item in items_list
            if isinstance(item, dict)
        ]

    if not processed_rows:
        return create_error_row("Data 'items' kosong atau formatnya tidak valid.")

    df = pd.DataFrame(processed_rows)

    # Add any column the model did not provide so the schema stays fixed.
    for col in desired_columns:
        if col not in df.columns:
            df[col] = None

    # Drop unexpected keys and enforce the column order.
    return df[desired_columns]