# codeflow-ai / export / dbt_exporter.py
# Initial commit: CodeFlow AI - NL to SQL Generator (commit 7814c1f)
"""
dbt Exporter for CodeFlow AI
Generates production-ready dbt models from SQL queries
"""
import io
import zipfile
from datetime import datetime
from typing import Dict, Optional
from jinja2 import Template
class DBTExporter:
    """
    Export SQL queries as dbt models.

    Generates a model ``.sql`` file (dbt config block + provenance header)
    and a companion ``schema.yml`` documenting the model and its columns,
    and can bundle everything, plus a README, into a ZIP archive.
    """

    # Header-comment prefixes stripped from exported SQL by _clean_sql().
    _CODEFLOW_COMMENT_PREFIXES = ('-- CodeFlow', '-- Dialect:', '-- Query Type:')

    def __init__(self):
        # Rendered with str.format(), NOT Jinja2: dbt itself consumes
        # {{ ... }} Jinja, so the literal braces are escaped as {{{{ / }}}}
        # and only the {placeholder} fields are substituted here.
        self.model_template = """{{{{
  config(
    materialized='{materialization}',
    schema='{schema}',
    tags=['{tags}']
  )
}}}}

/*
  Model: {model_name}
  Description: {description}
  Generated: {generated_date}
  Generated by: CodeFlow AI
*/

{sql_query}
"""
        # Rendered with Jinja2 (safe here: dbt does not re-render schema.yml).
        self.schema_template = """version: 2

models:
  - name: {{ model_name }}
    description: {{ description }}
    columns:
{% for column in columns %}
      - name: {{ column.name }}
        description: {{ column.description }}
{% if column.tests %}
        tests:
{% for test in column.tests %}
          - {{ test }}
{% endfor %}
{% endif %}
{% endfor %}
"""

    def export_model(
        self,
        sql: str,
        model_name: str,
        description: str = "",
        materialization: str = "table",
        schema: str = "analytics",
        tags: str = "codeflow_generated",
        columns: Optional[list] = None
    ) -> Dict[str, str]:
        """
        Export SQL as a dbt model.

        Args:
            sql: SQL query to export.
            model_name: Name for the dbt model.
            description: Model description (auto-generated when empty).
            materialization: dbt materialization (table, view, incremental).
            schema: Target schema name.
            tags: Model tag string (embedded as a single quoted tag in the
                config block — commas are NOT split into multiple tags).
            columns: Optional list of column dicts ("name"/"description"/
                "tests"); auto-detected from the SELECT clause when omitted.

        Returns:
            Dict with "model.sql" and "schema.yml" mapped to file content.
        """
        # Strip CodeFlow header comments before embedding the SQL.
        clean_sql = self._clean_sql(sql)
        effective_description = description or f"Generated model: {model_name}"

        # str.format() keeps dbt's own {{ ... }} Jinja intact (see __init__).
        model_content = self.model_template.format(
            materialization=materialization,
            schema=schema,
            tags=tags,
            model_name=model_name,
            description=effective_description,
            generated_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            sql_query=clean_sql,
        )

        if not columns:
            # Best-effort auto-detection from the SELECT clause.
            columns = self._detect_columns(clean_sql)

        schema_content = Template(self.schema_template).render(
            model_name=model_name,
            description=effective_description,
            columns=columns,
        )

        return {
            "model.sql": model_content,
            "schema.yml": schema_content,
        }

    def _clean_sql(self, sql: str) -> str:
        """Remove CodeFlow-specific header comments from *sql*."""
        # startswith() accepts a tuple, so one pass covers every prefix.
        kept = [
            line for line in sql.split('\n')
            if not line.strip().startswith(self._CODEFLOW_COMMENT_PREFIXES)
        ]
        return '\n'.join(kept).strip()

    def _detect_columns(self, sql: str) -> list:
        """
        Basic column detection from SQL.

        Scans the text between the first SELECT and the following FROM and
        splits it on commas (nested commas, e.g. inside function calls, are
        not handled). Returns up to 10 column dicts with "name",
        "description" and "tests" keys; names containing "id" get a
        not_null test.
        """
        columns: list = []
        sql_upper = sql.upper()
        if 'SELECT' not in sql_upper:
            return columns

        # Extract the clause between SELECT and FROM (or end of string).
        select_idx = sql_upper.find('SELECT')
        from_idx = sql_upper.find('FROM', select_idx)
        if from_idx == -1:
            from_idx = len(sql)
        select_clause = sql[select_idx + 6:from_idx].strip()

        # Split by comma (basic - doesn't handle nested commas).
        for part in select_clause.split(',')[:10]:  # limit to first 10 columns
            part = part.strip()
            if not part:
                continue
            part_upper = part.upper()
            if ' AS ' in part_upper:
                # Alias after the last AS. Index into the original string so
                # the alias keeps its casing (previously the whole alias was
                # uppercased, making descriptions inconsistently cased).
                col_name = part[part_upper.rindex(' AS ') + 4:].strip()
            elif ' ' in part:
                # No explicit AS: assume the last word is the alias.
                col_name = part.split()[-1].strip()
            else:
                col_name = part

            # Strip common identifier quoting characters.
            col_name = col_name.replace('`', '').replace('"', '').replace("'", "")
            if col_name and col_name != '*':
                columns.append({
                    "name": col_name.lower(),
                    "description": f"Column: {col_name}",
                    "tests": ["not_null"] if "id" in col_name.lower() else [],
                })
        return columns

    def create_zip(self, files: Dict[str, str], model_name: str) -> bytes:
        """
        Create a ZIP file containing the dbt files.

        Args:
            files: Dict of filename -> content. ``.sql`` files are written
                as ``models/<model_name>.sql``, ``.yml`` files as
                ``models/schema.yml``; any other file keeps its own name
                under ``models/``.
            model_name: Model name for the .sql path and the README.

        Returns:
            The ZIP archive as bytes.
        """
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for filename, content in files.items():
                if filename.endswith('.sql'):
                    path = f"models/{model_name}.sql"
                elif filename.endswith('.yml'):
                    path = "models/schema.yml"
                else:
                    # BUGFIX: was the literal placeholder "models/(unknown)";
                    # preserve the caller-supplied filename instead.
                    path = f"models/{filename}"
                zip_file.writestr(path, content)

            readme = f"""# dbt Model: {model_name}

Generated by CodeFlow AI on {datetime.now().strftime("%Y-%m-%d")}

## Files Included

- `models/{model_name}.sql` - The dbt model
- `models/schema.yml` - Model and column documentation

## Usage

1. Copy these files to your dbt project's `models/` directory
2. Run `dbt run --select {model_name}` to execute
3. Run `dbt test --select {model_name}` to run tests

## Configuration

You can modify the model configuration at the top of {model_name}.sql:

- materialization: table, view, or incremental
- schema: target schema name
- tags: for organizing models

For more information, visit: https://docs.getdbt.com/
"""
            zip_file.writestr("README.md", readme)

        zip_buffer.seek(0)
        return zip_buffer.getvalue()
# Lazily-constructed module-level singleton.
_dbt_exporter = None


def get_dbt_exporter() -> DBTExporter:
    """Return the shared DBTExporter, building it on first call."""
    global _dbt_exporter
    if _dbt_exporter is None:
        _dbt_exporter = DBTExporter()
    return _dbt_exporter