File size: 92,039 Bytes
9d76e23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ca2bd00
 
 
 
 
 
 
9d76e23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ca2bd00
 
 
 
 
 
 
9d76e23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ca2bd00
9d76e23
 
 
 
ca2bd00
 
 
 
9d76e23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ca2bd00
 
 
 
9d76e23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ca2bd00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9d76e23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ca2bd00
 
 
 
 
 
 
 
 
9d76e23
 
ca2bd00
 
9d76e23
 
 
 
 
 
 
 
 
 
ca2bd00
 
 
 
 
 
 
 
 
 
 
 
9d76e23
 
 
 
 
 
ca2bd00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9d76e23
ca2bd00
 
 
 
9d76e23
 
 
ca2bd00
 
 
 
 
9d76e23
 
ca2bd00
 
 
 
 
 
 
 
 
 
 
 
 
 
9d76e23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
import pandas as pd # Import pandas for data manipulation, aliased as pd
import numpy as np # Import numpy for numerical operations, aliased as np
import faiss # Import faiss for efficient similarity search
import json # Import json for working with JSON data
from typing import List, Dict, Optional # Import typing hints for better code readability and static analysis
from datetime import datetime # Import datetime for handling date and time objects
import logging # Import logging module for application logging
from sentence_transformers import SentenceTransformer, CrossEncoder, util # Import specific classes from sentence_transformers library
from indicnlp.tokenize import indic_tokenize # Import tokenizer for Indic languages
from indicnlp.normalize.indic_normalize import IndicNormalizerFactory # Import normalizer factory for Indic languages
import torch # Import PyTorch for deep learning functionalities
import os # Import os module for interacting with the operating system (e.g., file paths)
from indicnlp import common # Import common utilities from the indicnlp library
import pickle # Import pickle for saving/loading Python objects (like lists of IDs)
import re # Import re module for regular expression operations
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM # Import classes for loading pre-trained models from Hugging Face Transformers
from fastapi import HTTPException # Import HTTPException for raising HTTP errors in FastAPI applications
import asyncio # Import asyncio for asynchronous operations
from src.fine_tuning.trainer import ModelTrainer
from src.fine_tuning.config import MODEL_STATUS

from src.config.settings import ( # Import configuration constants from the settings file
    EMBED_MODEL_NAME, GENERATOR_MODEL_NAME, RERANKER_MODEL_NAME, # Model names
    INDEX_PATH, INTERACTION_LOG_PATH, INDIC_NLP_RESOURCES_PATH, MONGO_FAISS_META_COLLECTION_NAME, # File paths & DB names
    HEADLINE_COL, SEOLOCATION_COL, DEEPLINK_COL, LAST_UPDATED_COL, FINE_TUNED_RERANKER_SAVE_PATH, # Column names & paths
    IMAGE_ID_COL, IMAGE_RATIO_COL, IMAGE_SIZE_COL, TAXONOMY_COL, INDEX_IDS_PATH, # Column names
    SYN_COL, KEY_COL, ID_COL, TOPIC_COL, PROPERTY_COL, # Existing Column names
    DEFAULT_K, SIMILARITY_THRESHOLD, CANDIDATE_MULTIPLIER # Recommendation parameters
)
from src.database.mongodb import mongodb # Import the MongoDB client instance

logger = logging.getLogger(__name__) # Initialize a logger instance for this module


class RecoRecommender:
    """
    A RAG-based recommender system for multi language content using FAISS retrieval.
    """
    FAISS_IDS_DOC_ID = "faiss_indexed_document_ids_v1" # Document ID for storing indexed IDs in MongoDB
    MODEL_METADATA_DOC_ID = "model_metadata"
    EMBEDDING_CHECKSUM_KEY = "embedding_checksum"
    MODEL_VERSION_KEY = "model_version"
    FINE_TUNING_CONFIG_KEY = "fine_tuning_config"
    EMBEDDING_STATUS_KEY = "embedding_status"
    MODEL_STATUS = MODEL_STATUS  # Add MODEL_STATUS as class attribute

    
    def __init__(self):
        """Initialize the recommender with configuration."""
        logger.info("Initializing RecoRecommender...")
        
        # Configuration
        self.headline_col = HEADLINE_COL
        self.key = KEY_COL
        self.syn = SYN_COL
        self.id_col = ID_COL
        self.topic_col = TOPIC_COL
        self.property_col = PROPERTY_COL
        self.taxonomy_col = TAXONOMY_COL
        self.seolocation_col = SEOLOCATION_COL
        self.deeplink_col = DEEPLINK_COL
        self.last_updated_col = LAST_UPDATED_COL
        self.image_id_col = IMAGE_ID_COL
        self.image_ratio_col = IMAGE_RATIO_COL
        self.image_size_col = IMAGE_SIZE_COL
        self.processed_content_col = f"Processed Content"
        
        # State Variables
        self.df: Optional[pd.DataFrame] = None
        self.index: Optional[faiss.Index] = None
        self.embed_model: Optional[SentenceTransformer] = None
        self.base_reranker: Optional[CrossEncoder] = None
        self.fine_tuned_reranker: Optional[CrossEncoder] = None
        self.indexed_ids: List[str] = []
        self.tokenizer: Optional[AutoTokenizer] = None
        self.generator: Optional[AutoModelForSeq2SeqLM] = None
        self.normalizer = None
        # self.device = "cuda" if torch.cuda.is_available() else "cpu" # Old device detection

        logger.info("Determining compute device...")
        if torch.cuda.is_available():
            self.device = "cuda"
            logger.info("CUDA is available. Using NVIDIA GPU.")
        else:
            try:
                import intel_extension_for_pytorch as ipex # Attempt to import IPEX
                if hasattr(torch, 'xpu') and torch.xpu.is_available():
                    self.device = "xpu"
                    logger.info("Intel XPU is available via IPEX. Using Intel GPU.")
                else:
                    self.device = "cpu"
                    logger.info("CUDA not available. Intel XPU not available or IPEX not fully configured. Using CPU.")
            except ImportError:
                self.device = "cpu"
                logger.info("CUDA not available. Intel Extension for PyTorch (IPEX) not found. Using CPU.")
            except Exception as e: # Catch other potential errors during XPU check
                self.device = "cpu"
                logger.error(f"Error during Intel XPU check: {e}. Using CPU.")
        logger.info(f"Selected device: {self.device}")

        # Initialize MongoDB collection only if connection is available
        if mongodb.db is not None:
            self.faiss_meta_collection = mongodb.db[MONGO_FAISS_META_COLLECTION_NAME]
        else:
            self.faiss_meta_collection = None
            logger.warning("MongoDB not available. Some features may be limited.")
        
        # Initialize model trainer
        self.model_trainer = ModelTrainer(RERANKER_MODEL_NAME, device=self.device) # Pass the determined device
        self._setup_indic_nlp()
        logger.info("RecoRecommender initialized.")

    def _setup_indic_nlp(self):
        """Initialize Indic NLP resources."""
        logger.info("Setting up Indic NLP resources...") # Log the start of Indic NLP setup
        if not os.path.exists(INDIC_NLP_RESOURCES_PATH): # Check if the Indic NLP resources path exists
            raise FileNotFoundError(f"Indic NLP resources not found at {INDIC_NLP_RESOURCES_PATH}") # Raise error if path not found
        
        os.environ["INDIC_RESOURCES_PATH"] = INDIC_NLP_RESOURCES_PATH # Set environment variable for Indic NLP resource path
        try: # Start a try-except block for error handling
            common.set_resources_path(INDIC_NLP_RESOURCES_PATH) # Set the resource path for the indicnlp library
            self.normalizer = IndicNormalizerFactory().get_normalizer("hi") # Initialize the language normalizer
            logger.info("Indic NLP resources setup complete.") # Log successful setup
        except Exception as e: # Catch any exception during setup
            logger.error(f"Error setting up Indic NLP resources: {e}", exc_info=True) # Log the error with traceback
            raise # Re-raise the exception to halt execution if setup fails

    def load_models(self):
        """Load all required ML models."""
        logger.info("Loading ML models...") # Log the start of ML model loading
        try: # Start a try-except block for error handling
            if self.embed_model is None: # Check if the embedding model is not already loaded
                logger.info(f"Loading embedding model: {EMBED_MODEL_NAME}") # Log which embedding model is being loaded
                self.embed_model = SentenceTransformer(EMBED_MODEL_NAME, device=self.device) # Load the sentence transformer model
            
            if self.tokenizer is None or self.generator is None: # Check if tokenizer or generator model is not loaded
                logger.info(f"Loading generator model: {GENERATOR_MODEL_NAME}") # Log which generator model is being loaded
                self.tokenizer = AutoTokenizer.from_pretrained(GENERATOR_MODEL_NAME, model_max_length=1024) # Load tokenizer for the generator
                self.generator = AutoModelForSeq2SeqLM.from_pretrained(GENERATOR_MODEL_NAME) # Load the sequence-to-sequence LM
                if self.device == "cuda": # Check if CUDA (GPU) is the selected device
                    self.generator = self.generator.to(self.device) # Move the generator model to the GPU
            
            # Load base reranker model
            if self.base_reranker is None:
                logger.info(f"Loading base reranker model: {RERANKER_MODEL_NAME}")
                self.base_reranker = CrossEncoder(RERANKER_MODEL_NAME, device=self.device)
                self.reranker = self.base_reranker  # Default to base model
            
            # Try to load fine-tuned model if available
            self._load_fine_tuned_model()
            
            logger.info("ML models loaded.") # Log successful loading of all models
        except Exception as e: # Catch any exception during model loading
            logger.error(f"Error loading ML models: {e}", exc_info=True) # Log the error with traceback
            raise # Re-raise the exception

    def _load_fine_tuned_model(self):
        """Attempt to load the fine-tuned reranker model if available."""
        try:
            metadata = self.model_trainer.get_model_status()
            if metadata.get("current_model_status") == MODEL_STATUS["FINE_TUNED"]:
                current_version = metadata.get("current_version", "v0")
                model_path = str(self.model_trainer.get_model_path(current_version))
                
                if os.path.exists(model_path):
                    logger.info(f"Loading fine-tuned reranker model version {current_version}")
                    self.fine_tuned_reranker = CrossEncoder(model_path, device=self.device)
                    self.reranker = self.fine_tuned_reranker
                    logger.info("Successfully loaded fine-tuned reranker model")
                    return
            
            logger.info("No fine-tuned model found or not in fine-tuned state, using base model")
            self.reranker = self.base_reranker
        except Exception as e:
            logger.error(f"Error loading fine-tuned model: {e}. Falling back to base model.")
            self.reranker = self.base_reranker

    def _calculate_embedding_checksum(self, content: str) -> str:
        """Calculate a checksum for content to detect changes in embedding logic."""
        import hashlib
        # Include model name and any relevant preprocessing parameters in the checksum
        checksum_content = f"{content}_{EMBED_MODEL_NAME}"
        return hashlib.md5(checksum_content.encode()).hexdigest()

    def _get_model_metadata(self) -> Dict:
        """Retrieve current model metadata from MongoDB."""
        try:
            if self.faiss_meta_collection is None:
                logger.warning("MongoDB not available. Returning default metadata.")
                return {
                    "_id": self.MODEL_METADATA_DOC_ID,
                    "embedding_model_name": EMBED_MODEL_NAME,
                }
            
            metadata = self.faiss_meta_collection.find_one({"_id": self.MODEL_METADATA_DOC_ID})
            if metadata:
                return metadata
            # Default metadata for the recommender service, focusing on aspects not directly
            # managed by ModelTrainer's file-based metadata (e.g., embedding model name).
            # Reranker model status, version, and its specific fine-tuning configuration
            # are primarily managed by ModelTrainer and its model_metadata.json file.
            return {
                "_id": self.MODEL_METADATA_DOC_ID,
                "embedding_model_name": EMBED_MODEL_NAME,
                # Add other global operational metadata specific to RecoRecommender here if needed.
            }
        except Exception as e:
            logger.error(f"Error retrieving model metadata: {e}")
            # Return a minimal fallback to ensure core functionalities can proceed if possible
            return {"_id": self.MODEL_METADATA_DOC_ID, "embedding_model_name": EMBED_MODEL_NAME}

    def _update_model_metadata(self, updates: Dict) -> bool:
        """Update model metadata in MongoDB."""
        try:
            if self.faiss_meta_collection is None:
                logger.warning("MongoDB not available. Cannot update model metadata.")
                return False
                
            result = self.faiss_meta_collection.update_one(
                {"_id": self.MODEL_METADATA_DOC_ID},
                {"$set": {**updates, "metadata_last_updated": datetime.now()}}, # Key changed for clarity
                upsert=True
            )
            return result.acknowledged
        except Exception as e:
            logger.error(f"Error updating model metadata: {e}")
            return False

    def _increment_model_version(self) -> str:
        """DEPRECATED: Reranker model versioning is handled by ModelTrainer."""
        # This method appears unused and its logic conflicts with ModelTrainer's
        # file-system based versioning for fine-tuned models.
        logger.warning("RecoRecommender._increment_model_version() is deprecated. Reranker versioning is managed by ModelTrainer.")
        # Fallback to ModelTrainer's current version if absolutely needed, but ideally this method should be removed.
        return self.model_trainer.get_model_status().get("current_version", "v0")

    def _needs_reembedding_batch(self, doc_ids: List[str], current_checksum: str) -> List[str]:
        """Check which documents from a batch need reembedding."""
        try:
            if self.faiss_meta_collection is None:
                logger.warning("MongoDB not available. Assuming all documents need reembedding.")
                return doc_ids
            
            # Query for all documents in one go
            metadata_docs = self.faiss_meta_collection.find(
                {"_id": {"$in": doc_ids}}
            )
            
            # Create a dict for quick lookup
            metadata_map = {doc["_id"]: doc.get(self.EMBEDDING_CHECKSUM_KEY) for doc in metadata_docs}
            
            # Return IDs that need reembedding
            return [
                doc_id for doc_id in doc_ids 
                if doc_id not in metadata_map or metadata_map[doc_id] != current_checksum
            ]
        except Exception as e:
            logger.error(f"Error checking embedding status for batch: {e}")
            return doc_ids  # If error, assume all need reembedding

    def _update_embedding_metadata(self, doc_id: str, checksum: str):
        """Update metadata for document embeddings."""
        try:
            self.faiss_meta_collection.update_one(
                {"_id": doc_id},
                {
                    "$set": {
                        self.EMBEDDING_CHECKSUM_KEY: checksum,
                        "last_updated": datetime.now()
                    }
                },
                upsert=True
            )
        except Exception as e:
            logger.error(f"Error updating embedding metadata for doc {doc_id}: {e}")

    def _needs_reembedding(self, doc_id: str, current_checksum: str) -> bool:
        """Check if a document needs to be reembedded."""
        try:
            metadata = self.faiss_meta_collection.find_one({"_id": doc_id})
            if not metadata or metadata.get(self.EMBEDDING_CHECKSUM_KEY) != current_checksum:
                return True
            return False
        except Exception as e:
            logger.error(f"Error checking embedding status for doc {doc_id}: {e}")
            return True

    async def update_embeddings_and_index(self, force_reload_data: bool = True):
        """
        Updates embeddings and the FAISS index.
        If existing indexed documents have changed or the embedding model is different,
        a full rebuild of the index is triggered.
        Otherwise, only new documents are added incrementally.
        The FAISS index file (INDEX_PATH) and metadata are saved.
        """
        if force_reload_data:
            logger.info("Reloading data from MongoDB for index update...")
            self._load_data_from_mongo()

        if self.df is None or self.df.empty:
            logger.warning("DataFrame is empty. Cannot update embeddings or index.")
            return

        if self.embed_model is None:
            logger.info("Embedding model not loaded. Loading models...")
            self.load_models() # Ensure embed_model is available

        logger.info("Checking for documents requiring embedding updates or additions...")

        trigger_full_rebuild = False
        if self.index and self.indexed_ids:
            df_indexed_docs = self.df[self.df[self.id_col].isin(self.indexed_ids)]
            content_map_for_indexed_ids = pd.Series(
                df_indexed_docs[self.processed_content_col].values, 
                index=df_indexed_docs[self.id_col]
            ).to_dict()

            for doc_id in self.indexed_ids:
                if doc_id in content_map_for_indexed_ids:
                    current_content = content_map_for_indexed_ids[doc_id]
                    current_checksum = self._calculate_embedding_checksum(current_content)
                    if self._needs_reembedding(doc_id, current_checksum):
                        logger.info(f"Existing document ID {doc_id} requires re-embedding. Content/model change detected.")
                        trigger_full_rebuild = True
                        break
                else:
                    logger.info(f"Document ID {doc_id} was in index but not in current data (deleted). Rebuild needed.")
                    trigger_full_rebuild = True
                    break
        
        if self.index is None:
            logger.info("No existing FAISS index found. A full build is required.")
            trigger_full_rebuild = True

        if trigger_full_rebuild:
            logger.info("Triggering a full rebuild of the FAISS index.")
            self.build_indexes_and_save(data_already_loaded=True) # self.df is already loaded
            return

        logger.info("No changes detected in existing indexed documents that require a full rebuild.")
        current_indexed_ids_set = set(self.indexed_ids)
        all_df_ids_set = set(self.df[self.id_col].tolist())
        new_doc_ids_to_add = list(all_df_ids_set - current_indexed_ids_set)

        if new_doc_ids_to_add:
            logger.info(f"Found {len(new_doc_ids_to_add)} new documents to add to the index.")
            new_docs_df = self.df[self.df[self.id_col].isin(new_doc_ids_to_add)].copy()
            
            if not new_docs_df.empty:
                new_embeddings, actual_new_ids_embedded = self._generate_embeddings(new_docs_df)

                if new_embeddings.size > 0:
                    self.index.add(new_embeddings.astype(np.float32))
                    self.indexed_ids.extend(actual_new_ids_embedded)
                    
                    logger.info(f"Added {len(actual_new_ids_embedded)} new vectors to FAISS index. Total vectors: {self.index.ntotal}.")
                    faiss.write_index(self.index, INDEX_PATH)
                    
                    self.faiss_meta_collection.update_one(
                        {"_id": self.FAISS_IDS_DOC_ID},
                        {"$set": {"ids": self.indexed_ids, "last_updated": datetime.now(), "total_vectors": self.index.ntotal}},
                        upsert=True
                    )
                    logger.info("Updated FAISS index and list of indexed IDs saved.")

                    for doc_id in actual_new_ids_embedded:
                        content_for_checksum = new_docs_df[new_docs_df[self.id_col] == doc_id][self.processed_content_col].iloc[0]
                        checksum = self._calculate_embedding_checksum(content_for_checksum)
                        self._update_embedding_metadata(doc_id, checksum)
                    logger.info(f"Updated embedding metadata for {len(actual_new_ids_embedded)} new documents.")
                else:
                    logger.info("No embeddings were generated for the new documents (e.g., content was empty).")
            else:
                logger.info("Identified new document IDs, but the corresponding DataFrame slice was empty.")
        else:
            logger.info("No new documents to add. Index is up-to-date with the current DataFrame.")

        # Ensure index is saved if any changes (new docs added) occurred, even if not a full rebuild
        # The logic above already saves if new_embeddings.size > 0.
        # If no new docs and no rebuild, this point is reached.
        # Redundant save check removed; saves are handled within the conditional branches above.

    def fallback_to_base_model(self):
        """Switch to using the base model for inference."""
        if self.base_reranker is not None:
            self.reranker = self.base_reranker
            logger.info("Switched to base reranker model")
        else:
            logger.warning("Base reranker model not available")

    def use_fine_tuned_model(self):
        """Switch to using the fine-tuned model for inference."""
        if self.fine_tuned_reranker is not None:
            self.reranker = self.fine_tuned_reranker
            logger.info("Switched to fine-tuned reranker model")
        else:
            logger.warning("Fine-tuned model not available, staying with current model")

    def _load_data_from_mongo(self):
        """Load and preprocess data from MongoDB."""
        logger.info("Loading data from MongoDB...") # Log the start of data loading from MongoDB
        try: # Start a try-except block for error handling
            # Fetch documents with projection
            projection = { # Define which fields to retrieve from MongoDB
                self.id_col: 1,
                self.headline_col: 1, # Include the headline column
                self.key: 1, # Include the key column
                self.syn: 1, # Include the system column
                self.topic_col: 1, # Include the topic column
                self.taxonomy_col: 1, # Include the taxonomy column
                self.property_col: 1, # Include the property column
                self.seolocation_col: 1,
                self.deeplink_col: 1,
                self.last_updated_col: 1,
                self.image_id_col: 1,
                self.image_ratio_col: 1,
                self.image_size_col: 1,
                "_id": 0 # Exclude the default MongoDB _id field
            }
            cursor = mongodb.news_collection.find({}, projection) # Execute the find query on the news_collection
            data = list(cursor) # Convert the cursor result to a list of dictionaries

            if not data: # Check if no data was returned from MongoDB
                logger.warning("No documents found in MongoDB.") # Log a warning if no documents are found
                self.df = pd.DataFrame(columns=[ # Create an empty DataFrame with expected columns
                    self.id_col, self.headline_col, self.key, self.syn, self.taxonomy_col,
                    self.topic_col, self.property_col, self.processed_content_col,
                    self.seolocation_col, self.deeplink_col, self.last_updated_col,
                    self.image_id_col, self.image_ratio_col, self.image_size_col
                ])
                return # Exit the method as there's no data to process

            self.df = pd.DataFrame(data) # Convert the list of dictionaries to a pandas DataFrame
            logger.info(f"Loaded {len(self.df)} documents from MongoDB.") # Log the number of documents loaded

            # Data cleaning and preprocessing
            required_cols = [self.id_col, self.headline_col, self.key, self.syn] # Update the list of required columns
            for col in required_cols: # Iterate over the list of required columns
                if col not in self.df.columns: # Check if a required column is missing in the DataFrame
                    raise ValueError(f"Required column '{col}' not found in MongoDB documents.") # Raise error if missing

            # Handle optional columns that are primarily textual or have simple defaults
            textual_optional_cols = {
                self.topic_col: "N/A",      # Default to "N/A" string
                self.property_col: "N/A",   # Default to "N/A" string
                self.key: ""                # Default to empty string, used for text processing
            }
            for col, default_val in textual_optional_cols.items():
                if col not in self.df.columns:
                    logger.warning(f"Optional column '{col}' not found. Adding with default value '{default_val}'.") # Log a warning
                    self.df[col] = default_val
                else:
                    self.df[col] = self.df[col].fillna(default_val).astype(str) # Fill NA and ensure string

            # Handle other optional columns that are expected to be None if missing or all NaN
            other_optional_cols = [
                self.seolocation_col, self.deeplink_col, self.last_updated_col,
                self.image_id_col, self.image_ratio_col, self.image_size_col
            ]
            for col in other_optional_cols:
                if col not in self.df.columns:
                    logger.warning(f"Optional column '{col}' not found. Adding with default value None.")
                    self.df[col] = None
                else:
                    # If column exists, convert np.nan to None for object dtypes.
                    # For numeric dtypes, np.nan is the standard missing value representation.
                    if pd.api.types.is_object_dtype(self.df[col].dtype):
                        self.df[col] = self.df[col].replace({np.nan: None})

            # Special handling for taxonomy column (list of objects)
            if self.taxonomy_col not in self.df.columns:
                logger.warning(f"Taxonomy column '{self.taxonomy_col}' not found. Adding with default empty list for each row.")
                self.df[self.taxonomy_col] = [[] for _ in range(len(self.df))]
            else:
                # Apply the cleaning function to the taxonomy column
                self.df[self.taxonomy_col] = self.df[self.taxonomy_col].apply(self._clean_taxonomy_cell)

            # Clean data
            initial_len = len(self.df) # Store the number of rows before cleaning
            self.df = self.df.dropna(subset=[self.id_col, self.headline_col, self.syn]) # Drop rows where essential columns are NaN
            self.df = self.df[self.df[self.headline_col].apply(lambda x: isinstance(x, str) and x.strip() != '')] # Keep rows with non-empty string headlines
            self.df = self.df[self.df[self.syn].apply(lambda x: isinstance(x, str) and x.strip() != '')] 

            if len(self.df) < initial_len: # Check if any rows were dropped during cleaning
                logger.warning(f"Dropped {initial_len - len(self.df)} rows due to missing/invalid values.") # Log the number of dropped rows

            if self.df.empty: # Check if the DataFrame is empty after cleaning
                logger.warning("DataFrame is empty after cleaning.") # Log a warning
                self.df = pd.DataFrame(columns=[ # Re-initialize an empty DataFrame with expected columns
                    self.id_col, self.headline_col, self.key, self.syn, self.taxonomy_col,
                    self.topic_col, self.property_col, self.processed_content_col,
                    self.seolocation_col, self.deeplink_col, self.last_updated_col,
                    self.image_id_col, self.image_ratio_col, self.image_size_col
                ])
                return # Exit if DataFrame is empty

            # Improved contextual preprocessing: combine headline, synopsis, and keywords for richer embeddings
            logger.info("Preprocessing and combining content (headline, synopsis, keywords, taxonomy) for contextual embeddings...")
            self.df[self.headline_col] = self.df[self.headline_col].astype(str)
            self.df[self.syn] = self.df[self.syn].astype(str)
            self.df[self.key] = self.df[self.key].astype(str)

            def combine_and_preprocess_content(row):
                # Combine headline, synopsis, keywords, and taxonomy for better context
                headline = row[self.headline_col].strip()
                synopsis = row[self.syn].strip()
                keywords = str(row[self.key]).strip() # Ensure string, though already handled

                # Preprocess each part
                processed_headline = self._preprocess_text(headline)
                processed_synopsis = self._preprocess_text(synopsis)
                processed_keywords = self._preprocess_text(keywords)

                # Process taxonomy terms
                taxonomy_terms_list = row[self.taxonomy_col]
                processed_taxonomy_names = []
                if isinstance(taxonomy_terms_list, list):
                    for term_obj in taxonomy_terms_list:
                        if isinstance(term_obj, dict) and "name" in term_obj and term_obj["name"]:
                            processed_taxonomy_names.append(self._preprocess_text(str(term_obj["name"])))
                processed_taxonomy_string = " ".join(p_name for p_name in processed_taxonomy_names if p_name)

                # Join all parts, prioritizing non-empty fields
                parts = [p for p in [processed_headline, processed_synopsis, processed_keywords, processed_taxonomy_string] if p]

                # A more structured combination might help models understand the different parts
                structured_parts = []
                if processed_headline: structured_parts.append(f"शीर्षक: {processed_headline}")
                if processed_synopsis: structured_parts.append(f"सारांश: {processed_synopsis}")
                if processed_keywords: structured_parts.append(f"कीवर्ड: {processed_keywords}")
                if processed_taxonomy_string: structured_parts.append(f"श्रेणी: {processed_taxonomy_string}")
                return " ".join(structured_parts)

            self.df[self.processed_content_col] = self.df.apply(combine_and_preprocess_content, axis=1)

            logger.info("Data loading and preprocessing complete.") # Log completion of data loading and preprocessing

        except Exception as e: # Catch any exception during the process
            # Ensure df is None or empty on failure
            logger.error(f"Error loading data from MongoDB: {e}", exc_info=True) # Log the error with traceback
            self.df = None # Set DataFrame to None to indicate failure
            raise # Re-raise the exception

    def _clean_taxonomy_cell(self, cell_value) -> list:
        """
        Cleans individual cells of the taxonomy column.
        Ensures that the cell content is a list, handling scalars, NaNs, and unexpected array types.
        """
        if isinstance(cell_value, list):
            # If it's already a list (empty or not), return it.
            return cell_value
        elif pd.api.types.is_scalar(cell_value) and pd.isna(cell_value):
            # Handles scalar np.nan, None, pd.NA by returning an empty list.
            return []
        elif isinstance(cell_value, (np.ndarray, pd.Series)):
            # Handles unexpected np.ndarray or pd.Series in a cell.
            logger.warning(
                f"Unexpected array/Series type in taxonomy column cell: {type(cell_value)}. "
                f"Content (first 100 chars): {str(cell_value)[:100]}. Converting to empty list."
            )
            return []
        else:
            # Handles other scalar types (e.g., string, int) or unhandled types.
            # These are converted to an empty list, mimicking the original lambda's behavior.
            if not pd.api.types.is_scalar(cell_value): # Log if it's an unexpected non-scalar, non-list, non-array type
                 logger.warning(
                    f"Unexpected non-scalar, non-list/array type in taxonomy column cell: {type(cell_value)}. "
                    f"Content (first 100 chars): {str(cell_value)[:100]}. Converting to empty list."
                )
            return []

    def _preprocess_text(self, text: str) -> str:
        """Normalize and tokenize Hindi text."""
        """
        Normalize and tokenize text. Applies Indic normalization for Hindi text
        and generic tokenization for others.
        """
        if not isinstance(text, str):
            logger.warning(f"Received non-string input for preprocessing: {type(text)}")
            return ""
        
        text = text.strip() # Remove leading/trailing whitespace
        if not text: # Handle empty strings after stripping
            return ""

        try:
            # Check for presence of Devanagari characters to identify Hindi text.
            # This is a basic check; for more complex scenarios, consider a language detection library.
            is_text = bool(re.search(r'[\u0900-\u097F]', text))

            if is_text:
                if self.normalizer is None:
                    logger.error("IndicNormalizer not initialized, but text detected. Proceeding without normalization.")
                    normalized_text = text # Fallback: use original text if normalizer is missing
                else:
                    normalized_text = self.normalizer.normalize(text)
            else:
                # For non-Hindi text, skip Hindi-specific normalization
                # logger.debug(f"Skipping Hindi normalization for non-Hindi text: {text[:50]}...") # Optional: enable if needed
                normalized_text = text
            
            tokens = indic_tokenize.trivial_tokenize(normalized_text) # Tokenize the (potentially normalized) text
            return " ".join(tokens)
        
        except Exception as e:
            logger.error(f"Error during text preprocessing for text starting with '{text[:50]}...': {e}", exc_info=True)
            # Fallback to returning the original text to prevent downstream errors
            return text

    def _generate_embeddings(self, df_subset: pd.DataFrame) -> tuple[np.ndarray, List[str]]:
        """
        Generate embeddings for the processed content column of a DataFrame subset.
        Returns a tuple of (embeddings_array, list_of_ids).
        """
        if not self.embed_model: # Check if the embedding model is loaded
            raise RuntimeError("Embedding model not loaded") # Raise an error if the model is not loaded
        
        if df_subset.empty or self.processed_content_col not in df_subset.columns or self.id_col not in df_subset.columns:
            logger.warning(
                f"DataFrame subset is empty or missing required columns "
                f"('{self.processed_content_col}', '{self.id_col}') for embedding generation."
            )
            return np.array([]), []

        texts_to_embed = df_subset[self.processed_content_col].tolist()
        # Ensure all texts are strings (preprocessing should ideally handle this, but as a safeguard)
        texts_to_embed = [str(t) if pd.notna(t) else "" for t in texts_to_embed]
        
        ids_to_embed = df_subset[self.id_col].tolist()

        if not texts_to_embed: # If all texts became empty strings or list was initially empty
            logger.warning("No non-empty texts available in the subset to generate embeddings.")
            return np.array([]), []

        try: # Start a try-except block for error handling
            embeddings = self.embed_model.encode(texts_to_embed, show_progress_bar=False, convert_to_numpy=True) # Generate embeddings
            return embeddings, ids_to_embed # Return the generated embeddings and their IDs
        except Exception as e: # Catch any exception during embedding generation
            logger.error(f"Error generating embeddings: {e}", exc_info=True) # Log the error with traceback
            return np.array([]), [] # Return empty array and list in case of error

    def _build_faiss_index(self) -> List[str]:
        """Build FAISS index from processed content and return the list of indexed IDs."""
        if self.df is None or self.df.empty or self.processed_content_col not in self.df.columns:
            raise ValueError("Cannot build FAISS index: Data not ready (DataFrame is None, empty, or missing processed content column)")
        
        if self.embed_model is None:
            raise RuntimeError("Cannot build FAISS index: Embedding model not loaded")

        logger.info("Building FAISS index...")
        if self.df.empty:
             raise ValueError("DataFrame is empty, cannot build FAISS index")

        # Process in batches for better memory management and parallelization
        BATCH_SIZE = 1000  # Adjust based on available memory
        total_docs = len(self.df)
        all_embeddings = []
        all_ids = []

        for start_idx in range(0, total_docs, BATCH_SIZE):
            end_idx = min(start_idx + BATCH_SIZE, total_docs)
            batch_df = self.df.iloc[start_idx:end_idx]
            
            # Generate embeddings for the batch
            batch_embeddings, batch_ids = self._generate_embeddings(batch_df)
            
            if batch_embeddings.size > 0:
                all_embeddings.append(batch_embeddings)
                all_ids.extend(batch_ids)
                
            logger.info(f"Processed batch {start_idx//BATCH_SIZE + 1}/{(total_docs + BATCH_SIZE - 1)//BATCH_SIZE}")

        if not all_embeddings:
            logger.warning("No embeddings were generated. FAISS index will be empty.")
            self.index = None
            return []

        # Concatenate all batch embeddings
        embeddings = np.vstack(all_embeddings).astype(np.float32)
        dimension = embeddings.shape[1]
        
        # Initialize HNSW index with optimized parameters
        hnsw_m = 32  # Number of neighbors per layer
        ef_construction = 100  # Higher value = better accuracy but slower construction
        self.index = faiss.IndexHNSWFlat(dimension, hnsw_m, faiss.METRIC_INNER_PRODUCT)
        self.index.hnsw.efConstruction = ef_construction
        
        # Add vectors in batches to reduce memory usage
        BATCH_SIZE_INDEX = 10000  # Adjust based on available memory
        for i in range(0, len(embeddings), BATCH_SIZE_INDEX):
            batch = embeddings[i:i + BATCH_SIZE_INDEX]
            self.index.add(batch)
            logger.info(f"Added batch {i//BATCH_SIZE_INDEX + 1}/{(len(embeddings) + BATCH_SIZE_INDEX - 1)//BATCH_SIZE_INDEX} to index")

        logger.info(f"FAISS index built with {self.index.ntotal} vectors")
        return all_ids

    def _load_faiss_index_and_ids(self):
        """Load FAISS index and corresponding IDs from files."""
        if not os.path.exists(INDEX_PATH): # Check if the FAISS index file exists at the specified path
            raise FileNotFoundError(f"FAISS index file not found at {INDEX_PATH}") # Raise error if file not found
        
        logger.info(f"Loading FAISS index from: {INDEX_PATH}") # Log the path from which the index is being loaded
        try: # Start a try-except block for error handling
            self.index = faiss.read_index(INDEX_PATH) # Read the FAISS index from the file
            logger.info(f"FAISS index loaded with {self.index.ntotal} vectors") # Log the number of vectors in the loaded index
        except Exception as e: # Catch any exception during index loading
            logger.error(f"Error loading FAISS index: {e}", exc_info=True) # Log the error with traceback
            self.index = None # Ensure index is None on failure
            raise # Re-raise the exception

        # Try to load IDs from MongoDB if available
        if self.faiss_meta_collection is not None:
            logger.info(f"Loading FAISS index IDs from MongoDB collection '{MONGO_FAISS_META_COLLECTION_NAME}', document_id '{self.FAISS_IDS_DOC_ID}'")
            try:
                ids_document = self.faiss_meta_collection.find_one({"_id": self.FAISS_IDS_DOC_ID})
                if ids_document and "ids" in ids_document:
                    self.indexed_ids = ids_document["ids"]
                    logger.info(f"Loaded {len(self.indexed_ids)} indexed IDs from MongoDB.")

                    # Basic consistency check
                    if self.index and self.index.ntotal != len(self.indexed_ids):
                        logger.warning(
                            f"FAISS index vector count ({self.index.ntotal}) "
                            f"does not match loaded ID count from MongoDB ({len(self.indexed_ids)}). "
                            "Index might be inconsistent. Consider rebuilding."
                        )
                else:
                    logger.warning(f"FAISS index IDs document not found in MongoDB or 'ids' field missing. Will attempt to build if necessary.")
                    self.indexed_ids = [] # Initialize as empty if not found

            except Exception as e:
                logger.error(f"Error loading FAISS index IDs from MongoDB: {e}", exc_info=True)
                self.indexed_ids = [] # Ensure IDs list is empty on failure
                # We don't re-raise here, as load_components will decide if a rebuild is needed
                # based on whether self.indexed_ids is populated.
        else:
            logger.warning("MongoDB not available. Cannot load indexed IDs. Operating with empty ID list.")
            self.indexed_ids = []

    def build_indexes_and_save(self, data_already_loaded: bool = False):
        """
        Load data (if not already loaded), build FAISS index from current self.df, and save.
        Assumes self.df is populated if data_already_loaded is True.
        """
        logger.info("Starting index building process...") # Log the start of the index building and saving process
        try: # Start a try-except block for error handling
            if not data_already_loaded:
                self._load_data_from_mongo() # Load data from MongoDB if not already loaded
            
            if self.df is None or self.df.empty: # Check if DataFrame is None or empty after loading
                raise ValueError("Data is empty. Cannot build index.") # Raise error if data is not loaded properly
            if self.embed_model is None: # Ensure models are loaded before building index
                 self.load_models()

            # Build the FAISS index using the loaded data and models
            # This method now returns the list of IDs that were indexed
            indexed_ids = self._build_faiss_index()
            self.indexed_ids = indexed_ids # Store the list of IDs

            # Save FAISS index
            logger.info(f"Saving FAISS index to: {INDEX_PATH}") # Log the path where the index will be saved
            index_dir = os.path.dirname(INDEX_PATH) # Get the directory part of the index path
            if index_dir and not os.path.exists(index_dir): # If the directory exists and is not empty string
                os.makedirs(index_dir) # Create the directory if it doesn't exist

            # Save the index and the corresponding IDs
            faiss.write_index(self.index, INDEX_PATH)

            # Save indexed_ids to MongoDB
            logger.info(f"Saving FAISS index IDs to MongoDB collection '{MONGO_FAISS_META_COLLECTION_NAME}', document_id '{self.FAISS_IDS_DOC_ID}'")
            try:
                if self.faiss_meta_collection is not None:
                    self.faiss_meta_collection.update_one(
                        {"_id": self.FAISS_IDS_DOC_ID},
                        {"$set": {"ids": self.indexed_ids, "last_updated": datetime.now()}},
                        upsert=True
                    )
                    logger.info(f"Saved {len(self.indexed_ids)} indexed IDs to MongoDB.")
                else:
                    logger.warning("MongoDB not available. Skipping indexed IDs save.")
            except Exception as e:
                logger.error(f"Error saving FAISS index IDs to MongoDB: {e}", exc_info=True)
                # Don't raise here as the FAISS index was saved successfully
                # The IDs can be regenerated if needed
            logger.info("Index building and saving complete") # Log successful completion
        except Exception as e: # Catch any exception during the process
            logger.error(f"Error during index building: {e}", exc_info=True) # Log the error with traceback
            raise # Re-raise the exception

    def load_components(self):
        """Load all components (models, data, index)."""
        logger.info("Loading components...") # Log the start of component loading
        try: # Start a try-except block for error handling
            self.load_models() # Load all machine learning models
            
            # Try to load data from MongoDB, but handle failures gracefully
            try:
                self._load_data_from_mongo() # Load data from MongoDB and preprocess it
                logger.info("Successfully loaded data from MongoDB")
            except Exception as mongo_error:
                logger.warning(f"Failed to load data from MongoDB: {mongo_error}")
                logger.info("Attempting to work with existing FAISS index without MongoDB data...")
                # Set df to None to indicate no MongoDB data is available
                self.df = None

            # Try to load FAISS index and IDs
            try:
                self._load_faiss_index_and_ids() # Tries to load .bin and IDs from Mongo.
                                                # Sets self.index and self.indexed_ids.
                                                # self.indexed_ids will be [] if Mongo data for IDs is missing.
                                                # Raises FileNotFoundError if .bin (INDEX_PATH) is missing.

                if self.index and self.index.ntotal > 0:
                    logger.info(f"FAISS index loaded successfully with {self.index.ntotal} vectors")
                    
                    # If we have MongoDB data, proceed with normal logic
                    if self.df is not None and not self.df.empty:
                        # Consistency check and incremental update logic
                        if not self.indexed_ids:
                            logger.warning("FAISS index file loaded, but no corresponding IDs found in MongoDB. Rebuilding for consistency.")
                            self.build_indexes_and_save(data_already_loaded=True)
                        else:
                            logger.info("Existing FAISS index and IDs loaded from storage.")
                            # Proceed with incremental update logic
                            current_df_ids = set(self.df[self.id_col].tolist())
                            indexed_ids_set = set(self.indexed_ids)
                            new_ids_to_add = list(current_df_ids - indexed_ids_set)
     
                            if new_ids_to_add:
                                logger.info(f"Found {len(new_ids_to_add)} new documents to add to the index.")
                                new_docs_df = self.df[self.df[self.id_col].isin(new_ids_to_add)].copy()
     
                                new_embeddings, new_doc_ids_added = self._generate_embeddings(new_docs_df)

                                if new_embeddings.size > 0:
                                    self.index.add(new_embeddings.astype(np.float32))
                                    self.indexed_ids.extend(new_doc_ids_added)
                                    logger.info(f"Added {len(new_doc_ids_added)} new vectors to FAISS index. Total vectors: {self.index.ntotal}")

                                    # Save the updated FAISS index
                                    faiss.write_index(self.index, INDEX_PATH)
                                    # Try to save the updated IDs to MongoDB, but don't fail if it doesn't work
                                    try:
                                        self.faiss_meta_collection.update_one(
                                            {"_id": self.FAISS_IDS_DOC_ID},
                                            {"$set": {"ids": self.indexed_ids, "last_updated": datetime.now()}},
                                            upsert=True
                                        )
                                        logger.info("Updated FAISS index and IDs saved to MongoDB.")
                                    except Exception as e:
                                        logger.warning(f"Could not save IDs to MongoDB: {e}")
                            else:
                                logger.info("No new documents found to add to the index. Index is up-to-date.")
                    else:
                        # No MongoDB data available, but we have a FAISS index
                        logger.info("FAISS index available but no MongoDB data. Operating in limited mode.")
                        if not self.indexed_ids:
                            logger.warning("No indexed IDs available. Some functionality may be limited.")
                else:
                    # This case handles if self.index is None (FileNotFoundError caught below)
                    # or if index was loaded but empty and no IDs from Mongo.
                    if self.df is not None and not self.df.empty:
                        logger.info("FAISS index and/or IDs not found or empty. Building new index.")
                        self.build_indexes_and_save(data_already_loaded=True)
                    else:
                        logger.warning("No data available (neither MongoDB nor FAISS index). Cannot build index.")

            except FileNotFoundError: # This means INDEX_PATH (.bin file) was not found.
                logger.warning(f"FAISS index file ({INDEX_PATH}) not found.")
                if self.df is not None and not self.df.empty:
                    logger.info("Building index from scratch.")
                    self.build_indexes_and_save(data_already_loaded=True)
                else:
                    logger.error("Cannot build index: no data available.")
            except Exception as e:
                logger.error(f"Error loading FAISS index: {e}", exc_info=True)
                if self.df is not None and not self.df.empty:
                    logger.info("Attempting to rebuild index due to loading error.")
                    self.build_indexes_and_save(data_already_loaded=True)
                else:
                    logger.error("Cannot rebuild index: no data available.")
                    
            logger.info("Components loaded successfully") # Log successful loading of all components
        except Exception as e: # Catch any exception during component loading
            logger.error(f"Error loading components: {e}", exc_info=True) # Log the error with traceback
            raise # Re-raise the exception

    def get_recommendations(
        self, # Added self parameter
        query: str,
        k: int = DEFAULT_K, # Number of recommendations to return, defaults to DEFAULT_K from config
        similarity_threshold: float = SIMILARITY_THRESHOLD # Similarity threshold, defaults to SIMILARITY_THRESHOLD from config
    ) -> Dict:
        """
        Get recommendations for a query.
        Returns a dictionary with retrieved documents and generated response.
        """
        # Check prerequisites
        if self.df is None or self.df.empty: # Check if the DataFrame (content data) is loaded
            raise HTTPException(status_code=503, detail="Recommender data not available") # Raise 503 error if data is missing
        
        if not all([self.index, self.embed_model, self.reranker, self.generator]): # Check if all essential components are loaded
            missing = [ # List comprehension to find names of missing components
                name for name, component in [ # Iterate through component names and their instances
                    ("FAISS index", self.index), # FAISS index
                    ("Embedding model", self.embed_model), # Embedding model
                    ("Reranker model", self.reranker), # Reranker model
                    ("Generator model", self.generator) # Generator model
                ] if component is None # Check if the component is None (not loaded)
            ]
            raise HTTPException( # Raise 503 error if components are missing
                status_code=503, # HTTP status code for Service Unavailable
                detail=f"Recommender not fully initialized. Missing: {', '.join(missing)}" # Error detail listing missing components
            )

        logger.info(f"Processing recommendation request: query='{query}', k={k}") # Log the incoming recommendation request

        # Preprocess the query
        processed_query = self._preprocess_text(query) # Preprocess the input Hindi query
        query_embedding, _ = self._generate_embeddings(pd.DataFrame({self.processed_content_col: [processed_query], self.id_col: ["query"]})) # Generate embedding for the processed query
        if query_embedding.size == 0: # Check if the query embedding is empty (e.g., generation failed)
            logger.warning("Query embedding is empty.") # Log a warning
            return { # Return an empty result
                "retrieved_documents": [], # Empty list of documents
                "generated_response": "No recommendations found. (Query embedding failed)" # Informative message
            }

        # Retrieve candidates from FAISS
        num_candidates = max(k * CANDIDATE_MULTIPLIER, k) # Determine number of candidates to fetch (k * multiplier, or at least k)
        try: # Start a try-except block for FAISS search
            D, I = self.index.search(query_embedding.astype(np.float32), num_candidates) # Search FAISS index (D=distances, I=indices)
        except Exception as e: # Catch any exception during FAISS search
            logger.error(f"Error during FAISS search: {e}", exc_info=True) # Log the error with traceback
            return { # Return an empty result
                "retrieved_documents": [], # Empty list of documents
                "generated_response": "No recommendations found. (FAISS search failed)" # Informative message
            }

        # Process FAISS results
        retrieved_faiss_indices = I[0]
        retrieved_faiss_scores = D[0]

        # Map FAISS indices to original document IDs and collect scores
        valid_candidate_data = []
        for faiss_idx, score in zip(retrieved_faiss_indices, retrieved_faiss_scores):
            # Ensure FAISS index is valid and within bounds of self.indexed_ids
            if faiss_idx != -1 and faiss_idx < len(self.indexed_ids):
                valid_candidate_data.append({
                    "original_id": self.indexed_ids[faiss_idx], # Actual ID of the item
                    "faiss_score": score
                })
            elif faiss_idx != -1: # Log if faiss_idx is valid but out of bounds for indexed_ids
                logger.warning(
                    f"FAISS index {faiss_idx} is out of bounds for self.indexed_ids (len: {len(self.indexed_ids)}). Skipping."
                )

        if not valid_candidate_data:
            logger.info(f"No valid candidates found from FAISS/ID mapping for query '{query}'.")
            return {
                "retrieved_documents": [],
                "generated_response": f"No recommendations found for '{query}' (no FAISS results or ID mapping issue)."
            }

        # Create a DataFrame from valid FAISS candidates (contains 'original_id' and 'faiss_score')
        faiss_candidates_df = pd.DataFrame(valid_candidate_data)
        
        # Fetch full candidate details from self.df by merging
        # This uses the 'original_id' (which are actual item IDs) to robustly fetch data
        # and preserves the order from FAISS retrieval.
        candidates = pd.merge(
            faiss_candidates_df,  # Left DataFrame (dictates order and includes faiss_score)
            self.df,              # Right DataFrame (provides full item details)
            left_on="original_id",# Key in faiss_candidates_df
            right_on=self.id_col, # Key in self.df
            how="inner"           # Ensures only items present in both are kept
        )
        
        # If 'original_id' column is different from self.id_col and still exists, drop it as it's redundant
        if "original_id" in candidates.columns and "original_id" != self.id_col:
            candidates = candidates.drop(columns=["original_id"])
        
        # Ensure the ID column is of a consistent type if needed, though it should match indexed_ids type
        # candidates[self.id_col] = candidates[self.id_col].astype(str) # Example if IDs need to be strings

        # Filter out exact matches with query (case-insensitive, strip spaces)
        candidates = candidates[
            (candidates[self.headline_col].str.strip().str.lower() != query.strip().lower()) &
            (candidates[self.syn].str.strip().str.lower() != query.strip().lower())
        ]
        if candidates.empty:
            logger.info(f"No candidates left after filtering exact query matches for query '{query}'.")
            return {"retrieved_documents": [], "generated_response": f"No distinct recommendations found for '{query}'."}

        candidates = candidates.drop_duplicates(subset=[self.syn]).copy() # Use .copy() after selection/drop_duplicates
        if candidates.empty:
            logger.info(f"No candidates left after dropping duplicates for query '{query}'.")
            return {"retrieved_documents": [], "generated_response": f"No unique recommendations found for '{query}'."}

        # Rerank using cross-encoder
        #rerank_pairs = [(query, str(row[self.headline_col])) for _, row in candidates.iterrows()] # Create pairs of (query, candidate_headline) for reranking

        rerank_pairs = [(query, str(row[self.syn])) for _, row in candidates.iterrows()]
        if rerank_pairs: # Check if there are any candidate pairs to rerank
            rerank_scores = self.reranker.predict(rerank_pairs, show_progress_bar=False) # Predict reranking scores
            logger.info(f"Raw rerank scores for query '{query}': {rerank_scores.tolist()}") # Log raw scores
            candidates["rerank_score"] = rerank_scores # Add rerank scores as a new column
            candidates = candidates.sort_values("rerank_score", ascending=False) # Sort candidates by rerank score in descending order
            
            logger.debug(f"Top candidates before thresholding (query='{query}', threshold={similarity_threshold}):")
            for _, row in candidates.head().iterrows(): # Log top few candidates before filtering
                logger.debug(f"  ID: {row[self.id_col]}, Synopsis: {str(row[self.syn])[:50]}..., Rerank Score: {row['rerank_score']:.4f}")

            #candidates = candidates[candidates["rerank_score"] >= similarity_threshold]

            candidates = candidates[candidates["rerank_score"] >= similarity_threshold]
            logger.info(f"Number of candidates after applying similarity_threshold ({similarity_threshold}): {len(candidates)}")
        else: # If no pairs to rerank (e.g., all candidates were filtered out)
            logger.info(f"No candidate pairs to rerank for query '{query}'.")
            candidates["rerank_score"] = 0.0 # Assign a default rerank score of 0.0

        # Select top-k
        top_candidates = candidates.head(k) # Select the top-k candidates after reranking

        # Prepare output
        retrieved_documents = [] # Initialize an empty list to store formatted retrieved documents
        for _, row in top_candidates.iterrows(): # Iterate through the top-k candidate rows
            taxonomy_data = row[self.taxonomy_col]
            taxonomy_names = []
            if isinstance(taxonomy_data, list):
                for term_obj in taxonomy_data:
                    if isinstance(term_obj, dict) and term_obj.get("name"): # Check key exists and has a value
                        taxonomy_names.append(str(term_obj["name"]))

            retrieved_documents.append({ # Create a dictionary for each retrieved document
                "id": row[self.id_col], # Document ID
                "hl": str(row[self.headline_col]), # Document headline
                "synopsis": row[self.syn], # Document primary content (synopsis)
                "keywords": row[self.key], # Document secondary content (keywords)
                "type": row.get(self.topic_col, None), # Document topic (or None if not available)
                "taxonomy": taxonomy_names, # List of taxonomy names
                "score": row["rerank_score"],  # Rerank score of the document
                "seolocation": row.get(self.seolocation_col, None),
                "dl": row.get(self.deeplink_col, None),
                "lu": row.get(self.last_updated_col, None),
                "imageid": row.get(self.image_id_col, None),
                "imgratio": row.get(self.image_ratio_col, None),
                "imgsize": row.get(self.image_size_col, None)
            })

        # Optionally, generate a response using the generator model (not implemented here)
        generated_response = f"Top {len(retrieved_documents)} recommendations for '{query}'." # Create a simple generated response string

        return { # Return the final result as a dictionary
            "retrieved_documents": retrieved_documents, # List of retrieved documents
            "generated_response": generated_response # Generated textual response
        }

    def _format_retrieved_documents(self, documents_df: pd.DataFrame) -> List[Dict]:
        """Helper function to format DataFrame rows into a list of document dictionaries."""
        retrieved_documents = []
        for _, row in documents_df.iterrows():
            taxonomy_data = row[self.taxonomy_col]
            taxonomy_names = []
            if isinstance(taxonomy_data, list):
                for term_obj in taxonomy_data:
                    if isinstance(term_obj, dict) and term_obj.get("name"):
                        taxonomy_names.append(str(term_obj["name"]))

            retrieved_documents.append({
                "id": row[self.id_col],
                "hl": str(row[self.headline_col]),
                "synopsis": str(row.get(self.syn, "")), # Ensure synopsis is string
                "keywords": str(row.get(self.key, "")), # Ensure keywords is string
                "type": row.get(self.topic_col, None),
                "taxonomy": taxonomy_names,
                "score": row.get("rerank_score", 0.0), # Use .get for safety if rerank_score might be missing
                "seolocation": row.get(self.seolocation_col, None),
                "dl": row.get(self.deeplink_col, None),
                "lu": row.get(self.last_updated_col, None),
                "imageid": row.get(self.image_id_col, None),
                "imgratio": row.get(self.image_ratio_col, None),
                "imgsize": row.get(self.image_size_col, None)
            })
        return retrieved_documents

    def get_recommendations_by_id(
        self,
        msid: str,
        k: int = DEFAULT_K,
        similarity_threshold: float = SIMILARITY_THRESHOLD
    ) -> Dict:
        """
        Get recommendations based on a given item ID (msid).
        Finds items similar to the item identified by msid.
        Returns a dictionary with retrieved documents.
        """
        # Check prerequisites
        if self.df is None or self.df.empty:
            logger.error("Recommender data not available for get_recommendations_by_id.")
            raise HTTPException(status_code=503, detail="Recommender data not available")

        # Generator model is not strictly needed for this item-to-item recommendation path
        if not all([self.index, self.embed_model, self.reranker]):
            missing = [
                name for name, component in [
                    ("FAISS index", self.index),
                    ("Embedding model", self.embed_model),
                    ("Reranker model", self.reranker),
                ] if component is None
            ]
            logger.error(f"Recommender not fully initialized for get_recommendations_by_id. Missing: {', '.join(missing)}")
            raise HTTPException(
                status_code=503,
                detail=f"Recommender not fully initialized. Missing: {', '.join(missing)}"
            )

        logger.info(f"Processing recommendation request for item ID: '{msid}', k={k}")

        # Find the source item in the DataFrame
        source_item_row = self.df[self.df[self.id_col] == msid]
        if source_item_row.empty:
            logger.warning(f"Item with ID '{msid}' not found in DataFrame.")
            # Ensure the response structure matches what RecommendationResponse expects
            # even on failure, to avoid FastAPI server errors during response model validation.
            return {"retrieved_documents": [], "generated_response": f"Item with ID '{msid}' not found."}
            # Or, if you prefer to let the route handler catch this as a 404:
            # raise HTTPException(status_code=404, detail=f"Item with ID '{msid}' not found")

        source_item = source_item_row.iloc[0]
        source_item_content_for_embedding = source_item[self.processed_content_col]
        source_item_content_for_reranking = str(source_item[self.syn]) # Using synopsis for reranker query

        # Generate embedding for the source item's content
        item_embedding, _ = self._generate_embeddings(
            pd.DataFrame({
                self.processed_content_col: [source_item_content_for_embedding],
                self.id_col: [msid] # Dummy ID, as _generate_embeddings expects it
            })
        )

        if item_embedding.size == 0:
            logger.warning(f"Embedding generation failed for item ID '{msid}'.")
            return {"retrieved_documents": [], "generated_response": "No recommendations found (source item embedding failed)"}

        # Retrieve candidates from FAISS: fetch k+1 (or more with multiplier) to account for filtering source item
        num_candidates_to_fetch = max((k + 1) * CANDIDATE_MULTIPLIER, k + 1)
        try:
            D, I = self.index.search(item_embedding.astype(np.float32), num_candidates_to_fetch)
        except Exception as e:
            logger.error(f"Error during FAISS search for item ID '{msid}': {e}", exc_info=True)
            return {"retrieved_documents": [], "generated_response": "No recommendations found (FAISS search failed)"}

        candidate_faiss_indices = I[0]
        # candidate_scores = D[0] # FAISS scores, can be used if needed

        valid_mask = candidate_faiss_indices != -1
        candidate_faiss_indices = candidate_faiss_indices[valid_mask]

        if len(candidate_faiss_indices) == 0:
            logger.info(f"No candidates found from FAISS for item ID '{msid}'.")
            return {"retrieved_documents": [], "generated_response": f"No similar items found for ID '{msid}'."}

        candidate_original_ids = [self.indexed_ids[i] for i in candidate_faiss_indices if i < len(self.indexed_ids)]
        
        # Fetch candidates from the main DataFrame, excluding the source item itself
        candidates_df = self.df[self.df[self.id_col].isin(candidate_original_ids) & (self.df[self.id_col] != msid)].copy()
        candidates_df = candidates_df.drop_duplicates(subset=[self.syn]) # Avoid duplicate content

        if candidates_df.empty:
            logger.info(f"No candidates left after filtering for item ID '{msid}'.")
            return {"retrieved_documents": [], "generated_response": f"No other similar items found for ID '{msid}'."}

        # Rerank using cross-encoder
        rerank_pairs = [(source_item_content_for_reranking, str(row[self.syn])) for _, row in candidates_df.iterrows()]
        if rerank_pairs:
            rerank_scores = self.reranker.predict(rerank_pairs, show_progress_bar=False)
            candidates_df["rerank_score"] = rerank_scores
            candidates_df = candidates_df.sort_values("rerank_score", ascending=False)
            candidates_df = candidates_df[candidates_df["rerank_score"] >= similarity_threshold]
        else:
            candidates_df["rerank_score"] = 0.0 # Default score if no pairs or reranking skipped

        top_candidates = candidates_df.head(k)

        retrieved_documents = self._format_retrieved_documents(top_candidates)

        generated_response = f"Top {len(retrieved_documents)} recommendations similar to item ID '{msid}'."
        if not retrieved_documents:
            generated_response = f"No recommendations found similar to item ID '{msid}'."
        return {"retrieved_documents": retrieved_documents, "generated_response": generated_response}

    def prepare_reranker_training_data_from_new_feedback_format(self, user_id: str, training_event_details: Dict) -> List[Dict]:
        """
        Prepares training data for the reranker model from a user's feedback document.
        Generates both positive and negative training samples using semantic similarity
        based negative sampling for better model discrimination.
        """
        if self.df is None or self.df.empty:
            logger.warning(f"User {user_id}: DataFrame not loaded. Cannot prepare training data.")
            return []

        training_samples = []

        query_msid = training_event_details.get("query_msid")
        positive_msids_list = training_event_details.get("positive_msids")

        if not query_msid or not isinstance(query_msid, str):
            logger.warning(f"User {user_id}: 'query_msid' missing or invalid in training_event_details. Details: {str(training_event_details)[:200]}")
            return []

        if not isinstance(positive_msids_list, list) or not positive_msids_list:
            logger.warning(f"User {user_id}: 'positive_msids' field missing, not a list, or empty in training_event_details for query_msid '{query_msid}'. Details: {str(training_event_details)[:200]}")
            return []

        source_item_row = self.df[self.df[self.id_col] == query_msid]
        if source_item_row.empty:
            logger.warning(f"User {user_id}: Query item (msid: {query_msid}) for training data not found in DataFrame.")
            return []
        
        query_text = str(source_item_row.iloc[0].get(self.syn, "")).strip()
        if not query_text:
            logger.warning(f"User {user_id}: Query text (synopsis) is empty for msid {query_msid}. Skipping training sample generation for this event.")
            return []

        # Process positive samples
        positive_samples = []
        for positive_msid in positive_msids_list:
            if not isinstance(positive_msid, str) or not positive_msid.strip():
                continue

            clicked_item_row = self.df[self.df[self.id_col] == positive_msid]
            if not clicked_item_row.empty:
                candidate_text_positive = str(clicked_item_row.iloc[0].get(self.syn, "")).strip()
                if candidate_text_positive:
                    positive_samples.append({
                        "query_text": query_text,
                        "candidate_text": candidate_text_positive,
                        "label": 1.0,
                        "msid": positive_msid
                    })

        if not positive_samples:
            logger.warning(f"User {user_id}: No valid positive samples found for query_msid {query_msid}")
            return []

        # Generate negative samples through semantic similarity based sampling
        num_negatives_per_positive = 5  # Increased for better training
        all_msids = set(self.df[self.id_col].tolist())
        positive_msids_set = set(p["msid"] for p in positive_samples)
        
        # Get query embedding for semantic search
        query_embedding, _ = self._generate_embeddings(
            pd.DataFrame({
                self.processed_content_col: [query_text],
                self.id_col: ["temp_query"]
            })
        )

        if query_embedding.size > 0:
            # Get semantically similar candidates (harder negatives)
            D, I = self.index.search(query_embedding.astype(np.float32), k=50)  # Get more candidates
            candidate_indices = I[0]
            candidate_msids = [
                self.indexed_ids[idx] for idx in candidate_indices 
                if idx != -1 and idx < len(self.indexed_ids)
            ]
            
            # Filter out positives and query item
            negative_candidates = [
                msid for msid in candidate_msids 
                if msid not in positive_msids_set and msid != query_msid
            ]

            import random
            for pos_sample in positive_samples:
                # Mix of hard and random negatives
                num_hard_negatives = min(3, len(negative_candidates))
                num_random_negatives = num_negatives_per_positive - num_hard_negatives

                # Select hard negatives (semantically similar)
                hard_negatives = negative_candidates[:num_hard_negatives]
                
                # Select random negatives from remaining pool
                remaining_candidates = list(all_msids - positive_msids_set - set(hard_negatives) - {query_msid})
                random_negatives = random.sample(remaining_candidates, num_random_negatives)
                
                # Combine hard and random negatives
                selected_negatives = hard_negatives + random_negatives

                for neg_msid in selected_negatives:
                    neg_item_row = self.df[self.df[self.id_col] == neg_msid]
                    if not neg_item_row.empty:
                        candidate_text_negative = str(neg_item_row.iloc[0].get(self.syn, "")).strip()
                        if candidate_text_negative:
                            training_samples.append({
                                "query_text": pos_sample["query_text"],
                                "candidate_text": candidate_text_negative,
                                "label": 0.0,
                                "msid": neg_msid
                            })

        # Add positive samples to final training data
        training_samples.extend(positive_samples)

        if training_samples:
            logger.info(f"User {user_id}: Prepared {len(training_samples)} training samples ({len(positive_samples)} positive, {len(training_samples)-len(positive_samples)} negative) from the interaction event (query_msid: {query_msid}).")
        
        return training_samples

    def _log_training_data_for_refinement(self, training_data: List[Dict]):
        """Save training data for future fine-tuning."""
        if not training_data:
            logger.info("No training data provided for saving.")
            return
        self.model_trainer.prepare_training_data(training_data)

    async def check_and_trigger_fine_tuning(self):
        """
        Check if fine-tuning should be triggered based on conditions and start if needed.
        Returns True if fine-tuning was triggered, False otherwise.
        """
        try:
            if not self.model_trainer.check_training_conditions():
                return False

            # Start fine-tuning process in background
            asyncio.create_task(self._run_fine_tuning_process())
            return True

        except Exception as e:
            logger.error(f"Error in check_and_trigger_fine_tuning: {e}")
            return False

    async def _run_fine_tuning_process(self):
        """Run the fine-tuning process in the background."""
        try:
            # Start fine-tuning
            logger.info("Starting fine-tuning process")
            
            # Run fine-tuning using the model trainer
            new_version = await asyncio.to_thread(self.model_trainer.fine_tune)
            
            if new_version:
                # Load and validate the fine-tuned model before deploying
                model_path = str(self.model_trainer.get_model_path(new_version))
                if os.path.exists(model_path):
                    # Load the new model for validation
                    # This is a synchronous operation, potentially okay if quick,
                    # but could be moved to to_thread if model loading is slow.
                    new_model = await asyncio.to_thread(CrossEncoder, model_path, device=self.device)
                    
                    # Validate model performance
                    validation_passed = await self._validate_fine_tuned_model(new_model)
                    
                    if validation_passed:
                        logger.info(f"Fine-tuned model validation passed. Deploying version: {new_version}")
                        self.fine_tuned_reranker = new_model # new_model is already on self.device
                        self.reranker = self.fine_tuned_reranker # Switch active reranker
                        
                        # Update embeddings and index if needed
                        # update_embeddings_and_index is synchronous and can be long
                        await asyncio.to_thread(self.update_embeddings_and_index)
                        
                        logger.info(f"Fine-tuning process completed successfully. New version: {new_version}")
                    else:
                        logger.warning("Fine-tuned model validation failed. Keeping current model.")
                        self.reranker = self.base_reranker
                else:
                    logger.error(f"Fine-tuned model file not found at {model_path}")
                    self.reranker = self.base_reranker
            else:
                logger.error("Fine-tuning process failed")
                self.reranker = self.base_reranker
            
        except Exception as e:
            logger.error(f"Error during fine-tuning process: {e}")
            self.reranker = self.base_reranker

    async def _validate_fine_tuned_model(self, new_model: CrossEncoder) -> bool:
        """
        Validate the fine-tuned model's performance before deployment.
        Uses multiple metrics for a more comprehensive evaluation.
        Returns True if validation passes, False otherwise.
        """
        try:
            # Get a sample of validation data
            validation_data = self.model_trainer.get_validation_data()
            if not validation_data:
                logger.warning("No validation data available")
                return False

            # Initialize metrics
            base_metrics = {
                "true_positives": 0,
                "false_positives": 0,
                "true_negatives": 0,
                "false_negatives": 0,
                "scores": []
            }
            new_metrics = {
                "true_positives": 0,
                "false_positives": 0,
                "true_negatives": 0,
                "false_negatives": 0,
                "scores": []
            }
            
            # Evaluate both models on validation data
            for sample in validation_data:
                query = sample["query_text"]
                candidate = sample["candidate_text"]
                label = float(sample["label"])
                
                # Get predictions from both models
                # predict is synchronous and CPU/GPU bound
                base_pred_array = await asyncio.to_thread(self.base_reranker.predict, [(query, candidate)])
                base_pred = base_pred_array[0]
                new_pred_array = await asyncio.to_thread(new_model.predict, [(query, candidate)])
                new_pred = new_pred_array[0]
                
                # Update metrics for base model
                base_metrics["scores"].append(base_pred)
                if label == 1.0:
                    if base_pred >= 0.5:
                        base_metrics["true_positives"] += 1
                    else:
                        base_metrics["false_negatives"] += 1
                else:
                    if base_pred >= 0.5:
                        base_metrics["false_positives"] += 1
                    else:
                        base_metrics["true_negatives"] += 1

                # Update metrics for new model
                new_metrics["scores"].append(new_pred)
                if label == 1.0:
                    if new_pred >= 0.5:
                        new_metrics["true_positives"] += 1
                    else:
                        new_metrics["false_negatives"] += 1
                else:
                    if new_pred >= 0.5:
                        new_metrics["false_positives"] += 1
                    else:
                        new_metrics["true_negatives"] += 1
            
            if not base_metrics["scores"] or not new_metrics["scores"]:
                logger.warning("No predictions generated during validation")
                return False
            
            # Calculate metrics for both models
            def calculate_model_metrics(metrics):
                tp = metrics["true_positives"]
                fp = metrics["false_positives"]
                tn = metrics["true_negatives"]
                fn = metrics["false_negatives"]
                
                # Prevent division by zero
                precision = tp / (tp + fp) if (tp + fp) > 0 else 0
                recall = tp / (tp + fn) if (tp + fn) > 0 else 0
                f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
                accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) > 0 else 0
                
                return {
                    "precision": precision,
                    "recall": recall,
                    "f1": f1,
                    "accuracy": accuracy
                }
            
            base_performance = calculate_model_metrics(base_metrics)
            new_performance = calculate_model_metrics(new_metrics)
            
            # Log detailed performance comparison
            logger.info("Model validation results:")
            logger.info(f"Base model metrics: {base_performance}")
            logger.info(f"Fine-tuned model metrics: {new_performance}")
            
            # More lenient validation criteria
            min_relative_improvement = 0.001  # 0.1% minimum relative improvement
            min_absolute_f1 = 0.3  # Lower minimum F1 score required
            
            # Check if new model shows improvement in any metric
            f1_improvement = new_performance["f1"] - base_performance["f1"]
            precision_improvement = new_performance["precision"] - base_performance["precision"]
            recall_improvement = new_performance["recall"] - base_performance["recall"]
            accuracy_improvement = new_performance["accuracy"] - base_performance["accuracy"]
            
            # Calculate relative improvements
            relative_f1_imp = f1_improvement / base_performance["f1"] if base_performance["f1"] > 0 else float('inf')
            relative_prec_imp = precision_improvement / base_performance["precision"] if base_performance["precision"] > 0 else float('inf')
            relative_recall_imp = recall_improvement / base_performance["recall"] if base_performance["recall"] > 0 else float('inf')
            relative_acc_imp = accuracy_improvement / base_performance["accuracy"] if base_performance["accuracy"] > 0 else float('inf')
            
            # Model passes validation if it shows improvement in any metric and meets minimum F1
            if new_performance["f1"] >= min_absolute_f1 and (
                relative_f1_imp >= min_relative_improvement or
                relative_prec_imp >= min_relative_improvement or
                relative_recall_imp >= min_relative_improvement or
                relative_acc_imp >= min_relative_improvement
            ):
                logger.info(
                    f"Fine-tuned model shows improvement. Metrics changes:\n"
                    f"F1: {f1_improvement:.4f} ({relative_f1_imp:.2%})\n"
                    f"Precision: {precision_improvement:.4f} ({relative_prec_imp:.2%})\n"
                    f"Recall: {recall_improvement:.4f} ({relative_recall_imp:.2%})\n"
                    f"Accuracy: {accuracy_improvement:.4f} ({relative_acc_imp:.2%})"
                )
                return True
            else:
                logger.warning(
                    f"Fine-tuned model does not meet improvement criteria.\n"
                    f"F1 change: {f1_improvement:.4f} ({relative_f1_imp:.2%})\n"
                    f"Precision change: {precision_improvement:.4f} ({relative_prec_imp:.2%})\n"
                    f"Recall change: {recall_improvement:.4f} ({relative_recall_imp:.2%})\n"
                    f"Accuracy change: {accuracy_improvement:.4f} ({relative_acc_imp:.2%})"
                )
                return False
                
        except Exception as e:
            logger.error(f"Error during model validation: {e}")
            return False

    def reload_fine_tuned_model(self):
        """
        Reload the fine-tuned reranker model from disk and switch to it if available.
        """
        self._load_fine_tuned_model()
        logger.info("Reloaded fine-tuned reranker model (if available).")

    def get_recommendations_summary(self, msid: str, k: int = DEFAULT_K, summary: bool = True, smart_tip: bool = True):
        """
        Synchronous wrapper for recommendations with summary and smart tip. This is a simplified version for Gradio or direct calls.
        """
        # Get base recommendations by msid
        recommendations_data = self.get_recommendations_by_id(msid, k)
        if not recommendations_data or "retrieved_documents" not in recommendations_data:
            return {
                "generated_response": f"No recommendations found for item ID '{msid}'.",
                "retrieved_documents": []
            }
        retrieved_docs = recommendations_data.get("retrieved_documents", [])
        if not retrieved_docs:
            return recommendations_data

        # Fetch article details from MongoDB
        from src.database.mongodb import mongodb
        doc_ids_to_fetch = [doc["id"] for doc in retrieved_docs if doc.get("id")]
        articles_details_map = {}
        if doc_ids_to_fetch and (summary or smart_tip):
            projection = {"_id": 0, "id": 1}
            if summary:
                projection.update({"story": 1, "syn": 1})
            if smart_tip:
                projection.update({"seolocation": 1, "tn": 1, "hl": 1})
            fetched_articles_list = list(mongodb.news_collection.find({"id": {"$in": doc_ids_to_fetch}}, projection))
            for article in fetched_articles_list:
                if article.get("id"):
                    if summary and not article.get("story") and article.get("syn"):
                        article["story"] = article["syn"]
                    articles_details_map[article["id"]] = article

        # Helper functions for summary and smart tip
        def _generate_summary(article_data):
            try:
                from src.test_summarize import get_summary_points
                story = article_data.get("story", "")
                if not story:
                    return None
                summary_points = get_summary_points(story)
                if isinstance(summary_points, list):
                    return " ".join(summary_points) if summary_points else None
                elif isinstance(summary_points, str):
                    return summary_points if summary_points.strip() else None
                return None
            except Exception:
                return None

        def _generate_smart_tip(article_data):
            seolocation = article_data.get("seolocation")
            title = article_data.get("tn")
            headline = article_data.get("hl")
            if not all([seolocation, title, headline]):
                return None
            # Find related articles
            topic = title.lower() if title else ""
            query = {}
            if topic:
                query["$or"] = [
                    {"tn": {"$regex": topic, "$options": "i"}},
                    {"hl": {"$regex": topic, "$options": "i"}}
                ]
            if article_data.get("id"):
                query["id"] = {"$ne": article_data["id"]}
            related_articles = list(mongodb.news_collection.find(query, {"hl": 1, "seolocation": 1, "tn": 1, "_id": 0}).limit(3))
            suggestions = []
            for rel_article in related_articles:
                if rel_article.get("hl") and rel_article.get("seolocation"):
                    suggestions.append({
                        "label": rel_article.get("hl", ""),
                        "url": rel_article.get("seolocation", "")
                    })
            if not suggestions:
                suggestions = [{
                    "label": headline,
                    "url": seolocation
                }]
            return {
                "title": f"\U0001F50D Smart Tip: {title}",
                "description": "You might also be interested in:",
                "suggestions": suggestions
            }

        # Process each document
        processed_documents = []
        for doc in retrieved_docs:
            article_data = articles_details_map.get(doc.get("id"))
            if summary and article_data:
                doc["summary"] = _generate_summary(article_data)
            if smart_tip and article_data:
                doc["smart_tip"] = _generate_smart_tip(article_data)
            processed_documents.append(doc)
        recommendations_data["retrieved_documents"] = processed_documents
        return recommendations_data

    def get_recommendations_user_feedback(self, user_id: str, msid: str, clicked_msid: str, k: int = DEFAULT_K):
        """
        Synchronous wrapper for user feedback recommendations. Returns recommendations based on clicked items.
        """
        # clicked_msid can be a comma-separated string
        actual_clicked_msids = [s.strip() for s in clicked_msid.split(',') if s.strip()]
        combined_recommendations_docs = []
        seen_recommendation_ids = set()
        for c_msid in actual_clicked_msids:
            result = self.get_recommendations_by_id(c_msid, k)
            for doc in result.get("retrieved_documents", []):
                if doc['id'] not in seen_recommendation_ids:
                    combined_recommendations_docs.append(doc)
                    seen_recommendation_ids.add(doc['id'])
        if not combined_recommendations_docs:
            recommendations_result = {"retrieved_documents": [], "generated_response": "No recommendations found for the clicked items."}
        else:
            combined_recommendations_docs.sort(key=lambda x: x.get('score', 0.0), reverse=True)
            final_retrieved_documents = combined_recommendations_docs[:k]
            recommendations_result = {
                "retrieved_documents": final_retrieved_documents,
                "generated_response": f"Top {len(final_retrieved_documents)} recommendations based on your recent clicks on: {', '.join(actual_clicked_msids)}."
            }
        return recommendations_result

# Instantiate the recommender for use by other modules
recommender = RecoRecommender()