| |
| |
import hashlib
import hmac
import json
import os
import smtplib
import time
import xml.etree.ElementTree as ET
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import requests
from dotenv import load_dotenv
from duckduckgo_search import DDGS
from flask import Flask, request, make_response
from openai import OpenAI
|
|
| |
# Run python-dotenv's loader before the environment variables below are read.
load_dotenv()

# Flask application object; the WeChat callback route is registered on it.
app = Flask(__name__)

# Settings taken from the environment. Each one is None when the variable
# is not set.
TOKEN = os.environ.get('TOKEN')  # secret mixed into the WeChat signature check
API_KEY = os.environ.get("API_KEY")
BASE_URL = os.environ.get("OPENAI_BASE_URL")
emailkey = os.environ.get("EMAIL_KEY")  # password handed to smtp.login() in send_email

# Client used for every chat-completion request in this module.
client = OpenAI(base_url=BASE_URL, api_key=API_KEY)

# In-memory per-user state keyed by the sender's FromUserName. Each value is
# {'messages': [chat history], 'pending_response': [reply parts not yet sent]}.
user_sessions = {}
|
|
def _tool_spec(name, description, properties, required):
    """Return one function schema whose parameters form a JSON-schema object."""
    return {
        "name": name,
        "description": description,
        "parameters": {
            "type": "object",
            "properties": properties,
            "required": required,
        },
    }


# Schemas of the functions the model may call. get_openai_response wraps each
# entry as {"type": "function", "function": <entry>}; process_function_call
# dispatches on the "name" field.
FUNCTIONS = [
    _tool_spec(
        "search_duckduckgo",
        "使用DuckDuckGo搜索引擎查询信息。可以搜索最新新闻、文章、博客等内容。",
        {
            "keywords": {
                "type": "array",
                "items": {"type": "string"},
                "description": "搜索的关键词列表。例如:['Python', '机器学习', '最新进展']。",
            },
        },
        ["keywords"],
    ),
    _tool_spec(
        "search_papers",
        "使用Crossref API搜索学术论文。",
        {
            "query": {
                "type": "string",
                "description": "搜索查询字符串。例如:'climate change'。",
            },
        },
        ["query"],
    ),
    _tool_spec(
        "send_email",
        "发送电子邮件。",
        {
            "to": {"type": "string", "description": "收件人邮箱地址"},
            "subject": {"type": "string", "description": "邮件主题"},
            "content": {"type": "string", "description": "邮件内容"},
        },
        ["to", "subject", "content"],
    ),
]
|
|
def verify_wechat(request):
    """Answer the WeChat server-verification request (the GET callback).

    The expected signature is the SHA-1 hex digest of the lexicographically
    sorted and concatenated ``timestamp``, ``nonce`` and ``TOKEN``.

    Args:
        request: Object whose ``args`` mapping carries the query parameters
            ``signature``, ``timestamp``, ``nonce`` and ``echostr``.

    Returns:
        The ``echostr`` value when the signature matches, otherwise the
        tuple ``('error', 403)``.
    """
    params = request.args
    signature = params.get('signature')
    timestamp = params.get('timestamp')
    nonce = params.get('nonce')
    echostr = params.get('echostr')

    # A missing parameter or an unset TOKEN would otherwise reach
    # sorted()/join() as None and raise TypeError (an HTTP 500). A missing
    # echostr would make the view return None. Reject all of these up front.
    if any(value is None for value in (signature, timestamp, nonce, echostr)) or TOKEN is None:
        return 'error', 403

    expected = hashlib.sha1(
        ''.join(sorted([timestamp, nonce, TOKEN])).encode('utf8')
    ).hexdigest()

    # Compare as bytes in constant time: compare_digest rejects non-ASCII
    # str arguments, and a plain == leaks how many leading characters match.
    if hmac.compare_digest(expected.encode('utf8'), signature.encode('utf8')):
        return echostr
    return 'error', 403
|
|
def getUserMessageContentFromXML(xml_content):
    """Extract the text and both account names from a WeChat message body.

    Args:
        xml_content: The XML document as ``str`` or ``bytes``.

    Returns:
        A ``(content, from_user_name, to_user_name)`` tuple of strings. An
        element that is absent or empty yields ``''``, so messages without a
        ``Content`` element no longer raise ``AttributeError``.

    Raises:
        xml.etree.ElementTree.ParseError: If ``xml_content`` is not
            well-formed XML.
    """
    # NOTE(review): the body comes from the network and ElementTree is not
    # hardened against maliciously constructed XML; confirm the deployed
    # Python/expat versions mitigate entity-expansion attacks.
    root = ET.fromstring(xml_content)
    # findtext() returns the default when the element is missing and '' when
    # it is present but empty, unlike find(...).text.
    content = root.findtext('Content', default='')
    from_user_name = root.findtext('FromUserName', default='')
    to_user_name = root.findtext('ToUserName', default='')
    return content, from_user_name, to_user_name
|
|
def generate_response_xml(from_user_name, to_user_name, output_content):
    """Build the XML text reply for an incoming WeChat message.

    The reply swaps the roles of the incoming message: ``from_user_name``
    (the original sender) is written into ``ToUserName`` and
    ``to_user_name`` into ``FromUserName``.

    Args:
        from_user_name: Sender of the incoming message; recipient of the reply.
        to_user_name: Recipient of the incoming message; sender of the reply.
        output_content: Text placed in the ``Content`` element.

    Returns:
        A Flask response whose body is the reply XML and whose content type
        is ``application/xml``. ``CreateTime`` is the current Unix time in
        whole seconds.
    """
    def cdata(value):
        # A literal "]]>" would close the CDATA section early and let the
        # rest of the text be parsed as markup. Split it across two sections.
        return str(value).replace(']]>', ']]]]><![CDATA[>')

    output_xml = '''
<xml>
<ToUserName><![CDATA[%s]]></ToUserName>
<FromUserName><![CDATA[%s]]></FromUserName>
<CreateTime>%s</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[%s]]></Content>
</xml>'''

    body = output_xml % (
        cdata(from_user_name),
        cdata(to_user_name),
        str(int(time.time())),
        cdata(output_content),
    )
    response = make_response(body)
    response.content_type = 'application/xml'
    return response
|
|
def get_openai_response(messages, model="gpt-4o-mini"):
    """Request one chat completion with every entry of FUNCTIONS offered as a tool.

    Args:
        messages: Chat history to send, in the format the chat-completions
            endpoint expects.
        model: Model name to request. Defaults to the previously hard-coded
            ``"gpt-4o-mini"``.

    Returns:
        The message of the first choice, or ``None`` when the request fails
        for any reason (the error is printed).
    """
    tools = [{"type": "function", "function": spec} for spec in FUNCTIONS]
    try:
        completion = client.chat.completions.create(
            model=model,
            messages=messages,
            tools=tools,
            tool_choice="auto"
        )
        return completion.choices[0].message
    except Exception as e:
        # Deliberate catch-all at the API boundary: callers get None rather
        # than an exception from the client library.
        print(f"调用OpenAI API时出错: {str(e)}")
        return None
|
|
def split_message(message, max_length=500):
    """Split ``message`` into consecutive chunks of at most ``max_length`` characters.

    Args:
        message: Text to split. ``None`` is treated like an empty string.
        max_length: Maximum number of characters per chunk; must be positive.

    Returns:
        A list with at least one element. Empty or ``None`` input yields
        ``[""]`` so callers can always index the first part.

    Raises:
        ValueError: If ``max_length`` is not positive. A negative value used
            to return ``[]`` and silently drop the whole message.
    """
    if max_length <= 0:
        raise ValueError("max_length must be a positive integer")
    if not message:
        return [""]
    return [message[start:start + max_length] for start in range(0, len(message), max_length)]
|
|
def search_duckduckgo(keywords):
    """Run a DuckDuckGo text search for the given keywords.

    Args:
        keywords: List of keyword strings. A single string is also accepted
            and used as one keyword instead of being joined per character.

    Returns:
        A list of at most 5 results as returned by ``DDGS.text``. Returns
        ``[]`` when there is nothing to search for or the search fails
        (the error is printed).
    """
    if isinstance(keywords, str):
        keywords = [keywords]
    search_term = " ".join(str(word) for word in (keywords or []) if word).strip()
    if not search_term:
        return []

    try:
        with DDGS() as ddgs:
            return list(ddgs.text(keywords=search_term, region="cn-zh", safesearch="on", max_results=5))
    except Exception as e:
        # Catch-all at the library boundary, matching the other helpers in
        # this module, so a failed search does not abort the request.
        print(f"DuckDuckGo搜索时出错: {str(e)}")
        return []
|
|
def search_papers(query):
    """Search the Crossref works endpoint for papers matching ``query``.

    Args:
        query: Free-text search string.

    Returns:
        A list of at most 5 ``{"title": str, "DOI": str}`` dicts. Returns
        ``[]`` for a blank query, a non-200 response, a network error or an
        unexpected response body (errors are printed).
    """
    if not query or not str(query).strip():
        return []

    try:
        # Passing the query through ``params`` lets requests URL-encode it;
        # interpolating it into the URL broke on spaces, '&', '#', etc.
        response = requests.get(
            "https://api.crossref.org/works",
            params={"query": query, "rows": 5},
            timeout=10,
        )
    except requests.RequestException as e:
        print(f"搜索论文时出错: {str(e)}")
        return []

    if response.status_code != 200:
        return []

    try:
        papers = response.json()['message']['items'][:5]
    except (ValueError, KeyError, TypeError) as e:
        print(f"搜索论文时出错: {str(e)}")
        return []

    # ``title`` may be missing or an empty list; fall back to '' in both
    # cases instead of raising IndexError.
    return [
        {"title": (paper.get('title') or [''])[0], "DOI": paper.get('DOI', '')}
        for paper in papers
    ]
|
|
def send_email(to, subject, content):
    """Send an HTML e-mail through the hard-coded SMTP relay.

    Args:
        to: Recipient address (a single address string).
        subject: Subject line.
        content: Message body, attached as ``text/html``.

    Returns:
        ``True`` when the message was handed to the SMTP server, ``False``
        when the recipient is invalid or any step fails (the error is
        printed).
    """
    # Reject a missing recipient, and one containing line breaks, which
    # could otherwise be used to smuggle extra headers into the message.
    if not isinstance(to, str) or not to.strip() or "\r" in to or "\n" in to:
        print("发送邮件时出错: 收件人地址无效")
        return False
    recipient = to.strip()

    try:
        # Build the message before connecting so a bad payload does not
        # open a connection first.
        message = MIMEMultipart()
        message['From'] = "Me <aixiao@aixiao.xyz>"
        message['To'] = recipient
        message['Subject'] = subject
        message.attach(MIMEText(content, 'html'))
        payload = message.as_string()

        # The timeout keeps an unreachable relay from blocking the caller
        # indefinitely.
        with smtplib.SMTP('106.15.184.28', 8025, timeout=10) as smtp:
            smtp.login("jwt", emailkey)
            smtp.sendmail("aixiao@aixiao.xyz", recipient, payload)
        return True
    except Exception as e:
        print(f"发送邮件时出错: {str(e)}")
        return False
|
|
def process_function_call(tool_call):
    """Execute the local function requested by one model tool call.

    Args:
        tool_call: Object exposing ``function.name`` and
            ``function.arguments`` (a JSON-encoded object).

    Returns:
        The result of the dispatched function: a list for the two searches,
        a bool for ``send_email``. Returns ``None`` when the function name
        is unknown or the arguments are not a valid JSON object (the error
        is printed).
    """
    function_name = tool_call.function.name

    # The arguments string is model-generated and may be empty, malformed,
    # or valid JSON that is not an object.
    try:
        function_args = json.loads(tool_call.function.arguments or "{}")
    except (TypeError, ValueError) as e:
        print(f"解析函数参数时出错: {str(e)}")
        return None
    if not isinstance(function_args, dict):
        print(f"解析函数参数时出错: {function_name} 的参数不是JSON对象")
        return None

    if function_name == "search_duckduckgo":
        return search_duckduckgo(function_args.get('keywords', []))
    if function_name == "search_papers":
        return search_papers(function_args.get('query', ''))
    if function_name == "send_email":
        return send_email(
            function_args.get('to', ''),
            function_args.get('subject', ''),
            function_args.get('content', ''),
        )
    return None
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
|
|
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
# Upper bound on model -> tools -> model round trips for one incoming message.
_MAX_TOOL_ROUNDS = 3
# Sent to the user when no answer could be produced.
_FALLBACK_REPLY = "抱歉，服务暂时不可用，请稍后再试。"


def _run_chat_turn(messages):
    """Get the model's text reply to ``messages``, running any tools it requests.

    Tool calls and their results are appended to ``messages`` in place, then
    the model is asked again until it answers with plain text.

    Returns:
        The reply text, or ``None`` when the API call fails or the model is
        still requesting tools after ``_MAX_TOOL_ROUNDS`` rounds.
    """
    for _ in range(_MAX_TOOL_ROUNDS + 1):
        gpt_response = get_openai_response(messages)
        if gpt_response is None:
            return None
        if not gpt_response.tool_calls:
            return gpt_response.content

        # Tool results must follow the assistant message that requested
        # them, so record that message first.
        messages.append({
            "role": "assistant",
            "content": gpt_response.content,
            "tool_calls": [
                {
                    "id": tool_call.id,
                    "type": "function",
                    "function": {
                        "name": tool_call.function.name,
                        "arguments": tool_call.function.arguments,
                    },
                }
                for tool_call in gpt_response.tool_calls
            ],
        })
        for tool_call in gpt_response.tool_calls:
            try:
                result_text = json.dumps(process_function_call(tool_call), ensure_ascii=False)
            except Exception as e:
                # A failing tool is reported to the model instead of
                # aborting the whole request.
                print(f"执行函数 {tool_call.function.name} 时出错: {str(e)}")
                result_text = json.dumps({"error": str(e)}, ensure_ascii=False)
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result_text,
            })
    return None


@app.route('/api/wx', methods=['GET', 'POST'])
def wechatai():
    """WeChat callback endpoint.

    GET requests are answered by ``verify_wechat``. POST requests carry a
    user message: the text is sent to the model together with the user's
    history, and the answer is returned as an XML text reply. Replies longer
    than one part are queued in the session and delivered one part at a time
    whenever the user sends "继续".

    Returns:
        The verification result for GET; for POST either the XML reply or
        the plain string ``'success'`` when the message cannot be handled.
    """
    if request.method == 'GET':
        return verify_wechat(request)

    # NOTE(review): POST bodies are processed without a signature check, so
    # any client that can reach this URL can submit messages. Consider
    # validating the signature query parameters here as well.
    try:
        user_message_content, from_user_name, to_user_name = getUserMessageContentFromXML(request.data)
    except (ET.ParseError, AttributeError) as e:
        # Malformed XML, or a message type without the expected elements.
        print(f"解析微信消息时出错: {str(e)}")
        return 'success'

    # Nothing to answer (e.g. a non-text message) or nobody to answer to.
    if not user_message_content or not from_user_name:
        return 'success'

    # NOTE(review): sessions and their histories are never evicted, so
    # memory grows with the number of users and messages.
    session = user_sessions.setdefault(from_user_name, {'messages': [], 'pending_response': []})

    if user_message_content.strip() == '继续':
        if session['pending_response']:
            response_content = session['pending_response'].pop(0)
            if session['pending_response']:
                response_content += '\n\n回复"继续"获取下一部分。'
            else:
                response_content += '\n\n回复结束。'
        else:
            response_content = "没有待发送的消息。"
    else:
        history = session['messages']
        checkpoint = len(history)
        history.append({"role": "user", "content": user_message_content})

        reply_text = _run_chat_turn(history)
        if not reply_text:
            # Roll the history back so a failed turn cannot leave it in a
            # state that makes every later request fail too.
            del history[checkpoint:]
            session['pending_response'] = []
            response_content = _FALLBACK_REPLY
        else:
            history.append({"role": "assistant", "content": reply_text})
            response_parts = split_message(reply_text)
            response_content = response_parts[0]
            # Always replace the queue so parts of an older long reply are
            # not served after a newer short one.
            session['pending_response'] = response_parts[1:]
            if session['pending_response']:
                response_content += '\n\n回复"继续"获取下一部分。'

    return generate_response_xml(from_user_name, to_user_name, response_content)
| |
if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger
    # (which can execute arbitrary code) must not be on by default. Enable
    # it explicitly with FLASK_DEBUG=1 for local development only.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)