File size: 17,572 Bytes
4a21e7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
#!/usr/bin/env python3
"""
Integration tests for Telegram Analytics: indexer, search, and dashboard endpoints.

Run with: python -m pytest tests.py -v
Or:       python tests.py
"""

import json
import os
import shutil
import sqlite3
import tempfile
import time
import unittest

from pathlib import Path


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _sample_messages(n: int = 5) -> list[dict]:
    """Generate N realistic Telegram-format messages."""
    base_ts = 1700000000
    users = [
        ("user1", "Alice"),
        ("user2", "Bob"),
        ("user3", "Carol"),
    ]
    msgs = []
    for i in range(1, n + 1):
        uid, name = users[i % len(users)]
        msgs.append({
            "id": 1000 + i,
            "type": "message",
            "date": f"2024-01-{(i % 28) + 1:02d}T10:00:00",
            "date_unixtime": str(base_ts + i * 3600),
            "from": name,
            "from_id": uid,
            "text": f"Test message number {i} from {name}",
            "text_entities": [
                {"type": "plain", "text": f"Test message number {i} from {name}"}
            ],
            "reply_to_message_id": (1000 + i - 1) if i > 1 else None,
        })
    return msgs


def _write_json(path: str, messages: list[dict]):
    """Write messages in Telegram export JSON format."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"messages": messages}, f, ensure_ascii=False)


# ---------------------------------------------------------------------------
# 1. Indexer Tests
# ---------------------------------------------------------------------------

class TestIndexer(unittest.TestCase):
    """Tests for OptimizedIndexer and IncrementalIndexer."""

    def setUp(self):
        """Create a scratch directory holding a 10-message Telegram export."""
        self.tmpdir = tempfile.mkdtemp()
        self.db_path = os.path.join(self.tmpdir, "test.db")
        self.json_path = os.path.join(self.tmpdir, "messages.json")
        self.messages = _sample_messages(10)
        _write_json(self.json_path, self.messages)

    def tearDown(self):
        """Best-effort removal of the scratch directory."""
        # shutil is imported at module level; no per-test import needed.
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def test_optimized_indexer_indexes_messages(self):
        """Indexing a JSON export stores every message in the DB."""
        from indexer import OptimizedIndexer
        indexer = OptimizedIndexer(self.db_path, build_trigrams=False, build_graph=False)
        stats = indexer.index_file(self.json_path, show_progress=False)

        self.assertGreater(stats["messages"], 0)

        # Row count in the DB must agree with the indexer's own stats.
        conn = sqlite3.connect(self.db_path)
        count = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
        conn.close()
        self.assertEqual(count, stats["messages"])

    def test_incremental_indexer_deduplication(self):
        """Re-indexing identical data must add nothing and report duplicates."""
        from indexer import OptimizedIndexer, IncrementalIndexer

        # First: create DB with OptimizedIndexer
        opt = OptimizedIndexer(self.db_path, build_trigrams=False, build_graph=False)
        opt.index_file(self.json_path, show_progress=False)

        # Now use IncrementalIndexer – same data, should all be duplicates
        idx = IncrementalIndexer(self.db_path)
        stats = idx.update_from_json(self.json_path, show_progress=False)
        idx.close()

        self.assertEqual(stats["new_messages"], 0)
        self.assertGreater(stats["duplicates"], 0)

    def test_incremental_indexer_adds_new(self):
        """Overlapping export: only the genuinely new messages are added."""
        from indexer import OptimizedIndexer, IncrementalIndexer

        # Create DB with 5 messages
        msgs5 = _sample_messages(5)
        _write_json(self.json_path, msgs5)
        opt = OptimizedIndexer(self.db_path, build_trigrams=False, build_graph=False)
        opt.index_file(self.json_path, show_progress=False)

        # Now add 10 messages (5 old + 5 new)
        msgs10 = _sample_messages(10)
        json2 = os.path.join(self.tmpdir, "messages2.json")
        _write_json(json2, msgs10)

        idx = IncrementalIndexer(self.db_path)
        stats = idx.update_from_json(json2, show_progress=False)
        idx.close()

        self.assertEqual(stats["new_messages"], 5)
        self.assertEqual(stats["duplicates"], 5)

    def test_incremental_indexer_from_json_data(self):
        """update_from_json_data accepts in-memory messages, not just files."""
        from indexer import OptimizedIndexer, IncrementalIndexer

        # Init DB first
        opt = OptimizedIndexer(self.db_path, build_trigrams=False, build_graph=False)
        opt.index_file(self.json_path, show_progress=False)

        # Add new messages via json_data
        new_msgs = _sample_messages(15)  # 10 old + 5 new
        idx = IncrementalIndexer(self.db_path)
        stats = idx.update_from_json_data(new_msgs, show_progress=False)
        idx.close()

        self.assertEqual(stats["new_messages"], 5)

    def test_fts5_search_works(self):
        """The FTS5 virtual table is populated and answers MATCH queries."""
        from indexer import OptimizedIndexer
        indexer = OptimizedIndexer(self.db_path, build_trigrams=False, build_graph=False)
        indexer.index_file(self.json_path, show_progress=False)

        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute(
            "SELECT COUNT(*) FROM messages_fts WHERE messages_fts MATCH 'message'"
        )
        count = cursor.fetchone()[0]
        conn.close()

        self.assertGreater(count, 0, "FTS5 search should find messages with 'message'")

    def test_streaming_load_json_messages(self):
        """load_json_messages streams all messages and adds text_plain."""
        from indexer import load_json_messages
        msgs = list(load_json_messages(self.json_path))
        self.assertEqual(len(msgs), 10)
        self.assertIn("text_plain", msgs[0])

    def test_entities_extracted(self):
        """Messages with links/mentions in text_entities should have entities stored."""
        msgs = [
            {
                "id": 9001,
                "type": "message",
                "date": "2024-01-01T10:00:00",
                "date_unixtime": "1700000000",
                "from": "Alice",
                "from_id": "user1",
                "text": "Check https://example.com and @bob",
                "text_entities": [
                    {"type": "plain", "text": "Check "},
                    {"type": "link", "text": "https://example.com"},
                    {"type": "plain", "text": " and "},
                    {"type": "mention", "text": "@bob"},
                ],
            }
        ]
        _write_json(self.json_path, msgs)

        from indexer import OptimizedIndexer
        indexer = OptimizedIndexer(self.db_path, build_trigrams=False, build_graph=False)
        indexer.index_file(self.json_path, show_progress=False)

        conn = sqlite3.connect(self.db_path)
        entities = conn.execute("SELECT type, value FROM entities WHERE message_id = 9001").fetchall()
        conn.close()

        types = [e[0] for e in entities]
        self.assertIn("link", types)
        self.assertIn("mention", types)


# ---------------------------------------------------------------------------
# 2. Search Tests
# ---------------------------------------------------------------------------

class TestSearch(unittest.TestCase):
    """Tests for FTS search."""

    def setUp(self):
        """Index a 20-message export into a scratch SQLite DB."""
        self.tmpdir = tempfile.mkdtemp()
        self.db_path = os.path.join(self.tmpdir, "test.db")
        self.json_path = os.path.join(self.tmpdir, "messages.json")
        _write_json(self.json_path, _sample_messages(20))

        from indexer import OptimizedIndexer
        indexer = OptimizedIndexer(self.db_path, build_trigrams=False, build_graph=False)
        indexer.index_file(self.json_path, show_progress=False)

    def tearDown(self):
        """Best-effort removal of the scratch directory."""
        # shutil is imported at module level; no per-test import needed.
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def test_fts_match_query(self):
        """A MATCH on a user name returns only rows containing that name."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            "SELECT id, text_plain FROM messages WHERE id IN "
            "(SELECT rowid FROM messages_fts WHERE messages_fts MATCH 'Alice')"
        ).fetchall()
        conn.close()
        self.assertGreater(len(rows), 0)
        for r in rows:
            self.assertIn("Alice", r["text_plain"])

    def test_fts_returns_no_results_for_nonsense(self):
        """A term absent from every message must match zero rows."""
        conn = sqlite3.connect(self.db_path)
        rows = conn.execute(
            "SELECT COUNT(*) FROM messages_fts WHERE messages_fts MATCH 'xyzzyplugh'"
        ).fetchone()[0]
        conn.close()
        self.assertEqual(rows, 0)


# ---------------------------------------------------------------------------
# 3. SemanticSearch Empty Embeddings
# ---------------------------------------------------------------------------

# Optional-dependency probe: the semantic-search tests below are skipped
# entirely when numpy is not installed (see @unittest.skipUnless).
try:
    import numpy as np
    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False


@unittest.skipUnless(HAS_NUMPY, "numpy not installed")
class TestSemanticSearchEmpty(unittest.TestCase):
    """Test that SemanticSearch handles missing/empty embeddings gracefully."""

    def setUp(self):
        """Create an embeddings DB with the expected schema but zero rows.

        Deduplicates the identical CREATE TABLE boilerplate the tests share,
        and registers cleanup via addCleanup so the temp directory is removed
        even when an assertion fails (the previous inline rmtree was skipped
        on failure).
        """
        self.tmpdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmpdir, ignore_errors=True)
        self.db_path = os.path.join(self.tmpdir, "empty_emb.db")

        conn = sqlite3.connect(self.db_path)
        conn.execute(
            "CREATE TABLE embeddings (message_id INTEGER PRIMARY KEY, "
            "from_name TEXT, text_preview TEXT, embedding BLOB)"
        )
        conn.commit()
        conn.close()

    def test_is_available_missing_db(self):
        """A path that does not exist means semantic search is unavailable."""
        from semantic_search import SemanticSearch
        ss = SemanticSearch(embeddings_db="/tmp/nonexistent_embeddings_12345.db")
        self.assertFalse(ss.is_available())

    def test_is_available_empty_db(self):
        """An existing DB with zero embeddings is reported as unavailable."""
        from semantic_search import SemanticSearch
        ss = SemanticSearch(embeddings_db=self.db_path)
        self.assertFalse(ss.is_available())

    def test_load_empty_embeddings_no_crash(self):
        """Loading from an empty table must succeed and yield no message ids."""
        from semantic_search import SemanticSearch
        ss = SemanticSearch(embeddings_db=self.db_path)
        ss._load_embeddings()  # Should not crash

        self.assertTrue(ss.embeddings_loaded)
        self.assertEqual(len(ss.message_ids), 0)

    def test_stats_empty_db(self):
        """stats() on an empty DB reports available=True with count=0."""
        from semantic_search import SemanticSearch
        ss = SemanticSearch(embeddings_db=self.db_path)
        s = ss.stats()
        self.assertTrue(s["available"])  # File exists and table exists
        self.assertEqual(s["count"], 0)


# ---------------------------------------------------------------------------
# 4. Dashboard Endpoint Tests
# ---------------------------------------------------------------------------

# Optional-dependency probe: the dashboard endpoint tests below are skipped
# entirely when flask is not installed (see @unittest.skipUnless).
try:
    import flask
    HAS_FLASK = True
except ImportError:
    HAS_FLASK = False


@unittest.skipUnless(HAS_FLASK, "flask not installed")
class TestDashboardEndpoints(unittest.TestCase):
    """Test Flask dashboard API endpoints."""

    @classmethod
    def setUpClass(cls):
        """Create a test DB and configure Flask test client."""
        cls.tmpdir = tempfile.mkdtemp()
        cls.db_path = os.path.join(cls.tmpdir, "test.db")
        cls.json_path = os.path.join(cls.tmpdir, "messages.json")

        _write_json(cls.json_path, _sample_messages(50))

        from indexer import OptimizedIndexer
        indexer = OptimizedIndexer(cls.db_path, build_trigrams=False, build_graph=False)
        indexer.index_file(cls.json_path, show_progress=False)

        # Point the dashboard at the scratch DB before building the client.
        import dashboard
        dashboard.DB_PATH = cls.db_path
        dashboard.app.config["TESTING"] = True
        cls.client = dashboard.app.test_client()

    @classmethod
    def tearDownClass(cls):
        """Best-effort removal of the shared scratch directory."""
        # shutil is imported at module level; no per-class import needed.
        shutil.rmtree(cls.tmpdir, ignore_errors=True)

    def test_overview_endpoint(self):
        """/api/overview returns a positive total_messages count."""
        resp = self.client.get("/api/overview?timeframe=all")
        self.assertEqual(resp.status_code, 200)
        data = resp.get_json()
        self.assertIn("total_messages", data)
        self.assertGreater(data["total_messages"], 0)

    def test_users_endpoint(self):
        """/api/users returns users with the expected per-user fields."""
        resp = self.client.get("/api/users?timeframe=all&limit=10")
        self.assertEqual(resp.status_code, 200)
        data = resp.get_json()
        self.assertIn("users", data)
        self.assertGreater(len(data["users"]), 0)
        user = data["users"][0]
        for field in ("user_id", "name", "messages", "percentage"):
            self.assertIn(field, user)

    def test_users_include_inactive(self):
        """With include_inactive=0, every returned user has messages > 0."""
        resp = self.client.get("/api/users?timeframe=all&include_inactive=0")
        self.assertEqual(resp.status_code, 200)
        data = resp.get_json()
        for user in data["users"]:
            self.assertGreater(user["messages"], 0)

    def test_search_fts_endpoint(self):
        """/api/search in fts mode returns a results list."""
        resp = self.client.get("/api/search?q=message&mode=fts&limit=5")
        self.assertEqual(resp.status_code, 200)
        data = resp.get_json()
        self.assertIn("results", data)

    def test_chart_hourly_endpoint(self):
        """/api/chart/hourly returns one bucket per hour of the day."""
        resp = self.client.get("/api/chart/hourly?timeframe=all")
        self.assertEqual(resp.status_code, 200)
        data = resp.get_json()
        self.assertIsInstance(data, list)
        self.assertEqual(len(data), 24)

    def test_chart_daily_endpoint(self):
        """/api/chart/daily returns a list of daily buckets."""
        resp = self.client.get("/api/chart/daily?timeframe=all")
        self.assertEqual(resp.status_code, 200)
        data = resp.get_json()
        self.assertIsInstance(data, list)

    def test_cache_invalidate_endpoint(self):
        """/api/cache/invalidate acknowledges the invalidation."""
        resp = self.client.get("/api/cache/invalidate")
        self.assertEqual(resp.status_code, 200)
        data = resp.get_json()
        self.assertEqual(data["status"], "invalidated")

    def test_page_routes_return_200(self):
        """All page routes should return 200."""
        for route in ("/", "/users", "/search", "/chat", "/moderation", "/settings"):
            resp = self.client.get(route)
            self.assertEqual(resp.status_code, 200, f"Route {route} failed")

    def test_user_profile_endpoint(self):
        """/api/user/<id>/profile returns totals and hourly activity."""
        resp = self.client.get("/api/users?timeframe=all&limit=1")
        data = resp.get_json()
        if data["users"]:
            uid = data["users"][0]["user_id"]
            resp2 = self.client.get(f"/api/user/{uid}/profile")
            self.assertEqual(resp2.status_code, 200)
            profile = resp2.get_json()
            self.assertIn("total_messages", profile)
            self.assertIn("hourly_activity", profile)

    def test_overview_has_expected_keys(self):
        """/api/overview exposes the full set of summary keys."""
        resp = self.client.get("/api/overview?timeframe=all")
        data = resp.get_json()
        for key in ("total_messages", "total_users", "links_count", "media_count"):
            self.assertIn(key, data, f"Missing key: {key}")


# ---------------------------------------------------------------------------
# 5. AI Search Schema Test
# ---------------------------------------------------------------------------

class TestAISearchSchema(unittest.TestCase):
    """Test that AI search schema generation matches actual DB."""

    def test_dynamic_schema_includes_real_columns(self):
        """_get_db_schema() must reflect the live SQLite schema.

        Uses TemporaryDirectory so cleanup is guaranteed even when an
        assertion fails (the previous mkdtemp + trailing rmtree leaked
        the directory on any failure).
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = os.path.join(tmpdir, "test.db")

            # Initialize DB with real schema
            from indexer import init_database
            conn = init_database(db_path)
            conn.close()

            from ai_search import AISearchEngine
            # Create instance without connecting to a provider
            engine = AISearchEngine.__new__(AISearchEngine)
            engine.db_path = db_path

            schema = engine._get_db_schema()

            # Verify real column names are present
            self.assertIn("text_plain", schema)
            self.assertIn("date_unixtime", schema)
            self.assertIn("has_links", schema)
            self.assertIn("has_media", schema)
            self.assertIn("from_id", schema)
            self.assertIn("participants", schema)

            # Verify old wrong column names are NOT in the dynamic output
            self.assertNotIn("char_count", schema)
            # media_type would not appear unless there's a column named that
            lines_lower = schema.lower()
            # "media_type" should not be a column name (has_media is the real one)
            self.assertNotIn("media_type (", lines_lower)


# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------

# Allow running directly (`python tests.py`); pytest discovers these tests too.
if __name__ == "__main__":
    unittest.main(verbosity=2)