Strategy / app.py
Dooratre's picture
Update app.py
47c0363 verified
import json
import time
import requests
from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
from fetch_news import get_recent_news_items, build_news_summary
from get_price import get_live_rates_for_pair
from ai_api import call_o1_ai_api
from db_news import (
fetch_json_from_github as fetch_news_json_from_github,
fetch_authenticity_token_and_commit_oid as fetch_news_auth_token_and_oid,
update_user_json_file as update_news_json_file
)
# NEW: import twitter db helpers
from db_twiter import (
fetch_json_from_github as fetch_twitter_json_from_github,
fetch_authenticity_token_and_commit_oid as fetch_twiter_auth_token_and_oid,
update_user_json_file as update_twiter_json_file
)
# Create Flask app
app = Flask(__name__)
# Global variable to store the last update result
last_update_result = {
"success": False,
"timestamp": None,
"message": "No updates yet"
}
# Simple logger helper
def log(step, msg):
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] [{step}] {msg}", flush=True)
# =========================
# Screenshot helper
# =========================
SCREENSHOT_API = "https://corvo-ai-xx-sc.hf.space/capture"
def capture_screenshots(urls, width=1080, height=1920, full_page=True, timeout=400):
"""
Calls the screenshot API with one or more URLs and returns a dict:
{
'success': True/False,
'images': [ { 'srcUrl': <original>, 'imageUrl': <png> }, ... ],
'errors': [ ...raw errors... ]
}
"""
step = "SCREENSHOT"
payload = {
"urls": urls,
"width": width,
"height": height,
"fullPage": full_page
}
try:
log(step, f"Requesting screenshots for {len(urls)} URL(s): {urls} w={width} h={height} fullPage={full_page}")
resp = requests.post(SCREENSHOT_API, json=payload, timeout=timeout)
log(step, f"API status code: {resp.status_code}")
resp.raise_for_status()
data = resp.json()
results = data.get("results", [])
errors = data.get("errors", [])
log(step, f"API results count: {len(results)}, errors count: {len(errors)}")
images = []
for item in results:
out = item.get("output", {})
img_url = out.get("imageUrl")
src = item.get("url")
log(step, f"Parsed result for {src}: imageUrl={'OK' if img_url else 'MISSING'}")
if img_url:
images.append({
"srcUrl": src,
"imageUrl": img_url
})
success = len(images) > 0
log(step, f"Collected {len(images)} image(s). Success={success}")
if errors:
log(step, f"Errors: {errors}")
return {"success": success, "images": images, "errors": errors}
except Exception as e:
log(step, f"Exception: {e}")
return {"success": False, "images": [], "errors": [str(e)]}
# =========================
# Chat history builder
# =========================
def build_formatted_chat_history(
old_ai_response=None,
news_summary="",
twitter_summary="",
gold_price_data=None,
chart_images=None
):
step = "CHAT_BUILD"
log(step, "Start building chat history")
chat_history = []
# 1) role=system
log(step, "Append system role prompt")
chat_history.append({"role": "system", "content": """أنت نظام ذكاء اصطناعي احترافي متخصص في تلخيص تداول ذكية ومباشرة على الذهب XAUUSD.
مسؤولياتك الأساسية:
الأخبار والسياق والسعر:
جمع آخر النقاط الجوهرية من الأخبار المؤثرة على الذهب (الفيدرالي، التضخم، PMI، التوترات الجيوسياسية، النمو، الدولار، العوائد، سيولة السوق).
استدلال على دفة المخاطر Risk-on/Risk-off.
تحديد الاتجاه المرجّح قصير/متوسط المدى إن أمكن.
إضافة ملخص تويتر مختصر مع ذكر الحسابات ذات التأثير على الذهب/الفوركس.
التركيز على تحليل فني دقيق وتنبؤ واضح لحركة السعر الآن.
يجب أن يكون التحليل مباشرًا حول الاتجاه الذي سيذهب إليه السعر بناءً على المعطيات الحالية، مع الابتعاد عن تقديم استراتيجيات بناءً على أسعار مستقبلية.
جودة المخرجات وتنسيق XML:
استخدم العربية الفصيحة الموجزة والعملية.
لا تضف نصاً خارج عناصر XML المطلوبة.
التزم بالبنية التالية حصراً:
<news>…</news>
<twiter>…</twiter>
<ُExpired>..<ُ/Expired>
تفاصيل العناصر:
<news>
أساسية: توجه الفيدرالي، بيانات تضخم/وظائف حديثة، تحركات عوائد وسندات، شهية المخاطرة، أحداث جيوسياسية، وضع الدولار.
اختتم باستنتاج اتجاهي مرجّح وتأثيره المتوقع على XAUUSD.
إذا كانت البيانات غير محدثة، اذكر ذلك بوضوح وقدّم إطار احتمالات بدل الجزم.
<twiter>
قدم ملخصاً مختصراً لأبرز ما يتداوله على تويتر.
اذكر حسابات قامت بنشر أشياء تهمك مع ذكر المنشور @username.
ركّز على المزاج العام، التحذيرات من التقلب، أي إشارات توافق/اختلاف مع السرد الإخباري.
<ُExpired>
وضع تاريخ ووقت وسعر يُتوقع أن تنتهي فيه إذا وصل.
"""})
# 2) role=user -> chart images as multipart entries
if chart_images:
log(step, f"Appending {len(chart_images)} chart image(s) to chat")
for img in chart_images:
timeframe = img.get("timeframe", "").strip()
image_url = img.get("imageUrl")
if image_url:
chat_history.append({
"role": "user",
"type": "multipart",
"content": [
{"type": "image", "url": image_url},
{"type": "text", "text": f"هذه لقطة شاشة لزوج XAUUSD إطار زمني {timeframe}"}
]
})
log(step, f"Added image message for timeframe={timeframe}, url={image_url}")
else:
log(step, f"Skipped image missing URL for timeframe={timeframe}")
else:
log(step, "No chart images available to append")
# 3) role=user -> News + Twitter + Price
log(step, "Appending news + twitter + price text block")
user_content = "News from Tridingview News :"
if news_summary:
user_content += f"{news_summary}\n\n"
else:
user_content += "No fresh news summary available.\n\n"
if twitter_summary:
user_content += "Twitter Feed Summary:\n"
user_content += f"{twitter_summary}\n\n"
else:
user_content += "Twitter Feed Summary: No recent Twitter data available.\n\n"
if gold_price_data is not None:
user_content += (
f"Gold Price [XAUUSD]:\n"
f" Bid: {gold_price_data.get('bid')}\n"
f" Ask: {gold_price_data.get('ask')}\n"
f" Spread: {gold_price_data.get('difference')}\n"
)
chat_history.append({"role": "user", "content": user_content})
log(step, "Appended news/price block")
# 4) role=user -> OLD ANALYSIS if exists
if old_ai_response:
chat_history.append({"role": "user", "content": "OLD ANALYSIS FROM YOU:" + old_ai_response})
log(step, "Appended old AI analysis")
else:
log(step, "No old AI analysis available")
# 5) role=user -> update question
update_prompt = "هل تريد تحديث تحليلك واستراتيجيتك بناءً على هذه المعطيات؟"
chat_history.append({"role": "user", "content": update_prompt})
log(step, "Appended update question")
log(step, "Chat history build complete")
return chat_history
def update_strategy_job():
"""Function to be scheduled, runs the main logic"""
global last_update_result
step = "JOB"
log(step, "Scheduled update started")
try:
# 1) Fetch old data from news.json in GitHub
log(step, "Fetching old data from GitHub (news.json)")
old_data_response = fetch_news_json_from_github()
old_ai_response = None
if old_data_response.get("success") and old_data_response.get("data"):
log(step, "Old data fetch success")
old_data = old_data_response["data"]
old_ai_response = json.dumps(old_data, ensure_ascii=False, separators=(',', ':'))
else:
log(step, f"Old data fetch failed or empty: {old_data_response}")
old_ai_response = None
# 2) Fetch news + gold price
log(step, "Fetching recent news items")
news_items = get_recent_news_items(hours=24)
log(step, f"Fetched {len(news_items) if news_items else 0} news item(s)")
news_summary = build_news_summary(news_items)
log(step, "Built news summary")
log(step, "Fetching live gold price XAUUSD")
gold_price_data = get_live_rates_for_pair("XAUUSD")
log(step, f"Price data: {gold_price_data}")
# 2b) Fetch Twitter summary from twiter.json
log(step, "Fetching twitter summary from GitHub (twiter.json)")
twitter_summary = ""
try:
tw_resp = fetch_twitter_json_from_github()
if tw_resp.get("success") and tw_resp.get("data"):
data = tw_resp["data"]
if isinstance(data, dict) and "twiter" in data:
twitter_summary = data.get("twiter", "")
elif isinstance(data, str):
twitter_summary = data
else:
twitter_summary = json.dumps(data, ensure_ascii=False)
log(step, "Twitter summary fetched successfully")
else:
log(step, f"Twitter fetch returned no data: {tw_resp}")
twitter_summary = ""
except Exception as te:
log(step, f"Twitter fetch error: {te}")
twitter_summary = ""
# 2c) Capture chart screenshots for 15m and 1h
charts = [
{
"url": "https://corvo-ai-charts.static.hf.space/index.html?symbol=XAUUSD&interval=15&exchange=OANDA",
"timeframe": "15 دقيقة"
},
{
"url": "https://corvo-ai-charts.static.hf.space/index.html?symbol=XAUUSD&interval=60&exchange=OANDA",
"timeframe": "1 ساعة"
}
]
chart_images = []
try:
log(step, "Capturing chart screenshots (15m, 1h)")
capture_result = capture_screenshots([c["url"] for c in charts], width=1920, height=1080, full_page=False)
if capture_result.get("success"):
log(step, "Screenshot capture success")
src_to_tf = {c["url"]: c["timeframe"] for c in charts}
for img in capture_result.get("images", []):
chart_images.append({
"timeframe": src_to_tf.get(img.get("srcUrl"), ""),
"imageUrl": img.get("imageUrl")
})
log(step, f"Prepared {len(chart_images)} chart image entries for chat")
else:
errs = capture_result.get("errors")
log(step, f"Screenshot capture failed. Errors: {errs}")
except Exception as e:
log(step, f"Screenshot exception: {e}")
# 3) Build chat_history including images
log(step, "Building chat history")
chat_history = build_formatted_chat_history(
old_ai_response=old_ai_response,
news_summary=news_summary,
twitter_summary=twitter_summary,
gold_price_data=gold_price_data,
chart_images=chart_images
)
log(step, f"Chat history ready. Messages count: {len(chat_history)}")
# 4) Call AI
log(step, "Calling AI API with chat history")
ai_response, updated_chat_history = call_o1_ai_api(chat_history)
log(step, "AI API call completed")
# 5) Save AI response to GitHub in news.json
log(step, "Serializing AI response for GitHub")
result_json_object = {
"response": ai_response,
"timestamp": time.time()
}
new_content_one_line = json.dumps([result_json_object], ensure_ascii=False, separators=(',', ':'))
log(step, f"Serialized content length: {len(new_content_one_line)}")
# 6) Get authenticity_token and commit_oid
log(step, "Fetching authenticity token and commit oid for news.json")
token, commit_oid = fetch_news_auth_token_and_oid()
if not token or not commit_oid:
last_update_result = {
"success": False,
"timestamp": time.time(),
"message": "Could not fetch authenticity token or commit oid for news.json."
}
log(step, last_update_result['message'])
return
log(step, "Fetched token and commit oid successfully")
# 7) Update GitHub file
log(step, "Updating news.json on GitHub")
update_result = update_news_json_file(token, commit_oid, new_content_one_line)
if update_result.get("success"):
last_update_result = {
"success": True,
"timestamp": time.time(),
"message": "Updated news.json successfully!",
"response": ai_response
}
log(step, last_update_result['message'])
else:
last_update_result = {
"success": False,
"timestamp": time.time(),
"message": f"Failed to update news.json: {update_result.get('message')}"
}
log(step, last_update_result['message'])
except Exception as e:
last_update_result = {
"success": False,
"timestamp": time.time(),
"message": f"Error during update: {str(e)}"
}
log(step, last_update_result['message'])
# API routes
@app.route('/status', methods=['GET'])
def get_status():
"""Returns the status of the last update"""
log("API", "GET /status called")
return jsonify({
"success": last_update_result.get("success", False),
"timestamp": last_update_result.get("timestamp"),
"message": last_update_result.get("message"),
"last_updated": time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(last_update_result.get("timestamp")))
if last_update_result.get("timestamp") else None
})
@app.route('/force-update', methods=['GET'])
def force_update():
"""Force an immediate update"""
log("API", "GET /force-update called")
update_strategy_job()
return jsonify({
"success": last_update_result.get("success", False),
"message": "Update job triggered",
"result": last_update_result
})
@app.route('/latest', methods=['GET'])
def get_latest():
"""Get the latest strategy response"""
log("API", "GET /latest called")
if "response" in last_update_result:
return jsonify({
"success": True,
"timestamp": last_update_result.get("timestamp"),
"response": last_update_result.get("response"),
"last_updated": time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(last_update_result.get("timestamp")))
})
else:
return jsonify({
"success": False,
"message": "No strategy data available yet"
})
# Setup the scheduler
def init_scheduler():
"""Initialize and start the scheduler"""
scheduler = BackgroundScheduler()
scheduler.add_job(func=update_strategy_job, trigger="interval", hours=1)
scheduler.start()
log("SCHED", "Scheduler started: will run every hour")
# Run once immediately on startup
log("SCHED", "Running initial update job on startup")
update_strategy_job()
if __name__ == "__main__":
# Initialize the scheduler before starting the Flask app
init_scheduler()
# Start the Flask app
log("APP", "Starting Flask server on port 7860")
app.run(host='0.0.0.0', port=7860)