Spaces:
Running
Running
| #================================================== | |
| # IMPORT: TEST | |
| #================================================== | |
| import asyncio | |
| from ably import AblyRest | |
def token_ably(apikey, token):
    """Create an Ably token request for the calling client and return it as JSON.

    The client id is taken from the ``clientId`` request header. The token
    request is valid for 24 hours, and the given ``token`` is attached to the
    response under the ``"token"`` key. Any failure yields a 403 JSON payload
    carrying the exception text.
    """
    try:
        rest_client = AblyRest(apikey)
        client_id = request.headers.get('clientId')
        token_params = {
            "client_id": client_id,
            "ttl": 60 * 60 * 1000 * 24,  # 24 hours, expressed in milliseconds
        }
        # create_token_request is a coroutine; run it to completion here
        # (non-anonymous request: a client id is supplied).
        token_request = asyncio.run(rest_client.auth.create_token_request(token_params))
        response_body = token_request.to_dict()
        response_body["token"] = token
        return jsonify(response_body)
    except Exception as e:
        return jsonify({"status": "fail", "reason": "Invalid link", "detail": str(e)}), 403
import stanza  ##safe
# English Stanza pipeline, built once at import time and reused by generate_mqm.
nlp = stanza.Pipeline("en")  ##safe
##doc = nlp(text)
##for sent in doc.sentences:
def get_dependencies(doc):
    """Flatten the dependency parse of ``doc`` into a list of edge dicts.

    One dict is produced per word of every sentence in ``doc.sentences``. A
    head index of 0 marks the root, reported with head text ``"ROOT"``;
    otherwise the head is looked up 1-based in the same sentence.
    ``sent_id`` is the sentence's ``id`` attribute, or None when absent.
    """
    edges = []
    for sentence in doc.sentences:
        for word in sentence.words:
            if word.head == 0:
                head_text = "ROOT"
            else:
                head_text = sentence.words[word.head - 1].text
            edges.append({
                "sent_id": getattr(sentence, "id", None),
                "dependent_id": word.id,
                "dependent": word.text,
                "head_id": word.head,
                "head": head_text,
                "deprel": word.deprel,
            })
    return edges
def get_dependencies_v0(sent):
    """Return the dependency edges of a single sentence as a list of dicts.

    Each word of ``sent.words`` yields one dict; a head index of 0 is
    reported as ``"ROOT"``, any other head is resolved 1-based within the
    sentence.
    """
    words = sent.words

    def describe(word):
        # Resolve the head text first, then assemble the edge record.
        head_text = "ROOT" if word.head == 0 else words[word.head - 1].text
        return {
            "dependent_id": word.id,
            "dependent": word.text,
            "head_id": word.head,
            "head": head_text,
            "deprel": word.deprel,
        }

    return [describe(word) for word in words]
| #================================================== | |
| # IMPORT: ALL | |
| #================================================== | |
| from urllib.parse import urlencode #webpush | |
| import requests #deurl #webpush | |
| from requests.exceptions import RequestException #deurl | |
| import re #deurl | |
| ##import socket | |
| from gradio_client import Client #X | |
| from huggingface_hub import HfApi #dataset | |
| import os #SECRET | |
| from cryptography.fernet import Fernet #FUNCTION | |
| import hashlib #FUNCTION | |
| import json #FUNCTION #webpush | |
| import base64 #FUNCTION | |
| import string #FUNCTION | |
| import random #FUNCTION | |
| import ast #FUNCTION | |
| import pandas as pd #FUNCTION | |
| import io #dataset | |
| ##import json | |
| from pywebpush import webpush, WebPushException #webpush | |
| ##import requests | |
| from google import genai #llms | |
| from google.genai.errors import APIError #llms | |
| #================================================== | |
| # SECRET: ALL | |
| #================================================== | |
def checkSecret(SECRET):
    """Log a masked preview of a secret and report whether it is set.

    Args:
        SECRET: the secret value (typically read from the environment), or
            None / empty when it is not configured.

    Returns:
        The secret itself when it is truthy, otherwise False.

    The preview shows the first and last 3 characters around ``***``. Short
    secrets are masked completely: slicing 3 characters from each end of a
    value of 6 characters or fewer would print the whole secret.
    """
    if not SECRET:
        print("SECRET: The 'SECRET' is not a setting.")
        return False

    visible = 3
    # Only reveal the edges when most of the value stays hidden.
    if len(SECRET) >= 4 * visible:
        masked_token = SECRET[:visible] + "***" + SECRET[-visible:]
    else:
        masked_token = "***"
    # WARNING: meant for manual log inspection only; remove once verified.
    print(f"SECRET: {masked_token}")
    return SECRET
# Read every credential from the environment; checkSecret logs a masked
# preview of each (or a "not a setting" message when it is missing).
GS_ID = os.getenv("GOOGLE_SHEET_ID");checkSecret(GS_ID)
HF_READ = os.getenv("HUGGINGFACE_READ_TOKEN");checkSecret(HF_READ)
HF_WRITE = os.getenv("HUGGINGFACE_WRITE_TOKEN");checkSecret(HF_WRITE)
OC_APIKEY = os.getenv("OPENAI_CHATGPT_APIKEY");checkSecret(OC_APIKEY)
AR_APIKEY = os.environ.get("ABLY_REALTIME_APIKEY");checkSecret(AR_APIKEY)
TG_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN");checkSecret(TG_TOKEN)
NS_SECRET = os.getenv("NINS_CRYPT_SECRET");checkSecret(NS_SECRET) #webpush #32bit!
VAPID_PUBLIC = os.getenv("VAPID_PUBLIC_KEY");checkSecret(VAPID_PUBLIC) #webpush
VAPID_PRIVATE = os.getenv("VAPID_PRIVATE_KEY");checkSecret(VAPID_PRIVATE) #webpush
# Claims handed to webpush() when notifications are sent.
VAPID_CLAIMS = {
    "sub": "mailto:duguaynins@gmail.com",
    ##"aud": "https://copyright.nins.cc"
}
# Module-wide Fernet cipher used by encrypt_data / decrypt_data.
# NOTE(review): presumably Fernet() raises at import time when
# NINS_CRYPT_SECRET is unset or malformed — confirm the deployment always sets it.
key = NS_SECRET
cipher = Fernet(key)
print(VAPID_CLAIMS)
| #================================================== | |
| # FUNCTION: ALL | |
| #================================================== | |
| ##import json | |
def str_to_json(string):
    """Decode the JSON document in ``string`` and return the Python value."""
    decoded = json.loads(string)
    return decoded
def json_to_str(object):
    """Serialize ``object`` to a JSON string with ``json.dumps`` defaults."""
    encoded = json.dumps(object)
    return encoded
| ##import json | |
| ##import ast | |
def inputs_to_dict(inputs):
    """Coerce ``inputs`` into a dict.

    Args:
        inputs: a dict (returned unchanged), or a string holding either a
            JSON object or a Python dict literal.

    Returns:
        The resulting dict.

    Raises:
        ValueError: if ``inputs`` is neither a dict nor a string, if the
            string cannot be parsed, or if it parses to something other than
            a dict (callers use ``.get`` / ``.values`` on the result).
    """
    # Already a dict: use it as-is.
    if isinstance(inputs, dict):
        return inputs
    if not isinstance(inputs, str):
        raise ValueError(f"Cannot convert to dict: {inputs}")

    try:
        parsed = json.loads(inputs)  # try JSON first
    except json.JSONDecodeError:
        try:
            parsed = ast.literal_eval(inputs)  # then a Python dict literal
        except (ValueError, SyntaxError, TypeError) as err:
            # Report malformed text with the same error type as other bad inputs.
            raise ValueError(f"Cannot convert to dict: {inputs}") from err

    if not isinstance(parsed, dict):
        raise ValueError(f"Cannot convert to dict: {inputs}")
    return parsed
| ##import string | |
| ##import random | |
def generate_random_id():
    """Return a 32-character id drawn from the ASCII letters a-z and A-Z.

    Uses the ``random`` module, so the ids are not suitable as secrets.
    """
    return ''.join(random.choices(string.ascii_letters, k=32))
| ##import base64 | |
def image_to_base64(path):
    """Read the file at ``path`` in binary mode and return its Base64 text."""
    with open(path, "rb") as image_file:
        raw_bytes = image_file.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def dohash_data(data: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``data`` encoded as UTF-8."""
    # hashlib only accepts bytes, so encode the text first.
    digest = hashlib.sha256()
    digest.update(data.encode("utf-8"))
    return digest.hexdigest()
def encrypt_data(data: str) -> str:
    """Encrypt ``data`` with the module-level ``cipher`` and return the token as text."""
    token_bytes = cipher.encrypt(data.encode("utf-8"))
    return token_bytes.decode("utf-8")
def decrypt_data(data: str) -> str:
    """Decrypt a token produced by ``encrypt_data`` and return the original text."""
    plain_bytes = cipher.decrypt(data.encode("utf-8"))
    return plain_bytes.decode("utf-8")
def encrypt_data_to_bytes(data: str) -> bytes:
    """Encrypt ``data`` with the module-level ``cipher`` and return the raw token bytes."""
    plain_bytes = data.encode("utf-8")
    return cipher.encrypt(plain_bytes)
| ##search_sheet(gid, index="index", target="webpush", signal="tag", verify="***") | |
def search_sheet(gid, index, target, signal, verify, value):
    """Look up ``value`` in a Google Sheet exported as CSV.

    Args:
        gid: Google Sheet id used to build the CSV export URL.
        index: column holding the lookup keys.
        target: column whose cell is returned for the matching row.
        signal: column used to filter rows; only rows whose cell is
            ``"***"``, ``"*O*"`` or ``verify`` are kept.
        verify: additional accepted value for the ``signal`` column.
        value: key to look up; it is lower-cased and stripped first.

    Returns:
        The ``target`` cell of the matching row, or None when ``value`` is
        missing/blank or no accepted row matches.

    Example: search_sheet(gid, index="index", target="webpush", signal="tag",
    verify="***", value=email)
    """
    # A missing or blank key can never match; skip the download entirely
    # (the caller may pass None when no e-mail was supplied).
    if not isinstance(value, str):
        return None
    lookup_key = value.lower().strip()
    if not lookup_key:
        return None

    url = f'https://docs.google.com/spreadsheets/d/{gid}/export?format=csv'
    df = pd.read_csv(url,
                     encoding='utf-8',
                     dtype=str,
                     keep_default_na=False)
    # For duplicated keys keep only the last row.
    df = df.drop_duplicates(subset=[index], keep='last')
    # Keep only the rows whose signal column carries an accepted marker.
    df = df[df[signal].isin(["***", "*O*", verify])]

    values = set(df[index].tolist())
    # NOTE(review): this logs every key of the sheet — confirm that is acceptable.
    print(f"{len(values)} >>> {values}")

    # One dict lookup answers both "is it present" and "what is its target".
    search = dict(zip(df[index], df[target]))
    return search.get(lookup_key)
def upload_to_hf(path_or_fileobj, path_in_repo, repo_id, token=None):
    """Upload one file to a Hugging Face dataset repository.

    Args:
        path_or_fileobj: the content to upload. A file-like object is
            rewound before the upload; any other value is passed on to
            ``HfApi.upload_file`` unchanged.
        path_in_repo: destination path inside the repository.
        repo_id: target dataset repository id.
        token: access token; defaults to the module-level ``HF_WRITE``.

    Returns:
        True once ``upload_file`` has returned without raising.
    """
    token = token or HF_WRITE
    # Rewind only objects that can seek, so the upload starts at the
    # beginning without failing for arguments that have no seek().
    if hasattr(path_or_fileobj, "seek"):
        path_or_fileobj.seek(0)
    api = HfApi()
    api.upload_file(
        path_or_fileobj=path_or_fileobj,
        path_in_repo=path_in_repo,
        repo_id=repo_id,
        repo_type="dataset",
        token=token,
    )
    return True
| #================================================== | |
| # FUNCTION: MODEL.SYS | |
| #================================================== | |
| from sentence_transformers import SentenceTransformer, util | |
##import gradio as gr
# Load the multilingual sentence-embedding models once at import time.
# The commented-out lines are alternative models that were tried.
##model = SentenceTransformer("sentence-transformers/multi-qa-MiniLM-L6-cos-v1") # 90.9M
##model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") # 90.9M
##model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2') # 50+ # 471M # Microsoft Research / UKP Lab
##model = SentenceTransformer('sentence-transformers/distiluse-base-multilingual-cased-v2') # 50+ # 539M # Hugging Face / UKP Lab
##model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-mpnet-base-v2") # 50+* # 1.11GB # Microsoft Research / UKP Lab
# Both models below are used by fn_main_ds, which reports one score per model.
model_meta = SentenceTransformer("sentence-transformers/stsb-xlm-r-multilingual") # 100+* # 1.11G # Trained by Sentence-Transformers based on XLM-RoBERTa
model_goog = SentenceTransformer("sentence-transformers/LaBSE") # 100+* # 1.88GB # GOOGLE Research
| """ | |
| # 定義計算相似度的函數 | |
| def semantic_similarity(inputs): | |
| print(inputs);print(type(inputs)) | |
| inputs = inputs_to_dict(inputs) | |
| print(inputs);print(type(inputs)) | |
| sent1, sent2 = inputs.values() | |
| embeddings = model.encode([sent1, sent2], convert_to_tensor=True) | |
| cos_sim = util.pytorch_cos_sim(embeddings[0], embeddings[1]) | |
| return round(cos_sim.item(), 4) """ | |
# Similarity helper
def semantic_similarity(embeddings):
    """Return the cosine similarity of ``embeddings[0]`` and ``embeddings[1]``.

    The score comes from ``util.pytorch_cos_sim`` and is returned as a plain
    float rounded to 4 decimal places.
    """
    first, second = embeddings[0], embeddings[1]
    score = util.pytorch_cos_sim(first, second)
    return round(score.item(), 4)
def fn_main_ds(inputs):
    """Score the similarity of two sentences with both embedding models.

    Args:
        inputs: a dict (or a JSON / Python-literal string of one) holding
            exactly two values, the two sentences to compare. The keys are
            ignored.

    Returns:
        ``{"goog": <score>, "meta": <score>}`` with the scores computed from
        ``model_goog`` and ``model_meta`` respectively.

    Raises:
        ValueError: if ``inputs`` does not hold exactly two values.
    """
    inputs = inputs_to_dict(inputs)
    sent1, sent2 = inputs.values()
    sentences = [sent1, sent2]
    sim1 = semantic_similarity(model_goog.encode(sentences, convert_to_tensor=True))
    sim2 = semantic_similarity(model_meta.encode(sentences, convert_to_tensor=True))
    # The older string / json.dumps return variants were unreachable and are gone.
    return {"goog": sim1, "meta": sim2}
| #================================================== | |
| # FUNCTION: LLMs.SYS | |
| #================================================== | |
| ##import requests | |
def openai_text_generate(data):
    """Run one chat completion against the OpenAI API.

    Args:
        data: dict with the required keys ``"system"``, ``"user"`` and
            ``"models"``, and an optional ``"apikey"``. When ``"apikey"`` is
            absent the ``OPENAI_CHATGPT_APIKEY`` environment variable is used.

    Returns:
        The stripped text of the first completion choice on success.
        On failure a ``({"error": ...}, status)`` tuple: 400 for a missing
        field, 502 when the HTTP request itself fails, or the API's own
        status code for a non-200 response.
    """
    if not data:
        return {"error": "data?"}, 400
    for field in ("system", "user", "models"):
        if not data.get(field):
            return {"error": f"{field}?"}, 400

    apikey = data.get("apikey", os.getenv("OPENAI_CHATGPT_APIKEY"))
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {apikey}",
    }
    payload = {
        "model": str(data["models"]),
        "messages": [
            {"role": "system", "content": str(data["system"])},
            {"role": "user", "content": str(data["user"])},
        ],
        "max_tokens": 500,
        "temperature": 0,  # 0 is recommended for translation
    }
    try:
        # Without a timeout a stalled connection would block the worker forever.
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=60,
        )
    except requests.RequestException as err:
        return {"error": str(err)}, 502

    if response.status_code == 200:
        return response.json()['choices'][0]['message']['content'].strip()
    return {"error": response.text}, response.status_code
| ##from google import genai | |
| ##from google.genai.errors import APIError | |
def google_text_generation(data):
    """Generate text with a Google Gemini model.

    ``data`` must provide non-empty ``"system"``, ``"user"`` and ``"models"``
    entries; ``"apikey"`` is optional and falls back to the
    ``GOOGLE_GEMINI_APIKEY`` environment variable when the key is absent.
    Returns the response text, or an ``({"error": ...}, 400)`` tuple when a
    required entry is missing.
    """
    if not data:
        return {"error": "data?"}, 400
    for required in ("system", "user", "models"):
        if not data.get(required):
            return {"error": required + "?"}, 400

    model_name = data.get("models", "gemini-1.5-flash")
    system_prompt = data.get("system", "sys prompt")
    user_prompt = data.get("user", "usr prompt")
    apikey = data.get("apikey", os.getenv("GOOGLE_GEMINI_APIKEY"))

    gemini = genai.Client(api_key=apikey)
    # Ask the model for content, passing the system prompt as the system instruction.
    response = gemini.models.generate_content(
        model=model_name,
        config={"system_instruction": system_prompt},
        contents=user_prompt,
    )
    return response.text
| """ | |
| from transformers import pipeline | |
| # 建立文字生成管線 | |
| generator = pipeline("text-generation", model="gpt2") # 可換成你喜歡的 HF 模型 | |
| def pipeline_text_generation(data): | |
| user = data.get("user", "auto") | |
| result = generator(user, max_length=100, do_sample=True, top_p=0.9) | |
| return result[0]['generated_text'] ##data.result.confidences ##data是JS變數名稱 ##const data = await HuggingFaceAPI(...); | |
| """ | |
def fn_main_llms(inputs):
    """Dispatch a text-generation request to the backend named by ``"models"``.

    Args:
        inputs: dict (or a JSON / Python-literal string of one) with at least
            a ``"models"`` entry plus whatever the chosen backend requires.

    Returns:
        The result of ``google_text_generation`` for model names starting
        with ``gemini``, of ``openai_text_generate`` for names starting with
        ``gpt`` (both case-insensitive), and ``"???"`` otherwise, including
        when ``"models"`` is missing.
    """
    inputs = inputs_to_dict(inputs)
    # A missing or None model name falls through to the "unknown model"
    # answer instead of raising AttributeError.
    model_name = str(inputs.get("models") or "").lower()
    if model_name.startswith("gemini"):
        return google_text_generation(inputs)
    if model_name.startswith("gpt"):
        return openai_text_generate(inputs)
    return "???"
| #================================================== | |
| # FUNCTION: DATASET.SYS | |
| #================================================== | |
| ##from cryptography.fernet import Fernet | |
| ##import hashlib | |
| ##import io | |
def update_file(id, data, path_in_repo, repo_id, folder="default"):
    """Encrypt ``data`` and upload it to a Hugging Face dataset repository.

    Args:
        id: user identifier; it is stripped, lower-cased and SHA-256 hashed
            to form the per-user directory name.
        data: plain text to encrypt and store.
        path_in_repo: accepted for interface compatibility but ignored; the
            destination is always ``{folder}/{hashed id}/data.txt``.
        repo_id: target dataset repository id.
        folder: top-level folder inside the repository.

    Returns:
        200 on success, 404 when ``repo_id`` is empty, 500 on any failure.
    """
    # Check the cheap precondition before doing any encryption work.
    if not repo_id:
        return 404
    try:
        encrypted_data = encrypt_data(data)
        content = io.BytesIO(encrypted_data.encode("utf-8"))
        user = dohash_data(id.strip().lower())
        destination = f"{folder}/{user}/data.txt"
        res = upload_to_hf(path_or_fileobj=content,
                           path_in_repo=destination,
                           repo_id=repo_id,
                           token=None)
        print("HF result:", res)
        print("DEBUG", flush=True)
        return 200
    except Exception as e:
        # Best-effort endpoint: report the failure in the logs rather than
        # swallowing it silently, and signal it through the status code.
        print(f"update_file failed: {e!r}", flush=True)
        return 500
def fn_main_dataset(inputs):
    """Validate a dataset-update request and hand it to ``update_file``.

    Args:
        inputs: dict with the required keys ``"id"`` and ``"data"`` and the
            optional keys ``"path_in_repo"``, ``"repo_id"`` and ``"folder"``
            (``folder`` falls back to ``"default"``).

    Returns:
        400 when ``inputs`` is not a dict or lacks ``id`` / ``data``;
        otherwise the status code returned by ``update_file``.
    """
    if not isinstance(inputs, dict):
        return 400
    if inputs.get("id") is None or inputs.get("data") is None:
        return 400
    return update_file(
        inputs["id"],
        inputs["data"],
        inputs.get("path_in_repo"),
        inputs.get("repo_id"),  # update_file answers 404 for an empty repo id
        inputs.get("folder") or "default",
    )
| #================================================== | |
| # FUNCTION: WEBPUSH.SYS | |
| #================================================== | |
| ##VAPID_PUBLIC_KEY = VAPID_PUBLIC | |
| ##VAPID_PRIVATE_KEY = VAPID_PRIVATE | |
def send_push_notifications(string):
    """Send one web-push notification to every subscription in a JSON payload.

    Args:
        string: JSON text with a ``"data"`` object (its ``"body"`` becomes
            the ``userInput`` query parameter of the notification URL) and a
            ``"subscription"`` list. A subscription is either a dict or a
            string encrypted with ``encrypt_data``, which is decrypted and
            JSON-decoded first.

    Returns:
        ``"<n> * Subscribed"`` where n is the number of subscriptions in the
        payload, regardless of how many deliveries succeeded.

    Failures are logged per subscription and never abort the batch.
    """
    payload = str_to_json(string)
    # Tolerate a missing "data" object or "body" entry instead of raising KeyError.
    notification = payload.setdefault("data", {})
    params = {"userInput": notification.get("body", "")}
    notification["title"] = "nins/©"
    notification["url"] = "https://copyright.nins.cc/?" + urlencode(params)
    data = json_to_str(notification)
    subscriptions = payload.get("subscription", [])

    # NOTE(review): the earlier per-subscription "verify" step fetched a file
    # over HTTP without a timeout and ignored the comparison result. It also
    # could not have matched: Fernet output is randomized, so re-encrypting a
    # subscription never reproduces a stored token. It has been removed.
    for sub in subscriptions:
        try:
            if isinstance(sub, str):
                sub = str_to_json(decrypt_data(sub))
            print(type(sub), sub)
            webpush(
                data=data,
                subscription_info=sub,
                vapid_private_key=VAPID_PRIVATE,
                # Pass a fresh copy: pywebpush fills audience/expiry claims
                # into the dict it receives, and those must not carry over
                # from one push service to the next.
                vapid_claims=dict(VAPID_CLAIMS),
            )
            print("推播成功:", sub)
        except WebPushException as ex:
            print("推播失敗:", sub)
            print(ex)
            print("\n\n\n")
        except Exception as ex:
            # e.g. an undecryptable or malformed subscription: skip it.
            print("推播失敗:", sub)
            print(repr(ex))
            print("\n\n\n")
    return f"{len(subscriptions)} * Subscribed"
| # 呼叫函數 | |
| ##send_push_notifications() | |
def fn_main_webpush(inputs):
    """Entry point for web push: forward ``inputs`` to ``send_push_notifications``."""
    summary = send_push_notifications(inputs)
    return summary
| #================================================== | |
| # FUNCTION: DEURL.SYS | |
| #================================================== | |
| import httpx | |
| import urllib.parse | |
| from urllib.parse import urlparse, urljoin # 👈 加上這一行就解決了 | |
| import traceback # 👈 記得導入這個 | |
| from typing import Tuple, Dict | |
def expand_urls(url):
    """Fetch ``url`` and list the external domains its page links to.

    Every ``href`` / ``src`` attribute is resolved against ``url``; the
    network location of each http(s) link on a different domain is
    collected, stopping once 10 distinct domains are found. Returns an empty
    list when the page cannot be fetched.
    """
    try:
        response = requests.get(
            url,
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=10,
        )
        response.raise_for_status()
    except RequestException as e:
        print(f"[WARN] 無法抓取 {url} : {e}")
        return []

    own_domain = urlparse(url).netloc
    domains = set()
    for link in re.findall(r'(?:href|src)=["\'](.*?)["\']', response.text, re.IGNORECASE):
        # Turn relative paths into absolute URLs before inspecting them.
        target = urlparse(urljoin(url, link))
        if target.scheme in ("http", "https") and target.netloc != own_domain:
            domains.add(target.netloc)
        if len(domains) >= 10:
            break
    return list(domains)
def expand_url(url):
    """Follow redirects from ``url`` and return the final URL as a string.

    Uses an httpx client with certificate verification, a 10 second timeout
    and at most 5 redirects. Returns None on any error.
    """
    client_options = {
        "follow_redirects": True,
        "timeout": 10,
        "max_redirects": 5,  # cap the hops so a redirect loop cannot run forever
        "verify": True,
        "headers": {"User-Agent": "Mozilla/5.0"},
    }
    try:
        with httpx.Client(**client_options) as client:
            response = client.get(url)
        return str(response.url)
    except Exception:
        # httpx.RequestError and every other failure map to the same answer.
        return None
def expand_url_redirectcheck_io(url: str) -> "str | None":
    """Resolve the final URL of ``url`` through the redirectcheck.io API.

    Returns:
        The ``summary.finalUrl`` value of the API response, or None when the
        request fails or the response carries no final URL. None (rather
        than a placeholder string) lets callers that test the result for
        truthiness fall through to their next resolver.
    """
    base_api = "https://redirectcheck.io/api/analyze"
    params = {"url": url}
    try:
        with httpx.Client(timeout=20) as client:
            resp = client.get(base_api, params=params)
            data = resp.json()
        # Take finalUrl from the chain summary.
        final_url = data.get("summary", {}).get("finalUrl")
        return final_url or None
    except Exception:
        return None
def expand_url_unshorten_me(target_url):
    """Resolve the final URL of ``target_url`` through the unshorten.me service.

    The service is expected to answer with the final URL as plain text.

    Returns:
        The stripped response text when it looks like an http(s) URL,
        otherwise None (request failure, empty body, or a body that is not a
        URL, such as an error page).
    """
    api_url = f"https://unshorten.me/s/{target_url}"
    try:
        with httpx.Client(timeout=20) as client:
            resp = client.get(api_url)
        final_url = resp.text.strip()
    except Exception:
        return None
    # Never report an error page or message as the resolved URL.
    if not final_url.startswith(("http://", "https://")):
        return None
    return final_url
def fn_main_deurl(inputs):
    """Expand a URL and list the external domains its page references.

    The resolvers are tried in order (direct redirect following, then
    redirectcheck.io, then unshorten.me); the first truthy answer is
    returned as ``{"url": [final_url], "urls": external_domains}``. Returns
    None when every resolver fails.
    """
    print("[START]")
    urls = expand_urls(inputs)
    resolvers = (expand_url, expand_url_redirectcheck_io, expand_url_unshorten_me)
    for resolve in resolvers:
        final_url = resolve(inputs)
        if final_url:
            return {"url": [final_url], "urls": urls}
    print("[END]")
    return None
| #================================================== | |
| # ROUTE.SYS | |
| #================================================== | |
| from flask import Flask, render_template, redirect, url_for, request, jsonify, send_file | |
| from flask_cors import CORS | |
# The Flask application object used by the view functions below.
app = Flask(__name__)
CORS(app)  # allow every origin; convenient while testing
| # 404 發生時,自動導回首頁 | |
def page_not_found(e):
    """Redirect to the ``home`` view; per the comment above, used when a 404 occurs.

    NOTE(review): no error-handler registration is visible in this view of
    the file — confirm how this function is hooked up.
    """
    home_url = url_for("home")
    return redirect(home_url)
| # 正常頁面 | |
def home():
    """Render the ``index.html`` template."""
    page = render_template("index.html")
    return page
def about():
    """Render the ``about.html`` template."""
    page = render_template("about.html")
    return page
def contact():
    """Render the ``contact.html`` template."""
    page = render_template("contact.html")
    return page
| # 改! | |
| # 改! | |
def MultipleEmbeddings():
    """Proxy the request JSON to the MultipleEmbeddings Space and return its result.

    The JSON body is expanded into keyword arguments of the Space's
    ``/fn_main`` endpoint; the answer is wrapped as ``{"result": ...}``.
    """
    space = Client(src="https://duguaynins-MultipleEmbeddings-v260331.hf.space", hf_token=HF_READ)
    prediction = space.predict(
        **request.json,       # JSON dict expanded into the endpoint's parameters
        api_name="/fn_main",  # the endpoint is addressed by name, so no fn_index
        fn_index=None,
    )
    return jsonify({"result": prediction})
def generate_llms():
    """Proxy the request JSON to the MultipleLLMs Space and return its result.

    The JSON body is expanded into keyword arguments of the Space's
    ``/fn_main`` endpoint; the answer is wrapped as ``{"result": ...}``.
    """
    space = Client(src="https://duguaynins-MultipleLLMs-v260401.hf.space", hf_token=HF_READ)
    prediction = space.predict(
        **request.json,       # JSON dict expanded into the endpoint's parameters
        api_name="/fn_main",  # the endpoint is addressed by name
    )
    return jsonify({"result": prediction})
| """ | |
| @app.route('/generate/llms', methods=['POST']) ##??? | |
| def generate_llms(): ##??? | |
| inputs = request.json.get('inputs') | |
| result = fn_main_llms(inputs) ##??? | |
| return jsonify({"result": result}) | |
| """ | |
| ##{"inputs": {"system":_,"user":_,} } | |
| ##...{"user": {"source":_,"target":_,"en":_,} } | |
def generate_mqm():
    """Run an MQM evaluation on the request's ``inputs`` with gpt-4o-mini.

    The ``intermediate`` text of ``inputs`` is parsed with Stanza; its
    dependency edges are prepended to the user prompt, and the
    ``NINS_PROMPT_MQM`` environment prompt is prepended to the system prompt.

    Returns:
        ``{"result": <JSON string>}`` where the JSON string holds ``"MQM"``
        (the decoded model answer) and ``"dependencies"``. Responds 400 when
        ``inputs`` is missing and 502 when the model call does not return text.
    """
    buffer = request.json.get('inputs')
    if not isinstance(buffer, dict):
        return jsonify({"error": "inputs?"}), 400

    doc = nlp(buffer.get('intermediate', 'The admin is duguaynins.'))
    dep = get_dependencies(doc)
    inputs = {
        "system": f"{os.getenv('NINS_PROMPT_MQM')}\n{buffer.get('system')}",
        "user": f"{dep}\n{buffer.get('user')}",
        "models": "gpt-4o-mini",
    }
    result = fn_main_llms(inputs)
    # fn_main_llms answers with an (error, status) tuple on failure; only a
    # text answer can be decoded as JSON.
    if not isinstance(result, str):
        return jsonify({"error": "LLM call failed", "detail": result}), 502

    merge = {
        'MQM': str_to_json(result),
        'dependencies': dep,
    }
    return jsonify({"result": json_to_str(merge)})
def generate_rewrite():
    """Rewrite the request's text with an LLM and attach similarity scores.

    The ``NINS_PROMPT_REWRITE`` environment prompt is prepended to the
    system prompt. The model must answer with a JSON object; its first value
    and its third value are compared with ``fn_main_ds`` and the scores are
    stored under ``"embeddings"``.

    Returns:
        ``{"result": <JSON string of the enriched answer>}``. Responds 400
        when ``inputs`` is missing and 502 when the model call does not
        return text.
    """
    buffer = request.json.get('inputs')
    if not isinstance(buffer, dict):
        return jsonify({"error": "inputs?"}), 400

    inputs = {
        "system": f"{os.getenv('NINS_PROMPT_REWRITE')}\n{buffer.get('system')}",
        "user": buffer.get('user'),
        "models": buffer.get('models'),
    }
    result = fn_main_llms(inputs)
    # fn_main_llms answers with an (error, status) tuple on failure.
    if not isinstance(result, str):
        return jsonify({"error": "LLM call failed", "detail": result}), 502

    result = str_to_json(result)
    answers = list(result.values())
    sl = answers[0]  # first value of the model answer
    il = answers[2]  # third value of the model answer
    result['embeddings'] = fn_main_ds({'a': il, 'b': sl})
    return jsonify({"result": json_to_str(result)})
# The body of this function is only a string literal holding a disabled
# implementation, so calling it returns None.
# NOTE(review): a Flask view must not return None — confirm this function is
# not registered as a route before relying on it.
def generate_translate(): ##???
    """
    buffer = request.json.get('inputs')
    inputs = {
    "system": f"{os.getenv('NINS_PROMPT_TRANSLATE')}\n{buffer.get('system')}",
    "user": buffer.get('user'),
    "models": buffer.get('models'), ##gpt-4o-mini
    }
    result = fn_main_llms(inputs) ##???
    result = str_to_json(result) ##ing
    ##for k, v in result.items(): print(k, v)
    sl = list(result.items())[0][1]
    sl_pro = list(result.items())[1][1]
    il = list(result.items())[2][1]
    il_pro = list(result.items())[3][1]
    ##tl = list(result.items())[4][1] ##可能目標都是英文的時候
    ##tl_pro = list(result.items())[5][1] ##可能目標都是英文的時候
    embeddings = fn_main_ds({'a': il, 'b': sl}) ##embeddings OR labse
    result['embeddings'] = embeddings ##embeddings OR labse
    result = json_to_str(result) ##ing
    return jsonify({"result": result})
    """
| ##??? | |
def generate_llms_vt():
    """Run ``fn_main_llms`` on the request's ``inputs`` and return ``{"result": ...}``."""
    payload = request.json.get('inputs')
    return jsonify({"result": fn_main_llms(payload)})
| # 改! | |
| # 改! | |
def embeddings_multiple_vt():
    """Run ``fn_main_ds`` on the request's ``inputs`` and return ``{"result": ...}``."""
    payload = request.json.get('inputs')
    return jsonify({"result": fn_main_ds(payload)})
| ##??? | |
def webpush_ninscc():
    """Run ``fn_main_webpush`` on the request's ``inputs`` and return ``{"result": ...}``."""
    payload = request.json.get('inputs')
    return jsonify({"result": fn_main_webpush(payload)})
| ##??? | |
def decode_url():
    """Run ``fn_main_deurl`` on the request's ``inputs`` and return ``{"result": ...}``."""
    payload = request.json.get('inputs')
    return jsonify({"result": fn_main_deurl(payload)})
| ##??? | |
def dataset_update():
    """Run ``fn_main_dataset`` on the request's ``inputs`` and return ``{"result": ...}``."""
    payload = request.json.get('inputs')
    return jsonify({"result": fn_main_dataset(payload)})
| # 改! ##20260420 | |
def channels_get():
    """Look up the web-push entry stored for an e-mail address.

    The address is read from the ``email`` query parameter or, failing
    that, from the ``email`` field of a JSON body.

    Returns:
        ``{"result": <sheet value or null>}``; the result is null when no
        address was supplied or no matching row exists.
    """
    # silent=True: a request without a JSON body yields None instead of an
    # error response, so query-string-only requests keep working.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    email = request.args.get("email") or data.get("email")
    if not email:
        return jsonify({"result": None})
    result = search_sheet(gid=GS_ID, index="index", target="webpush",
                          signal="tag", verify="***", value=email)
    return jsonify({"result": result})
| # 改! | |
def realtime_ably():
    """Return an Ably token request built with ``AR_APIKEY`` and no extra token."""
    token_response = token_ably(AR_APIKEY, None)
    return token_response
| """ | |
| @app.route("/telegram/sendMessage", methods=["GET", "POST"]) | |
| def telegram_sendMessage(): | |
| pass | |
| token = TG_TOKEN | |
| chat_id = request.values.get("chatid") | |
| text = request.values.get("text") | |
| keyboard = {} | |
| url = f"https://api.telegram.org/bot{token}/sendMessage?chat_id={chat_id}&text={text}" | |
| params = { | |
| "chat_id": chat_id, | |
| "text": text, ##Hello from Python | |
| "parse_mode": "HTML", ##Markdown, MarkdownV2, HTML | |
| ##"reply_markup": json.dumps(keyboard)##keyboard # ✔ 直接丟 dict | |
| } | |
| data = None | |
| with httpx.Client(timeout=20) as client: | |
| resp = client.get(url, params=params) | |
| data = resp.json() | |
| return data | |
| r = requests.get(url, params=params, timeout=20) | |
| ##r = requests.post(url, data=params, timeout=(3, 20)) | |
| print(r.json()) | |
| return r.json() | |
| @app.route("/telegram/sendMessage_vt", methods=["GET", "POST"]) | |
| async def send_message_vt(): | |
| # 完美!同時支援 GET 和 POST 傳參 | |
| token = TG_TOKEN | |
| chat_id = request.values.get("chatid") | |
| text = request.values.get("text") | |
| if not chat_id or not text: | |
| return {"ok": False, "description": "Bad Request: chatid or text is missing"}, 400 | |
| url = f"https://api.telegram.org/bot{token}/sendMessage" | |
| payload = { | |
| "chat_id": chat_id, | |
| "text": text | |
| } | |
| try: | |
| # 設定較長的 timeout 應對 Hugging Face 的網路波動 | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| response = await client.post(url, data=payload) | |
| # 直接回傳 Telegram API 的結果,方便除錯 | |
| return response.json() | |
| except httpx.ConnectTimeout: | |
| return {"ok": False, "error_code": 504, "description": "Gateway Timeout: Telegram connection timed out"}, 504 | |
| except Exception as e: | |
| return {"ok": False, "description": f"Internal Error: {str(e)}"}, 500 | |
| """ | |
| import platform | |
if __name__ == "__main__":
    # Pick the port by platform: 5800 on Windows, 7860 everywhere else.
    # The variable is not named ``sys`` so it cannot be mistaken for (or
    # later shadow) the standard-library module of that name.
    system_name = platform.system()
    print(system_name)
    port = 5800 if system_name == "Windows" else 7860
    app.run(host="0.0.0.0", port=port)
| """ | |
| if __name__ == "__main__": | |
| app.run(host="0.0.0.0", port=7860) """ |