paddle12 committed on
Commit
2c4447e
·
verified ·
1 Parent(s): 2212ca0

Upload utils.py

Browse files
Files changed (1) hide show
  1. utils.py +148 -0
utils.py ADDED
@@ -0,0 +1,148 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import uuid
2
+ import json
3
+ from typing import List, Dict, Any, Tuple
4
+ from openpyxl import load_workbook
5
+
6
+ # === XLSX to JSON Conversion Functions ===
7
def detect_table_and_paragraphs(worksheet) -> Tuple[List[List[str]], List[Dict[str, Any]]]:
    """
    Split a worksheet into leading table rows and trailing paragraph text.

    Rows are scanned top to bottom; rows whose cells are all ``None`` are
    dropped before scanning. While the table is still open, a row with two or
    more non-``None`` cells is appended to the table (``None`` becomes ``""``,
    any other value becomes ``str(value)``). The table closes at the first
    single-cell row whose text is longer than 50 characters or contains no
    digit; that text becomes the first paragraph. Every later row is joined
    with single spaces into one paragraph.

    Only ``None`` counts as an empty cell, so values such as ``0``, ``0.0``
    and ``False`` are kept in paragraph text.

    Args:
        worksheet: Object exposing ``max_row``, ``max_column`` and
            ``iter_rows(min_row=..., max_row=..., values_only=True)``.

    Returns:
        ``(table_data, paragraphs)``. ``table_data`` is a list of rows of
        strings. Each paragraph is a dict with ``uid`` (random UUID4 string),
        ``order`` (1-based position) and ``text``.
    """

    def _cell_text(cell) -> str:
        # Only None means "empty"; falsy values like 0 must survive as text.
        return "" if cell is None else str(cell)

    width = worksheet.max_column

    # Keep only rows that contain at least one value. Because blank rows are
    # removed here, a blank row never acts as a table terminator below.
    rows = [
        list(row)
        for row in worksheet.iter_rows(
            min_row=1, max_row=worksheet.max_row, values_only=True
        )
        if any(cell is not None for cell in row)
    ]

    table_data: List[List[str]] = []
    paragraph_texts: List[str] = []
    table_ended = False

    for row in rows:
        filled = sum(1 for cell in row if cell is not None)
        # Texts of the cells that actually carry content (skips None and "").
        texts = [text for text in map(_cell_text, row) if text]

        if table_ended:
            # After the table, each remaining row becomes one paragraph.
            joined = " ".join(texts).strip()
            if joined:
                paragraph_texts.append(joined)
            continue

        if filled >= 2:
            # Two or more values: treat the row as table data.
            table_data.append([_cell_text(cell) for cell in row[:width]])
            continue

        # Exactly one non-None cell from here on.
        if not texts:
            continue
        text = texts[0]
        if len(text) > 50 or not any(ch.isdigit() for ch in text):
            # Long text, or text without digits, marks the end of the table.
            table_ended = True
            paragraph_texts.append(text)
        # NOTE(review): a short single-cell row that contains a digit is
        # neither added to the table nor kept as a paragraph; this skip is
        # unchanged from the previous implementation - confirm it is intended.

    paragraphs = [
        {"uid": str(uuid.uuid4()), "order": order, "text": text}
        for order, text in enumerate(paragraph_texts, start=1)
    ]

    return table_data, paragraphs
65
+
66
def xlsx_to_json(file_path) -> Dict[str, Any]:
    """
    Load an XLSX file and build the TAT-QA style JSON structure from it.

    Only the workbook's active sheet is read. The workbook is opened with
    ``data_only=True``.

    Args:
        file_path: Workbook location, passed straight to ``load_workbook``.

    Returns:
        Dict with ``table`` (a fresh UUID4 ``uid`` plus the table rows),
        ``paragraphs`` and an empty ``questions`` list.
    """
    active_sheet = load_workbook(file_path, data_only=True).active

    # Split the sheet content into table rows and paragraph entries.
    rows, paragraph_entries = detect_table_and_paragraphs(active_sheet)

    table_entry = {"uid": str(uuid.uuid4()), "table": rows}
    return {
        "table": table_entry,
        "paragraphs": paragraph_entries,
        "questions": [],  # Empty for user to fill later
    }
87
+
88
def json_to_jsonl(json_data: Dict[str, Any]) -> str:
    """
    Serialize one JSON object as a single JSONL line.

    Non-ASCII characters are written as-is (``ensure_ascii=False``). No
    trailing newline is appended.
    """
    line = json.dumps(json_data, ensure_ascii=False)
    return line
93
+
94
def json_to_markdown(json_data: Dict[str, Any]) -> str:
    """
    Render the table and paragraphs of a JSON record as Markdown for display.

    The first table row is used as the header row. Cell values are converted
    to strings (``None`` becomes an empty cell), ``|`` is escaped as ``\\|``
    and line breaks inside a cell are replaced by spaces so that each table
    row stays on one Markdown line.

    Args:
        json_data: Dict with ``json_data["table"]["table"]`` (list of rows)
            and ``json_data["paragraphs"]`` (list of dicts with ``order`` and
            ``text``).

    Returns:
        Markdown text with a "Table Data" section followed by a
        "Context/Paragraphs" section.
    """

    def _cell(value) -> str:
        # Make any value safe to place inside a single Markdown table cell.
        text = "" if value is None else str(value)
        text = text.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")
        return text.replace("|", "\\|")

    def _row(cells) -> str:
        return "| " + " | ".join(_cell(cell) for cell in cells) + " |\n"

    parts = ["## Table Data\n\n"]

    table = json_data["table"]["table"]
    if table:
        header, *body = table
        parts.append(_row(header))
        # Separator row: one "---" per header column.
        parts.append("| " + " | ".join(["---"] * len(header)) + " |\n")
        parts.extend(_row(cells) for cells in body)

    parts.append("\n## Context/Paragraphs\n\n")
    parts.extend(
        f"{para['order']}. {para['text']}\n\n" for para in json_data["paragraphs"]
    )

    # Join once instead of growing a string with repeated "+=".
    return "".join(parts)
115
+
116
+ # === Updated Prompt Creation Function ===
117
def create_prompt(table_data: Dict[str, Any], question: str) -> str:
    """
    Build the model prompt for one question, in the training-data layout.

    Args:
        table_data: Dict with ``table_data["table"]["table"]`` (rows of string
            cells) and ``table_data["paragraphs"]`` (dicts with a ``text``
            key).
        question: Question text placed in the "Question" section.

    Returns:
        The prompt string, ending with the "### Answer" header.
    """
    # One pipe-delimited line per table row; no header separator is emitted.
    rows = table_data["table"]["table"]
    table_block = "\n".join("| " + " | ".join(cells) + " |" for cells in rows)

    # Paragraph texts, one per line.
    text_block = "\n".join(entry["text"] for entry in table_data["paragraphs"])

    return f"""### Instruction
Given a table and a list of texts in the following, answer the question posed using the following six-step process:
1. Step 1: Predict the type of question being asked. Store this prediction in the variable {{question_type}}.
2. Step 2: Extract the relevant strings or numerical values from the provided table or texts. Store them in {{evidence}}.
3. Step 3: If {{question_type}} is Arithmetic, generate an equation in {{equation}}. Otherwise, put N.A..
4. Step 4: Compute the final answer and store in {{answer}}.
5. Step 5: Predict the answer's scale in {{scale}}. One of: none, percent, thousand, million, billion.
6. Step 6: Based on the {{answer}} and {{question_type}}, generate a short and logical recommendation, business insight, or next action. Store it in {{action}}.

### Table
{table_block}

### Text
{text_block}

### Question
{question}

### Answer"""