alert / app.py
Dooratre's picture
Update app.py
66d26ff verified
import requests
import time
import datetime
import threading
import sys
import math
import json
import random
import logging
from typing import Optional, Dict, Any, List, Union
try:
import winsound
HAS_WINSOUND = True
except ImportError:
HAS_WINSOUND = False
from flask import Flask, request, jsonify
# Import GitHub signals helpers
from db_signals import (
fetch_authenticity_token_and_commit_oid,
update_user_json_file,
)
# ================= Configuration =================
API_URL = "https://research.titanfx.com/api/live-rate?group=forex"
API_HEADERS = {
"referer": "https://research.titanfx.com/instruments/gbpusd",
"sec-ch-ua": "\"Not)A;Brand\";v=\"8\", \"Chromium\";v=\"138\", \"Google Chrome\";v=\"138\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
"cookie" : "_gcl_au=1.1.1796701952.1753040046; _ga=GA1.1.1464196604.1753040048; _fbp=fb.1.1753040052093.29660603653281274; _hjSessionUser_4985643=eyJpZCI6ImM4ZTg0OTk3LTU5ZDEtNTgyNy1iZWM4LTU5NzY3NWM1MjRkMyIsImNyZWF0ZWQiOjE3NTMwNDAwNTIxMjEsImV4aXN0aW5nIjp0cnVlfQ==; _ga_7LY18X3L6E=GS2.1.s1753275944$o4$g1$t1753276250$j58$l0$h0; _ga_SQJ616MYN2=GS2.1.s1753275944$o4$g1$t1753276250$j58$l0$h0; cto_bundle=Bakc1F9RUThHYzllMGNLQWR2aVlCYVl6SVpkWmNsJTJCdFdTbTAwdGp6RXN1NXZHNExiVEF3ZjN1aHBKZHFYZWZMdW9XJTJCUnlSVUR0MSUyQlFMRVpTQTRZZ3BIRm1HRzlkV2pneDVWZWRON0pwbk54bnA5QmcyRThTemhuZ09Tenc4dWFWJTJGQnAlMkZXYTdla3pHNnpKTmZaNmtVREh3dTRMN1N5Vk9WaWV4bGpXdFlNdk9MME5OOUJVeVF2WGMlMkJxbFNQaWpBNmY2c0g0MUd4YVc2SWNsQVE4ZWZaRW84d2JnJTNEJTNE"
}
SYMBOL_DEFAULT = "XAUUSD"
# Polling and retry behavior
REQUEST_TIMEOUT_SECONDS = 4
MAX_RETRIES = 5
RETRY_BACKOFF_SECONDS = 1.2
# Faster tick cadence
MIN_TICK_SLEEP_SEC = 0.3
MAX_TICK_SLEEP_SEC = 0.5
# Lightweight status print cadence
PRINT_EVERY_N_TICKS = 12
# Analysis system webhook
ALERT_WEBHOOK_URL = "https://dooratre-xauusd-pro2.hf.space/analysis_now"
# User message API
MESSAGE_API_URL = "https://aoamrnuwara.pythonanywhere.com/api/send-message"
MESSAGE_API_KEY = "Seakp0683asppoit2"
# Flask app
app = Flask(__name__)
# Logger
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("forex-alert")
# ================= Single Active Monitor Store =================
# Only one monitor can be active at a time.
ACTIVE_MONITOR = {"id": None, "token": None}
ACTIVE_MONITOR_LOCK = threading.Lock()
# ================= Utilities =================
def safe_float_join(whole, decimal):
try:
return float(f"{whole}.{decimal}")
except Exception:
try:
return float(f"{whole}.{int(decimal)}")
except Exception:
return None
def ts(dt=None):
return (dt or datetime.datetime.now()).strftime('%H:%M:%S')
def random_sleep_tick():
time.sleep(random.uniform(MIN_TICK_SLEEP_SEC, MAX_TICK_SLEEP_SEC))
def get_current_price(symbol=SYMBOL_DEFAULT):
"""
Fetch current bid/ask for the given symbol.
Returns: dict {bid: float, ask: float} or None on failure.
"""
try:
r = requests.get(API_URL, headers=API_HEADERS, timeout=REQUEST_TIMEOUT_SECONDS)
r.raise_for_status()
data = r.json()
if symbol not in data:
logger.warning(f"[DATA] Symbol {symbol} not found in feed.")
return None
row = data[symbol]
if not isinstance(row, list) or len(row) < 4:
logger.warning(f"[DATA] Unexpected data format for {symbol}.")
return None
bid = safe_float_join(row[0], row[1])
ask = safe_float_join(row[2], row[3])
if bid is None or ask is None or math.isnan(bid) or math.isnan(ask):
logger.warning(f"[DATA] Invalid price values for {symbol}.")
return None
return {"bid": bid, "ask": ask}
except requests.RequestException as e:
logger.warning(f"[NETWORK] {e}")
return None
except Exception as e:
logger.warning(f"[ERROR] Unexpected: {e}")
return None
def mid_price(bid: float, ask: float) -> float:
return (bid + ask) / 2.0
def play_alert_sound():
def _beep():
if HAS_WINSOUND:
try:
for _ in range(3):
winsound.Beep(1000, 350)
time.sleep(0.15)
except Exception:
for _ in range(3):
sys.stdout.write('\a'); sys.stdout.flush()
time.sleep(0.2)
else:
for _ in range(3):
sys.stdout.write('\a'); sys.stdout.flush()
time.sleep(0.2)
threading.Thread(target=_beep, daemon=True).start()
def send_alert_message_to_analysis(text: str):
"""
Send message to analysis system, fire-and-forget.
"""
def _post():
try:
headers = {"Content-Type": "application/json"}
payload = {"message": text}
requests.post(ALERT_WEBHOOK_URL, headers=headers, json=payload, timeout=5)
except Exception as e:
logger.warning(f"[WEBHOOK] Failed to POST to {ALERT_WEBHOOK_URL}: {e}")
threading.Thread(target=_post, daemon=True).start()
def send_message_to_users(message: str, max_retries=5, retry_delay=10):
"""
Send message to users with automatic retries.
Blocking in monitor thread.
"""
headers = {
"Content-Type": "application/json",
"X-API-Key": MESSAGE_API_KEY
}
payload = {"message": message}
for attempt in range(1, max_retries + 1):
try:
response = requests.post(MESSAGE_API_URL, headers=headers, data=json.dumps(payload), timeout=8)
if response.status_code == 200:
logger.info(f"Message sent to users successfully on attempt {attempt}")
return {"success": True, "response": response.json()}
else:
logger.warning(f"Attempt {attempt}: Users API status {response.status_code}, body: {response.text[:200]}")
except requests.exceptions.RequestException as e:
logger.warning(f"Attempt {attempt}: Users API request failed: {e}")
if attempt < max_retries:
time.sleep(retry_delay)
else:
logger.error("Max retries reached. Failed to send user message.")
return {"success": False, "error": "Failed after retries"}
# ================= GitHub signals auto-close =================
def close_trade_signals() -> Dict[str, Any]:
"""
Auto-close trades by clearing signals2.json to [] on GitHub.
Returns dict with success flag and message.
"""
try:
authenticity_token, commit_oid = fetch_authenticity_token_and_commit_oid()
if not authenticity_token or not commit_oid:
msg = "Failed to get authenticity_token/commit_oid."
logger.error(f"[GITHUB] {msg}")
return {"success": False, "message": msg}
new_content = "[]"
result = update_user_json_file(authenticity_token, commit_oid, new_content)
if result.get("success"):
logger.info("[GITHUB] signals2.json cleared to [] (trade closed).")
return {"success": True, "message": "signals2.json cleared"}
else:
logger.error(f"[GITHUB] Failed to clear signals2.json: {result}")
return {"success": False, "message": result.get("message", "Unknown error")}
except Exception as e:
logger.error(f"[GITHUB] Exception during close_trade_signals: {e}")
return {"success": False, "message": str(e)}
# ================= Arabic Message Builders =================
def build_tp_user_message(symbol, price, tp):
return f"يا سلام! ✅ تم الوصول إلى هدف الربح TP عند السعر {tp} على {symbol}. السعر الحالي: {price}. مبروك! 🎉💰"
def build_sl_user_message(symbol, price, sl):
return f"للأسف ❌ تم ضرب وقف الخسارة SL عند السعر {sl} على {symbol}. السعر الحالي: {price}. لا تيأس، فرص قادمة بإذن الله. 😔"
def build_waypoint_user_message(symbol, price_now, waypoint_price, waypoint_text):
if waypoint_text and waypoint_text.strip():
return waypoint_text
return f"تنبيه: وصل {symbol} إلى المستوى {waypoint_price}. السعر الحالي: {price_now}. الرجاء المتابعة."
# Updated to include auto-close statement directly
def build_tp_analysis_message(symbol, price, tp):
return (
f"{symbol} just hit TP at {tp} (current {price}). Trade closed automatically. "
f"Users got the happy message 🎉 and signals cleared. Stay sharp"
)
def build_sl_analysis_message(symbol, price, sl):
return (
f"{symbol} touched SL at {sl} (current {price}). Position closed. "
f"Users received the sad update 😔, signals cleared. Analysis perfect and find good Trade in another Time GOOD LUCK"
)
def build_waypoint_analysis_message(symbol, price_now, waypoint_price, user_msg):
return (
f"{symbol} reached {waypoint_price} (now {price_now}). "
f"We told users: '{user_msg}'. Watch closely — momentum could set up the next move."
)
# ================= Core Monitor (Gap-aware crossing) =================
class CancelToken:
def __init__(self):
self._cancel = False
self._lock = threading.Lock()
def cancel(self):
with self._lock:
self._cancel = True
def is_cancelled(self):
with self._lock:
return self._cancel
def init_price_snapshot(symbol) -> Optional[Dict[str, float]]:
retries, snap = 0, None
while retries < MAX_RETRIES and snap is None:
snap = get_current_price(symbol)
if snap is None:
retries += 1
if retries < MAX_RETRIES:
delay = RETRY_BACKOFF_SECONDS * retries
time.sleep(delay)
else:
return None
return snap
def detect_cross(prev_price: float, curr_price: float, level: float, direction: Optional[str]) -> bool:
"""
Gap-aware crossing logic.
"""
if direction == "up":
return prev_price < level <= curr_price
elif direction == "down":
return prev_price > level >= curr_price
else:
return (prev_price < level <= curr_price) or (prev_price > level >= curr_price)
def auto_close_and_notify_for_tp_sl(analysis_message: str):
"""
Clears signals2.json to [] and sends the provided analysis message (which already mentions auto-close).
"""
result = close_trade_signals()
if result.get("success"):
send_alert_message_to_analysis(analysis_message)
else:
send_alert_message_to_analysis(analysis_message + f" Auto-close FAILED: {result.get('message')}")
def monitor_prices(
symbol: str,
duration_minutes: int,
tp: Optional[float] = None,
sl: Optional[float] = None,
price_messages: Optional[List[Dict[str, Union[float, str]]]] = None,
cancel_token: Optional[CancelToken] = None
) -> Dict[str, Any]:
"""
Monitors TP, SL, and multiple price message levels.
Ends on first trigger (TP/SL/waypoint).
On TP/SL:
- send user message,
- auto-close signals (clear signals2.json),
- send one analysis message that already states auto-close was executed.
On waypoint:
- send user message and analysis message, no auto-close.
"""
if duration_minutes is None or duration_minutes <= 0:
return {"status": "error", "message": "Invalid duration_minutes (must be > 0)."}
# Initial snapshot
snap = init_price_snapshot(symbol)
if snap is None:
msg = "Unable to retrieve initial price snapshot."
send_alert_message_to_analysis(f"[ERROR] {msg}")
return {"status": "error", "message": msg}
t_start = datetime.datetime.now()
t_end = t_start + datetime.timedelta(minutes=duration_minutes)
bid0, ask0 = snap["bid"], snap["ask"]
price0 = mid_price(bid0, ask0)
hi_bid = bid0
lo_ask = ask0
levels = []
# TP
if tp is not None:
try:
tp = float(tp)
direction = "up" if tp > price0 else ("down" if tp < price0 else None)
levels.append({"type": "tp", "level": tp, "direction": direction})
except Exception:
pass
# SL
if sl is not None:
try:
sl = float(sl)
direction = "up" if sl > price0 else ("down" if sl < price0 else None)
levels.append({"type": "sl", "level": sl, "direction": direction})
except Exception:
pass
# Price messages (waypoints)
remaining_waypoints = []
if price_messages:
for w in price_messages:
try:
p = float(w["price"])
m = str(w.get("message", "")).strip()
direction = "up" if p > price0 else ("down" if p < price0 else None)
remaining_waypoints.append({"type": "waypoint", "level": p, "direction": direction, "message": m})
except Exception:
continue
# Immediate hits if any level equals the initial price
for entry in list(levels) + list(remaining_waypoints):
if entry["direction"] is None: # means level == price0
play_alert_sound()
if entry["type"] == "tp":
user_msg = build_tp_user_message(symbol, price0, entry["level"])
send_message_to_users(user_msg)
analysis_msg = build_tp_analysis_message(symbol, price0, entry["level"])
auto_close_and_notify_for_tp_sl(analysis_msg)
return {"status": "hit", "side": "tp", "message": "TP hit immediately."}
elif entry["type"] == "sl":
user_msg = build_sl_user_message(symbol, price0, entry["level"])
send_message_to_users(user_msg)
analysis_msg = build_sl_analysis_message(symbol, price0, entry["level"])
auto_close_and_notify_for_tp_sl(analysis_msg)
return {"status": "hit", "side": "sl", "message": "SL hit immediately."}
else:
user_msg = build_waypoint_user_message(symbol, price0, entry["level"], entry.get("message", ""))
send_message_to_users(user_msg)
analysis_msg = build_waypoint_analysis_message(symbol, price0, entry["level"], user_msg)
send_alert_message_to_analysis(analysis_msg)
return {"status": "hit", "side": "waypoint", "message": f"Waypoint {entry['level']} hit immediately."}
# Main loop
tick = 0
prev_price = price0
try:
while datetime.datetime.now() < t_end:
if cancel_token and cancel_token.is_cancelled():
return {"status": "cancelled", "message": "Monitor cancelled."}
q = get_current_price(symbol)
if q is None:
time.sleep(0.6)
continue
bid, ask = q["bid"], q["ask"]
price_now = mid_price(bid, ask)
hi_bid = max(hi_bid, bid)
lo_ask = min(lo_ask, ask)
# 1) Check TP/SL first
for entry in levels:
lvl = entry["level"]
direction = entry["direction"]
if detect_cross(prev_price, price_now, lvl, direction):
play_alert_sound()
if entry["type"] == "tp":
user_msg = build_tp_user_message(symbol, price_now, lvl)
send_message_to_users(user_msg)
analysis_msg = build_tp_analysis_message(symbol, price_now, lvl)
auto_close_and_notify_for_tp_sl(analysis_msg)
return {"status": "hit", "side": "tp", "message": "TP hit."}
else: # sl
user_msg = build_sl_user_message(symbol, price_now, lvl)
send_message_to_users(user_msg)
analysis_msg = build_sl_analysis_message(symbol, price_now, lvl)
auto_close_and_notify_for_tp_sl(analysis_msg)
return {"status": "hit", "side": "sl", "message": "SL hit."}
# 2) Then price message waypoints
for w in remaining_waypoints:
lvl = w["level"]
direction = w["direction"]
if detect_cross(prev_price, price_now, lvl, direction):
play_alert_sound()
user_msg = build_waypoint_user_message(symbol, price_now, lvl, w.get("message", ""))
send_message_to_users(user_msg)
analysis_msg = build_waypoint_analysis_message(symbol, price_now, lvl, user_msg)
send_alert_message_to_analysis(analysis_msg)
return {"status": "hit", "side": "waypoint", "message": f"Waypoint {lvl} fired."}
# Logging and tick control
tick += 1
if tick % PRINT_EVERY_N_TICKS == 0:
mins_left = max(0.0, (t_end - datetime.datetime.now()).total_seconds() / 60)
logger.info(f"[{ts()}] {symbol} Mid {price_now:.3f} | Bid {bid:.3f} (hi {hi_bid:.3f}) | Ask {ask:.3f} (lo {lo_ask:.3f}) | TP {tp} / SL {sl} | {len(remaining_waypoints)} waypoints | {mins_left:.1f}m left")
prev_price = price_now
random_sleep_tick()
# Expired
send_alert_message_to_analysis(
f"[EXPIRED] {symbol} window ended without trigger. TP {tp}, SL {sl}, remaining waypoints {len(remaining_waypoints)}."
)
return {"status": "expired", "message": "No levels were triggered within duration."}
except Exception as e:
message = f"Unexpected error: {e}"
send_alert_message_to_analysis(f"[ERROR] {message}")
return {"status": "error", "message": message}
# ================= Monitor Management =================
class CancelTokenStore:
pass
def start_monitor_async(
symbol: str,
duration_minutes: int,
tp: Optional[float],
sl: Optional[float],
price_messages: Optional[List[Dict[str, Union[float, str]]]]
) -> str:
"""
Start monitor in a daemon thread.
Behavior: Only one monitor can be active.
If another monitor is running, cancel it and replace with the new one.
Returns the new monitor_id.
"""
monitor_id = f"{int(time.time()*1000)}-{random.randint(1000,9999)}"
cancel_token = CancelToken()
# If there is an active monitor, cancel it first
with ACTIVE_MONITOR_LOCK:
old_token = ACTIVE_MONITOR.get("token")
if old_token is not None:
try:
old_token.cancel()
except Exception:
pass
def _runner():
try:
result = monitor_prices(
symbol=symbol,
duration_minutes=duration_minutes,
tp=tp,
sl=sl,
price_messages=price_messages,
cancel_token=cancel_token
)
logger.info(f"[MONITOR-END] {symbol} result: {result}")
except Exception as e:
logger.error(f"[MONITOR-THREAD] Error: {e}")
finally:
# Clear slot if this monitor is still the active one
with ACTIVE_MONITOR_LOCK:
if ACTIVE_MONITOR.get("id") == monitor_id:
ACTIVE_MONITOR["id"] = None
ACTIVE_MONITOR["token"] = None
t = threading.Thread(target=_runner, daemon=True)
# Set the new monitor as active (replace previous)
with ACTIVE_MONITOR_LOCK:
ACTIVE_MONITOR["id"] = monitor_id
ACTIVE_MONITOR["token"] = cancel_token
t.start()
return monitor_id
# ================= API Endpoints =================
@app.route("/monitor", methods=["POST"])
def start_monitor():
"""
Start monitoring asynchronously and immediately return 200.
Only one monitor is allowed at a time; a new request cancels and replaces the previous task.
"""
try:
data = request.get_json(force=True)
except Exception:
return jsonify({"status": "error", "message": "Invalid JSON"}), 400
symbol = data.get("symbol", SYMBOL_DEFAULT)
duration_minutes = data.get("duration_minutes")
tp = data.get("tp")
sl = data.get("sl")
price_messages = data.get("price_messages")
if duration_minutes is None:
return jsonify({"status": "error", "message": "duration_minutes is required"}), 400
try:
duration_minutes = int(duration_minutes)
if duration_minutes <= 0:
raise ValueError
except Exception:
return jsonify({"status": "error", "message": "duration_minutes must be a positive integer"}), 400
def to_float_or_none(x):
if x is None:
return None
try:
return float(x)
except Exception:
return None
tp = to_float_or_none(tp)
sl = to_float_or_none(sl)
final_waypoints = None
if price_messages is not None:
if not isinstance(price_messages, list):
return jsonify({"status": "error", "message": "price_messages must be a list"}), 400
final_waypoints = []
for w in price_messages:
if not isinstance(w, dict) or "price" not in w:
return jsonify({"status": "error", "message": "Each price_messages item must be an object with 'price' field"}), 400
try:
p = float(w["price"])
m = str(w.get("message", "")).strip()
final_waypoints.append({"price": p, "message": m})
except Exception:
return jsonify({"status": "error", "message": "Invalid price in price_messages"}), 400
# At least one trigger must be provided
if tp is None and sl is None and not final_waypoints:
return jsonify({"status": "error", "message": "Provide at least one of tp, sl, or price_messages"}), 400
monitor_id = start_monitor_async(
symbol=symbol,
duration_minutes=duration_minutes,
tp=tp,
sl=sl,
price_messages=final_waypoints
)
return jsonify({"status": "accepted", "monitor_id": monitor_id, "message": "Alert started. Any previous task was cancelled and replaced."}), 200
@app.route("/cancel", methods=["POST"])
def cancel_monitor():
"""
Cancel the active monitor or a specific one if monitor_id matches.
Payload: { "monitor_id": "..." } (optional)
"""
try:
data = request.get_json(force=True)
except Exception:
return jsonify({"status": "error", "message": "Invalid JSON"}), 400
req_id = data.get("monitor_id")
with ACTIVE_MONITOR_LOCK:
active_id = ACTIVE_MONITOR.get("id")
token = ACTIVE_MONITOR.get("token")
if not token:
return jsonify({"status": "error", "message": "No active monitor"}), 404
if req_id and req_id != active_id:
return jsonify({"status": "error", "message": "monitor_id does not match active task"}), 404
token.cancel()
with ACTIVE_MONITOR_LOCK:
ACTIVE_MONITOR["id"] = None
ACTIVE_MONITOR["token"] = None
return jsonify({"status": "cancelled", "monitor_id": active_id}), 200
@app.route("/", methods=["GET"])
def root():
return jsonify({
"service": "Forex Trade Alert API (async, gap-aware levels)",
"fixed_alert_webhook": ALERT_WEBHOOK_URL,
"user_message_api": MESSAGE_API_URL,
"endpoints": {
"POST /monitor": "Start monitoring with TP/SL and/or multiple price levels (waypoints). Replaces any existing task.",
"POST /cancel": "Cancel the active monitor"
},
"notes": [
"Only one task can run at a time. A new /monitor request cancels the current task and replaces it.",
"Ends on the first trigger (TP, SL, or any price message level).",
"Gap-aware: if price jumps over a level between ticks, it still triggers.",
"Faster polling (~0.3-0.5s) to catch fast moves.",
"Direction is set based on initial price to avoid double triggering.",
"On TP/SL trigger, signals2.json is cleared to [] automatically and analysis is notified in the same message."
]
}), 200
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860, debug=False, threaded=True)