File size: 7,355 Bytes
b3ebb38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
from typing import Dict, List, Union
import sqlite3
from pathlib import Path
import re
import pandas as pd

class DatabaseManager:
    """Classe responsável por gerenciar o banco de dados SQLite com múltiplas tabelas"""

    def __init__(self, csv_sources: Union[str, List[str]], db_path: str = "data.db"):
        """

        Inicializa o gerenciador de banco de dados



        Args:

            csv_sources: Caminho para arquivo CSV, lista de caminhos, ou diretório contendo CSVs

            db_path: Caminho para o banco de dados SQLite

        """
        self.db_path = db_path
        self.tables_info = {}  # Dicionário {nome_tabela: info_tabela}

        # Processa as fontes CSV
        self.csv_files = self._process_csv_sources(csv_sources)

        # Carrega todos os CSVs no SQLite
        self._load_csvs_to_sqlite()

        # Obtém informações de todas as tabelas
        self._get_all_tables_schema()

    def _process_csv_sources(self, csv_sources: Union[str, List[str]]) -> Dict[str, str]:
        """

        Processa as fontes CSV e retorna dicionário {nome_tabela: caminho_arquivo}

        """
        csv_files = {}

        if isinstance(csv_sources, str):
            # Se é string, pode ser arquivo ou diretório
            path = Path(csv_sources)

            if path.is_file() and path.suffix.lower() == '.csv':
                # É um arquivo CSV único
                table_name = self._generate_table_name(path.stem)
                csv_files[table_name] = str(path)

            elif path.is_dir():
                # É um diretório, busca todos os CSVs
                for csv_file in path.glob("*.csv"):
                    table_name = self._generate_table_name(csv_file.stem)
                    csv_files[table_name] = str(csv_file)

            else:
                raise ValueError(f"Caminho não é arquivo CSV válido nem diretório: {csv_sources}")

        elif isinstance(csv_sources, list):
            # É uma lista de caminhos
            for csv_path in csv_sources:
                path = Path(csv_path)
                if path.is_file() and path.suffix.lower() == '.csv':
                    table_name = self._generate_table_name(path.stem)
                    csv_files[table_name] = str(path)
                else:
                    print(f"Ignorando arquivo não-CSV: {csv_path}")

        if not csv_files:
            raise ValueError("Nenhum arquivo CSV válido encontrado")

        print(f"Encontrados {len(csv_files)} arquivo(s) CSV:")
        for table_name, file_path in csv_files.items():
            print(f"   - {table_name}{file_path}")

        return csv_files

    def _generate_table_name(self, filename: str) -> str:
        """

        Gera nome de tabela válido a partir do nome do arquivo

        """
        # Remove caracteres especiais e espaços, converte para minúsculas
        table_name = re.sub(r'[^\w\s]', '', filename)
        table_name = re.sub(r'\s+', '_', table_name)
        table_name = table_name.lower().strip('_')

        # Garante que não comece com número
        if table_name[0].isdigit():
            table_name = f"tabela_{table_name}"

        return table_name

    def _load_csvs_to_sqlite(self):
      """Carrega todos os arquivos CSV no banco de dados SQLite"""
      try:
          conn = sqlite3.connect(self.db_path)

          for table_name, csv_path in self.csv_files.items():
              print(f"Carregando {csv_path} → tabela '{table_name}'...")

              # Lê CSV
              df = pd.read_csv(csv_path)

              # Renomeia colunas: espaços viram underscores
              df.columns = [col.strip().replace(" ", "_") for col in df.columns]

              # Carrega no SQLite
              df.to_sql(table_name, conn, if_exists='replace', index=False)

              print(f"{len(df)} linhas, {len(df.columns)} colunas")

          conn.close()
          print(f"Todos os CSVs carregados no banco: {self.db_path}")

      except Exception as e:
          raise Exception(f"Erro ao carregar CSVs no SQLite: {str(e)}")

    def _get_all_tables_schema(self):
        """Obtém informações do schema de todas as tabelas"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        for table_name in self.csv_files.keys():
            # Obtém informações das colunas
            cursor.execute(f"PRAGMA table_info({table_name})")
            columns = cursor.fetchall()

            # Obtém dados de exemplo
            cursor.execute(f"SELECT * FROM {table_name} LIMIT 3")
            sample_data = cursor.fetchall()

            # Obtém contagem total
            cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
            total_rows = cursor.fetchone()[0]

            # Monta informações da tabela
            table_info = {
                'columns': columns,
                'sample_data': sample_data,
                'total_rows': total_rows,
                'csv_source': self.csv_files[table_name]
            }

            self.tables_info[table_name] = table_info

        conn.close()

    def get_database_schema(self) -> str:
        """Retorna schema completo do banco de dados"""
        schema_info = "=== ESQUEMA DO BANCO DE DADOS ===\n\n"

        for table_name, info in self.tables_info.items():
            schema_info += f"TABELA: {table_name}\n"
            schema_info += f"Fonte: {info['csv_source']}\n"
            schema_info += f"Total de registros: {info['total_rows']}\n"
            schema_info += "Colunas:\n"

            for col in info['columns']:
                schema_info += f"  - {col[1]} ({col[2]})\n"

            schema_info += "\nDados de exemplo:\n"
            column_names = [col[1] for col in info['columns']]
            schema_info += f"  {column_names}\n"

            for row in info['sample_data']:
                schema_info += f"  {list(row)}\n"

            schema_info += "\n" + "="*50 + "\n\n"

        return schema_info

    def get_tables_list(self) -> List[str]:
        """Retorna lista dos nomes das tabelas"""
        return list(self.tables_info.keys())

    def get_table_info(self, table_name: str) -> Dict:
        """Retorna informações de uma tabela específica"""
        return self.tables_info.get(table_name, {})

    def execute_sql_query(self, sql_query: str) -> List[Dict]:
        """Executa consulta SQL e retorna resultados"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            cursor.execute(sql_query)
            results = cursor.fetchall()

            print(f'{results=}')
            print(f'{cursor.description=}')

            # Obtém nomes das colunas
            column_names = [description[0] for description in cursor.description]

            print(f'{column_names=}')
            conn.close()

            # Converte para lista de dicionários
            result_dicts = []
            for row in results:
                result_dicts.append(dict(zip(column_names, row)))

            return result_dicts

        except Exception as e:
            raise Exception(f"Erro ao executar SQL: {str(e)}")