File size: 31,458 Bytes
611e2c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
#!/usr/bin/env python3
"""
One-shot migration script that copies existing SQLite data (admin rules + analytics)
into Supabase tables. Run this after creating the Supabase schemas:

1. supabase_admin_rules_table.sql
2. supabase_analytics_tables.sql

Usage:
    python migrate_sqlite_to_supabase.py [--force]

Connection Methods (choose one):
- POSTGRESQL_URL (recommended): Direct PostgreSQL connection
  Format: postgresql://user:password@host:port/database
- SUPABASE_URL + SUPABASE_SERVICE_KEY: Supabase REST API
  Requires: SUPABASE_URL and SUPABASE_SERVICE_KEY in your .env

Notes:
- Does not delete local SQLite data
- Re-running without --force will skip tables that already contain Supabase rows
- POSTGRESQL_URL method is faster and doesn't require service_role key
"""

from __future__ import annotations

import argparse
import json
import os
import socket
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List

from dotenv import load_dotenv

# Try to import both connection methods
try:
    from supabase import Client, create_client
    SUPABASE_AVAILABLE = True
except ImportError:
    SUPABASE_AVAILABLE = False
    Client = None

try:
    import psycopg2
    from psycopg2.extras import execute_batch
    PSYCOPG2_AVAILABLE = True
except ImportError:
    PSYCOPG2_AVAILABLE = False

# Maximum number of rows sent per Supabase/PostgreSQL write request.
BATCH_SIZE = 500


def chunked(items: List[Dict[str, Any]], size: int) -> Iterable[List[Dict[str, Any]]]:
    """Yield successive slices of *items*, each at most *size* elements long."""
    start = 0
    while start < len(items):
        yield items[start:start + size]
        start += size


def get_connection_method():
    """Pick the migration transport.

    Returns "postgresql" when POSTGRESQL_URL is set and psycopg2 is importable
    (preferred, faster path), "supabase" when both Supabase credentials and the
    client library are available, and None when neither method is usable.
    """
    load_dotenv()
    env = os.getenv
    if env("POSTGRESQL_URL") and PSYCOPG2_AVAILABLE:
        return "postgresql"
    if env("SUPABASE_URL") and env("SUPABASE_SERVICE_KEY") and SUPABASE_AVAILABLE:
        return "supabase"
    return None


def get_postgres_connection():
    """Open a direct psycopg2 connection using the POSTGRESQL_URL env var.

    Raises RuntimeError with actionable guidance when the URL is missing,
    psycopg2 is not installed, or the connection attempt fails.
    """
    load_dotenv()
    postgres_url = os.getenv("POSTGRESQL_URL")

    if not postgres_url:
        raise RuntimeError(
            "POSTGRESQL_URL not set in .env file.\n"
            "   Format: postgresql://user:password@host:port/database"
        )

    if not PSYCOPG2_AVAILABLE:
        raise RuntimeError(
            "psycopg2 not installed. Install it with: pip install psycopg2-binary"
        )

    try:
        return psycopg2.connect(postgres_url)
    except Exception as exc:
        # Surface the original connect error with a hint, chained for debugging.
        raise RuntimeError(
            f"Failed to connect to PostgreSQL: {exc}\n"
            "   Check that POSTGRESQL_URL is correct and the database is accessible."
        ) from exc


def load_supabase_client() -> Client:
    """Build and validate a Supabase client from SUPABASE_URL / SUPABASE_SERVICE_KEY.

    Pre-flight checks before returning the client: env vars present, URL and
    key format sanity (warnings only for soft issues), DNS resolution of the
    project host, and a zero-row probe query against 'admin_rules'.

    Raises:
        RuntimeError: with actionable guidance when credentials are missing,
            the URL is malformed, DNS resolution fails, or the connection /
            probe query fails.
    """
    load_dotenv()
    url = os.getenv("SUPABASE_URL")
    key = os.getenv("SUPABASE_SERVICE_KEY")
    
    if not url or not key:
        raise RuntimeError(
            "Supabase credentials missing. Set SUPABASE_URL and SUPABASE_SERVICE_KEY in your .env file.\n"
            f"  SUPABASE_URL: {'βœ… Set' if url else '❌ Missing'}\n"
            f"  SUPABASE_SERVICE_KEY: {'βœ… Set' if key else '❌ Missing'}"
        )
    
    # Validate URL format
    if not url.startswith("https://"):
        raise RuntimeError(
            f"Invalid SUPABASE_URL format. Expected https://... but got: {url[:50]}...\n"
            "  Example: https://your-project-id.supabase.co"
        )
    
    if ".supabase.co" not in url:
        print(f"⚠️  Warning: SUPABASE_URL doesn't contain '.supabase.co': {url[:50]}...")
        print("   Make sure this is the correct Supabase project URL.")
    
    # Validate and clean API key format
    key_trimmed = key.strip()
    if key_trimmed != key:
        print("⚠️  Warning: SUPABASE_SERVICE_KEY has leading/trailing whitespace. Trimming...")
    key = key_trimmed  # Use trimmed version
    
    # Service-role keys are JWTs; 'eyJ' is base64 for '{"' (JWT header prefix).
    if not key.startswith("eyJ"):
        print("⚠️  Warning: SUPABASE_SERVICE_KEY doesn't start with 'eyJ' (expected JWT format)")
        print("   Make sure you're using the service_role key, not the anon key.")
    
    if len(key) < 100:
        print("⚠️  Warning: SUPABASE_SERVICE_KEY seems too short (should be ~200+ characters)")
        print("   Make sure you copied the entire key from Supabase Dashboard.")
    
    # Mask URL and key for display (avoid leaking full credentials to the console)
    masked_url = url[:20] + "..." + url[-15:] if len(url) > 35 else url[:20] + "..."
    masked_key = key[:10] + "..." + key[-10:] if len(key) > 20 else key[:10] + "..."
    print(f"πŸ”— Connecting to Supabase: {masked_url}")
    print(f"πŸ”‘ Using API key: {masked_key} ({len(key)} chars)")
    
    # Test DNS resolution first, so we can distinguish "project gone/paused"
    # from API-level errors later on.
    try:
        hostname = url.replace("https://", "").replace("http://", "").split("/")[0]
        print(f"   Resolving DNS for: {hostname}...")
        socket.gethostbyname(hostname)
        print("   βœ… DNS resolution successful")
    except socket.gaierror as dns_err:
        raise RuntimeError(
            f"❌ Cannot resolve DNS for Supabase URL: {url}\n"
            "   This usually means:\n"
            "   1. The Supabase project doesn't exist or was deleted\n"
            "   2. The project is paused (check Supabase Dashboard)\n"
            "   3. The URL is incorrect\n"
            "   4. Network/DNS connectivity issue\n\n"
            "   To fix:\n"
            "   1. Go to https://app.supabase.com and check your project status\n"
            "   2. If paused, resume it from the dashboard\n"
            "   3. Copy the correct URL from Settings β†’ API\n"
            f"   DNS Error: {dns_err}"
        ) from dns_err
    
    try:
        client = create_client(url, key)
        # Test connection with a simple query (limit(0): no rows transferred)
        print("   Testing connection...")
        client.table("admin_rules").select("id").limit(0).execute()
        print("   βœ… Connection successful!")
        return client
    except Exception as e:
        # Map the raw exception to a targeted, actionable RuntimeError.
        error_msg = str(e)
        if "getaddrinfo failed" in error_msg or "ConnectError" in str(type(e)):
            raise RuntimeError(
                f"❌ Cannot connect to Supabase URL: {url}\n"
                "   Possible issues:\n"
                "   1. URL is incomplete or incorrect (should be: https://xxxxx.supabase.co)\n"
                "   2. Network connectivity problem\n"
                "   3. Supabase project doesn't exist or is paused\n"
                "   4. Firewall/proxy blocking the connection\n\n"
                "   Check your Supabase project at: https://app.supabase.com\n"
                f"   Error: {error_msg}"
            ) from e
        else:
            # Check if it's an API key error
            if "Invalid API key" in error_msg or "401" in error_msg:
                raise RuntimeError(
                    f"❌ Invalid API Key Error\n"
                    "   The SUPABASE_SERVICE_KEY in your .env file is incorrect.\n\n"
                    "   To fix:\n"
                    "   1. Go to https://app.supabase.com β†’ Your Project β†’ Settings β†’ API\n"
                    "   2. Find the 'service_role' key (NOT the 'anon' key)\n"
                    "   3. Click 'Reveal' to show the full key\n"
                    "   4. Copy the ENTIRE key (it's very long, ~200+ characters)\n"
                    "   5. Update SUPABASE_SERVICE_KEY in your .env file\n"
                    "   6. Make sure there are NO quotes, spaces, or line breaks\n\n"
                    f"   Current key length: {len(key)} characters\n"
                    f"   Expected: ~200+ characters (JWT token starting with 'eyJ')\n\n"
                    f"   Error details: {error_msg}"
                ) from e
            else:
                raise RuntimeError(
                    f"❌ Failed to connect to Supabase: {error_msg}\n"
                    "   Check that:\n"
                    "   1. SUPABASE_URL is correct and complete\n"
                    "   2. SUPABASE_SERVICE_KEY is the service_role key (not anon key)\n"
                    "   3. Your Supabase project is active (not paused)\n"
                    "   4. The tables exist (run supabase_admin_rules_table.sql and supabase_analytics_tables.sql)"
                ) from e


def sqlite_rows(db_path: Path, query: str) -> List[Dict[str, Any]]:
    """Run *query* against the SQLite database at *db_path* and return rows as dicts.

    Fix: the connection is now closed in a ``finally`` block, so it no longer
    leaks when the query raises (e.g. table missing in an older database file).
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row
        return [dict(row) for row in conn.execute(query).fetchall()]
    finally:
        conn.close()


def warn_if_table_has_rows(client: Client, table: str) -> bool:
    """Return True when the Supabase *table* already contains at least one row.

    Uses a count="exact" probe with limit(1) so no meaningful payload is
    transferred; a missing ``count`` attribute is treated as empty.
    """
    resp = client.table(table).select("id", count="exact").limit(1).execute()
    row_count = getattr(resp, "count", None)
    return bool(row_count) and row_count > 0


def iso_from_unix(ts: Any) -> str | None:
    """Convert a Unix timestamp (seconds) to an ISO-8601 UTC string.

    Returns None for ``None``, non-numeric input, or out-of-range values.
    Fix: ``datetime.fromtimestamp`` raises OverflowError/OSError (not just
    ValueError) for timestamps outside the platform-supported range; those
    are now caught as well instead of crashing the migration.
    """
    if ts is None:
        return None
    try:
        return datetime.fromtimestamp(int(ts), tz=timezone.utc).isoformat()
    except (ValueError, TypeError, OverflowError, OSError):
        return None


def migrate_rules(client: Client, db_path: Path, force: bool):
    """Copy admin rules from the local SQLite database into Supabase 'admin_rules'."""
    table = "admin_rules"
    if not force and warn_if_table_has_rows(client, table):
        print(f"⚠️  Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return

    if not db_path.exists():
        print(f"ℹ️  No local rules database found at {db_path}, skipping rules migration.")
        return

    rows = sqlite_rows(
        db_path,
        """
        SELECT tenant_id, rule, pattern, severity, description, enabled, created_at
        FROM admin_rules
        """,
    )
    if not rows:
        print("ℹ️  No rules to migrate.")
        return

    payload = [
        {
            "tenant_id": row["tenant_id"],
            "rule": row["rule"],
            "pattern": row["pattern"] or row["rule"],
            "severity": row.get("severity") or "medium",
            "description": row.get("description") or row["rule"],
            "enabled": bool(row.get("enabled", 1)),
            "created_at": iso_from_unix(row.get("created_at")) or None,
        }
        for row in rows
    ]

    # Upsert on the (tenant_id, rule) unique key so reruns update, not duplicate.
    for batch in chunked(payload, BATCH_SIZE):
        client.table(table).upsert(batch, on_conflict="tenant_id,rule").execute()

    print(f"βœ… Migrated {len(payload)} admin rule(s) to Supabase.")


def migrate_tool_usage(client: Client, db_path: Path, force: bool):
    """Copy tool-usage analytics rows from SQLite into Supabase 'tool_usage_events'."""
    table = "tool_usage_events"
    if not force and warn_if_table_has_rows(client, table):
        print(f"⚠️  Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return

    rows = sqlite_rows(db_path, "SELECT * FROM tool_usage_events")
    if not rows:
        print("ℹ️  No tool usage events to migrate.")
        return

    payload = [
        {
            "tenant_id": row["tenant_id"],
            "user_id": row.get("user_id"),
            "tool_name": row["tool_name"],
            "timestamp": row["timestamp"],
            "latency_ms": row.get("latency_ms"),
            "tokens_used": row.get("tokens_used"),
            "success": bool(row.get("success", 1)),
            "error_message": row.get("error_message"),
            # SQLite stores metadata as a JSON string; decode it for JSONB.
            "metadata": json.loads(row["metadata"]) if row.get("metadata") else None,
        }
        for row in rows
    ]

    for batch in chunked(payload, BATCH_SIZE):
        client.table(table).insert(batch).execute()

    print(f"βœ… Migrated {len(payload)} tool usage event(s).")


def migrate_redflags(client: Client, db_path: Path, force: bool):
    """Copy red-flag violation rows from SQLite into Supabase 'redflag_violations'."""
    table = "redflag_violations"
    if not force and warn_if_table_has_rows(client, table):
        print(f"⚠️  Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return

    rows = sqlite_rows(db_path, "SELECT * FROM redflag_violations")
    if not rows:
        print("ℹ️  No red-flag violations to migrate.")
        return

    payload = [
        {
            "tenant_id": row["tenant_id"],
            "user_id": row.get("user_id"),
            "rule_id": row["rule_id"],
            "rule_pattern": row.get("rule_pattern"),
            "severity": row["severity"],
            "matched_text": row.get("matched_text"),
            "confidence": row.get("confidence"),
            "message_preview": row.get("message_preview"),
            "timestamp": row["timestamp"],
        }
        for row in rows
    ]

    for batch in chunked(payload, BATCH_SIZE):
        client.table(table).insert(batch).execute()

    print(f"βœ… Migrated {len(payload)} red-flag violation(s).")


def migrate_rag_searches(client: Client, db_path: Path, force: bool):
    """Copy RAG search analytics rows from SQLite into Supabase 'rag_search_events'."""
    table = "rag_search_events"
    if not force and warn_if_table_has_rows(client, table):
        print(f"⚠️  Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return

    rows = sqlite_rows(db_path, "SELECT * FROM rag_search_events")
    if not rows:
        print("ℹ️  No RAG search events to migrate.")
        return

    payload = [
        {
            "tenant_id": row["tenant_id"],
            "query": row["query"],
            "hits_count": row.get("hits_count"),
            "avg_score": row.get("avg_score"),
            "top_score": row.get("top_score"),
            "timestamp": row["timestamp"],
            "latency_ms": row.get("latency_ms"),
        }
        for row in rows
    ]

    for batch in chunked(payload, BATCH_SIZE):
        client.table(table).insert(batch).execute()

    print(f"βœ… Migrated {len(payload)} RAG search event(s).")


def migrate_agent_queries(client: Client, db_path: Path, force: bool):
    """Copy agent query analytics rows from SQLite into Supabase 'agent_query_events'."""
    table = "agent_query_events"
    if not force and warn_if_table_has_rows(client, table):
        print(f"⚠️  Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return

    rows = sqlite_rows(db_path, "SELECT * FROM agent_query_events")
    if not rows:
        print("ℹ️  No agent query events to migrate.")
        return

    payload = [
        {
            "tenant_id": row["tenant_id"],
            "user_id": row.get("user_id"),
            "message_preview": row.get("message_preview"),
            "intent": row.get("intent"),
            # SQLite stores tools_used as a JSON string; decode it for JSONB.
            "tools_used": json.loads(row["tools_used"]) if row.get("tools_used") else None,
            "total_tokens": row.get("total_tokens"),
            "total_latency_ms": row.get("total_latency_ms"),
            "success": bool(row.get("success", 1)),
            "timestamp": row["timestamp"],
        }
        for row in rows
    ]

    for batch in chunked(payload, BATCH_SIZE):
        client.table(table).insert(batch).execute()

    print(f"βœ… Migrated {len(payload)} agent query event(s).")


def migrate_rules_postgres(conn, db_path: Path, force: bool, check_table_func):
    """Copy admin rules from SQLite into PostgreSQL 'admin_rules' via a direct connection."""
    table = "admin_rules"

    # Bail out early with guidance if the target schema was never created.
    if not table_exists_postgres(conn, table):
        print(f"❌ Table '{table}' does not exist in PostgreSQL!")
        print(f"   Please create it first by running 'supabase_admin_rules_table.sql' in Supabase SQL Editor")
        print(f"   Skipping rules migration.")
        return

    if not force and check_table_func(table):
        print(f"⚠️  Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return

    if not db_path.exists():
        print(f"ℹ️  No local rules database found at {db_path}, skipping rules migration.")
        return

    rows = sqlite_rows(
        db_path,
        """
        SELECT tenant_id, rule, pattern, severity, description, enabled, created_at
        FROM admin_rules
        """,
    )
    if not rows:
        print("ℹ️  No rules to migrate.")
        return

    payload = [
        {
            "tenant_id": row["tenant_id"],
            "rule": row["rule"],
            "pattern": row["pattern"] or row["rule"],
            "severity": row.get("severity") or "medium",
            "description": row.get("description") or row["rule"],
            "enabled": bool(row.get("enabled", 1)),
            "created_at": iso_from_unix(row.get("created_at")) or None,
        }
        for row in rows
    ]

    columns = ["tenant_id", "rule", "pattern", "severity", "description", "enabled", "created_at"]
    # Upsert on the (tenant_id, rule) unique key so reruns update, not duplicate.
    for batch in chunked(payload, BATCH_SIZE):
        insert_batch_postgres(conn, table, columns, batch, on_conflict="tenant_id,rule")

    print(f"βœ… Migrated {len(payload)} admin rule(s) to Supabase.")


def migrate_tool_usage_postgres(conn, db_path: Path, force: bool, check_table_func):
    """Copy tool-usage events from SQLite into PostgreSQL 'tool_usage_events'."""
    table = "tool_usage_events"

    if not table_exists_postgres(conn, table):
        print(f"❌ Table '{table}' does not exist in PostgreSQL!")
        print(f"   Please create it first by running 'supabase_analytics_tables.sql' in Supabase SQL Editor")
        print(f"   Skipping tool usage migration.")
        return

    if not force and check_table_func(table):
        print(f"⚠️  Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return

    rows = sqlite_rows(db_path, "SELECT * FROM tool_usage_events")
    if not rows:
        print("ℹ️  No tool usage events to migrate.")
        return

    payload = [
        {
            "tenant_id": row["tenant_id"],
            "user_id": row.get("user_id"),
            "tool_name": row["tool_name"],
            "timestamp": row["timestamp"],
            "latency_ms": row.get("latency_ms"),
            "tokens_used": row.get("tokens_used"),
            "success": bool(row.get("success", 1)),
            "error_message": row.get("error_message"),
            # SQLite stores metadata as a JSON string; decode it for JSONB.
            "metadata": json.loads(row["metadata"]) if row.get("metadata") else None,
        }
        for row in rows
    ]

    columns = ["tenant_id", "user_id", "tool_name", "timestamp", "latency_ms", "tokens_used", "success", "error_message", "metadata"]
    for batch in chunked(payload, BATCH_SIZE):
        insert_batch_postgres(conn, table, columns, batch)

    print(f"βœ… Migrated {len(payload)} tool usage event(s).")


def migrate_redflags_postgres(conn, db_path: Path, force: bool, check_table_func):
    """Copy red-flag violations from SQLite into PostgreSQL 'redflag_violations'."""
    table = "redflag_violations"

    if not table_exists_postgres(conn, table):
        print(f"❌ Table '{table}' does not exist in PostgreSQL!")
        print(f"   Please create it first by running 'supabase_analytics_tables.sql' in Supabase SQL Editor")
        print(f"   Skipping redflag migration.")
        return

    if not force and check_table_func(table):
        print(f"⚠️  Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return

    rows = sqlite_rows(db_path, "SELECT * FROM redflag_violations")
    if not rows:
        print("ℹ️  No red-flag violations to migrate.")
        return

    payload = [
        {
            "tenant_id": row["tenant_id"],
            "user_id": row.get("user_id"),
            "rule_id": row["rule_id"],
            "rule_pattern": row.get("rule_pattern"),
            "severity": row["severity"],
            "matched_text": row.get("matched_text"),
            "confidence": row.get("confidence"),
            "message_preview": row.get("message_preview"),
            "timestamp": row["timestamp"],
        }
        for row in rows
    ]

    columns = ["tenant_id", "user_id", "rule_id", "rule_pattern", "severity", "matched_text", "confidence", "message_preview", "timestamp"]
    for batch in chunked(payload, BATCH_SIZE):
        insert_batch_postgres(conn, table, columns, batch)

    print(f"βœ… Migrated {len(payload)} red-flag violation(s).")


def migrate_rag_searches_postgres(conn, db_path: Path, force: bool, check_table_func):
    """Copy RAG search events from SQLite into PostgreSQL 'rag_search_events'."""
    table = "rag_search_events"

    if not table_exists_postgres(conn, table):
        print(f"❌ Table '{table}' does not exist in PostgreSQL!")
        print(f"   Please create it first by running 'supabase_analytics_tables.sql' in Supabase SQL Editor")
        print(f"   Skipping RAG search migration.")
        return

    if not force and check_table_func(table):
        print(f"⚠️  Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return

    rows = sqlite_rows(db_path, "SELECT * FROM rag_search_events")
    if not rows:
        print("ℹ️  No RAG search events to migrate.")
        return

    payload = [
        {
            "tenant_id": row["tenant_id"],
            "query": row["query"],
            "hits_count": row.get("hits_count"),
            "avg_score": row.get("avg_score"),
            "top_score": row.get("top_score"),
            "timestamp": row["timestamp"],
            "latency_ms": row.get("latency_ms"),
        }
        for row in rows
    ]

    columns = ["tenant_id", "query", "hits_count", "avg_score", "top_score", "timestamp", "latency_ms"]
    for batch in chunked(payload, BATCH_SIZE):
        insert_batch_postgres(conn, table, columns, batch)

    print(f"βœ… Migrated {len(payload)} RAG search event(s).")


def migrate_agent_queries_postgres(conn, db_path: Path, force: bool, check_table_func):
    """Copy agent query events from SQLite into PostgreSQL 'agent_query_events'."""
    table = "agent_query_events"

    if not table_exists_postgres(conn, table):
        print(f"❌ Table '{table}' does not exist in PostgreSQL!")
        print(f"   Please create it first by running 'supabase_analytics_tables.sql' in Supabase SQL Editor")
        print(f"   Skipping agent query migration.")
        return

    if not force and check_table_func(table):
        print(f"⚠️  Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return

    rows = sqlite_rows(db_path, "SELECT * FROM agent_query_events")
    if not rows:
        print("ℹ️  No agent query events to migrate.")
        return

    payload = [
        {
            "tenant_id": row["tenant_id"],
            "user_id": row.get("user_id"),
            "message_preview": row.get("message_preview"),
            "intent": row.get("intent"),
            # SQLite stores tools_used as a JSON string; decode it for JSONB.
            "tools_used": json.loads(row["tools_used"]) if row.get("tools_used") else None,
            "total_tokens": row.get("total_tokens"),
            "total_latency_ms": row.get("total_latency_ms"),
            "success": bool(row.get("success", 1)),
            "timestamp": row["timestamp"],
        }
        for row in rows
    ]

    columns = ["tenant_id", "user_id", "message_preview", "intent", "tools_used", "total_tokens", "total_latency_ms", "success", "timestamp"]
    for batch in chunked(payload, BATCH_SIZE):
        insert_batch_postgres(conn, table, columns, batch)

    print(f"βœ… Migrated {len(payload)} agent query event(s).")


def table_exists_postgres(conn, table: str) -> bool:
    """Return True when *table* exists in the 'public' schema of the connected DB."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables 
                WHERE table_schema = 'public' 
                AND table_name = %s
            )
        """, (table,))
        (exists,) = cur.fetchone()
    return exists


def check_table_has_rows_postgres(conn, table: str) -> bool:
    """Return True when the PostgreSQL *table* exists and holds at least one row."""
    if not table_exists_postgres(conn, table):
        return False
    try:
        with conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            (count,) = cur.fetchone()
        return count > 0
    except Exception as exc:
        # A missing relation counts as "no rows"; anything else propagates.
        msg = str(exc)
        if "does not exist" in msg or "relation" in msg.lower():
            return False
        raise


def check_table_has_rows_supabase(client: Client, table: str) -> bool:
    """Return True when the Supabase *table* has at least one row.

    Any API error (missing table, auth failure, network issue) is treated as
    "no rows" so callers can proceed with the migration attempt.
    Fix: the bare ``except:`` also swallowed KeyboardInterrupt/SystemExit;
    narrowed to ``except Exception`` so Ctrl-C still aborts the script.
    """
    try:
        response = client.table(table).select("id", count="exact").limit(1).execute()
        count = getattr(response, "count", None)
        return bool(count and count > 0)
    except Exception:
        return False


def insert_batch_postgres(conn, table: str, columns: List[str], batch: List[Dict[str, Any]], on_conflict: str | None = None):
    """Insert *batch* rows into a PostgreSQL *table* and commit.

    Args:
        conn: Open psycopg2 connection.
        table: Target table name (internal constant, never user input).
        columns: Column names, in insert order.
        batch: Rows as dicts; missing keys are inserted as NULL.
        on_conflict: Comma-separated conflict-target columns; when given, the
            statement becomes an upsert (ON CONFLICT ... DO UPDATE).

    Fix: ``on_conflict`` was annotated ``str = None`` (implicit Optional);
    now ``str | None = None``. Behavior is unchanged.
    """
    if not batch:
        return

    placeholders = ", ".join(["%s"] * len(columns))
    cols = ", ".join(columns)

    if on_conflict:
        # Upsert path (e.g. admin_rules with its unique constraint): update
        # every non-id column from the incoming row on conflict.
        update_cols = ", ".join([f"{col} = EXCLUDED.{col}" for col in columns if col != "id"])
        query = f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) ON CONFLICT ({on_conflict}) DO UPDATE SET {update_cols}"
    else:
        query = f"INSERT INTO {table} ({cols}) VALUES ({placeholders})"

    # Serialize dict/list values headed for JSONB columns as JSON strings.
    values = []
    for row in batch:
        row_values = []
        for col in columns:
            val = row.get(col)
            if col in ["metadata", "tools_used"] and val is not None:
                if isinstance(val, (dict, list)):
                    val = json.dumps(val)
            row_values.append(val)
        values.append(row_values)

    with conn.cursor() as cur:
        execute_batch(cur, query, values)
    conn.commit()


def insert_batch_supabase(client: Client, table: str, batch: List[Dict[str, Any]], on_conflict: str | None = None):
    """Insert (or upsert) *batch* rows into a Supabase *table*.

    Args:
        client: Supabase client.
        table: Target table name.
        batch: Rows to write; an empty batch is a no-op.
        on_conflict: Comma-separated conflict columns; when given, rows are
            upserted instead of inserted.

    Fix: previously only the plain-insert path was chunked by BATCH_SIZE;
    the upsert path sent the whole batch in one request and could exceed
    request-size limits. Both paths now chunk consistently. Also fixed the
    implicit-Optional annotation on ``on_conflict``.
    """
    if not batch:
        return

    for chunk in chunked(batch, BATCH_SIZE):
        if on_conflict:
            client.table(table).upsert(chunk, on_conflict=on_conflict).execute()
        else:
            client.table(table).insert(chunk).execute()


def main():
    """Entry point: migrate local SQLite analytics/rules data into Supabase.

    Picks a backend from available credentials (direct PostgreSQL via
    POSTGRESQL_URL, else the Supabase REST API via SUPABASE_URL +
    SUPABASE_SERVICE_KEY), verifies the destination tables on the
    PostgreSQL path, then runs the per-table migrations. ``--force``
    inserts even when the destination tables already contain rows.
    """
    parser = argparse.ArgumentParser(description="Migrate SQLite analytics/rules data into Supabase.")
    parser.add_argument("--force", action="store_true", help="Insert even if Supabase tables already contain rows.")
    args = parser.parse_args()

    print("=" * 70)
    print("SQLite to Supabase Migration Tool")
    print("=" * 70)
    print()

    # Check for SQLite databases first
    root = Path(__file__).resolve().parent
    data_dir = root / "data"
    rules_db = data_dir / "admin_rules.db"
    analytics_db = data_dir / "analytics.db"

    print("πŸ“ Checking for local SQLite databases:")
    print(f"   Rules DB: {rules_db} {'βœ…' if rules_db.exists() else '❌ Not found'}")
    print(f"   Analytics DB: {analytics_db} {'βœ…' if analytics_db.exists() else '❌ Not found'}")
    print()

    if not rules_db.exists() and not analytics_db.exists():
        print("⚠️  No SQLite databases found. Nothing to migrate.")
        return

    # Determine connection method
    print("πŸ” Checking connection method...")
    method = get_connection_method()

    if method == "postgresql":
        print("   βœ… Using PostgreSQL direct connection (POSTGRESQL_URL)")
        conn = get_postgres_connection()
        print("   βœ… Connected to PostgreSQL")

        # Named function instead of a lambda assignment (PEP 8 / E731);
        # passed to the migrate_*_postgres helpers for skip-if-populated checks.
        def check_table(t):
            return check_table_has_rows_postgres(conn, t)

        # Check if required tables exist before touching any data.
        print()
        print("πŸ“‹ Checking if required tables exist...")
        required_tables = {
            "admin_rules": "supabase_admin_rules_table.sql",
            "tool_usage_events": "supabase_analytics_tables.sql",
            "redflag_violations": "supabase_analytics_tables.sql",
            "rag_search_events": "supabase_analytics_tables.sql",
            "agent_query_events": "supabase_analytics_tables.sql",
        }
        # Map each missing table to the SQL file that creates it, so the
        # user only has to run the files that are actually needed.
        missing_tables = {}
        for table, sql_file in required_tables.items():
            if table_exists_postgres(conn, table):
                print(f"   βœ… {table}")
            else:
                print(f"   ❌ {table} (missing)")
                missing_tables.setdefault(sql_file, []).append(table)

        if missing_tables:
            print()
            print("⚠️  Some tables are missing! Please create them first:")
            print()
            for sql_file, tables in missing_tables.items():
                print(f"   Run '{sql_file}' in Supabase SQL Editor to create:")
                for table in tables:
                    print(f"      - {table}")
            print()
            print("   Steps:")
            print("   1. Go to https://app.supabase.com β†’ Your Project β†’ SQL Editor")
            print("   2. Click 'New query'")
            for sql_file in missing_tables.keys():
                print(f"   3. Open and copy contents of '{sql_file}' from your project")
                print("   4. Paste into SQL Editor and click 'Run'")
            print("   5. Run this migration script again")
            print()
            conn.close()
            return
    elif method == "supabase":
        print("   βœ… Using Supabase API (SUPABASE_URL + SUPABASE_SERVICE_KEY)")
        client = load_supabase_client()
        # (Dead `conn = None` and unused row-count lambda removed: the
        # Supabase-path migrate_* helpers take only client/db/force.)
    else:
        print("   ❌ No connection method available!")
        print()
        print("   Please set one of the following in your .env file:")
        print("   - POSTGRESQL_URL=postgresql://user:password@host:port/database")
        print("     OR")
        print("   - SUPABASE_URL=https://xxxxx.supabase.co")
        print("   - SUPABASE_SERVICE_KEY=your_service_role_key")
        return

    print()

    print("πŸš€ Starting migration...")
    print()

    # Migrate using the appropriate method
    if method == "postgresql":
        migrate_rules_postgres(conn, rules_db, args.force, check_table)
        migrate_tool_usage_postgres(conn, analytics_db, args.force, check_table)
        migrate_redflags_postgres(conn, analytics_db, args.force, check_table)
        migrate_rag_searches_postgres(conn, analytics_db, args.force, check_table)
        migrate_agent_queries_postgres(conn, analytics_db, args.force, check_table)
        conn.close()
    else:
        migrate_rules(client, rules_db, args.force)
        migrate_tool_usage(client, analytics_db, args.force)
        migrate_redflags(client, analytics_db, args.force)
        migrate_rag_searches(client, analytics_db, args.force)
        migrate_agent_queries(client, analytics_db, args.force)

    print()
    print("=" * 70)
    print("πŸŽ‰ Migration completed!")
    print("=" * 70)
    print()
    print("πŸ’‘ Next steps:")
    print("   1. Verify data in Supabase Dashboard β†’ Table Editor")
    print("   2. Restart your FastAPI/MCP services to use Supabase backend")
    print("   3. (Optional) Back up SQLite files before deleting them")


if __name__ == "__main__":
    main()