# app.py — PO Extractor: extracts structured data from .docx purchase-order
# contracts (tables + key/value text) and normalizes it with an LLM.
import os
from dotenv import load_dotenv
load_dotenv()
import json
import pandas as pd
import zipfile
import xml.etree.ElementTree as ET
from io import BytesIO
import openpyxl
from openai import OpenAI
import re
import logging
from pydantic import BaseModel, Field, ValidationError, RootModel
from typing import List, Optional
# Hugging Face router token, loaded from .env by load_dotenv() above.
HF_API_KEY = os.getenv("HF_API_KEY")

# --- LLM provider/model selection (alternatives kept for reference) ---
# Deepseek R1 Distilled Qwen 2.5 14B --------------------------------
# base_url = "https://router.huggingface.co/novita"
# model = "deepseek/deepseek-r1-distill-qwen-14b"
# Deepseek R1 Distilled Qwen 2.5 32B --------------------------------
# base_url = "https://router.huggingface.co/hf-inference/models/deepseek-ai/DeepSeek-R1-Distill-Qwen-32B/v1"
# model = "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B"
# Qwen 2.5 7B --------------------------------------------------------
base_url = "https://router.huggingface.co/together/v1"
model = "Qwen/Qwen2.5-7B-Instruct-Turbo"
# Qwen 2.5 32B --------------------------------------------------------
# base_url = "https://router.huggingface.co/novita/v3/openai"
# model="qwen/qwen-2.5-72b-instruct"

# Configure logging to write to 'extract_po_logs.log' without using pickle.
logging.basicConfig(
    filename='extract_po_logs.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    encoding='utf-8'
)

# Default Word XML namespace (WordprocessingML main namespace).
DEFAULT_NS = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}
# Module-global namespace mapping; populated by get_namespace() and read by
# every table/paragraph helper below.
NS = None
def get_namespace(root):
    """Derive the document's primary XML namespace from *root* and cache it in NS.

    Falls back to DEFAULT_NS only when the extracted namespace string is empty.
    Returns the mapping that was stored.
    """
    global NS
    uri = root.tag.split('}', 1)[0].strip('{')
    NS = {'w': uri} if uri else DEFAULT_NS
    return NS
# --- Helper Functions for DOCX Processing ---
def extract_text_from_cell(cell):
    """Return the non-empty text lines of a Word table cell, one per paragraph.

    Text runs within a paragraph are concatenated so words split across
    runs are reassembled; blank paragraphs are dropped.
    """
    lines = []
    for paragraph in cell.findall('.//w:p', NS):
        runs = (node.text for node in paragraph.findall('.//w:t', NS) if node.text)
        merged = ''.join(runs).strip()
        if merged:
            lines.append(merged)
    return lines
def clean_spaces(text):
    """Remove whitespace between Chinese characters, keeping English spacing.

    Uses a lookahead for the second character so that runs like
    "中 文 字" collapse fully to "中文字". The previous two-group pattern
    consumed the second character of each match, so the scan resumed after
    it and only every other gap was removed ("中文 字").
    """
    text = re.sub(r'([\u4e00-\u9fff])\s+(?=[\u4e00-\u9fff])', r'\1', text)
    return text.strip()
def extract_key_value_pairs(text, target_dict=None):
    """Parse one line of "key: value" pairs into *target_dict* (created if None).

    Fullwidth colons are normalized first. The line is split on runs of 3+
    spaces; a fragment is treated as a standalone pair only when it contains
    a colon, otherwise it is glued onto the preceding text. Values for a
    repeated key are appended on a new line. Returns the dict.
    """
    result = {} if target_dict is None else target_dict
    normalized = text.replace(":", ":")

    # Split on 3+ spaces, keeping the separators so fragment order survives.
    pieces = re.split(r'(\s{3,})', normalized)

    # Merge colon-less fragments into their neighbor.
    merged, pending = [], ""
    for piece in pieces:
        if ":" not in piece:
            pending = pending + " " + piece.strip()
            continue
        if pending:
            merged.append(pending.strip())
            pending = ""
        merged.append(piece.strip())
    if pending:
        merged.append(pending.strip())

    # Turn each colon-bearing fragment into a key/value entry.
    for chunk in merged:
        if ':' not in chunk:
            continue
        key, _, value = chunk.partition(':')
        key, value = key.strip(), value.strip()
        if key in result:
            result[key] = result[key] + "\n" + value
        else:
            result[key] = value
    return result
# --- Table Processing Functions ---
def process_single_column_table(rows):
    """Flatten a one-column table into a plain list of text lines.

    Rows that do not have exactly one cell are ignored; cell lines are
    appended as-is without further splitting.
    """
    collected = []
    for row in rows:
        cells = row.findall('.//w:tc', NS)
        if len(cells) != 1:
            continue
        collected.extend(extract_text_from_cell(cells[0]))
    return collected
def process_buyer_seller_table(rows):
    """Parse a two-column buyer/seller table into per-party key-value dicts.

    The first row's cell text decides which column is 'buyer_info' and which
    is 'seller_info' (falling back to positional defaults). Returns None
    when the header row is not exactly two cells wide.
    """
    headers = [extract_text_from_cell(cell) for cell in rows[0].findall('.//w:tc', NS)]
    if len(headers) != 2:
        return None  # Not a buyer-seller table

    def classify(header_text, fallback):
        """Map a header string onto a party key, defaulting to *fallback*."""
        lowered = header_text.lower()
        if any(tag in lowered for tag in ('买方', 'buyer', '甲方')):
            return 'buyer_info'
        if any(tag in lowered for tag in ('卖方', 'seller', '乙方')):
            return 'seller_info'
        return fallback

    left_key = classify(headers[0][0], 'buyer_info')
    right_key = classify(headers[1][0], 'seller_info')
    parties = {left_key: {}, right_key: {}}

    for row in rows:
        cells = row.findall('.//w:tc', NS)
        if len(cells) != 2:
            continue
        for line in extract_text_from_cell(cells[0]):
            extract_key_value_pairs(line, parties[left_key])
        for line in extract_text_from_cell(cells[1]):
            extract_key_value_pairs(line, parties[right_key])
    return parties
def process_summary_table(rows):
    """Turn two-column rows into a list of single-pair {label: value} dicts.

    Multi-line cell content is joined with single spaces; rows that are not
    exactly two cells wide are skipped.
    """
    pairs = []
    for row in rows:
        cells = row.findall('.//w:tc', NS)
        if len(cells) != 2:
            continue
        label = " ".join(extract_text_from_cell(cells[0]))
        content = " ".join(extract_text_from_cell(cells[1]))
        pairs.append({label: content})
    return pairs
def extract_headers(first_row_cells):
    """Build one unique header name per grid column from the table's first row.

    Horizontally merged header cells (w:gridSpan) repeat their text across
    the spanned columns; duplicates get a "_N" suffix and empty headers
    become "Column_N".
    """
    headers = []
    seen = {}
    for cell in first_row_cells:
        text = " ".join(extract_text_from_cell(cell))
        span_el = cell.find('.//w:gridSpan', NS)
        span = int(span_el.attrib.get(f'{{{NS["w"]}}}val', '1')) if span_el is not None else 1
        for _ in range(span):
            # Disambiguate repeated header text with an occurrence counter.
            if text in seen:
                seen[text] += 1
                label = f"{text}_{seen[text]}"
            else:
                seen[text] = 1
                label = text
            headers.append(label if label else f"Column_{len(headers) + 1}")
    return headers
def process_long_table(rows):
    """Processes a standard table and correctly handles horizontally merged cells.

    The first row supplies column headers; each following row becomes a dict
    keyed by header. Horizontal merges (w:gridSpan) repeat the cell value
    across every spanned column; vertical merges (w:vMerge) repeat the value
    recorded at the merge's 'restart' row. Finally, rows containing 合计
    (total) and rows whose 序号 (serial number) column has no digits are
    filtered out, so only real line items survive.
    """
    if not rows:
        return []  # Avoid IndexError on rows[0]
    headers = extract_headers(rows[0].findall('.//w:tc', NS))
    table_data = []
    # Maps grid-column index -> value captured at the vMerge 'restart' row.
    vertical_merge_tracker = {}
    for row in rows[1:]:
        row_data = {}
        cells = row.findall('.//w:tc', NS)
        running_index = 0  # current grid column; advances by col_span per cell
        for cell in cells:
            cell_text = " ".join(extract_text_from_cell(cell))
            # Consistent Namespace Handling for Horizontal Merge
            grid_span = cell.find('.//w:gridSpan', NS)
            grid_span_val = grid_span.attrib.get(f'{{{NS["w"]}}}val') if grid_span is not None else '1'
            col_span = int(grid_span_val)
            # Handle vertical merge
            v_merge = cell.find('.//w:vMerge', NS)
            if v_merge is not None:
                v_merge_val = v_merge.attrib.get(f'{{{NS["w"]}}}val')
                if v_merge_val == 'restart':
                    vertical_merge_tracker[running_index] = cell_text
                else:
                    # Continuation row: repeat the value from the previous row's merged cell
                    cell_text = vertical_merge_tracker.get(running_index, "")
            # Repeat the value for horizontally merged cells
            start_col = running_index
            end_col = running_index + col_span
            # Repeat the value for each spanned column
            for col in range(start_col, end_col):
                key = headers[col] if col < len(headers) else f"Column_{col+1}"
                row_data[key] = cell_text
            # Update the running index to the end of the merged cell
            running_index = end_col
        # Fill remaining columns with empty strings to maintain alignment
        while running_index < len(headers):
            row_data[headers[running_index]] = ""
            running_index += 1
        table_data.append(row_data)
    # Filter out rows where the "序号" column contains non-numeric values
    filtered_table_data = []
    for row in table_data:
        # Check if any cell contains "合计" (total) — drop totals rows entirely
        contains_total = False
        for key, value in row.items():
            if isinstance(value, str) and "合计" in value:
                contains_total = True
                break
        if contains_total:
            continue
        # Check potential serial number columns (use both Chinese and English variants)
        serial_number = None
        for column in row:
            if any(term in column for term in ["序号"]):
                serial_number = row[column]
                break
        # If we found a serial number column, check if its value is numeric
        if serial_number is not None:
            # Skip if serial number is empty
            if not serial_number.strip():
                continue
            # Strip any non-numeric characters and check if there's still a value
            # This keeps values like "1", "2." etc. but filters out "No." or other text
            cleaned_number = re.sub(r'[^\d]', '', serial_number)
            if cleaned_number:  # If there are any digits left, keep the row
                filtered_table_data.append(row)
        else:
            # If we couldn't find a serial number column, keep the row
            filtered_table_data.append(row)
    return filtered_table_data
def extract_tables(root):
    """Extracts tables from the DOCX document and returns structured data.

    Returns a tuple ``(table_data, table_paragraphs)``: ``table_data`` maps
    generated names to each table's extracted content and
    ``table_paragraphs`` is the set of paragraph elements that live inside
    tables (used later to skip them when scanning free-standing text).

    Per-table classification:
      * one column                 -> list of text lines
      * every row two cells wide   -> buyer/seller key-value table
      * otherwise                  -> a "long" item table, optionally
                                      followed by a two-column summary block
    """
    tables = root.findall('.//w:tbl', NS)
    table_data = {}
    table_paragraphs = set()
    for table_index, table in enumerate(tables, start=1):
        rows = table.findall('.//w:tr', NS)
        if not rows:
            continue  # Skip empty tables
        # Remember every paragraph that belongs to this table.
        for paragraph in table.findall('.//w:p', NS):
            table_paragraphs.add(paragraph)
        first_row_cells = rows[0].findall('.//w:tc', NS)
        num_columns = len(first_row_cells)
        if num_columns == 1:
            single_column_data = process_single_column_table(rows)
            if single_column_data:
                table_data[f"table_{table_index}_single_column"] = single_column_data
            continue  # Skip further processing for this table
        # First row with exactly two cells marks the start of a summary section.
        summary_start_index = None
        for i, row in enumerate(rows):
            if len(row.findall('.//w:tc', NS)) == 2:
                summary_start_index = i
                break
        long_table_data = []
        summary_data = []
        if summary_start_index is not None and summary_start_index > 0:
            long_table_data = process_long_table(rows[:summary_start_index])
        elif summary_start_index is None:
            long_table_data = process_long_table(rows)
        if summary_start_index is not None:
            # If *every* row is two cells wide this is a buyer/seller table,
            # not a long table with a trailing summary.
            is_buyer_seller_table = all(len(row.findall('.//w:tc', NS)) == 2 for row in rows)
            if is_buyer_seller_table:
                buyer_seller_data = process_buyer_seller_table(rows)
                if buyer_seller_data:
                    table_data[f"table_{table_index}_buyer_seller"] = buyer_seller_data
            else:
                summary_data = process_summary_table(rows[summary_start_index:])
        if long_table_data:
            table_data[f"long_table_{table_index}"] = long_table_data
        if summary_data:
            table_data[f"long_table_{table_index}_summary"] = summary_data
    return table_data, table_paragraphs
# --- Non-Table Processing Functions ---
def extract_text_outside_tables(root, table_paragraphs):
    """Collect colon-bearing lines from paragraphs that are not inside tables.

    Fullwidth colons are normalized and inter-Chinese spacing cleaned; lines
    without any colon are discarded since only key/value text is of interest.
    """
    lines = []
    for paragraph in root.findall('.//w:p', NS):
        if paragraph in table_paragraphs:
            continue  # Skip paragraphs that belong to a table
        fragments = [t.text.strip() for t in paragraph.findall('.//w:t', NS) if t.text]
        line = clean_spaces(' '.join(fragments).replace(':', ':'))
        if ':' in line:
            lines.append(line)
    return lines
# --- Main Extraction Functions ---
def extract_docx_as_xml(file_bytes, save_xml=False, xml_filename="document.xml"):
    """Return the raw word/document.xml text from a .docx given as a binary stream.

    The stream is rewound first in case it was already consumed. When
    *save_xml* is True the XML is also written to *xml_filename*.
    """
    file_bytes.seek(0)
    with zipfile.ZipFile(file_bytes, 'r') as archive:
        xml_content = archive.read('word/document.xml').decode('utf-8')
    if save_xml:
        with open(xml_filename, "w", encoding="utf-8") as handle:
            handle.write(xml_content)
    return xml_content
def xml_to_json(xml_content, save_json=False, json_filename="extracted_data.json"):
    """Parse document XML into table plus non-table data, returned as a JSON string.

    When *save_json* is True the same structure is also written to
    *json_filename* (UTF-8, non-ASCII preserved).
    """
    root = ET.fromstring(xml_content)
    extracted, table_paragraphs = extract_tables(root)
    extracted["non_table_data"] = extract_text_outside_tables(root, table_paragraphs)
    if save_json:
        with open(json_filename, "w", encoding="utf-8") as handle:
            json.dump(extracted, handle, ensure_ascii=False, indent=4)
    return json.dumps(extracted, ensure_ascii=False, indent=4)
def deepseek_extract_contract_summary(json_data, save_json=False, json_filename="contract_summary.json"):
    """Ask the LLM to summarize contract header fields and validate the reply.

    Args:
        json_data: JSON string produced by ``xml_to_json``.
        save_json: When True, also write the validated summary to ``json_filename``.
        json_filename: Output path used when ``save_json`` is True.

    Returns:
        A JSON-encoded *string* whose payload is itself a JSON object string
        (double-encoded on purpose: callers unwrap with
        ``json.loads(json.loads(result))``).
    """
    # Step 1: Convert JSON string to Python dictionary
    contract_data = json.loads(json_data)
    # Step 2: Drop the bulky line-item tables; the summary only needs header fields.
    filtered_contract_data = {key: value for key, value in contract_data.items() if "long_table" not in key}
    # Step 3: Serialize the *filtered* data for the prompt.
    # (Bug fix: the unfiltered contract_data used to be serialized here, which
    # made the filtering above dead code and needlessly inflated the prompt.)
    json_output = json.dumps(filtered_contract_data, ensure_ascii=False, indent=4)

    # Pydantic model used to validate the LLM's JSON reply.
    class ContractSummary(BaseModel):
        合同编号: Optional[str] = ""
        接收人: Optional[str] = ""
        Recipient: Optional[str] = ""
        接收地: Optional[str] = ""
        Place_of_receipt: Optional[str] = Field("", alias="Place of receipt")
        供应商: Optional[str] = ""
        币种: Optional[str] = ""
        供货日期: Optional[str] = ""

    base_prompt = """You are given a contract in JSON format. Extract the following information:
# Response Format
Return the extracted information as a structured JSON in the exact format shown below (Note: Do not repeat any keys, if unsure leave the value empty):
{
"合同编号":
"接收人": (注意:不是买家必须是接收人,不是一个公司而是一个人)
"Recipient":
"接收地": (注意:不是交货地点是目的港,只写中文,英文写在 place of receipt)
"Place of receipt": (只写英文, 如果接收地/目的港/Port of destination 有英文可填在这里)
"供应商":
"币种": (主要用的货币,填英文缩写。GNF一般是为了方便而转换出来的, 除非只有GNF,GNF一般不是主要币种。)
"供货日期": (如果合同里有写才填,不要自己推理出日期,必须是一个日期,而不是天数)(格式:YYYY-MM-DD)
}
Contract data in JSON format:""" + f"""
{json_output}"""

    messages = [
        {
            "role": "user",
            "content": base_prompt
        }
    ]

    client = OpenAI(
        base_url=base_url,
        api_key=HF_API_KEY,
    )

    # Try up to 3 times, feeding validation errors back to the model each retry.
    max_retries = 3
    for attempt in range(max_retries):
        try:
            print(f"🔄 LLM attempt {attempt + 1} of {max_retries}")
            completion = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0.1,
            )
            # Surface any <think>...</think> reasoning for debugging, then strip it.
            think_text = re.findall(r"<think>(.*?)</think>", completion.choices[0].message.content, flags=re.DOTALL)
            if think_text:
                print(f"🧠 Thought Process: {think_text}")
                logging.info(f"Think text: {think_text}")
            contract_summary = re.sub(r"<think>.*?</think>\s*", "", completion.choices[0].message.content, flags=re.DOTALL)  # Remove think
            contract_summary = re.sub(r"^```json\n|```$", "", contract_summary, flags=re.DOTALL)  # Remove ``` fences
            # Parse then validate against the schema.
            contract_json = json.loads(contract_summary.strip())
            validated_data = ContractSummary.model_validate(contract_json)
            validated_json = json.dumps(validated_data.model_dump(by_alias=True), ensure_ascii=False, indent=4)
            if save_json:
                with open(json_filename, "w", encoding="utf-8") as f:
                    f.write(validated_json)
            print(f"✅ Successfully validated contract summary on attempt {attempt + 1}")
            # Double-encoded on purpose — callers json.loads() twice.
            return json.dumps(validated_json, ensure_ascii=False, indent=4)
        except ValidationError as e:
            error_msg = f"Validation error: {e}"
            logging.error(f"{error_msg}")
            logging.error(f"Input data: {contract_summary}")
            print(f"❌ {error_msg}")
        except json.JSONDecodeError as e:
            error_msg = f"JSON decode error: {e}"
            logging.error(f"{error_msg}")
            logging.error(f"Input data: {contract_summary}")
            print(f"❌ {error_msg}")
        # Don't retry on the last attempt
        if attempt < max_retries - 1:
            # Feed the bad response and the error back so the model can fix it.
            messages.append({
                "role": "assistant",
                "content": completion.choices[0].message.content
            })
            messages.append({
                "role": "user",
                "content": f"Your response had the following error: {error_msg}. Please fix the format and provide a valid JSON response with the required fields."
            })
    # All attempts failed — fall back to an empty but schema-valid summary.
    print("⚠️ All attempts failed, returning empty model")
    empty_data = ContractSummary().model_dump(by_alias=True)
    empty_json = json.dumps(empty_data, ensure_ascii=False, indent=4)
    if save_json:
        with open(json_filename, "w", encoding="utf-8") as f:
            f.write(empty_json)
    return json.dumps(empty_json, ensure_ascii=False, indent=4)
def extract_price_list(price_list, save_json=False, json_name="price_list.json"):
    """
    Extracts structured price list by first using AI to map column names to standard keys,
    then programmatically transforming the data to match the Pydantic model.

    Args:
        price_list: List of row dicts (or a single row dict) from the long table.
        save_json: When True, write the transformed rows to *json_name*.
        json_name: Output path used when *save_json* is True.

    Returns:
        List of row dicts keyed by the standard field names, with unmapped
        columns collected under "其他". Empty list on empty input or if all
        LLM mapping attempts fail.
    """
    # If price_list is empty, return an empty list
    if not price_list:
        return []
    # Convert price_list to a list if it's a dict
    if isinstance(price_list, dict):
        # Check if the dict has any items
        if len(price_list) == 0:
            return []
        # Convert to list if it's just a single entry dict
        price_list = [price_list]
    # Extract a sample row for header mapping
    sample_row = price_list[0] if price_list else {}
    # If there are no headers, return empty list
    if not sample_row:
        return []
    # Get the headers directly from the sample row
    extracted_headers = list(sample_row.keys())

    # Clean double spaces in headers to facilitate AI identification
    def clean_header_spaces(headers):
        """Clean double spaces in headers to make them more consistent for AI processing."""
        return [re.sub(r'\s+', ' ', header).strip() for header in headers]

    # Apply the cleaning function to extracted headers
    extracted_headers = clean_header_spaces(extracted_headers)
    # Define our target fields from the Pydantic model
    target_fields = [
        "序号", "名称", "名称(英文)", "品牌", "规格型号", "所属机型",
        "数量", "单位", "单价", "总价", "几郎单价", "几郎总价",
        "备注", "计划来源"
    ]
    # NOTE(review): sample_mapping is defined but never interpolated into
    # base_prompt below — presumably it was meant to be part of the prompt;
    # confirm before relying on these examples having any effect.
    sample_mapping = """Examples of how you should map to guide you, there are other cases so use your own judgement to map the headers to the standard fields:
- Map "序号" to headers containing "序号No.", "序号 No.",
- Map "品牌" to headers containing "品牌Brand", "品牌 brand",
- Map "规格型号" to headers containing "规格型号", "规格 Specification", "Specification and Model", "规格型号Specification and Model", "型号Model"
- Map "所属机型" to headers containing "所属机型", "Applicable Models"
- Map "数量" to headers containing "数量Quantity", "数量 Quantity", "Qty"
- Map "单位" to headers containing "单位Unit", "单位 Unit"
- Map "单价" to headers containing "单价(元)", "单价(CNY)", "Unit Price (CNY)", "单价Unit Price"
- Map "总价" to headers containing "总价(元)", "总额(元)", "Amount (CNY)", "Total Amount (CNY)"
- Map "几郎单价" to headers containing "单价(几郎)", "几郎单价(元)", "Unit Price (GNF)", "单价Unit Price(几郎)(GNF)"
- Map "几郎总价" to headers containing "总额(几郎)", "几郎总额(元)", "Total Amount (GNF)"
- Map "备注" to headers containing "备注Remarks", "备注 notes", "Note"
- Map "计划来源" to headers containing "计划来源Plan No.", "计划来源(唛头信息)", "Planned Source" """
    # Use AI to map extracted headers to our target fields
    base_prompt = f"""
You are playing a matching game. Match each and every standard fields to the exactcolumn headers within "" separated by ,.
USE THE EXACT HEADER BELOW INCLUDING BOTH CHINESE AND ENGLISH AND THE EXACT SPACING.
The standard fields are:
{json.dumps(target_fields, ensure_ascii=False)}
You are given column headers below: (YOU MUST USE THE EXACT HEADER BELOW INCLUDING BOTH CHINESE AND ENGLISH AND THE EXACT SPACING)
{json.dumps(extracted_headers, ensure_ascii=False)}
ENSURE ALL STANDARD FIELDS ARE MAPPED TO THE EXACT COLUMN HEADER INCLUDING BOTH CHINESE AND ENGLISH AND THE EXACT SPACING.
Return only a JSON mapping in this format WITHOUT any explanations:
```json
{{
"standard_field_1": "column_header_1",
"standard_field_2": "column_header_2",
...
}}
```
Important: Map "名称" AND "名称(英文)" to the SAME extracted header.
For example, if the extracted header is "名称Name of Materials and Equipment", then:
{{
"名称": "名称Name of Materials and Equipment",
"名称(英文)": "名称Name of Materials and Equipment"
}}
"""
    messages = [{"role": "user", "content": base_prompt}]
    client = OpenAI(
        base_url=base_url,
        api_key=HF_API_KEY,
    )
    # Add retry logic similar to deepseek_extract_contract_summary
    max_retries = 3
    transformed_data = []
    for attempt in range(max_retries):
        try:
            print(f"🔄 Sending prompt to LLM (attempt {attempt + 1} of {max_retries}: {base_prompt})")
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0.1,
            )
            raw_mapping = response.choices[0].message.content
            # Surface any <think>...</think> reasoning for debugging, then strip it.
            think_text = re.findall(r"<think>(.*?)</think>", response.choices[0].message.content, flags=re.DOTALL)
            if think_text:
                print(f"🧠 Thought Process: {think_text}")
                logging.info(f"Think text: {think_text}")
            raw_mapping = re.sub(r"<think>.*?</think>\s*", "", raw_mapping, flags=re.DOTALL)  # Remove think
            # Remove any backticks or json tags
            raw_mapping = re.sub(r"```json|```", "", raw_mapping)
            # Parse the mapping with standard fields as keys
            standard_field_mapping = json.loads(raw_mapping.strip())
            print(f"📊 Standard field mapping: {json.dumps(standard_field_mapping, ensure_ascii=False, indent=2)}")

            # Function to separate Chinese and English text
            def separate_chinese_english(text):
                """Split a bilingual cell value into (chinese_part, english_part).

                Tries explicit separator patterns first, then falls back to
                positional scanning heuristics. Either part may be "".
                """
                if not text or not isinstance(text, str):
                    return "", ""
                # First check if there's a clear separator like hyphen or space
                # Common patterns: "中文-English", "中文(English)", "中文 English"
                patterns = [
                    r'^([\u4e00-\u9fff\-]+)[:\-\s]+([a-zA-Z].*)$',  # Chinese-English
                    r'^([\u4e00-\u9fff\-]+)[\((]([a-zA-Z].*)[\))]$',  # Chinese(English)
                ]
                for pattern in patterns:
                    match = re.search(pattern, text)
                    if match:
                        return match.group(1), match.group(2)
                # Find the first Chinese character index
                first_chinese_idx = -1
                for i, char in enumerate(text):
                    if '\u4e00' <= char <= '\u9fff':  # Chinese character
                        first_chinese_idx = i
                        break
                # Find where English starts after Chinese
                english_start_idx = len(text)
                if first_chinese_idx >= 0:
                    # Search for the first English character that comes after Chinese
                    for i in range(first_chinese_idx, len(text)):
                        # Skip to the end of Chinese characters
                        if '\u4e00' <= text[i] <= '\u9fff':
                            continue
                        # Look ahead for English characters
                        for j in range(i, len(text)):
                            if 'a' <= text[j].lower() <= 'z':
                                english_start_idx = j
                                break
                        if english_start_idx < len(text):
                            break
                # If we found the boundaries
                if first_chinese_idx >= 0 and english_start_idx < len(text):
                    # Handle prefix: any Latin characters before Chinese should be part of Chinese name
                    prefix = text[:first_chinese_idx].strip() if first_chinese_idx > 0 else ""
                    chinese_part = text[first_chinese_idx:english_start_idx].strip()
                    english_part = text[english_start_idx:].strip()
                    # Combine prefix with Chinese part
                    if prefix:
                        chinese_part = f"{prefix} {chinese_part}"
                    return chinese_part, english_part
                # Special case for prefix like "PVC" with no space before Chinese
                if first_chinese_idx > 0:
                    prefix = text[:first_chinese_idx].strip()
                    rest_of_text = text[first_chinese_idx:]
                    # Extract Chinese and English from the rest of the text
                    chinese_chars = []
                    english_chars = []
                    in_chinese = True
                    for char in rest_of_text:
                        if '\u4e00' <= char <= '\u9fff':  # Chinese character
                            if not in_chinese and english_chars:  # If we've already seen English, something is wrong
                                chinese_chars = []
                                english_chars = []
                                break
                            chinese_chars.append(char)
                            in_chinese = True
                        elif 'a' <= char.lower() <= 'z' or char in ' -_()':  # English or separator
                            if in_chinese and chinese_chars:  # We've seen Chinese and now see English
                                english_chars.append(char)
                                in_chinese = False
                            elif not in_chinese:  # Continue collecting English
                                english_chars.append(char)
                            else:  # No Chinese seen yet, might be part of prefix
                                chinese_chars.append(char)
                        else:  # Other characters (numbers, etc.)
                            if in_chinese:
                                chinese_chars.append(char)
                            else:
                                english_chars.append(char)
                    if chinese_chars and english_chars:
                        chinese_text = prefix + " " + ''.join(chinese_chars).strip()
                        english_text = ''.join(english_chars).strip()
                        return chinese_text, english_text
                    else:
                        # No clean separation possible
                        return prefix + " " + rest_of_text, ""
                # Fallback: Try simple pattern matching
                # Find all Chinese characters
                chinese_chars = re.findall(r'[\u4e00-\u9fff]+', text)
                chinese = ''.join(chinese_chars)
                # If we have Chinese, extract everything up to the last Chinese character
                if chinese:
                    last_chinese_idx = text.rindex(chinese_chars[-1]) + len(chinese_chars[-1])
                    # Anything before the first Chinese character is a prefix
                    first_chinese_idx = text.index(chinese_chars[0])
                    prefix = text[:first_chinese_idx].strip()
                    # Everything after the last Chinese character is English
                    chinese_part = prefix + " " + text[first_chinese_idx:last_chinese_idx].strip() if prefix else text[first_chinese_idx:last_chinese_idx].strip()
                    english_part = text[last_chinese_idx:].strip()
                    # If English part doesn't actually contain English letters, treat it as empty
                    if not re.search(r'[a-zA-Z]', english_part):
                        english_part = ""
                    return chinese_part, english_part
                # No Chinese characters found, check if there are any English letters
                if re.search(r'[a-zA-Z]', text):
                    return "", text.strip()
                # No clear separation possible
                return text.strip(), ""

            # Process the data based on the standard field mapping
            transformed_data = []
            for row in price_list:
                new_row = {field: "" for field in target_fields}  # Initialize with empty strings
                other_fields = {}
                # Step 1: Handle name fields first - look for any field with "名称" or "name"
                for header, value in row.items():
                    # Clean the header for comparison
                    cleaned_header = re.sub(r'\s+', ' ', header).strip()
                    header_lower = cleaned_header.lower()
                    if ("名称" in header_lower or "name" in header_lower) and value:
                        # If field contains both Chinese and English, separate them
                        if re.search(r'[\u4e00-\u9fff]', value) and re.search(r'[a-zA-Z]', value):
                            chinese, english = separate_chinese_english(value)
                            if chinese:
                                new_row["名称"] = chinese
                            if english:
                                new_row["名称(英文)"] = english
                            print(f"Separated: '{value}' → Chinese: '{chinese}', English: '{english}'")
                        else:
                            # Just set the name directly
                            new_row["名称"] = value
                        break  # Stop after finding first name field
                # Step 2: Fill in all other fields using standard mapping
                for header, value in row.items():
                    # Skip empty values
                    if not value:
                        continue
                    # Clean the header for comparison
                    cleaned_header = re.sub(r'\s+', ' ', header).strip()
                    # Check if this maps to a standard field
                    matched_field = None
                    for std_field, mapped_header in standard_field_mapping.items():
                        # Make comparison more flexible by lowercasing and stripping spaces
                        if mapped_header.lower().strip() == cleaned_header.lower().strip():
                            matched_field = std_field
                            break
                    # If we found a mapping, use it (but don't overwrite name fields)
                    if matched_field:
                        if matched_field not in ["名称", "名称(英文)"] or not new_row[matched_field]:
                            new_row[matched_field] = value
                    # If no mapping found, add to other_fields
                    else:
                        # Skip name fields we already processed
                        header_lower = cleaned_header.lower()
                        if not ("名称" in header_lower or "name" in header_lower):
                            other_fields[header] = value
                # Add remaining fields to "其他"
                if other_fields:
                    new_row["其他"] = other_fields
                else:
                    new_row["其他"] = {}
                # Convert field names for validation
                # NOTE(review): this pop/reassign of the same key is a no-op —
                # it looks like it was meant to convert between fullwidth and
                # halfwidth parentheses in "名称(英文)"; confirm intent.
                if "名称(英文)" in new_row:
                    new_row["名称(英文)"] = new_row.pop("名称(英文)")
                transformed_data.append(new_row)
            # Success! Break out of the retry loop
            print(f"✅ Successfully processed price list on attempt {attempt + 1}")
            break
        except json.JSONDecodeError as e:
            error_msg = f"JSON decode error in field mapping: {e}"
            logging.error(f"{error_msg}")
            print(f"❌ {error_msg}")
        except KeyError as e:
            error_msg = f"KeyError during data transformation: {e}"
            logging.error(f"{error_msg}")
            print(f"❌ {error_msg}")
        except Exception as e:
            error_msg = f"Error processing price list: {e}"
            logging.error(f"{error_msg}")
            print(f"❌ {error_msg}")
        # Don't retry on the last attempt
        if attempt < max_retries - 1:
            # Add error message to the conversation and retry
            if 'response' in locals():
                messages.append({
                    "role": "assistant",
                    "content": response.choices[0].message.content
                })
            messages.append({
                "role": "user",
                "content": f"Your response had the following error: {error_msg}. Please fix your mapping and try again."
            })
        else:
            print(f"⚠️ All {max_retries} attempts failed, returning empty result")
            transformed_data = []  # Return empty list after all retries failed
    # Save to file if requested
    if save_json and transformed_data:
        with open(json_name, "w", encoding="utf-8") as f:
            json.dump(transformed_data, f, ensure_ascii=False, indent=4)
        print(f"✅ Saved to {json_name}")
    return transformed_data
def json_to_excel(contract_summary, json_data, excel_path):
    """Write the contract summary and the last price-list table to an Excel file.

    *contract_summary* arrives double-encoded (a JSON string of a JSON string),
    hence the nested loads. Of all "long_table" entries (summaries excluded),
    only the last one is exported as the "Price List" sheet.
    """
    summary_dict = json.loads(json.loads(contract_summary))
    summary_df = pd.DataFrame([summary_dict])
    if isinstance(json_data, str):
        json_data = json.loads(json_data)
    candidates = [
        pd.DataFrame(table)
        for key, table in json_data.items()
        if "long_table" in key and "summary" not in key
    ]
    price_df = candidates[-1] if candidates else pd.DataFrame()
    with pd.ExcelWriter(excel_path) as writer:
        summary_df.to_excel(writer, sheet_name="Contract Summary", index=False)
        price_df.to_excel(writer, sheet_name="Price List", index=False)
#--- Extract PO ------------------------------
def extract_po(docx_path):
    """Processes a single .docx file, extracts tables, formats with OpenAI, and returns combined JSON data.

    Pipeline: DOCX -> raw XML -> structured JSON -> LLM contract summary
    + LLM-normalized price list -> combined dict.

    Args:
        docx_path: Path to a .docx purchase-order contract.

    Returns:
        dict with keys "contract_summary" and "price_list".

    Raises:
        ValueError: If the path does not exist or is not a .docx file.
    """
    if not os.path.exists(docx_path) or not docx_path.endswith(".docx"):
        raise ValueError(f"Invalid file: {docx_path}")
    # Read the .docx file as bytes
    with open(docx_path, "rb") as f:
        docx_bytes = BytesIO(f.read())
    # Step 1: Extract XML content from DOCX
    print("Extracting Docs data to XML...")
    xml_filename = os.path.splitext(os.path.basename(docx_path))[0] + "_document.xml"
    xml_file = extract_docx_as_xml(docx_bytes, save_xml=False, xml_filename=xml_filename)
    # Side effect: sets the module-level NS used by all table helpers.
    get_namespace(ET.fromstring(xml_file))
    # Step 2: Extract tables from DOCX and save JSON
    print("Extracting XML data to JSON...")
    json_filename = os.path.splitext(os.path.basename(docx_path))[0] + "_extracted_data.json"
    extracted_data = xml_to_json(xml_file, save_json=False, json_filename=json_filename)
    # Step 3: Process JSON with OpenAI to get structured output
    print("Processing Contract Summary data with AI...")
    contract_summary_filename = os.path.splitext(os.path.basename(docx_path))[0] + "_contract_summary.json"
    contract_summary = deepseek_extract_contract_summary(extracted_data, save_json=False, json_filename=contract_summary_filename)
    # Find the last long table (excluding summary tables)
    print("Processing Price List data with AI...")
    long_tables = [
        table for key, table in json.loads(extracted_data).items()
        if "long_table" in key and "summary" not in key
    ]
    last_long_table = long_tables[-1] if long_tables else {}
    # Generate the price list filename in the same folder as the document
    price_list_filename = os.path.join(os.path.dirname(docx_path), os.path.splitext(os.path.basename(docx_path))[0] + "_price_list.json")
    # Process the price list and save it to a JSON file
    price_list = extract_price_list(last_long_table, save_json=True, json_name=price_list_filename)
    # Step 4: Combine contract summary and long table data into a single JSON object
    print("Combining AI Generated JSON with Extracted Data...")
    combined_data = {
        # contract_summary is double-encoded JSON, hence the double loads.
        "contract_summary": json.loads(json.loads(contract_summary)),
        "price_list": price_list
    }
    # Logging
    log = f"""Results:
Contract Summary: {contract_summary},
RAW Extracted Data: {extracted_data},
Combined JSON: {json.dumps(combined_data, ensure_ascii=False, indent=4)}"""
    # print(log)
    # print(f"🔄 Extracted Data: {combined_data}")
    logging.info(f"""{log}""")
    return combined_data
# Example Usage
# extract_po("test-contract-converted.docx")
# extract_po("test-contracts\GN-SMBLMCD202501-032WJ SMB联盟菜地PVC球阀等五金物资采购合同-ZHUOKE.docx")
# print(extract_price_list([{'序号 No.': '1', '名称 Name': 'PE波纹管(双壁波纹管) PE corrugated pipe (double wall corrugated pipe)', '规格 Specification': '内径600mm,6米/根,SN8 Inner diameter 600mm, 6 meters per piece, SN8', '单位 Unit': '米m', '数量 Quantity': '180', '单价(元) Unit Price (CNY)': '106.00', '总额(元) Total Amount (CNY)': '1080.00', '几郎单价(元) Unit Price (GNF)': '16.21', '几郎总额(元) Total Amount (GNF)': '22118.38', '品牌 Brand': '鹏洲PZ', '计划来源 Planned Source': 'SMB268-GNHY-0021-WJ-20250108'}]))
# Gradio Interface ------------------------------
# Simple web UI: upload one .docx contract, get the combined JSON back.
import gradio as gr
from gradio.themes.base import Base

interface = gr.Interface(
    fn=extract_po,
    title="PO Extractor 买卖合同数据提取",
    inputs=gr.File(label="买卖合同 (.docx)"),
    outputs=gr.Json(label="提取结果"),
    flagging_mode="never",  # disable Gradio's flagging button
    theme=Base()
)
interface.launch()