import json
import logging
import os
import re
import sys
import time
from pathlib import Path

import pandas as pd
from sqlalchemy import text

# Put the directory two levels above this file's own directory on sys.path so
# the project-local `Data.*` and `src.*` imports below resolve when this file
# is run directly as a script. This must stay ahead of those imports.
sys.path.append(str(Path(__file__).resolve().parents[2]))

from Data.database.sql_connector import engine
from src.similarity_model.preprocessing import preprocess_dataset
from src.similarity_model.embedding_engine import train_embedding_engine
|
|
| |
# Root logging: INFO and above, rendered as "time | level | logger | message".
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("SyncWorker")

# --- Worker tunables ---------------------------------------------------------
# Maximum number of SyncQueue rows fetched per polling pass.
BATCH_SIZE = 10
# A queue row whose RetryCount reaches this value is no longer fetched and is
# flagged as processed by the failure handler.
MAX_RETRIES = 3
# Seconds to sleep when the queue is empty or the polling loop hit an error.
POLL_INTERVAL = 5
# Default number of recorded changes that triggers an index rebuild.
REBUILD_THRESHOLD = 5
# Default number of seconds after which pending changes trigger a rebuild.
REBUILD_COOLDOWN = 60
|
|
class RebuildManager:
    """Batch data changes and decide when to retrain the embedding index.

    Changes are counted with ``record_change``; ``check_and_rebuild`` calls
    ``train_embedding_engine()`` once enough changes have accumulated, once
    enough time has passed since the last rebuild, or when forced.

    Attributes:
        rebuild_threshold: Number of recorded changes that triggers a rebuild.
        rebuild_cooldown: Seconds since the last rebuild after which any
            pending change triggers a rebuild.
        accumulated_changes: Changes recorded since the last successful rebuild.
        last_rebuild_time: ``time.monotonic()`` reading taken when the last
            successful rebuild was triggered (or at construction).
        pending_rebuild: True while at least one change awaits a rebuild.
    """

    def __init__(self, rebuild_threshold=REBUILD_THRESHOLD, rebuild_cooldown=REBUILD_COOLDOWN):
        self.rebuild_threshold = rebuild_threshold
        self.rebuild_cooldown = rebuild_cooldown
        self.accumulated_changes = 0
        # Monotonic clock: the value is only ever used to measure elapsed
        # time, and must not jump when the system wall clock is adjusted.
        self.last_rebuild_time = time.monotonic()
        self.pending_rebuild = False

    def record_change(self):
        """Register one data change that the index does not yet reflect."""
        self.accumulated_changes += 1
        self.pending_rebuild = True

    def check_and_rebuild(self, force=False):
        """Rebuild the index if changes are pending and a trigger is met.

        Args:
            force: When True, rebuild as soon as any change is pending,
                regardless of the threshold and cooldown.

        Returns:
            True if a rebuild ran and succeeded; False if nothing was pending,
            no trigger was met, or the rebuild raised (the failure is logged
            and the pending state is kept so a later call retries).
        """
        if not self.pending_rebuild:
            return False

        now = time.monotonic()
        time_elapsed = now - self.last_rebuild_time

        rebuild_due = (
            force
            or self.accumulated_changes >= self.rebuild_threshold
            or time_elapsed >= self.rebuild_cooldown
        )
        if not rebuild_due:
            # Always hand back a real bool, never an implicit None.
            return False

        logger.info(
            f"Triggering FAISS index rebuild. "
            f"Accumulated changes: {self.accumulated_changes}, time elapsed: {time_elapsed:.1f}s, force: {force}"
        )
        try:
            train_embedding_engine()
        except Exception as e:
            # Counters are left untouched so the changes stay pending.
            logger.error(f"Failed to rebuild FAISS index: {e}", exc_info=True)
            return False

        self.accumulated_changes = 0
        self.last_rebuild_time = now
        self.pending_rebuild = False
        logger.info("FAISS index rebuild completed successfully.")
        return True
|
|
# Columns written to the 'preprocess' table, in the names used by the
# DataFrame that preprocess_dataset returns.
_PREPROCESS_COLUMNS = [
    "id", "submittedat", "project_title", "studentnames", "year",
    "abstract", "description", "problemstatement", "proposedsolution",
    "objectives", "full_content", "clean_text", "word_count", "features",
]

# DataFrame column name -> 'preprocess' table column name.
_PREPROCESS_RENAMES = {
    "submittedat": "submitted_at",
    "studentnames": "student_names",
    "problemstatement": "problem_statement",
    "proposedsolution": "proposed_solution",
}

# Project statuses that are kept in the 'preprocess' table.
_ELIGIBLE_STATUSES = ("Completed", "UnderReview", "In_Progress")


def _is_eligible(project_df):
    """Return True if the first row of *project_df* has an eligible status."""
    if project_df.empty:
        return False
    first_row = project_df.iloc[0]
    # The status column may come back as either "Status" or "status".
    status = first_row.get("Status") or first_row.get("status")
    return status in _ELIGIBLE_STATUSES


def _prepare_preprocess_frame(processed_df):
    """Shape preprocessed rows for insertion into the 'preprocess' table.

    Works on a copy so the caller's DataFrame is never mutated. Missing
    columns are added as empty strings, columns are renamed to the table's
    names, and ``features`` is serialized to a JSON string.
    """
    frame = processed_df.copy()
    for column in _PREPROCESS_COLUMNS:
        if column not in frame.columns:
            frame[column] = ""
    frame = frame[_PREPROCESS_COLUMNS].rename(columns=_PREPROCESS_RENAMES)
    frame["features"] = frame["features"].apply(json.dumps)
    return frame


def _delete_from_preprocess(conn, project_id):
    """Delete the project's row(s) from the 'preprocess' table."""
    conn.execute(text("DELETE FROM preprocess WHERE id = :id"), {"id": project_id})


def _sync_upsert(conn, project_id):
    """Bring the 'preprocess' table in line with the project's current row."""
    project_df = pd.read_sql(
        text("SELECT * FROM Projects WHERE Id = :project_id"),
        conn,
        params={"project_id": project_id},
    )

    if not _is_eligible(project_df):
        logger.info(f"Project {project_id} is ineligible or deleted. Removing from 'preprocess' table.")
        _delete_from_preprocess(conn, project_id)
        return

    logger.info(f"Preprocessing eligible project {project_id}...")
    processed_df = preprocess_dataset(project_df)

    if processed_df.empty:
        logger.info(f"Project {project_id} filtered out by preprocessing. Removing from 'preprocess' table.")
        _delete_from_preprocess(conn, project_id)
        return

    rows = _prepare_preprocess_frame(processed_df)
    # Delete-then-insert inside the caller's transaction replaces the row.
    _delete_from_preprocess(conn, project_id)
    rows.to_sql("preprocess", conn, if_exists="append", index=False)
    logger.info(f"Successfully preprocessed and inserted Project {project_id} into 'preprocess'.")


def _record_failure(engine, queue_id, error):
    """Bump the item's retry counter and store the error text (best effort).

    When the incremented counter reaches MAX_RETRIES the item is also flagged
    as processed. A failure to write this status is logged, not raised.
    """
    try:
        with engine.begin() as error_conn:
            error_conn.execute(text("""
                UPDATE SyncQueue
                SET RetryCount = RetryCount + 1,
                    ErrorMessage = :error_msg,
                    ProcessedAt = CASE WHEN RetryCount + 1 >= :max_retries THEN SYSUTCDATETIME() ELSE NULL END,
                    Processed = CASE WHEN RetryCount + 1 >= :max_retries THEN 1 ELSE 0 END
                WHERE QueueId = :queue_id
            """), {
                "queue_id": queue_id,
                "error_msg": str(error)[:4000],
                "max_retries": MAX_RETRIES,
            })
    except Exception as queue_err:
        logger.error(f"Failed to write error status for queue item {queue_id}: {queue_err}")


def process_single_item(engine, item) -> bool:
    """Process one SyncQueue row inside a single transaction.

    Args:
        engine: Database engine; ``engine.begin()`` must yield a transactional
            connection.
        item: Mapping with the keys ``QueueId``, ``ProjectId`` and
            ``OperationType``.

    Returns:
        True if the 'preprocess' table was touched (any UPSERT or DELETE that
        completed), so the caller should schedule an index rebuild. False if
        the item was already processed elsewhere, had an unsupported
        operation type, or failed (the failure is logged and recorded on the
        queue row).
    """
    queue_id = item["QueueId"]
    project_id = item["ProjectId"]
    operation_type = item["OperationType"]

    try:
        with engine.begin() as conn:
            # Lock the queue row and confirm it is still unprocessed before
            # doing any work on it.
            claimed = conn.execute(text("""
                SELECT QueueId FROM SyncQueue WITH (UPDLOCK, HOLDLOCK)
                WHERE QueueId = :queue_id AND Processed = 0
            """), {"queue_id": queue_id}).fetchone()
            if not claimed:
                logger.info(f"Queue item {queue_id} already processed by another worker. Skipping.")
                return False

            if operation_type == 'UPSERT':
                _sync_upsert(conn, project_id)
                changed = True
            elif operation_type == 'DELETE':
                logger.info(f"Removing Project {project_id} from 'preprocess' table...")
                _delete_from_preprocess(conn, project_id)
                changed = True
            else:
                # Still marked processed below so the row is not refetched,
                # but no longer silently.
                logger.warning(
                    f"Queue item {queue_id} has unsupported OperationType {operation_type!r}; "
                    f"marking it processed without changes."
                )
                changed = False

            conn.execute(text("""
                UPDATE SyncQueue
                SET Processed = 1, ProcessedAt = SYSUTCDATETIME(), ErrorMessage = NULL
                WHERE QueueId = :queue_id
            """), {"queue_id": queue_id})

        return changed

    except Exception as e:
        # The transaction above has been rolled back; record the failure in a
        # fresh transaction.
        logger.error(f"Error processing queue item {queue_id} (Project {project_id}): {e}", exc_info=True)
        _record_failure(engine, queue_id, e)
        return False
|
|
# A T-SQL batch separator is a line that contains nothing but GO. Matching
# the bare word anywhere would also split on "go" inside comments or string
# literals and corrupt the script.
_GO_SEPARATOR = re.compile(r"^[ \t]*GO[ \t]*\r?$", re.IGNORECASE | re.MULTILINE)


def _split_sql_batches(sql_content):
    """Split a SQL script on GO separator lines; return the non-empty batches."""
    return [
        batch.strip()
        for batch in _GO_SEPARATOR.split(sql_content)
        if batch.strip()
    ]


def _execute_sql_script(conn, script_path):
    """Execute every batch of the script at *script_path* on *conn*."""
    for batch in _split_sql_batches(script_path.read_text(encoding="utf-8")):
        conn.execute(text(batch))


def _ensure_schema(conn):
    """Create the SyncQueue table and the Projects triggers if they are missing."""
    database_dir = Path(__file__).resolve().parents[2] / "Data" / "database"

    row = conn.execute(text("SELECT OBJECT_ID('SyncQueue', 'U')")).fetchone()
    if row[0] is None:
        logger.info("SyncQueue table does not exist. Initializing from DDL script...")
        ddl_path = database_dir / "create_sync_queue.sql"
        if ddl_path.exists():
            _execute_sql_script(conn, ddl_path)
            logger.info("SyncQueue table and indexes created successfully.")
        else:
            logger.error("DDL script 'create_sync_queue.sql' not found!")

    # The presence of the insert trigger is used as the marker for the whole
    # trigger script having been deployed.
    row_trg = conn.execute(text("SELECT OBJECT_ID('trg_Projects_Insert', 'TR')")).fetchone()
    if row_trg[0] is None:
        logger.info("SQL Triggers do not exist on 'Projects'. Deploying triggers...")
        triggers_path = database_dir / "create_triggers.sql"
        if triggers_path.exists():
            _execute_sql_script(conn, triggers_path)
            logger.info("SQL Triggers deployed successfully.")
        else:
            logger.error("Trigger script 'create_triggers.sql' not found!")


def _fetch_pending_batch():
    """Return up to BATCH_SIZE unprocessed queue rows, oldest first."""
    # NOTE(review): the UPDLOCK/READPAST hints presumably stop applying once
    # this connection block exits; process_single_item re-checks and locks
    # each row itself before working on it.
    with engine.connect() as conn:
        result = conn.execute(text("""
            SELECT TOP (:batch_size) QueueId, ProjectId, OperationType, RetryCount
            FROM SyncQueue WITH (UPDLOCK, READPAST)
            WHERE Processed = 0 AND RetryCount < :max_retries
            ORDER BY CreatedAt ASC
        """), {"batch_size": BATCH_SIZE, "max_retries": MAX_RETRIES})
        return result.mappings().all()


def _queue_has_pending_items():
    """Return True if any unprocessed, still-retryable queue row remains."""
    with engine.connect() as conn:
        any_left = conn.execute(text("""
            SELECT TOP 1 QueueId FROM SyncQueue
            WHERE Processed = 0 AND RetryCount < :max_retries
        """), {"max_retries": MAX_RETRIES}).fetchone()
    return any_left is not None


def run_worker():
    """Run the sync worker: bootstrap the schema, then poll SyncQueue forever.

    Exits the process with status 1 if the database cannot be reached or the
    schema bootstrap raises. Otherwise this function does not return: each
    pass fetches a batch, processes every item, records the resulting changes
    and lets the RebuildManager decide whether to rebuild the index. Errors
    inside a pass are logged and followed by a POLL_INTERVAL sleep.
    """
    logger.info("Initializing Sync Worker service...")

    try:
        with engine.begin() as conn:
            _ensure_schema(conn)
        logger.info("Database connection verified and schema initialized successfully.")
    except Exception as exc:
        logger.critical(f"Database connection or schema initialization failed: {exc}")
        sys.exit(1)

    rebuild_manager = RebuildManager()

    logger.info("Sync Worker started successfully and polling...")
    while True:
        try:
            batch = _fetch_pending_batch()

            if not batch:
                # Queue is idle: flush any pending changes into the index now.
                rebuild_manager.check_and_rebuild(force=True)
                time.sleep(POLL_INTERVAL)
                continue

            logger.info(f"Fetched {len(batch)} items from SyncQueue.")
            for item in batch:
                if process_single_item(engine, item):
                    rebuild_manager.record_change()

            # Force a rebuild once the queue is drained; while more work is
            # waiting, defer to the threshold/cooldown rules.
            rebuild_manager.check_and_rebuild(force=not _queue_has_pending_items())

        except Exception as e:
            logger.error(f"Error in Sync Worker loop: {e}", exc_info=True)
            time.sleep(POLL_INTERVAL)
|
|
if __name__ == "__main__":
    # Start the worker only when this file is executed as a script, not when
    # it is imported.
    run_worker()
|
|