ifieryarrows committed on
Commit
b3b36f7
·
verified ·
1 Parent(s): 0b7d144

Sync from GitHub (tests passed)

Browse files
Dockerfile CHANGED
@@ -2,19 +2,38 @@ FROM python:3.11-slim
2
 
3
  WORKDIR /code
4
 
5
- RUN apt-get update && apt-get install -y \
 
6
  gcc \
7
  libpq-dev \
 
 
 
8
  && rm -rf /var/lib/apt/lists/*
9
 
 
10
  COPY ./requirements.txt /code/requirements.txt
11
-
12
  RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
13
 
 
14
  COPY ./app /code/app
15
  COPY ./config /code/config
 
 
16
 
17
  # Copy pre-trained model files (from Kaggle)
18
  COPY ./data/models /data/models
19
 
20
- CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "7860"]
 
 
 
 
 
 
 
 
 
 
 
 
 
2
 
3
  WORKDIR /code
4
 
5
+ # OS deps: redis-server + supervisor + build tools
6
+ RUN apt-get update && apt-get install -y --no-install-recommends \
7
  gcc \
8
  libpq-dev \
9
+ redis-server \
10
+ supervisor \
11
+ curl \
12
  && rm -rf /var/lib/apt/lists/*
13
 
14
+ # Python deps
15
  COPY ./requirements.txt /code/requirements.txt
 
16
  RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
17
 
18
+ # App code
19
  COPY ./app /code/app
20
  COPY ./config /code/config
21
+ COPY ./adapters /code/adapters
22
+ COPY ./worker /code/worker
23
 
24
  # Copy pre-trained model files (from Kaggle)
25
  COPY ./data/models /data/models
26
 
27
+ # Supervisor config
28
+ COPY ./supervisord.conf /etc/supervisor/conf.d/supervisord.conf
29
+
30
+ # HF Spaces default port: 7860
31
+ EXPOSE 7860
32
+
33
+ # Environment
34
+ ENV PYTHONUNBUFFERED=1 \
35
+ PYTHONPATH=/code \
36
+ REDIS_URL=redis://127.0.0.1:6379/0
37
+
38
+ # Run supervisord (manages redis + api + worker)
39
+ CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]
adapters/__init__.py ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ """
2
+ Adapters: External service integrations.
3
+ """
adapters/db/__init__.py ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ """
2
+ Database adapters.
3
+ """
adapters/db/lock.py ADDED
@@ -0,0 +1,197 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Distributed lock using PostgreSQL advisory locks.
3
+
4
+ Advisory locks are:
5
+ - Session-based: automatically released when connection closes
6
+ - Non-blocking: can check without waiting
7
+ - Reliable: no stale locks after crash
8
+
9
+ This is the AUTHORITY for pipeline locking.
10
+ `pipeline_locks` table is for VISIBILITY only (best-effort).
11
+ """
12
+
13
+ import hashlib
14
+ import logging
15
+ from contextlib import contextmanager
16
+ from datetime import datetime, timezone
17
+ from typing import Optional
18
+
19
+ from sqlalchemy import text
20
+ from sqlalchemy.orm import Session
21
+
22
+ logger = logging.getLogger(__name__)
23
+
24
+
25
+ def _lock_key_to_id(lock_key: str) -> int:
26
+ """
27
+ Convert string lock key to bigint for pg_advisory_lock.
28
+ Uses first 15 hex chars of SHA-256 to fit in signed bigint.
29
+ """
30
+ hash_hex = hashlib.sha256(lock_key.encode()).hexdigest()[:15]
31
+ return int(hash_hex, 16)
32
+
33
+
34
def try_acquire_lock(session: Session, lock_key: str) -> bool:
    """Attempt to take the advisory lock without blocking.

    Args:
        session: SQLAlchemy session (the lock is bound to its connection)
        lock_key: String identifier for the lock (e.g., "pipeline:daily")

    Returns:
        True if the lock was acquired, False if another session holds it.

    IMPORTANT: Lock is held until session.close() or explicit release.
    Keep the same session alive for the entire pipeline run.
    """
    lock_id = _lock_key_to_id(lock_key)

    acquired = bool(
        session.execute(
            text("SELECT pg_try_advisory_lock(:lock_id)"),
            {"lock_id": lock_id},
        ).scalar()
    )

    if acquired:
        logger.info(f"Advisory lock acquired: {lock_key} (id={lock_id})")
    else:
        logger.warning(f"Advisory lock NOT acquired (held by another): {lock_key}")

    return acquired
61
+
62
+
63
def release_lock(session: Session, lock_key: str) -> bool:
    """Explicitly release the advisory lock for *lock_key*.

    Usually unnecessary: the lock auto-releases when the session's
    connection closes. Use this for early release when the pipeline
    finishes before the session does.

    Returns:
        True if PostgreSQL reported the lock as released, False otherwise
        (e.g., this session never held it).
    """
    lock_id = _lock_key_to_id(lock_key)

    released = bool(
        session.execute(
            text("SELECT pg_advisory_unlock(:lock_id)"),
            {"lock_id": lock_id},
        ).scalar()
    )

    if released:
        logger.info(f"Advisory lock released: {lock_key}")
    else:
        logger.warning(f"Advisory lock release failed (not held?): {lock_key}")

    return released
83
+
84
+
85
def is_lock_held(session: Session, lock_key: str) -> bool:
    """Report whether ANY session currently holds the lock.

    Implemented as probe-then-undo: try to take the lock; if that
    succeeds the lock was free, so give it straight back.

    This is a weak check — another session may grab the lock between this
    call and any subsequent use. Use try_acquire_lock for real locking.
    """
    lock_id = _lock_key_to_id(lock_key)

    probe = session.execute(
        text("SELECT pg_try_advisory_lock(:lock_id)"),
        {"lock_id": lock_id},
    ).scalar()

    if not probe:
        # Someone else holds it.
        return True

    # We grabbed it, so it was free — undo the probe immediately.
    session.execute(
        text("SELECT pg_advisory_unlock(:lock_id)"),
        {"lock_id": lock_id},
    )
    return False
108
+
109
+
110
@contextmanager
def advisory_lock(session: Session, lock_key: str, raise_on_fail: bool = True):
    """Context manager pairing try_acquire_lock with release_lock.

    Usage:
        with advisory_lock(session, "pipeline:daily"):
            ...  # work while holding the lock
        # lock released here

    Args:
        session: SQLAlchemy session
        lock_key: Lock identifier
        raise_on_fail: If True, raise RuntimeError if lock not acquired

    Yields:
        True when the lock was acquired; False when it was not and
        raise_on_fail is False (the body then runs WITHOUT the lock).

    Raises:
        RuntimeError: If lock not acquired and raise_on_fail=True
    """
    if not try_acquire_lock(session, lock_key):
        if raise_on_fail:
            raise RuntimeError(f"Could not acquire lock: {lock_key}")
        # Caller opted out of raising: signal failure, hold nothing.
        yield False
        return

    try:
        yield True
    finally:
        # Always release, even if the body raised.
        release_lock(session, lock_key)
142
+
143
+
144
+ # Lock key constants
145
+ PIPELINE_LOCK_KEY = "pipeline:daily"
146
+
147
+
148
def write_lock_visibility(
    session: Session,
    lock_key: str,
    run_id: str,
    holder_id: Optional[str] = None
) -> None:
    """Record lock ownership in the pipeline_locks table for operators.

    BEST-EFFORT only — the advisory lock is the authority. Any failure
    here is logged at debug level and swallowed so the pipeline keeps
    running.
    """
    params = {
        "lock_key": lock_key,
        "holder_id": holder_id,
        "run_id": run_id,
        "acquired_at": datetime.now(timezone.utc),
    }
    upsert = text("""
        INSERT INTO pipeline_locks (lock_key, holder_id, run_id, acquired_at)
        VALUES (:lock_key, :holder_id, :run_id, :acquired_at)
        ON CONFLICT (lock_key) DO UPDATE SET
            holder_id = EXCLUDED.holder_id,
            run_id = EXCLUDED.run_id,
            acquired_at = EXCLUDED.acquired_at
    """)
    try:
        session.execute(upsert, params)
        session.commit()
    except Exception as e:
        logger.debug(f"Failed to write lock visibility (best-effort): {e}")
        session.rollback()
182
+
183
+
184
def clear_lock_visibility(session: Session, lock_key: str) -> None:
    """Delete the pipeline_locks row for *lock_key*.

    BEST-EFFORT only: failures are logged at debug level and swallowed.
    """
    delete_stmt = text("DELETE FROM pipeline_locks WHERE lock_key = :lock_key")
    try:
        session.execute(delete_stmt, {"lock_key": lock_key})
        session.commit()
    except Exception as e:
        logger.debug(f"Failed to clear lock visibility (best-effort): {e}")
        session.rollback()
adapters/queue/__init__.py ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
+ """
2
+ Queue adapters for Redis/arq.
3
+ """
4
+ from adapters.queue.redis import get_redis_pool, RedisSettings
5
+ from adapters.queue.jobs import enqueue_pipeline_job
6
+
7
+ __all__ = ["get_redis_pool", "RedisSettings", "enqueue_pipeline_job"]
adapters/queue/jobs.py ADDED
@@ -0,0 +1,86 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Job enqueue/dequeue functions for pipeline tasks.
3
+ """
4
+
5
+ import logging
6
+ from datetime import datetime, timezone
7
+ from typing import Optional
8
+ from uuid import uuid4
9
+
10
+ from arq import create_pool
11
+
12
+ from adapters.queue.redis import get_redis_settings
13
+
14
+ logger = logging.getLogger(__name__)
15
+
16
+
17
async def enqueue_pipeline_job(
    train_model: bool = False,
    trigger_source: str = "manual",
    run_id: Optional[str] = None,
) -> dict:
    """
    Enqueue a pipeline job to Redis.

    Args:
        train_model: Whether to train/retrain the XGBoost model
        trigger_source: Source of trigger (manual, cron, api)
        run_id: Optional run ID, generated if not provided

    Returns:
        dict with run_id, job_id and an "enqueued" flag. job_id is None
        (and enqueued False) when arq refuses the job as a duplicate.

    Raises:
        Exception: re-raises any Redis/enqueue failure after logging it.
    """
    if run_id is None:
        run_id = str(uuid4())

    redis = None
    try:
        redis = await create_pool(get_redis_settings())

        job = await redis.enqueue_job(
            "run_pipeline",
            run_id=run_id,
            train_model=train_model,
            trigger_source=trigger_source,
            enqueued_at=datetime.now(timezone.utc).isoformat(),
        )

        # arq returns None when a job with the same id already exists;
        # the original code would have crashed on job.job_id here.
        if job is None:
            logger.warning(f"Pipeline job not enqueued (duplicate job id?): run_id={run_id}")
            return {
                "run_id": run_id,
                "job_id": None,
                "enqueued": False,
                "trigger_source": trigger_source,
                "train_model": train_model,
            }

        logger.info(f"Pipeline job enqueued: run_id={run_id}, job_id={job.job_id}")

        return {
            "run_id": run_id,
            "job_id": job.job_id,
            "enqueued": True,
            "trigger_source": trigger_source,
            "train_model": train_model,
        }

    except Exception as e:
        logger.error(f"Failed to enqueue pipeline job: {e}")
        raise
    finally:
        # Close the pool on ALL paths — the original leaked the connection
        # whenever enqueue_job raised.
        if redis is not None:
            await redis.close()
62
+
63
+
64
async def get_job_status(job_id: str) -> Optional[dict]:
    """
    Get status of a queued job.

    Returns:
        dict with job_id and its arq JobStatus, or None if the job is
        unknown or Redis is unreachable.
    """
    # Local import: arq's Job API is only needed here, and ArqRedis has no
    # .job() method — the original `redis.job(job_id)` raised AttributeError,
    # and `job.status` is an async method, not an attribute.
    from arq.jobs import Job, JobStatus

    redis = None
    try:
        redis = await create_pool(get_redis_settings())
        job = Job(job_id, redis)
        status = await job.status()

        if status == JobStatus.not_found:
            return None

        return {
            "job_id": job_id,
            "status": status,
        }

    except Exception as e:
        logger.error(f"Failed to get job status: {e}")
        return None
    finally:
        # Release the pool on all paths (the original only closed it on success).
        if redis is not None:
            await redis.close()
adapters/queue/redis.py ADDED
@@ -0,0 +1,114 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Redis connection and settings for arq queue.
3
+ """
4
+
5
+ import logging
6
+ from typing import Optional
7
+
8
+ from arq.connections import RedisSettings as ArqRedisSettings
9
+ from redis.asyncio import Redis
10
+
11
+ logger = logging.getLogger(__name__)
12
+
13
+ # Module-level pool cache
14
+ _redis_pool: Optional[Redis] = None
15
+
16
+
17
def get_redis_settings() -> ArqRedisSettings:
    """
    Build arq RedisSettings from the environment.

    Reads REDIS_URL (format: redis://[user:password@]host:port/db) and
    falls back to redis://localhost:6379/0 for development.
    """
    import os
    from urllib.parse import urlparse

    parsed = urlparse(os.getenv("REDIS_URL", "redis://localhost:6379/0"))

    # The db index is the URL path with the leading slash stripped.
    db_segment = parsed.path.lstrip("/")

    return ArqRedisSettings(
        host=parsed.hostname or "localhost",
        port=parsed.port or 6379,
        database=int(db_segment) if db_segment else 0,
        password=parsed.password,
    )
41
+
42
+
43
async def get_redis_pool(max_retries: int = 5, retry_delay: float = 1.0) -> Redis:
    """
    Get async Redis connection pool.
    Lazy initialization, cached at module level.

    Includes retry logic for HF Spaces where Redis might start
    slightly after API/Worker due to supervisord startup order.

    Args:
        max_retries: How many connection attempts before giving up.
        retry_delay: Seconds to sleep between attempts.

    Raises:
        Exception: the last connection error when all attempts fail.
    """
    global _redis_pool

    if _redis_pool is None:
        import os
        import asyncio

        redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")

        for attempt in range(max_retries):
            pool = None
            try:
                pool = Redis.from_url(
                    redis_url,
                    decode_responses=True,
                    socket_connect_timeout=5.0,
                    socket_timeout=5.0,
                )
                # Test connection before caching it.
                await pool.ping()
                _redis_pool = pool
                # split('@')[-1] keeps credentials out of the log line.
                logger.info(f"Redis pool created: {redis_url.split('@')[-1]}")
                break
            except Exception as e:
                # Dispose of the half-initialized client before retrying/raising —
                # the original abandoned it without close(), leaking connections.
                if pool is not None:
                    try:
                        await pool.close()
                    except Exception:
                        pass
                if attempt < max_retries - 1:
                    logger.warning(f"Redis connection attempt {attempt + 1} failed, retrying in {retry_delay}s: {e}")
                    await asyncio.sleep(retry_delay)
                else:
                    logger.error(f"Redis connection failed after {max_retries} attempts: {e}")
                    raise

    return _redis_pool
81
+
82
+
83
async def close_redis_pool():
    """Close the cached Redis pool on shutdown (no-op if never created)."""
    global _redis_pool
    if _redis_pool is None:
        return
    await _redis_pool.close()
    _redis_pool = None
    logger.info("Redis pool closed")
90
+
91
+
92
async def redis_healthcheck() -> dict:
    """
    Check Redis connectivity with a timed PING.

    Returns:
        {"ok": True, "latency_ms": float} on success,
        {"ok": False, "error": str} on any failure.
    """
    import time

    try:
        pool = await get_redis_pool()
        started = time.monotonic()
        await pool.ping()
        elapsed_ms = (time.monotonic() - started) * 1000
    except Exception as e:
        logger.warning(f"Redis healthcheck failed: {e}")
        return {"ok": False, "error": str(e)}

    return {"ok": True, "latency_ms": round(elapsed_ms, 2)}
111
+
112
+
113
+ # Re-export for convenience
114
+ RedisSettings = ArqRedisSettings
app/main.py CHANGED
@@ -22,15 +22,11 @@ from fastapi.middleware.cors import CORSMiddleware
22
  from sqlalchemy import func
23
 
24
  from app.db import init_db, SessionLocal, get_db_type
25
- from app.models import NewsArticle, PriceBar, DailySentiment
26
  from app.settings import get_settings
27
  from app.lock import is_pipeline_locked
28
- from app.inference import (
29
- generate_analysis_report,
30
- save_analysis_snapshot,
31
- get_latest_snapshot,
32
- get_any_snapshot,
33
- )
34
  from app.schemas import (
35
  AnalysisReport,
36
  HistoryResponse,
@@ -59,21 +55,21 @@ async def lifespan(app: FastAPI):
59
  init_db()
60
  logger.info("Database initialized")
61
 
62
- # Start scheduler if enabled
63
- settings = get_settings()
64
- if settings.scheduler_enabled:
65
- from app.scheduler import start_scheduler
66
- start_scheduler()
67
- logger.info("Scheduler started")
68
 
69
  yield
70
 
71
  # Shutdown
72
  logger.info("Shutting down CopperMind API...")
73
- if settings.scheduler_enabled:
74
- from app.scheduler import stop_scheduler
75
- stop_scheduler()
76
- logger.info("Scheduler stopped")
 
 
 
77
 
78
 
79
  # =============================================================================
@@ -108,11 +104,11 @@ app.add_middleware(
108
  "/api/analysis",
109
  response_model=AnalysisReport,
110
  responses={
111
- 404: {"model": ErrorResponse, "description": "Model or data not found"},
112
- 503: {"model": ErrorResponse, "description": "Pipeline locked, snapshot unavailable"},
113
  },
114
- summary="Get current analysis report",
115
- description="Returns the latest analysis report with predictions, sentiment, and influencers."
116
  )
117
  async def get_analysis(
118
  symbol: str = Query(default="HG=F", description="Trading symbol")
@@ -120,165 +116,88 @@ async def get_analysis(
120
  """
121
  Get current analysis report.
122
 
123
- Behavior:
124
- - If fresh snapshot exists (within TTL), return it
125
- - If pipeline is not locked, generate fresh report
126
- - If pipeline is locked, return stale snapshot or 503
 
 
 
 
 
 
 
127
  """
128
- settings = get_settings()
129
 
130
  with SessionLocal() as session:
131
- # Check for fresh snapshot first
132
- cached = get_latest_snapshot(
133
- session,
134
- symbol,
135
- max_age_minutes=settings.analysis_ttl_minutes
136
- )
137
 
138
- if cached:
139
- logger.debug(f"Cached snapshot exists, but running live prediction for accuracy")
140
- import yfinance as yf
141
- import xgboost as xgb
142
-
143
- try:
144
- # Get live price from yfinance
145
- ticker = yf.Ticker(symbol)
146
- info = ticker.info
147
- live_price = info.get('regularMarketPrice') or info.get('currentPrice')
148
-
149
- if live_price is not None:
150
- cached['current_price'] = round(float(live_price), 4)
151
-
152
- # Get latest DB close price for prediction base
153
- # Model predicts based on historical closes, not intraday prices
154
- latest_bar = session.query(PriceBar).filter(
155
- PriceBar.symbol == symbol
156
- ).order_by(PriceBar.date.desc()).first()
157
- if live_price is not None:
158
- # Prioritize live price for prediction base
159
- prediction_base = float(live_price)
160
- elif latest_bar:
161
- # Fallback to DB close
162
- prediction_base = latest_bar.close
163
- else:
164
- prediction_base = 0.0
165
-
166
- # Run LIVE model prediction
167
- from app.ai_engine import load_model, load_model_metadata
168
- from app.inference import build_features_for_prediction
169
-
170
- model = load_model(symbol)
171
- metadata = load_model_metadata(symbol)
172
- features = metadata.get("features", [])
173
-
174
- if model and features:
175
- # Build features and predict
176
- X = build_features_for_prediction(session, symbol, features)
177
- if X is not None and not X.empty:
178
- dmatrix = xgb.DMatrix(X, feature_names=features)
179
- predicted_return = float(model.predict(dmatrix)[0])
180
-
181
- # Update with live prediction
182
- # Apply futures-spot adjustment (HG=F is ~1.5% higher than XCU/USD)
183
- adjustment = settings.futures_spot_adjustment
184
- adjusted_base = float(prediction_base) * adjustment
185
-
186
- cached['predicted_return'] = round(predicted_return, 6)
187
- cached['predicted_price'] = round(
188
- adjusted_base * (1 + predicted_return),
189
- 4
190
- )
191
-
192
- # Also adjust current_price for consistency
193
- cached['current_price'] = round(adjusted_base, 4)
194
-
195
- # Update confidence bounds (based on adjusted base)
196
- std_mult = 1.0 # 1 standard deviation
197
- cached['confidence_lower'] = round(adjusted_base * (1 - std_mult * abs(predicted_return)), 4)
198
- cached['confidence_upper'] = round(adjusted_base * (1 + std_mult * abs(predicted_return) * 2), 4)
199
-
200
- logger.info(f"LIVE prediction: HG=F=${prediction_base:.4f} -> XCU/USD≈${adjusted_base:.4f}, predicted=${cached['predicted_price']:.4f} ({predicted_return*100:.2f}%)")
201
-
202
- except Exception as e:
203
- logger.error(f"Live prediction failed, using cached: {e}")
204
-
205
- # Update top_influencers from current model metadata
206
- try:
207
- from app.ai_engine import load_model_metadata
208
- from app.features import get_feature_descriptions
209
-
210
- metadata = load_model_metadata(symbol)
211
- importance = metadata.get("importance", [])
212
-
213
- if importance:
214
- descriptions = get_feature_descriptions()
215
- top_influencers = []
216
-
217
- for item in importance[:10]:
218
- feat = item["feature"]
219
- desc = None
220
- for key, value in descriptions.items():
221
- if key in feat:
222
- desc = value
223
- break
224
- if desc is None:
225
- desc = feat.replace("_", " ").replace(" ", " ").title()
226
-
227
- top_influencers.append({
228
- "feature": feat,
229
- "importance": item["importance"],
230
- "description": desc,
231
- })
232
-
233
- cached['top_influencers'] = top_influencers
234
- logger.info(f"Updated cached snapshot with fresh influencers from model")
235
- except Exception as e:
236
- logger.debug(f"Could not update influencers in cached snapshot: {e}")
237
-
238
- return cached
239
 
240
- # Check if pipeline is locked
241
- if is_pipeline_locked():
242
- # Try to return stale snapshot
243
- stale = get_any_snapshot(session, symbol)
244
- if stale:
245
- logger.info(f"Pipeline locked, returning stale snapshot for {symbol}")
246
- return stale
247
-
248
- raise HTTPException(
249
- status_code=503,
250
- detail="Pipeline is currently running. No cached snapshot available. Please try again later."
251
- )
252
 
253
- # Generate fresh report
254
- try:
255
- report = generate_analysis_report(session, symbol)
256
-
257
- if report is None:
258
- raise HTTPException(
259
- status_code=404,
260
- detail=f"Could not generate analysis for {symbol}. "
261
- "Please ensure data has been fetched (make seed) and model trained (make train)."
262
- )
263
-
264
- # Save as snapshot
265
- save_analysis_snapshot(session, report, symbol)
266
-
267
- return report
268
-
269
- except Exception as e:
270
- logger.error(f"Error generating analysis: {e}")
271
-
272
- # Try stale snapshot as fallback
273
- stale = get_any_snapshot(session, symbol)
274
- if stale:
275
- logger.info(f"Error in fresh generation, returning stale snapshot")
276
- return stale
277
-
278
- raise HTTPException(
279
- status_code=500,
280
- detail=f"Error generating analysis: {str(e)}"
281
- )
 
 
 
282
 
283
 
284
  @app.get(
@@ -366,13 +285,14 @@ async def get_history(
366
  "/api/health",
367
  response_model=HealthResponse,
368
  summary="System health check",
369
- description="Returns system status including database, models, and pipeline lock state."
370
  )
371
  async def health_check():
372
  """
373
  Perform system health check.
374
 
375
  Returns status information useful for monitoring and debugging.
 
376
  """
377
  settings = get_settings()
378
  model_dir = Path(settings.model_dir)
@@ -382,17 +302,42 @@ async def health_check():
382
  if model_dir.exists():
383
  models_found = len(list(model_dir.glob("xgb_*_latest.json")))
384
 
385
- # Get counts
386
  news_count = None
387
  price_count = None
 
388
 
389
  try:
390
  with SessionLocal() as session:
391
  news_count = session.query(func.count(NewsArticle.id)).scalar()
392
  price_count = session.query(func.count(PriceBar.id)).scalar()
 
 
 
 
 
 
 
 
 
 
 
393
  except Exception as e:
394
  logger.error(f"Error getting counts: {e}")
395
 
 
 
 
 
 
 
 
 
 
 
 
 
 
396
  # Determine status
397
  pipeline_locked = is_pipeline_locked()
398
 
@@ -400,6 +345,8 @@ async def health_check():
400
  status = "degraded"
401
  elif pipeline_locked:
402
  status = "degraded"
 
 
403
  else:
404
  status = "healthy"
405
 
@@ -410,7 +357,9 @@ async def health_check():
410
  pipeline_locked=pipeline_locked,
411
  timestamp=datetime.now(timezone.utc).isoformat(),
412
  news_count=news_count,
413
- price_bars_count=price_count
 
 
414
  )
415
 
416
 
@@ -716,148 +665,61 @@ def verify_pipeline_secret(authorization: Optional[str] = Header(None)) -> None:
716
 
717
  @app.post(
718
  "/api/pipeline/trigger",
719
- summary="Trigger data pipeline (requires authentication)",
720
- description="Manually trigger data fetch and AI pipeline. Requires Authorization: Bearer <PIPELINE_TRIGGER_SECRET> header.",
721
  responses={
722
- 200: {"description": "Pipeline triggered successfully"},
723
  401: {"description": "Unauthorized - missing or invalid token"},
724
  409: {"description": "Pipeline already running"},
 
725
  },
726
  )
727
  async def trigger_pipeline(
728
- fetch_data: bool = Query(default=True, description="Fetch new data from sources"),
729
- train_model: bool = Query(default=True, description="Train/retrain XGBoost model"),
730
  _auth: None = Depends(verify_pipeline_secret),
731
  ):
732
  """
733
- Manually trigger the pipeline.
734
 
735
- This will:
736
- 1. Fetch new news and price data (if fetch_data=True)
737
- 2. Run sentiment scoring
738
- 3. Train XGBoost model (if train_model=True)
739
- """
740
- from threading import Thread
741
 
 
 
 
 
 
 
742
  if is_pipeline_locked():
743
  raise HTTPException(
744
  status_code=409,
745
  detail="Pipeline is already running. Please wait for it to complete."
746
  )
747
 
748
- def run_pipeline():
749
- try:
750
- from app.lock import PipelineLock
751
- from app.inference import generate_analysis_report, save_analysis_snapshot
752
- from app.db import SessionLocal
753
-
754
- lock = PipelineLock(timeout=0)
755
- if not lock.acquire():
756
- logger.error("Could not acquire pipeline lock")
757
- return
758
-
759
- try:
760
- settings = get_settings()
761
-
762
- if fetch_data:
763
- logger.info("Step 1: Fetching data...")
764
- from app.data_manager import fetch_all
765
- fetch_all(news=True, prices=True)
766
- logger.info("Data fetch complete")
767
-
768
- logger.info(f"Step 2: Running AI pipeline (train_model={train_model})...")
769
- from app.ai_engine import run_full_pipeline
770
- ai_result = run_full_pipeline(
771
- target_symbol="HG=F",
772
- score_sentiment=True,
773
- aggregate_sentiment=True,
774
- train_model=train_model
775
- )
776
- logger.info(f"AI pipeline complete: scored={ai_result.get('scored_articles', 0)}, aggregated={ai_result.get('aggregated_days', 0)}")
777
-
778
- # Log model training result specifically
779
- if train_model:
780
- model_result = ai_result.get('model_result')
781
- if model_result:
782
- logger.info(f"Model training SUCCESS: {model_result.get('model_path')}")
783
- logger.info(f"Top influencers updated: {[i['feature'] for i in model_result.get('top_influencers', [])[:3]]}")
784
- else:
785
- logger.warning("Model training returned None - check for errors above")
786
-
787
- # Step 3: Generate snapshot
788
- logger.info("Step 3: Generating analysis snapshot...")
789
- with SessionLocal() as session:
790
- # Clear old snapshots for this symbol to ensure fresh data
791
- from app.models import AnalysisSnapshot
792
- deleted = session.query(AnalysisSnapshot).filter(
793
- AnalysisSnapshot.symbol == settings.target_symbol
794
- ).delete()
795
- if deleted:
796
- session.commit()
797
- logger.info(f"Cleared {deleted} old snapshot(s) for {settings.target_symbol}")
798
-
799
- report = generate_analysis_report(session, settings.target_symbol)
800
- if report:
801
- save_analysis_snapshot(session, report, settings.target_symbol)
802
- logger.info(f"Snapshot generated")
803
-
804
- # Step 4: Generate AI Commentary
805
- logger.info("Step 4: Generating AI commentary...")
806
- try:
807
- import asyncio
808
- from app.commentary import generate_and_save_commentary
809
- from sqlalchemy import func
810
- from app.models import NewsArticle
811
- from datetime import timedelta
812
-
813
- # Get news count for last 7 days
814
- week_ago = datetime.now() - timedelta(days=7)
815
- news_count = session.query(func.count(NewsArticle.id)).filter(
816
- NewsArticle.published_at >= week_ago
817
- ).scalar() or 0
818
-
819
- # Run async function in sync context
820
- loop = asyncio.new_event_loop()
821
- asyncio.set_event_loop(loop)
822
- try:
823
- commentary = loop.run_until_complete(
824
- generate_and_save_commentary(
825
- session=session,
826
- symbol=settings.target_symbol,
827
- current_price=report.get('current_price', 0),
828
- predicted_price=report.get('predicted_price', 0),
829
- predicted_return=report.get('predicted_return', 0),
830
- sentiment_index=report.get('sentiment_index', 0),
831
- sentiment_label=report.get('sentiment_label', 'Neutral'),
832
- top_influencers=report.get('top_influencers', []),
833
- news_count=news_count,
834
- )
835
- )
836
- if commentary:
837
- logger.info("AI commentary generated and saved")
838
- else:
839
- logger.warning("AI commentary skipped (API key not configured or failed)")
840
- finally:
841
- loop.close()
842
- except Exception as ce:
843
- logger.error(f"AI commentary generation failed: {ce}")
844
- else:
845
- logger.warning("Could not generate analysis snapshot")
846
-
847
- finally:
848
- lock.release()
849
-
850
- except Exception as e:
851
- logger.error(f"Pipeline error: {e}", exc_info=True)
852
-
853
- # Run in background thread
854
- thread = Thread(target=run_pipeline, daemon=True)
855
- thread.start()
856
-
857
- return {
858
- "status": "triggered",
859
- "message": "Pipeline started in background. Check /api/health for status.",
860
- "fetch_data": fetch_data,
861
- "train_model": train_model
862
- }
863
 
 
22
  from sqlalchemy import func
23
 
24
  from app.db import init_db, SessionLocal, get_db_type
25
+ from app.models import NewsArticle, PriceBar, DailySentiment, AnalysisSnapshot
26
  from app.settings import get_settings
27
  from app.lock import is_pipeline_locked
28
+ # NOTE: Faz 1 - API is snapshot-only, no report generation
29
+ # generate_analysis_report and save_analysis_snapshot are now worker-only
 
 
 
 
30
  from app.schemas import (
31
  AnalysisReport,
32
  HistoryResponse,
 
55
  init_db()
56
  logger.info("Database initialized")
57
 
58
+ # NOTE: Scheduler is NO LONGER started here.
59
+ # Pipeline scheduling is now external (GitHub Actions cron).
60
+ # This API only reads data and enqueues jobs.
 
 
 
61
 
62
  yield
63
 
64
  # Shutdown
65
  logger.info("Shutting down CopperMind API...")
66
+ # Close Redis pool if initialized
67
+ try:
68
+ from adapters.queue.redis import close_redis_pool
69
+ import asyncio
70
+ asyncio.create_task(close_redis_pool())
71
+ except ImportError:
72
+ pass
73
 
74
 
75
  # =============================================================================
 
104
  "/api/analysis",
105
  response_model=AnalysisReport,
106
  responses={
107
+ 200: {"description": "Analysis report (may include quality_state for degraded modes)"},
108
+ 404: {"model": ErrorResponse, "description": "No snapshot available"},
109
  },
110
+ summary="Get current analysis report (snapshot-only)",
111
+ description="Returns the latest cached analysis snapshot. No live computation - all heavy work is done by the worker."
112
  )
113
async def get_analysis(
    symbol: str = Query(default="HG=F", description="Trading symbol")
):
    """
    Get current analysis report.

    SNAPSHOT-ONLY MODE (Faz 1):
    - Reads the latest snapshot from the database
    - NO yfinance calls
    - NO model loading
    - NO feature building
    - All heavy computation is done by the worker pipeline

    Response includes quality_state:
    - "ok": Fresh snapshot available
    - "stale": Snapshot older than 36 hours
    - "missing": No snapshot found
    """
    STALE_THRESHOLD_HOURS = 36

    with SessionLocal() as session:
        # Most recent snapshot for this symbol, regardless of age.
        latest = (
            session.query(AnalysisSnapshot)
            .filter(AnalysisSnapshot.symbol == symbol)
            .order_by(AnalysisSnapshot.generated_at.desc())
            .first()
        )

        if latest is None:
            # Nothing cached yet - emit a neutral, UI-compatible payload.
            logger.warning(f"No snapshot found for {symbol}")
            return {
                "symbol": symbol,
                "quality_state": "missing",
                "model_state": "offline",
                "current_price": 0.0,
                "predicted_return": 0.0,
                "predicted_price": 0.0,
                "confidence_lower": 0.0,
                "confidence_upper": 0.0,
                "sentiment_index": 0.0,
                "sentiment_label": "Neutral",
                "top_influencers": [],
                "data_quality": {
                    "news_count_7d": 0,
                    "missing_days": 0,
                    "coverage_pct": 0,
                },
                "generated_at": None,
                "message": "No analysis available. Pipeline may not have run yet.",
            }

        # Normalize to an aware UTC timestamp before computing the age.
        generated_at = latest.generated_at
        if generated_at.tzinfo is None:
            generated_at = generated_at.replace(tzinfo=timezone.utc)

        age_hours = (datetime.now(timezone.utc) - generated_at).total_seconds() / 3600

        # Anything older than the threshold is served, but flagged stale.
        quality_state = "stale" if age_hours > STALE_THRESHOLD_HOURS else "ok"

        # Start from the stored report and layer freshness metadata on top.
        report = latest.report_json.copy() if latest.report_json else {}
        report["quality_state"] = quality_state
        report["model_state"] = "ok" if quality_state == "ok" else "degraded"
        report["snapshot_age_hours"] = round(age_hours, 1)
        report["generated_at"] = generated_at.isoformat()

        # Backward compatibility: guarantee fields that older snapshots lack.
        report.setdefault("symbol", symbol)
        report.setdefault("data_quality", {
            "news_count_7d": 0,
            "missing_days": 0,
            "coverage_pct": 0,
        })
        report.setdefault("top_influencers", [])

        logger.info(f"Returning snapshot for {symbol}: age={age_hours:.1f}h, state={quality_state}")

        return report
201
 
202
 
203
  @app.get(
 
285
  "/api/health",
286
  response_model=HealthResponse,
287
  summary="System health check",
288
+ description="Returns system status including database, Redis queue, models, and pipeline lock state."
289
  )
290
  async def health_check():
291
  """
292
  Perform system health check.
293
 
294
  Returns status information useful for monitoring and debugging.
295
+ Includes Redis queue status and snapshot age for Faz 1 observability.
296
  """
297
  settings = get_settings()
298
  model_dir = Path(settings.model_dir)
 
302
  if model_dir.exists():
303
  models_found = len(list(model_dir.glob("xgb_*_latest.json")))
304
 
305
+ # Get counts and snapshot age
306
  news_count = None
307
  price_count = None
308
+ last_snapshot_age = None
309
 
310
  try:
311
  with SessionLocal() as session:
312
  news_count = session.query(func.count(NewsArticle.id)).scalar()
313
  price_count = session.query(func.count(PriceBar.id)).scalar()
314
+
315
+ # Get latest snapshot age
316
+ from app.models import AnalysisSnapshot
317
+ latest_snapshot = session.query(AnalysisSnapshot).order_by(
318
+ AnalysisSnapshot.generated_at.desc()
319
+ ).first()
320
+
321
+ if latest_snapshot and latest_snapshot.generated_at:
322
+ age = datetime.now(timezone.utc) - latest_snapshot.generated_at.replace(tzinfo=timezone.utc)
323
+ last_snapshot_age = int(age.total_seconds())
324
+
325
  except Exception as e:
326
  logger.error(f"Error getting counts: {e}")
327
 
328
+ # Check Redis connectivity
329
+ redis_ok = None
330
+ try:
331
+ from adapters.queue.redis import redis_healthcheck
332
+ redis_result = await redis_healthcheck()
333
+ redis_ok = redis_result.get("ok", False)
334
+ except ImportError:
335
+ # Redis adapter not available yet
336
+ redis_ok = None
337
+ except Exception as e:
338
+ logger.warning(f"Redis healthcheck failed: {e}")
339
+ redis_ok = False
340
+
341
  # Determine status
342
  pipeline_locked = is_pipeline_locked()
343
 
 
345
  status = "degraded"
346
  elif pipeline_locked:
347
  status = "degraded"
348
+ elif redis_ok is False:
349
+ status = "degraded"
350
  else:
351
  status = "healthy"
352
 
 
357
  pipeline_locked=pipeline_locked,
358
  timestamp=datetime.now(timezone.utc).isoformat(),
359
  news_count=news_count,
360
+ price_bars_count=price_count,
361
+ redis_ok=redis_ok,
362
+ last_snapshot_age_seconds=last_snapshot_age,
363
  )
364
 
365
 
 
665
 
666
@app.post(
    "/api/pipeline/trigger",
    summary="Enqueue pipeline job (requires authentication)",
    description="Enqueue a pipeline job to Redis queue. Worker executes the job. Requires Authorization: Bearer <PIPELINE_TRIGGER_SECRET> header.",
    responses={
        200: {"description": "Pipeline job enqueued successfully"},
        401: {"description": "Unauthorized - missing or invalid token"},
        409: {"description": "Pipeline already running"},
        503: {"description": "Redis queue unavailable"},
    },
)
async def trigger_pipeline(
    train_model: bool = Query(default=False, description="Train/retrain XGBoost model"),
    trigger_source: str = Query(default="api", description="Source of trigger (api, cron, manual)"),
    _auth: None = Depends(verify_pipeline_secret),
):
    """
    Enqueue a pipeline job to the Redis queue.

    This endpoint does NOT run the pipeline - it only enqueues a job.
    The worker service consumes and executes the job.

    Returns:
        run_id: UUID for tracking this pipeline run
        enqueued: True if job was enqueued successfully
    """
    # Advisory-only check; the worker performs the authoritative lock check.
    if is_pipeline_locked():
        raise HTTPException(
            status_code=409,
            detail="Pipeline is already running. Please wait for it to complete."
        )

    try:
        # Imported lazily so the API can start even if the queue adapter
        # is unavailable; failures surface as a 503 below.
        from adapters.queue.jobs import enqueue_pipeline_job

        outcome = await enqueue_pipeline_job(
            train_model=train_model,
            trigger_source=trigger_source,
        )

        logger.info(f"Pipeline job enqueued: run_id={outcome['run_id']}, trigger={trigger_source}")

        return {
            "status": "enqueued",
            "message": "Pipeline job enqueued. Worker will execute. Check /api/health for status.",
            "run_id": outcome["run_id"],
            "job_id": outcome["job_id"],
            "train_model": train_model,
            "trigger_source": trigger_source,
        }

    except Exception as e:
        logger.error(f"Failed to enqueue pipeline job: {e}")
        raise HTTPException(
            status_code=503,
            detail=f"Failed to enqueue job. Redis may be unavailable: {str(e)}"
        )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
725
 
app/models.py CHANGED
@@ -15,6 +15,7 @@ from typing import Optional
15
  from sqlalchemy import (
16
  Column,
17
  Integer,
 
18
  String,
19
  Float,
20
  DateTime,
@@ -24,7 +25,9 @@ from sqlalchemy import (
24
  Index,
25
  UniqueConstraint,
26
  JSON,
 
27
  )
 
28
  from sqlalchemy.orm import relationship
29
 
30
  from app.db import Base
@@ -302,19 +305,125 @@ class PipelineRunMetrics(Base):
302
  train_samples = Column(Integer, nullable=True)
303
  val_samples = Column(Integer, nullable=True)
304
 
305
- # Data quality
306
  news_imported = Column(Integer, nullable=True)
307
  news_duplicates = Column(Integer, nullable=True)
308
  price_bars_updated = Column(Integer, nullable=True)
309
  missing_price_days = Column(Integer, nullable=True)
310
 
 
 
 
 
 
 
311
  # Snapshot info
312
  snapshot_generated = Column(Boolean, default=False)
313
  commentary_generated = Column(Boolean, default=False)
314
 
 
 
 
 
 
 
315
  # Status
316
  status = Column(String(20), nullable=False, default="running") # running/success/failed
317
  error_message = Column(Text, nullable=True)
318
 
319
  def __repr__(self):
320
  return f"<PipelineRunMetrics(run_id={self.run_id}, status={self.status})>"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
  from sqlalchemy import (
16
  Column,
17
  Integer,
18
+ BigInteger,
19
  String,
20
  Float,
21
  DateTime,
 
25
  Index,
26
  UniqueConstraint,
27
  JSON,
28
+ func,
29
  )
30
+ from sqlalchemy.dialects.postgresql import UUID, JSONB
31
  from sqlalchemy.orm import relationship
32
 
33
  from app.db import Base
 
305
  train_samples = Column(Integer, nullable=True)
306
  val_samples = Column(Integer, nullable=True)
307
 
308
+ # Data quality (legacy - news_articles table)
309
  news_imported = Column(Integer, nullable=True)
310
  news_duplicates = Column(Integer, nullable=True)
311
  price_bars_updated = Column(Integer, nullable=True)
312
  missing_price_days = Column(Integer, nullable=True)
313
 
314
+ # Faz 2: Reproducible news pipeline stats
315
+ news_raw_inserted = Column(Integer, nullable=True)
316
+ news_raw_duplicates = Column(Integer, nullable=True)
317
+ news_processed_inserted = Column(Integer, nullable=True)
318
+ news_processed_duplicates = Column(Integer, nullable=True)
319
+
320
  # Snapshot info
321
  snapshot_generated = Column(Boolean, default=False)
322
  commentary_generated = Column(Boolean, default=False)
323
 
324
+ # Faz 2: News cut-off time
325
+ news_cutoff_time = Column(DateTime(timezone=True), nullable=True)
326
+
327
+ # Quality state for degraded runs
328
+ quality_state = Column(String(20), nullable=True, default="ok") # ok/stale/degraded/failed
329
+
330
  # Status
331
  status = Column(String(20), nullable=False, default="running") # running/success/failed
332
  error_message = Column(Text, nullable=True)
333
 
334
  def __repr__(self):
335
  return f"<PipelineRunMetrics(run_id={self.run_id}, status={self.status})>"
336
+
337
+
338
+ # =============================================================================
339
+ # Faz 2: Reproducible News Pipeline
340
+ # =============================================================================
341
+
342
class NewsRaw(Base):
    """
    Raw news article - stored as-is from RSS/API.

    Faz 2: the "golden source" for reproducibility.

    Dedup strategy:
    - url_hash: nullable + partial unique index (WHERE url_hash IS NOT NULL)
    - When the URL is missing, a title-based fallback dedup is applied at
      the processed level (NewsProcessed.dedup_key)
    """
    __tablename__ = "news_raw"

    id = Column(BigInteger, primary_key=True, autoincrement=True)

    # URL (nullable - may be missing in RSS items)
    url = Column(String(2000), nullable=True)
    url_hash = Column(String(64), nullable=True, index=True)  # sha256, partial unique

    # Content
    title = Column(String(500), nullable=False)
    description = Column(Text, nullable=True)

    # Metadata
    source = Column(String(200), nullable=True)  # "google_news", "newsapi"
    source_feed = Column(String(500), nullable=True)  # Exact RSS URL or query
    published_at = Column(DateTime(timezone=True), nullable=False, index=True)
    fetched_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)

    # Pipeline run tracking (UUID)
    run_id = Column(UUID(as_uuid=True), nullable=True, index=True)

    # Raw payload (debug/audit)
    raw_payload = Column(JSONB, nullable=True)

    # Relationship
    processed_items = relationship("NewsProcessed", back_populates="raw")

    def __repr__(self):
        return f"<NewsRaw(id={self.id}, title='{self.title[:30]}...')>"
380
+ return f"<NewsRaw(id={self.id}, title='{self.title[:30]}...')>"
381
+
382
+
383
class NewsProcessed(Base):
    """
    Processed news article - after dedup, cleaning, and language filter.

    Faz 2: input for sentiment scoring.

    Dedup strategy:
    - dedup_key: NOT NULL + UNIQUE - the deduplication authority
    - Priority: use url_hash when available, otherwise
      sha256(source + canonical_title_hash)
    """
    __tablename__ = "news_processed"

    id = Column(BigInteger, primary_key=True, autoincrement=True)

    # FK to raw (RESTRICT - processed rows must not be dropped when raw is deleted)
    raw_id = Column(
        BigInteger,
        ForeignKey("news_raw.id", ondelete="RESTRICT"),
        nullable=False,
        index=True
    )

    # Canonical content
    canonical_title = Column(String(500), nullable=False)
    canonical_title_hash = Column(String(64), nullable=False, index=True)  # sha256
    cleaned_text = Column(Text, nullable=True)  # title + description, cleaned

    # Dedup key - THE AUTHORITY
    dedup_key = Column(String(64), unique=True, nullable=False, index=True)  # sha256

    # Language
    language = Column(String(10), nullable=True, default="en")
    language_confidence = Column(Float, nullable=True)

    # Processing metadata
    processed_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
    run_id = Column(UUID(as_uuid=True), nullable=True, index=True)

    # Future: Tone/Impact scores (Faz 3)
    # tone_score = Column(Float, nullable=True)
    # impact_direction = Column(String(20), nullable=True)  # bullish/bearish/neutral

    # Relationship
    raw = relationship("NewsRaw", back_populates="processed_items")

    def __repr__(self):
        return f"<NewsProcessed(id={self.id}, dedup_key='{self.dedup_key[:16]}...')>"
app/schemas.py CHANGED
@@ -24,18 +24,37 @@ class DataQuality(BaseModel):
24
 
25
 
26
  class AnalysisReport(BaseModel):
27
- """Full analysis report returned by /api/analysis."""
 
 
 
 
 
28
  symbol: str = Field(..., description="Trading symbol (e.g., HG=F)")
29
- current_price: float = Field(..., description="Most recent closing price")
30
- predicted_return: float = Field(..., description="Predicted next-day return")
31
- predicted_price: float = Field(..., description="Predicted next-day price")
32
- confidence_lower: float = Field(..., description="Lower bound of confidence interval")
33
- confidence_upper: float = Field(..., description="Upper bound of confidence interval")
34
- sentiment_index: float = Field(..., description="Current sentiment index (-1 to 1)")
35
- sentiment_label: str = Field(..., description="Sentiment label: Bullish, Bearish, or Neutral")
36
- top_influencers: list[Influencer] = Field(..., description="Top feature influencers")
37
- data_quality: DataQuality = Field(..., description="Data quality metrics")
38
- generated_at: str = Field(..., description="ISO timestamp of report generation")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
 
40
  class Config:
41
  json_schema_extra = {
@@ -57,7 +76,10 @@ class AnalysisReport(BaseModel):
57
  "missing_days": 0,
58
  "coverage_pct": 100
59
  },
60
- "generated_at": "2026-01-02T09:00:00Z"
 
 
 
61
  }
62
  }
63
 
@@ -96,17 +118,23 @@ class HealthResponse(BaseModel):
96
  timestamp: str = Field(..., description="Current server timestamp")
97
  news_count: Optional[int] = Field(None, description="Total news articles in database")
98
  price_bars_count: Optional[int] = Field(None, description="Total price bars in database")
 
 
 
 
99
 
100
  class Config:
101
  json_schema_extra = {
102
  "example": {
103
  "status": "healthy",
104
- "db_type": "sqlite",
105
  "models_found": 1,
106
  "pipeline_locked": False,
107
  "timestamp": "2026-01-02T10:00:00Z",
108
  "news_count": 1250,
109
- "price_bars_count": 1460
 
 
110
  }
111
  }
112
 
 
24
 
25
 
26
  class AnalysisReport(BaseModel):
27
+ """
28
+ Full analysis report returned by /api/analysis.
29
+
30
+ Faz 1: Snapshot-only mode - fields may be null in degraded states.
31
+ Check quality_state to determine data freshness.
32
+ """
33
  symbol: str = Field(..., description="Trading symbol (e.g., HG=F)")
34
+
35
+ # Core prediction data (nullable for degraded modes)
36
+ current_price: Optional[float] = Field(0.0, description="Most recent closing price")
37
+ predicted_return: Optional[float] = Field(0.0, description="Predicted next-day return")
38
+ predicted_price: Optional[float] = Field(0.0, description="Predicted next-day price")
39
+ confidence_lower: Optional[float] = Field(0.0, description="Lower bound of confidence interval")
40
+ confidence_upper: Optional[float] = Field(0.0, description="Upper bound of confidence interval")
41
+ sentiment_index: Optional[float] = Field(0.0, description="Current sentiment index (-1 to 1)")
42
+ sentiment_label: Optional[str] = Field("Neutral", description="Sentiment label: Bullish, Bearish, or Neutral")
43
+
44
+ # Feature influencers (may be empty)
45
+ top_influencers: list[Influencer] = Field(default_factory=list, description="Top feature influencers")
46
+
47
+ # Data quality (always present)
48
+ data_quality: Optional[DataQuality] = Field(None, description="Data quality metrics")
49
+
50
+ # Timestamps
51
+ generated_at: Optional[str] = Field(None, description="ISO timestamp of report generation")
52
+
53
+ # Faz 1: Quality state fields
54
+ quality_state: Optional[str] = Field("ok", description="Snapshot quality: ok, stale, missing")
55
+ model_state: Optional[str] = Field("ok", description="Model status: ok, degraded, offline")
56
+ snapshot_age_hours: Optional[float] = Field(None, description="Hours since snapshot was generated")
57
+ message: Optional[str] = Field(None, description="Human-readable status message")
58
 
59
  class Config:
60
  json_schema_extra = {
 
76
  "missing_days": 0,
77
  "coverage_pct": 100
78
  },
79
+ "generated_at": "2026-01-02T09:00:00Z",
80
+ "quality_state": "ok",
81
+ "model_state": "ok",
82
+ "snapshot_age_hours": 2.5
83
  }
84
  }
85
 
 
118
  timestamp: str = Field(..., description="Current server timestamp")
119
  news_count: Optional[int] = Field(None, description="Total news articles in database")
120
  price_bars_count: Optional[int] = Field(None, description="Total price bars in database")
121
+
122
+ # Faz 1: Queue and snapshot observability
123
+ redis_ok: Optional[bool] = Field(None, description="Redis queue connectivity")
124
+ last_snapshot_age_seconds: Optional[int] = Field(None, description="Age of last analysis snapshot in seconds")
125
 
126
  class Config:
127
  json_schema_extra = {
128
  "example": {
129
  "status": "healthy",
130
+ "db_type": "postgresql",
131
  "models_found": 1,
132
  "pipeline_locked": False,
133
  "timestamp": "2026-01-02T10:00:00Z",
134
  "news_count": 1250,
135
+ "price_bars_count": 1460,
136
+ "redis_ok": True,
137
+ "last_snapshot_age_seconds": 3600
138
  }
139
  }
140
 
app/settings.py CHANGED
@@ -57,10 +57,14 @@ class Settings(BaseSettings):
57
  # Futures vs Spot adjustment factor
58
  futures_spot_adjustment: float = 0.985
59
 
60
- # Scheduler
 
61
  schedule_time: str = "02:00"
62
  tz: str = "Europe/Istanbul"
63
- scheduler_enabled: bool = True
 
 
 
64
 
65
  # OpenRouter AI Commentary
66
  openrouter_api_key: Optional[str] = None
@@ -75,6 +79,12 @@ class Settings(BaseSettings):
75
  # Pipeline trigger authentication
76
  pipeline_trigger_secret: Optional[str] = None
77
 
 
 
 
 
 
 
78
  def _load_symbol_set_file(self, set_name: str) -> Optional[dict]:
79
  """Load symbol set from JSON file. Returns None on error."""
80
  try:
 
57
  # Futures vs Spot adjustment factor
58
  futures_spot_adjustment: float = 0.985
59
 
60
+ # Scheduler (DEPRECATED in API - external scheduler only)
61
+ # These are kept for backward compatibility but scheduler no longer runs in API
62
  schedule_time: str = "02:00"
63
  tz: str = "Europe/Istanbul"
64
+ scheduler_enabled: bool = False # Default to False - scheduler is external now
65
+
66
+ # Redis Queue (for worker)
67
+ redis_url: str = "redis://localhost:6379/0"
68
 
69
  # OpenRouter AI Commentary
70
  openrouter_api_key: Optional[str] = None
 
79
  # Pipeline trigger authentication
80
  pipeline_trigger_secret: Optional[str] = None
81
 
82
+ # Faz 2: Market cut-off for news aggregation
83
+ # Defines when "today's news" ends for sentiment calculation
84
+ market_timezone: str = "America/New_York" # NYSE timezone
85
+ market_close_time: str = "16:00" # 4 PM ET
86
+ cutoff_buffer_minutes: int = 30 # Allow 30 min after close for late news
87
+
88
  def _load_symbol_set_file(self, set_name: str) -> Optional[dict]:
89
  """Load symbol set from JSON file. Returns None on error."""
90
  try:
migrations/001_add_news_raw_processed.sql ADDED
@@ -0,0 +1,121 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
-- Migration: 001_add_news_raw_processed.sql
-- Faz 2: Raw/processed news tables + reproducible news pipeline
--
-- Run on: Supabase PostgreSQL
-- Date: 2026-01-28
--
-- IMPORTANT: Run this migration BEFORE deploying Faz 2 pipeline code.

-- =============================================================================
-- 1. news_raw - Raw news data (golden source)
-- =============================================================================

CREATE TABLE IF NOT EXISTS news_raw (
    id BIGSERIAL PRIMARY KEY,

    -- URL (nullable - may be missing in RSS items)
    url VARCHAR(2000),
    url_hash VARCHAR(64),  -- sha256, nullable for partial unique

    -- Content
    title VARCHAR(500) NOT NULL,
    description TEXT,

    -- Metadata
    source VARCHAR(200),       -- "google_news", "newsapi", etc.
    source_feed VARCHAR(500),  -- Exact RSS URL or API query
    published_at TIMESTAMPTZ NOT NULL,
    fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Pipeline run tracking
    run_id UUID,

    -- Raw payload for debugging
    raw_payload JSONB
);

-- Basic indexes
CREATE INDEX IF NOT EXISTS ix_news_raw_published ON news_raw(published_at);
CREATE INDEX IF NOT EXISTS ix_news_raw_run ON news_raw(run_id);
CREATE INDEX IF NOT EXISTS ix_news_raw_url_hash ON news_raw(url_hash);

-- PARTIAL UNIQUE INDEX: url_hash must be unique IF it exists
-- This allows NULL url_hash (for articles without URL) while preventing duplicates
CREATE UNIQUE INDEX IF NOT EXISTS ux_news_raw_url_hash
ON news_raw(url_hash)
WHERE url_hash IS NOT NULL;


-- =============================================================================
-- 2. news_processed - Processed news (dedup authority)
-- =============================================================================

CREATE TABLE IF NOT EXISTS news_processed (
    id BIGSERIAL PRIMARY KEY,

    -- FK to raw (RESTRICT - don't allow deleting raw if processed exists)
    raw_id BIGINT NOT NULL REFERENCES news_raw(id) ON DELETE RESTRICT,

    -- Canonical content
    canonical_title VARCHAR(500) NOT NULL,
    canonical_title_hash VARCHAR(64) NOT NULL,  -- sha256
    cleaned_text TEXT,  -- title + description, cleaned

    -- Dedup key - THE AUTHORITY
    -- Priority: url_hash if available, else sha256(source + canonical_title_hash)
    dedup_key VARCHAR(64) NOT NULL UNIQUE,

    -- Language
    language VARCHAR(10) DEFAULT 'en',
    language_confidence FLOAT,

    -- Processing metadata
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    run_id UUID
);

-- Indexes
CREATE INDEX IF NOT EXISTS ix_news_processed_raw_id ON news_processed(raw_id);
CREATE INDEX IF NOT EXISTS ix_news_processed_run ON news_processed(run_id);
CREATE INDEX IF NOT EXISTS ix_news_processed_title_hash ON news_processed(canonical_title_hash);


-- =============================================================================
-- 3. Add Faz 2 columns to pipeline_run_metrics
-- =============================================================================

-- Cut-off time
ALTER TABLE pipeline_run_metrics
ADD COLUMN IF NOT EXISTS news_cutoff_time TIMESTAMPTZ;

-- Raw stats
ALTER TABLE pipeline_run_metrics
ADD COLUMN IF NOT EXISTS news_raw_inserted INTEGER;

ALTER TABLE pipeline_run_metrics
ADD COLUMN IF NOT EXISTS news_raw_duplicates INTEGER;

-- Processed stats
ALTER TABLE pipeline_run_metrics
ADD COLUMN IF NOT EXISTS news_processed_inserted INTEGER;

ALTER TABLE pipeline_run_metrics
ADD COLUMN IF NOT EXISTS news_processed_duplicates INTEGER;

-- Quality state for degraded runs
ALTER TABLE pipeline_run_metrics
ADD COLUMN IF NOT EXISTS quality_state VARCHAR(20) DEFAULT 'ok';


-- =============================================================================
-- Verification queries (run after migration to verify)
-- =============================================================================

-- Check tables exist:
-- SELECT table_name FROM information_schema.tables WHERE table_name IN ('news_raw', 'news_processed');

-- Check partial unique index:
-- SELECT indexname FROM pg_indexes WHERE tablename = 'news_raw' AND indexname = 'ux_news_raw_url_hash';

-- Check FK constraint:
-- SELECT conname FROM pg_constraint WHERE conrelid = 'news_processed'::regclass AND confrelid = 'news_raw'::regclass;
pipelines/__init__.py ADDED
@@ -0,0 +1,8 @@
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Pipelines package for Faz 2 reproducible data processing.
3
+
4
+ Modules:
5
+ - ingestion/news: RSS/API -> news_raw
6
+ - processing/news: news_raw -> news_processed
7
+ - cutoff: Market cut-off calculation
8
+ """
pipelines/cutoff.py ADDED
@@ -0,0 +1,186 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Market cut-off calculation for news aggregation.
3
+
4
+ Faz 2: Defines which news articles belong to "today's" sentiment.
5
+ Uses market close time with buffer to determine cut-off.
6
+ """
7
+
8
+ import logging
9
+ from datetime import datetime, time, timedelta, timezone
10
+ from typing import Optional
11
+
12
+ try:
13
+ from zoneinfo import ZoneInfo
14
+ except ImportError:
15
+ from backports.zoneinfo import ZoneInfo # Python < 3.9
16
+
17
+ from app.settings import get_settings
18
+
19
+ logger = logging.getLogger(__name__)
20
+
21
+
22
def compute_news_cutoff(
    run_datetime: Optional[datetime] = None,
    market_tz: Optional[str] = None,
    market_close: Optional[str] = None,
    buffer_minutes: Optional[int] = None,
) -> datetime:
    """
    Compute news cut-off datetime for a pipeline run.

    Logic:
    1. Convert run_datetime to market timezone
    2. Calculate today's close + buffer
    3. If run is before today's close+buffer, use yesterday's close+buffer
    4. If the cut-off falls on a weekend, roll back to Friday

    Args:
        run_datetime: When pipeline started (UTC). Defaults to now.
        market_tz: Market timezone (e.g., "America/New_York"). Defaults to settings.
        market_close: Market close time "HH:MM". Defaults to settings.
        buffer_minutes: Minutes after close to allow. Defaults to settings.

    Returns:
        Cut-off datetime in UTC

    Example:
        Pipeline runs at 2026-01-28 10:00 UTC (05:00 ET)
        -> Before 16:30 ET -> use 2026-01-27 16:30 ET -> 2026-01-27 21:30 UTC

        Pipeline runs at 2026-01-28 22:00 UTC (17:00 ET)
        -> After 16:30 ET -> use 2026-01-28 16:30 ET -> 2026-01-28 21:30 UTC
    """
    # Resolve defaults lazily: only touch app configuration for the
    # parameters the caller did not supply. Fully-parameterized calls
    # (e.g. unit tests) stay decoupled from get_settings().
    if run_datetime is None:
        run_datetime = datetime.now(timezone.utc)
    if market_tz is None or market_close is None or buffer_minutes is None:
        settings = get_settings()
        if market_tz is None:
            market_tz = settings.market_timezone
        if market_close is None:
            market_close = settings.market_close_time
        if buffer_minutes is None:
            buffer_minutes = settings.cutoff_buffer_minutes

    # Parse "HH:MM" close time; a malformed string raises ValueError.
    close_hour, close_minute = map(int, market_close.split(":"))
    buffer = timedelta(minutes=buffer_minutes)

    tz = ZoneInfo(market_tz)

    # Naive run datetimes are interpreted as UTC.
    if run_datetime.tzinfo is None:
        run_datetime = run_datetime.replace(tzinfo=timezone.utc)
    run_local = run_datetime.astimezone(tz)

    # Today's close + buffer, expressed in the market timezone.
    today_close = run_local.replace(
        hour=close_hour,
        minute=close_minute,
        second=0,
        microsecond=0,
    ) + buffer

    # Pick today's close if we are already at/past it, else yesterday's.
    if run_local >= today_close:
        cutoff_local = today_close
    else:
        cutoff_local = today_close - timedelta(days=1)

    # Weekend guard: roll back to Friday if cutoff falls on Sat/Sun.
    cutoff_local = _adjust_for_weekend(cutoff_local)

    # Convert back to UTC for storage/comparison.
    cutoff_utc = cutoff_local.astimezone(timezone.utc)

    logging.getLogger(__name__).debug(
        f"Cut-off computed: run={run_datetime.isoformat()}, "
        f"cutoff={cutoff_utc.isoformat()} (local: {cutoff_local.isoformat()})"
    )

    return cutoff_utc


def _adjust_for_weekend(dt: datetime) -> datetime:
    """
    Adjust datetime to Friday if it falls on a weekend.

    Args:
        dt: Datetime to adjust

    Returns:
        Adjusted datetime (Friday if input was Sat/Sun, unchanged otherwise)
    """
    weekday = dt.weekday()  # 0=Mon ... 5=Sat, 6=Sun

    if weekday == 5:  # Saturday -> roll back one day
        return dt - timedelta(days=1)
    if weekday == 6:  # Sunday -> roll back two days
        return dt - timedelta(days=2)

    return dt
+
127
+
128
def get_news_window(
    cutoff_dt: datetime,
    lookback_days: int = 7,
) -> tuple[datetime, datetime]:
    """
    Build the time window for news aggregation.

    Args:
        cutoff_dt: Cut-off datetime - the latest news to include
        lookback_days: Size of the window, in days before the cut-off

    Returns:
        Tuple of (start_dt, end_dt) for the news query
    """
    window = timedelta(days=lookback_days)
    return (cutoff_dt - window, cutoff_dt)
146
+
147
+
148
def is_market_open(
    dt: Optional[datetime] = None,
    market_tz: Optional[str] = None,
) -> bool:
    """
    Check if market is currently open (approximate).

    Note: Does not account for holidays, just weekdays 9:30-16:00 in the
    market's local timezone.

    Args:
        dt: Datetime to check. Defaults to now.
        market_tz: Market timezone. Defaults to settings.

    Returns:
        True if market is likely open
    """
    # Resolve defaults lazily: fully-parameterized calls do not touch
    # app configuration (get_settings), keeping this function testable.
    if dt is None:
        dt = datetime.now(timezone.utc)
    if market_tz is None:
        market_tz = get_settings().market_timezone

    tz = ZoneInfo(market_tz)

    # Naive datetimes are interpreted as UTC.
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    local = dt.astimezone(tz)

    # Closed on weekends (weekday 5=Sat, 6=Sun).
    if local.weekday() >= 5:
        return False

    # Approximate regular session: 9:30 - 16:00 local market time.
    market_open = time(9, 30)
    market_close = time(16, 0)

    current_time = local.time()
    return market_open <= current_time <= market_close
pipelines/ingestion/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ """Ingestion subpackage - data source fetching."""
pipelines/ingestion/news.py ADDED
@@ -0,0 +1,271 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ News ingestion to news_raw table.
3
+
4
+ Phase 2: Reproducible news pipeline - first stage.
5
+ Fetches from RSS/API and stores raw data for audit trail.
6
+ """
7
+
8
+ import hashlib
9
+ import logging
10
+ import uuid
11
+ from datetime import datetime, timezone
12
+ from typing import Optional
13
+
14
+ from sqlalchemy import text
15
+ from sqlalchemy.dialects.postgresql import insert as pg_insert
16
+ from sqlalchemy.orm import Session
17
+
18
+ from app.models import NewsRaw
19
+ from app.settings import get_settings
20
+ from app.utils import normalize_url, clean_text
21
+ from app.rss_ingest import fetch_google_news
22
+ from app.db import get_db_type
23
+
24
+ logger = logging.getLogger(__name__)
25
+
26
+
27
def compute_url_hash(url: Optional[str]) -> Optional[str]:
    """
    Compute deterministic hash of normalized URL.

    Args:
        url: Raw URL string (may be None or empty)

    Returns:
        sha256 hex64 of normalized URL, or None if URL is empty/invalid
    """
    if url is None or not url.strip():
        return None

    canonical = normalize_url(url)
    return hashlib.sha256(canonical.encode()).hexdigest() if canonical else None
45
+
46
+
47
def insert_raw_article(
    session: Session,
    url: Optional[str],
    title: str,
    description: Optional[str],
    source: str,
    source_feed: str,
    published_at: datetime,
    run_id: uuid.UUID,
    raw_payload: Optional[dict] = None,
) -> Optional[int]:
    """
    Insert single article to news_raw.

    Uses ON CONFLICT DO NOTHING for url_hash to handle duplicates gracefully.

    Args:
        session: Database session
        url: Article URL (can be None)
        title: Article title
        description: Article description
        source: Source name (e.g., "google_news", "newsapi")
        source_feed: Exact feed URL or query string
        published_at: Publication timestamp (UTC)
        run_id: Pipeline run UUID
        raw_payload: Original response fragment for debugging

    Returns:
        raw_id if inserted, None if duplicate or error
    """
    if not title or not title.strip():
        return None

    title = clean_text(title)[:500]  # Truncate to column limit
    url_hash = compute_url_hash(url)

    try:
        db_type = get_db_type()

        if db_type == "postgresql":
            # Use INSERT ... ON CONFLICT for PostgreSQL
            stmt = pg_insert(NewsRaw).values(
                url=url,
                url_hash=url_hash,
                title=title,
                description=description[:2000] if description else None,
                source=source,
                source_feed=source_feed[:500] if source_feed else None,
                published_at=published_at,
                run_id=run_id,
                raw_payload=raw_payload,
            )

            # Only conflict on url_hash if it's not None
            if url_hash:
                stmt = stmt.on_conflict_do_nothing(index_elements=["url_hash"])

            # RETURNING yields the new row's id atomically: a do-nothing
            # conflict returns no row, so scalar() is None for duplicates.
            # (The previous re-SELECT by url_hash never matched when
            # url_hash was NULL and was racy under concurrent inserts.)
            return session.execute(stmt.returning(NewsRaw.id)).scalar()

        else:
            # SQLite fallback - simple insert with error handling
            article = NewsRaw(
                url=url,
                url_hash=url_hash,
                title=title,
                description=description[:2000] if description else None,
                source=source,
                source_feed=source_feed[:500] if source_feed else None,
                published_at=published_at,
                run_id=run_id,
                raw_payload=raw_payload,
            )
            session.add(article)
            session.flush()
            return article.id

    except Exception as e:
        # Best-effort insert: duplicates/constraint violations are expected,
        # so failures are logged at debug level and reported as None.
        logger.debug(f"Insert raw article failed: {e}")
        session.rollback()
        return None
138
+
139
+
140
def ingest_news_to_raw(
    session: Session,
    run_id: uuid.UUID,
    sources: Optional[list[str]] = None,
) -> dict:
    """
    Ingest news from all sources into news_raw.

    Currently supports:
    - google_news: RSS feed from Google News
    - newsapi: NewsAPI.org (if API key configured)

    Args:
        session: Database session
        run_id: Pipeline run UUID
        sources: List of source types to fetch (default: all configured).
            The caller's list is never mutated.

    Returns:
        dict with stats:
        - fetched: Total items fetched from sources
        - inserted: New items inserted to news_raw
        - duplicates: Skipped due to url_hash conflict
        - errors: Items that failed to insert
    """
    settings = get_settings()

    # Copy the caller's list before extending it: the old
    # `sources.append(...)` mutated the argument in place.
    sources = list(sources) if sources else ["google_news"]

    # Add newsapi if key is configured
    if settings.newsapi_key and "newsapi" not in sources:
        sources.append("newsapi")

    stats = {
        "fetched": 0,
        "inserted": 0,
        "duplicates": 0,
        "errors": 0,
        "sources": sources,
    }

    # Strategic queries for copper market
    QUERIES = [
        "copper supply deficit",
        "copper price forecast",
        "copper mining production",
        "copper demand China",
        "copper EV battery",
        "Freeport-McMoRan copper",
        "BHP copper",
        "Rio Tinto copper",
    ]

    logger.info(f"[run_id={run_id}] Ingesting news from {sources} with {len(QUERIES)} queries")

    for source in sources:
        if source == "google_news":
            for query in QUERIES:
                try:
                    articles = fetch_google_news(
                        query=query,
                        language=settings.news_language,
                    )

                    stats["fetched"] += len(articles)

                    for article in articles:
                        raw_id = insert_raw_article(
                            session=session,
                            url=article.get("url"),
                            title=article.get("title", ""),
                            description=article.get("description"),
                            source="google_news",
                            source_feed=f"google_news:{query}",
                            published_at=article.get("published_at", datetime.now(timezone.utc)),
                            run_id=run_id,
                            raw_payload={"query": query, "source": article.get("source")},
                        )

                        # None means url_hash conflict (or insert error)
                        if raw_id:
                            stats["inserted"] += 1
                        else:
                            stats["duplicates"] += 1

                except Exception as e:
                    # One failing query must not abort the whole ingestion.
                    logger.warning(f"Error fetching {source} for '{query}': {e}")
                    stats["errors"] += 1

        elif source == "newsapi" and settings.newsapi_key:
            # NewsAPI implementation - reuse existing fetch
            from app.data_manager import fetch_newsapi_articles

            for query in QUERIES[:3]:  # Limit API calls
                try:
                    articles = fetch_newsapi_articles(
                        api_key=settings.newsapi_key,
                        query=query,
                        language=settings.news_language,
                        lookback_days=settings.lookback_days,
                    )

                    stats["fetched"] += len(articles)

                    for article in articles:
                        raw_id = insert_raw_article(
                            session=session,
                            url=article.get("url"),
                            title=article.get("title", ""),
                            description=article.get("description"),
                            source="newsapi",
                            source_feed=f"newsapi:{query}",
                            published_at=article.get("published_at", datetime.now(timezone.utc)),
                            run_id=run_id,
                            raw_payload={"query": query, "author": article.get("author")},
                        )

                        if raw_id:
                            stats["inserted"] += 1
                        else:
                            stats["duplicates"] += 1

                except Exception as e:
                    logger.warning(f"Error fetching newsapi for '{query}': {e}")
                    stats["errors"] += 1

    session.commit()

    logger.info(
        f"[run_id={run_id}] News ingestion complete: "
        f"{stats['fetched']} fetched, {stats['inserted']} inserted, "
        f"{stats['duplicates']} duplicates"
    )

    return stats
pipelines/processing/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ """Processing subpackage - data transformation and dedup."""
pipelines/processing/news.py ADDED
@@ -0,0 +1,221 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Process news_raw -> news_processed with deterministic dedup.
3
+
4
+ Phase 2: Reproducible news pipeline - second stage.
5
+ Applies canonicalization, language detection, and deterministic dedup.
6
+ """
7
+
8
+ import hashlib
9
+ import logging
10
+ import uuid
11
+ from datetime import datetime, timezone
12
+ from typing import Optional
13
+
14
+ from sqlalchemy import text
15
+ from sqlalchemy.dialects.postgresql import insert as pg_insert
16
+ from sqlalchemy.orm import Session
17
+
18
+ from app.models import NewsRaw, NewsProcessed
19
+ from app.utils import canonical_title, clean_text
20
+ from app.db import get_db_type
21
+
22
+ logger = logging.getLogger(__name__)
23
+
24
+
25
def compute_canonical_title_hash(title: str) -> str:
    """
    Compute hash of canonical title.

    Args:
        title: Raw title string

    Returns:
        sha256 hex64 of canonical_title(title)
    """
    return hashlib.sha256(canonical_title(title).encode()).hexdigest()
37
+
38
+
39
def compute_dedup_key(
    url_hash: Optional[str],
    source: str,
    canonical_title_hash: str,
) -> str:
    """
    Compute deterministic dedup key.

    Priority:
    1. url_hash if not None (URL is the best identifier)
    2. sha256(source + "|" + canonical_title_hash) as fallback

    Args:
        url_hash: Hash of normalized URL (may be None)
        source: Source name (e.g., "google_news")
        canonical_title_hash: Hash of canonical title

    Returns:
        sha256 hex64 dedup key
    """
    if url_hash:
        return url_hash

    # No usable URL: fall back to a source-scoped title fingerprint.
    return hashlib.sha256(f"{source}|{canonical_title_hash}".encode()).hexdigest()
65
+
66
+
67
def process_single_raw(
    session: Session,
    raw: NewsRaw,
    run_id: uuid.UUID,
) -> Optional[int]:
    """
    Process a single NewsRaw into NewsProcessed.

    Args:
        session: Database session
        raw: NewsRaw object to process
        run_id: Pipeline run UUID

    Returns:
        processed_id if inserted, None if duplicate
    """
    # Canonicalize
    canon = canonical_title(raw.title)
    canon_hash = compute_canonical_title_hash(raw.title)

    # Clean text (title + description)
    cleaned = clean_text(raw.title)
    if raw.description:
        cleaned += " " + clean_text(raw.description)
    cleaned = cleaned[:5000]  # Reasonable limit

    # Compute dedup key
    dedup = compute_dedup_key(
        url_hash=raw.url_hash,
        source=raw.source or "unknown",
        canonical_title_hash=canon_hash,
    )

    # Detect language (optional, use simple heuristic for now)
    # Full langdetect is slow; a later phase can improve this
    language = "en"  # Assume English for now
    language_confidence = None

    try:
        db_type = get_db_type()

        if db_type == "postgresql":
            # RETURNING the processed row id makes the return value match
            # the docstring and the SQLite path; the old code returned
            # raw.id instead. A do-nothing conflict yields no row -> None.
            stmt = (
                pg_insert(NewsProcessed)
                .values(
                    raw_id=raw.id,
                    canonical_title=canon[:500],
                    canonical_title_hash=canon_hash,
                    cleaned_text=cleaned,
                    dedup_key=dedup,
                    language=language,
                    language_confidence=language_confidence,
                    run_id=run_id,
                )
                .on_conflict_do_nothing(index_elements=["dedup_key"])
                .returning(NewsProcessed.id)
            )

            return session.execute(stmt).scalar()

        else:
            # SQLite fallback
            processed = NewsProcessed(
                raw_id=raw.id,
                canonical_title=canon[:500],
                canonical_title_hash=canon_hash,
                cleaned_text=cleaned,
                dedup_key=dedup,
                language=language,
                language_confidence=language_confidence,
                run_id=run_id,
            )
            session.add(processed)
            session.flush()
            return processed.id

    except Exception as e:
        # Duplicates / constraint violations are expected; report as None.
        logger.debug(f"Process raw article failed: {e}")
        session.rollback()
        return None
146
+
147
+
148
def process_raw_to_processed(
    session: Session,
    run_id: uuid.UUID,
    batch_size: int = 100,
) -> dict:
    """
    Process unprocessed raw articles.

    Finds news_raw records that don't have corresponding news_processed,
    canonicalizes them, and inserts to news_processed with dedup.

    Args:
        session: Database session
        run_id: Pipeline run UUID
        batch_size: Number of records to process per batch

    Returns:
        dict with stats:
        - processed: Total items attempted
        - inserted: New items in news_processed
        - duplicates: Skipped due to dedup_key conflict
    """
    stats = {
        "processed": 0,
        "inserted": 0,
        "duplicates": 0,
    }

    # Find unprocessed raw articles
    # LEFT JOIN to find raw records without processed counterparts
    unprocessed_query = (
        session.query(NewsRaw)
        .outerjoin(NewsProcessed, NewsRaw.id == NewsProcessed.raw_id)
        .filter(NewsProcessed.id.is_(None))
        .order_by(NewsRaw.id)
    )

    total = unprocessed_query.count()
    logger.info(f"[run_id={run_id}] Found {total} unprocessed raw articles")

    if total == 0:
        return stats

    # Keyset pagination: the filtered set SHRINKS as rows get processed
    # and committed, so OFFSET-based paging (the previous approach) would
    # skip every other batch of unprocessed rows. Instead we always take
    # the first batch_size rows with id greater than the last one seen.
    last_id = 0
    while True:
        batch = (
            unprocessed_query
            .filter(NewsRaw.id > last_id)
            .limit(batch_size)
            .all()
        )

        if not batch:
            break

        for raw in batch:
            stats["processed"] += 1

            # None return means the dedup_key already existed.
            if process_single_raw(session, raw, run_id):
                stats["inserted"] += 1
            else:
                stats["duplicates"] += 1

        # Rows are ordered by id ascending, so the last element is the
        # highest id handled in this batch.
        last_id = batch[-1].id
        session.commit()

    logger.info(
        f"[run_id={run_id}] Processing complete: "
        f"{stats['processed']} processed, {stats['inserted']} inserted, "
        f"{stats['duplicates']} duplicates"
    )

    return stats
requirements.txt CHANGED
@@ -34,6 +34,10 @@ beautifulsoup4>=4.12.2
34
  # Scheduling
35
  apscheduler>=3.10.4
36
 
 
 
 
 
37
  # Utilities
38
  python-dateutil>=2.8.2
39
  filelock>=3.13.1
 
34
  # Scheduling
35
  apscheduler>=3.10.4
36
 
37
+ # Queue (arq + Redis)
38
+ arq>=0.25.0
39
+ redis>=5.0.0
40
+
41
  # Utilities
42
  python-dateutil>=2.8.2
43
  filelock>=3.13.1
supervisord.conf ADDED
@@ -0,0 +1,38 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ [supervisord]
2
+ nodaemon=true
3
+ logfile=/dev/null
4
+ logfile_maxbytes=0
5
+
6
+ [program:redis]
7
+ command=redis-server --save "" --appendonly no --bind 127.0.0.1 --port 6379
8
+ autorestart=true
9
+ startsecs=1
10
+ priority=1
11
+ stdout_logfile=/dev/stdout
12
+ stdout_logfile_maxbytes=0
13
+ stderr_logfile=/dev/stderr
14
+ stderr_logfile_maxbytes=0
15
+
16
+ [program:api]
17
+ directory=/code
18
+ command=python -m uvicorn app.main:app --host 0.0.0.0 --port 7860
19
+ autorestart=true
20
+ startsecs=3
21
+ priority=10
22
+ stdout_logfile=/dev/stdout
23
+ stdout_logfile_maxbytes=0
24
+ stderr_logfile=/dev/stderr
25
+ stderr_logfile_maxbytes=0
26
+ environment=REDIS_URL="redis://127.0.0.1:6379/0",PYTHONPATH="/code"
27
+
28
+ [program:worker]
29
+ directory=/code
30
+ command=python -m worker.runner
31
+ autorestart=true
32
+ startsecs=5
33
+ priority=20
34
+ stdout_logfile=/dev/stdout
35
+ stdout_logfile_maxbytes=0
36
+ stderr_logfile=/dev/stderr
37
+ stderr_logfile_maxbytes=0
38
+ environment=REDIS_URL="redis://127.0.0.1:6379/0",PYTHONPATH="/code"
worker/__init__.py ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ """
2
+ Worker service for Terra Rara pipeline execution.
3
+ Consumes jobs from Redis queue via arq.
4
+ """
worker/runner.py ADDED
@@ -0,0 +1,70 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ arq Worker Runner.
3
+
4
+ This is the entrypoint for the worker process.
5
+ Run with: python -m worker.runner
6
+
7
+ The worker:
8
+ - Consumes jobs from Redis queue
9
+ - Executes pipeline tasks
10
+ - Has NO scheduler - scheduling is external (GitHub Actions, cron, etc.)
11
+ """
12
+
13
+ import logging
14
+ import os
15
+ import sys
16
+
17
+ # Add backend to path for imports
18
+ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
19
+
20
+ from arq import run_worker
21
+
22
+ from adapters.queue.redis import get_redis_settings
23
+ from worker.tasks import run_pipeline, startup, shutdown
24
+
25
+ # Configure logging
26
+ logging.basicConfig(
27
+ level=logging.INFO,
28
+ format="%(asctime)s - %(levelname)s - %(name)s - [worker] - %(message)s"
29
+ )
30
+ logger = logging.getLogger(__name__)
31
+
32
+
33
class WorkerSettings:
    """
    arq worker settings.

    This class is discovered by arq when running:
    arq worker.runner.WorkerSettings

    Attribute names follow arq's settings contract (redis_settings,
    functions, on_startup, ...) and are read once at worker start-up.
    """

    # Redis connection (resolved at import time via adapters.queue.redis)
    redis_settings = get_redis_settings()

    # Task functions this worker can execute
    functions = [run_pipeline]

    # Lifecycle hooks
    on_startup = startup
    on_shutdown = shutdown

    # Job settings
    max_jobs = 1  # Only one pipeline at a time per worker
    job_timeout = 3600  # 1 hour max, in seconds
    max_tries = 1  # No automatic retries - cron will retry next cycle

    # Health check interval in seconds
    health_check_interval = 30
58
+
59
+
60
def main():
    """Entry point: log the target Redis endpoint and start the worker."""
    rs = WorkerSettings.redis_settings
    logger.info("Starting Terra Rara worker...")
    logger.info(f"Redis: {rs.host}:{rs.port}")

    # Blocks until the process is stopped
    run_worker(WorkerSettings)
67
+
68
+
69
+ if __name__ == "__main__":
70
+ main()
worker/tasks.py ADDED
@@ -0,0 +1,522 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Worker tasks for arq.
3
+
4
+ This module defines the tasks that the worker executes.
5
+ The main task is `run_pipeline` which orchestrates the entire pipeline.
6
+
7
+ Phase 2: Integrated news_raw/news_processed pipeline with proper
8
+ commit boundaries, metrics tracking, and degraded mode handling.
9
+ """
10
+
11
+ import logging
12
+ import os
13
+ import socket
14
+ import uuid
15
+ from datetime import datetime, timezone
16
+ from typing import Any, Optional
17
+
18
+ from sqlalchemy.orm import Session
19
+
20
+ # These imports will be updated as we refactor
21
+ import sys
22
+ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
23
+
24
+ from app.db import SessionLocal, init_db, get_db_type
25
+ from app.settings import get_settings
26
+ from app.models import PipelineRunMetrics
27
+ from adapters.db.lock import (
28
+ PIPELINE_LOCK_KEY,
29
+ try_acquire_lock,
30
+ release_lock,
31
+ write_lock_visibility,
32
+ clear_lock_visibility,
33
+ )
34
+
35
+ logger = logging.getLogger(__name__)
36
+
37
+
38
+ # =============================================================================
39
+ # Helper functions for metrics tracking
40
+ # =============================================================================
41
+
42
def create_run_metrics(
    session: Session,
    run_id: str,
    started_at: datetime,
) -> PipelineRunMetrics:
    """Create and flush the initial pipeline_run_metrics row (status='running')."""
    record = PipelineRunMetrics(
        run_id=run_id,
        run_started_at=started_at,
        status="running",
    )
    session.add(record)
    session.flush()
    return record
56
+
57
+
58
def update_run_metrics(
    session: Session,
    run_id: str,
    **kwargs,
) -> None:
    """Apply the given keyword updates to the run's metrics row, if present.

    Unknown attribute names are silently ignored.
    """
    row = (
        session.query(PipelineRunMetrics)
        .filter(PipelineRunMetrics.run_id == run_id)
        .first()
    )
    if row is None:
        return

    for field, new_value in kwargs.items():
        if hasattr(row, field):
            setattr(row, field, new_value)
    session.flush()
73
+
74
+
75
def finalize_run_metrics(
    session: Session,
    run_id: str,
    status: str,
    quality_state: str = "ok",
    error_message: Optional[str] = None,
) -> None:
    """Stamp completion time, status, quality state and duration onto the run's row."""
    completed_at = datetime.now(timezone.utc)

    row = (
        session.query(PipelineRunMetrics)
        .filter(PipelineRunMetrics.run_id == run_id)
        .first()
    )
    if row is None:
        return

    row.run_completed_at = completed_at
    row.status = status
    row.quality_state = quality_state
    if row.run_started_at:
        row.duration_seconds = (completed_at - row.run_started_at).total_seconds()
    if error_message:
        row.error_message = error_message
    session.flush()
98
+
99
+
100
+ # =============================================================================
101
+ # Main pipeline task
102
+ # =============================================================================
103
+
104
async def run_pipeline(
    ctx: dict,
    run_id: str,
    train_model: bool = False,
    trigger_source: str = "unknown",
    enqueued_at: Optional[str] = None,
) -> dict:
    """
    Main pipeline task - executed by arq worker.

    This is the ONLY entrypoint for pipeline execution.

    Phase 2 Flow:
    Stage 1a: News ingestion → news_raw
    Stage 1b: Raw processing → news_processed
    Stage 1c: Cut-off calculation
    Stage 1d: Price ingestion
    Stage 2: Sentiment scoring
    Stage 3: Sentiment aggregation
    Stage 4: Model training (optional)
    Stage 5: Snapshot generation
    Stage 6: Commentary generation

    Args:
        ctx: arq context (contains redis connection)
        run_id: Unique identifier for this run
        train_model: Whether to train the XGBoost model
        trigger_source: Where the trigger came from (cron, manual, api)
        enqueued_at: ISO timestamp when job was enqueued

    Returns:
        dict with run results
    """
    started_at = datetime.now(timezone.utc)
    holder_id = f"{socket.gethostname()}:{os.getpid()}"
    run_uuid = uuid.UUID(run_id) if isinstance(run_id, str) else run_id

    logger.info(f"[run_id={run_id}] Pipeline starting: trigger={trigger_source}, train_model={train_model}")

    # Initialize database
    init_db()

    # Get a dedicated session for this pipeline run.
    # IMPORTANT: This session holds the advisory lock.
    # BUG FIX: the original called undefined get_session(); the module
    # imports SessionLocal from app.db, so instantiate the factory.
    session: Session = SessionLocal()
    quality_state = "ok"
    result = {}

    try:
        # 0. Create run metrics record
        create_run_metrics(session, run_id, started_at)
        session.commit()

        # 1. Acquire distributed lock
        if not try_acquire_lock(session, PIPELINE_LOCK_KEY):
            logger.warning(f"[run_id={run_id}] Pipeline skipped: lock held by another process")
            finalize_run_metrics(session, run_id, status="skipped_locked", quality_state="skipped")
            session.commit()
            return {
                "run_id": run_id,
                "status": "skipped_locked",
                "message": "Another pipeline is running",
            }

        # Write lock visibility (best-effort)
        write_lock_visibility(session, PIPELINE_LOCK_KEY, run_id, holder_id)
        session.commit()

        logger.info(f"[run_id={run_id}] Lock acquired, executing pipeline...")

        # 2. Execute pipeline stages with proper commit boundaries
        result = await _execute_pipeline_stages_v2(
            session=session,
            run_id=run_id,
            run_uuid=run_uuid,
            train_model=train_model,
        )

        # Determine quality state from result.
        # More nuanced logic to avoid false alarms.
        raw_inserted = result.get("news_raw_inserted", 0)
        proc_inserted = result.get("news_processed_inserted", 0)
        raw_error = result.get("news_raw_error")
        proc_error = result.get("news_processed_error")

        if raw_error or proc_error:
            # Actual errors during ingestion/processing
            quality_state = "degraded"
            result["message"] = f"Pipeline errors: {raw_error or ''} {proc_error or ''}".strip()
        elif raw_inserted == 0 and proc_inserted == 0:
            # No new data at all - could be dedup working or sources haven't updated
            quality_state = "stale"
            result["message"] = "No new articles - sources may not have updated"
        elif raw_inserted > 0 and proc_inserted == 0:
            # Got raw but nothing processed - all were duplicates, which is fine
            quality_state = "ok"
            result["message"] = f"All {raw_inserted} articles were duplicates"
        else:
            quality_state = "ok"

        # 3. Record success
        finished_at = datetime.now(timezone.utc)
        duration = (finished_at - started_at).total_seconds()

        finalize_run_metrics(
            session, run_id,
            status="success",
            quality_state=quality_state,
        )
        session.commit()

        logger.info(f"[run_id={run_id}] Pipeline completed in {duration:.1f}s")

        return {
            "run_id": run_id,
            "status": "success",
            "quality_state": quality_state,
            "started_at": started_at.isoformat(),
            "finished_at": finished_at.isoformat(),
            "duration_seconds": duration,
            "train_model": train_model,
            **result,
        }

    except Exception as e:
        logger.error(f"[run_id={run_id}] Pipeline failed: {e}", exc_info=True)

        try:
            finalize_run_metrics(
                session, run_id,
                status="failed",
                quality_state="failed",
                error_message=str(e)[:1000],
            )
            session.commit()
        except Exception:
            # Metrics finalization is best-effort on the failure path.
            session.rollback()

        return {
            "run_id": run_id,
            "status": "failed",
            "error": str(e),
        }

    finally:
        # Always release lock and cleanup
        try:
            release_lock(session, PIPELINE_LOCK_KEY)
            clear_lock_visibility(session, PIPELINE_LOCK_KEY)
            session.commit()
        except Exception:
            session.rollback()
        finally:
            session.close()
258
+
259
+
260
+ async def _execute_pipeline_stages_v2(
261
+ session: Session,
262
+ run_id: str,
263
+ run_uuid: uuid.UUID,
264
+ train_model: bool,
265
+ ) -> dict:
266
+ """
267
+ Execute pipeline stages with Faz 2 news pipeline integration.
268
+
269
+ Each stage has proper commit boundaries and metrics updates.
270
+ """
271
+ from app.settings import get_settings
272
+
273
+ settings = get_settings()
274
+ result = {}
275
+
276
+ # -------------------------------------------------------------------------
277
+ # Stage 1a: News ingestion → news_raw (FAZ 2)
278
+ # -------------------------------------------------------------------------
279
+ logger.info(f"[run_id={run_id}] Stage 1a: News ingestion → news_raw")
280
+ try:
281
+ from pipelines.ingestion.news import ingest_news_to_raw
282
+
283
+ raw_stats = ingest_news_to_raw(
284
+ session=session,
285
+ run_id=run_uuid,
286
+ )
287
+ session.commit()
288
+
289
+ result["news_raw_inserted"] = raw_stats.get("inserted", 0)
290
+ result["news_raw_duplicates"] = raw_stats.get("duplicates", 0)
291
+
292
+ update_run_metrics(
293
+ session, run_id,
294
+ news_raw_inserted=raw_stats.get("inserted", 0),
295
+ news_raw_duplicates=raw_stats.get("duplicates", 0),
296
+ )
297
+ session.commit()
298
+
299
+ logger.info(f"[run_id={run_id}] news_raw: {raw_stats.get('inserted', 0)} inserted")
300
+
301
+ except Exception as e:
302
+ logger.error(f"[run_id={run_id}] Stage 1a failed: {e}")
303
+ result["news_raw_error"] = str(e)
304
+ session.rollback()
305
+
306
+ # -------------------------------------------------------------------------
307
+ # Stage 1b: Raw → Processed (FAZ 2)
308
+ # -------------------------------------------------------------------------
309
+ logger.info(f"[run_id={run_id}] Stage 1b: news_raw → news_processed")
310
+ try:
311
+ from pipelines.processing.news import process_raw_to_processed
312
+
313
+ proc_stats = process_raw_to_processed(
314
+ session=session,
315
+ run_id=run_uuid,
316
+ batch_size=200,
317
+ )
318
+ session.commit()
319
+
320
+ result["news_processed_inserted"] = proc_stats.get("inserted", 0)
321
+ result["news_processed_duplicates"] = proc_stats.get("duplicates", 0)
322
+
323
+ update_run_metrics(
324
+ session, run_id,
325
+ news_processed_inserted=proc_stats.get("inserted", 0),
326
+ news_processed_duplicates=proc_stats.get("duplicates", 0),
327
+ )
328
+ session.commit()
329
+
330
+ logger.info(f"[run_id={run_id}] news_processed: {proc_stats.get('inserted', 0)} inserted")
331
+
332
+ except Exception as e:
333
+ logger.error(f"[run_id={run_id}] Stage 1b failed: {e}")
334
+ result["news_processed_error"] = str(e)
335
+ session.rollback()
336
+
337
+ # -------------------------------------------------------------------------
338
+ # Stage 1c: Cut-off calculation (FAZ 2)
339
+ # -------------------------------------------------------------------------
340
+ logger.info(f"[run_id={run_id}] Stage 1c: Computing news cut-off")
341
+ try:
342
+ from pipelines.cutoff import compute_news_cutoff
343
+
344
+ cutoff_dt = compute_news_cutoff(
345
+ run_datetime=datetime.now(timezone.utc),
346
+ market_tz=settings.market_timezone,
347
+ market_close=settings.market_close_time,
348
+ buffer_minutes=settings.cutoff_buffer_minutes,
349
+ )
350
+
351
+ result["news_cutoff_time"] = cutoff_dt.isoformat()
352
+
353
+ update_run_metrics(session, run_id, news_cutoff_time=cutoff_dt)
354
+ session.commit()
355
+
356
+ logger.info(f"[run_id={run_id}] Cut-off: {cutoff_dt.isoformat()}")
357
+
358
+ except Exception as e:
359
+ logger.error(f"[run_id={run_id}] Stage 1c failed: {e}")
360
+ result["cutoff_error"] = str(e)
361
+
362
+ # -------------------------------------------------------------------------
363
+ # Stage 1d: Price ingestion (existing)
364
+ # -------------------------------------------------------------------------
365
+ logger.info(f"[run_id={run_id}] Stage 1d: Price ingestion")
366
+ try:
367
+ from app.data_manager import ingest_prices
368
+
369
+ price_stats = ingest_prices(session)
370
+ session.commit()
371
+
372
+ result["symbols_fetched"] = len(price_stats)
373
+ result["price_bars_updated"] = sum(
374
+ s.get("imported", 0) for s in price_stats.values()
375
+ )
376
+
377
+ update_run_metrics(
378
+ session, run_id,
379
+ price_bars_updated=result["price_bars_updated"],
380
+ )
381
+ session.commit()
382
+
383
+ except Exception as e:
384
+ logger.error(f"[run_id={run_id}] Stage 1d failed: {e}")
385
+ result["price_error"] = str(e)
386
+ session.rollback()
387
+
388
+ # -------------------------------------------------------------------------
389
+ # Stage 2: Sentiment scoring (existing - uses news_articles for now)
390
+ # -------------------------------------------------------------------------
391
+ logger.info(f"[run_id={run_id}] Stage 2: Sentiment scoring")
392
+ try:
393
+ from app.ai_engine import score_unscored_articles
394
+
395
+ scored = score_unscored_articles(session)
396
+ session.commit()
397
+
398
+ result["articles_scored"] = scored
399
+
400
+ except Exception as e:
401
+ logger.error(f"[run_id={run_id}] Stage 2 failed: {e}")
402
+ result["scoring_error"] = str(e)
403
+ session.rollback()
404
+
405
+ # -------------------------------------------------------------------------
406
+ # Stage 3: Sentiment aggregation (existing)
407
+ # -------------------------------------------------------------------------
408
+ logger.info(f"[run_id={run_id}] Stage 3: Sentiment aggregation")
409
+ try:
410
+ from app.ai_engine import aggregate_daily_sentiment
411
+
412
+ days_aggregated = aggregate_daily_sentiment(session)
413
+ session.commit()
414
+
415
+ result["days_aggregated"] = days_aggregated
416
+
417
+ except Exception as e:
418
+ logger.error(f"[run_id={run_id}] Stage 3 failed: {e}")
419
+ result["aggregation_error"] = str(e)
420
+ session.rollback()
421
+
422
+ # -------------------------------------------------------------------------
423
+ # Stage 4: Model training (optional)
424
+ # -------------------------------------------------------------------------
425
+ if train_model:
426
+ logger.info(f"[run_id={run_id}] Stage 4: Model training")
427
+ try:
428
+ from app.ai_engine import train_xgboost_model, save_model_metadata_to_db
429
+
430
+ train_result = train_xgboost_model(session)
431
+ save_model_metadata_to_db(
432
+ session,
433
+ symbol="HG=F",
434
+ importance=train_result.get("importance", []),
435
+ features=train_result.get("features", []),
436
+ metrics=train_result.get("metrics", {}),
437
+ )
438
+ session.commit()
439
+
440
+ result["model_trained"] = True
441
+ result["model_metrics"] = train_result.get("metrics", {})
442
+
443
+ update_run_metrics(
444
+ session, run_id,
445
+ train_mae=train_result.get("metrics", {}).get("mae"),
446
+ val_mae=train_result.get("metrics", {}).get("val_mae"),
447
+ )
448
+ session.commit()
449
+
450
+ except Exception as e:
451
+ logger.error(f"[run_id={run_id}] Stage 4 failed: {e}")
452
+ result["training_error"] = str(e)
453
+ result["model_trained"] = False
454
+ session.rollback()
455
+ else:
456
+ result["model_trained"] = False
457
+
458
+ # -------------------------------------------------------------------------
459
+ # Stage 5: Generate snapshot
460
+ # -------------------------------------------------------------------------
461
+ logger.info(f"[run_id={run_id}] Stage 5: Generate snapshot")
462
+ try:
463
+ from app.inference import generate_analysis_report, save_analysis_snapshot
464
+
465
+ report = generate_analysis_report(session, "HG=F")
466
+
467
+ if report:
468
+ # Add Faz 2 metadata
469
+ report["quality_state"] = "ok"
470
+ if result.get("news_processed_inserted", 0) == 0:
471
+ report["quality_state"] = "degraded"
472
+ report["message"] = "No fresh news data"
473
+
474
+ save_analysis_snapshot(session, report, "HG=F")
475
+ session.commit()
476
+
477
+ result["snapshot_generated"] = True
478
+ update_run_metrics(session, run_id, snapshot_generated=True)
479
+ session.commit()
480
+ else:
481
+ result["snapshot_generated"] = False
482
+
483
+ except Exception as e:
484
+ logger.error(f"[run_id={run_id}] Stage 5 failed: {e}")
485
+ result["snapshot_error"] = str(e)
486
+ result["snapshot_generated"] = False
487
+ session.rollback()
488
+
489
+ # -------------------------------------------------------------------------
490
+ # Stage 6: Generate commentary
491
+ # -------------------------------------------------------------------------
492
+ logger.info(f"[run_id={run_id}] Stage 6: Generate commentary")
493
+ try:
494
+ from app.commentary import generate_and_save_commentary
495
+
496
+ generate_and_save_commentary(session, "HG=F")
497
+ session.commit()
498
+
499
+ result["commentary_generated"] = True
500
+ update_run_metrics(session, run_id, commentary_generated=True)
501
+ session.commit()
502
+
503
+ except Exception as e:
504
+ logger.warning(f"[run_id={run_id}] Stage 6 failed: {e}")
505
+ result["commentary_generated"] = False
506
+
507
+ return result
508
+
509
+
510
+ # =============================================================================
511
+ # arq worker lifecycle
512
+ # =============================================================================
513
+
514
async def startup(ctx: dict) -> None:
    """arq worker lifecycle hook, invoked once when the worker process boots.

    Args:
        ctx: Shared arq worker context dict (unused here, required by the
            hook signature).

    Side effects:
        Logs the startup event, then calls ``init_db()`` — presumably this
        prepares the database engine/schema for queued jobs; confirm against
        the app's DB module.
    """
    logger.info("Worker starting up...")
    init_db()
518
+
519
+
520
+ async def shutdown(ctx: dict) -> None:
521
+ """Called when worker shuts down."""
522
+ logger.info("Worker shutting down...")