File size: 3,125 Bytes
8f7dc55 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 | /**
* Scheduler de tareas periodicas usando node-cron.
*
* Define y ejecuta los 4 jobs principales del backend:
* 1. syncMarkets — cada 30s, sincroniza precios desde Polymarket Gamma API.
* 2. generateSignals — cada 5 min, genera senales IA para el top 20 mercados activos.
* 3. updatePositionsPnL — cada 30s, recalcula P&L de posiciones abiertas.
* 4. processAlerts — cada 60s, revisa watchlist y envia alertas por Telegram.
*
* Cada job captura sus propios errores para evitar que un fallo afecte a los demas.
* Se ejecuta inmediatamente al arrancar (llamada manual) y luego segun cron.
*/
import { schedule } from 'node-cron';
import { fetchActiveMarkets } from './markets/polymarket.client.js';
import { marketsRepository } from './markets/markets.repository.js';
import { signalsService } from './signals/signals.service.js';
import { positionsService } from './positions/positions.service.js';
import { alertsService } from './alerts/alerts.service.js';
import { emitMarketUpdate } from './socket/broadcaster.js';
import { logger } from './utils/logger.js';
async function syncMarkets() {
try {
const markets = await fetchActiveMarkets();
await Promise.all(markets.map((m) => marketsRepository.upsert(m)));
// Purga mercados activos que no aparecieron en este sync (restos de syncs previos)
const deactivated = await marketsRepository.deactivateStale(markets.map((m) => m.id));
for (const m of markets) {
emitMarketUpdate({ marketId: m.id, yesPrice: m.yesPrice, noPrice: m.noPrice, volumeEur: m.volumeEur });
}
logger.info({ count: markets.length, deactivated }, 'markets synced');
} catch (err) {
logger.error({ err: err.message }, 'syncMarkets failed');
}
}
async function generateSignals() {
try {
// Seleccion diversificada por categoria + liquidez (40 mercados/ciclo)
const markets = await marketsRepository.findDiversified(40);
const byCategory = markets.reduce((acc, m) => {
acc[m.category] = (acc[m.category] || 0) + 1;
return acc;
}, {});
logger.info({ total: markets.length, byCategory }, 'generating signals for diversified set');
for (const market of markets) {
try {
await signalsService.generateForMarket(market);
} catch (err) {
logger.error({ err: err.message, marketId: market.id }, 'signal generation failed for market');
}
}
} catch (err) {
logger.error({ err: err.message }, 'generateSignals failed');
}
}
async function updatePositionsPnL() {
try {
await positionsService.updateAllPnL();
} catch (err) {
logger.error({ err: err.message }, 'updatePositionsPnL failed');
}
}
async function processAlerts() {
try {
await alertsService.processAll();
} catch (err) {
logger.error({ err: err.message }, 'processAlerts failed');
}
}
export function startScheduler() {
syncMarkets();
schedule('*/30 * * * * *', syncMarkets);
schedule('*/5 * * * *', generateSignals);
schedule('*/30 * * * * *', updatePositionsPnL);
schedule('* * * * *', processAlerts);
logger.info('scheduler started');
}
|