| |
|
|
| import json |
| import os |
| import requests |
| import plugins |
| from bridge.context import ContextType |
| from bridge.reply import Reply, ReplyType |
| from common.log import logger |
| from plugins import * |
|
|
|
|
| @plugins.register( |
| name="Keyword", |
| desire_priority=900, |
| hidden=True, |
| desc="关键词匹配过滤", |
| version="0.1", |
| author="fengyege.top", |
| ) |
| class Keyword(Plugin): |
| def __init__(self): |
| super().__init__() |
| try: |
| curdir = os.path.dirname(__file__) |
| config_path = os.path.join(curdir, "config.json") |
| conf = None |
| if not os.path.exists(config_path): |
| logger.debug(f"[keyword]不存在配置文件{config_path}") |
| conf = {"keyword": {}} |
| with open(config_path, "w", encoding="utf-8") as f: |
| json.dump(conf, f, indent=4) |
| else: |
| logger.debug(f"[keyword]加载配置文件{config_path}") |
| with open(config_path, "r", encoding="utf-8") as f: |
| conf = json.load(f) |
| |
| self.keyword = conf["keyword"] |
|
|
| logger.info("[keyword] {}".format(self.keyword)) |
| self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context |
| logger.info("[keyword] inited.") |
| except Exception as e: |
| logger.warn("[keyword] init failed, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/keyword .") |
| raise e |
|
|
| def on_handle_context(self, e_context: EventContext): |
| if e_context["context"].type != ContextType.TEXT: |
| return |
|
|
| content = e_context["context"].content.strip() |
| logger.debug("[keyword] on_handle_context. content: %s" % content) |
| if content in self.keyword: |
| logger.info(f"[keyword] 匹配到关键字【{content}】") |
| reply_text = self.keyword[content] |
|
|
| |
| if (reply_text.startswith("http://") or reply_text.startswith("https://")) and any(reply_text.endswith(ext) for ext in [".jpg", ".jpeg", ".png", ".gif", ".img"]): |
| |
| reply = Reply() |
| reply.type = ReplyType.IMAGE_URL |
| reply.content = reply_text |
| |
| elif (reply_text.startswith("http://") or reply_text.startswith("https://")) and any(reply_text.endswith(ext) for ext in [".pdf", ".doc", ".docx", ".xls", "xlsx",".zip", ".rar"]): |
| |
| file_path = "tmp" |
| if not os.path.exists(file_path): |
| os.makedirs(file_path) |
| file_name = reply_text.split("/")[-1] |
| file_path = os.path.join(file_path, file_name) |
| response = requests.get(reply_text) |
| with open(file_path, "wb") as f: |
| f.write(response.content) |
| |
| reply = Reply() |
| reply.type = ReplyType.FILE |
| reply.content = file_path |
| |
| elif (reply_text.startswith("http://") or reply_text.startswith("https://")) and any(reply_text.endswith(ext) for ext in [".mp4"]): |
| |
| reply = Reply() |
| reply.type = ReplyType.VIDEO_URL |
| reply.content = reply_text |
| |
| else: |
| |
| reply = Reply() |
| reply.type = ReplyType.TEXT |
| reply.content = reply_text |
| |
| e_context["reply"] = reply |
| e_context.action = EventAction.BREAK_PASS |
| |
| def get_help_text(self, **kwargs): |
| help_text = "关键词过滤" |
| return help_text |
|
|