Spaces:
Running
Running
| """ | |
| Schema utilities for data analysis and processing | |
| """ | |
| import re | |
| from typing import List, Dict, Any | |
def extract_key_business_terms(content: str, max_chars: int = 1000) -> str:
    """
    Condense website content to its most business-relevant sentences.

    HTML tags are removed and runs of whitespace are collapsed. If the cleaned
    text already fits in ``max_chars`` it is returned unchanged. Otherwise the
    text is split into sentences, each sentence is scored by how many of the
    business keywords it mentions, and sentences are added best-first while
    they still fit in the budget.

    Args:
        content: Raw page text, possibly containing HTML markup.
        max_chars: Maximum length of the returned summary.

    Returns:
        ``"No content provided"`` for empty input, the cleaned text when it is
        short enough, otherwise a summary of at most ``max_chars`` characters.
    """
    if not content:
        return "No content provided"

    # Clean up the content: drop HTML tags, then normalize whitespace.
    content = re.sub(r'<[^>]+>', '', content)
    content = re.sub(r'\s+', ' ', content).strip()

    # If content is short enough, return as-is
    if len(content) <= max_chars:
        return content

    business_keywords = (
        'revenue', 'sales', 'profit', 'growth', 'customer', 'market',
        'product', 'service', 'business', 'company', 'industry',
        'solution', 'platform', 'technology', 'analytics', 'data',
    )

    # Score sentences by business relevance (substring match, case-insensitive).
    scored_sentences = []
    for raw_sentence in re.split(r'[.!?]+', content):
        sentence = raw_sentence.strip()
        if len(sentence) < 10:  # Skip very short fragments
            continue
        lowered = sentence.lower()  # lower-case once, not once per keyword
        score = sum(1 for keyword in business_keywords if keyword in lowered)
        scored_sentences.append((score, sentence))

    # The sort is stable, so equally scored sentences keep document order.
    scored_sentences.sort(key=lambda item: item[0], reverse=True)

    selected = []
    used = 0
    for _score, sentence in scored_sentences:
        cost = len(sentence) + 2  # sentence plus its ". " separator
        if used + cost > max_chars:
            # Skip only this sentence: a shorter, lower-ranked one may still
            # fit, so do not abandon the rest of the list.
            continue
        selected.append(sentence)
        used += cost

    if selected:
        return ". ".join(selected) + "."

    # Nothing fit: hard-truncate, keeping the ellipsis inside the budget.
    budget = max(max_chars, 0)
    if budget <= 3:
        return content[:budget]
    return content[:budget - 3] + "..."
# Header of a CREATE TABLE statement up to and including the opening paren.
_CREATE_TABLE_HEADER_RE = re.compile(
    r'CREATE\s+(?:OR\s+REPLACE\s+)?TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(\w+)\s*\(',
    re.IGNORECASE,
)

# Table-level clauses that live in the column list but are not columns.
_TABLE_CONSTRAINT_RE = re.compile(
    r'(?:PRIMARY\s+KEY\b|FOREIGN\s+KEY\b|CONSTRAINT\s+\w+|UNIQUE\s*\(|CHECK\s*\()',
    re.IGNORECASE,
)

# A type name with optional parenthesised parameters, e.g. DECIMAL(10, 2).
_COLUMN_TYPE_RE = re.compile(r'\w+(?:\([^)]*\))?')


def _find_closing_paren(text: str, start: int):
    """Return the index of the ')' closing a '(' opened just before ``start``, or None."""
    depth = 1
    for index in range(start, len(text)):
        char = text[index]
        if char == '(':
            depth += 1
        elif char == ')':
            depth -= 1
            if depth == 0:
                return index
    return None


def _split_top_level_commas(text: str) -> List[str]:
    """Split ``text`` on commas that are not nested inside parentheses."""
    pieces = []
    current = []
    depth = 0
    for char in text:
        if char == ',' and depth == 0:
            pieces.append(''.join(current))
            current = []
            continue
        if char == '(':
            depth += 1
        elif char == ')':
            depth -= 1
        current.append(char)
    pieces.append(''.join(current))  # don't forget the last column
    return [piece.strip() for piece in pieces if piece.strip()]


def parse_ddl_schema(ddl_content: str) -> Dict[str, Any]:
    """
    Parse DDL content to extract schema information.

    Finds every ``CREATE [OR REPLACE] TABLE [IF NOT EXISTS] name ( ... )``
    statement and extracts the column names and types from its body. The body
    is delimited by matching parentheses, so nested type parameters such as
    ``DECIMAL(10,2)`` and trailing table options after the closing paren do
    not confuse the parser, and a terminating ``;`` is not required.

    Limitations: parentheses or commas inside string literals or SQL comments
    are not recognised as such, and schema-qualified table names
    (``db.schema.table``) are not matched.

    Args:
        ddl_content: SQL text containing zero or more CREATE TABLE statements.

    Returns:
        A dict mapping table name to ``{'columns': [...], 'raw_definition': str}``
        where each column is ``{'name': str, 'type': str}``. Table-level
        constraint clauses are omitted. A column with no type token is
        reported as ``'VARCHAR'``. Empty or ``None`` input yields ``{}``.
    """
    tables = {}
    if not ddl_content:
        return tables

    position = 0
    while True:
        header = _CREATE_TABLE_HEADER_RE.search(ddl_content, position)
        if header is None:
            break

        body_start = header.end()
        body_end = _find_closing_paren(ddl_content, body_start)
        if body_end is None:
            # Unbalanced statement: skip its header and keep scanning.
            position = body_start
            continue

        table_name = header.group(1)
        columns_def = ddl_content[body_start:body_end]

        columns = []
        for line in _split_top_level_commas(columns_def):
            if _TABLE_CONSTRAINT_RE.match(line):
                continue
            parts = line.split()
            if not parts:
                continue
            col_name = parts[0]
            # Look for the type only AFTER the column name, so the name
            # itself can never be mistaken for the type.
            remainder = line[len(col_name):].lstrip()
            type_match = _COLUMN_TYPE_RE.match(remainder)
            if type_match:
                col_type = type_match.group(0)
            elif remainder:
                col_type = remainder.split()[0]
            else:
                col_type = 'VARCHAR'
            columns.append({
                'name': col_name,
                'type': col_type
            })

        tables[table_name] = {
            'columns': columns,
            'raw_definition': columns_def.strip()
        }
        position = body_end + 1

    return tables
def validate_population_script_schema(script_content: str, schema_info: Dict[str, Any] = None, strict_mode: bool = False) -> tuple:
    """
    Validate a generated population script against the expected schema.

    The script is compiled (never executed) to check Python syntax. When it
    compiles, the source text is scanned for ``INSERT INTO <table>``
    statements and for a ``snowflake.connector.connect`` call.

    Args:
        script_content: Python source of the population script.
        schema_info: Optional mapping whose keys are the expected table names.
        strict_mode: When true and ``schema_info`` is given, tables without a
            matching INSERT are reported as a warning.

    Returns:
        ``(is_valid, issues)``. ``is_valid`` is False only when the script
        does not compile; ``issues`` lists errors followed by warnings.
    """
    errors = []
    warnings = []

    # Check for basic Python syntax. compile() reports null bytes as
    # ValueError on Python < 3.12, so treat that as a syntax failure too.
    try:
        compile(script_content, '<string>', 'exec')
    except (SyntaxError, ValueError) as e:
        errors.append(f"Syntax error: {str(e)}")
        return False, errors

    def _normalize(table_name):
        # Unquoted SQL identifiers are case-insensitive, and a qualified name
        # (db.schema.table) refers to its last component.
        return str(table_name).split('.')[-1].lower()

    # Look for SQL operations
    inserted_tables = {
        _normalize(name)
        for name in re.findall(
            r'INSERT\s+INTO\s+(\w+(?:\.\w+)*)', script_content, re.IGNORECASE
        )
    }

    # Check for Snowflake connection
    if 'snowflake.connector.connect' not in script_content:
        warnings.append("No Snowflake connection detected")

    # Validate against schema if provided
    if schema_info and strict_mode:
        # Sorted so the message is deterministic across runs.
        missing_tables = sorted(
            table for table in schema_info
            if _normalize(table) not in inserted_tables
        )
        if missing_tables:
            warnings.append(f"Missing operations for tables: {', '.join(missing_tables)}")

    # Return the expected format: (is_valid, issues_list)
    return True, errors + warnings
def generate_schema_constrained_prompt(schema_info: Dict[str, Any], use_case: str, business_context: str = "") -> str:
    """
    Build the LLM prompt that asks for a schema-aware data population script.

    Without schema information a one-line generic prompt is returned. With
    schema information the prompt lists every table that has columns (first
    10 columns each, as ``name (type)``), followed by the fixed requirements,
    outlier-documentation format and Snowflake connection template. A
    non-empty ``business_context`` is appended as a trailing section in both
    cases.
    """
    context_section = (
        f"\n\nBusiness Context:\n{business_context}" if business_context else ""
    )

    # No schema: fall back to the generic request.
    if not schema_info:
        return f"Generate realistic data for {use_case} use case" + context_section

    # One "- table: col (type), ..." line per table that has columns.
    table_lines = []
    for table_name, table_info in schema_info.items():
        all_columns = table_info.get('columns', [])
        if not all_columns:
            continue
        shown = ', '.join(
            f"{column.get('name', 'unknown')} ({column.get('type', 'unknown')})"
            for column in all_columns[:10]
        )
        hidden_count = len(all_columns) - 10
        if hidden_count > 0:
            shown += f" (and {hidden_count} more columns)"
        table_lines.append(f"- {table_name}: {shown}")

    schema_desc = (
        "Database schema contains:\n" + "\n".join(table_lines)
        if table_lines
        else "Schema information not available"
    )

    # Fixed instruction template; only use_case and schema_desc are interpolated.
    prompt = f"""Generate realistic data for {use_case} use case.
{schema_desc}
REQUIREMENTS:
1. Generate Python code that connects to Snowflake and populates these tables
2. Use proper Snowflake connection with schema parameter
3. Generate 1000+ rows of realistic data per table
4. Ensure referential integrity between tables
5. Include realistic business scenarios and edge cases
6. Use proper data types and constraints
7. Include error handling for connection issues
8. **IMPORTANT**: Document strategic outliers with structured comments for demo purposes
OUTLIER DOCUMENTATION FORMAT:
For each strategic outlier or interesting pattern you inject into the data, add structured comments ABOVE the code that injects it:
# DEMO_OUTLIER: [Brief title - e.g., "Popular Items Across Regions"]
# INSIGHT: [What pattern exists - e.g., "Specific product selling 5x normal volume in one region last month"]
# VIZ_TYPE: [Chart type - COLUMN, BAR, LINE, KPI, TABLE, SCATTER]
# VIZ_MEASURE_TYPE: [Semantic measure types - e.g., "sales_amount, sales_quantity"]
# VIZ_DIMENSION_TYPES: [Semantic dimension types - e.g., "product_name, geographic_region"]
# SHOW_ME: [Natural language query - e.g., "Show sales by product and region for popular items last month"]
# KPI_METRIC: [Optional companion KPI - e.g., "total_popular_item_revenue"]
# IMPACT: [Business impact - e.g., "$500K in concentrated demand, potential stockout risk"]
# TALKING_POINT: [Demo talking point - e.g., "See how ThoughtSpot surfaces regional product trends instantly"]
SEMANTIC TYPE EXAMPLES (use these, NOT specific column names):
- Measures: sales_amount, sales_quantity, profit_margin, discount_percentage, customer_lifetime_value, order_count
- Dimensions: product_name, customer_name, geographic_region, time_period, sales_channel, customer_segment, product_category
- Dates: transaction_date, order_date, signup_date
Create 3-5 strategic outliers that would make compelling demo talking points with clean visualizations. Place these comments immediately BEFORE the code that injects each outlier.
CONNECTION TEMPLATE:
```python
from dotenv import load_dotenv
import os
import snowflake.connector
from snowflake_auth import get_snowflake_connection_params
load_dotenv()
conn_params = get_snowflake_connection_params()
conn = snowflake.connector.connect(
account=conn_params['account'],
user=conn_params['user'],
private_key=conn_params['private_key'],
warehouse=conn_params['warehouse'],
database=conn_params['database'],
schema=os.getenv('SNOWFLAKE_SCHEMA') # This will be replaced with actual schema
)
```
Generate ONLY executable Python code, no explanations."""

    return prompt + context_section