bat-6 committed on
Commit
ac751b2
·
1 Parent(s): b4f49d4

feat: implement database synchronization queue with SQL triggers and a background worker for automated FAISS index rebuilding.

Browse files
Data/database/__pycache__/sql_connector.cpython-313.pyc CHANGED
Binary files a/Data/database/__pycache__/sql_connector.cpython-313.pyc and b/Data/database/__pycache__/sql_connector.cpython-313.pyc differ
 
Data/database/create_sync_queue.sql ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ -- SQL Server DDL for SyncQueue table
2
+ -- Path: Data/database/create_sync_queue.sql
3
+
4
-- Create the SyncQueue table on first run. The script is safe to re-run:
-- the table and its index are each created only when missing.
IF OBJECT_ID('SyncQueue', 'U') IS NULL
BEGIN
    CREATE TABLE SyncQueue (
        QueueId       INT IDENTITY(1,1) PRIMARY KEY,
        ProjectId     INT NOT NULL,
        OperationType VARCHAR(10) NOT NULL,
        Processed     BIT NOT NULL DEFAULT 0,
        CreatedAt     DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
        ProcessedAt   DATETIME2 NULL,
        RetryCount    INT NOT NULL DEFAULT 0,
        ErrorMessage  NVARCHAR(MAX) NULL,
        CONSTRAINT CHK_SyncQueue_OperationType CHECK (OperationType IN ('UPSERT', 'DELETE'))
    );

    PRINT 'SyncQueue table created successfully.';
END
ELSE
BEGIN
    PRINT 'SyncQueue table already exists.';
END

-- Index to optimize querying of unprocessed items in chronological order.
-- Checked separately from the table so that a table left without its index
-- (e.g. by an interrupted earlier run) is repaired on the next execution.
IF NOT EXISTS (
    SELECT 1
    FROM sys.indexes
    WHERE name = 'IX_SyncQueue_Unprocessed'
      AND object_id = OBJECT_ID('SyncQueue', 'U')
)
BEGIN
    CREATE INDEX IX_SyncQueue_Unprocessed
    ON SyncQueue (Processed, CreatedAt)
    INCLUDE (ProjectId, OperationType, RetryCount);

    PRINT 'Index IX_SyncQueue_Unprocessed created successfully.';
END
Data/database/create_triggers.sql ADDED
@@ -0,0 +1,102 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ -- SQL Server Triggers for Projects Table
2
+ -- Path: Data/database/create_triggers.sql
3
+
4
-- 1. INSERT Trigger
-- Drop-and-recreate so the script can be re-run to pick up changes.
IF OBJECT_ID('trg_Projects_Insert', 'TR') IS NOT NULL
    DROP TRIGGER trg_Projects_Insert;
GO

CREATE TRIGGER trg_Projects_Insert
ON Projects
AFTER INSERT
AS
BEGIN
    SET NOCOUNT ON;

    -- Enqueue one UPSERT per newly inserted eligible project, unless an
    -- unprocessed UPSERT for that project is already waiting in the queue.
    INSERT INTO SyncQueue (ProjectId, OperationType, Processed, CreatedAt, RetryCount)
    SELECT i.Id, 'UPSERT', 0, SYSUTCDATETIME(), 0
    FROM inserted i
    WHERE i.Status IN ('Completed', 'UnderReview', 'In_Progress')
      AND NOT EXISTS (
          SELECT 1
          FROM SyncQueue q
          WHERE q.ProjectId = i.Id
            AND q.Processed = 0
            AND q.OperationType = 'UPSERT'
      );
END;
GO
26
+
27
-- 2. DELETE Trigger
-- Drop-and-recreate so the script can be re-run to pick up changes.
IF OBJECT_ID('trg_Projects_Delete', 'TR') IS NOT NULL
    DROP TRIGGER trg_Projects_Delete;
GO

CREATE TRIGGER trg_Projects_Delete
ON Projects
AFTER DELETE
AS
BEGIN
    SET NOCOUNT ON;

    -- Cancel pending unprocessed UPSERT operations for the deleted projects.
    -- Only UPSERTs are cancelled: a pending DELETE (queued earlier when the
    -- project became ineligible) must survive, otherwise a project deleted
    -- while ineligible would lose its only DELETE and its 'preprocess' row
    -- would never be removed.
    UPDATE SyncQueue
    SET Processed = 1,
        ProcessedAt = SYSUTCDATETIME(),
        ErrorMessage = 'Superseded by DELETE operation'
    WHERE ProjectId IN (SELECT Id FROM deleted)
      AND Processed = 0
      AND OperationType = 'UPSERT';

    -- Enqueue a DELETE for previously eligible projects, unless an
    -- unprocessed DELETE for the same project is already queued.
    INSERT INTO SyncQueue (ProjectId, OperationType, Processed, CreatedAt, RetryCount)
    SELECT d.Id, 'DELETE', 0, SYSUTCDATETIME(), 0
    FROM deleted d
    WHERE d.Status IN ('Completed', 'UnderReview', 'In_Progress')
      AND NOT EXISTS (
          SELECT 1
          FROM SyncQueue q
          WHERE q.ProjectId = d.Id
            AND q.Processed = 0
            AND q.OperationType = 'DELETE'
      );
END;
GO
53
+
54
-- 3. UPDATE Trigger
-- Drop-and-recreate so the script can be re-run to pick up changes.
IF OBJECT_ID('trg_Projects_Update', 'TR') IS NOT NULL
    DROP TRIGGER trg_Projects_Update;
GO

CREATE TRIGGER trg_Projects_Update
ON Projects
AFTER UPDATE
AS
BEGIN
    SET NOCOUNT ON;

    -- Case A: Project remains eligible or becomes eligible
    -- (Status in Completed, UnderReview, In_Progress).
    -- Enqueue an UPSERT unless an unprocessed UPSERT is already pending.
    INSERT INTO SyncQueue (ProjectId, OperationType, Processed, CreatedAt, RetryCount)
    SELECT i.Id, 'UPSERT', 0, SYSUTCDATETIME(), 0
    FROM inserted i
    WHERE i.Status IN ('Completed', 'UnderReview', 'In_Progress')
      AND NOT EXISTS (
          SELECT 1
          FROM SyncQueue q
          WHERE q.ProjectId = i.Id
            AND q.Processed = 0
            AND q.OperationType = 'UPSERT'
      );

    -- Case B: Project transitions from an eligible to an ineligible status.
    -- Cancel pending unprocessed UPSERT operations. The filter on
    -- OperationType keeps an already-pending DELETE alive so it is reused
    -- below instead of being cancelled and immediately re-inserted.
    UPDATE q
    SET q.Processed = 1,
        q.ProcessedAt = SYSUTCDATETIME(),
        q.ErrorMessage = 'Superseded by transition to Ineligible status'
    FROM SyncQueue q
    JOIN inserted i ON q.ProjectId = i.Id
    JOIN deleted d ON i.Id = d.Id
    WHERE q.Processed = 0
      AND q.OperationType = 'UPSERT'
      AND d.Status IN ('Completed', 'UnderReview', 'In_Progress')
      AND i.Status NOT IN ('Completed', 'UnderReview', 'In_Progress');

    -- Enqueue a DELETE operation to remove it from preprocess/embeddings,
    -- unless an unprocessed DELETE is already pending.
    INSERT INTO SyncQueue (ProjectId, OperationType, Processed, CreatedAt, RetryCount)
    SELECT i.Id, 'DELETE', 0, SYSUTCDATETIME(), 0
    FROM inserted i
    JOIN deleted d ON i.Id = d.Id
    WHERE d.Status IN ('Completed', 'UnderReview', 'In_Progress')
      AND i.Status NOT IN ('Completed', 'UnderReview', 'In_Progress')
      AND NOT EXISTS (
          SELECT 1
          FROM SyncQueue q
          WHERE q.ProjectId = i.Id
            AND q.Processed = 0
            AND q.OperationType = 'DELETE'
      );
END;
GO
src/similarity_model/__pycache__/__init__.cpython-313.pyc CHANGED
Binary files a/src/similarity_model/__pycache__/__init__.cpython-313.pyc and b/src/similarity_model/__pycache__/__init__.cpython-313.pyc differ
 
src/similarity_model/__pycache__/semantic_search.cpython-313.pyc CHANGED
Binary files a/src/similarity_model/__pycache__/semantic_search.cpython-313.pyc and b/src/similarity_model/__pycache__/semantic_search.cpython-313.pyc differ
 
src/similarity_model/semantic_search.py CHANGED
@@ -54,37 +54,56 @@ def tokenize(text: str) -> set:
54
  """
55
  return set(normalize_text(text).split())
56
 
 
 
 
 
 
 
 
 
 
57
  @lru_cache(maxsize=1)
58
  def load_model():
59
  logger.info(f"Loading model: {DEFAULT_MODEL}")
60
  return SentenceTransformer(DEFAULT_MODEL)
61
 
62
- @lru_cache(maxsize=1)
63
  def load_faiss_index():
 
64
  if not INDEX_PATH.exists():
65
  raise FileNotFoundError("FAISS index not found.")
66
 
67
- logger.info("Loading FAISS index...")
68
- return faiss.read_index(str(INDEX_PATH))
 
 
 
 
69
 
70
- @lru_cache(maxsize=1)
71
  def load_metadata():
 
 
 
72
 
73
- logger.info(
74
- "Loading metadata from Azure SQL..."
75
- )
76
-
77
- df = load_preprocessed_projects()
78
-
79
- return df.reset_index(drop=True)
80
 
81
- @lru_cache(maxsize=1)
82
  def load_embeddings():
 
83
  if not EMBED_PATH.exists():
84
  raise FileNotFoundError("Embeddings not found.")
85
 
86
- logger.info("Loading embeddings...")
87
- return np.load(str(EMBED_PATH))
 
 
 
 
88
 
89
  def build_results(
90
  df: pd.DataFrame,
 
54
  """
55
  return set(normalize_text(text).split())
56
 
57
+ import os
58
+
59
+ _cached_faiss_index = None
60
+ _cached_faiss_index_mtime = None
61
+ _cached_metadata = None
62
+ _cached_metadata_mtime = None
63
+ _cached_embeddings = None
64
+ _cached_embeddings_mtime = None
65
+
66
@lru_cache(maxsize=1)
def load_model():
    """Load the sentence-transformer named by ``DEFAULT_MODEL``.

    The ``lru_cache(maxsize=1)`` decorator memoizes this zero-argument call,
    so the model is constructed on the first call and reused afterwards.
    """
    logger.info(f"Loading model: {DEFAULT_MODEL}")
    return SentenceTransformer(DEFAULT_MODEL)
70
 
 
71
def load_faiss_index():
    """Return the FAISS index, re-reading it when the file on disk changes.

    The loaded index is kept in a module-level cache together with the
    modification time it was read at; a different mtime on a later call
    triggers a reload.

    Raises:
        FileNotFoundError: if ``INDEX_PATH`` does not exist.
    """
    global _cached_faiss_index, _cached_faiss_index_mtime
    try:
        # A single stat() both proves the file exists and yields its mtime,
        # so the file cannot vanish between an existence check and the
        # mtime read while a rebuild is replacing it.
        mtime = INDEX_PATH.stat().st_mtime
    except (FileNotFoundError, NotADirectoryError):
        raise FileNotFoundError("FAISS index not found.") from None

    if _cached_faiss_index is None or _cached_faiss_index_mtime != mtime:
        logger.info(f"Loading FAISS index from {INDEX_PATH} (mtime: {mtime})...")
        _cached_faiss_index = faiss.read_index(str(INDEX_PATH))
        _cached_faiss_index_mtime = mtime
    return _cached_faiss_index
82
 
 
83
def load_metadata():
    """Return project metadata, reloading it when the FAISS index file changes.

    The metadata itself comes from ``load_preprocessed_projects()``; the
    FAISS index file's modification time is used as the cache key so that
    metadata is refreshed whenever a new index is written.

    Raises:
        FileNotFoundError: if ``INDEX_PATH`` does not exist.
    """
    global _cached_metadata, _cached_metadata_mtime
    try:
        # A single stat() both proves the file exists and yields its mtime,
        # so the file cannot vanish between an existence check and the
        # mtime read while a rebuild is replacing it.
        mtime = INDEX_PATH.stat().st_mtime
    except (FileNotFoundError, NotADirectoryError):
        raise FileNotFoundError(
            "FAISS index not found for metadata alignment."
        ) from None

    if _cached_metadata is None or _cached_metadata_mtime != mtime:
        logger.info(f"Loading metadata from Azure SQL (syncing with FAISS index mtime: {mtime})...")
        df = load_preprocessed_projects()
        # Positional index 0..n-1; presumably this lines rows up with FAISS
        # vector positions -- confirm against the index builder.
        _cached_metadata = df.reset_index(drop=True)
        _cached_metadata_mtime = mtime
    return _cached_metadata
95
 
 
96
def load_embeddings():
    """Return the embeddings array, re-reading it when the file changes.

    The loaded array is kept in a module-level cache together with the
    modification time it was read at; a different mtime on a later call
    triggers a reload.

    Raises:
        FileNotFoundError: if ``EMBED_PATH`` does not exist.
    """
    global _cached_embeddings, _cached_embeddings_mtime
    try:
        # A single stat() both proves the file exists and yields its mtime,
        # so the file cannot vanish between an existence check and the
        # mtime read while a rebuild is replacing it.
        mtime = EMBED_PATH.stat().st_mtime
    except (FileNotFoundError, NotADirectoryError):
        raise FileNotFoundError("Embeddings not found.") from None

    if _cached_embeddings is None or _cached_embeddings_mtime != mtime:
        logger.info(f"Loading embeddings from {EMBED_PATH} (mtime: {mtime})...")
        _cached_embeddings = np.load(str(EMBED_PATH))
        _cached_embeddings_mtime = mtime
    return _cached_embeddings
107
 
108
  def build_results(
109
  df: pd.DataFrame,
src/similarity_model/sync_projects.py CHANGED
@@ -1,146 +1,17 @@
1
- import json
2
- import logging
3
  import sys
4
- import pandas as pd
5
  from pathlib import Path
6
 
7
  # Ensure workspace root is in path
8
  sys.path.append(str(Path(__file__).resolve().parents[2]))
9
 
10
- from Data.database.sql_connector import engine
11
- from src.similarity_model.preprocessing import preprocess_dataset
12
- from src.similarity_model.embedding_engine import train_embedding_engine
13
-
14
- logging.basicConfig(
15
- level=logging.INFO,
16
- format="%(asctime)s | %(levelname)s | %(message)s"
17
- )
18
- logger = logging.getLogger(__name__)
19
 
20
  def sync_projects():
21
- logger.info("Initializing project synchronization service...")
22
-
23
-
24
- try:
25
-
26
- with engine.connect() as conn:
27
- logger.info("Database connection verified successfully.")
28
- except Exception as exc:
29
- logger.error(
30
- "Unable to connect to the SQL database. Please ensure you are connected "
31
- "to the university network / VPN and that your IP is whitelisted. Error: %s",
32
- exc
33
- )
34
- sys.exit(1)
35
-
36
-
37
- projects_query = """
38
- SELECT *
39
- FROM Projects
40
- WHERE Status IN (
41
- 'Completed',
42
- 'UnderReview',
43
- 'In_Progress'
44
- )
45
  """
46
- logger.info("Fetching raw active projects from 'Projects' table...")
47
- with engine.connect() as conn:
48
- projects_df = pd.read_sql(projects_query, conn)
49
- logger.info(f"Loaded {len(projects_df)} active projects from database.")
50
-
51
-
52
- logger.info("Fetching existing records from 'preprocess'...")
53
- with engine.connect() as conn:
54
- existing_df = pd.read_sql("SELECT id FROM preprocess", conn)
55
-
56
- allowed_ids = set(projects_df["Id"].tolist())
57
- processed_ids = set(existing_df["id"].tolist())
58
-
59
- changed = False
60
-
61
-
62
- ids_to_remove = processed_ids - allowed_ids
63
- if ids_to_remove:
64
- logger.info(f"Found {len(ids_to_remove)} projects to remove (status changed or deleted).")
65
- ids_str = ",".join(map(str, ids_to_remove))
66
-
67
- with engine.begin() as conn:
68
- conn.exec_driver_sql(
69
- f"DELETE FROM preprocess WHERE id IN ({ids_str})"
70
- )
71
- logger.info(f"Successfully removed {len(ids_to_remove)} projects from 'preprocess'.")
72
- changed = True
73
-
74
-
75
- new_projects = projects_df[~projects_df["Id"].isin(processed_ids)].copy()
76
-
77
- if len(new_projects) > 0:
78
- logger.info(f"Found {len(new_projects)} new projects to preprocess and insert.")
79
-
80
-
81
- processed_df = preprocess_dataset(new_projects)
82
-
83
- if len(processed_df) > 0:
84
-
85
- cols_to_keep = [
86
- "id",
87
- "submittedat",
88
- "project_title",
89
- "studentnames",
90
- "year",
91
- "abstract",
92
- "description",
93
- "problemstatement",
94
- "proposedsolution",
95
- "objectives",
96
- "full_content",
97
- "clean_text",
98
- "word_count",
99
- "features"
100
- ]
101
-
102
-
103
- for col in cols_to_keep:
104
- if col not in processed_df.columns:
105
- processed_df[col] = ""
106
-
107
- processed_df = processed_df[cols_to_keep]
108
-
109
- processed_df = processed_df.rename(
110
- columns={
111
- "submittedat": "submitted_at",
112
- "studentnames": "student_names",
113
- "problemstatement": "problem_statement",
114
- "proposedsolution": "proposed_solution"
115
- }
116
- )
117
-
118
-
119
- processed_df["features"] = processed_df["features"].apply(json.dumps)
120
-
121
-
122
- logger.info("Uploading preprocessed records to database...")
123
- with engine.begin() as conn:
124
- processed_df.to_sql(
125
- "preprocess",
126
- conn,
127
- if_exists="append",
128
- index=False
129
- )
130
- logger.info(f"Successfully processed and inserted {len(processed_df)} projects.")
131
- changed = True
132
- else:
133
- logger.warning("No new projects remained after preprocessing filters.")
134
- else:
135
- logger.info("No new projects found.")
136
-
137
-
138
- if changed:
139
- logger.info("Changes detected. Rebuilding local embeddings and FAISS index...")
140
- train_embedding_engine()
141
- logger.info("Local embeddings and index updated successfully.")
142
- else:
143
- logger.info("No database changes detected. Embeddings remain in sync.")
144
 
145
  if __name__ == "__main__":
146
  sync_projects()
 
 
 
1
  import sys
 
2
  from pathlib import Path
3
 
4
  # Ensure workspace root is in path
5
  sys.path.append(str(Path(__file__).resolve().parents[2]))
6
 
7
+ from src.similarity_model.sync_worker import run_worker
 
 
 
 
 
 
 
 
8
 
9
def sync_projects():
    """
    Entry point to run the event-driven synchronization worker.

    Delegates to ``run_worker()`` from ``src.similarity_model.sync_worker``.
    This replaces the earlier implementation, which compared the whole
    Projects table against 'preprocess' on every run.
    """
    run_worker()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
 
16
  if __name__ == "__main__":
17
  sync_projects()
src/similarity_model/sync_worker.py ADDED
@@ -0,0 +1,221 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import sys
3
+ import json
4
+ import time
5
+ import logging
6
+ import pandas as pd
7
+ from pathlib import Path
8
+ from sqlalchemy import text
9
+
10
+ # Ensure workspace root is in path
11
+ sys.path.append(str(Path(__file__).resolve().parents[2]))
12
+
13
+ from Data.database.sql_connector import engine
14
+ from src.similarity_model.preprocessing import preprocess_dataset
15
+ from src.similarity_model.embedding_engine import train_embedding_engine
16
+
17
+ # Setup logging
18
+ logging.basicConfig(
19
+ level=logging.INFO,
20
+ format="%(asctime)s | %(levelname)s | %(name)s | %(message)s"
21
+ )
22
+ logger = logging.getLogger("SyncWorker")
23
+
24
+ # Settings
25
+ BATCH_SIZE = 10
26
+ MAX_RETRIES = 3
27
+ POLL_INTERVAL = 5 # seconds
28
+ REBUILD_THRESHOLD = 5 # Rebuild FAISS index after 5 database changes
29
+ REBUILD_COOLDOWN = 60 # Or after 60 seconds if changes exist but threshold not met
30
+
31
class RebuildManager:
    """Decide when accumulated data changes justify a FAISS index rebuild.

    A rebuild runs when changes are pending and either the number of
    recorded changes reaches ``rebuild_threshold`` or at least
    ``rebuild_cooldown`` seconds have passed since the last successful
    rebuild (or since construction).

    Attributes are kept public for inspection; ``last_rebuild_time`` is a
    ``time.monotonic()`` reading, not a wall-clock timestamp.
    """

    def __init__(self, rebuild_threshold=REBUILD_THRESHOLD, rebuild_cooldown=REBUILD_COOLDOWN):
        self.rebuild_threshold = rebuild_threshold
        self.rebuild_cooldown = rebuild_cooldown
        self.accumulated_changes = 0
        # Monotonic clock: interval arithmetic must not jump when the system
        # wall clock is adjusted (NTP step, manual change).
        self.last_rebuild_time = time.monotonic()
        self.pending_rebuild = False
        # Earliest monotonic time at which a failed rebuild may be retried.
        self._retry_not_before = 0.0

    def record_change(self):
        """Note that one more change needs to be reflected in the index."""
        self.accumulated_changes += 1
        self.pending_rebuild = True

    def check_and_rebuild(self):
        """Rebuild the index if a rebuild is due.

        Returns:
            True when a rebuild ran and succeeded; False when nothing was
            pending, the rebuild was not yet due, or the rebuild failed.
        """
        if not self.pending_rebuild:
            return False

        now = time.monotonic()
        if now < self._retry_not_before:
            # A previous attempt failed recently; do not hammer the rebuild
            # on every poll while the underlying problem persists.
            return False

        time_elapsed = now - self.last_rebuild_time
        threshold_hit = self.accumulated_changes >= self.rebuild_threshold
        cooldown_over = time_elapsed >= self.rebuild_cooldown
        if not (threshold_hit or cooldown_over):
            return False

        logger.info(
            "Triggering FAISS index rebuild. "
            "Accumulated changes: %s, time elapsed: %.1fs",
            self.accumulated_changes,
            time_elapsed,
        )
        try:
            train_embedding_engine()
        except Exception as e:
            # Keep the pending state so the rebuild is retried, but only
            # after a full cooldown has passed.
            self._retry_not_before = time.monotonic() + self.rebuild_cooldown
            logger.error("Failed to rebuild FAISS index: %s", e, exc_info=True)
            return False

        self.accumulated_changes = 0
        # Stamp completion time, so the cooldown is measured from the end of
        # the rebuild rather than from before it started.
        self.last_rebuild_time = time.monotonic()
        self.pending_rebuild = False
        logger.info("FAISS index rebuild completed successfully.")
        return True
66
+
67
# Project statuses whose rows belong in the 'preprocess' table.
_ELIGIBLE_STATUSES = ("Completed", "UnderReview", "In_Progress")

# Columns taken from the preprocessing output, in 'preprocess' table order.
_PREPROCESS_COLUMNS = [
    "id", "submittedat", "project_title", "studentnames", "year",
    "abstract", "description", "problemstatement", "proposedsolution",
    "objectives", "full_content", "clean_text", "word_count", "features",
]

# Preprocessing output name -> 'preprocess' table column name.
_PREPROCESS_RENAMES = {
    "submittedat": "submitted_at",
    "studentnames": "student_names",
    "problemstatement": "problem_statement",
    "proposedsolution": "proposed_solution",
}


def _to_preprocess_frame(processed_df):
    """Shape ``preprocess_dataset`` output to the 'preprocess' table layout."""
    # Work on a copy so adding columns never writes into a view of the
    # caller's frame.
    frame = processed_df.copy()
    for col in _PREPROCESS_COLUMNS:
        if col not in frame.columns:
            frame[col] = ""
    frame = frame[_PREPROCESS_COLUMNS].rename(columns=_PREPROCESS_RENAMES)
    frame["features"] = frame["features"].apply(json.dumps)
    return frame


def _remove_from_preprocess(conn, project_id):
    """Delete the project's row (if any) from the 'preprocess' table."""
    conn.execute(text("DELETE FROM preprocess WHERE id = :id"), {"id": project_id})


def _record_failure(engine, queue_id, error):
    """Bump the retry counter of a failed queue item in its own transaction."""
    try:
        with engine.begin() as error_conn:
            # 'AND Processed = 0' keeps this from touching a row that a
            # trigger (or another worker) has meanwhile marked processed;
            # without it the CASE below could flip such a row back to
            # unprocessed and overwrite its message.
            error_conn.execute(text("""
                UPDATE SyncQueue
                SET RetryCount = RetryCount + 1,
                    ErrorMessage = :error_msg,
                    ProcessedAt = CASE WHEN RetryCount + 1 >= :max_retries THEN SYSUTCDATETIME() ELSE NULL END,
                    Processed = CASE WHEN RetryCount + 1 >= :max_retries THEN 1 ELSE 0 END
                WHERE QueueId = :queue_id AND Processed = 0
            """), {
                "queue_id": queue_id,
                "error_msg": str(error)[:4000],
                "max_retries": MAX_RETRIES,
            })
    except Exception as queue_err:
        logger.error(f"Failed to write error status for queue item {queue_id}: {queue_err}")


def _apply_upsert(conn, project_id):
    """Bring the 'preprocess' row for ``project_id`` in line with Projects."""
    project_df = pd.read_sql(
        text("SELECT * FROM Projects WHERE Id = :project_id"),
        conn,
        params={"project_id": project_id},
    )

    eligible = False
    if not project_df.empty:
        first_row = project_df.iloc[0]
        # Support case-insensitive key retrieval
        status = first_row.get("Status") or first_row.get("status")
        eligible = status in _ELIGIBLE_STATUSES

    if not eligible:
        logger.info(f"Project {project_id} is ineligible or deleted. Removing from 'preprocess' table.")
        _remove_from_preprocess(conn, project_id)
        return

    logger.info(f"Preprocessing eligible project {project_id}...")
    processed_df = preprocess_dataset(project_df)
    if processed_df.empty:
        logger.info(f"Project {project_id} filtered out by preprocessing. Removing from 'preprocess' table.")
        _remove_from_preprocess(conn, project_id)
        return

    # Upsert behavior: delete existing first, then append
    _remove_from_preprocess(conn, project_id)
    _to_preprocess_frame(processed_df).to_sql("preprocess", conn, if_exists="append", index=False)
    logger.info(f"Successfully preprocessed and inserted Project {project_id} into 'preprocess'.")


def process_single_item(engine, item) -> bool:
    """Apply one SyncQueue entry to the 'preprocess' table.

    Args:
        engine: SQLAlchemy engine used to open the transactions.
        item: mapping with ``QueueId``, ``ProjectId`` and ``OperationType``.

    Returns:
        True when the item was applied (UPSERT or DELETE); False when the
        item had already been processed, had an unknown operation type, or
        failed. Failures are recorded on the queue row, not raised.
    """
    queue_id = item["QueueId"]
    project_id = item["ProjectId"]
    operation_type = item["OperationType"]

    changed = False
    try:
        # One transaction: the data change and the queue-row update commit
        # or roll back together.
        with engine.begin() as conn:
            # Re-verify queue item is still unprocessed and lock it
            row = conn.execute(text("""
                SELECT QueueId FROM SyncQueue WITH (UPDLOCK, HOLDLOCK)
                WHERE QueueId = :queue_id AND Processed = 0
            """), {"queue_id": queue_id}).fetchone()
            if not row:
                logger.info(f"Queue item {queue_id} already processed by another worker. Skipping.")
                return False

            if operation_type == 'UPSERT':
                _apply_upsert(conn, project_id)
                changed = True
            elif operation_type == 'DELETE':
                logger.info(f"Removing Project {project_id} from 'preprocess' table...")
                _remove_from_preprocess(conn, project_id)
                changed = True

            # Mark queue item as processed successfully
            conn.execute(text("""
                UPDATE SyncQueue
                SET Processed = 1, ProcessedAt = SYSUTCDATETIME(), ErrorMessage = NULL
                WHERE QueueId = :queue_id
            """), {"queue_id": queue_id})

        return changed

    except Exception as e:
        logger.error(f"Error processing queue item {queue_id} (Project {project_id}): {e}", exc_info=True)
        # Log failure on SyncQueue in a separate transaction to avoid rollback
        _record_failure(engine, queue_id, e)
        return False
173
+
174
def run_worker():
    """Poll SyncQueue forever and apply pending items in order.

    Verifies the database connection first and exits the process with
    status 1 if that fails. Afterwards each loop iteration fetches up to
    ``BATCH_SIZE`` unprocessed items, applies them one by one, and lets the
    ``RebuildManager`` decide whether the FAISS index should be rebuilt.
    When the queue is empty the worker sleeps ``POLL_INTERVAL`` seconds.
    Errors inside the loop are logged and the loop continues.
    """
    logger.info("Initializing Sync Worker service...")

    # Verify DB connection
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
            logger.info("Database connection verified successfully.")
    except Exception as exc:
        logger.critical(f"Database connection failed: {exc}")
        sys.exit(1)

    rebuild_manager = RebuildManager()

    # QueueId is the tiebreaker: several rows can share one CreatedAt value,
    # and a DELETE and an UPSERT for the same project must be replayed in the
    # order they were enqueued. QueueId is an IDENTITY column, so it reflects
    # insertion order.
    # NOTE(review): the lock hints presumably last only until this
    # connection is closed below; the per-item lock is taken again in
    # process_single_item -- confirm.
    fetch_batch = text("""
        SELECT TOP (:batch_size) QueueId, ProjectId, OperationType, RetryCount
        FROM SyncQueue WITH (UPDLOCK, READPAST)
        WHERE Processed = 0 AND RetryCount < :max_retries
        ORDER BY CreatedAt ASC, QueueId ASC
    """)
    fetch_params = {"batch_size": BATCH_SIZE, "max_retries": MAX_RETRIES}

    logger.info("Sync Worker started successfully and polling...")
    while True:
        try:
            # Fetch batch of unprocessed items
            with engine.connect() as conn:
                batch = conn.execute(fetch_batch, fetch_params).mappings().all()

            if not batch:
                # Idle period: check if we have any pending delayed rebuilds
                rebuild_manager.check_and_rebuild()
                time.sleep(POLL_INTERVAL)
                continue

            logger.info(f"Fetched {len(batch)} items from SyncQueue.")
            for item in batch:
                if process_single_item(engine, item):
                    rebuild_manager.record_change()

            # Post-batch check for rebuilding FAISS index
            rebuild_manager.check_and_rebuild()

        except Exception as e:
            logger.error(f"Error in Sync Worker loop: {e}", exc_info=True)
            time.sleep(POLL_INTERVAL)
219
+
220
+ if __name__ == "__main__":
221
+ run_worker()