Spaces:
Sleeping
Sleeping
| """ | |
| dbt Exporter for CodeFlow AI | |
| Generates production-ready dbt models from SQL queries | |
| """ | |
| import io | |
| import zipfile | |
| from datetime import datetime | |
| from typing import Dict, Optional | |
| from jinja2 import Template | |
| class DBTExporter: | |
| """ | |
| Export SQL queries as dbt models | |
| Generates model.sql and schema.yml files | |
| """ | |
| def __init__(self): | |
| # Use {0}, {1} placeholders instead of Jinja {{ }} to avoid conflict with dbt | |
| self.model_template = """{{{{ | |
| config( | |
| materialized='{materialization}', | |
| schema='{schema}', | |
| tags=['{tags}'] | |
| ) | |
| }}}} | |
| /* | |
| Model: {model_name} | |
| Description: {description} | |
| Generated: {generated_date} | |
| Generated by: CodeFlow AI | |
| */ | |
| {sql_query} | |
| """ | |
| self.schema_template = """version: 2 | |
| models: | |
| - name: {{ model_name }} | |
| description: {{ description }} | |
| columns: | |
| {% for column in columns %} | |
| - name: {{ column.name }} | |
| description: {{ column.description }} | |
| {% if column.tests %} | |
| tests: | |
| {% for test in column.tests %} | |
| - {{ test }} | |
| {% endfor %} | |
| {% endif %} | |
| {% endfor %} | |
| """ | |
| def export_model( | |
| self, | |
| sql: str, | |
| model_name: str, | |
| description: str = "", | |
| materialization: str = "table", | |
| schema: str = "analytics", | |
| tags: str = "codeflow_generated", | |
| columns: Optional[list] = None | |
| ) -> Dict[str, str]: | |
| """ | |
| Export SQL as dbt model | |
| Args: | |
| sql: SQL query to export | |
| model_name: Name for the dbt model | |
| description: Model description | |
| materialization: dbt materialization (table, view, incremental) | |
| schema: Target schema name | |
| tags: Model tags | |
| columns: List of column definitions | |
| Returns: | |
| Dict with model.sql and schema.yml content | |
| """ | |
| # Clean SQL (remove CodeFlow comments) | |
| clean_sql = self._clean_sql(sql) | |
| # Generate model.sql using format() instead of Jinja2 to avoid dbt {{ }} conflict | |
| model_content = self.model_template.format( | |
| materialization=materialization, | |
| schema=schema, | |
| tags=tags, | |
| model_name=model_name, | |
| description=description or f"Generated model: {model_name}", | |
| generated_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
| sql_query=clean_sql | |
| ) | |
| # Generate schema.yml | |
| if not columns: | |
| # Auto-detect columns from SQL (basic) | |
| columns = self._detect_columns(clean_sql) | |
| schema_template = Template(self.schema_template) | |
| schema_content = schema_template.render( | |
| model_name=model_name, | |
| description=description or f"Generated model: {model_name}", | |
| columns=columns | |
| ) | |
| return { | |
| "model.sql": model_content, | |
| "schema.yml": schema_content | |
| } | |
| def _clean_sql(self, sql: str) -> str: | |
| """Remove CodeFlow-specific comments from SQL""" | |
| lines = [] | |
| for line in sql.split('\n'): | |
| stripped = line.strip() | |
| # Skip CodeFlow header comments | |
| if stripped.startswith('-- CodeFlow'): | |
| continue | |
| if stripped.startswith('-- Dialect:'): | |
| continue | |
| if stripped.startswith('-- Query Type:'): | |
| continue | |
| lines.append(line) | |
| return '\n'.join(lines).strip() | |
| def _detect_columns(self, sql: str) -> list: | |
| """ | |
| Basic column detection from SQL | |
| Looks for SELECT clause columns | |
| """ | |
| columns = [] | |
| # Find SELECT clause (very basic parser) | |
| sql_upper = sql.upper() | |
| if 'SELECT' not in sql_upper: | |
| return columns | |
| # Extract between SELECT and FROM | |
| select_idx = sql_upper.find('SELECT') | |
| from_idx = sql_upper.find('FROM', select_idx) | |
| if from_idx == -1: | |
| from_idx = len(sql) | |
| select_clause = sql[select_idx + 6:from_idx].strip() | |
| # Split by comma (basic - doesn't handle nested commas) | |
| parts = select_clause.split(',') | |
| for part in parts[:10]: # Limit to first 10 columns | |
| part = part.strip() | |
| if not part: | |
| continue | |
| # Extract column name (after AS if present) | |
| if ' AS ' in part.upper(): | |
| col_name = part.upper().split(' AS ')[-1].strip() | |
| elif ' ' in part: | |
| # Take last word | |
| col_name = part.split()[-1].strip() | |
| else: | |
| col_name = part | |
| # Clean column name | |
| col_name = col_name.replace('`', '').replace('"', '').replace("'", "") | |
| if col_name and col_name != '*': | |
| columns.append({ | |
| "name": col_name.lower(), | |
| "description": f"Column: {col_name}", | |
| "tests": ["not_null"] if "id" in col_name.lower() else [] | |
| }) | |
| return columns | |
| def create_zip(self, files: Dict[str, str], model_name: str) -> bytes: | |
| """ | |
| Create a ZIP file containing the dbt files | |
| Args: | |
| files: Dict of filename -> content | |
| model_name: Model name for directory structure | |
| Returns: | |
| ZIP file as bytes | |
| """ | |
| zip_buffer = io.BytesIO() | |
| with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: | |
| # Create models directory structure | |
| for filename, content in files.items(): | |
| if filename.endswith('.sql'): | |
| path = f"models/{model_name}.sql" | |
| elif filename.endswith('.yml'): | |
| path = f"models/schema.yml" | |
| else: | |
| path = f"models/{filename}" | |
| zip_file.writestr(path, content) | |
| # Add README | |
| readme = f"""# dbt Model: {model_name} | |
| Generated by CodeFlow AI on {datetime.now().strftime("%Y-%m-%d")} | |
| ## Files Included | |
| - `models/{model_name}.sql` - The dbt model | |
| - `models/schema.yml` - Model and column documentation | |
| ## Usage | |
| 1. Copy these files to your dbt project's `models/` directory | |
| 2. Run `dbt run --select {model_name}` to execute | |
| 3. Run `dbt test --select {model_name}` to run tests | |
| ## Configuration | |
| You can modify the model configuration at the top of {model_name}.sql: | |
| - materialization: table, view, or incremental | |
| - schema: target schema name | |
| - tags: for organizing models | |
| For more information, visit: https://docs.getdbt.com/ | |
| """ | |
| zip_file.writestr("README.md", readme) | |
| zip_buffer.seek(0) | |
| return zip_buffer.getvalue() | |
| # Singleton instance | |
| _dbt_exporter = None | |
| def get_dbt_exporter() -> DBTExporter: | |
| """Get or create the global dbt exporter instance""" | |
| global _dbt_exporter | |
| if _dbt_exporter is None: | |
| _dbt_exporter = DBTExporter() | |
| return _dbt_exporter |