File size: 12,133 Bytes
ac751b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
334beca
ac751b2
 
 
 
 
 
334beca
 
ac751b2
 
334beca
ac751b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b0ff04f
ac751b2
b0ff04f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac751b2
b0ff04f
ac751b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
334beca
 
ac751b2
 
 
 
 
 
 
 
 
334beca
 
 
 
 
 
 
 
 
 
 
 
ac751b2
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
import os
import sys
import json
import time
import logging
import pandas as pd
from pathlib import Path
from sqlalchemy import text

# Make the directory two levels above this file's own directory importable,
# so the project-level packages imported below resolve when run as a script.
_WORKSPACE_ROOT = Path(__file__).resolve().parents[2]
sys.path.append(str(_WORKSPACE_ROOT))

from Data.database.sql_connector import engine
from src.similarity_model.preprocessing import preprocess_dataset
from src.similarity_model.embedding_engine import train_embedding_engine

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s"
)
logger = logging.getLogger("SyncWorker")

# Settings
BATCH_SIZE = 10
MAX_RETRIES = 3
POLL_INTERVAL = 5  # seconds
REBUILD_THRESHOLD = 5  # Rebuild FAISS index after 5 database changes
REBUILD_COOLDOWN = 60  # Or after 60 seconds if changes exist but threshold not met

class RebuildManager:
    """Debounce FAISS index rebuilds across a stream of database changes.

    Callers report each change with :meth:`record_change`. :meth:`check_and_rebuild`
    calls ``train_embedding_engine()`` when a change is pending and either enough
    changes have accumulated, the cooldown since the last successful rebuild has
    elapsed, or the caller forces it.
    """

    def __init__(self, rebuild_threshold=REBUILD_THRESHOLD, rebuild_cooldown=REBUILD_COOLDOWN):
        # Change count that triggers a rebuild on its own.
        self.rebuild_threshold = rebuild_threshold
        # Seconds since the last rebuild after which any pending change triggers one.
        self.rebuild_cooldown = rebuild_cooldown
        # Changes recorded since the last successful rebuild.
        self.accumulated_changes = 0
        # time.time() of the last successful rebuild (construction time initially).
        self.last_rebuild_time = time.time()
        # True while at least one recorded change is not yet reflected in the index.
        self.pending_rebuild = False

    def record_change(self):
        """Note one database change that the index does not yet reflect."""
        self.pending_rebuild = True
        self.accumulated_changes += 1

    def check_and_rebuild(self, force=False):
        """Rebuild the index if a change is pending and a trigger condition holds.

        Args:
            force: Rebuild whenever a change is pending, ignoring the threshold
                and the cooldown.

        Returns:
            True if a rebuild ran and succeeded. False if nothing was pending,
            no trigger condition held, or the rebuild raised; in the last case
            the error is logged and the pending state is kept, so a later call
            retries.
        """
        if not self.pending_rebuild:
            return False

        now = time.time()
        time_elapsed = now - self.last_rebuild_time
        due = (
            force
            or self.accumulated_changes >= self.rebuild_threshold
            or time_elapsed >= self.rebuild_cooldown
        )
        if not due:
            return False

        logger.info(
            f"Triggering FAISS index rebuild. "
            f"Accumulated changes: {self.accumulated_changes}, time elapsed: {time_elapsed:.1f}s, force: {force}"
        )
        try:
            train_embedding_engine()
            # Only reset the bookkeeping once training actually succeeded.
            self.accumulated_changes = 0
            self.last_rebuild_time = now
            self.pending_rebuild = False
            logger.info("FAISS index rebuild completed successfully.")
            return True
        except Exception as exc:
            logger.error(f"Failed to rebuild FAISS index: {exc}", exc_info=True)
        return False

# Project statuses that keep a project in the 'preprocess' table.
_ELIGIBLE_STATUSES = ("Completed", "UnderReview", "In_Progress")

# Columns written to 'preprocess', in order, using the column names expected
# from preprocess_dataset(); any it did not produce are filled with "".
_PREPROCESS_COLUMNS = [
    "id", "submittedat", "project_title", "studentnames", "year",
    "abstract", "description", "problemstatement", "proposedsolution",
    "objectives", "full_content", "clean_text", "word_count", "features",
]

# Renames applied after column selection to match the 'preprocess' table schema.
_PREPROCESS_RENAMES = {
    "submittedat": "submitted_at",
    "studentnames": "student_names",
    "problemstatement": "problem_statement",
    "proposedsolution": "proposed_solution",
}


def _delete_preprocessed(conn, project_id):
    """Delete the 'preprocess' row(s) whose id equals project_id, on the caller's connection."""
    conn.execute(text("DELETE FROM preprocess WHERE id = :id"), {"id": project_id})


def _first_row_status(project_df):
    """Return the first row's status, matching the column name case-insensitively.

    Returns the first non-empty string found in a column whose name lower-cases
    to "status", or None when there is none (missing column, NULL/NaN, non-string).
    """
    first_row = project_df.iloc[0]
    for column in project_df.columns:
        if str(column).lower() == "status":
            value = first_row[column]
            # isinstance check avoids bool() on pd.NA, which raises TypeError.
            if isinstance(value, str) and value:
                return value
    return None


def _to_preprocess_rows(processed_df):
    """Shape preprocess_dataset() output into the 'preprocess' table's columns.

    Missing columns are added to processed_df (in place) as "", the frame is
    reduced to _PREPROCESS_COLUMNS, renamed, and 'features' is JSON-encoded.
    """
    for col in _PREPROCESS_COLUMNS:
        if col not in processed_df.columns:
            processed_df[col] = ""
    rows = processed_df[_PREPROCESS_COLUMNS].rename(columns=_PREPROCESS_RENAMES)
    rows["features"] = rows["features"].apply(json.dumps)
    return rows


def _upsert_project(conn, project_id):
    """Refresh the 'preprocess' row for one project from the Projects table.

    The row is rebuilt when the project exists, has an eligible status and is
    not filtered out by preprocess_dataset(); in every other case any existing
    row is removed. Either way the 'preprocess' table has been touched.
    """
    project_df = pd.read_sql(
        text("SELECT * FROM Projects WHERE Id = :project_id"),
        conn,
        params={"project_id": project_id},
    )

    eligible = (
        not project_df.empty
        and _first_row_status(project_df) in _ELIGIBLE_STATUSES
    )
    if not eligible:
        logger.info(f"Project {project_id} is ineligible or deleted. Removing from 'preprocess' table.")
        _delete_preprocessed(conn, project_id)
        return

    logger.info(f"Preprocessing eligible project {project_id}...")
    processed_df = preprocess_dataset(project_df)
    if processed_df.empty:
        logger.info(f"Project {project_id} filtered out by preprocessing. Removing from 'preprocess' table.")
        _delete_preprocessed(conn, project_id)
        return

    rows = _to_preprocess_rows(processed_df)
    # Upsert behavior: delete existing first, then append.
    _delete_preprocessed(conn, project_id)
    rows.to_sql("preprocess", conn, if_exists="append", index=False)
    logger.info(f"Successfully preprocessed and inserted Project {project_id} into 'preprocess'.")


def process_single_item(engine, item) -> bool:
    """Apply one SyncQueue entry to the 'preprocess' table.

    Args:
        engine: SQLAlchemy engine used for the work transaction and, on
            failure, a separate transaction that records the error.
        item: Mapping with "QueueId", "ProjectId" and "OperationType" keys.

    Returns:
        True if the 'preprocess' table was modified and the queue row marked
        processed. False if the row had already been processed, the operation
        type is unknown (the row is still marked processed), or an error
        occurred. On error, RetryCount is incremented, ErrorMessage is stored,
        and the row is marked processed once RetryCount reaches MAX_RETRIES.
    """
    queue_id = item["QueueId"]
    project_id = item["ProjectId"]
    operation_type = item["OperationType"]

    changed = False
    try:
        # One transaction: the project update and the queue bookkeeping commit together.
        with engine.begin() as conn:
            # Re-verify the queue row is still unprocessed and lock it until commit.
            row = conn.execute(text("""
                SELECT QueueId FROM SyncQueue WITH (UPDLOCK, HOLDLOCK)
                WHERE QueueId = :queue_id AND Processed = 0
            """), {"queue_id": queue_id}).fetchone()
            if not row:
                logger.info(f"Queue item {queue_id} already processed by another worker. Skipping.")
                return False

            if operation_type == 'UPSERT':
                _upsert_project(conn, project_id)
                changed = True
            elif operation_type == 'DELETE':
                logger.info(f"Removing Project {project_id} from 'preprocess' table...")
                _delete_preprocessed(conn, project_id)
                changed = True
            else:
                # Previously silent: make unexpected queue contents visible.
                logger.warning(
                    f"Queue item {queue_id} has unknown operation type {operation_type!r}; "
                    f"marking it processed without changes."
                )

            # Mark queue item as processed successfully
            conn.execute(text("""
                UPDATE SyncQueue
                SET Processed = 1, ProcessedAt = SYSUTCDATETIME(), ErrorMessage = NULL
                WHERE QueueId = :queue_id
            """), {"queue_id": queue_id})

        return changed

    except Exception as e:
        logger.exception(f"Error processing queue item {queue_id} (Project {project_id}): {e}")
        # Record the failure in a separate transaction; the work transaction was rolled back.
        # SET expressions see the pre-update RetryCount, hence the "+ 1" in each CASE.
        try:
            with engine.begin() as error_conn:
                error_conn.execute(text("""
                    UPDATE SyncQueue
                    SET RetryCount = RetryCount + 1,
                        ErrorMessage = :error_msg,
                        ProcessedAt = CASE WHEN RetryCount + 1 >= :max_retries THEN SYSUTCDATETIME() ELSE NULL END,
                        Processed = CASE WHEN RetryCount + 1 >= :max_retries THEN 1 ELSE 0 END
                    WHERE QueueId = :queue_id
                """), {
                    "queue_id": queue_id,
                    "error_msg": str(e)[:4000],
                    "max_retries": MAX_RETRIES
                })
        except Exception as queue_err:
            logger.error(f"Failed to write error status for queue item {queue_id}: {queue_err}", exc_info=True)
        return False

def _split_sql_batches(sql_content):
    """Split a T-SQL script into batches at lines consisting solely of ``GO``.

    ``GO`` is a client-side batch separator (sqlcmd/SSMS), not T-SQL, so it must
    be removed before statements are sent to the server. Only a ``GO`` standing
    alone on its own line (optionally followed by ``;``) separates batches; the
    word appearing anywhere else on a line (identifiers, comments, literals) is
    left alone. Surrounding whitespace is stripped and empty batches dropped.
    """
    import re

    batches = re.split(
        r"^[ \t]*GO[ \t]*;?[ \t\r]*$",
        sql_content,
        flags=re.IGNORECASE | re.MULTILINE,
    )
    return [batch.strip() for batch in batches if batch.strip()]


def _execute_sql_script(conn, script_path):
    """Run each GO-separated batch of a UTF-8 SQL script on conn, in order."""
    for batch in _split_sql_batches(script_path.read_text(encoding="utf-8")):
        # exec_driver_sql sends the batch verbatim; text() would parse any
        # ':name' sequence in the DDL (e.g. inside a string literal) as a bind parameter.
        conn.exec_driver_sql(batch)


def _initialize_schema():
    """Create the SyncQueue table and the Projects triggers when they are missing.

    Runs in a single transaction. A missing script file is logged as an error
    but is not fatal; any database error propagates to the caller.
    """
    db_dir = Path(__file__).resolve().parents[2] / "Data" / "database"
    with engine.begin() as conn:
        # 1. Check and create SyncQueue table ('U' = user table)
        row = conn.execute(text("SELECT OBJECT_ID('SyncQueue', 'U')")).fetchone()
        if row[0] is None:
            logger.info("SyncQueue table does not exist. Initializing from DDL script...")
            ddl_path = db_dir / "create_sync_queue.sql"
            if ddl_path.exists():
                _execute_sql_script(conn, ddl_path)
                logger.info("SyncQueue table and indexes created successfully.")
            else:
                logger.error("DDL script 'create_sync_queue.sql' not found!")

        # 2. Check and deploy SQL triggers; the presence of trg_Projects_Insert
        #    is taken to mean the whole trigger script has been deployed.
        row_trg = conn.execute(text("SELECT OBJECT_ID('trg_Projects_Insert', 'TR')")).fetchone()
        if row_trg[0] is None:
            logger.info("SQL Triggers do not exist on 'Projects'. Deploying triggers...")
            triggers_path = db_dir / "create_triggers.sql"
            if triggers_path.exists():
                _execute_sql_script(conn, triggers_path)
                logger.info("SQL Triggers deployed successfully.")
            else:
                logger.error("Trigger script 'create_triggers.sql' not found!")


def run_worker():
    """Initialize the schema, then poll SyncQueue forever.

    Each poll fetches up to BATCH_SIZE unprocessed rows with fewer than
    MAX_RETRIES attempts (oldest CreatedAt first) and hands each to
    process_single_item(). A FAISS index rebuild is forced once the queue is
    drained; otherwise RebuildManager's threshold/cooldown rules decide.
    Exits the process with status 1 if schema initialization fails.
    """
    logger.info("Initializing Sync Worker service...")

    # Verify DB connection and auto-initialize schema/triggers if missing
    try:
        _initialize_schema()
        logger.info("Database connection verified and schema initialized successfully.")
    except Exception as exc:
        logger.critical(f"Database connection or schema initialization failed: {exc}", exc_info=True)
        sys.exit(1)

    rebuild_manager = RebuildManager()

    logger.info("Sync Worker started successfully and polling...")
    while True:
        try:
            # Fetch batch of unprocessed items; READPAST skips rows locked by other workers.
            with engine.connect() as conn:
                result = conn.execute(text("""
                    SELECT TOP (:batch_size) QueueId, ProjectId, OperationType, RetryCount
                    FROM SyncQueue WITH (UPDLOCK, READPAST)
                    WHERE Processed = 0 AND RetryCount < :max_retries
                    ORDER BY CreatedAt ASC
                """), {"batch_size": BATCH_SIZE, "max_retries": MAX_RETRIES})
                batch = result.mappings().all()

            if not batch:
                # Idle period: force rebuild of any pending changes since queue is empty
                rebuild_manager.check_and_rebuild(force=True)
                time.sleep(POLL_INTERVAL)
                continue

            logger.info(f"Fetched {len(batch)} items from SyncQueue.")
            for item in batch:
                if process_single_item(engine, item):
                    rebuild_manager.record_change()

            # Check if there are any remaining unprocessed items in the queue
            with engine.connect() as conn:
                any_left = conn.execute(text("""
                    SELECT TOP 1 QueueId FROM SyncQueue 
                    WHERE Processed = 0 AND RetryCount < :max_retries
                """), {"max_retries": MAX_RETRIES}).fetchone()

            # Queue drained: rebuild now; otherwise let the threshold/cooldown decide.
            rebuild_manager.check_and_rebuild(force=not any_left)

        except Exception as e:
            logger.error(f"Error in Sync Worker loop: {e}", exc_info=True)
            time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    run_worker()