tracker / app.py
Dooratre's picture
Update app.py
021f2f0 verified
import requests
import time
import datetime
import threading
import sys
import math
import json
import random
import logging
from typing import Optional, Dict, Any
try:
import winsound
HAS_WINSOUND = True
except ImportError:
HAS_WINSOUND = False
from flask import Flask, request, jsonify
# db_signals helpers you already have
from db_signals import (
fetch_authenticity_token_and_commit_oid,
update_user_json_file,
get_user_json_file, # we will call this directly
)
# ================ Configuration ================
API_URL = "https://research.titanfx.com/api/live-rate?group=forex"
API_HEADERS = {
"referer": "https://research.titanfx.com/instruments/gbpusd",
"sec-ch-ua": "\"Not)A;Brand\";v=\"8\", \"Chromium\";v=\"138\", \"Google Chrome\";v=\"138\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
"cookie" : "_gcl_au=1.1.1796701952.1753040046; _ga=GA1.1.1464196604.1753040048; _fbp=fb.1.1753040052093.29660603653281274; _hjSessionUser_4985643=eyJpZCI6ImM4ZTg0OTk3LTU5ZDEtNTgyNy1iZWM4LTU5NzY3NWM1MjRkMyIsImNyZWF0ZWQiOjE3NTMwNDAwNTIxMjEsImV4aXN0aW5nIjp0cnVlfQ==; _ga_7LY18X3L6E=GS2.1.s1753275944$o4$g1$t1753276250$j58$l0$h0; _ga_SQJ616MYN2=GS2.1.s1753275944$o4$g1$t1753276250$j58$l0$h0; cto_bundle=Bakc1F9RUThHYzllMGNLQWR2aVlCYVl6SVpkWmNsJTJCdFdTbTAwdGp6RXN1NXZHNExiVEF3ZjN1aHBKZHFYZWZMdW9XJTJCUnlSVUR0MSUyQlFMRVpTQTRZZ3BIRm1HRzlkV2pneDVWZWRON0pwbk54bnA5QmcyRThTemhuZ09Tenc4dWFWJTJGQnAlMkZXYTdla3pHNnpKTmZaNmtVREh3dTRMN1N5Vk9WaWV4bGpXdFlNdk9MME5OOUJVeVF2WGMlMkJxbFNQaWpBNmY2c0g0MUd4YVc2SWNsQVE4ZWZaRW84d2JnJTNEJTNE"
}
SYMBOL_DEFAULT = "XAUUSD"
# Polling and retry behavior
REQUEST_TIMEOUT_SECONDS = 4
MAX_RETRIES = 5
RETRY_BACKOFF_SECONDS = 1.2
# Tick cadence
MIN_TICK_SLEEP_SEC = 0.35
MAX_TICK_SLEEP_SEC = 0.6
PRINT_EVERY_N_TICKS = 12
# External systems
ANALYSIS_BASE = "https://dooratre-xauusd-pro.hf.space"
ANALYSIS_STOP_URL = f"{ANALYSIS_BASE}/stop_analysis"
ANALYSIS_NOW_URL = f"{ANALYSIS_BASE}/analysis_now"
OTHER_ALERT_CLEAR_URL = "https://dooratre-alert.hf.space/cancel"
# User message API
MESSAGE_API_URL = "https://aoamrnuwara.pythonanywhere.com/api/send-message"
MESSAGE_API_KEY = "Seakp0683asppoit"
# Flask app
app = Flask(__name__)
# Logger
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("signal-alert-pro")
# ================ Active State ================
ACTIVE_MONITOR = {
"thread": None,
"cancel_token": None,
"id": None,
"symbol": SYMBOL_DEFAULT,
"side": None,
"entry": None,
"tp": None,
"sl": None,
"timestamps": None # preserved for /ts_points
}
ACTIVE_LOCK = threading.Lock()
# New: single activator governance
ACTIVE_ACTIVATOR = {
"thread": None,
"cancel_token": None,
"id": None
}
ACTIVATOR_LOCK = threading.Lock()
# Latch to prevent duplicate activations
ACTIVATION_LATCH = threading.Event()
# ================ db_signals reader helper ================
def read_user_json_file(authenticity_token: str, commit_oid: str):
res = get_user_json_file(authenticity_token, commit_oid)
if not res.get("success"):
raise RuntimeError(f"Failed to read user json file: {res}")
content = res.get("content", "")
if not content:
return []
try:
return json.loads(content)
except Exception:
logger.warning("[GITHUB] JSON parse failed; treating as empty []")
return []
# ================ Utilities ================
def safe_float_join(whole, decimal):
try:
return float(f"{whole}.{decimal}")
except Exception:
try:
return float(f"{whole}.{int(decimal)}")
except Exception:
return None
def ts(dt=None):
return (dt or datetime.datetime.now()).strftime('%H:%M:%S')
def random_sleep_tick():
time.sleep(random.uniform(MIN_TICK_SLEEP_SEC, MAX_TICK_SLEEP_SEC))
def now_utc():
return datetime.datetime.now(datetime.timezone.utc)
def fetch_scenario_with_retries(max_attempts=5, delay_sec=3):
for attempt in range(1, max_attempts + 1):
scen = fetch_current_scenario_struct()
if scen and "scenario" in scen:
return scen
if attempt < max_attempts:
time.sleep(delay_sec)
return None
def format_zone_times(dt_utc: datetime.datetime) -> Dict[str, Any]:
def last_sunday(year, month):
d = datetime.datetime(year, month, 31, tzinfo=datetime.timezone.utc)
while d.weekday() != 6:
d -= datetime.timedelta(days=1)
return d
def nth_weekday_of_month(year, month, weekday, n):
d = datetime.datetime(year, month, 1, tzinfo=datetime.timezone.utc)
while d.weekday() != weekday:
d += datetime.timedelta(days=1)
d += datetime.timedelta(days=7*(n-1))
return d
y = dt_utc.year
last_sun_march = last_sunday(y, 3).replace(hour=1, minute=0, second=0, microsecond=0)
last_sun_oct = last_sunday(y, 10).replace(hour=1, minute=0, second=0, microsecond=0)
london_is_bst = last_sun_march <= dt_utc < last_sun_oct
london_offset = datetime.timedelta(hours=1 if london_is_bst else 0)
london_sfx = "BST" if london_is_bst else "GMT"
london_time = (dt_utc + london_offset).replace(tzinfo=None)
second_sun_march = nth_weekday_of_month(y, 3, 6, 2).replace(hour=7)
first_sun_nov = nth_weekday_of_month(y, 11, 6, 1).replace(hour=6)
ny_is_edt = second_sun_march <= dt_utc < first_sun_nov
ny_offset = datetime.timedelta(hours=-4 if ny_is_edt else -5)
ny_sfx = "EDT" if ny_is_edt else "EST"
ny_time = (dt_utc + ny_offset).replace(tzinfo=None)
tokyo_time = (dt_utc + datetime.timedelta(hours=9)).replace(tzinfo=None)
first_sun_oct = nth_weekday_of_month(y, 10, 6, 1).replace(hour=16)
first_sun_apr = nth_weekday_of_month(y, 4, 6, 1).replace(hour=16)
syd_is_aedt = (dt_utc >= first_sun_oct) or (dt_utc < first_sun_apr)
sydney_offset = datetime.timedelta(hours=11 if syd_is_aedt else 10)
sydney_sfx = "AEDT" if syd_is_aedt else "AEST"
sydney_time = (dt_utc + sydney_offset).replace(tzinfo=None)
return {
"zones": {
"Greenwich": f"{dt_utc.replace(tzinfo=None).strftime('%Y-%m-%d %H:%M:%S')} UTC",
"London": f"{london_time.strftime('%Y-%m-%d %H:%M:%S')} {london_sfx}",
"New York": f"{ny_time.strftime('%Y-%m-%d %H:%M:%S')} {ny_sfx}",
"Tokyo": f"{tokyo_time.strftime('%Y-%m-%d %H:%M:%S')} JST",
"Sydney": f"{sydney_time.strftime('%Y-%m-%d %H:%M:%S')} {sydney_sfx}",
},
"iso_utc": dt_utc.isoformat()
}
def send_message_to_users(message: str, max_retries=5, retry_delay=10):
headers = {
"Content-Type": "application/json",
"X-API-Key": MESSAGE_API_KEY
}
payload = {"message": message}
for attempt in range(1, max_retries + 1):
try:
response = requests.post(MESSAGE_API_URL, headers=headers, data=json.dumps(payload), timeout=8)
if response.status_code == 200:
logger.info(f"Message sent to users successfully on attempt {attempt}")
return {"success": True, "response": response.json() if response.content else {}}
else:
logger.warning(f"Attempt {attempt}: Users API status {response.status_code}, body: {response.text[:200]}")
except requests.exceptions.RequestException as e:
logger.warning(f"Attempt {attempt}: Users API request failed: {e}")
if attempt < max_retries:
time.sleep(retry_delay)
else:
logger.error("Max retries reached. Failed to send user message.")
return {"success": False, "error": "Failed after retries"}
def play_alert_sound():
def _beep():
if HAS_WINSOUND:
try:
for _ in range(3):
winsound.Beep(1000, 350)
time.sleep(0.15)
except Exception:
for _ in range(3):
sys.stdout.write('\a'); sys.stdout.flush()
time.sleep(0.2)
else:
for _ in range(3):
sys.stdout.write('\a'); sys.stdout.flush()
time.sleep(0.2)
threading.Thread(target=_beep, daemon=True).start()
# ================ Prices ================
def get_current_price(symbol=SYMBOL_DEFAULT):
try:
r = requests.get(API_URL, headers=API_HEADERS, timeout=REQUEST_TIMEOUT_SECONDS)
r.raise_for_status()
data = r.json()
if symbol not in data:
logger.warning(f"[DATA] Symbol {symbol} not found in feed.")
return None
row = data[symbol]
if not isinstance(row, list) or len(row) < 4:
logger.warning(f"[DATA] Unexpected data format for {symbol}.")
return None
bid = safe_float_join(row[0], row[1])
ask = safe_float_join(row[2], row[3])
if bid is None or ask is None or math.isnan(bid) or math.isnan(ask):
logger.warning(f"[DATA] Invalid price values for {symbol}.")
return None
return {"bid": bid, "ask": ask}
except requests.RequestException as e:
logger.warning(f"[NETWORK] {e}")
return None
except Exception as e:
logger.warning(f"[ERROR] Unexpected: {e}")
return None
def mid_price(bid: float, ask: float) -> float:
return (bid + ask) / 2.0
# ================ External orchestration ================
def call_stop_analysis_and_clear_others(session_id="default"):
# Stop analysis with proper POST + JSON payload
try:
headers = {"Content-Type": "application/json", "Accept": "application/json"}
payload = {"session_id": str(session_id)}
resp = requests.post(ANALYSIS_STOP_URL, headers=headers, json=payload, timeout=8)
try:
body = resp.json()
except Exception:
body = {"raw_text": resp.text[:300]}
if resp.status_code != 200 or not body.get("success", False):
logger.warning(f"[ANALYSIS] stop_analysis returned {resp.status_code}: {body}")
else:
logger.info(f"[ANALYSIS] stop_analysis OK: {body.get('message','')}")
except Exception as e:
logger.warning(f"[ANALYSIS] stop_analysis request failed: {e}")
# Clear other alerts (kept as GET unless your endpoint needs POST)
try:
resp2 = requests.get(OTHER_ALERT_CLEAR_URL, timeout=5)
if resp2.status_code != 200:
logger.warning(f"[ALERT] clear returned {resp2.status_code}: {resp2.text[:200]}")
else:
logger.info("[ALERT] clear OK")
except Exception as e:
logger.warning(f"[ALERT] clear failed: {e}")
def call_analysis_now(text: str):
try:
headers = {"Content-Type": "application/json"}
payload = {"message": text}
requests.post(ANALYSIS_NOW_URL, headers=headers, json=payload, timeout=8)
except Exception as e:
logger.warning(f"[ANALYSIS] analysis_now failed: {e}")
# ================ GitHub writers =================
def write_active_scenario_to_github(pair: str, side: str, entry: str, sl: str, tp: str, dt_utc: datetime.datetime, preserve_timestamps: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Writes signals.json as a single-element array [ { active_trade } ].
If preserve_timestamps provided, keep it; else compute fresh timestamps.
"""
timestamps = preserve_timestamps if preserve_timestamps else format_zone_times(dt_utc)
active_obj = {
"pair": pair,
"type": side,
"entry": str(entry),
"stop_loss": str(sl),
"take_profit": str(tp),
"timestamps": timestamps
}
payload_array = [active_obj]
new_content = json.dumps(payload_array, ensure_ascii=False)
try:
authenticity_token, commit_oid = fetch_authenticity_token_and_commit_oid()
if not authenticity_token or not commit_oid:
msg = "Failed to get authenticity_token/commit_oid."
logger.error(f"[GITHUB] {msg}")
return {"success": False, "message": msg}
result = update_user_json_file(authenticity_token, commit_oid, new_content)
if result.get("success"):
logger.info("[GITHUB] signals updated with active scenario (array).")
return {"success": True}
else:
logger.error(f"[GITHUB] Update failed: {result}")
return {"success": False, "message": result.get("message", "Unknown error")}
except Exception as e:
logger.error(f"[GITHUB] Exception: {e}")
return {"success": False, "message": str(e)}
def clear_github_signals_to_empty_list() -> Dict[str, Any]:
"""
Clears the db_signals file to [] after SL/TP hit.
"""
try:
authenticity_token, commit_oid = fetch_authenticity_token_and_commit_oid()
if not authenticity_token or not commit_oid:
return {"success": False, "message": "Failed to get auth/commit"}
result = update_user_json_file(authenticity_token, commit_oid, "[]")
if result.get("success"):
logger.info("[GITHUB] signals cleared to []")
return {"success": True}
return {"success": False, "message": str(result)}
except Exception as e:
logger.error(f"[GITHUB] clear exception: {e}")
return {"success": False, "message": str(e)}
# ================ Scenario fetcher ================
def fetch_current_scenario_struct() -> Optional[Dict[str, Any]]:
"""
Reads the pre-activation scenario JSON when signals.json is an array:
[ { "scenario": { "Buy": {"at": "...", "SL": "...", "TP": "..."},
"Sell": {"at": "...", "SL": "...", "TP": "..."},
"timestamps": {...} } } ]
Returns {"scenario": {...}} or None.
"""
try:
authenticity_token, commit_oid = fetch_authenticity_token_and_commit_oid()
if not authenticity_token or not commit_oid:
logger.error("[GITHUB] Missing auth tokens to read scenario")
return None
content = read_user_json_file(authenticity_token, commit_oid)
# Handle [] formats
if isinstance(content, list):
if not content:
return None
first = content[0]
if isinstance(first, dict) and "scenario" in first:
return first # {"scenario": {...}}
else:
return None
# Fallback for older dict format
if isinstance(content, dict) and "scenario" in content:
return content
return None
except Exception as e:
logger.warning(f"[SCENARIO] fetch failed: {e}")
return None
# ================ Activation helpers ================
class CancelToken:
def __init__(self):
self._cancel = False
self._lock = threading.Lock()
def cancel(self):
with self._lock:
self._cancel = True
def is_cancelled(self):
with self._lock:
return self._cancel
def init_price_snapshot(symbol) -> Optional[Dict[str, float]]:
retries, snap = 0, None
while retries < MAX_RETRIES and snap is None:
snap = get_current_price(symbol)
if snap is None:
retries += 1
if retries < MAX_RETRIES:
time.sleep(RETRY_BACKOFF_SECONDS * retries)
else:
return None
return snap
def detect_cross(prev_price: float, curr_price: float, level: float, direction: Optional[str]) -> bool:
if direction == "up":
return prev_price < level <= curr_price
elif direction == "down":
return prev_price > level >= curr_price
else:
return (prev_price < level <= curr_price) or (prev_price > level >= curr_price)
def price_crossing_to_activate(side: str, entry: float, prev: float, curr: float) -> bool:
if side == "Buy":
return prev < entry <= curr
else:
return prev > entry >= curr
def wait_until_activation_dual(buy_entry: Optional[float],
sell_entry: Optional[float],
symbol: str,
cancel_token: "CancelToken",
max_minutes: int = 240) -> Optional[Dict[str, float]]:
if buy_entry is None and sell_entry is None:
return None
snap = init_price_snapshot(symbol)
if snap is None:
logger.error("[ACTIVATOR] Initial price fetch failed.")
return None
prev = mid_price(snap["bid"], snap["ask"])
t_end = datetime.datetime.now() + datetime.timedelta(minutes=max_minutes)
tick = 0
if buy_entry is not None and abs(prev - buy_entry) < 1e-9:
return {"side": "Buy", "price": prev}
if sell_entry is not None and abs(prev - sell_entry) < 1e-9:
return {"side": "Sell", "price": prev}
while datetime.datetime.now() < t_end and not cancel_token.is_cancelled():
q = get_current_price(symbol)
if q is None:
time.sleep(0.5)
continue
curr = mid_price(q["bid"], q["ask"])
# Sell first then Buy (stable tie-break)
if sell_entry is not None and price_crossing_to_activate("Sell", sell_entry, prev, curr):
return {"side": "Sell", "price": curr}
if buy_entry is not None and price_crossing_to_activate("Buy", buy_entry, prev, curr):
return {"side": "Buy", "price": curr}
tick += 1
if tick % PRINT_EVERY_N_TICKS == 0:
logger.info(f"[{ts()}] Waiting activation Buy@{buy_entry} Sell@{sell_entry}, now {curr:.3f}")
prev = curr
random_sleep_tick()
return None
# ================ SL/TP monitor ================
def start_sl_tp_monitor(side: str, entry: float, sl: float, tp: float, symbol: str = SYMBOL_DEFAULT):
cancel_token = CancelToken()
monitor_id = f"{int(time.time()*1000)}-{random.randint(1000,9999)}"
def _runner():
try:
snap = init_price_snapshot(symbol)
if snap is None:
logger.error("[MONITOR] Unable to retrieve initial price snapshot.")
return
bid0, ask0 = snap["bid"], snap["ask"]
price0 = mid_price(bid0, ask0)
prev_price = price0
hi_bid = bid0
lo_ask = ask0
if side == "Buy":
tp_dir = "up" if tp >= entry else None
sl_dir = "down" if sl <= entry else None
else:
tp_dir = "down" if tp <= entry else None
sl_dir = "up" if sl >= entry else None
tick = 0
while not cancel_token.is_cancelled():
q = get_current_price(symbol)
if q is None:
time.sleep(0.6)
continue
bid, ask = q["bid"], q["ask"]
price_now = mid_price(bid, ask)
hi_bid = max(hi_bid, bid)
lo_ask = min(lo_ask, ask)
hit = None
if detect_cross(prev_price, price_now, sl, sl_dir):
hit = ("SL", sl)
if hit is None and detect_cross(prev_price, price_now, tp, tp_dir):
hit = ("TP", tp)
if hit:
play_alert_sound()
hit_type, hit_price = hit
# 1) Notify user immediately
msg = f"{side.upper()} SIGNAL {hit_type} hit at {hit_price}\nEntry: {entry}\nTP: {tp}\nSL: {sl}\nPair: {symbol}"
send_message_to_users(msg)
# 2) Clear db_signals to []
try:
clear_github_signals_to_empty_list()
except Exception as e:
logger.warning(f"[GITHUB] clearing after hit failed: {e}")
# 3) Stop analysis and other alert system again
call_stop_analysis_and_clear_others()
# 4) Notify analysis of the result
analysis_msg = f"{symbol} {side} trade {hit_type} reached at {hit_price} (entry {entry}, TP {tp}, SL {sl})."
call_analysis_now(analysis_msg)
logger.info(f"[MONITOR-END] {side} {hit_type} hit.")
return
tick += 1
if tick % PRINT_EVERY_N_TICKS == 0:
logger.info(f"[{ts()}] {symbol} {side} Mid {price_now:.3f} | Bid {bid:.3f}(hi {hi_bid:.3f}) | Ask {ask:.3f}(lo {lo_ask:.3f}) | TP {tp} / SL {sl}")
prev_price = price_now
random_sleep_tick()
except Exception as e:
logger.error(f"[MONITOR-THREAD] Error: {e}")
finally:
with ACTIVE_LOCK:
if ACTIVE_MONITOR.get("id") == monitor_id:
ACTIVE_MONITOR["thread"] = None
ACTIVE_MONITOR["cancel_token"] = None
t = threading.Thread(target=_runner, daemon=True)
with ACTIVE_LOCK:
# Cancel any running monitor before starting a new one
if ACTIVE_MONITOR["cancel_token"]:
ACTIVE_MONITOR["cancel_token"].cancel()
ACTIVE_MONITOR["thread"] = t
ACTIVE_MONITOR["cancel_token"] = cancel_token
ACTIVE_MONITOR["id"] = monitor_id
ACTIVE_MONITOR["symbol"] = symbol
ACTIVE_MONITOR["side"] = side
ACTIVE_MONITOR["entry"] = entry
ACTIVE_MONITOR["tp"] = tp
ACTIVE_MONITOR["sl"] = sl
# timestamps remains as is (set by activator)
t.start()
# ================ API Endpoints ================
@app.route("/track", methods=["POST"])
def track_signal():
"""
Accepts: {"Buy": number?, "Sell": number?}
Behavior:
- Stop analysis and clear other alert (immediately).
- Cancel any existing activation/monitor (newest wins).
- Start watcher for activation of Buy/Sell. On first hit (latched):
* Play sound
* Use cached SL/TP from scenario read at start
* Write active array [ {...} ] with timestamps for the start moment
* Send user "BUY NOW/SELL NOW..." message (once)
* Start SL/TP monitor
* Notify analysis via analysis_now
- Respond 200 immediately after accepting.
"""
try:
data = request.get_json(force=True)
except Exception:
return jsonify({"status": "error", "message": "Invalid JSON"}), 400
buy_at = data.get("Buy")
sell_at = data.get("Sell")
if buy_at is None and sell_at is None:
return jsonify({"status": "error", "message": "Provide Buy or Sell or both"}), 400
if buy_at is not None:
try:
buy_at = float(buy_at)
except Exception:
return jsonify({"status": "error", "message": "Buy must be numeric"}), 400
if sell_at is not None:
try:
sell_at = float(sell_at)
except Exception:
return jsonify({"status": "error", "message": "Sell must be numeric"}), 400
# 1) Stop external analysis and other alert now
call_stop_analysis_and_clear_others()
# 2) Cancel any current SL/TP monitor operation and reset active monitor state
with ACTIVE_LOCK:
if ACTIVE_MONITOR["cancel_token"]:
ACTIVE_MONITOR["cancel_token"].cancel()
ACTIVE_MONITOR["thread"] = None
ACTIVE_MONITOR["cancel_token"] = None
ACTIVE_MONITOR["id"] = None
ACTIVE_MONITOR["symbol"] = SYMBOL_DEFAULT
ACTIVE_MONITOR["side"] = None
ACTIVE_MONITOR["entry"] = None
ACTIVE_MONITOR["tp"] = None
ACTIVE_MONITOR["sl"] = None
ACTIVE_MONITOR["timestamps"] = None
# 3) Cancel any existing activator and set up new one
with ACTIVATOR_LOCK:
if ACTIVE_ACTIVATOR["cancel_token"]:
ACTIVE_ACTIVATOR["cancel_token"].cancel()
ACTIVATION_LATCH.clear() # new cycle, no activation yet
activator_cancel = CancelToken()
activator_id = f"act-{int(time.time()*1000)}-{random.randint(1000,9999)}"
def _activator():
try:
# Read scenario once and cache SL/TP locally to avoid race
scenario_blob = fetch_scenario_with_retries(max_attempts=5, delay_sec=3)
if not scenario_blob or "scenario" not in scenario_blob:
logger.error("[ACTIVATOR] Missing scenario data before waiting (after retries).")
send_message_to_users("ERROR: Missing scenario structure before activation (after retries). Please set scenario and retry.")
return
scen = scenario_blob["scenario"]
# Extract SL/TP for both sides if present
buy_sl = buy_tp = sell_sl = sell_tp = None
if "Buy" in scen:
try:
buy_sl = float(scen["Buy"].get("SL")) if scen["Buy"].get("SL") is not None else None
buy_tp = float(scen["Buy"].get("TP")) if scen["Buy"].get("TP") is not None else None
except Exception:
logger.error("[ACTIVATOR] Invalid Buy SL/TP in scenario.")
if "Sell" in scen:
try:
sell_sl = float(scen["Sell"].get("SL")) if scen["Sell"].get("SL") is not None else None
sell_tp = float(scen["Sell"].get("TP")) if scen["Sell"].get("TP") is not None else None
except Exception:
logger.error("[ACTIVATOR] Invalid Sell SL/TP in scenario.")
# Optional timestamps from scenario pre-activation are not used here; we create fresh on activate
act = wait_until_activation_dual(
buy_entry=buy_at,
sell_entry=sell_at,
symbol=SYMBOL_DEFAULT,
cancel_token=activator_cancel,
max_minutes=240
)
if act is None:
logger.info("[ACTIVATOR] Cancelled or timed out.")
return
# Latch: only the first activator proceeds
if ACTIVATION_LATCH.is_set():
logger.info("[ACTIVATOR] Activation already handled by another thread. Exiting.")
return
ACTIVATION_LATCH.set()
chosen_side = act["side"]
entry_used = act["price"]
# Choose SL/TP from cached scenario
if chosen_side == "Buy":
sl_f, tp_f = buy_sl, buy_tp
else:
sl_f, tp_f = sell_sl, sell_tp
if sl_f is None or tp_f is None:
# Cannot proceed without points
logger.error(f"[ACTIVATOR] Missing SL/TP for {chosen_side}.")
send_message_to_users(f"ERROR: Missing scenario SL/TP for {chosen_side} at {entry_used}")
return
play_alert_sound()
# 3) Write active array with fresh timestamps
dt_utc = now_utc()
ts_block = format_zone_times(dt_utc)
wr = write_active_scenario_to_github(
pair=SYMBOL_DEFAULT,
side=chosen_side,
entry=str(entry_used),
sl=str(sl_f),
tp=str(tp_f),
dt_utc=dt_utc
)
if not wr.get("success"):
logger.warning(f"[GITHUB] Write failed: {wr}")
# Save timestamps in memory to preserve on /ts_points updates
with ACTIVE_LOCK:
ACTIVE_MONITOR["timestamps"] = ts_block
# 4) Notify users (once)
user_msg = f"{chosen_side.upper()} NOW at {entry_used}\nSL: {sl_f}\nTP: {tp_f}\nPair: {SYMBOL_DEFAULT}"
send_message_to_users(user_msg)
# 5) Inform analysis
call_analysis_now(f"Scenario {chosen_side} activated for {SYMBOL_DEFAULT} at {entry_used}. SL {sl_f}, TP {tp_f}.")
# 6) Start SL/TP monitor
start_sl_tp_monitor(side=chosen_side, entry=entry_used, sl=sl_f, tp=tp_f, symbol=SYMBOL_DEFAULT)
except Exception as e:
logger.error(f"[ACTIVATOR] Error: {e}")
finally:
with ACTIVATOR_LOCK:
if ACTIVE_ACTIVATOR["id"] == activator_id:
ACTIVE_ACTIVATOR["thread"] = None
ACTIVE_ACTIVATOR["cancel_token"] = None
ACTIVE_ACTIVATOR["id"] = None
t = threading.Thread(target=_activator, daemon=True)
ACTIVE_ACTIVATOR["thread"] = t
ACTIVE_ACTIVATOR["cancel_token"] = activator_cancel
ACTIVE_ACTIVATOR["id"] = activator_id
t.start()
return jsonify({"status": "accepted", "message": f"Tracking activation for Buy at {buy_at} and/or Sell at {sell_at} started."}), 200
@app.route("/ts_points", methods=["POST"])
def ts_points():
"""
Accepts: {"TP": "...", "SL": "..."} to update active trade points.
Behavior:
- Requires an active trade.
- Cancel current monitor and restart with the updated levels.
- DO NOT change the activation timestamps (trade start time).
- Rewrite GitHub active array with preserved timestamps.
- Notify users and analysis.
"""
try:
data = request.get_json(force=True)
except Exception:
return jsonify({"status": "error", "message": "Invalid JSON"}), 400
tp_new = data.get("TP")
sl_new = data.get("SL")
if tp_new is None and sl_new is None:
return jsonify({"status": "error", "message": "Provide TP or SL"}), 400
with ACTIVE_LOCK:
if ACTIVE_MONITOR["id"] is None or ACTIVE_MONITOR["side"] is None:
return jsonify({"status": "error", "message": "No active trade to update"}), 400
# Parse and stage updates
if tp_new is not None:
try:
ACTIVE_MONITOR["tp"] = float(tp_new)
except Exception:
return jsonify({"status": "error", "message": "Invalid TP"}), 400
if sl_new is not None:
try:
ACTIVE_MONITOR["sl"] = float(sl_new)
except Exception:
return jsonify({"status": "error", "message": "Invalid SL"}), 400
side = ACTIVE_MONITOR["side"]
entry = ACTIVE_MONITOR["entry"]
symbol = ACTIVE_MONITOR["symbol"]
tp = ACTIVE_MONITOR["tp"]
sl = ACTIVE_MONITOR["sl"]
preserved_ts = ACTIVE_MONITOR["timestamps"]
if ACTIVE_MONITOR["cancel_token"]:
ACTIVE_MONITOR["cancel_token"].cancel()
# Restart monitor with new TP/SL
start_sl_tp_monitor(side=side, entry=entry, sl=sl, tp=tp, symbol=symbol)
# Update GitHub record preserving timestamps
wr = write_active_scenario_to_github(
pair=symbol,
side=side,
entry=str(entry),
sl=str(sl),
tp=str(tp),
dt_utc=now_utc(), # ignored when preserve_timestamps provided
preserve_timestamps=preserved_ts
)
if not wr.get("success"):
logger.warning(f"[GITHUB] Update after TS change failed: {wr}")
# Notify users and analysis about the update (optional; you can enable if needed)
try:
send_message_to_users(f"{side.upper()} TP/SL updated\nEntry: {entry}\nTP: {tp}\nSL: {sl}\nPair: {symbol}")
call_analysis_now(f"{symbol} {side} TP/SL updated. Entry {entry}, TP {tp}, SL {sl}.")
except Exception:
pass
return jsonify({"status": "ok", "message": "TP/SL updated and tracking restarted"}), 200
@app.route("/", methods=["GET"])
def root():
return jsonify({
"service": "Signal Trade Activation & SL/TP Tracking (PRO)",
"symbol": SYMBOL_DEFAULT,
"routes": {
"POST /track": "Payload: { 'Buy': price?, 'Sell': price? }. Starts activation tracking. Newest wins.",
"POST /ts_points": "Payload: { 'TP': new_tp?, 'SL': new_sl? }. Updates active trade without changing start time."
},
"notes": [
"Only one activator and one monitor run at a time; newest /track cancels previous.",
"Activation uses a latch so only one activation proceeds even under concurrency.",
"Scenario SL/TP are cached at activator start; no GitHub reads at activation moment.",
"On SL/TP hit: notify user, clear db_signals to [], stop analysis & clear other alert, and inform analysis.",
"On /ts_points: timestamps in GitHub are preserved (trade start time)."
]
}), 200
# ================ Main ================
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860, debug=False, threaded=True)