"""
Data Dictionary service for the Schema Descriptor application.
Provides functionality for building and updating data dictionaries.
"""
import concurrent.futures
import math
import threading
from errors import SchemaDescriptorError
from utils.progress_utils import ProgressTracker
from config import config
class DataDictionaryService:
    """
    Service for building and updating data dictionaries.

    Uses a BigQuery service to sample table data and an LLM service to
    generate natural-language descriptions for datasets, tables and columns.
    """

    def __init__(self, bq_service, llm_service):
        """
        Initialize a new data dictionary service.

        Args:
            bq_service: BigQuery service used for sampling and metadata updates
            llm_service: LLM service used for generating descriptions
        """
        self.bq_service = bq_service
        self.llm_service = llm_service
        # Guards writes to the shared result dictionary when tables are
        # processed in parallel worker threads.
        self.lock = threading.Lock()

    @staticmethod
    def _build_column_samples(rows):
        """
        Build a {column_name: [sample values]} mapping from a list of row dicts.

        Column names are taken from the first row; a key missing from a later
        row contributes None. Returns an empty dict for empty input.
        """
        if not rows:
            return {}
        return {
            col_name: [row.get(col_name) for row in rows]
            for col_name in rows[0].keys()
        }

    def describe_table(self, table_id, sample_limit=5, instructions=""):
        """
        Generate a description for a table.

        Args:
            table_id: ID of the table
            sample_limit: Maximum number of rows to sample
            instructions: Additional instructions for the LLM

        Returns:
            Table description
        """
        rows = self.bq_service.sample_table_rows(table_id, sample_limit)
        table_sample = self._build_column_samples(rows)
        return self.llm_service.get_table_description(table_id, table_sample, instructions)

    def describe_column(self, table_id, column_name, sample_limit=10, instructions=""):
        """
        Generate a description for a column.

        Args:
            table_id: ID of the table
            column_name: Name of the column
            sample_limit: Maximum number of samples to use
            instructions: Additional instructions for the LLM

        Returns:
            Column description
        """
        column_sample = self.bq_service.get_column_sample(table_id, column_name, sample_limit)
        return self.llm_service.get_column_description(table_id, column_name, column_sample, instructions)

    def process_table(self, table_id, project_id, limit_per_table, start_date, end_date,
                      instructions, progress):
        """
        Process a single table for the data dictionary.

        Args:
            table_id: Fully qualified table ID
            project_id: Google Cloud project ID
            limit_per_table: Maximum number of rows to sample
            start_date: Start date for partition filter
            end_date: End date for partition filter
            instructions: Additional instructions for the LLM
            progress: Progress tracker object

        Returns:
            Tuple of (table_id, table_data) where table_data contains the
            table description and per-column info. On sampling errors or
            empty tables, table_data is returned with empty contents rather
            than raising, so the caller can continue with other tables.
        """
        # Ensure the BigQuery service targets the right project; this method
        # may run in a worker thread.
        self.bq_service.project_id = project_id
        progress.update(f"Processing table: {table_id}")
        table_data = {
            "table_description": None,
            "columns": {}
        }

        # Sample rows from the table; failures are reported but not fatal.
        progress.update(f"Sampling data from {table_id}...")
        try:
            rows = self.bq_service.sample_table_rows(table_id, limit_per_table, start_date, end_date)
        except Exception as e:
            progress.update(f"Error sampling table {table_id}: {str(e)}")
            return table_id, table_data
        if not rows:
            progress.update(f"No data found in {table_id}, skipping")
            return table_id, table_data

        # Collect sample values per column; LLM descriptions are filled in below.
        columns_info = {
            col_name: {"sample_values": samples, "llm_description": None}
            for col_name, samples in self._build_column_samples(rows).items()
        }

        # Generate column descriptions with the LLM in configurable batches.
        total_columns = len(columns_info)
        progress.update(f"Generating descriptions for {total_columns} columns in {table_id}...")
        batch_size = config.batch_size
        batches = math.ceil(total_columns / batch_size)
        columns_list = list(columns_info.items())
        for batch_index in range(batches):
            start_idx = batch_index * batch_size
            batch = columns_list[start_idx:start_idx + batch_size]
            progress.update(f"Processing batch {batch_index+1}/{batches} for {table_id}")
            for col_name, info in batch:
                # Only emit per-column progress for wide tables to limit noise.
                if total_columns > 5:
                    progress.update(f"Column {col_name} in {table_id}")
                try:
                    description = self.llm_service.get_column_description(
                        table_id, col_name, info["sample_values"], instructions
                    )
                    columns_info[col_name]["llm_description"] = description
                except Exception as e:
                    progress.update(f"Error generating description for column {col_name}: {str(e)}")
                    # Record a truncated error message instead of failing the table.
                    columns_info[col_name]["llm_description"] = f"Error: {str(e)[:100]}"

        # Generate the table-level description from all column samples.
        progress.update(f"Generating table description for {table_id}...")
        try:
            table_prompt_data = {c: i["sample_values"] for c, i in columns_info.items()}
            table_desc = self.llm_service.get_table_description(table_id, table_prompt_data, instructions)
            table_data["table_description"] = table_desc
        except Exception as e:
            progress.update(f"Error generating table description: {str(e)}")
            table_data["table_description"] = f"Error generating description: {str(e)[:100]}"

        table_data["columns"] = columns_info
        progress.update(f"Completed processing table: {table_id}")
        return table_id, table_data

    def build_data_dictionary(self, project_id, dataset_id, instructions="", limit_per_table=5,
                              start_date=None, end_date=None, progress_callback=None):
        """
        Build a data dictionary for a BigQuery dataset.

        Args:
            project_id: Google Cloud project ID
            dataset_id: BigQuery dataset ID
            instructions: Additional instructions for the LLM
            limit_per_table: Maximum number of rows to sample per table
            start_date: Start date for partition filter
            end_date: End date for partition filter
            progress_callback: Function to call with progress updates

        Returns:
            Data dictionary mapping table IDs to table data, plus a
            "_dataset_description" entry for the dataset itself.

        Raises:
            SchemaDescriptorError: If listing the dataset's tables fails.
            Failures of individual tables are reported via progress updates
            and skipped.
        """
        self.bq_service.project_id = project_id
        progress = ProgressTracker(callback=progress_callback)
        data_dictionary = {}

        # List tables in the dataset and build fully qualified IDs.
        try:
            tables = self.bq_service.list_tables(dataset_id)
            # In test mode, tables are plain strings rather than objects with
            # project/dataset attributes.
            if all(isinstance(t, str) for t in tables):
                table_ids = [f"project.dataset.{t}" for t in tables]
            else:
                table_ids = [f"{t.project}.{t.dataset_id}.{t.table_id}" for t in tables]
            total_tables = len(table_ids)
            progress.update(f"Found {total_tables} tables in dataset {dataset_id}", total=total_tables)
        except Exception as e:
            error_msg = f"Error listing tables: {str(e)}"
            progress.update(error_msg)
            # Chain the cause so the original failure stays visible.
            raise SchemaDescriptorError(error_msg) from e

        # Process in parallel when both the config and the table count allow
        # it; otherwise fall back to a simple sequential loop.
        max_workers = min(config.max_parallel_tables, total_tables)
        if max_workers > 1 and total_tables > 1:
            self._process_tables_parallel(
                data_dictionary, table_ids, project_id, limit_per_table,
                start_date, end_date, instructions, progress, max_workers, total_tables
            )
        else:
            self._process_tables_sequential(
                data_dictionary, table_ids, project_id, limit_per_table,
                start_date, end_date, instructions, progress, total_tables
            )

        # Generate the dataset-level description.
        if table_ids:
            progress.update(f"Generating dataset description for {dataset_id}...")
            try:
                ds_desc = self.llm_service.get_dataset_description(dataset_id, table_ids, instructions)
                data_dictionary["_dataset_description"] = ds_desc
            except Exception as e:
                progress.update(f"Error generating dataset description: {str(e)}")
                data_dictionary["_dataset_description"] = f"Error generating dataset description: {str(e)[:100]}"
        else:
            # Ensure a dataset description exists even when no tables were found.
            data_dictionary["_dataset_description"] = f"Dataset {dataset_id} (no valid tables found)"

        # Sanity check: only the dataset description means no table succeeded.
        if len(data_dictionary) <= 1 and "_dataset_description" in data_dictionary:
            progress.update("Warning: No table data was processed successfully!")
        progress.update("Data dictionary creation complete!")
        return data_dictionary

    def _process_tables_parallel(self, data_dictionary, table_ids, project_id, limit_per_table,
                                 start_date, end_date, instructions, progress, max_workers,
                                 total_tables):
        """Process tables concurrently with a thread pool, collecting results as they finish."""
        progress.update(f"Processing {total_tables} tables with {max_workers} parallel workers")
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tables for processing.
            future_to_table = {
                executor.submit(
                    self.process_table,
                    table_id,
                    project_id,
                    limit_per_table,
                    start_date,
                    end_date,
                    instructions,
                    progress
                ): table_id for table_id in table_ids
            }
            completed = 0
            for future in concurrent.futures.as_completed(future_to_table):
                table_id = future_to_table[future]
                completed += 1
                try:
                    processed_id, table_data = future.result()
                    # Serialize writes to the shared dictionary.
                    with self.lock:
                        if processed_id in data_dictionary:
                            progress.update(f"Warning: Table {processed_id} already exists in dictionary!")
                        data_dictionary[processed_id] = table_data
                    progress.update(f"Completed table {completed}/{total_tables}: {table_id}", current=completed)
                except Exception as e:
                    # Continue with the remaining tables even if one fails.
                    progress.update(f"Error processing table {table_id}: {str(e)}")

    def _process_tables_sequential(self, data_dictionary, table_ids, project_id, limit_per_table,
                                   start_date, end_date, instructions, progress, total_tables):
        """Process tables one at a time in the current thread."""
        for table_index, table_id in enumerate(table_ids):
            current_table = table_index + 1
            progress.update(f"Processing table {current_table}/{total_tables}: {table_id}", current=current_table)
            try:
                processed_id, table_data = self.process_table(
                    table_id,
                    project_id,
                    limit_per_table,
                    start_date,
                    end_date,
                    instructions,
                    progress
                )
                data_dictionary[processed_id] = table_data
            except Exception as e:
                # Continue with the remaining tables even if one fails.
                progress.update(f"Error processing table {table_id}: {str(e)}")

    def update_dataset_and_tables(self, data_dictionary, project_id, dataset_id, progress_callback=None):
        """
        Update dataset and table descriptions in BigQuery.

        Args:
            data_dictionary: Dictionary of dataset and table descriptions
            project_id: Google Cloud project ID
            dataset_id: BigQuery dataset ID
            progress_callback: Function to call with progress updates

        Raises:
            SchemaDescriptorError: If updating the dataset and tables fails
        """
        # Set the project ID, then delegate entirely to the BigQuery service.
        self.bq_service.project_id = project_id
        self.bq_service.update_dataset_and_tables(data_dictionary, dataset_id, progress_callback)