"""Scheduled XAUUSD (gold) analysis service.

A Flask app that, every hour (and once at startup), gathers TradingView news,
a Twitter-feed summary, the live gold price and chart screenshots, asks an AI
model for an updated analysis, and commits the result to a GitHub-hosted
``news.json`` file.
"""

import atexit
import json
import time

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, jsonify

from ai_api import call_o1_ai_api
from db_news import (
    fetch_json_from_github as fetch_news_json_from_github,
    fetch_authenticity_token_and_commit_oid as fetch_news_auth_token_and_oid,
    update_user_json_file as update_news_json_file,
)
# NOTE(review): the project module/function names use the original "twiter"
# spelling; keep the aliases consistent with it.
from db_twiter import (
    fetch_json_from_github as fetch_twitter_json_from_github,
    fetch_authenticity_token_and_commit_oid as fetch_twiter_auth_token_and_oid,
    update_user_json_file as update_twiter_json_file,
)
from fetch_news import get_recent_news_items, build_news_summary
from get_price import get_live_rates_for_pair

# Create Flask app
app = Flask(__name__)

# Outcome of the most recent update, shared between the scheduler thread and
# Flask request threads.  It is only ever *rebound* to a fresh dict (never
# mutated in place), so readers always observe a consistent snapshot without
# needing a lock.
last_update_result = {
    "success": False,
    "timestamp": None,
    "message": "No updates yet",
}


def log(step, msg):
    """Print a timestamped, step-tagged log line, flushed immediately."""
    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] [{step}] {msg}", flush=True)


# =========================
# Screenshot helper
# =========================
SCREENSHOT_API = "https://corvo-ai-xx-sc.hf.space/capture"


def capture_screenshots(urls, width=1080, height=1920, full_page=True, timeout=400):
    """Call the screenshot API for one or more URLs.

    Args:
        urls: List of page URLs to capture.
        width: Viewport width in pixels.
        height: Viewport height in pixels.
        full_page: Capture the full page rather than just the viewport.
        timeout: ``requests`` timeout in seconds (rendering can be slow).

    Returns:
        dict with keys:
            ``success``: True if at least one image URL was returned.
            ``images``:  ``[{"srcUrl": <page url>, "imageUrl": <image url>}, ...]``
            ``errors``:  raw error entries from the API (or the local exception
                         message on failure).
    """
    step = "SCREENSHOT"
    payload = {
        "urls": urls,
        "width": width,
        "height": height,
        "fullPage": full_page,
    }
    try:
        log(step, f"Requesting screenshots for {len(urls)} URL(s): {urls} "
                  f"w={width} h={height} fullPage={full_page}")
        resp = requests.post(SCREENSHOT_API, json=payload, timeout=timeout)
        log(step, f"API status code: {resp.status_code}")
        resp.raise_for_status()
        data = resp.json()
        results = data.get("results", [])
        errors = data.get("errors", [])
        log(step, f"API results count: {len(results)}, errors count: {len(errors)}")

        images = []
        for item in results:
            out = item.get("output", {})
            img_url = out.get("imageUrl")
            src = item.get("url")
            log(step, f"Parsed result for {src}: imageUrl={'OK' if img_url else 'MISSING'}")
            if img_url:
                images.append({"srcUrl": src, "imageUrl": img_url})

        success = len(images) > 0
        log(step, f"Collected {len(images)} image(s). Success={success}")
        if errors:
            log(step, f"Errors: {errors}")
        return {"success": success, "images": images, "errors": errors}
    except Exception as e:
        # Best-effort helper: never propagate; callers inspect "success".
        log(step, f"Exception: {e}")
        return {"success": False, "images": [], "errors": [str(e)]}


# =========================
# Chat history builder
# =========================
def build_formatted_chat_history(
    old_ai_response=None,
    news_summary="",
    twitter_summary="",
    gold_price_data=None,
    chart_images=None,
):
    """Assemble the chat-message list sent to the AI model.

    Order of messages: system prompt, chart-image multiparts, a combined
    news/twitter/price text block, the previous AI analysis (if any), and a
    final "do you want to update?" question.

    Args:
        old_ai_response: JSON string of the previous AI analysis, or None.
        news_summary: Pre-built news summary text (may be empty).
        twitter_summary: Pre-built Twitter summary text (may be empty).
        gold_price_data: dict with ``bid``/``ask``/``difference`` keys, or None.
        chart_images: list of ``{"timeframe": ..., "imageUrl": ...}`` dicts.

    Returns:
        list of chat-message dicts in the shape ``call_o1_ai_api`` expects.
    """
    step = "CHAT_BUILD"
    log(step, "Start building chat history")
    chat_history = []

    # 1) role=system — Arabic instructions defining the analyst persona and
    # the required XML output structure.
    log(step, "Append system role prompt")
    chat_history.append({"role": "system", "content": """أنت نظام ذكاء اصطناعي احترافي متخصص في تلخيص تداول ذكية ومباشرة على الذهب XAUUSD.
مسؤولياتك الأساسية:
الأخبار والسياق والسعر:
جمع آخر النقاط الجوهرية من الأخبار المؤثرة على الذهب (الفيدرالي، التضخم، PMI، التوترات الجيوسياسية، النمو، الدولار، العوائد، سيولة السوق).
استدلال على دفة المخاطر Risk-on/Risk-off.
تحديد الاتجاه المرجّح قصير/متوسط المدى إن أمكن.
إضافة ملخص تويتر مختصر مع ذكر الحسابات ذات التأثير على الذهب/الفوركس.
التركيز على تحليل فني دقيق وتنبؤ واضح لحركة السعر الآن.
يجب أن يكون التحليل مباشرًا حول الاتجاه الذي سيذهب إليه السعر بناءً على المعطيات الحالية، مع الابتعاد عن تقديم استراتيجيات بناءً على أسعار مستقبلية.
جودة المخرجات وتنسيق XML:
استخدم العربية الفصيحة الموجزة والعملية.
لا تضف نصاً خارج عناصر XML المطلوبة.
التزم بالبنية التالية حصراً:
<ُExpired>..<ُ/Expired>
تفاصيل العناصر:
أساسية: توجه الفيدرالي، بيانات تضخم/وظائف حديثة، تحركات عوائد وسندات، شهية المخاطرة، أحداث جيوسياسية، وضع الدولار.
اختتم باستنتاج اتجاهي مرجّح وتأثيره المتوقع على XAUUSD.
إذا كانت البيانات غير محدثة، اذكر ذلك بوضوح وقدّم إطار احتمالات بدل الجزم.
قدم ملخصاً مختصراً لأبرز ما يتداوله على تويتر.
اذكر حسابات قامت بنشر أشياء تهمك مع ذكر المنشور @username.
ركّز على المزاج العام، التحذيرات من التقلب، أي إشارات توافق/اختلاف مع السرد الإخباري.
<ُExpired> وضع تاريخ ووقت وسعر يُتوقع أن تنتهي فيه إذا وصل.
"""})

    # 2) role=user -> chart images as multipart entries
    if chart_images:
        log(step, f"Appending {len(chart_images)} chart image(s) to chat")
        for img in chart_images:
            timeframe = img.get("timeframe", "").strip()
            image_url = img.get("imageUrl")
            if image_url:
                chat_history.append({
                    "role": "user",
                    "type": "multipart",
                    "content": [
                        {"type": "image", "url": image_url},
                        {"type": "text",
                         "text": f"هذه لقطة شاشة لزوج XAUUSD إطار زمني {timeframe}"},
                    ],
                })
                log(step, f"Added image message for timeframe={timeframe}, url={image_url}")
            else:
                log(step, f"Skipped image missing URL for timeframe={timeframe}")
    else:
        log(step, "No chart images available to append")

    # 3) role=user -> News + Twitter + Price
    log(step, "Appending news + twitter + price text block")
    # FIX: header previously read "News from Tridingview News :" (typo) with
    # no trailing newline, gluing the header to the first news line.
    user_content = "News from TradingView News:\n"
    if news_summary:
        user_content += f"{news_summary}\n\n"
    else:
        user_content += "No fresh news summary available.\n\n"

    if twitter_summary:
        user_content += "Twitter Feed Summary:\n"
        user_content += f"{twitter_summary}\n\n"
    else:
        user_content += "Twitter Feed Summary: No recent Twitter data available.\n\n"

    if gold_price_data is not None:
        user_content += (
            f"Gold Price [XAUUSD]:\n"
            f" Bid: {gold_price_data.get('bid')}\n"
            f" Ask: {gold_price_data.get('ask')}\n"
            f" Spread: {gold_price_data.get('difference')}\n"
        )
    chat_history.append({"role": "user", "content": user_content})
    log(step, "Appended news/price block")

    # 4) role=user -> previous analysis, if one exists, so the model can diff.
    if old_ai_response:
        chat_history.append({"role": "user",
                             "content": "OLD ANALYSIS FROM YOU:" + old_ai_response})
        log(step, "Appended old AI analysis")
    else:
        log(step, "No old AI analysis available")

    # 5) role=user -> update question
    update_prompt = "هل تريد تحديث تحليلك واستراتيجيتك بناءً على هذه المعطيات؟"
    chat_history.append({"role": "user", "content": update_prompt})
    log(step, "Appended update question")

    log(step, "Chat history build complete")
    return chat_history


def update_strategy_job():
    """Run one full update cycle and record the outcome in last_update_result.

    Steps: fetch the previous analysis from GitHub, gather news / price /
    twitter data and chart screenshots, call the AI, then push the new
    analysis back to GitHub.  All failures are caught and reflected in
    ``last_update_result`` rather than raised (this runs on a scheduler
    thread with no caller to handle exceptions).
    """
    global last_update_result
    step = "JOB"
    log(step, "Scheduled update started")
    try:
        # 1) Fetch old data from news.json in GitHub
        log(step, "Fetching old data from GitHub (news.json)")
        old_data_response = fetch_news_json_from_github()
        old_ai_response = None
        if old_data_response.get("success") and old_data_response.get("data"):
            log(step, "Old data fetch success")
            old_data = old_data_response["data"]
            # Compact one-line JSON so it fits cleanly into a chat message.
            old_ai_response = json.dumps(old_data, ensure_ascii=False,
                                         separators=(',', ':'))
        else:
            log(step, f"Old data fetch failed or empty: {old_data_response}")

        # 2) Fetch news + gold price
        log(step, "Fetching recent news items")
        news_items = get_recent_news_items(hours=24)
        log(step, f"Fetched {len(news_items) if news_items else 0} news item(s)")
        news_summary = build_news_summary(news_items)
        log(step, "Built news summary")

        log(step, "Fetching live gold price XAUUSD")
        gold_price_data = get_live_rates_for_pair("XAUUSD")
        log(step, f"Price data: {gold_price_data}")

        # 2b) Fetch Twitter summary from twiter.json (best-effort: an empty
        # summary is acceptable, so errors only get logged).
        log(step, "Fetching twitter summary from GitHub (twiter.json)")
        twitter_summary = ""
        try:
            tw_resp = fetch_twitter_json_from_github()
            if tw_resp.get("success") and tw_resp.get("data"):
                data = tw_resp["data"]
                # The payload shape varies; normalize to a plain string.
                if isinstance(data, dict) and "twiter" in data:
                    twitter_summary = data.get("twiter", "")
                elif isinstance(data, str):
                    twitter_summary = data
                else:
                    twitter_summary = json.dumps(data, ensure_ascii=False)
                log(step, "Twitter summary fetched successfully")
            else:
                log(step, f"Twitter fetch returned no data: {tw_resp}")
                twitter_summary = ""
        except Exception as te:
            log(step, f"Twitter fetch error: {te}")
            twitter_summary = ""

        # 2c) Capture chart screenshots for 15m and 1h (also best-effort).
        charts = [
            {
                "url": "https://corvo-ai-charts.static.hf.space/index.html?symbol=XAUUSD&interval=15&exchange=OANDA",
                "timeframe": "15 دقيقة",
            },
            {
                "url": "https://corvo-ai-charts.static.hf.space/index.html?symbol=XAUUSD&interval=60&exchange=OANDA",
                "timeframe": "1 ساعة",
            },
        ]
        chart_images = []
        try:
            log(step, "Capturing chart screenshots (15m, 1h)")
            capture_result = capture_screenshots(
                [c["url"] for c in charts], width=1920, height=1080, full_page=False
            )
            if capture_result.get("success"):
                log(step, "Screenshot capture success")
                # Map each returned image back to its chart's timeframe label.
                src_to_tf = {c["url"]: c["timeframe"] for c in charts}
                for img in capture_result.get("images", []):
                    chart_images.append({
                        "timeframe": src_to_tf.get(img.get("srcUrl"), ""),
                        "imageUrl": img.get("imageUrl"),
                    })
                log(step, f"Prepared {len(chart_images)} chart image entries for chat")
            else:
                errs = capture_result.get("errors")
                log(step, f"Screenshot capture failed. Errors: {errs}")
        except Exception as e:
            log(step, f"Screenshot exception: {e}")

        # 3) Build chat_history including images
        log(step, "Building chat history")
        chat_history = build_formatted_chat_history(
            old_ai_response=old_ai_response,
            news_summary=news_summary,
            twitter_summary=twitter_summary,
            gold_price_data=gold_price_data,
            chart_images=chart_images,
        )
        log(step, f"Chat history ready. Messages count: {len(chat_history)}")

        # 4) Call AI (the updated chat history it returns is not needed here).
        log(step, "Calling AI API with chat history")
        ai_response, _ = call_o1_ai_api(chat_history)
        log(step, "AI API call completed")

        # 5) Serialize AI response for GitHub as a one-element JSON array.
        log(step, "Serializing AI response for GitHub")
        result_json_object = {
            "response": ai_response,
            "timestamp": time.time(),
        }
        new_content_one_line = json.dumps([result_json_object], ensure_ascii=False,
                                          separators=(',', ':'))
        log(step, f"Serialized content length: {len(new_content_one_line)}")

        # 6) Get authenticity_token and commit_oid
        log(step, "Fetching authenticity token and commit oid for news.json")
        token, commit_oid = fetch_news_auth_token_and_oid()
        if not token or not commit_oid:
            last_update_result = {
                "success": False,
                "timestamp": time.time(),
                "message": "Could not fetch authenticity token or commit oid for news.json.",
            }
            log(step, last_update_result['message'])
            return
        log(step, "Fetched token and commit oid successfully")

        # 7) Update GitHub file
        log(step, "Updating news.json on GitHub")
        update_result = update_news_json_file(token, commit_oid, new_content_one_line)
        if update_result.get("success"):
            last_update_result = {
                "success": True,
                "timestamp": time.time(),
                "message": "Updated news.json successfully!",
                "response": ai_response,
            }
            log(step, last_update_result['message'])
        else:
            last_update_result = {
                "success": False,
                "timestamp": time.time(),
                "message": f"Failed to update news.json: {update_result.get('message')}",
            }
            log(step, last_update_result['message'])
    except Exception as e:
        # Top-level boundary: record the failure so /status can report it.
        last_update_result = {
            "success": False,
            "timestamp": time.time(),
            "message": f"Error during update: {str(e)}",
        }
        log(step, last_update_result['message'])


# API routes
@app.route('/status', methods=['GET'])
def get_status():
    """Return the status of the last update."""
    log("API", "GET /status called")
    ts = last_update_result.get("timestamp")
    return jsonify({
        "success": last_update_result.get("success", False),
        "timestamp": ts,
        "message": last_update_result.get("message"),
        "last_updated": (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts))
                         if ts else None),
    })


# NOTE(review): GET for a state-changing endpoint is unconventional (POST
# would be more appropriate), kept as GET for backward compatibility.
@app.route('/force-update', methods=['GET'])
def force_update():
    """Run an update synchronously and return its result."""
    log("API", "GET /force-update called")
    update_strategy_job()
    return jsonify({
        "success": last_update_result.get("success", False),
        "message": "Update job triggered",
        "result": last_update_result,
    })


@app.route('/latest', methods=['GET'])
def get_latest():
    """Return the latest AI strategy response, if one exists."""
    log("API", "GET /latest called")
    # "response" is only present after at least one successful update.
    if "response" in last_update_result:
        return jsonify({
            "success": True,
            "timestamp": last_update_result.get("timestamp"),
            "response": last_update_result.get("response"),
            "last_updated": time.strftime(
                '%Y-%m-%d %H:%M:%S',
                time.localtime(last_update_result.get("timestamp"))),
        })
    return jsonify({
        "success": False,
        "message": "No strategy data available yet",
    })


# Setup the scheduler
def init_scheduler():
    """Start the hourly background scheduler and run one update immediately."""
    scheduler = BackgroundScheduler()
    scheduler.add_job(func=update_strategy_job, trigger="interval", hours=1)
    scheduler.start()
    # FIX: ensure the scheduler thread is stopped cleanly on interpreter exit.
    atexit.register(lambda: scheduler.shutdown(wait=False))
    log("SCHED", "Scheduler started: will run every hour")

    # Run once immediately on startup (blocks until the first cycle finishes,
    # so the server starts with data available).
    log("SCHED", "Running initial update job on startup")
    update_strategy_job()


if __name__ == "__main__":
    # Initialize the scheduler before starting the Flask app
    init_scheduler()

    # Start the Flask app
    log("APP", "Starting Flask server on port 7860")
    app.run(host='0.0.0.0', port=7860)