Spaces:
Runtime error
Runtime error
| import re | |
| class SQLConverter: | |
| def __init__(self): | |
| self.words_to_replace = { | |
| "ILIKE": "LIKE", | |
| "NOW()": "SYSDATE", | |
| "CURRENT_DATE": "SYSDATE", | |
| "CURRENT_TIMESTAMP": "SYSTIMESTAMP", | |
| #"DATE_PART": "EXTRACT" | |
| # Add more replacements as needed | |
| } | |
| def replace_limit(self, text): | |
| pattern = r"LIMIT\s+(\d+)" | |
| replacement = "FETCH FIRST \\g<1> ROWS ONLY" | |
| result_text = re.sub(pattern, replacement, text, flags=re.IGNORECASE) | |
| return result_text | |
| def replace_offset(self, text): | |
| pattern = r"OFFSET\s+(\d+)" | |
| replacement = "\\g<1> ROWS" | |
| result_text = re.sub(pattern, replacement, text, flags=re.IGNORECASE) | |
| return result_text | |
| def convert_interval(self, interval_str): | |
| match = re.match(r"INTERVAL '(\d+) (\w+)'", interval_str,flags=re.IGNORECASE) | |
| if match: | |
| numeric_part = match.group(1) | |
| unit_of_time = match.group(2).lower() | |
| unit_of_time = re.sub(r'(s)$', '', unit_of_time) | |
| converted_interval = f"INTERVAL '{numeric_part}' {unit_of_time.upper()}" | |
| return converted_interval | |
| else: | |
| raise ValueError(f"Invalid interval format: {interval_str}") | |
| def convert_sql_interval(self, sql_query): | |
| def replace_interval(match): | |
| interval_str = match.group(0) | |
| return self.convert_interval(interval_str) | |
| interval_regex = r"INTERVAL '\d+ \w+'" | |
| converted_query = re.sub(interval_regex, replace_interval, sql_query, flags=re.IGNORECASE) | |
| return converted_query | |
| def replace_extract_quarter(self, sql_query): | |
| pattern = r"EXTRACT\(QUARTER FROM (\w+\.\w+)\)" | |
| def replacement(match): | |
| column = match.group(1) | |
| #quarter = match.group(2) | |
| return f"TO_CHAR({column}, 'Q')" | |
| converted_query = re.sub(pattern, replacement, sql_query, flags=re.IGNORECASE) | |
| return converted_query | |
| def replace_date_trunc(self, sql_query): | |
| pattern = r"date_trunc\(\s*'(\w+)'\s*,\s*([\w\.]+)\s*\)" | |
| def replacement(match): | |
| date_part = match.group(1).upper() | |
| date_value = match.group(2) | |
| return f"TRUNC({date_value}, '{date_part}')" | |
| converted_query = re.sub(pattern, replacement, sql_query, flags=re.IGNORECASE) | |
| return converted_query | |
| def replace_words(self, text): | |
| sorted_keys = sorted(self.words_to_replace.keys(), key=len, reverse=True) | |
| pattern = re.compile("|".join(map(re.escape, sorted_keys))) | |
| def replacer(match): | |
| return self.words_to_replace[match.group(0)] | |
| replaced_text = pattern.sub(replacer, text) | |
| return replaced_text | |
| def replace_date_part(self, sql_query): | |
| pattern = r"date_part\(\s*'(\w+)'\s*,\s*([\w\.]+)\s*\)" | |
| def replacement(match): | |
| date_part = match.group(1).upper() | |
| date_value = match.group(2) | |
| return f"EXTRACT({date_part} FROM {date_value})" | |
| converted_query = re.sub(pattern, replacement, sql_query, flags=re.IGNORECASE) | |
| return converted_query | |
| def replace_date_strings(self,sql_query): | |
| # Define the regular expression pattern to match date strings | |
| # The pattern assumes dates are in the format 'YYYY-MM-DD' | |
| date_pattern = re.compile(r"'(\d{4}-\d{2}-\d{2})'") | |
| # Define the function to replace the date strings | |
| def replacement(match): | |
| date_str = match.group(1) | |
| # Check if the date is already inside a DATE function | |
| if "DATE '" + date_str + "'" not in sql_query: | |
| return f"DATE '{date_str}'" | |
| return match.group(0) | |
| # Use re.sub to replace all occurrences in the SQL query | |
| converted_query = date_pattern.sub(replacement, sql_query) | |
| return converted_query | |
| def detect_aliases(self,query): | |
| # Step 1: Extract the SELECT clause | |
| select_clause_match = re.search(r'SELECT\s+(.*?)\s+FROM', query, re.IGNORECASE | re.DOTALL) | |
| if not select_clause_match: | |
| return {} | |
| select_clause = select_clause_match.group(1).strip() | |
| # Step 2: Split the SELECT clause by commas, considering nested parentheses | |
| def split_select_clause(clause): | |
| parts = [] | |
| bracket_level = 0 | |
| start = 0 | |
| for i, char in enumerate(clause): | |
| if char == '(': | |
| bracket_level += 1 | |
| elif char == ')': | |
| bracket_level -= 1 | |
| elif char == ',' and bracket_level == 0: | |
| parts.append(clause[start:i].strip()) | |
| start = i + 1 | |
| parts.append(clause[start:].strip()) | |
| return parts | |
| select_parts = split_select_clause(select_clause) | |
| # Step 3: Extract expressions and aliases | |
| aliases = {} | |
| alias_pattern = re.compile(r'(?P<expression>.+?)\s+AS\s+(?P<alias>\w+)', re.IGNORECASE | re.DOTALL) | |
| for part in select_parts: | |
| expr_alias_match = alias_pattern.search(part) | |
| if expr_alias_match: | |
| expression = expr_alias_match.group('expression').strip() | |
| alias = expr_alias_match.group('alias').strip() | |
| aliases[alias] = expression | |
| return aliases | |
| def replace_group_by_aliases(self,query): | |
| # Detect aliases in the query | |
| aliases = self.detect_aliases(query) | |
| # Find all GROUP BY clauses | |
| group_by_matches = re.finditer(r'GROUP\s+BY\s+(.*?)(?=\s+ORDER\s+BY|\s+LIMIT\s+|$)', query, re.IGNORECASE | re.DOTALL) | |
| # Replace aliases in each GROUP BY clause | |
| new_query = query | |
| for match in group_by_matches: | |
| group_by_clause = match.group(1).strip() | |
| for alias, expression in aliases.items(): | |
| group_by_clause = re.sub(r'\b' + re.escape(alias) + r'\b', expression, group_by_clause) | |
| new_query = new_query.replace(match.group(0), f'GROUP BY {group_by_clause}') | |
| return new_query | |
| def check_if(self, sql_query, keyword): | |
| return keyword.lower() in sql_query.lower() | |
| def convert_sql_oracle(self, sql): | |
| if self.check_if(sql, "date_trunc"): | |
| sql = self.replace_date_trunc(sql) | |
| if self.check_if(sql, "QUARTER") and self.check_if(sql, "EXTRACT"): | |
| sql = self.replace_extract_quarter(sql) | |
| if self.check_if(sql, "INTERVAL"): | |
| sql = self.convert_sql_interval(sql) | |
| if self.check_if(sql, "LIMIT"): | |
| sql = self.replace_limit(sql) | |
| if self.check_if(sql, "OFFSET"): | |
| sql = self.replace_offset(sql) | |
| if self.check_if(sql, "date_part"): | |
| sql = self.replace_date_part(sql) | |
| return self.replace_group_by_aliases(self.replace_date_strings(self.replace_words(sql))) | |
| # Example usage | |
| #sql_query = """ | |
| #SELECT c.FirstName, c.LastName, (CAST(SUM(CASE WHEN o.PurchaseDate BETWEEN '2023-01-01' AND '2023-04-30' THEN p.Price END) AS FLOAT) / NULLIF(SUM(CASE WHEN o.PurchaseDate BETWEEN '2023-01-01' AND '2023-04-30' THEN p.Price END), 0)) * 100 AS percentage_change FROM Customers c JOIN Orders o ON c.CustomerID = o.CustomerID JOIN Products p ON o.ProductID = p.ProductID GROUP BY c.FirstName, c.LastName HAVING percentage_change > 50 ORDER BY percentage_change DESC; | |
| #""" | |
| #converter = SQLConverter() | |
| #converted_sql = converter.convert_sql_oracle(sql_query) | |
| #print(converted_sql) | |