File size: 30,053 Bytes
e982206
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
"""
Nó para processamento de arquivos CSV
"""
import os
import shutil
import logging
import time
import pandas as pd
import numpy as np
from typing import Dict, Any, TypedDict, List, Optional
from sqlalchemy.types import DateTime, Integer, Float, String, Boolean
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp

from utils.config import UPLOADED_CSV_PATH
from utils.object_manager import get_object_manager
import numpy as np

def analyze_numeric_column(sample_values: pd.Series) -> Dict[str, Any]:
    """
    Análise otimizada para detectar se coluna é numérica

    Args:
        sample_values: Amostra dos valores da coluna

    Returns:
        Dicionário com análise numérica
    """
    analysis = {
        "is_numeric": False,
        "is_integer": False,
        "numeric_ratio": 0.0,
        "has_decimals": False
    }

    if len(sample_values) == 0:
        return analysis

    # Converte para string e limpa valores
    str_values = sample_values.astype(str).str.strip()

    # Remove valores vazios e nulos
    clean_values = str_values[
        ~str_values.isin(['', 'nan', 'null', 'none', '-', 'NaN', 'NULL'])
    ]

    if len(clean_values) == 0:
        return analysis

    # Tenta conversão numérica vetorizada
    try:
        # Substitui vírgulas por pontos para formato brasileiro
        numeric_values = clean_values.str.replace(',', '.', regex=False)

        # Tenta conversão para float
        converted = pd.to_numeric(numeric_values, errors='coerce')

        # Conta valores válidos
        valid_count = converted.notna().sum()
        total_count = len(clean_values)

        analysis["numeric_ratio"] = valid_count / total_count if total_count > 0 else 0

        # Se mais de 80% são números válidos, considera numérico
        if analysis["numeric_ratio"] > 0.8:
            analysis["is_numeric"] = True

            # Verifica se são inteiros
            valid_numbers = converted.dropna()
            if len(valid_numbers) > 0:
                # Verifica se todos os números válidos são inteiros
                analysis["is_integer"] = all(
                    float(x).is_integer() for x in valid_numbers
                    if not pd.isna(x) and abs(x) < 1e15  # Evita overflow
                )
                analysis["has_decimals"] = not analysis["is_integer"]

    except Exception as e:
        logging.debug(f"Erro na análise numérica: {e}")
        analysis["is_numeric"] = False

    return analysis

def detect_date_format(date_string: str) -> str:
    """
    Detecta o formato mais provável de uma string de data

    Args:
        date_string: String para analisar

    Returns:
        'iso', 'american', 'brazilian' ou 'auto'
    """
    date_str = str(date_string).strip()

    # Formato ISO (YYYY-MM-DD ou YYYY/MM/DD)
    if len(date_str) >= 10 and date_str[4] in ['-', '/', '.'] and date_str[7] in ['-', '/', '.']:
        if date_str[:4].isdigit() and int(date_str[:4]) > 1900:
            return 'iso'

    # Verifica se pode ser formato americano (MM/DD/YYYY)
    if '/' in date_str:
        parts = date_str.split('/')
        if len(parts) == 3:
            try:
                month, day, year = int(parts[0]), int(parts[1]), int(parts[2])
                # Se o primeiro número é > 12, provavelmente é DD/MM/YYYY
                if month > 12:
                    return 'brazilian'
                # Se o segundo número é > 12, provavelmente é MM/DD/YYYY
                elif day > 12:
                    return 'american'
                # Se ambos <= 12, é ambíguo, assume brasileiro por padrão
                else:
                    return 'brazilian'
            except:
                pass

    # Formato brasileiro por padrão (DD/MM/YYYY, DD-MM-YYYY, DD.MM.YYYY)
    return 'brazilian'

def smart_date_conversion(date_string: str):
    """
    Converte string para data usando detecção inteligente de formato

    Args:
        date_string: String da data

    Returns:
        Timestamp do pandas ou levanta exceção
    """
    format_type = detect_date_format(date_string)

    if format_type == 'iso':
        return pd.to_datetime(date_string, errors='raise')
    elif format_type == 'american':
        return pd.to_datetime(date_string, format='%m/%d/%Y', errors='raise')
    elif format_type == 'brazilian':
        return pd.to_datetime(date_string, dayfirst=True, errors='raise')
    else:
        # Fallback para detecção automática
        return pd.to_datetime(date_string, errors='raise')

async def process_dates_advanced(series: pd.Series) -> pd.Series:
    """
    Processa datas com múltiplos formatos de forma robusta

    Args:
        series: Série pandas com datas em formato texto

    Returns:
        Série com datas convertidas para datetime
    """
    # Formatos de data para tentar em ordem de prioridade
    date_formats = [
        '%d/%m/%Y',     # 01/12/2024
        '%d-%m-%Y',     # 01-12-2024
        '%Y-%m-%d',     # 2024-12-01
        '%d/%m/%y',     # 01/12/24
        '%d-%m-%y',     # 01-12-24
        '%Y/%m/%d',     # 2024/12/01
        '%d.%m.%Y',     # 01.12.2024
        '%Y.%m.%d',     # 2024.12.01
        '%d/%m/%Y %H:%M:%S',  # 01/12/2024 14:30:00
        '%Y-%m-%d %H:%M:%S',  # 2024-12-01 14:30:00
    ]

    result_series = pd.Series(index=series.index, dtype='datetime64[ns]')

    for idx, value in series.items():
        if pd.isna(value) or str(value).strip() in ['', 'nan', 'null', 'none', '-']:
            result_series[idx] = pd.NaT
            continue

        value_str = str(value).strip()
        converted = False

        # Tenta conversão automática com detecção inteligente de formato
        try:
            result_series[idx] = smart_date_conversion(value_str)
            converted = True
        except:
            pass

        # Se não funcionou, tenta formatos específicos
        if not converted:
            for fmt in date_formats:
                try:
                    result_series[idx] = pd.to_datetime(value_str, format=fmt, errors='raise')
                    converted = True
                    break
                except:
                    continue

        # Se ainda não converteu, marca como NaT
        if not converted:
            result_series[idx] = pd.NaT
            logging.warning(f"Não foi possível converter '{value_str}' para data")

    return result_series

class CSVProcessingState(TypedDict):
    """Estado para processamento de CSV"""
    file_path: str
    success: bool
    message: str
    csv_data_sample: dict
    column_info: dict
    processing_stats: dict

async def detect_column_types(df: pd.DataFrame, sample_size: int = 1000) -> Dict[str, Any]:
    """
    Detecta automaticamente os tipos de colunas de forma genérica e otimizada

    Args:
        df: DataFrame do pandas
        sample_size: Número de linhas para amostragem (otimização)

    Returns:
        Dicionário com informações dos tipos detectados
    """
    column_info = {
        "detected_types": {},
        "sql_types": {},
        "date_columns": [],
        "numeric_columns": [],
        "text_columns": [],
        "processing_rules": {}
    }

    # Usa amostra para otimizar performance em datasets grandes
    sample_df = df.sample(n=min(sample_size, len(df)), random_state=42) if len(df) > sample_size else df
    logging.info(f"[OPTIMIZATION] Usando amostra de {len(sample_df)} linhas para detecção de tipos")

    for col in df.columns:
        # Detecta tipo original
        original_type = str(df[col].dtype)
        column_info["detected_types"][col] = original_type

        # Usa amostra para análise
        sample_col = sample_df[col] if col in sample_df.columns else df[col]
        
        # Detecta números já convertidos pelo pandas
        if sample_col.dtype in ['int64', 'Int64', 'float64', 'Float64']:
            if 'int' in str(sample_col.dtype).lower():
                column_info["numeric_columns"].append(col)
                column_info["sql_types"][col] = Integer()
                column_info["processing_rules"][col] = "keep_as_int"
            else:
                column_info["numeric_columns"].append(col)
                column_info["sql_types"][col] = Float()
                column_info["processing_rules"][col] = "keep_as_float"
            continue

        # Tenta detectar datas de forma mais robusta
        if sample_col.dtype == 'object':
            # Tenta detectar datas com múltiplos formatos
            sample_values = sample_col.dropna().head(20)
            date_success_count = 0

            # Formatos de data comuns para testar
            date_formats = [
                '%d/%m/%Y',     # 01/12/2024
                '%d-%m-%Y',     # 01-12-2024
                '%Y-%m-%d',     # 2024-12-01
                '%d/%m/%y',     # 01/12/24
                '%d-%m-%y',     # 01-12-24
                '%Y/%m/%d',     # 2024/12/01
                '%d.%m.%Y',     # 01.12.2024
                '%Y.%m.%d',     # 2024.12.01
            ]

            for val in sample_values:
                val_str = str(val).strip()
                if not val_str or val_str.lower() in ['nan', 'null', 'none', '-']:
                    continue

                # Tenta conversão automática com detecção inteligente
                try:
                    smart_date_conversion(val_str)
                    date_success_count += 1
                    continue
                except:
                    pass

                # Tenta formatos específicos
                for fmt in date_formats:
                    try:
                        pd.to_datetime(val_str, format=fmt, errors='raise')
                        date_success_count += 1
                        break
                    except:
                        continue

            # Se mais de 70% dos valores são datas válidas, considera como coluna de data
            if len(sample_values) > 0 and date_success_count / len(sample_values) > 0.7:
                column_info["date_columns"].append(col)
                column_info["sql_types"][col] = DateTime()
                column_info["processing_rules"][col] = "parse_dates_advanced"
                continue
        
        # Tenta detectar números em colunas de texto (otimizado)
        elif sample_col.dtype == 'object':
            # Análise otimizada de números em texto
            sample_values = sample_col.dropna().head(50)  # Aumenta amostra para melhor precisão

            if len(sample_values) == 0:
                column_info["text_columns"].append(col)
                column_info["sql_types"][col] = String()
                column_info["processing_rules"][col] = "keep_as_text"
                continue

            # Análise vetorizada para performance
            numeric_analysis = analyze_numeric_column(sample_values)

            if numeric_analysis["is_numeric"]:
                if numeric_analysis["is_integer"]:
                    column_info["numeric_columns"].append(col)
                    column_info["sql_types"][col] = Integer()
                    column_info["processing_rules"][col] = "convert_text_to_int_safe"
                else:
                    column_info["numeric_columns"].append(col)
                    column_info["sql_types"][col] = Float()
                    column_info["processing_rules"][col] = "convert_text_to_float_safe"
            else:
                # Mantém como texto
                column_info["text_columns"].append(col)
                column_info["sql_types"][col] = String()
                column_info["processing_rules"][col] = "keep_as_text"
        
        # Tenta detectar números em colunas de texto
        elif df[col].dtype == 'object':
            # Verifica se pode ser convertido para número
            sample_values = df[col].dropna().head(20)
            numeric_count = 0
            
            for val in sample_values:
                try:
                    # Remove caracteres comuns e tenta converter
                    clean_val = str(val).replace(',', '.').replace('-', '').strip()
                    if clean_val:
                        float(clean_val)
                        numeric_count += 1
                except:
                    pass
            
            # Se mais de 70% são números, trata como numérico
            if len(sample_values) > 0 and numeric_count / len(sample_values) > 0.7:
                # Verifica se são inteiros ou floats
                has_decimal = any('.' in str(val) or ',' in str(val) for val in sample_values)
                if has_decimal:
                    column_info["numeric_columns"].append(col)
                    column_info["sql_types"][col] = Float()
                    column_info["processing_rules"][col] = "convert_text_to_float"
                else:
                    column_info["numeric_columns"].append(col)
                    column_info["sql_types"][col] = Integer()
                    column_info["processing_rules"][col] = "convert_text_to_int"
            else:
                # Mantém como texto
                column_info["text_columns"].append(col)
                column_info["sql_types"][col] = String()
                column_info["processing_rules"][col] = "keep_as_text"
        else:
            # Outros tipos mantém como texto
            column_info["text_columns"].append(col)
            column_info["sql_types"][col] = String()
            column_info["processing_rules"][col] = "keep_as_text"
    
    return column_info

async def process_dataframe_generic(df: pd.DataFrame, column_info: Dict[str, Any]) -> pd.DataFrame:
    """
    Processa DataFrame com OTIMIZAÇÕES EXTREMAS para performance máxima

    Args:
        df: DataFrame original
        column_info: Informações dos tipos detectados

    Returns:
        DataFrame processado
    """
    logging.info(f"[ULTRA_OPTIMIZATION] Iniciando processamento ULTRA-OTIMIZADO de {len(df)} linhas")
    start_time = time.time()

    # OTIMIZAÇÃO 1: Evita cópia desnecessária - modifica in-place quando possível
    processed_df = df

    # OTIMIZAÇÃO 2: Agrupa colunas por tipo de processamento
    processing_groups = {
        'dates': [],
        'keep_numeric': [],
        'convert_numeric': [],
        'text': []
    }

    for col, rule in column_info["processing_rules"].items():
        if col not in processed_df.columns:
            continue

        if 'date' in rule:
            processing_groups['dates'].append((col, rule))
        elif 'keep_as' in rule:
            processing_groups['keep_numeric'].append((col, rule))
        elif 'convert' in rule:
            processing_groups['convert_numeric'].append((col, rule))
        else:
            processing_groups['text'].append((col, rule))

    # OTIMIZAÇÃO 3: Processamento paralelo por grupos
    await process_groups_parallel(processed_df, processing_groups)

    total_time = time.time() - start_time
    logging.info(f"[ULTRA_OPTIMIZATION] Processamento ULTRA-OTIMIZADO concluído em {total_time:.2f}s")

    return processed_df

async def process_groups_parallel(df: pd.DataFrame, groups: Dict[str, List]):
    """
    Processa grupos de colunas em paralelo para máxima performance
    """
    tasks = []

    # Processa cada grupo
    for group_name, columns in groups.items():
        if not columns:
            continue

        if group_name == 'dates':
            tasks.append(process_date_columns_batch(df, columns))
        elif group_name == 'keep_numeric':
            tasks.append(process_keep_numeric_batch(df, columns))
        elif group_name == 'convert_numeric':
            tasks.append(process_convert_numeric_batch(df, columns))
        # text não precisa processamento

    # Executa todos os grupos em paralelo
    if tasks:
        import asyncio
        await asyncio.gather(*tasks)

async def process_date_columns_batch(df: pd.DataFrame, date_columns: List[tuple]):
    """Processa colunas de data em lote"""
    for col, rule in date_columns:
        try:
            if rule == "parse_dates_advanced":
                # OTIMIZAÇÃO: Processamento vetorizado de datas
                df[col] = process_dates_vectorized(df[col])
            else:
                df[col] = pd.to_datetime(df[col], dayfirst=True, errors='coerce')
        except Exception as e:
            logging.warning(f"Erro ao processar data {col}: {e}")

async def process_keep_numeric_batch(df: pd.DataFrame, numeric_columns: List[tuple]):
    """Processa colunas numéricas que já estão no tipo correto"""
    for col, rule in numeric_columns:
        try:
            if rule == "keep_as_int" and df[col].dtype != 'Int64':
                df[col] = df[col].astype("Int64")
            elif rule == "keep_as_float" and df[col].dtype != 'float64':
                df[col] = df[col].astype("float64")
        except Exception as e:
            logging.warning(f"Erro ao manter tipo {col}: {e}")

async def process_convert_numeric_batch(df: pd.DataFrame, convert_columns: List[tuple]):
    """Processa conversões numéricas em lote com máxima otimização"""
    for col, rule in convert_columns:
        try:
            if rule == "convert_text_to_int_safe":
                df[col] = convert_to_int_ultra_optimized(df[col])
            elif rule == "convert_text_to_float_safe":
                df[col] = convert_to_float_ultra_optimized(df[col])
        except Exception as e:
            logging.warning(f"Erro ao converter {col}: {e}")
            if rule == "parse_dates":
                processed_df[col] = pd.to_datetime(
                    processed_df[col],
                    dayfirst=True,
                    errors='coerce'
                )

            elif rule == "parse_dates_advanced":
                # Processamento avançado de datas com múltiplos formatos
                processed_df[col] = await process_dates_advanced(processed_df[col])
                
            elif rule == "keep_as_int":
                # Já é inteiro, apenas garante tipo correto
                if processed_df[col].dtype != 'Int64':
                    processed_df[col] = processed_df[col].astype("Int64")

            elif rule == "keep_as_float":
                # Já é float, apenas garante tipo correto
                if processed_df[col].dtype != 'float64':
                    processed_df[col] = processed_df[col].astype("float64")

            elif rule == "convert_text_to_int_safe":
                # Conversão otimizada e segura para inteiros
                processed_df[col] = convert_to_int_optimized(processed_df[col])

            elif rule == "convert_text_to_float_safe":
                # Conversão otimizada e segura para floats
                processed_df[col] = convert_to_float_optimized(processed_df[col])
                
            elif rule == "keep_as_text":
                # Mantém como texto, apenas garante que é string
                processed_df[col] = processed_df[col].astype(str)
                
        except Exception as e:
            logging.warning(f"Erro ao processar coluna {col} com regra {rule}: {e}")
            # Em caso de erro, mantém coluna original
            continue

        col_time = time.time() - col_start_time
        logging.debug(f"[OPTIMIZATION] Coluna {col} processada em {col_time:.2f}s")

    total_time = time.time() - start_time
    logging.info(f"[OPTIMIZATION] Processamento concluído em {total_time:.2f}s")

    return processed_df

def convert_to_int_optimized(series: pd.Series) -> pd.Series:
    """
    Conversão otimizada para inteiros

    Args:
        series: Série para converter

    Returns:
        Série convertida para Int64
    """
    try:
        # Operações vetorizadas para performance
        cleaned = series.astype(str).str.strip()

        # Remove valores inválidos
        cleaned = cleaned.replace(['', 'nan', 'null', 'none', '-', 'NaN', 'NULL'], np.nan)

        # Substitui vírgulas por pontos
        cleaned = cleaned.str.replace(',', '.', regex=False)

        # Converte para numérico
        numeric = pd.to_numeric(cleaned, errors='coerce')

        # Verifica se pode ser convertido para inteiro sem perda
        # Só converte se todos os valores válidos são inteiros
        valid_mask = numeric.notna()
        if valid_mask.any():
            valid_numbers = numeric[valid_mask]
            # Verifica se são inteiros (sem parte decimal significativa)
            is_integer_mask = np.abs(valid_numbers - np.round(valid_numbers)) < 1e-10

            if is_integer_mask.all():
                # Todos são inteiros, pode converter
                result = numeric.round().astype("Int64")
            else:
                # Tem decimais, mantém como float mas avisa
                logging.warning(f"Coluna contém decimais, mantendo como float")
                result = numeric.astype("Float64")
        else:
            # Nenhum valor válido
            result = pd.Series([pd.NA] * len(series), dtype="Int64")

        return result

    except Exception as e:
        logging.error(f"Erro na conversão otimizada para int: {e}")
        return series

def convert_to_float_optimized(series: pd.Series) -> pd.Series:
    """
    Conversão otimizada para floats

    Args:
        series: Série para converter

    Returns:
        Série convertida para float64
    """
    try:
        # Operações vetorizadas para performance
        cleaned = series.astype(str).str.strip()

        # Remove valores inválidos
        cleaned = cleaned.replace(['', 'nan', 'null', 'none', '-', 'NaN', 'NULL'], np.nan)

        # Substitui vírgulas por pontos (formato brasileiro)
        cleaned = cleaned.str.replace(',', '.', regex=False)

        # Converte para numérico
        result = pd.to_numeric(cleaned, errors='coerce')

        return result

    except Exception as e:
        logging.error(f"Erro na conversão otimizada para float: {e}")
        return series

def convert_to_int_ultra_optimized(series: pd.Series) -> pd.Series:
    """
    Conversão ULTRA-OTIMIZADA para inteiros usando NumPy puro
    """
    try:
        # OTIMIZAÇÃO EXTREMA: Usa NumPy diretamente
        values = series.values

        # Se já é numérico, converte diretamente
        if pd.api.types.is_numeric_dtype(series):
            return pd.Series(values, dtype="Int64")

        # Para strings, usa operações vetorizadas do NumPy
        str_values = np.asarray(series.astype(str))

        # Máscara para valores válidos
        valid_mask = ~np.isin(str_values, ['', 'nan', 'null', 'none', '-', 'NaN', 'NULL'])

        # Inicializa resultado
        result = np.full(len(series), pd.NA, dtype=object)

        if valid_mask.any():
            valid_values = str_values[valid_mask]

            # Remove vírgulas e converte
            cleaned = np.char.replace(valid_values, ',', '.')

            # Conversão vetorizada
            try:
                numeric_values = pd.to_numeric(cleaned, errors='coerce')
                # Só converte se são realmente inteiros
                int_mask = np.abs(numeric_values - np.round(numeric_values)) < 1e-10
                int_values = np.round(numeric_values[int_mask]).astype('Int64')

                # Atribui valores convertidos
                valid_indices = np.where(valid_mask)[0]
                int_indices = valid_indices[int_mask]
                result[int_indices] = int_values

            except Exception:
                pass

        return pd.Series(result, dtype="Int64")

    except Exception as e:
        logging.error(f"Erro na conversão ultra-otimizada para int: {e}")
        return series

def convert_to_float_ultra_optimized(series: pd.Series) -> pd.Series:
    """
    Conversão ULTRA-OTIMIZADA para floats usando NumPy puro
    """
    try:
        # OTIMIZAÇÃO EXTREMA: Usa NumPy diretamente
        values = series.values

        # Se já é numérico, retorna diretamente
        if pd.api.types.is_numeric_dtype(series):
            return series.astype('float64')

        # Para strings, usa operações vetorizadas do NumPy
        str_values = np.asarray(series.astype(str))

        # Máscara para valores válidos
        valid_mask = ~np.isin(str_values, ['', 'nan', 'null', 'none', '-', 'NaN', 'NULL'])

        # Inicializa resultado
        result = np.full(len(series), np.nan, dtype='float64')

        if valid_mask.any():
            valid_values = str_values[valid_mask]

            # Remove vírgulas (formato brasileiro)
            cleaned = np.char.replace(valid_values, ',', '.')

            # Conversão vetorizada ultra-rápida
            numeric_values = pd.to_numeric(cleaned, errors='coerce')
            result[valid_mask] = numeric_values

        return pd.Series(result, dtype='float64')

    except Exception as e:
        logging.error(f"Erro na conversão ultra-otimizada para float: {e}")
        return series

def process_dates_vectorized(series: pd.Series) -> pd.Series:
    """
    Processamento vetorizado ULTRA-OTIMIZADO de datas
    """
    try:
        # OTIMIZAÇÃO: Tenta conversão direta primeiro
        try:
            return pd.to_datetime(series, dayfirst=True, errors='coerce')
        except:
            pass

        # Se falhou, usa abordagem mais robusta mas ainda otimizada
        str_values = series.astype(str)

        # Detecta formato mais comum na amostra
        sample = str_values.dropna().head(100)
        if len(sample) > 0:
            first_val = sample.iloc[0]

            # Detecta formato baseado no primeiro valor
            if len(first_val) >= 10 and first_val[4] in ['-', '/']:
                # Formato ISO
                return pd.to_datetime(series, errors='coerce')
            else:
                # Formato brasileiro
                return pd.to_datetime(series, dayfirst=True, errors='coerce')

        return pd.to_datetime(series, errors='coerce')

    except Exception as e:
        logging.error(f"Erro no processamento vetorizado de datas: {e}")
        return series

async def csv_processing_node(state: CSVProcessingState) -> CSVProcessingState:
    """
    Nó principal para processamento de CSV
    
    Args:
        state: Estado do processamento CSV
        
    Returns:
        Estado atualizado
    """
    try:
        file_path = state["file_path"]
        
        # Copia arquivo para diretório de upload
        shutil.copy(file_path, UPLOADED_CSV_PATH)
        logging.info(f"[CSV_PROCESSING] Arquivo copiado para: {UPLOADED_CSV_PATH}")
        
        # OTIMIZAÇÃO EXTREMA: Leitura de CSV ultra-otimizada
        separators = [';', ',', '\t', '|']
        df = None
        used_separator = None

        # Detecta separador com amostra mínima
        for sep in separators:
            try:
                test_df = pd.read_csv(file_path, sep=sep, nrows=3, engine='c')  # Engine C é mais rápido
                if len(test_df.columns) > 1:
                    # OTIMIZAÇÃO: Lê com configurações de performance máxima
                    df = pd.read_csv(
                        file_path,
                        sep=sep,
                        encoding='utf-8',
                        on_bad_lines="skip",
                        engine='c',  # Engine C para máxima performance
                        low_memory=False,  # Evita warnings de tipos mistos
                        dtype=str  # Lê tudo como string primeiro (mais rápido)
                    )
                    used_separator = sep
                    break
            except:
                continue
        
        if df is None:
            raise ValueError("Não foi possível detectar o formato do CSV")
        
        logging.info(f"[CSV_PROCESSING] CSV lido com separador '{used_separator}', {len(df)} linhas, {len(df.columns)} colunas")
        
        # Detecta tipos de colunas automaticamente
        column_info = await detect_column_types(df)
        
        # Processa DataFrame
        processed_df = await process_dataframe_generic(df, column_info)
        
        # Estatísticas do processamento
        processing_stats = {
            "original_rows": len(df),
            "processed_rows": len(processed_df),
            "original_columns": len(df.columns),
            "processed_columns": len(processed_df.columns),
            "separator_used": used_separator,
            "date_columns_detected": len(column_info["date_columns"]),
            "numeric_columns_detected": len(column_info["numeric_columns"]),
            "text_columns_detected": len(column_info["text_columns"])
        }
        
        # Amostra dos dados para o estado
        csv_data_sample = {
            "head": processed_df.head(5).to_dict(),
            "dtypes": processed_df.dtypes.astype(str).to_dict(),
            "columns": list(processed_df.columns)
        }
        
        # Armazena DataFrame processado no gerenciador de objetos
        obj_manager = get_object_manager()
        df_id = obj_manager.store_object(processed_df, "processed_dataframe")
        
        # Atualiza estado
        state.update({
            "success": True,
            "message": f"✅ CSV processado com sucesso! {processing_stats['processed_rows']} linhas, {processing_stats['processed_columns']} colunas",
            "csv_data_sample": csv_data_sample,
            "column_info": column_info,
            "processing_stats": processing_stats,
            "dataframe_id": df_id
        })
        
        logging.info(f"[CSV_PROCESSING] Processamento concluído: {processing_stats}")
        
    except Exception as e:
        error_msg = f"❌ Erro ao processar CSV: {e}"
        logging.error(f"[CSV_PROCESSING] {error_msg}")
        state.update({
            "success": False,
            "message": error_msg,
            "csv_data_sample": {},
            "column_info": {},
            "processing_stats": {}
        })
    
    return state