Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import requests | |
| import smtplib | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| from flask import Flask, request, jsonify, send_from_directory | |
| from openai import OpenAI | |
| from bs4 import BeautifulSoup | |
| import random | |
| from functions import FUNCTIONS_GROUP_1, FUNCTIONS_GROUP_2, get_function_descriptions | |
| app = Flask(__name__) | |
| API_KEY = os.getenv("OPENAI_API_KEY") | |
| BASE_URL = os.getenv("OPENAI_BASE_URL") | |
| emailkey = os.getenv("EMAIL_KEY") | |
| client = OpenAI(api_key=API_KEY, base_url=BASE_URL) | |
| def search_duckduckgo(keywords): | |
| search_term = " ".join(keywords) | |
| url = "https://www.bing.com/search" | |
| user_agents = [ | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0", | |
| "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15", | |
| ] | |
| headers = { | |
| "User-Agent": random.choice(user_agents) | |
| } | |
| params = { | |
| "q": search_term, | |
| "setlang": "zh-CN" | |
| } | |
| response = requests.get(url, params=params, headers=headers) | |
| results = [] | |
| if response.status_code == 200: | |
| soup = BeautifulSoup(response.text, 'html.parser') | |
| for item in soup.select('.b_algo')[:5]: # Limit to 5 results | |
| title_elem = item.select_one('h2 a') | |
| snippet_elem = item.select_one('.b_caption p') | |
| if title_elem and snippet_elem: | |
| results.append({ | |
| "title": title_elem.text, | |
| "href": title_elem['href'], | |
| "body": snippet_elem.text | |
| }) | |
| return results | |
| def search_papers(query): | |
| url = f"https://api.crossref.org/works?query={query}" | |
| response = requests.get(url) | |
| if response.status_code == 200: | |
| data = response.json() | |
| papers = data['message']['items'] | |
| processed_papers = [] | |
| for paper in papers: | |
| processed_paper = { | |
| "标题": paper.get('title', [''])[0], | |
| "作者": ", ".join([f"{author.get('given', '')} {author.get('family', '')}" for author in paper.get('author', [])]), | |
| "DOI": paper.get('DOI', ''), | |
| "ISBN": ", ".join(paper.get('ISBN', [])), | |
| "摘要": paper.get('abstract', '').replace('<p>', '').replace('</p>', '').replace('<italic>', '').replace('</italic>', '') | |
| } | |
| processed_papers.append(processed_paper) | |
| return processed_papers | |
| else: | |
| return [] | |
| def send_email(to, subject, content): | |
| try: | |
| with smtplib.SMTP('106.15.184.28', 8025) as smtp: | |
| smtp.login("jwt", emailkey) | |
| message = MIMEMultipart() | |
| message['From'] = "Me <aixiao@aixiao.xyz>" | |
| message['To'] = to | |
| message['Subject'] = subject | |
| message.attach(MIMEText(content, 'html')) | |
| smtp.sendmail("aixiao@aixiao.xyz", to, message.as_string()) | |
| return True | |
| except Exception as e: | |
| print(f"发送邮件时出错: {str(e)}") | |
| return False | |
| def get_openai_response(messages, model="gpt-4o-mini", functions=None, function_call=None): | |
| try: | |
| response = client.chat.completions.create( | |
| model=model, | |
| messages=messages, | |
| functions=functions, | |
| function_call=function_call | |
| ) | |
| return response.choices[0].message | |
| except Exception as e: | |
| print(f"调用OpenAI API时出错: {str(e)}") | |
| return None | |
| def process_function_call(function_name, function_args): | |
| if function_name == "search_duckduckgo": | |
| keywords = function_args.get('keywords', []) | |
| if not keywords: | |
| return "搜索关键词为空,无法执行搜索。" | |
| return search_duckduckgo(keywords) | |
| elif function_name == "search_papers": | |
| query = function_args.get('query', '') | |
| if not query: | |
| return "搜索查询为空,无法执行论文搜索。" | |
| return search_papers(query) | |
| elif function_name == "send_email": | |
| to = function_args.get('to', '') | |
| subject = function_args.get('subject', '') | |
| content = function_args.get('content', '') | |
| if not to or not subject or not content: | |
| return "邮件信息不完整,无法发送邮件。" | |
| success = send_email(to, subject, content) | |
| return { | |
| "success": success, | |
| "message": "邮件发送成功" if success else "邮件发送失败", | |
| "to": to, | |
| "subject": subject, | |
| "content": content, | |
| "is_email": True | |
| } | |
| else: | |
| return "未知的函数调用。" | |
| def index(): | |
| return send_from_directory('.', 'index.html') | |
| def chat(): | |
| data = request.json | |
| question = data['question'] | |
| history = data.get('history', []) | |
| messages = history + [{"role": "user", "content": question}] | |
| status_log = [] | |
| # 次级模型1: 处理搜索相关函数 | |
| status_log.append("次级模型1:正在判断是否需要选调第一组函数") | |
| sub_model_1_response = get_openai_response(messages, model="gpt-4o-mini", functions=FUNCTIONS_GROUP_1, function_call="auto") | |
| # 次级模型2: 处理邮件发送相关函数 | |
| status_log.append("次级模型2:正在判断是否需要选调第二组函数") | |
| sub_model_2_response = get_openai_response(messages, model="gpt-4o-mini", functions=FUNCTIONS_GROUP_2, function_call="auto") | |
| function_call_1 = sub_model_1_response.function_call if sub_model_1_response and sub_model_1_response.function_call else None | |
| function_call_2 = sub_model_2_response.function_call if sub_model_2_response and sub_model_2_response.function_call else None | |
| if not function_call_1: | |
| status_log.append("次级模型1:判断不需要选调第一组函数") | |
| if not function_call_2: | |
| status_log.append("次级模型2:判断不需要选调第二组函数") | |
| final_function_call = None | |
| response = None | |
| search_results = None | |
| email_sent = False | |
| if function_call_1 and function_call_2: | |
| # 裁决模型: 决定使用哪个函数调用 | |
| status_log.append("裁决模型:正在决定使用哪个函数调用") | |
| arbitration_messages = messages + [ | |
| {"role": "system", "content": "两个次级模型都建议使用函数。请决定使用哪个函数更合适。"}, | |
| {"role": "assistant", "content": f"次级模型1建议使用函数:{function_call_1.name}"}, | |
| {"role": "assistant", "content": f"次级模型2建议使用函数:{function_call_2.name}"} | |
| ] | |
| arbitration_response = get_openai_response(arbitration_messages, model="gpt-4o-mini") | |
| if "模型1" in arbitration_response.content or function_call_1.name in arbitration_response.content: | |
| final_function_call = function_call_1 | |
| status_log.append(f"裁决模型:决定使用函数 {function_call_1.name}") | |
| else: | |
| final_function_call = function_call_2 | |
| status_log.append(f"裁决模型:决定使用函数 {function_call_2.name}") | |
| elif function_call_1: | |
| final_function_call = function_call_1 | |
| status_log.append(f"次级模型1:决定使用函数 {function_call_1.name}") | |
| elif function_call_2: | |
| final_function_call = function_call_2 | |
| status_log.append(f"次级模型2:决定使用函数 {function_call_2.name}") | |
| else: | |
| status_log.append("所有次级模型:判断不需要进行任何函数调用") | |
| if final_function_call: | |
| function_name = final_function_call.name | |
| function_args = json.loads(final_function_call.arguments) | |
| status_log.append(f"正在执行函数 {function_name}") | |
| result = process_function_call(function_name, function_args) | |
| status_log.append(f"函数 {function_name} 执行完成") | |
| if isinstance(result, dict) and result.get("is_email", False): | |
| response = f"邮件{'已成功' if result['success'] else '未能成功'}发送到 {result['to']}。\n\n主题:{result['subject']}\n\n内容:\n{result['content']}" | |
| email_sent = result['success'] | |
| elif isinstance(result, list): | |
| search_results = result | |
| messages.append({ | |
| "role": "function", | |
| "name": function_name, | |
| "content": json.dumps(result, ensure_ascii=False) | |
| }) | |
| else: | |
| messages.append({ | |
| "role": "function", | |
| "name": function_name, | |
| "content": str(result) | |
| }) | |
| # 只有在没有邮件发送结果时才调用主模型 | |
| if not response: | |
| status_log.append("主模型:正在生成回答") | |
| final_response = get_openai_response(messages, model="gpt-4o-mini") | |
| response = final_response.content if final_response else "Error occurred" | |
| status_log.append("主模型:回答生成完成") | |
| return jsonify({ | |
| "response": response, | |
| "status_log": status_log, | |
| "search_results": search_results, | |
| "search_used": bool(search_results), | |
| "email_sent": email_sent | |
| }) | |
| def update_settings(): | |
| data = request.json | |
| max_history = data.get('max_history', 10) | |
| return jsonify({"status": "success", "max_history": max_history}) | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=7860, debug=True) | |
| # from flask import Flask, request, jsonify, send_from_directory | |
| # import requests | |
| # from bs4 import BeautifulSoup | |
| # import random | |
| # import time | |
| # app = Flask(__name__) | |
| # def perform_bing_search(keywords): | |
| # search_term = " ".join(keywords) | |
| # url = "https://www.bing.com/search" | |
| # user_agents = [ | |
| # "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", | |
| # "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0", | |
| # "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15", | |
| # ] | |
| # headers = {"User-Agent": random.choice(user_agents)} | |
| # params = {"q": search_term, "setlang": "zh-CN"} | |
| # response = requests.get(url, params=params, headers=headers) | |
| # if response.status_code == 200: | |
| # soup = BeautifulSoup(response.text, 'html.parser') | |
| # results = soup.select('.b_algo') | |
| # search_results = [] | |
| # for result in results[:5]: # 只取前5个结果 | |
| # title = result.select_one('h2 a') | |
| # snippet = result.select_one('.b_caption p') | |
| # if title and snippet: | |
| # search_results.append({ | |
| # "title": title.text, | |
| # "url": title['href'], | |
| # "snippet": snippet.text | |
| # }) | |
| # return search_results | |
| # return [] | |
| # @app.route('/') | |
| # def index(): | |
| # return send_from_directory('.', 'we.html') | |
| # @app.route('/start_test', methods=['POST']) | |
| # def start_test(): | |
| # data = request.json | |
| # keywords = data['keywords'].split() | |
| # interval = int(data['interval']) | |
| # first_search = perform_bing_search(keywords) | |
| # time.sleep(interval) | |
| # second_search = perform_bing_search(keywords) | |
| # success = len(first_search) > 0 and len(second_search) > 0 | |
| # return jsonify({ | |
| # "success": success, | |
| # "first_search": first_search, | |
| # "second_search": second_search | |
| # }) | |
| # if __name__ == '__main__': | |
| # app.run(host='0.0.0.0', port=7860, debug=True) | |