File size: 4,886 Bytes
4beb1ef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f11a1eb
 
 
 
 
 
 
 
 
4beb1ef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f11a1eb
 
 
 
 
 
 
 
 
4beb1ef
 
 
 
 
 
 
 
 
 
 
 
 
 
f11a1eb
 
 
 
 
 
 
 
4beb1ef
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from sqlalchemy import text
import logging

logger = logging.getLogger(__name__)

class SchemaInspector:
    """Read foreign-key and column metadata from ``information_schema``.

    Every query is rendered to a plain SQL string with its bound values
    inlined and handed to ``db_connector.execute_query``. Each returned row
    is converted to a dict keyed by the selected column names.

    NOTE(review): tables are matched by name only, so same-named tables in
    different schemas are all reported together — confirm that is intended.
    """

    # Dict keys for foreign-key rows, in the same order as the SELECT list.
    _FK_COLUMNS = (
        'table_schema', 'constraint_name', 'table_name', 'column_name',
        'foreign_table_schema', 'foreign_table_name', 'foreign_column_name',
    )

    # Dict keys for column-info rows, in the same order as the SELECT list.
    _COLUMN_INFO_COLUMNS = (
        'column_name', 'data_type', 'is_nullable', 'column_default',
    )

    # Shared foreign-key query; only the column compared with :table_name
    # differs between "keys owned by a table" and "keys pointing at a table".
    # NOTE(review): constraint_column_usage is joined on constraint name and
    # schema only, with no column-position correlation, so a multi-column
    # constraint may produce one row per column combination — verify.
    _FK_QUERY_TEMPLATE = """
        SELECT
            tc.table_schema,
            tc.constraint_name,
            tc.table_name,
            kcu.column_name,
            ccu.table_schema AS foreign_table_schema,
            ccu.table_name AS foreign_table_name,
            ccu.column_name AS foreign_column_name
        FROM
            information_schema.table_constraints AS tc
            JOIN information_schema.key_column_usage AS kcu
              ON tc.constraint_name = kcu.constraint_name
              AND tc.table_schema = kcu.table_schema
            JOIN information_schema.constraint_column_usage AS ccu
              ON ccu.constraint_name = tc.constraint_name
              AND ccu.table_schema = tc.table_schema
        WHERE tc.constraint_type = 'FOREIGN KEY' AND {filter_column}=:table_name
    """

    # Foreign keys declared ON the given table (filter on the owning table).
    _FOREIGN_KEYS_SQL = _FK_QUERY_TEMPLATE.format(filter_column="tc.table_name")

    # Foreign keys that POINT AT the given table (filter on the target table).
    _REFERENCING_SQL = _FK_QUERY_TEMPLATE.format(filter_column="ccu.table_name")

    _COLUMN_INFO_SQL = """
        SELECT column_name, data_type, is_nullable, column_default
        FROM information_schema.columns
        WHERE table_name = :table_name
        AND (:column_name IS NULL OR column_name = :column_name)
    """

    def __init__(self, db_connector):
        """Store the connector used to run queries.

        Args:
            db_connector: Object exposing ``execute_query(sql)``. It is called
                with a SQL string and its result is iterated as rows
                (presumably tuples in SELECT order — verify against the
                connector implementation).
        """
        self.db = db_connector

    def _fetch_as_dicts(self, sql, params, column_names, description):
        """Run *sql* with *params* and return the rows as dicts.

        Args:
            sql: SQL text containing ``:name`` style placeholders.
            params: Mapping of placeholder name to value; must contain
                ``table_name`` (used in the error log message).
            column_names: Keys to pair positionally with each row's values.
            description: Short label of what is being fetched, used only in
                the error log message.

        Returns:
            A list of dicts, one per row. An empty list is returned when
            binding, execution or row conversion raises; the error is logged.
        """
        query = text(sql)

        try:
            # The connector is given a plain string, so the bound values are
            # rendered inline by SQLAlchemy rather than sent as parameters.
            # NOTE(review): literal rendering is not meant for untrusted
            # input; prefer real bind parameters if the connector allows it.
            query_str = str(query.bindparams(**params).compile(
                compile_kwargs={"literal_binds": True}
            ))
            result = self.db.execute_query(query_str)
            return [dict(zip(column_names, row)) for row in result]
        except Exception as exc:
            # Deliberate best-effort: callers receive [] instead of an error.
            logger.error(
                "Error fetching %s for %s: %s",
                description, params.get("table_name"), exc,
            )
            return []

    def get_foreign_keys(self, table_name):
        """Get all foreign keys declared on a specific table.

        Args:
            table_name: Name of the table that owns the constraints.

        Returns:
            A list of dicts with keys ``table_schema``, ``constraint_name``,
            ``table_name``, ``column_name``, ``foreign_table_schema``,
            ``foreign_table_name`` and ``foreign_column_name``; ``[]`` on error.
        """
        return self._fetch_as_dicts(
            self._FOREIGN_KEYS_SQL,
            {"table_name": table_name},
            self._FK_COLUMNS,
            "foreign keys",
        )

    def get_referencing_tables(self, table_name):
        """Get all foreign keys in other tables that reference the given table.

        Args:
            table_name: Name of the referenced (target) table.

        Returns:
            A list of dicts with the same keys as :meth:`get_foreign_keys`,
            where ``foreign_table_name`` is the referenced table; ``[]`` on
            error.
        """
        return self._fetch_as_dicts(
            self._REFERENCING_SQL,
            {"table_name": table_name},
            self._FK_COLUMNS,
            "referencing tables",
        )

    def get_column_info(self, table_name, column_name=None):
        """Get column information for a table.

        Args:
            table_name: Name of the table to describe.
            column_name: Optional column to restrict the result to. When
                ``None`` the filter is disabled and all columns are returned.

        Returns:
            A list of dicts with keys ``column_name``, ``data_type``,
            ``is_nullable`` and ``column_default``; ``[]`` on error.
        """
        return self._fetch_as_dicts(
            self._COLUMN_INFO_SQL,
            {"table_name": table_name, "column_name": column_name},
            self._COLUMN_INFO_COLUMNS,
            "column info",
        )