"""
Data Dictionary service for the Schema Descriptor application.
Provides functionality for building and updating data dictionaries.
"""

import concurrent.futures
import math
import threading
from errors import SchemaDescriptorError
from utils.progress_utils import ProgressTracker
from config import config

class DataDictionaryService:
    """
    Service for building and updating data dictionaries.
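
    Typical usage (a sketch; the injected services are assumptions wired up
    elsewhere in the application):

        service = DataDictionaryService(bq_service, llm_service)
        dictionary = service.build_data_dictionary("my-project", "my_dataset")
        service.update_dataset_and_tables(dictionary, "my-project", "my_dataset")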
    """
    
    def __init__(self, bq_service, llm_service):
        """
        Initialize a new data dictionary service.
        
        Args:
            bq_service: BigQuery service
            llm_service: LLM service
        """
        self.bq_service = bq_service
        self.llm_service = llm_service
        self.lock = threading.Lock()  # For thread-safe operations
        
    def describe_table(self, table_id, sample_limit=5, instructions=""):
        """
        Generate a description for a table.
        
        Args:
            table_id: ID of the table
            sample_limit: Maximum number of rows to sample
            instructions: Additional instructions for the LLM
            
        Returns:
            Table description
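
        Example (a sketch; the table ID shown is an assumption):

            desc = service.describe_table("my-project.my_dataset.orders", sample_limit=5)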
        """
        # Sample rows from the table
        rows = self.bq_service.sample_table_rows(table_id, sample_limit)
        
        # Create dictionary of column samples
        table_sample = {}
        if rows:
            for col_name in rows[0].keys():
                col_samples = [r.get(col_name, None) for r in rows]
                table_sample[col_name] = col_samples
        
        # Generate table description
        return self.llm_service.get_table_description(table_id, table_sample, instructions)
    
    def describe_column(self, table_id, column_name, sample_limit=10, instructions=""):
        """
        Generate a description for a column.
        
        Args:
            table_id: ID of the table
            column_name: Name of the column
            sample_limit: Maximum number of samples to use
            instructions: Additional instructions for the LLM
            
        Returns:
            Column description
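
        Example (a sketch; identifiers are assumptions):

            desc = service.describe_column("my-project.my_dataset.orders", "status")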
        """
        # Get column sample
        column_sample = self.bq_service.get_column_sample(table_id, column_name, sample_limit)
        
        # Generate column description
        return self.llm_service.get_column_description(table_id, column_name, column_sample, instructions)
    
    def process_table(self, table_id, project_id, limit_per_table, start_date, end_date,
                      instructions, progress):
        """
        Process a single table for the data dictionary.
        
        Args:
            table_id: Fully qualified table ID
            project_id: Google Cloud project ID
            limit_per_table: Maximum number of rows to sample
            start_date: Start date for partition filter
            end_date: End date for partition filter
            instructions: Additional instructions for the LLM
            progress: Progress tracker object
            
        Returns:
            Tuple of (table_id, table_data) where table_data contains description and columns
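
        Example of the returned table_data shape (values illustrative):

            {
                "table_description": "Orders placed by customers.",
                "columns": {
                    "status": {
                        "sample_values": ["paid", "open"],
                        "llm_description": "Order fulfillment status."
                    }
                }
            }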
        """
        # Ensure BigQuery service has the correct project ID set
        self.bq_service.project_id = project_id
        progress.update(f"Processing table: {table_id}")
        
        table_data = {
            "table_description": None,
            "columns": {}
        }
        
        # Sample rows from the table
        progress.update(f"Sampling data from {table_id}...")
        try:
            rows = self.bq_service.sample_table_rows(table_id, limit_per_table, start_date, end_date)
        except Exception as e:
            progress.update(f"Error sampling table {table_id}: {str(e)}")
            return table_id, table_data
        
        if not rows:
            progress.update(f"No data found in {table_id}, skipping")
            return table_id, table_data
            
        # Process columns
        columns_info = {}
        try:
            col_names = rows[0].keys()
            
            for col_name in col_names:
                col_samples = [r.get(col_name, None) for r in rows]
                columns_info[col_name] = {
                    "sample_values": col_samples,
                    "llm_description": None
                }
        except IndexError:
            progress.update(f"Warning: No rows in sample for {table_id}")
            # Return empty table data if there are no rows
            return table_id, table_data
            
        # Generate descriptions using LLM in batches
        total_columns = len(columns_info)
        progress.update(f"Generating descriptions for {total_columns} columns in {table_id}...")
        
        # Process columns in batches for better performance
        batch_size = config.batch_size
        batches = math.ceil(total_columns / batch_size)
        
        # Convert columns_info to list for batch processing
        columns_list = list(columns_info.items())
        
        for batch_index in range(batches):
            start_idx = batch_index * batch_size
            end_idx = min(start_idx + batch_size, total_columns)
            batch = columns_list[start_idx:end_idx]
            
            progress.update(f"Processing batch {batch_index+1}/{batches} for {table_id}")
            
            # Process batch items
            for col_name, info in batch:
                if total_columns > 5:
                    progress.update(f"Column {col_name} in {table_id}")
                
                try:
                    description = self.llm_service.get_column_description(
                        table_id, col_name, info["sample_values"], instructions
                    )
                    # Update the original columns_info dictionary
                    columns_info[col_name]["llm_description"] = description
                except Exception as e:
                    progress.update(f"Error generating description for column {col_name}: {str(e)}")
                    columns_info[col_name]["llm_description"] = f"Error: {str(e)[:100]}"
                
        # Generate table description
        progress.update(f"Generating table description for {table_id}...")
        try:
            table_prompt_data = {c: i["sample_values"] for c, i in columns_info.items()}
            table_desc = self.llm_service.get_table_description(table_id, table_prompt_data, instructions)
            table_data["table_description"] = table_desc
        except Exception as e:
            progress.update(f"Error generating table description: {str(e)}")
            table_data["table_description"] = f"Error generating description: {str(e)[:100]}"
        
        # Copy the columns info to the table data
        table_data["columns"] = columns_info
        
        progress.update(f"Completed processing table: {table_id}")
        return table_id, table_data
    
    def build_data_dictionary(self, project_id, dataset_id, instructions="", limit_per_table=5, 
                             start_date=None, end_date=None, progress_callback=None):
        """
        Build a data dictionary for a BigQuery dataset.
        
        Args:
            project_id: Google Cloud project ID
            dataset_id: BigQuery dataset ID
            instructions: Additional instructions for the LLM
            limit_per_table: Maximum number of rows to sample per table
            start_date: Start date for partition filter
            end_date: End date for partition filter
            progress_callback: Function to call with progress updates
            
        Returns:
            Data dictionary mapping each fully qualified table ID to a dict
            with "table_description" and "columns" keys, plus a top-level
            "_dataset_description" entry for the dataset itself
            
        Raises:
            SchemaDescriptorError: If building the data dictionary fails
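
        Example (a sketch; the project/dataset names and the callback signature
        forwarded to ProgressTracker are assumptions):

            def on_progress(message, current=None, total=None):
                print(message)

            dictionary = service.build_data_dictionary(
                "my-project", "my_dataset",
                instructions="Prefer business-friendly wording.",
                limit_per_table=5,
                progress_callback=on_progress,
            )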
        """
        # Set the project ID for the BigQuery service
        self.bq_service.project_id = project_id
        
        # Create progress tracker
        progress = ProgressTracker(callback=progress_callback)
        
        # Build data dictionary
        data_dictionary = {}
        
        # List tables in the dataset
        try:
            tables = self.bq_service.list_tables(dataset_id)
            # In test mode, tables are just strings, not objects with attributes
            if all(isinstance(t, str) for t in tables):
                table_ids = [f"project.dataset.{t}" for t in tables]
            else:
                table_ids = [f"{t.project}.{t.dataset_id}.{t.table_id}" for t in tables]
            total_tables = len(table_ids)
            
            progress.update(f"Found {total_tables} tables in dataset {dataset_id}", total=total_tables)
        except Exception as e:
            error_msg = f"Error listing tables: {str(e)}"
            progress.update(error_msg)
            raise SchemaDescriptorError(error_msg)
        
        # Determine max workers (parallel tables)
        max_workers = min(config.max_parallel_tables, total_tables)
        
        if max_workers > 1 and total_tables > 1:
            progress.update(f"Processing {total_tables} tables with {max_workers} parallel workers")
            
            # Process tables in parallel
            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit all tables for processing
                future_to_table = {
                    executor.submit(
                        self.process_table, 
                        table_id, 
                        project_id, 
                        limit_per_table, 
                        start_date, 
                        end_date, 
                        instructions, 
                        progress
                    ): table_id for table_id in table_ids
                }
                
                # Process results as they complete
                completed = 0
                for future in concurrent.futures.as_completed(future_to_table):
                    table_id = future_to_table[future]
                    completed += 1
                    
                    try:
                        processed_id, table_data = future.result()
                        with self.lock:
                            # Make sure we're not overwriting existing data
                            if processed_id in data_dictionary:
                                progress.update(f"Warning: table {processed_id} already exists in the dictionary")
                            data_dictionary[processed_id] = table_data
                        progress.update(f"Completed table {completed}/{total_tables}: {table_id}", current=completed)
                    except Exception as e:
                        progress.update(f"Error processing table {table_id}: {str(e)}")
                        # Continue with other tables even if one fails
        else:
            # Process tables sequentially
            for table_index, table_id in enumerate(table_ids):
                current_table = table_index + 1
                progress.update(f"Processing table {current_table}/{total_tables}: {table_id}", current=current_table)
                
                try:
                    # Make sure the project ID is set before processing each table
                    self.bq_service.project_id = project_id
                    
                    processed_id, table_data = self.process_table(
                        table_id,
                        project_id,
                        limit_per_table,
                        start_date,
                        end_date,
                        instructions,
                        progress
                    )
                    
                    # Store the result in the dictionary
                    data_dictionary[processed_id] = table_data
                except Exception as e:
                    progress.update(f"Error processing table {table_id}: {str(e)}")
                    # Continue with other tables even if one fails
        
        
        # Generate dataset description
        if table_ids:
            progress.update(f"Generating dataset description for {dataset_id}...")
            try:
                ds_desc = self.llm_service.get_dataset_description(dataset_id, table_ids, instructions)
                data_dictionary["_dataset_description"] = ds_desc
            except Exception as e:
                progress.update(f"Error generating dataset description: {str(e)}")
                data_dictionary["_dataset_description"] = f"Error generating dataset description: {str(e)[:100]}"
        else:
            # Ensure we have a dataset description even if no tables
            data_dictionary["_dataset_description"] = f"Dataset {dataset_id} (no valid tables found)"
        
        # Additional validation
        if len(data_dictionary) <= 1 and "_dataset_description" in data_dictionary:
            progress.update("Warning: No table data was processed successfully!")
            
        progress.update("Data dictionary creation complete!")
        # Debug output
        print(f"Final data dictionary has {len(data_dictionary)} entries (including dataset description)") 
        for key in data_dictionary.keys():
            print(f"  - {key}")
            
        return data_dictionary
        
    def update_dataset_and_tables(self, data_dictionary, project_id, dataset_id, progress_callback=None):
        """
        Update dataset and table descriptions in BigQuery.
        
        Args:
            data_dictionary: Dictionary of dataset and table descriptions
            project_id: Google Cloud project ID
            dataset_id: BigQuery dataset ID
            progress_callback: Function to call with progress updates
            
        Raises:
            SchemaDescriptorError: If updating the dataset and tables fails
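
        Example (a sketch, reusing a dictionary built by build_data_dictionary):

            service.update_dataset_and_tables(dictionary, "my-project", "my_dataset")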
        """
        # Set the project ID for the BigQuery service
        self.bq_service.project_id = project_id
        
        # Update dataset and tables
        self.bq_service.update_dataset_and_tables(data_dictionary, dataset_id, progress_callback)