| |
| |
|
|
| from fastapi import FastAPI, HTTPException, Depends |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel, ValidationError, validator |
| from typing import List |
| import csv |
| import datetime |
| import os |
|
|
| from transformers import AutoTokenizer, AutoModelForSequenceClassification |
| import torch |
| from rank_bm25 import BM25Okapi |
| from janome.tokenizer import Tokenizer |
| import pandas as pd |
| import torch.nn.functional as F |
| import re |
| import openai |
| import httpx |
| import asyncio |
| import unicodedata |
|
|
| import aioredis |
| import random |
| import smtplib |
| from email.mime.text import MIMEText |
| from email.header import Header |
| from random import randint |
| from fastapi import HTTPException, Depends, FastAPI |
| from pydantic import BaseModel |
| from email.mime.multipart import MIMEMultipart |
|
|
| |
| """ |
| sudo apt update |
| sudo apt install redis-server |
| |
| """ |
# Shared async Redis client used to hold password-reset codes.
# decode_responses=True makes reads come back as ``str`` instead of ``bytes``.
redis = aioredis.from_url(
    "redis://127.0.0.1:6379",
    encoding="utf-8",
    decode_responses=True,
)
|
|
| |
| import requests |
# Discord webhook that receives error / rejection notifications.
webhook_url = "https://discord.com/api/webhooks/example"
# Discord webhook that receives the per-request chat log.
webhook_url2 = "https://discord.com/api/webhooks/example2"
|
|
|
|
# FastAPI application instance; every route below is registered on it.
app = FastAPI()

# Browser origins allowed to call this API (CORS allow-list).
origins = [
    "http://172.16.9.200",
    "http://localhost:3000",
    "http://localhost:8000",
]

# CORS policy: only the origins above, but any method and any header,
# with credentials allowed.
_cors_options = {
    "allow_origins": origins,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
|
|
|
|
|
|
class EmailValidationRequest(BaseModel):
    """Login request body.

    ``email`` arrives as a bare numeric user ID; the domain is appended
    before validation and the numeric part is then range-checked.
    """

    email: str
    password: str

    @validator('email', pre=True, always=True)
    def append_domain(cls, v):
        """Turn the submitted ID into a full address before other checks run."""
        return f"{v}@example.com"

    @validator('email')
    def validate_email_range(cls, v):
        """Accept only IDs inside the permitted range, or the all-zero ID.

        Raises:
            ValueError: if the local part is not an integer or lies outside
                the permitted range.
        """
        # int() raises ValueError for a non-numeric local part.
        user_id = int(v.split('@')[0])
        if 32181001 <= user_id <= 32246999 or user_id == 0:
            return v

        data = {
            'content': f'【ValueError】Email is out of the allowed range {user_id}@example.com',
        }
        # Best-effort notification: a slow or unreachable webhook must not
        # replace the validation error with an unrelated exception.
        try:
            requests.post(webhook_url, json=data, timeout=5)
        except requests.RequestException as exc:
            print(f"Webhook notification failed: {exc}")

        raise ValueError('Email is out of the allowed range')
|
|
|
|
class ChatGPTRequest(BaseModel):
    """Request body for the chat endpoint."""

    # Address the client reports as logged in; it is written to the history log.
    loggedInEmail: str
    # The user's question.
    message: str
|
|
|
|
class EmailValidationRequestForReset(BaseModel):
    """Request body for starting a password reset.

    ``email`` arrives as a bare numeric user ID; the domain is appended
    before validation and the numeric part is then range-checked.
    """

    email: str
    password: str

    @validator('email', pre=True, always=True)
    def append_domain(cls, v):
        """Turn the submitted ID into a full address before other checks run."""
        return f"{v}@example.com"

    @validator('email')
    def validate_email_range(cls, v):
        """Accept only IDs inside the permitted range, or the all-zero ID.

        Raises:
            ValueError: if the local part is not an integer or lies outside
                the permitted range.
        """
        # int() raises ValueError for a non-numeric local part.
        user_id = int(v.split('@')[0])
        if 32181001 <= user_id <= 32246999 or user_id == 0:
            return v

        data = {
            'content': f'【ValueError】Email is out of the allowed range {user_id}@example.com',
        }
        # Best-effort notification: a slow or unreachable webhook must not
        # replace the validation error with an unrelated exception.
        try:
            requests.post(webhook_url, json=data, timeout=5)
        except requests.RequestException as exc:
            print(f"Webhook notification failed: {exc}")

        raise ValueError('Email is out of the allowed range')
| |
|
|
class EmailValidationRequestForChange(BaseModel):
    """Request body for completing a password change."""

    email: str
    # The new password chosen by the user.
    enterednewpassword: str
    # The code the user typed in; it is compared with the stored reset code.
    enteredSixPassword: str

    @validator('email', pre=True, always=True)
    def append_domain(cls, v):
        """Append the fixed mail domain to the submitted ID."""
        return "{}@example.com".format(v)
| |
|
|
|
|
|
|
| |
def request_time_validation():
    """Allow requests only while the server-local hour is between 8 and 16.

    Returns:
        True when the current hour is inside the window.

    Raises:
        HTTPException: 400 when the request arrives outside the window.
    """
    current_hour = datetime.datetime.now().hour
    if 8 <= current_hour <= 16:
        return True

    data = {
        'content': '【400】Request out of allowed time',
    }
    # Best-effort notification: a webhook outage must not mask the 400.
    try:
        requests.post(webhook_url, json=data, timeout=5)
    except requests.RequestException as exc:
        print(f"Webhook notification failed: {exc}")

    raise HTTPException(status_code=400, detail="Request out of allowed time")
|
|
| |
def check_data_in_specific_line():
    """Refuse service once row 51 of the access-restriction CSV holds data.

    Returns:
        True when row 51 is absent or empty.

    Raises:
        HTTPException: 400 when row 51 exists and is non-empty.
    """
    with open("/home/example/server/ChatGPT Web App(新Googleサイト用) - アクセス制限検出.csv", mode='r', encoding='utf-8') as csv_file:
        for i, row in enumerate(csv.reader(csv_file), start=1):
            if i < 51:
                continue
            if row:
                data = {
                    'content': '【400】Access restricted due to specific data presence',
                }
                # Best-effort notification: a webhook outage must not mask the 400.
                try:
                    requests.post(webhook_url, json=data, timeout=5)
                except requests.RequestException as exc:
                    print(f"Webhook notification failed: {exc}")

                raise HTTPException(status_code=400, detail="Access restricted due to specific data presence")
            # Only row 51 matters; stop instead of reading the rest of the file.
            break
    return True
|
|
| |
async def validate_file_path(file_path: str) -> bool:
    """Check that *file_path* was supplied and exists on disk.

    Returns:
        True when the path exists.

    Raises:
        HTTPException: 400 when *file_path* is None, 404 when it does not exist.
    """

    def _notify(content):
        # Best-effort notification: a webhook outage must not mask the
        # HTTP error raised right after it.
        try:
            requests.post(webhook_url, json={'content': content}, timeout=5)
        except requests.RequestException as exc:
            print(f"Webhook notification failed: {exc}")

    if file_path is None:
        _notify('【400】File path is required')
        raise HTTPException(status_code=400, detail="File path is required")

    if not os.path.exists(file_path):
        _notify('【404】File not found')
        raise HTTPException(status_code=404, detail="File not found")

    return True
|
|
| |
async def validate_email_password(email: str, password: str):
    """Check *password* against the per-year login CSV entry for *email*.

    The CSV is chosen from characters 2-3 of the address; the row index is
    the numeric ID minus ``32<year>1001``. The first cell of that row is
    read as ``<something>(<password>)``.

    Returns:
        True for the built-in all-zero account, otherwise
        ``{"authenticated": True}``.

    Raises:
        HTTPException: 401 for wrong credentials or a disabled account,
            404 when the login CSV for that year is missing.
        ValueError: if the local part of *email* is not numeric.
    """
    import secrets  # local import: constant-time comparison helper

    def _notify(content):
        # Best-effort notification: a webhook outage must not mask the
        # HTTP error raised right after it.
        try:
            requests.post(webhook_url, json={'content': content}, timeout=5)
        except requests.RequestException as exc:
            print(f"Webhook notification failed: {exc}")

    def _same(left, right):
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return secrets.compare_digest(left.encode('utf-8'), right.encode('utf-8'))

    user_id = int(email.split('@')[0])
    year = email[2:4]
    file_path = f"/home/example/server/login_data/ChatGPT Web App(新Googleサイト用) - ログイン情報({year}).csv"

    if email == "00000000@example.com":
        # NOTE(review): hard-coded shared credential — confirm this is intended.
        if _same(password, "00000000"):
            return True
        # The submitted password is deliberately not sent to the webhook.
        _notify(f'【401】Invalid email/password combination {email}')
        raise HTTPException(status_code=401, detail="Invalid email/password combination")

    if not os.path.exists(file_path):
        _notify('【404】Login data file not found')
        raise HTTPException(status_code=404, detail="Login data file not found")

    first_user_id = int(f"32{year}1001")
    row_number = user_id - first_user_id

    with open(file_path, mode='r', encoding='utf-8') as csv_file:
        for i, row in enumerate(csv.reader(csv_file)):
            if i != row_number:
                continue
            # partition() tolerates empty rows and passwords that contain
            # '(' themselves; a cell without '(' is treated as a mismatch.
            cell = row[0] if row else ''
            _, separator, tail = cell.partition('(')
            if not separator:
                break
            row_password = tail[:-1]  # drop the closing parenthesis
            if row_password == "********":
                _notify(f'【401】Delieted account by Admin {email}')
                raise HTTPException(status_code=401, detail="無効なアカウント")
            if _same(row_password, password):
                return {"authenticated": True}
            break

    # The submitted password is deliberately not sent to the webhook.
    _notify(f'【401】Invalid email/password combination {email}')
    raise HTTPException(status_code=401, detail="Invalid email/password combination")
|
|
| |
async def validate_email_uniqueness(email: str):
    """Ensure *email* is not yet in the second column of the address-record CSV.

    Returns:
        True when the address has not been recorded.

    Raises:
        HTTPException: 409 when the address is already present.
    """
    file_path = "/home/example/server/ChatGPT Web App(新Googleサイト用) - メールアドレス記録(毎日リセット).csv"
    with open(file_path, mode='r', encoding='utf-8') as csv_file:
        # Rows are [timestamp, email]; shorter rows are ignored.
        already_recorded = any(
            len(row) > 1 and row[1] == email for row in csv.reader(csv_file)
        )

    if already_recorded:
        data = {
            'content': f'【409】Email already exists {email}',
        }
        # Best-effort notification: a webhook outage must not mask the 409.
        try:
            requests.post(webhook_url, json=data, timeout=5)
        except requests.RequestException as exc:
            print(f"Webhook notification failed: {exc}")

        raise HTTPException(status_code=409, detail="Email already exists")

    return True
|
|
| |
async def record_email(email: str):
    """Append ``[timestamp, email]`` to both bookkeeping CSV files.

    The all-zero account is never recorded; for it the function returns
    True without touching any file. Otherwise nothing is returned.
    """
    local_part, _, _ = email.partition('@')
    if local_part == "00000000":
        return True

    targets = (
        "/home/example/server/ChatGPT Web App(新Googleサイト用) - アクセス制限検出.csv",
        "/home/example/server/ChatGPT Web App(新Googleサイト用) - メールアドレス記録(毎日リセット).csv",
    )
    for target in targets:
        with open(target, mode='a', encoding='utf-8', newline='') as handle:
            # The timestamp is taken separately for each file.
            stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            csv.writer(handle).writerow([stamp, email])
|
|
|
|
|
|
|
|
| |
# Words that cause a message to be rejected by filter_message_content().
inappropriate_keywords: List[str] = [
    "不適切な単語1",
    "不適切な単語2",
]
|
|
def filter_message_content(message: str) -> str:
    """Reject *message* if it contains any inappropriate keyword.

    Both the message and the keywords are NFKC-normalised and lower-cased
    before matching, so full-width/half-width and case variants match too.

    Returns:
        The original, unmodified *message* when it passes the filter.

    Raises:
        HTTPException: 400 when a keyword is found.
    """

    def normalize_text(text: str) -> str:
        return unicodedata.normalize('NFKC', text).lower()

    normalized_message = normalize_text(message)

    for keyword in inappropriate_keywords:
        needle = normalize_text(keyword)
        # Skip empty entries: an empty pattern matches every message.
        if not needle:
            continue
        # Keywords are literal words, not patterns; escaping keeps regex
        # metacharacters in a keyword from raising or mis-matching.
        if re.search(re.escape(needle), normalized_message, flags=re.IGNORECASE):
            raise HTTPException(status_code=400, detail="Message contains inappropriate content")

    print(f"\nフィルタリングにクリアしたクエリ:{message}")
    return message
|
|
|
|
| |
|
|
| |
# Use the GPU when one is available, otherwise fall back to the CPU.
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_device_name)

# Tokenizer for the category classifier, loaded from a local directory.
tokenizer_path = '/home/example/server/stsb-xlm-r-multilingual'
tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)

# Sequence classifier with 11 output labels, loaded from a local directory
# and moved to the selected device.
model_path = '/home/example/server/classify-net(epoc-10000)'
model = AutoModelForSequenceClassification.from_pretrained(
    model_path,
    num_labels=11,
)
model.to(device)
|
|
|
|
|
|
def search_bm25(docs, query, num_results=3):
    """Rank *docs* against *query* with BM25 over Janome word tokens.

    Args:
        docs: sequence of documents; each is converted with ``str()``.
        query: search text.
        num_results: maximum number of hits to return.

    Returns:
        A list of ``(document, score)`` tuples, best score first. An empty
        *docs* yields an empty list.
    """
    # An empty corpus has nothing to rank; return early rather than hand
    # BM25Okapi zero documents.
    if not docs:
        return []

    # Local name intentionally shadows the module-level HF tokenizer.
    tokenizer = Tokenizer()
    tokenized_docs = [list(tokenizer.tokenize(str(doc), wakati=True)) for doc in docs]
    bm25 = BM25Okapi(tokenized_docs)
    tokenized_query = list(tokenizer.tokenize(str(query), wakati=True))
    scores = bm25.get_scores(tokenized_query)
    top_indexes = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:num_results]
    return [(docs[i], scores[i]) for i in top_indexes]
|
|
def predict_category(text, threshold=0.5):
    """Classify *text* with the module-level model.

    Returns:
        The index of the most probable label, or -1 when that label's
        softmax probability does not exceed *threshold*.
    """
    model.eval()
    encoded = tokenizer(text, return_tensors="pt", padding=True, truncation=True).to(device)

    with torch.no_grad():
        logits = model(**encoded).logits
        probabilities = F.softmax(logits, dim=1)
        top_probability, top_label = torch.max(probabilities, dim=1)

    return top_label.item() if top_probability.item() > threshold else -1
|
|
|
|
|
|
def search_bm25_all_data(query, csv_file_paths, num_results=3):
    """Run a BM25 search over the first column of every CSV and merge the hits.

    Args:
        query: search text.
        csv_file_paths: mapping whose values are CSV paths (read with no header).
        num_results: hits kept per file and in the merged result.

    Returns:
        A list of ``(document, score)`` tuples, best score first, at most
        *num_results* long. An empty mapping yields an empty list.
    """
    if not csv_file_paths:
        return []

    # Local name intentionally shadows the module-level HF tokenizer.
    tokenizer = Tokenizer()
    # The query does not change between files, so tokenize it once.
    tokenized_query = list(tokenizer.tokenize(str(query), wakati=True))

    all_search_results = []
    for path in csv_file_paths.values():
        docs = pd.read_csv(path, header=None)[0].tolist()
        if not docs:
            # Nothing to rank in this file.
            continue
        tokenized_docs = [list(tokenizer.tokenize(str(doc), wakati=True)) for doc in docs]
        scores = BM25Okapi(tokenized_docs).get_scores(tokenized_query)
        top_indexes = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:num_results]
        all_search_results.extend((docs[i], scores[i]) for i in top_indexes)

    # Merge the per-file winners and keep the overall best.
    all_search_results.sort(key=lambda x: x[1], reverse=True)
    return all_search_results[:num_results]
|
|
|
|
|
|
def generate_nouns_with_similarity(query):
    """Return the surface forms of all noun tokens Janome finds in *query*."""
    analyzer = Tokenizer()
    nouns = []
    for token in analyzer.tokenize(query):
        # Janome part-of-speech strings for nouns start with '名詞'.
        if token.part_of_speech.startswith('名詞'):
            nouns.append(token.surface)
    return nouns
|
|
| |
def search_documents_by_nouns(csv_file_paths, query):
    """Score documents by how often the query's nouns occur in them.

    Args:
        csv_file_paths: mapping whose values are CSV paths (read with no
            header); the first column holds the documents.
        query: text from which nouns are extracted.

    Returns:
        A list of ``(document, score)`` tuples sorted by descending score,
        or the string "該当する結果なし" when nothing scored above zero.
    """
    nouns = generate_nouns_with_similarity(query)
    # Without nouns every score would be zero, so skip reading the files.
    if not nouns:
        return "該当する結果なし"

    document_scores = {}
    for path in csv_file_paths.values():
        try:
            # Read the column directly (as the BM25 search does) instead of
            # iterrows(), which builds a Series per row and may upcast values.
            for value in pd.read_csv(path, header=None)[0]:
                doc = str(value)
                score = sum(doc.count(noun) for noun in nouns)
                if score > 0:
                    document_scores[doc] = score
        except Exception as e:
            # Deliberate best-effort: an unreadable file is reported and skipped.
            print(f"ファイル {path} の処理中にエラーが発生しました: {e}")

    ranked_documents = sorted(document_scores.items(), key=lambda x: x[1], reverse=True)
    if not ranked_documents:
        return "該当する結果なし"
    return ranked_documents
|
|
|
|
| |
|
|
| |
# Paraphrase table: a term that may appear in a query -> the word used in the
# rule documents. Keys are compared after normalize_string(), and insertion
# order is preserved.
replacement_dict = dict([
    ("Instagram", "SNS"),
    ("インスタグラム", "SNS"),
    ("インスタ", "SNS"),
    ("ツイッター", "SNS"),
    ("Twitter", "SNS"),
    ("X", "SNS"),
    ("エックス", "SNS"),
    ("facebook", "SNS"),
    ("FaceBook", "SNS"),
    ("フェイスブック", "SNS"),
    ("フェイス", "SNS"),
    ("Misskey", "SNS"),
    ("ミスキー", "SNS"),
    ("LINE", "SNS"),
    ("ライン", "SNS"),
    ("Tiktok", "SNS"),
    ("BeReal", "SNS"),
    ("Bereal", "SNS"),
    ("beReal", "SNS"),
    ("bereal", "SNS"),
    ("ビーリアル", "SNS"),
    ("Bluesky", "SNS"),
    ("BlueSky", "SNS"),
    ("bluesky", "SNS"),
    ("ブルースカイ", "SNS"),
    ("pinterest", "SNS"),
    ("Pinterest", "SNS"),
    ("Threads", "SNS"),
    ("threads", "SNS"),
    ("スレッズ", "SNS"),
    ("tik", "SNS"),
    ("Tik", "SNS"),
    ("Tok", "SNS"),
    ("tok", "SNS"),
    ("TikTok", "SNS"),
    ("tiktok", "SNS"),
    ("ティックトック", "SNS"),
    ("自販機", "自動販売機"),
])
|
|
def normalize_string(s):
    """Return *s* NFKC-normalised and lower-cased."""
    return unicodedata.normalize('NFKC', s).lower()
|
|
def search_replacement_in_csv_files(csv_file_paths, query):
    """Find documents containing the paraphrase of any term used in *query*.

    Every key of ``replacement_dict`` found in the normalised query
    contributes its replacement word; first-column cells of the CSVs keyed
    0-10 that contain one of those words are returned.

    Args:
        csv_file_paths: mapping of integer key -> CSV path (read with no header).
        query: the user's question.

    Returns:
        A list of matching first-column values, each row at most once.
        Empty when no dictionary key occurs in the query.
    """
    query_normalized = normalize_string(query)

    # Several keys map to the same replacement (and several normalise to the
    # same text), so collect the distinct search terms first. Searching once
    # per key would return the same rows repeatedly.
    search_terms = []
    for key, replacement in replacement_dict.items():
        if normalize_string(key) in query_normalized:
            term = normalize_string(replacement)
            if term not in search_terms:
                search_terms.append(term)

    if not search_terms:
        return []

    results = []
    for csv_key, path in csv_file_paths.items():
        # Only the files keyed 0-10 are searched.
        if not 0 <= csv_key <= 10:
            continue
        for value in pd.read_csv(path, header=None)[0]:
            text = normalize_string(str(value))
            if any(term in text for term in search_terms):
                results.append(value)
    return results
|
|
| |
|
|
|
|
def main(query):
    """Run every retrieval strategy for *query* and collect a text report.

    The report lines are, in order: the query, the predicted category (or a
    note that confidence was too low), BM25 hits within that category, BM25
    hits over all files, noun-count hits, and paraphrase-expansion hits.

    Returns:
        The list of report lines.
    """
    report = [f"\nクエリ: {query}"]

    # 1. Category prediction; -1 means the classifier was not confident enough.
    category = predict_category(query, threshold=0.5)
    has_category = category != -1
    if has_category:
        report.append(f"\nカテゴリ(Powered by BERT): {category}")
    else:
        report.append("確信度が閾値を下回るため、カテゴリに基づく検索は行いません。")

    # One rule file per category label 0..10.
    rules_dir = '/home/example/server/school_rules_corrected_corrected'
    csv_file_paths = {label: f'{rules_dir}/rule({label}).csv' for label in range(11)}

    # 2. BM25 restricted to the predicted category's file.
    if has_category:
        category_docs = pd.read_csv(csv_file_paths[category], header=None)[0].tolist()
        category_hits = search_bm25(category_docs, query)
        report.append("\nカテゴリの検索結果(Powered by BM25):")
        report.extend(f"スコア: {score:.2f}, {doc}" for doc, score in category_hits)

    # Optional extra files get the next free integer keys.
    next_key = max(csv_file_paths) + 1
    for extra_path in ['/home/example/server/school_rules_additional/patch.csv']:
        if not os.path.exists(extra_path):
            report.append(f"ファイルが存在しないためスキップされました: {extra_path}")
            continue
        csv_file_paths[next_key] = extra_path
        next_key += 1

    # 3. BM25 over every file.
    overall_hits = search_bm25_all_data(query, csv_file_paths)
    report.append("\n全データ検索結果(Powered by BM25):")
    report.extend(f"スコア: {score:.2f}, {doc}" for doc, score in overall_hits)

    # 4. Noun-frequency search.
    report.append("\n形態素解析による検索結果(Powered by Janome):")
    noun_hits = search_documents_by_nouns(csv_file_paths, query)
    if noun_hits == "該当する結果なし":
        report.append(noun_hits)
    else:
        extracted_nouns = generate_nouns_with_similarity(query)
        report.append("抽出した名詞:" + ", ".join(extracted_nouns))
        report.extend(f"スコア: {score}:{doc}" for doc, score in noun_hits)

    # 5. Paraphrase-expansion search.
    report.append("\n言い換え拡張検索結果:")
    paraphrase_hits = search_replacement_in_csv_files(csv_file_paths, query)
    if paraphrase_hits:
        report.extend(paraphrase_hits)
    else:
        report.append("クエリに対応する言い換えが辞書にありません。")

    return report
|
|
| |
|
|
|
|
def get_chatgpt_response(query: str):
    """Send the search report to the chat model and return its answer.

    Args:
        query: the prompt text, or a list of lines that is joined with
            newlines first.

    Returns:
        The answer text on success. When the response has no usable
        ``choices[0].message.content``, a dict
        ``{"status": "error", "response": ...}`` is returned instead, so
        callers must check the type.
    """
    if isinstance(query, list):
        query = "\n".join(query).strip()

    print(f"\n{query}")

    # NOTE(review): the API key is hard-coded here — consider loading it
    # from configuration instead.
    openai.api_key = "example"
    response = openai.ChatCompletion.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are an excellent assistant who never answers questions about anything other than school rules. Below are questions about school rules and search results from a database. Please kindly respond to the query in Japanese with the most credible sentence that reflects the results of the query. Additionally, paraphrase expansion search is highly reliable information.When asked for a reason, please provide a specific reason. When asked how to do something, please provide specific instructions on how to do it. If you do not know, please answer that you do not know."},
            {"role": "user", "content": query}
        ],
        max_tokens=200
    )

    try:
        text_response = response['choices'][0]['message']['content']
    except (KeyError, IndexError, TypeError):
        # Besides a missing key, an empty ``choices`` list or a non-mapping
        # response is treated as the same "no text" failure.
        error = {"status": "error", "response": "Response does not contain 'text'."}
        print(error)

        data = {
            'content': """【KeyError】{"status": "error", "response": "Response does not contain 'text'."}""",
        }
        # Best-effort notification: a webhook outage must not replace the
        # error result with an exception.
        try:
            requests.post(webhook_url, json=data, timeout=5)
        except requests.RequestException as exc:
            print(f"Webhook notification failed: {exc}")

        return error

    print(f"\nChatGPTの回答:{text_response}")
    return text_response
|
|
|
|
|
|
| |
|
|
| |
async def store_reset_code(email: str, code: str):
    """Store *code* in Redis under *email* with a 600-second expiry."""
    ttl_seconds = 600
    await redis.set(email, code, ex=ttl_seconds)
|
|
async def get_reset_code(email: str):
    """Return whatever Redis holds under the key *email*."""
    stored = await redis.get(email)
    return stored
|
|
|
|
| |
def send_email(from_email, to_email, subject, message, smtp_password):
    """Send an HTML e-mail through Gmail's SMTP-over-SSL endpoint.

    Args:
        from_email: sender address; also used as the SMTP login name.
        to_email: recipient address.
        subject: subject line.
        message: HTML body.
        smtp_password: password for the SMTP login.

    Raises:
        smtplib.SMTPException: if login or delivery fails.
        OSError: if the connection cannot be established.
    """
    msg = MIMEMultipart('alternative')
    msg['From'] = Header(from_email, 'utf-8')
    msg['To'] = Header(to_email, 'utf-8')
    msg['Subject'] = Header(subject, 'utf-8')
    msg.attach(MIMEText(message, 'html', 'utf-8'))

    # The context manager issues QUIT and closes the socket even when
    # login() or sendmail() raises, so the connection is never leaked.
    with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
        server.login(from_email, smtp_password)
        server.sendmail(from_email, [to_email], msg.as_string())
|
|
|
|
|
|
| |
@app.post("/api/validateAndRecordEmail")
async def validate_and_record_email(request: EmailValidationRequest, data_check: bool = Depends(check_data_in_specific_line)):
    """Log a user in: check the files, the credentials, uniqueness, then record.

    Returns:
        ``{"status": "ok", "email": <address>}`` on success. Each step
        raises its own HTTPException on failure.
    """
    # Both bookkeeping files must exist before anything else happens.
    required_files = (
        "/home/example/server/ChatGPT Web App(新Googleサイト用) - アクセス制限検出.csv",
        "/home/example/server/ChatGPT Web App(新Googleサイト用) - メールアドレス記録(毎日リセット).csv",
    )
    for required in required_files:
        await validate_file_path(file_path=required)

    address = request.email
    await validate_email_password(address, request.password)
    await validate_email_uniqueness(address)
    await record_email(address)

    return {"status": "ok", "email": address}
|
|
|
|
|
|
| |
@app.post("/api/chatGPTbackend")
async def chat_gpt_backend(request: ChatGPTRequest):
    """Answer a question: filter, search, ask the chat model, filter, log.

    Returns:
        ``{"status": "ok", "response": <answer>}``.

    Raises:
        HTTPException: 400 when the question or the answer contains an
            inappropriate keyword, 500 when the chat model returned no text.
    """

    def _notify(url, content):
        # Best-effort notification: a webhook outage must neither mask an
        # HTTP error nor throw away an answer that is already computed.
        try:
            requests.post(url, json={'content': content}, timeout=5)
        except requests.RequestException as exc:
            print(f"Webhook notification failed: {exc}")

    # NOTE(review): main(), the chat-model call and the webhook posts are
    # blocking calls inside an async handler — consider a thread pool.
    filtered_message = filter_message_content(request.message)
    searched_query = main(filtered_message)
    chatGPT_response = get_chatgpt_response(searched_query)

    # get_chatgpt_response() returns a dict instead of a str on failure.
    if not isinstance(chatGPT_response, str):
        _notify(webhook_url, "【500】Invalid response from ChatGPT.")
        raise HTTPException(status_code=500, detail="Invalid response from ChatGPT.")
    filtered_chatGPT_response = filter_message_content(chatGPT_response)

    # Append the exchange to the history CSV.
    csv_file_path = "/home/example/server/ChatGPT Web App(新Googleサイト用) - 履歴.csv"
    with open(csv_file_path, mode='a', encoding='utf-8', newline='') as csv_file:
        current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        csv.writer(csv_file).writerow(
            [current_time, request.loggedInEmail, request.message, filtered_chatGPT_response]
        )

    if isinstance(searched_query, list):
        searched_query = "\n".join(searched_query).strip()

    log_message = f"メールアドレス: {request.loggedInEmail}\n\nフィルタリング済みのクエリ:{filtered_message}\n{searched_query}\n\nChatGPTの初期回答:{chatGPT_response}\n\nChatGPTのフィルタリング済みの回答:{filtered_chatGPT_response}"

    # The log is cut to 2000 characters (presumably the webhook's message
    # limit — confirm).
    _notify(webhook_url2, log_message[:2000])

    return {"status": "ok", "response": filtered_chatGPT_response}
|
|
|
|
|
|
| |
@app.post("/api/validateAndRecordEmailForReset")
async def validate_and_record_email_for_reset(request: EmailValidationRequestForReset, data_check: bool = Depends(check_data_in_specific_line)):
    """Start a password reset: verify the current password, then mail a code.

    A six-digit code is stored via ``store_reset_code`` and sent to the
    user's address.

    Returns:
        ``{"status": "ok", "email": <address>}``.

    Raises:
        HTTPException: 403 for the all-zero account (its password cannot be
            changed); whatever ``validate_email_password`` raises otherwise.
    """
    import secrets  # local import: CSPRNG for the security code

    if request.email == "00000000@example.com":
        raise HTTPException(status_code=403, detail="Can't Change")

    await validate_email_password(request.email, request.password)

    # Security codes must be unpredictable, so draw from the OS CSPRNG
    # rather than the ``random`` module. Range is 100000..999999 inclusive.
    code = secrets.randbelow(900000) + 100000
    await store_reset_code(request.email, str(code))

    from_email = 'noreply@example.com'
    to_email = request.email
    subject = f'【RuleScolar】2段階認証コードは {code} です'
    # Doubled braces are literal CSS braces inside the f-string.
    message = f"""
<html>
<head>
<style>
body {{
font-family: 'Arial', sans-serif;
margin: 0;
padding: 0;
background-color: #ffffff;
}}
.container {{
max-width: 600px;
margin: 20px auto;
padding: 20px;
background-color: #f5f5f5;
border: 1px solid #011627;
border-radius: 50px;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
}}
.header {{
background-color: #f5f5f5; /* 背景色を薄いグレーに設定 */
color: #011627; /* テキストの色を赤に設定 */
padding: 10px;
text-align: center;
border-radius: 50px 50px 0 0;
border-bottom: 1px solid #011627; /* 下線を赤色で2pxの太さに設定 */
}}
.code {{
margin: 20px 0;
padding: 10px;
background-color: #ffffff;
text-align: center;
border-radius: 30px;
}}
.footer {{
text-align: center;
margin-top: 20px;
font-size: 12px;
color: #666;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>R u l e S c h o l a r</h1>
</div>
<br><h3><strong>{request.email} 様</strong></h3>
<p>アカウントのセキュリティコードは以下のとおりです。</p>
<div class="code">
<strong style="font-size: 22px; color: #F71735;">{code}</strong>
</div>
<p>コードは10分間有効です。<br>このメールに心当たりがない場合、またはご不明な点がある場合は、すぐにアカウントのセキュリティを確認してください。</p>
<div class="footer">
このメールは自動送信されていますので、こちらに返信しないでください。<br>
ご質問がある場合は、生徒会までお問い合わせください。<br><br>
©生徒会
</div>
</div>
</body>
</html>
"""
    # NOTE(review): the SMTP password is hard-coded here — consider loading
    # it from configuration instead.
    smtp_password = 'example'
    send_email(from_email, to_email, subject, message, smtp_password)

    return {"status": "ok", "email": request.email}
|
|
|
|
|
|
| |
@app.post("/api/ChangePassword")
async def change_password(request: EmailValidationRequestForChange):
    """Finish a password reset: verify the mailed code, then queue the change.

    The new password is appended to the password-change CSV; this handler
    does not update the login data itself.

    Returns:
        ``{"status": "success", "message": "Password changed successfully."}``.

    Raises:
        HTTPException: 404 when no code is stored for the address,
            403 when the supplied code does not match.
    """
    import secrets  # local import: constant-time comparison helper

    # The new password and the code are secrets, so they are not printed.
    stored_code = await get_reset_code(request.email)
    if stored_code is None:
        raise HTTPException(status_code=404, detail="No reset code found for this email address.")

    # Constant-time comparison; bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    code_matches = secrets.compare_digest(
        str(stored_code).encode('utf-8'),
        request.enteredSixPassword.encode('utf-8'),
    )
    if not code_matches:
        raise HTTPException(status_code=403, detail="Invalid code provided.")

    print("Verification successful.")

    csv_path = "/home/example/server/ChatGPT Web App(新Googleサイト用) - パスワード自動変更.csv"
    # Explicit UTF-8, matching every other CSV this service writes.
    with open(csv_path, mode='a', encoding='utf-8', newline='') as file:
        csv.writer(file).writerow([request.email, str(request.enterednewpassword)])

    # A reset code is single-use: remove it so it cannot be replayed
    # during the rest of its lifetime.
    await redis.delete(request.email)

    return {"status": "success", "message": "Password changed successfully."}
|
|