# Server / app.py
# duguaynins's picture
# Update app.py
# 17f8cf2 verified
#==================================================
# IMPORT: TEST
#==================================================
import asyncio
from ably import AblyRest
def token_ably(apikey, token):
    """Build an Ably token request for the caller and return it as a Flask response.

    The client id is read from the ``clientId`` header of the current Flask
    request. ``token`` is copied verbatim into the response under the
    ``"token"`` key. Any exception is turned into a 403 JSON response.
    """
    # 24 hours expressed in milliseconds (24 h x 60 min x 60 s x 1000 ms).
    one_day_ms = 60 * 60 * 1000 * 24
    try:
        rest_client = AblyRest(apikey)
        caller_id = request.headers.get('clientId')
        # Non-anonymous token request bound to the caller's client id.
        pending = rest_client.auth.create_token_request({
            "client_id": caller_id,
            "ttl": one_day_ms,
        })
        token_request = asyncio.run(pending)
        body = token_request.to_dict()
        body["token"] = token
        return jsonify(body)
    except Exception as e:
        failure = {"status": "fail", "reason": "Invalid link", "detail": str(e)}
        return jsonify(failure), 403
import stanza ##safe
# Module-level English stanza pipeline; it is called as nlp(text) by the
# /generate/mqm route. Original author's marker: "safe".
nlp = stanza.Pipeline("en") ##safe
##doc = nlp(text)
##for sent in doc.sentences:
def get_dependencies(doc):
    """Flatten every word of every sentence in ``doc`` into dependency records.

    Each record holds the sentence id (``None`` when the sentence has no
    ``id`` attribute), the dependent word's id and text, the head index, the
    head text (``"ROOT"`` when the head index is 0) and the relation label.
    Head indices are 1-based positions into the sentence's word list.
    """
    records = []
    for sentence in doc.sentences:
        for word in sentence.words:
            if word.head == 0:
                governor = "ROOT"
            else:
                governor = sentence.words[word.head - 1].text
            record = {
                "sent_id": getattr(sentence, "id", None),
                "dependent_id": word.id,
                "dependent": word.text,
                "head_id": word.head,
                "head": governor,
                "deprel": word.deprel,
            }
            records.append(record)
    return records
def get_dependencies_v0(sent):
    """Return one dependency record per word of a single sentence.

    Same record layout as ``get_dependencies`` minus the ``sent_id`` key.
    The head index is 1-based; 0 means the word is the root.
    """
    words = sent.words

    def _describe(word):
        # Resolve the governor's text; index 0 is the artificial root.
        governor = "ROOT" if word.head == 0 else words[word.head - 1].text
        return {
            "dependent_id": word.id,
            "dependent": word.text,
            "head_id": word.head,
            "head": governor,
            "deprel": word.deprel,
        }

    return [_describe(word) for word in words]
#==================================================
# IMPORT: ALL
#==================================================
from urllib.parse import urlencode #webpush
import requests #deurl #webpush
from requests.exceptions import RequestException #deurl
import re #deurl
##import socket
from gradio_client import Client #X
from huggingface_hub import HfApi #dataset
import os #SECRET
from cryptography.fernet import Fernet #FUNCTION
import hashlib #FUNCTION
import json #FUNCTION #webpush
import base64 #FUNCTION
import string #FUNCTION
import random #FUNCTION
import ast #FUNCTION
import pandas as pd #FUNCTION
import io #dataset
##import json
from pywebpush import webpush, WebPushException #webpush
##import requests
from google import genai #llms
from google.genai.errors import APIError #llms
#==================================================
# SECRET: ALL
#==================================================
def checkSecret(SECRET):
    """Log a masked preview of a secret and report whether it is configured.

    Args:
        SECRET: the secret value (normally a string read from the
            environment) or ``None``/empty when it is not configured.

    Returns:
        ``SECRET`` unchanged when it is truthy, otherwise ``False``.

    The preview shows the first and last three characters only when the
    secret is long enough that doing so hides most of it. Short secrets are
    fully masked; slicing a value of six characters or fewer would otherwise
    print it in full.
    """
    if not SECRET:
        print("SECRET: The 'SECRET' is not a setting.")
        return False

    # Only reveal the 3+3 edge characters when at least half stays hidden.
    if len(SECRET) >= 12:
        masked_token = SECRET[:3] + "***" + SECRET[-3:]
    else:
        masked_token = "***"
    # NOTE: intended for manual log inspection only; remove once verified.
    print(f"SECRET: {masked_token}")
    return SECRET
# Read every credential from the environment. Each read is followed by
# checkSecret(), which prints a masked preview (or a "not a setting" notice);
# its return value is ignored here.
GS_ID = os.getenv("GOOGLE_SHEET_ID");checkSecret(GS_ID)
HF_READ = os.getenv("HUGGINGFACE_READ_TOKEN");checkSecret(HF_READ)
HF_WRITE = os.getenv("HUGGINGFACE_WRITE_TOKEN");checkSecret(HF_WRITE)
OC_APIKEY = os.getenv("OPENAI_CHATGPT_APIKEY");checkSecret(OC_APIKEY)
AR_APIKEY = os.environ.get("ABLY_REALTIME_APIKEY");checkSecret(AR_APIKEY)
TG_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN");checkSecret(TG_TOKEN)
# Used as the Fernet key below. The author's note says "32bit!" - presumably
# the key must decode to 32 bytes; TODO confirm.
NS_SECRET = os.getenv("NINS_CRYPT_SECRET");checkSecret(NS_SECRET) # webpush
VAPID_PUBLIC = os.getenv("VAPID_PUBLIC_KEY");checkSecret(VAPID_PUBLIC) # webpush
VAPID_PRIVATE = os.getenv("VAPID_PRIVATE_KEY");checkSecret(VAPID_PRIVATE) # webpush
# Claims passed to pywebpush as vapid_claims by send_push_notifications().
VAPID_CLAIMS = {
    "sub": "mailto:duguaynins@gmail.com",
    ##"aud": "https://copyright.nins.cc"
}
# Shared symmetric cipher used by encrypt_data()/decrypt_data().
# NOTE(review): Fernet(key) runs at import time even when NS_SECRET is unset;
# presumably that fails during start-up - confirm.
key = NS_SECRET
cipher = Fernet(key)
print(VAPID_CLAIMS)
#==================================================
# FUNCTION: ALL
#==================================================
##import json
def str_to_json(string):
    """Decode the JSON text in ``string`` and return the resulting Python value."""
    decoded = json.loads(string)
    return decoded
def json_to_str(object):
    """Serialise ``object`` to JSON text using the default ``json.dumps`` settings."""
    encoded = json.dumps(object)
    return encoded
##import json
##import ast
def inputs_to_dict(inputs):
    """Normalise ``inputs`` into a Python object.

    A dict is returned as-is. A string is parsed as JSON first and, when that
    fails, as a Python literal via ``ast.literal_eval``. Any other type
    raises ``ValueError``.
    """
    # Already a dict: nothing to convert.
    if isinstance(inputs, dict):
        return inputs
    # Neither dict nor text: refuse.
    if not isinstance(inputs, str):
        raise ValueError(f"Cannot convert to dict: {inputs}")
    try:
        # Preferred format: JSON text.
        return json.loads(inputs)
    except json.JSONDecodeError:
        # Fallback: Python literal syntax (e.g. single-quoted dict repr).
        return ast.literal_eval(inputs)
##import string
##import random
def generate_random_id():
    """Return a 32-character identifier drawn from ASCII letters (a-z, A-Z).

    Uses the ``random`` module, so the output is reproducible under a fixed
    seed and is not intended to be unpredictable.
    """
    alphabet = string.ascii_lowercase + string.ascii_uppercase
    picked = random.choices(alphabet, k=32)
    return ''.join(picked)
##import base64
def image_to_base64(path):
    """Read the file at ``path`` as bytes and return its base64 text."""
    with open(path, "rb") as handle:
        raw = handle.read()
    encoded = base64.b64encode(raw)
    return str(encoded, "utf-8")
def dohash_data(data: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``data`` encoded as UTF-8."""
    # hashlib works on bytes only, so the text is encoded first.
    digest = hashlib.sha256()
    digest.update(data.encode("utf-8"))
    return digest.hexdigest()
def encrypt_data(data: str) -> str:
    """Encrypt ``data`` with the module-level ``cipher`` and return the token as text."""
    plaintext = data.encode("utf-8")
    token = cipher.encrypt(plaintext)
    return token.decode("utf-8")
def decrypt_data(data: str) -> str:
    """Decrypt a token produced by ``encrypt_data`` and return the original text."""
    token = data.encode("utf-8")
    plaintext = cipher.decrypt(token)
    return plaintext.decode("utf-8")
def encrypt_data_to_bytes(data: str) -> bytes:
    """Encrypt ``data`` with the module-level ``cipher`` and return the raw token bytes."""
    plaintext = data.encode("utf-8")
    return cipher.encrypt(plaintext)
##search_sheet(gid, index="index", target="webpush", signal="tag", verify="***")
def search_sheet(gid, index, target, signal, verify, value):
    """Look up one row of a Google Sheet (CSV export) and return a cell from it.

    Args:
        gid: spreadsheet id used to build the CSV export URL.
        index: name of the column that identifies a row.
        target: name of the column whose value is returned.
        signal: name of the column used to filter rows.
        verify: extra accepted value for the ``signal`` column, in addition
            to the always-accepted ``"***"`` and ``"*O*"``.
        value: key to search for; it is lower-cased and stripped first.

    Returns:
        The ``target`` cell of the matching row, or ``None`` when there is no
        match or ``value`` is missing/blank.

    Example from the original author:
        search_sheet(gid, index="index", target="webpush", signal="tag",
                     verify="***", value=email)
    """
    # A missing or blank key can never identify a row. Bail out before the
    # network fetch instead of crashing on None.lower() or matching rows
    # whose index cell is empty.
    if not isinstance(value, str):
        return None
    needle = value.lower().strip()
    if not needle:
        return None

    url = f'https://docs.google.com/spreadsheets/d/{gid}/export?format=csv'
    df = pd.read_csv(url,
                     encoding='utf-8',
                     dtype=str,
                     keep_default_na=False)
    # When the index column repeats, only the last occurrence counts.
    df = df.drop_duplicates(subset=[index], keep='last')
    # Keep only rows whose signal column carries an accepted marker.
    df = df[df[signal].isin(["***", "*O*", verify])]

    lookup = dict(zip(df[index], df[target]))
    # Log the row count only; the keys may be personal data (e.g. e-mails).
    print(f"{len(lookup)} rows eligible")
    return lookup.get(needle)
def upload_to_hf(path_or_fileobj, path_in_repo, repo_id, token=None):
    """Upload a file object to a Hugging Face dataset repository.

    ``token`` falls back to the module-level ``HF_WRITE`` when falsy. The
    file object is rewound before uploading. Returns ``True`` once
    ``upload_file`` returns.
    """
    if not token:
        token = HF_WRITE
    # Rewind so the upload starts from the first byte.
    path_or_fileobj.seek(0)
    hub = HfApi()
    hub.upload_file(
        path_or_fileobj=path_or_fileobj,
        path_in_repo=path_in_repo,
        repo_id=repo_id,
        repo_type="dataset",
        token=token,
    )
    return True
#==================================================
# FUNCTION: MODEL.SYS
#==================================================
from sentence_transformers import SentenceTransformer, util
##import gradio as gr
# Load the multilingual sentence-embedding models (done once at import time).
# Alternatives the author tried, with their notes on size and origin:
##model = SentenceTransformer("sentence-transformers/multi-qa-MiniLM-L6-cos-v1") # 90.9M
##model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") # 90.9M
##model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2') # 50+ # 471M # Microsoft Research / UKP Lab
##model = SentenceTransformer('sentence-transformers/distiluse-base-multilingual-cased-v2') # 50+ # 539M # Hugging Face / UKP Lab
##model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-mpnet-base-v2") # 50+* # 1.11GB # Microsoft Research / UKP Lab
# The two models below are the ones fn_main_ds() scores with ("meta" and "goog").
model_meta = SentenceTransformer("sentence-transformers/stsb-xlm-r-multilingual") # 100+* # 1.11G # Trained by Sentence-Transformers based on XLM-RoBERTa
model_goog = SentenceTransformer("sentence-transformers/LaBSE") # 100+* # 1.88GB # GOOGLE Research
"""
# 定義計算相似度的函數
def semantic_similarity(inputs):
print(inputs);print(type(inputs))
inputs = inputs_to_dict(inputs)
print(inputs);print(type(inputs))
sent1, sent2 = inputs.values()
embeddings = model.encode([sent1, sent2], convert_to_tensor=True)
cos_sim = util.pytorch_cos_sim(embeddings[0], embeddings[1])
return round(cos_sim.item(), 4) """
# Similarity score for one pair of embeddings.
def semantic_similarity(embeddings):
    """Return the cosine similarity of ``embeddings[0]`` and ``embeddings[1]``.

    The score is computed with ``util.pytorch_cos_sim`` and rounded to four
    decimal places.
    """
    first, second = embeddings[0], embeddings[1]
    score = util.pytorch_cos_sim(first, second).item()
    return round(score, 4)
def fn_main_ds(inputs):
    """Score the similarity of two sentences with both embedding models.

    Args:
        inputs: a dict, or a JSON / Python-literal string of a dict, whose two
            values are the sentences to compare (keys are ignored).

    Returns:
        ``{"goog": <score>, "meta": <score>}`` where each score comes from
        ``semantic_similarity`` on the respective model's embeddings.

    Raises:
        ValueError: when ``inputs`` does not contain exactly two values.
    """
    inputs = inputs_to_dict(inputs)
    sentences = list(inputs.values())
    if len(sentences) != 2:
        raise ValueError(f"Expected exactly 2 sentences, got {len(sentences)}")

    scores = {}
    for label, model in (("goog", model_goog), ("meta", model_meta)):
        embeddings = model.encode(sentences, convert_to_tensor=True)
        scores[label] = semantic_similarity(embeddings)
    # The unreachable legacy returns (string / JSON-text variants) were removed.
    return scores
#==================================================
# FUNCTION: LLMs.SYS
#==================================================
##import requests
def openai_text_generate(data):
    """Send one system+user prompt to the OpenAI chat-completions endpoint.

    Args:
        data: mapping with the required keys ``system``, ``user`` and
            ``models`` and an optional ``apikey``. When ``apikey`` is missing
            or empty, the ``OPENAI_CHATGPT_APIKEY`` environment variable is
            used instead.

    Returns:
        The stripped text of the first choice on success. On failure a tuple
        ``({"error": ...}, status)``: 400 for a missing field, 502 when the
        request could not be completed, otherwise the upstream status code.
    """
    if not data:
        return {"error": "data?"}, 400
    for field in ("system", "user", "models"):
        if not data.get(field):
            return {"error": f"{field}?"}, 400

    # `or` (rather than a .get default) so an explicit empty/None apikey
    # still falls back to the server-side key.
    apikey = data.get("apikey") or os.getenv("OPENAI_CHATGPT_APIKEY")
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {apikey}",
    }
    payload = {
        "model": str(data["models"]),
        "messages": [
            {"role": "system", "content": str(data["system"])},
            {"role": "user", "content": str(data["user"])},
        ],
        # Temperature 0 is the author's recommendation for translation work.
        "max_tokens": 500, "temperature": 0,
    }
    try:
        # Without a timeout a stalled connection would block the worker forever.
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=60,
        )
    except requests.RequestException as exc:
        return {"error": str(exc)}, 502

    if response.status_code != 200:
        return {"error": response.text}, response.status_code
    return response.json()['choices'][0]['message']['content'].strip()
##from google import genai
##from google.genai.errors import APIError
def google_text_generation(data):
    """Send one system+user prompt to a Gemini model via ``google.genai``.

    Args:
        data: mapping with the required keys ``system``, ``user`` and
            ``models`` and an optional ``apikey``. When ``apikey`` is missing
            or empty, the ``GOOGLE_GEMINI_APIKEY`` environment variable is
            used instead.

    Returns:
        ``response.text`` from ``generate_content`` on success, or the tuple
        ``({"error": "<field>?"}, 400)`` when a required field is missing.
    """
    if not data:
        return {"error": "data?"}, 400
    for field in ("system", "user", "models"):
        if not data.get(field):
            return {"error": f"{field}?"}, 400

    # `or` (rather than a .get default) so an explicit empty/None apikey
    # still falls back to the server-side key.
    apikey = data.get("apikey") or os.getenv("GOOGLE_GEMINI_APIKEY")
    client = genai.Client(api_key=apikey)

    # The system prompt is passed as the system instruction; the user prompt
    # is the content.
    response = client.models.generate_content(
        model=data["models"],
        config={"system_instruction": data["system"]},
        contents=data["user"],
    )
    return response.text
"""
from transformers import pipeline
# 建立文字生成管線
generator = pipeline("text-generation", model="gpt2") # 可換成你喜歡的 HF 模型
def pipeline_text_generation(data):
user = data.get("user", "auto")
result = generator(user, max_length=100, do_sample=True, top_p=0.9)
return result[0]['generated_text'] ##data.result.confidences ##data是JS變數名稱 ##const data = await HuggingFaceAPI(...);
"""
def fn_main_llms(inputs):
    """Dispatch a prompt to the Gemini or OpenAI helper based on the model name.

    Args:
        inputs: dict (or JSON / Python-literal string) carrying at least a
            ``models`` entry; it is forwarded unchanged to the chosen helper.

    Returns:
        Whatever the chosen helper returns, or the string ``"???"`` when the
        model name is missing or starts with neither ``gemini`` nor ``gpt``.
    """
    inputs = inputs_to_dict(inputs)
    # Tolerate a missing or non-string model name instead of raising on .lower().
    model_name = str(inputs.get("models") or "").lower()
    if model_name.startswith("gemini"):
        return google_text_generation(inputs)
    if model_name.startswith("gpt"):
        return openai_text_generate(inputs)
    return "???"
#==================================================
# FUNCTION: DATASET.SYS
#==================================================
##from cryptography.fernet import Fernet
##import hashlib
##import io
def update_file(id, data, path_in_repo, repo_id, folder="default"):
    """Encrypt ``data`` and upload it to a Hugging Face dataset repository.

    Args:
        id: user identifier; it is stripped, lower-cased and SHA-256 hashed to
            form the per-user directory name.
        data: plaintext to encrypt and store.
        path_in_repo: accepted for interface compatibility but ignored; the
            destination is always ``{folder}/{hash(id)}/data.txt``.
        repo_id: target dataset repository.
        folder: top-level directory inside the repository.

    Returns:
        ``200`` on success, ``404`` when ``repo_id`` is empty, ``500`` when
        anything raises.
    """
    # Nothing can be uploaded without a repository, so check this first.
    if not repo_id:
        return 404
    try:
        encrypted_data = encrypt_data(data)
        content = io.BytesIO(encrypted_data.encode("utf-8"))
        user = dohash_data(id.strip().lower())
        destination = f"{folder}/{user}/data.txt"
        res = upload_to_hf(
            path_or_fileobj=content,
            path_in_repo=destination,
            repo_id=repo_id,
            token=None,
        )
        print("HF result:", res)
        return 200
    except Exception as exc:
        # Best-effort endpoint: report the failure in the logs instead of
        # silently swallowing it, then signal a server error.
        print(f"update_file failed: {exc!r}", flush=True)
        return 500
def fn_main_dataset(inputs):
    """Unpack the request mapping and forward it to ``update_file``.

    ``inputs`` must provide the keys ``id``, ``data``, ``path_in_repo``,
    ``repo_id`` and ``folder``; a missing key raises ``KeyError``.
    """
    return update_file(
        inputs["id"],
        inputs["data"],
        inputs["path_in_repo"],
        inputs["repo_id"],
        inputs["folder"],
    )
#==================================================
# FUNCTION: WEBPUSH.SYS
#==================================================
##VAPID_PUBLIC_KEY = VAPID_PUBLIC
##VAPID_PRIVATE_KEY = VAPID_PRIVATE
def send_push_notifications(string):
    """Send one web-push message to every subscription listed in a JSON payload.

    Args:
        string: JSON text with a ``data`` object (the notification; its
            ``body`` is also appended to the landing URL as ``userInput``) and
            a ``subscription`` list. Each subscription is either a dict or a
            string produced by ``encrypt_data`` over the JSON of such a dict.

    Returns:
        ``"<n> * Subscribed"`` where ``n`` is the number of subscriptions in
        the payload, regardless of how many deliveries succeeded.

    Delivery is best-effort: a subscription that cannot be decoded or
    delivered is logged and skipped so the rest of the batch still goes out.
    """
    payload = str_to_json(string)
    # Tolerate a payload without "data"/"body" instead of raising KeyError.
    message = payload.setdefault("data", {})
    message["title"] = "nins/©"
    # Carry the notification body to the site as a query parameter.
    message["url"] = (
        "https://copyright.nins.cc/?" + urlencode({"userInput": message.get("body", "")})
    )
    data = json_to_str(message)
    subscriptions = payload.get("subscription", [])

    # NOTE(review): the earlier per-subscription "verify" step was removed. It
    # fetched the stored record over HTTP with no timeout and discarded the
    # result, and it compared a freshly encrypted token with the stored one,
    # which cannot match because Fernet tokens embed a timestamp and IV. To
    # reinstate it, decrypt the stored token and compare the plaintext.
    for sub in subscriptions:
        try:
            if isinstance(sub, str):
                # Encrypted subscription: decrypt, then parse the JSON inside.
                sub = str_to_json(decrypt_data(sub))
        except Exception as ex:
            print("推播失敗:", ex)
            continue
        try:
            webpush(
                data=data,
                subscription_info=sub,
                vapid_private_key=VAPID_PRIVATE,
                vapid_claims=VAPID_CLAIMS,
            )
            print("推播成功:", sub)
        except WebPushException as ex:
            print("推播失敗:", sub)
            print(ex)
            print("\n\n\n")
    return f"{len(subscriptions)} * Subscribed"
# 呼叫函數
##send_push_notifications()
def fn_main_webpush(inputs):
    """Entry point for the web-push route; delegates to ``send_push_notifications``."""
    outcome = send_push_notifications(inputs)
    return outcome
#==================================================
# FUNCTION: DEURL.SYS
#==================================================
import httpx
import urllib.parse
from urllib.parse import urlparse, urljoin # 👈 加上這一行就解決了
import traceback # 👈 記得導入這個
from typing import Tuple, Dict
def expand_urls(url):
    """Fetch ``url`` and list up to ten external domains referenced by the page.

    The page's ``href``/``src`` attribute values are resolved against ``url``;
    only http(s) links whose host differs from the page's own host are kept.

    Returns:
        A list of distinct host names (at most ten, order not guaranteed), or
        ``[]`` when the page cannot be fetched.

    NOTE(review): ``url`` comes from the request body, so this performs a
    server-side fetch of a caller-chosen address - consider restricting the
    allowed targets.
    """
    try:
        r = requests.get(
            url,
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=10,
        )
        r.raise_for_status()
    except RequestException as e:
        print(f"[WARN] 無法抓取 {url} : {e}")
        return []

    base_domain = urlparse(url).netloc
    links = re.findall(r'(?:href|src)=["\'](.*?)["\']', r.text, re.IGNORECASE)
    external_urls = set()
    for link in links:
        try:
            # Turn relative paths into absolute URLs before inspecting them.
            parsed = urlparse(urljoin(url, link))
        except ValueError:
            # Malformed link (e.g. a broken IPv6 literal): skip it rather than
            # abort the whole scan.
            continue
        if parsed.scheme not in ("http", "https"):
            continue
        # Keep only non-empty hosts that differ from the page's own host.
        if parsed.netloc and parsed.netloc != base_domain:
            external_urls.add(parsed.netloc)
            if len(external_urls) >= 10:
                break
    return list(external_urls)
def expand_url(url):
    """Follow redirects from ``url`` and return the final URL as a string.

    Returns ``None`` when the request fails for any reason.
    """
    client_options = {
        "follow_redirects": True,
        "timeout": 10,
        # Cap the number of hops so a redirect loop cannot run forever.
        "max_redirects": 5,
        # TLS verification stays enabled.
        "verify": True,
        "headers": {"User-Agent": "Mozilla/5.0"},
    }
    try:
        with httpx.Client(**client_options) as client:
            landed = client.get(url)
            return str(landed.url)
    except Exception:
        # Covers httpx.RequestError as well as anything unexpected.
        return None
def expand_url_redirectcheck_io(url: str) -> str:
    """Ask redirectcheck.io for the final destination of ``url``.

    Returns:
        The ``summary.finalUrl`` field of the service's JSON reply, or
        ``None`` when the request fails or the field is absent/empty.
        (Previously a placeholder string was returned in that case, which
        callers mistook for a resolved URL.)
    """
    base_api = "https://redirectcheck.io/api/analyze"
    try:
        with httpx.Client(timeout=20) as client:
            resp = client.get(base_api, params={"url": url})
            data = resp.json()
        # The final URL is read from the reply's "summary" object.
        final_url = data.get("summary", {}).get("finalUrl")
        return final_url or None
    except Exception:
        return None
def expand_url_unshorten_me(target_url):
    """Resolve ``target_url`` through the third-party unshorten.me service.

    The code treats the response body as the final URL in plain text.

    Returns:
        The resolved URL, or ``None`` when the request fails, the service
        answers with an error status, or the body does not look like an
        http(s) URL (e.g. an error page).
    """
    api_url = f"https://unshorten.me/s/{target_url}"
    try:
        with httpx.Client(timeout=20) as client:
            resp = client.get(api_url)
            # An error status means the body is not a resolved URL.
            resp.raise_for_status()
            final_url = resp.text.strip()
    except Exception:
        return None
    if not final_url.startswith(("http://", "https://")):
        return None
    return final_url
def fn_main_deurl(inputs):
    """Expand a URL, trying each resolver in turn until one succeeds.

    Returns ``{"url": [<final url>], "urls": <external domains>}`` from the
    first resolver that yields a truthy result, or ``None`` when all fail.
    """
    print("[START]")
    urls = expand_urls(inputs)
    resolvers = (expand_url, expand_url_redirectcheck_io, expand_url_unshorten_me)
    for resolve in resolvers:
        resolved = resolve(inputs)
        if resolved:
            return {"url": [resolved], "urls": urls}
    # Reached only when every resolver came back empty.
    print("[END]")
    return None
#==================================================
# ROUTE.SYS
#==================================================
from flask import Flask, render_template, redirect, url_for, request, jsonify, send_file
from flask_cors import CORS
# The Flask application object that every @app.route below registers on.
app = Flask(__name__)
# CORS with default settings; the author notes this allows all origins, which is handy for testing.
CORS(app)
# On any 404, send the client back to the landing page.
@app.errorhandler(404)
def page_not_found(e):
    """Redirect unknown paths to the ``home`` view."""
    landing = url_for("home")
    return redirect(landing)
# Regular pages.
@app.route("/")
def home():
    """Render the landing page template."""
    page = render_template("index.html")
    return page
@app.route("/about")
def about():
    """Render the about page template."""
    page = render_template("about.html")
    return page
@app.route("/contact")
def contact():
    """Render the contact page template."""
    page = render_template("contact.html")
    return page
@app.route("/MultipleEmbeddings/predict", methods=["POST"])
@app.route("/embeddings/multiple_v1", methods=["POST"])
def MultipleEmbeddings():
    """Proxy the request's JSON body to the remote MultipleEmbeddings Space.

    The JSON object's entries are passed as keyword arguments to the Space's
    ``/fn_main`` endpoint; the reply is wrapped as ``{"result": ...}``.
    """
    # A new client is created for every request.
    space = Client(
        src="https://duguaynins-MultipleEmbeddings-v260331.hf.space",
        hf_token=HF_READ,
    )
    # api_name selects the endpoint, so fn_index is left as None.
    prediction = space.predict(
        **request.json,
        api_name="/fn_main",
        fn_index=None,
    )
    return jsonify({"result": prediction})
@app.route('/generate/llms_v1', methods=['POST'])
def generate_llms():
    """Proxy the request's JSON body to the remote MultipleLLMs Space.

    The JSON object's entries are passed as keyword arguments to the Space's
    ``/fn_main`` endpoint; the reply is wrapped as ``{"result": ...}``.
    """
    # A new client is created for every request.
    space = Client(
        src="https://duguaynins-MultipleLLMs-v260401.hf.space",
        hf_token=HF_READ,
    )
    prediction = space.predict(
        **request.json,
        api_name="/fn_main",
    )
    return jsonify({"result": prediction})
"""
@app.route('/generate/llms', methods=['POST']) ##???
def generate_llms(): ##???
inputs = request.json.get('inputs')
result = fn_main_llms(inputs) ##???
return jsonify({"result": result})
"""
##{"inputs": {"system":_,"user":_,} }
##...{"user": {"source":_,"target":_,"en":_,} }
@app.route('/generate/mqm', methods=['POST'])
def generate_mqm():
    """Run an MQM evaluation prompt enriched with a dependency parse.

    Request JSON: ``{"inputs": {"system": ..., "user": ..., "intermediate": ...}}``.
    ``intermediate`` is parsed with the stanza pipeline; the dependency list
    is prepended to the user prompt and the ``NINS_PROMPT_MQM`` environment
    value is prepended to the system prompt. The model is fixed to
    ``gpt-4o-mini``.

    Returns:
        ``{"result": <JSON text of {"MQM": ..., "dependencies": ...}>}`` on
        success; ``{"error": ...}`` with 400 for a malformed request, the
        upstream status when the LLM helper reports an error, or 502 when
        the LLM reply is not valid JSON.
    """
    body = request.get_json(silent=True) or {}
    buffer = body.get('inputs') if isinstance(body, dict) else None
    if not isinstance(buffer, dict):
        return jsonify({"error": "inputs?"}), 400

    doc = nlp(buffer.get('intermediate', 'The admin is duguaynins.'))
    dep = get_dependencies(doc)
    inputs = {
        "system": f"{os.getenv('NINS_PROMPT_MQM')}\n{buffer.get('system')}",
        "user": f"{dep}\n{buffer.get('user')}",
        "models": "gpt-4o-mini",
    }
    result = fn_main_llms(inputs)
    # The LLM helpers signal failure with an (error_dict, status) tuple.
    if isinstance(result, tuple):
        error_body, status = result
        return jsonify(error_body), status
    try:
        mqm = str_to_json(result)
    except (TypeError, json.JSONDecodeError):
        return jsonify({"error": "LLM reply is not valid JSON", "raw": result}), 502

    merge = json_to_str({"MQM": mqm, "dependencies": dep})
    return jsonify({"result": merge})
@app.route('/generate/rewrite', methods=['POST'])
def generate_rewrite():
    """Run the rewrite prompt and attach similarity scores to the reply.

    Request JSON: ``{"inputs": {"system": ..., "user": ..., "models": ...}}``.
    The ``NINS_PROMPT_REWRITE`` environment value is prepended to the system
    prompt. The LLM reply must be a JSON object; its first and third values
    (by position) are scored against each other with ``fn_main_ds`` and the
    scores are stored under ``"embeddings"``.

    Returns:
        ``{"result": <JSON text of the enriched reply>}`` on success;
        ``{"error": ...}`` with 400 for a malformed request, the upstream
        status when the LLM helper reports an error, or 502 when the reply is
        not a JSON object with at least three entries.
    """
    body = request.get_json(silent=True) or {}
    buffer = body.get('inputs') if isinstance(body, dict) else None
    if not isinstance(buffer, dict):
        return jsonify({"error": "inputs?"}), 400

    inputs = {
        "system": f"{os.getenv('NINS_PROMPT_REWRITE')}\n{buffer.get('system')}",
        "user": buffer.get('user'),
        "models": buffer.get('models'),
    }
    result = fn_main_llms(inputs)
    # The LLM helpers signal failure with an (error_dict, status) tuple.
    if isinstance(result, tuple):
        error_body, status = result
        return jsonify(error_body), status
    try:
        result = str_to_json(result)
    except (TypeError, json.JSONDecodeError):
        return jsonify({"error": "LLM reply is not valid JSON", "raw": result}), 502

    values = list(result.values()) if isinstance(result, dict) else []
    if len(values) < 3:
        return jsonify({"error": "LLM reply has an unexpected shape", "raw": result}), 502
    # Positional contract with the prompt: value 0 and value 2 are the two
    # texts that get compared (named `sl` and `il` by the original author).
    sl, il = values[0], values[2]
    result['embeddings'] = fn_main_ds({'a': il, 'b': sl})
    return jsonify({"result": json_to_str(result)})
@app.route('/generate/translate', methods=['POST'])
def generate_translate():
    """Placeholder for the translate endpoint, which is currently disabled.

    The previous implementation was parked inside a string literal, leaving
    the view with no return value, which Flask rejects at request time. That
    implementation was the same as ``generate_rewrite`` except that it used
    the ``NINS_PROMPT_TRANSLATE`` environment value as the system-prompt
    prefix.

    Returns:
        ``{"error": ...}`` with HTTP 501 until the endpoint is re-enabled.
    """
    return jsonify({"error": "translate endpoint is disabled"}), 501
@app.route('/generate/llms', methods=['POST'])
@app.route('/generate/llms-vt', methods=['POST'])
def generate_llms_vt():
    """Run the in-process LLM dispatcher on the request's ``inputs`` field."""
    body = request.json
    answer = fn_main_llms(body.get('inputs'))
    return jsonify({"result": answer})
@app.route("/embeddings/multiple", methods=["POST"])
@app.route("/embeddings/multiple-vt", methods=["POST"])
def embeddings_multiple_vt():
    """Score sentence similarity in-process for the request's ``inputs`` field."""
    body = request.json
    scores = fn_main_ds(body.get('inputs'))
    return jsonify({"result": scores})
@app.route('/webpush/ninscc', methods=['POST'])
def webpush_ninscc():
    """Send web-push notifications described by the request's ``inputs`` field."""
    body = request.json
    outcome = fn_main_webpush(body.get('inputs'))
    return jsonify({"result": outcome})
@app.route('/decode/url', methods=['POST'])
def decode_url():
    """Expand the URL given in the request's ``inputs`` field."""
    body = request.json
    expanded = fn_main_deurl(body.get('inputs'))
    return jsonify({"result": expanded})
@app.route('/dataset/update', methods=['POST'])
def dataset_update():
    """Store the record described by the request's ``inputs`` field.

    The ``result`` field carries the status code returned by ``fn_main_dataset``.
    """
    body = request.json
    status = fn_main_dataset(body.get('inputs'))
    return jsonify({"result": status})
@app.route("/channels/ably", methods=["POST"])
def channels_get():
    """Look up the ``webpush`` sheet entry for an e-mail address.

    The address is taken from the ``email`` query argument or, failing that,
    from the ``email`` field of the JSON body.

    Returns:
        ``{"result": <sheet value or null>}``, or ``{"error": "email?"}`` with
        HTTP 400 when no address was supplied.
    """
    # silent=True: a request that carries only the query argument (no JSON
    # body or content type) must still be served.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    email = request.args.get("email") or data.get("email")
    if not email:
        return jsonify({"error": "email?"}), 400
    result = search_sheet(gid=GS_ID, index="index", target="webpush", signal="tag", verify="***", value=email)
    return jsonify({"result": result})
@app.route("/realtime/ably", methods=["POST"])
def realtime_ably():
    """Issue an Ably token request using the server's Ably API key."""
    return token_ably(apikey=AR_APIKEY, token=None)
"""
@app.route("/telegram/sendMessage", methods=["GET", "POST"])
def telegram_sendMessage():
pass
token = TG_TOKEN
chat_id = request.values.get("chatid")
text = request.values.get("text")
keyboard = {}
url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={text}"
params = {
"chat_id": chat_id,
"text": text, ##Hello from Python
"parse_mode": "HTML", ##Markdown, MarkdownV2, HTML
##"reply_markup": json.dumps(keyboard)##keyboard # ✔ 直接丟 dict
}
data = None
with httpx.Client(timeout=20) as client:
resp = client.get(url, params=params)
data = resp.json()
return data
r = requests.get(url, params=params, timeout=20)
##r = requests.post(url, data=params, timeout=(3, 20))
print(r.json())
return r.json()
@app.route("/telegram/sendMessage_vt", methods=["GET", "POST"])
async def send_message_vt():
# 完美!同時支援 GET 和 POST 傳參
token = TG_TOKEN
chat_id = request.values.get("chatid")
text = request.values.get("text")
if not chat_id or not text:
return {"ok": False, "description": "Bad Request: chatid or text is missing"}, 400
url = f"https://api.telegram.org/bot{token}/sendMessage"
payload = {
"chat_id": chat_id,
"text": text
}
try:
# 設定較長的 timeout 應對 Hugging Face 的網路波動
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(url, data=payload)
# 直接回傳 Telegram API 的結果,方便除錯
return response.json()
except httpx.ConnectTimeout:
return {"ok": False, "error_code": 504, "description": "Gateway Timeout: Telegram connection timed out"}, 504
except Exception as e:
return {"ok": False, "description": f"Internal Error: {str(e)}"}, 500
"""
import platform
if __name__ == "__main__":
    # Pick the listening port by platform: 5800 on Windows, 7860 elsewhere.
    # The local is not called `sys`, to avoid shadowing the stdlib module name.
    system_name = platform.system()
    print(system_name)
    port = 5800 if system_name == "Windows" else 7860
    app.run(host="0.0.0.0", port=port)
"""
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860) """