Spaces:
Sleeping
Sleeping
| """ | |
| SQL Query Validator and Optimizer | |
| Provides validation, optimization suggestions, and query analysis | |
| """ | |
| import re | |
| from typing import Dict, List, Tuple | |
| class SQLValidator: | |
| """Validates and analyzes SQL queries""" | |
| def __init__(self, schema: Dict): | |
| self.schema = schema | |
| self.table_names = set(schema.keys()) | |
| self.column_map = {} | |
| # Build column map for quick lookup | |
| for table, columns in schema.items(): | |
| for col in columns: | |
| col_name = col['name'].lower() | |
| if col_name not in self.column_map: | |
| self.column_map[col_name] = [] | |
| self.column_map[col_name].append(table) | |
| def validate(self, sql: str) -> Dict[str, any]: | |
| """ | |
| Validate SQL query against schema | |
| Returns dict with validation results | |
| """ | |
| results = { | |
| "valid": True, | |
| "errors": [], | |
| "warnings": [], | |
| "suggestions": [], | |
| "query_type": self._detect_query_type(sql) | |
| } | |
| # Check for common SQL anti-patterns | |
| self._check_null_comparison(sql, results) | |
| self._check_select_star(sql, results) | |
| self._check_missing_where(sql, results) | |
| self._check_implicit_joins(sql, results) | |
| self._check_table_names(sql, results) | |
| # If we found errors, mark as invalid | |
| if results["errors"]: | |
| results["valid"] = False | |
| return results | |
| def _detect_query_type(self, sql: str) -> str: | |
| """Detect the type of query""" | |
| sql_upper = sql.upper() | |
| if "WITH" in sql_upper and "AS" in sql_upper: | |
| return "cte" | |
| elif any(func in sql_upper for func in ["ROW_NUMBER(", "RANK(", "DENSE_RANK(", "PARTITION BY"]): | |
| return "window" | |
| elif "GROUP BY" in sql_upper or any(func in sql_upper for func in ["COUNT(", "SUM(", "AVG(", "MAX(", "MIN("]): | |
| return "aggregate" | |
| elif "JOIN" in sql_upper: | |
| return "join" | |
| elif "UNION" in sql_upper: | |
| return "union" | |
| elif "EXISTS" in sql_upper or "IN (SELECT" in sql_upper: | |
| return "subquery" | |
| else: | |
| return "simple" | |
| def _check_null_comparison(self, sql: str, results: Dict): | |
| """Check for = NULL instead of IS NULL""" | |
| if re.search(r"=\s*NULL|!=\s*NULL|<>\s*NULL", sql, re.IGNORECASE): | |
| results["errors"].append( | |
| "Use IS NULL or IS NOT NULL instead of = NULL or != NULL" | |
| ) | |
| def _check_select_star(self, sql: str, results: Dict): | |
| """Warn about SELECT *""" | |
| if re.search(r"SELECT\s+\*", sql, re.IGNORECASE): | |
| results["warnings"].append( | |
| "Consider specifying column names instead of SELECT * for better performance" | |
| ) | |
| def _check_missing_where(self, sql: str, results: Dict): | |
| """Check for queries without WHERE clause on large tables""" | |
| sql_upper = sql.upper() | |
| if "DELETE" in sql_upper or "UPDATE" in sql_upper: | |
| if "WHERE" not in sql_upper: | |
| results["errors"].append( | |
| "DELETE or UPDATE without WHERE clause will affect all rows" | |
| ) | |
| def _check_implicit_joins(self, sql: str, results: Dict): | |
| """Check for implicit joins (comma-separated tables)""" | |
| # Look for FROM table1, table2 pattern | |
| if re.search(r"FROM\s+\w+\s*,\s*\w+", sql, re.IGNORECASE): | |
| results["suggestions"].append( | |
| "Consider using explicit JOIN syntax instead of comma-separated tables" | |
| ) | |
| def _check_table_names(self, sql: str, results: Dict): | |
| """Check if referenced tables exist in schema""" | |
| # Extract table names from FROM and JOIN clauses | |
| from_pattern = r"FROM\s+(\w+)" | |
| join_pattern = r"JOIN\s+(\w+)" | |
| tables_in_query = set() | |
| for match in re.finditer(from_pattern, sql, re.IGNORECASE): | |
| tables_in_query.add(match.group(1).lower()) | |
| for match in re.finditer(join_pattern, sql, re.IGNORECASE): | |
| tables_in_query.add(match.group(1).lower()) | |
| # Check against schema | |
| schema_tables = set(t.lower() for t in self.table_names) | |
| invalid_tables = tables_in_query - schema_tables | |
| if invalid_tables: | |
| results["errors"].append( | |
| f"Table(s) not found in schema: {', '.join(invalid_tables)}" | |
| ) | |
| def suggest_optimizations(self, sql: str) -> List[str]: | |
| """Suggest optimizations for the query""" | |
| suggestions = [] | |
| sql_upper = sql.upper() | |
| # Check for NOT IN with subquery | |
| if "NOT IN (SELECT" in sql_upper: | |
| suggestions.append( | |
| "Consider using NOT EXISTS instead of NOT IN for better NULL handling" | |
| ) | |
| # Check for multiple subqueries | |
| subquery_count = sql_upper.count("(SELECT") | |
| if subquery_count > 2: | |
| suggestions.append( | |
| "Consider using CTEs (WITH clause) for better readability with multiple subqueries" | |
| ) | |
| # Check for DISTINCT | |
| if "DISTINCT" in sql_upper and "GROUP BY" not in sql_upper: | |
| suggestions.append( | |
| "DISTINCT can be expensive. Consider if GROUP BY might be more appropriate" | |
| ) | |
| # Check for ORDER BY in subquery | |
| if re.search(r"\(SELECT.*ORDER BY.*\)", sql, re.IGNORECASE): | |
| suggestions.append( | |
| "ORDER BY in subquery may be ignored. Apply ORDER BY to outer query" | |
| ) | |
| return suggestions | |
| def format_sql(self, sql: str) -> str: | |
| """Basic SQL formatting for readability""" | |
| # This is a simple formatter - for production use a proper SQL formatter | |
| formatted = sql.strip() | |
| # Add newlines before major keywords | |
| keywords = ['SELECT', 'FROM', 'WHERE', 'GROUP BY', 'HAVING', 'ORDER BY', 'LIMIT'] | |
| for keyword in keywords: | |
| formatted = re.sub( | |
| f'\\b{keyword}\\b', | |
| f'\n{keyword}', | |
| formatted, | |
| flags=re.IGNORECASE | |
| ) | |
| return formatted.strip() |