#==================================================
# IMPORT: TEST
#==================================================
import asyncio
from ably import AblyRest


async def _create_ably_token_request(apikey, client_id):
    """Create an Ably token request and return it as a plain dict.

    The REST client is opened and closed inside the running event loop so the
    HTTP session it owns is released after every call instead of leaking.
    """
    async with AblyRest(apikey) as ably:
        token_request = await ably.auth.create_token_request({
            "client_id": client_id,  # bound to a client id, i.e. not anonymous
            "ttl": 60 * 60 * 1000 * 24,  # valid for 24 hours, in milliseconds
        })
    return token_request.to_dict()


def token_ably(apikey, token):
    """Return a Flask JSON response carrying a signed Ably token request.

    The client id is read from the ``clientId`` header of the current Flask
    request, so this must run inside a request context.  ``request`` and
    ``jsonify`` are the Flask names imported with the routes further down
    this module.

    Args:
        apikey: Ably API key used to sign the token request.
        token: Value stored under the ``"token"`` key of the response.

    Returns:
        The jsonified token request on success, otherwise a ``(json, 403)``
        tuple whose ``detail`` field holds the error text.
    """
    try:
        client_id = request.headers.get('clientId')
        token_request_dict = asyncio.run(
            _create_ably_token_request(apikey, client_id)
        )
        token_request_dict["token"] = token  # extra key expected by the caller
        return jsonify(token_request_dict)
    except Exception as e:  # route boundary: every failure becomes a 403
        return jsonify({"status": "fail", "reason": "Invalid link", "detail": str(e)}), 403


import stanza  # safe

nlp = stanza.Pipeline("en")  # safe; loaded once at import time


def _word_dependency(sent, word):
    """Describe one word's dependency relation inside *sent* as a dict."""
    # head == 0 marks the root; any other value is a 1-based index into sent.words.
    head_text = "ROOT" if word.head == 0 else sent.words[word.head - 1].text
    return {
        "dependent_id": word.id,
        "dependent": word.text,
        "head_id": word.head,
        "head": head_text,
        "deprel": word.deprel,
    }


def get_dependencies(doc):
    """Return the dependency relations of every word in every sentence of *doc*.

    Args:
        doc: Object exposing ``.sentences``; each sentence exposes ``.words``
            (as produced by the ``nlp`` pipeline above).

    Returns:
        A list of dicts with keys ``sent_id`` (``None`` when the sentence has
        no ``id`` attribute), ``dependent_id``, ``dependent``, ``head_id``,
        ``head`` and ``deprel``.
    """
    deps = []
    for sent in doc.sentences:
        sent_id = getattr(sent, "id", None)
        for w in sent.words:
            deps.append({"sent_id": sent_id, **_word_dependency(sent, w)})
    return deps


def get_dependencies_v0(sent):
    """Return the dependency relations of one sentence (no ``sent_id`` key)."""
    return [_word_dependency(sent, w) for w in sent.words]


#==================================================
# IMPORT: ALL
#==================================================
from urllib.parse import urlencode  # webpush
import requests  # deurl, webpush
from requests.exceptions import RequestException  # deurl
import re  # deurl
##import socket
from gradio_client import Client  # X
from huggingface_hub import HfApi  # dataset
import os  # SECRET
from cryptography.fernet import Fernet  # FUNCTION
import hashlib  # FUNCTION
import json  # FUNCTION, webpush
import base64  # FUNCTION
import string  # FUNCTION
import random  # FUNCTION
import ast  # FUNCTION
import pandas as pd  # FUNCTION
import io  # dataset
##import json
from pywebpush import webpush, WebPushException  # webpush
##import requests
from google import genai  # llms
from google.genai.errors import APIError  # llms


#==================================================
# SECRET: ALL
#==================================================
def checkSecret(SECRET):
    """Log a masked preview of a secret and report whether it is configured.

    Only secrets of at least 12 characters show their first and last three
    characters; shorter ones are masked completely, because revealing six
    characters of a short value would expose most or all of it.

    NOTE: the preview is meant for checking deployment logs by hand; remove
    or silence the print once the configuration has been verified.

    Args:
        SECRET: The secret value, or ``None``/empty when it is not set.

    Returns:
        ``SECRET`` unchanged when it is truthy, otherwise ``False``.
    """
    if not SECRET:
        print("SECRET: The 'SECRET' is not a setting.")
        return False

    if len(SECRET) >= 12:
        masked_token = SECRET[:3] + "***" + SECRET[-3:]
    else:
        masked_token = "***"
    print(f"SECRET: {masked_token}")
    return SECRET


GS_ID = os.getenv("GOOGLE_SHEET_ID")
checkSecret(GS_ID)
HF_READ = os.getenv("HUGGINGFACE_READ_TOKEN")
checkSecret(HF_READ)
HF_WRITE = os.getenv("HUGGINGFACE_WRITE_TOKEN")
checkSecret(HF_WRITE)
OC_APIKEY = os.getenv("OPENAI_CHATGPT_APIKEY")
checkSecret(OC_APIKEY)
AR_APIKEY = os.environ.get("ABLY_REALTIME_APIKEY")
checkSecret(AR_APIKEY)
TG_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
checkSecret(TG_TOKEN)
NS_SECRET = os.getenv("NINS_CRYPT_SECRET")  # webpush; 32bit! (author's note)
checkSecret(NS_SECRET)
VAPID_PUBLIC = os.getenv("VAPID_PUBLIC_KEY")  # webpush
checkSecret(VAPID_PUBLIC)
VAPID_PRIVATE = os.getenv("VAPID_PRIVATE_KEY")  # webpush
checkSecret(VAPID_PRIVATE)
VAPID_CLAIMS = {
    "sub": "mailto:duguaynins@gmail.com",
    ##"aud": "https://copyright.nins.cc"
}

key = NS_SECRET
cipher = Fernet(key)  # shared cipher used by encrypt_data / decrypt_data

print(VAPID_CLAIMS)


#==================================================
# FUNCTION: ALL
#==================================================
def str_to_json(string):
    """Parse a JSON document from *string*."""
    return json.loads(string)


def json_to_str(object):
    """Serialise *object* to a JSON string."""
    return json.dumps(object)


def inputs_to_dict(inputs):
    """Coerce *inputs* to a dict.

    A dict is returned unchanged.  A string is parsed as JSON first and, when
    that fails, as a Python literal (``ast.literal_eval``).

    Raises:
        ValueError: If *inputs* is neither a dict nor a string, or if the
            parsed string is not a dict.  ``ast.literal_eval`` may also raise
            ``ValueError``/``SyntaxError`` for text it cannot parse.
    """
    if isinstance(inputs, dict):
        return inputs
    if not isinstance(inputs, str):
        raise ValueError(f"Cannot convert to dict: {inputs}")

    try:
        parsed = str_to_json(inputs)  # try JSON first
    except json.JSONDecodeError:
        parsed = ast.literal_eval(inputs)  # then Python dict-literal syntax

    # Callers use .get()/.values(), so a list or scalar must not slip through.
    if not isinstance(parsed, dict):
        raise ValueError(f"Cannot convert to dict: {inputs}")
    return parsed


def generate_random_id():
    """Return 32 random ASCII letters (a-z, A-Z).

    NOTE(review): built on ``random``, so the id is not suitable as a
    security token.
    """
    azAZ = string.ascii_letters
    return ''.join(random.choices(azAZ, k=32))


def image_to_base64(path):
    """Return the file at *path* as a base64-encoded ASCII string."""
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8")


def dohash_data(data: str) -> str:
    """Return the hex SHA-256 digest of *data* encoded as UTF-8.

    hashlib only accepts bytes, hence the explicit encode.
    """
    return hashlib.sha256(data.encode("utf-8")).hexdigest()


def encrypt_data(data: str) -> str:
    """Encrypt *data* with the module cipher and return the token as text."""
    return cipher.encrypt(data.encode("utf-8")).decode("utf-8")


def decrypt_data(data: str) -> str:
    """Decrypt a token produced by ``encrypt_data`` back to text."""
    return cipher.decrypt(data.encode("utf-8")).decode("utf-8")


def encrypt_data_to_bytes(data: str) -> bytes:
    """Encrypt *data* with the module cipher and return the raw token bytes."""
    return cipher.encrypt(data.encode("utf-8"))


##search_sheet(gid, index="index", target="webpush", signal="tag", verify="***")
def search_sheet(gid, index, target, signal, verify, value):
    """Look up *value* in a Google Sheet exported as CSV.

    Rows are de-duplicated on the *index* column (last occurrence wins) and
    kept only when their *signal* column is ``"***"``, ``"*O*"`` or *verify*.

    Args:
        gid: Google Sheet id used to build the CSV export URL.
        index: Name of the column to match *value* against.
        target: Name of the column whose cell is returned.
        signal: Name of the column used to filter rows.
        verify: Additional accepted value for the *signal* column.
        value: Key to search for; it is stripped and lower-cased first.

    Returns:
        The *target* cell of the matching row, or ``None`` when *value* is
        empty/not a string or no row matches.
    """
    # Reject unusable keys before fetching the sheet (None used to crash here).
    if not isinstance(value, str) or not value.strip():
        return None

    url = f'https://docs.google.com/spreadsheets/d/{gid}/export?format=csv'
    df = pd.read_csv(url, encoding='utf-8', dtype=str, keep_default_na=False)

    # Keep only the last row of each duplicated index value.
    df = df.drop_duplicates(subset=[index], keep='last')
    # Keep only rows whose verification column carries an accepted marker.
    df = df[df[signal].isin(["***", "*O*", verify])]

    lookup = dict(zip(df[index], df[target]))
    # Log the row count only: the index values themselves are user data.
    print(f"{len(lookup)} >>> rows accepted")

    # NOTE(review): the sheet's index values are compared as stored, so they
    # are presumably already lower-case; confirm against the sheet.
    return lookup.get(value.lower().strip())


def upload_to_hf(path_or_fileobj, path_in_repo, repo_id, token=None):
    """Upload a file object to a Hugging Face *dataset* repository.

    Args:
        path_or_fileobj: Seekable binary file object; rewound before upload.
        path_in_repo: Destination path inside the repository.
        repo_id: Target dataset repository id.
        token: Write token; defaults to the module-level ``HF_WRITE``.

    Returns:
        ``True`` once the upload call has returned.
    """
    token = token or HF_WRITE
    path_or_fileobj.seek(0)  # make sure the upload starts at the beginning
    HfApi().upload_file(
        path_or_fileobj=path_or_fileobj,
        path_in_repo=path_in_repo,
        repo_id=repo_id,
        repo_type="dataset",
        token=token,
    )
    return True


#==================================================
# FUNCTION: MODEL.SYS
#==================================================
from sentence_transformers import SentenceTransformer, util
##import gradio as gr

# Load the multilingual models.  Alternatives that were tried:
# model = SentenceTransformer("sentence-transformers/multi-qa-MiniLM-L6-cos-v1")  # 90.9M
# model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")  # 90.9M
# model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')  # 50+ languages, 471M, Microsoft Research / UKP Lab
# model = SentenceTransformer('sentence-transformers/distiluse-base-multilingual-cased-v2')  # 50+ languages, 539M, Hugging Face / UKP Lab
# model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-mpnet-base-v2")  # 50+ languages, 1.11GB, Microsoft Research / UKP Lab
model_meta = SentenceTransformer("sentence-transformers/stsb-xlm-r-multilingual")  # 100+ languages, 1.11G, trained by Sentence-Transformers on XLM-RoBERTa
model_goog = SentenceTransformer("sentence-transformers/LaBSE")  # 100+ languages, 1.88GB, Google Research


def semantic_similarity(embeddings):
    """Return the cosine similarity of ``embeddings[0]`` and ``embeddings[1]``.

    The value is rounded to four decimals and returned as a Python float.
    """
    cos_sim = util.pytorch_cos_sim(embeddings[0], embeddings[1])
    return round(cos_sim.item(), 4)


def fn_main_ds(inputs):
    """Score the similarity of two sentences with both loaded models.

    Args:
        inputs: Dict (or JSON / Python-literal string of a dict) holding
            exactly two values, the sentences to compare.  Keys are ignored.

    Returns:
        ``{"goog": <LaBSE score>, "meta": <stsb-xlm-r score>}``.

    Raises:
        ValueError: If *inputs* does not hold exactly two values.
    """
    inputs = inputs_to_dict(inputs)
    sent1, sent2 = inputs.values()
    pair = [sent1, sent2]
    sim1 = semantic_similarity(model_goog.encode(pair, convert_to_tensor=True))
    sim2 = semantic_similarity(model_meta.encode(pair, convert_to_tensor=True))
    return {"goog": sim1, "meta": sim2}


#==================================================
# FUNCTION: LLMs.SYS
#==================================================
def openai_text_generate(data):
    """Run one chat completion against the OpenAI API.

    Args:
        data: Dict with non-empty ``system``, ``user`` and ``models`` entries
            and an optional ``apikey``; the ``OPENAI_CHATGPT_APIKEY``
            environment variable is used when ``apikey`` is missing or empty.

    Returns:
        The stripped completion text on success, otherwise an
        ``({"error": ...}, status)`` tuple: 400 for a missing field, 502 when
        the request itself fails, or the upstream status code.
    """
    if not data:
        return {"error": "data?"}, 400
    for field in ("system", "user", "models"):
        if not data.get(field):
            return {"error": f"{field}?"}, 400

    # `or` (not a .get default) so an explicit empty apikey still falls back.
    apikey = data.get("apikey") or os.getenv("OPENAI_CHATGPT_APIKEY")
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {apikey}",
    }
    payload = {
        "model": str(data["models"]),
        "messages": [
            {"role": "system", "content": str(data["system"])},
            {"role": "user", "content": str(data["user"])},
        ],
        "max_tokens": 500,
        "temperature": 0,  # 0 is recommended for translation
    }

    try:
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=60,  # without a timeout a stalled connection hangs the worker
        )
    except requests.RequestException as e:
        return {"error": str(e)}, 502

    if response.status_code != 200:
        return {"error": response.text}, response.status_code
    return response.json()['choices'][0]['message']['content'].strip()


def google_text_generation(data):
    """Run one text generation against the Google Gemini API.

    Args:
        data: Dict with non-empty ``system``, ``user`` and ``models`` entries
            and an optional ``apikey``; the ``GOOGLE_GEMINI_APIKEY``
            environment variable is used when ``apikey`` is missing or empty.

    Returns:
        The generated text on success, otherwise an
        ``({"error": ...}, status)`` tuple (400 for a missing field, 502 when
        the API call raises ``APIError``).
    """
    if not data:
        return {"error": "data?"}, 400
    for field in ("system", "user", "models"):
        if not data.get(field):
            return {"error": f"{field}?"}, 400

    apikey = data.get("apikey") or os.getenv("GOOGLE_GEMINI_APIKEY")
    client = genai.Client(api_key=apikey)

    try:
        response = client.models.generate_content(
            model=data["models"],
            config={"system_instruction": data["system"]},
            contents=data["user"],
        )
    except APIError as e:
        # Same error shape as openai_text_generate so callers handle one form.
        return {"error": str(e)}, 502
    return response.text


"""
from transformers import pipeline

# 建立文字生成管線
generator = pipeline("text-generation", model="gpt2")  # 可換成你喜歡的 HF 模型

def pipeline_text_generation(data):
    user = data.get("user", "auto")
    result = generator(user, max_length=100, do_sample=True, top_p=0.9)
    return result[0]['generated_text']
"""


def fn_main_llms(inputs):
    """Dispatch a generation request to Gemini or OpenAI by model name.

    Args:
        inputs: Dict (or JSON / Python-literal string of a dict) whose
            ``models`` entry starts with ``gemini`` or ``gpt``
            (case-insensitive).

    Returns:
        Whatever the selected backend returns, or ``"???"`` when the model
        name is missing or matches neither prefix.
    """
    inputs = inputs_to_dict(inputs)
    # A missing "models" entry used to raise AttributeError on None.lower().
    model_name = str(inputs.get("models") or "").lower()
    if model_name.startswith("gemini"):
        return google_text_generation(inputs)
    if model_name.startswith("gpt"):
        return openai_text_generate(inputs)
    return "???"
#==================================================
# FUNCTION: DATASET.SYS
#==================================================
def update_file(id, data, path_in_repo, repo_id, folder="default"):
    """Encrypt *data* and upload it to a Hugging Face dataset repository.

    The file is always stored at ``{folder}/{hash}/data.txt`` where ``hash``
    is ``dohash_data`` of the stripped, lower-cased *id*.

    Args:
        id: User identifier used to derive the storage path.
        data: Plain text to encrypt and store.
        path_in_repo: Kept for interface compatibility; the destination path
            is derived from *folder* and *id* and this value is ignored.
        repo_id: Target dataset repository id.
        folder: Top-level folder inside the repository.

    Returns:
        ``200`` on success, ``404`` when *repo_id* is empty, ``500`` when
        encryption or the upload fails (the error is printed to the log).
    """
    # Validate first so nothing is encrypted for a request that cannot succeed.
    if not repo_id:
        return 404

    try:
        content = io.BytesIO(encrypt_data(data).encode("utf-8"))
        user = dohash_data(id.strip().lower())
        destination = f"{folder}/{user}/data.txt"
        res = upload_to_hf(
            path_or_fileobj=content,
            path_in_repo=destination,
            repo_id=repo_id,
            token=None,
        )
        print("HF result:", res, flush=True)
        return 200
    except Exception as e:  # boundary: report as a status code, but log the cause
        print(f"update_file failed: {e!r}", flush=True)
        return 500


def fn_main_dataset(inputs):
    """Unpack a request dict and forward it to ``update_file``.

    ``id`` and ``data`` are required; ``path_in_repo``, ``repo_id`` and
    ``folder`` are optional (``folder`` defaults to ``"default"``).
    """
    return update_file(
        inputs["id"],
        inputs["data"],
        inputs.get("path_in_repo"),
        inputs.get("repo_id"),
        inputs.get("folder", "default"),
    )


#==================================================
# FUNCTION: WEBPUSH.SYS
#==================================================
def send_push_notifications(string):
    """Send one web-push message to every subscription in a JSON payload.

    Args:
        string: JSON text shaped like
            ``{"data": {"body": ...}, "subscription": [...]}``.  Each
            subscription is either a subscription dict or a string produced
            by ``encrypt_data`` from the JSON of such a dict.

    The notification title is forced to ``"nins/©"`` and its URL to the site
    root with the body appended as the ``userInput`` query parameter.  The
    module-level ``VAPID_PRIVATE`` and ``VAPID_CLAIMS`` sign the push.

    Returns:
        ``"<n> * Subscribed"`` where ``n`` is the number of subscriptions in
        the payload (attempted, not necessarily delivered).
    """
    payload = json.loads(string)
    data = payload.get("data") or {}
    body = data.get("body", "")  # a missing body used to raise KeyError

    data["title"] = "nins/©"
    data["url"] = "https://copyright.nins.cc/?" + urlencode({"userInput": body})
    message = json.dumps(data)

    subscriptions = payload.get("subscription") or []
    for sub in subscriptions:
        # Encrypted subscriptions arrive as strings and must be decoded first.
        # A subscription that cannot be decoded is skipped, not fatal.
        try:
            if isinstance(sub, str):
                sub = json.loads(decrypt_data(sub))
        except Exception as ex:
            print("推播失敗:", ex)
            continue

        # NOTE(review): the previous revision fetched the stored subscription
        # from the Hugging Face dataset and compared it with a freshly
        # encrypted copy, then ignored the outcome.  That request is dropped:
        # it had no timeout, its exceptions aborted the whole batch, and
        # Fernet tokens embed a random IV so the ciphertexts could not match.
        # TODO: to really verify, decrypt the stored blob and compare plaintext.

        try:
            webpush(
                data=message,
                subscription_info=sub,
                vapid_private_key=VAPID_PRIVATE,
                vapid_claims=VAPID_CLAIMS,
            )
            print("推播成功:", sub)
        except WebPushException as ex:
            print("推播失敗:", sub)
            print(ex)
            print("\n\n\n")

    return f"{len(subscriptions)} * Subscribed"
# Entry point used by the /webpush/ninscc route.
def fn_main_webpush(inputs):
    """Forward the JSON payload string to ``send_push_notifications``."""
    return send_push_notifications(inputs)


#==================================================
# FUNCTION: DEURL.SYS
#==================================================
import httpx
import urllib.parse
from urllib.parse import urlparse, urljoin  # needed by expand_urls
import traceback
from typing import Tuple, Dict

# NOTE(security): every helper below makes the server fetch a caller-supplied
# URL (directly or through a third-party service).  Nothing here restricts the
# target to public hosts, so this is an SSRF surface; review before exposing.


def expand_urls(url):
    """Return up to ten external host names referenced by the page at *url*.

    The page is fetched and every ``href``/``src`` attribute is resolved
    against *url*; hosts other than the page's own are collected.

    Returns:
        A list of distinct host names (``netloc``), or ``[]`` when the page
        cannot be fetched.
    """
    try:
        r = requests.get(
            url,
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=10,
        )
        r.raise_for_status()
    except RequestException as e:
        print(f"[WARN] 無法抓取 {url} : {e}")
        return []

    base_domain = urlparse(url).netloc
    links = re.findall(r'(?:href|src)=["\'](.*?)["\']', r.text, re.IGNORECASE)

    external_hosts = set()
    for link in links:
        parsed = urlparse(urljoin(url, link))  # resolve relative paths
        if parsed.scheme not in ("http", "https"):
            continue
        if parsed.netloc == base_domain:  # skip same-domain links
            continue
        external_hosts.add(parsed.netloc)
        if len(external_hosts) >= 10:
            break
    return list(external_hosts)


def expand_url(url):
    """Follow redirects from *url* and return the final URL, or ``None``.

    At most five redirects are followed, with a 10 second timeout and TLS
    verification enabled.  Any failure yields ``None``.
    """
    try:
        with httpx.Client(
            follow_redirects=True,
            timeout=10,
            max_redirects=5,  # bounded, to stop redirect loops
            verify=True,
            headers={"User-Agent": "Mozilla/5.0"},
        ) as client:
            return str(client.get(url).url)
    except Exception:  # httpx.RequestError and anything else: best effort
        return None


def expand_url_redirectcheck_io(url: str) -> str:
    """Resolve *url* through redirectcheck.io; return the final URL or ``None``.

    ``None`` is also returned when the reply has no ``summary.finalUrl``.  The
    earlier placeholder string was truthy, so callers mistook it for a URL.
    """
    base_api = "https://redirectcheck.io/api/analyze"
    try:
        with httpx.Client(timeout=20) as client:
            data = client.get(base_api, params={"url": url}).json()
        return data.get("summary", {}).get("finalUrl") or None
    except Exception:
        return None


def expand_url_unshorten_me(target_url):
    """Resolve *target_url* through unshorten.me; return the text or ``None``.

    The service is expected to answer with the final URL as plain text.
    Error statuses and empty bodies yield ``None`` instead of being passed on
    as if they were a URL.
    """
    api_url = f"https://unshorten.me/s/{target_url}"
    try:
        with httpx.Client(timeout=20) as client:
            resp = client.get(api_url)
            resp.raise_for_status()
        return resp.text.strip() or None
    except Exception:
        return None


def fn_main_deurl(inputs):
    """Expand a URL, trying each resolver in turn.

    Returns:
        ``{"url": [final_url], "urls": external_hosts}`` from the first
        resolver that succeeds, or ``None`` when all of them fail.
    """
    print("[START]")
    urls = expand_urls(inputs)
    for resolver in (expand_url, expand_url_redirectcheck_io, expand_url_unshorten_me):
        final_url = resolver(inputs)
        if final_url:
            return {"url": [final_url], "urls": urls}
    print("[END]")
    return None


#==================================================
# ROUTE.SYS
#==================================================
from flask import Flask, render_template, redirect, url_for, request, jsonify, send_file
from flask_cors import CORS

app = Flask(__name__)
CORS(app)  # allow every origin; convenient for testing


def _predict_on_space(space_url):
    """Call ``/fn_main`` on a Hugging Face Space with the request JSON as kwargs."""
    client = Client(src=space_url, hf_token=HF_READ)
    result = client.predict(
        **request.json,  # the JSON object is expanded into fn_main's arguments
        api_name="/fn_main",
    )
    return jsonify({"result": result})


def _llm_error_response(result):
    """Convert an ``(error_dict, status)`` tuple from the LLM helpers.

    Returns a Flask ``(response, status)`` pair for such a tuple and ``None``
    for any other value, so routes can stop before parsing an error as JSON.
    """
    if isinstance(result, tuple):
        error_body, status = result
        return jsonify(error_body), status
    return None


# Unknown paths are redirected to the home page.
@app.errorhandler(404)
def page_not_found(e):
    return redirect(url_for("home"))


# Regular pages
@app.route("/")
def home():
    return render_template("index.html")


@app.route("/about")
def about():
    return render_template("about.html")


@app.route("/contact")
def contact():
    return render_template("contact.html")


@app.route("/MultipleEmbeddings/predict", methods=["POST"])
@app.route("/embeddings/multiple_v1", methods=["POST"])
def MultipleEmbeddings():
    """Proxy the request to the MultipleEmbeddings Space."""
    return _predict_on_space("https://duguaynins-MultipleEmbeddings-v260331.hf.space")


@app.route('/generate/llms_v1', methods=['POST'])
def generate_llms():
    """Proxy the request to the MultipleLLMs Space."""
    return _predict_on_space("https://duguaynins-MultipleLLMs-v260401.hf.space")


##{"inputs": {"system":_,"user":_,} }
##...{"user": {"source":_,"target":_,"en":_,} }
@app.route('/generate/mqm', methods=['POST'])
def generate_mqm():
    """Run the MQM prompt over the request text plus its dependency parse.

    Expects ``{"inputs": {"intermediate": ..., "system": ..., "user": ...}}``.
    The response ``result`` is a JSON string with ``MQM`` and ``dependencies``.
    """
    buffer = request.json.get('inputs')
    if not isinstance(buffer, dict):
        return jsonify({"error": "inputs?"}), 400

    doc = nlp(buffer.get('intermediate', 'The admin is duguaynins.'))
    dep = get_dependencies(doc)

    inputs = {
        "system": f"{os.getenv('NINS_PROMPT_MQM')}\n{buffer.get('system')}",
        "user": f"{dep}\n{buffer.get('user')}",
        "models": "gpt-4o-mini",
    }
    result = fn_main_llms(inputs)
    error = _llm_error_response(result)
    if error is not None:
        return error

    merge = json_to_str({"MQM": str_to_json(result), "dependencies": dep})
    return jsonify({"result": merge})


@app.route('/generate/rewrite', methods=['POST'])
def generate_rewrite():
    """Run the rewrite prompt and attach similarity scores to its JSON reply.

    Expects ``{"inputs": {"system": ..., "user": ..., "models": ...}}``.  The
    model must answer with a JSON object; its first and third values are
    compared with ``fn_main_ds`` and stored under ``embeddings``.
    """
    buffer = request.json.get('inputs')
    if not isinstance(buffer, dict):
        return jsonify({"error": "inputs?"}), 400

    inputs = {
        "system": f"{os.getenv('NINS_PROMPT_REWRITE')}\n{buffer.get('system')}",
        "user": buffer.get('user'),
        "models": buffer.get('models'),
    }
    result = fn_main_llms(inputs)
    error = _llm_error_response(result)
    if error is not None:
        return error

    result = str_to_json(result)
    values = list(result.values())
    # presumably source-language and intermediate-language text; confirm
    # against the NINS_PROMPT_REWRITE prompt
    sl, il = values[0], values[2]
    result['embeddings'] = fn_main_ds({'a': il, 'b': sl})
    return jsonify({"result": json_to_str(result)})


@app.route('/generate/translate', methods=['POST'])
def generate_translate():
    """Placeholder for the translate endpoint.

    The implementation was disabled; it mirrored ``generate_rewrite`` with the
    ``NINS_PROMPT_TRANSLATE`` prompt.  An explicit 501 is returned because a
    view returning ``None`` makes Flask fail with an internal error.
    """
    return jsonify({"error": "not implemented"}), 501


@app.route('/generate/llms', methods=['POST'])
@app.route('/generate/llms-vt', methods=['POST'])
def generate_llms_vt():
    """Run ``fn_main_llms`` on the request's ``inputs``."""
    inputs = request.json.get('inputs')
    return jsonify({"result": fn_main_llms(inputs)})


@app.route("/embeddings/multiple", methods=["POST"])
@app.route("/embeddings/multiple-vt", methods=["POST"])
def embeddings_multiple_vt():
    """Run ``fn_main_ds`` on the request's ``inputs``."""
    inputs = request.json.get('inputs')
    return jsonify({"result": fn_main_ds(inputs)})


@app.route('/webpush/ninscc', methods=['POST'])
def webpush_ninscc():
    """Run ``fn_main_webpush`` on the request's ``inputs``."""
    inputs = request.json.get('inputs')
    return jsonify({"result": fn_main_webpush(inputs)})


@app.route('/decode/url', methods=['POST'])
def decode_url():
    """Run ``fn_main_deurl`` on the request's ``inputs``."""
    inputs = request.json.get('inputs')
    return jsonify({"result": fn_main_deurl(inputs)})


@app.route('/dataset/update', methods=['POST'])
def dataset_update():
    """Run ``fn_main_dataset`` on the request's ``inputs``."""
    inputs = request.json.get('inputs')
    return jsonify({"result": fn_main_dataset(inputs)})


@app.route("/channels/ably", methods=["POST"])  ##20260420
def channels_get():
    """Look up the web-push entry registered for an email address.

    The email is taken from the ``email`` query parameter or, failing that,
    from the JSON body.  ``silent=True`` lets query-only requests through;
    without it a missing JSON body was rejected before the query was read.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    email = request.args.get("email") or data.get("email")
    if not email:
        return jsonify({"error": "email?"}), 400

    result = search_sheet(gid=GS_ID, index="index", target="webpush", signal="tag", verify="***", value=email)
    return jsonify({"result": result})


@app.route("/realtime/ably", methods=["POST"])
def realtime_ably():
    """Issue an Ably token request for the calling client."""
    return token_ably(AR_APIKEY, None)


"""
@app.route("/telegram/sendMessage", methods=["GET", "POST"])
def telegram_sendMessage():
    pass
    token = TG_TOKEN
    chat_id = request.values.get("chatid")
    text = request.values.get("text")
    keyboard = {}
    url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={text}"
    params = {
        "chat_id": chat_id,
        "text": text, ##Hello from Python
        "parse_mode": "HTML", ##Markdown, MarkdownV2, HTML
        ##"reply_markup": json.dumps(keyboard)##keyboard # ✔ 直接丟 dict
    }
    data = None
    with httpx.Client(timeout=20) as client:
        resp = client.get(url, params=params)
        data = resp.json()
        return data
    r = requests.get(url, params=params, timeout=20)
    ##r = requests.post(url, data=params, timeout=(3, 20))
    print(r.json())
    return r.json()

@app.route("/telegram/sendMessage_vt", methods=["GET", "POST"])
async def send_message_vt():
    # 完美!同時支援 GET 和 POST 傳參
    token = TG_TOKEN
    chat_id = request.values.get("chatid")
    text = request.values.get("text")
    if not chat_id or not text:
        return {"ok": False, "description": "Bad Request: chatid or text is missing"}, 400
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = {
        "chat_id": chat_id,
        "text": text
    }
    try:
        # 設定較長的 timeout 應對 Hugging Face 的網路波動
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(url, data=payload)
            # 直接回傳 Telegram API 的結果,方便除錯
            return response.json()
    except httpx.ConnectTimeout:
        return {"ok": False, "error_code": 504, "description": "Gateway Timeout: Telegram connection timed out"}, 504
    except Exception as e:
        return {"ok": False, "description": f"Internal Error: {str(e)}"}, 500
"""

import platform

if __name__ == "__main__":
    # Local Windows runs use port 5800; every other platform uses 7860.
    system_name = platform.system()
    print(system_name)
    port = 5800 if system_name == "Windows" else 7860
    app.run(host="0.0.0.0", port=port)