Power-Seeker / convert.py
MohamedAAK's picture
Update convert.py
7e7bfec verified
import re
class SQLConverter:
def __init__(self):
self.words_to_replace = {
"ILIKE": "LIKE",
"NOW()": "SYSDATE",
"CURRENT_DATE": "SYSDATE",
"CURRENT_TIMESTAMP": "SYSTIMESTAMP",
#"DATE_PART": "EXTRACT"
# Add more replacements as needed
}
def replace_limit(self, text):
pattern = r"LIMIT\s+(\d+)"
replacement = "FETCH FIRST \\g<1> ROWS ONLY"
result_text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
return result_text
def replace_offset(self, text):
pattern = r"OFFSET\s+(\d+)"
replacement = "\\g<1> ROWS"
result_text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
return result_text
def convert_interval(self, interval_str):
match = re.match(r"INTERVAL '(\d+) (\w+)'", interval_str,flags=re.IGNORECASE)
if match:
numeric_part = match.group(1)
unit_of_time = match.group(2).lower()
unit_of_time = re.sub(r'(s)$', '', unit_of_time)
converted_interval = f"INTERVAL '{numeric_part}' {unit_of_time.upper()}"
return converted_interval
else:
raise ValueError(f"Invalid interval format: {interval_str}")
def convert_sql_interval(self, sql_query):
def replace_interval(match):
interval_str = match.group(0)
return self.convert_interval(interval_str)
interval_regex = r"INTERVAL '\d+ \w+'"
converted_query = re.sub(interval_regex, replace_interval, sql_query, flags=re.IGNORECASE)
return converted_query
def replace_extract_quarter(self, sql_query):
pattern = r"EXTRACT\(QUARTER FROM (\w+\.\w+)\)"
def replacement(match):
column = match.group(1)
#quarter = match.group(2)
return f"TO_CHAR({column}, 'Q')"
converted_query = re.sub(pattern, replacement, sql_query, flags=re.IGNORECASE)
return converted_query
def replace_date_trunc(self, sql_query):
pattern = r"date_trunc\(\s*'(\w+)'\s*,\s*([\w\.]+)\s*\)"
def replacement(match):
date_part = match.group(1).upper()
date_value = match.group(2)
return f"TRUNC({date_value}, '{date_part}')"
converted_query = re.sub(pattern, replacement, sql_query, flags=re.IGNORECASE)
return converted_query
def replace_words(self, text):
sorted_keys = sorted(self.words_to_replace.keys(), key=len, reverse=True)
pattern = re.compile("|".join(map(re.escape, sorted_keys)))
def replacer(match):
return self.words_to_replace[match.group(0)]
replaced_text = pattern.sub(replacer, text)
return replaced_text
def replace_date_part(self, sql_query):
pattern = r"date_part\(\s*'(\w+)'\s*,\s*([\w\.]+)\s*\)"
def replacement(match):
date_part = match.group(1).upper()
date_value = match.group(2)
return f"EXTRACT({date_part} FROM {date_value})"
converted_query = re.sub(pattern, replacement, sql_query, flags=re.IGNORECASE)
return converted_query
def replace_date_strings(self,sql_query):
# Define the regular expression pattern to match date strings
# The pattern assumes dates are in the format 'YYYY-MM-DD'
date_pattern = re.compile(r"'(\d{4}-\d{2}-\d{2})'")
# Define the function to replace the date strings
def replacement(match):
date_str = match.group(1)
# Check if the date is already inside a DATE function
if "DATE '" + date_str + "'" not in sql_query:
return f"DATE '{date_str}'"
return match.group(0)
# Use re.sub to replace all occurrences in the SQL query
converted_query = date_pattern.sub(replacement, sql_query)
return converted_query
def detect_aliases(self,query):
# Step 1: Extract the SELECT clause
select_clause_match = re.search(r'SELECT\s+(.*?)\s+FROM', query, re.IGNORECASE | re.DOTALL)
if not select_clause_match:
return {}
select_clause = select_clause_match.group(1).strip()
# Step 2: Split the SELECT clause by commas, considering nested parentheses
def split_select_clause(clause):
parts = []
bracket_level = 0
start = 0
for i, char in enumerate(clause):
if char == '(':
bracket_level += 1
elif char == ')':
bracket_level -= 1
elif char == ',' and bracket_level == 0:
parts.append(clause[start:i].strip())
start = i + 1
parts.append(clause[start:].strip())
return parts
select_parts = split_select_clause(select_clause)
# Step 3: Extract expressions and aliases
aliases = {}
alias_pattern = re.compile(r'(?P<expression>.+?)\s+AS\s+(?P<alias>\w+)', re.IGNORECASE | re.DOTALL)
for part in select_parts:
expr_alias_match = alias_pattern.search(part)
if expr_alias_match:
expression = expr_alias_match.group('expression').strip()
alias = expr_alias_match.group('alias').strip()
aliases[alias] = expression
return aliases
def replace_group_by_aliases(self,query):
# Detect aliases in the query
aliases = self.detect_aliases(query)
# Find all GROUP BY clauses
group_by_matches = re.finditer(r'GROUP\s+BY\s+(.*?)(?=\s+ORDER\s+BY|\s+LIMIT\s+|$)', query, re.IGNORECASE | re.DOTALL)
# Replace aliases in each GROUP BY clause
new_query = query
for match in group_by_matches:
group_by_clause = match.group(1).strip()
for alias, expression in aliases.items():
group_by_clause = re.sub(r'\b' + re.escape(alias) + r'\b', expression, group_by_clause)
new_query = new_query.replace(match.group(0), f'GROUP BY {group_by_clause}')
return new_query
def check_if(self, sql_query, keyword):
return keyword.lower() in sql_query.lower()
def convert_sql_oracle(self, sql):
if self.check_if(sql, "date_trunc"):
sql = self.replace_date_trunc(sql)
if self.check_if(sql, "QUARTER") and self.check_if(sql, "EXTRACT"):
sql = self.replace_extract_quarter(sql)
if self.check_if(sql, "INTERVAL"):
sql = self.convert_sql_interval(sql)
if self.check_if(sql, "LIMIT"):
sql = self.replace_limit(sql)
if self.check_if(sql, "OFFSET"):
sql = self.replace_offset(sql)
if self.check_if(sql, "date_part"):
sql = self.replace_date_part(sql)
return self.replace_group_by_aliases(self.replace_date_strings(self.replace_words(sql)))
# Example usage
#sql_query = """
#SELECT c.FirstName, c.LastName, (CAST(SUM(CASE WHEN o.PurchaseDate BETWEEN '2023-01-01' AND '2023-04-30' THEN p.Price END) AS FLOAT) / NULLIF(SUM(CASE WHEN o.PurchaseDate BETWEEN '2023-01-01' AND '2023-04-30' THEN p.Price END), 0)) * 100 AS percentage_change FROM Customers c JOIN Orders o ON c.CustomerID = o.CustomerID JOIN Products p ON o.ProductID = p.ProductID GROUP BY c.FirstName, c.LastName HAVING percentage_change > 50 ORDER BY percentage_change DESC;
#"""
#converter = SQLConverter()
#converted_sql = converter.convert_sql_oracle(sql_query)
#print(converted_sql)