| """ |
| MIT License |
| |
| Copyright (c) 2021 Lxns-Network |
| |
| Permission is hereby granted, free of charge, to any person obtaining a copy |
| of this software and associated documentation files (the "Software"), to deal |
| in the Software without restriction, including without limitation the rights |
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| copies of the Software, and to permit persons to whom the Software is |
| furnished to do so, subject to the following conditions: |
| |
| The above copyright notice and this permission notice shall be included in all |
| copies or substantial portions of the Software. |
| |
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
| SOFTWARE. |
| """ |
|
|
| import base64 |
| import json |
| import os |
| import typing as T |
| from enum import Enum |
| from pydantic.v1 import BaseModel |
|
|
|
|
| class ComponentType(Enum): |
| Plain = "Plain" |
| Face = "Face" |
| Record = "Record" |
| Video = "Video" |
| At = "At" |
| Node = "Node" |
| Nodes = "Nodes" |
| Poke = "Poke" |
| Image = "Image" |
| Reply = "Reply" |
| Forward = "Forward" |
| File = "File" |
|
|
| RPS = "RPS" |
| Dice = "Dice" |
| Shake = "Shake" |
| Anonymous = "Anonymous" |
| Share = "Share" |
| Contact = "Contact" |
| Location = "Location" |
| Music = "Music" |
| RedBag = "RedBag" |
| Xml = "Xml" |
| Json = "Json" |
| CardImage = "CardImage" |
| TTS = "TTS" |
| Unknown = "Unknown" |
|
|
|
|
| class BaseMessageComponent(BaseModel): |
| type: ComponentType |
|
|
| def toString(self): |
| output = f"[CQ:{self.type.lower()}" |
| for k, v in self.__dict__.items(): |
| if k == "type" or v is None: |
| continue |
| if k == "_type": |
| k = "type" |
| if isinstance(v, bool): |
| v = 1 if v else 0 |
| output += ",%s=%s" % ( |
| k, |
| str(v) |
| .replace("&", "&") |
| .replace(",", ",") |
| .replace("[", "[") |
| .replace("]", "]"), |
| ) |
| output += "]" |
| return output |
|
|
| def toDict(self): |
| data = {} |
| for k, v in self.__dict__.items(): |
| if k == "type" or v is None: |
| continue |
| if k == "_type": |
| k = "type" |
| data[k] = v |
| return {"type": self.type.lower(), "data": data} |
|
|
|
|
| class Plain(BaseMessageComponent): |
| type: ComponentType = "Plain" |
| text: str |
| convert: T.Optional[bool] = True |
|
|
| def __init__(self, text: str, convert: bool = True, **_): |
| super().__init__(text=text, convert=convert, **_) |
|
|
| def toString(self): |
| if not self.convert: |
| return self.text |
| return ( |
| self.text.replace("&", "&").replace("[", "[").replace("]", "]") |
| ) |
|
|
|
|
| class Face(BaseMessageComponent): |
| type: ComponentType = "Face" |
| id: int |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class Record(BaseMessageComponent): |
| type: ComponentType = "Record" |
| file: T.Optional[str] = "" |
| magic: T.Optional[bool] = False |
| url: T.Optional[str] = "" |
| cache: T.Optional[bool] = True |
| proxy: T.Optional[bool] = True |
| timeout: T.Optional[int] = 0 |
| |
| path: T.Optional[str] |
|
|
| def __init__(self, file: T.Optional[str], **_): |
| for k in _.keys(): |
| if k == "url": |
| pass |
| |
| super().__init__(file=file, **_) |
|
|
| @staticmethod |
| def fromFileSystem(path, **_): |
| return Record(file=f"file:///{os.path.abspath(path)}", path=path, **_) |
|
|
| @staticmethod |
| def fromURL(url: str, **_): |
| if url.startswith("http://") or url.startswith("https://"): |
| return Record(file=url, **_) |
| raise Exception("not a valid url") |
|
|
|
|
| class Video(BaseMessageComponent): |
| type: ComponentType = "Video" |
| file: str |
| cover: T.Optional[str] = "" |
| c: T.Optional[int] = 2 |
| |
| path: T.Optional[str] = "" |
|
|
| def __init__(self, file: str, **_): |
| |
| |
| |
| super().__init__(file=file, **_) |
|
|
| @staticmethod |
| def fromFileSystem(path, **_): |
| return Video(file=f"file:///{os.path.abspath(path)}", path=path, **_) |
|
|
| @staticmethod |
| def fromURL(url: str, **_): |
| if url.startswith("http://") or url.startswith("https://"): |
| return Video(file=url, **_) |
| raise Exception("not a valid url") |
|
|
|
|
| class At(BaseMessageComponent): |
| type: ComponentType = "At" |
| qq: T.Union[int, str] |
| name: T.Optional[str] = "" |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class AtAll(At): |
| qq: str = "all" |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class RPS(BaseMessageComponent): |
| type: ComponentType = "RPS" |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class Dice(BaseMessageComponent): |
| type: ComponentType = "Dice" |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class Shake(BaseMessageComponent): |
| type: ComponentType = "Shake" |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class Anonymous(BaseMessageComponent): |
| type: ComponentType = "Anonymous" |
| ignore: T.Optional[bool] = False |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class Share(BaseMessageComponent): |
| type: ComponentType = "Share" |
| url: str |
| title: str |
| content: T.Optional[str] = "" |
| image: T.Optional[str] = "" |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class Contact(BaseMessageComponent): |
| type: ComponentType = "Contact" |
| _type: str |
| id: T.Optional[int] = 0 |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class Location(BaseMessageComponent): |
| type: ComponentType = "Location" |
| lat: float |
| lon: float |
| title: T.Optional[str] = "" |
| content: T.Optional[str] = "" |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class Music(BaseMessageComponent): |
| type: ComponentType = "Music" |
| _type: str |
| id: T.Optional[int] = 0 |
| url: T.Optional[str] = "" |
| audio: T.Optional[str] = "" |
| title: T.Optional[str] = "" |
| content: T.Optional[str] = "" |
| image: T.Optional[str] = "" |
|
|
| def __init__(self, **_): |
| |
| |
| |
| super().__init__(**_) |
|
|
|
|
| class Image(BaseMessageComponent): |
| type: ComponentType = "Image" |
| file: T.Optional[str] = "" |
| _type: T.Optional[str] = "" |
| subType: T.Optional[int] = 0 |
| url: T.Optional[str] = "" |
| cache: T.Optional[bool] = True |
| id: T.Optional[int] = 40000 |
| c: T.Optional[int] = 2 |
| |
| path: T.Optional[str] = "" |
| file_unique: T.Optional[str] = "" |
|
|
| def __init__(self, file: T.Optional[str], **_): |
| |
| |
| |
| |
| super().__init__(file=file, **_) |
|
|
| @staticmethod |
| def fromURL(url: str, **_): |
| if url.startswith("http://") or url.startswith("https://"): |
| return Image(file=url, **_) |
| raise Exception("not a valid url") |
|
|
| @staticmethod |
| def fromFileSystem(path, **_): |
| return Image(file=f"file:///{os.path.abspath(path)}", path=path, **_) |
|
|
| @staticmethod |
| def fromBase64(base64: str, **_): |
| return Image(f"base64://{base64}", **_) |
|
|
| @staticmethod |
| def fromBytes(byte: bytes): |
| return Image.fromBase64(base64.b64encode(byte).decode()) |
|
|
| @staticmethod |
| def fromIO(IO): |
| return Image.fromBytes(IO.read()) |
|
|
|
|
| class Reply(BaseMessageComponent): |
| type: ComponentType = "Reply" |
| id: T.Union[str, int] |
| """所引用的消息 ID""" |
| chain: T.Optional[T.List["BaseMessageComponent"]] = [] |
| """引用的消息段列表""" |
| sender_id: T.Optional[int] | T.Optional[str] = 0 |
| """引用的消息发送者 ID""" |
| sender_nickname: T.Optional[str] = "" |
| """引用的消息发送者昵称""" |
| time: T.Optional[int] = 0 |
| """引用的消息发送时间""" |
| message_str: T.Optional[str] = "" |
| """解析后的纯文本消息字符串""" |
|
|
| text: T.Optional[str] = "" |
| """deprecated""" |
| qq: T.Optional[int] = 0 |
| """deprecated""" |
| seq: T.Optional[int] = 0 |
| """deprecated""" |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class RedBag(BaseMessageComponent): |
| type: ComponentType = "RedBag" |
| title: str |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class Poke(BaseMessageComponent): |
| type: str = "" |
| id: T.Optional[int] = 0 |
| qq: T.Optional[int] = 0 |
|
|
| def __init__(self, type: str, **_): |
| type = f"Poke:{type}" |
| super().__init__(type=type, **_) |
|
|
|
|
| class Forward(BaseMessageComponent): |
| type: ComponentType = "Forward" |
| id: str |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class Node(BaseMessageComponent): |
| """群合并转发消息""" |
|
|
| type: ComponentType = "Node" |
| id: T.Optional[int] = 0 |
| name: T.Optional[str] = "" |
| uin: T.Optional[int] = 0 |
| content: T.Optional[T.Union[str, list, dict]] = "" |
| seq: T.Optional[T.Union[str, list]] = "" |
| time: T.Optional[int] = 0 |
|
|
| def __init__(self, content: T.Union[str, list, dict, "Node", T.List["Node"]], **_): |
| if isinstance(content, list): |
| _content = None |
| if all(isinstance(item, Node) for item in content): |
| _content = [node.toDict() for node in content] |
| else: |
| _content = "" |
| for chain in content: |
| _content += chain.toString() |
| content = _content |
| elif isinstance(content, Node): |
| content = content.toDict() |
| super().__init__(content=content, **_) |
|
|
| def toString(self): |
| |
| return "" |
|
|
|
|
| class Nodes(BaseMessageComponent): |
| type: ComponentType = "Nodes" |
| nodes: T.List[Node] |
|
|
| def __init__(self, nodes: T.List[Node], **_): |
| super().__init__(nodes=nodes, **_) |
|
|
| def toDict(self): |
| return {"messages": [node.toDict() for node in self.nodes]} |
|
|
|
|
| class Xml(BaseMessageComponent): |
| type: ComponentType = "Xml" |
| data: str |
| resid: T.Optional[int] = 0 |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class Json(BaseMessageComponent): |
| type: ComponentType = "Json" |
| data: T.Union[str, dict] |
| resid: T.Optional[int] = 0 |
|
|
| def __init__(self, data, **_): |
| if isinstance(data, dict): |
| data = json.dumps(data) |
| super().__init__(data=data, **_) |
|
|
|
|
| class CardImage(BaseMessageComponent): |
| type: ComponentType = "CardImage" |
| file: str |
| cache: T.Optional[bool] = True |
| minwidth: T.Optional[int] = 400 |
| minheight: T.Optional[int] = 400 |
| maxwidth: T.Optional[int] = 500 |
| maxheight: T.Optional[int] = 500 |
| source: T.Optional[str] = "" |
| icon: T.Optional[str] = "" |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
| @staticmethod |
| def fromFileSystem(path, **_): |
| return CardImage(file=f"file:///{os.path.abspath(path)}", **_) |
|
|
|
|
| class TTS(BaseMessageComponent): |
| type: ComponentType = "TTS" |
| text: str |
|
|
| def __init__(self, **_): |
| super().__init__(**_) |
|
|
|
|
| class Unknown(BaseMessageComponent): |
| type: ComponentType = "Unknown" |
| text: str |
|
|
| def toString(self): |
| return "" |
|
|
|
|
| class File(BaseMessageComponent): |
| """ |
| 目前此消息段只适配了 Napcat。 |
| """ |
|
|
| type: ComponentType = "File" |
| name: T.Optional[str] = "" |
| file: T.Optional[str] = "" |
|
|
| def __init__(self, name: str, file: str): |
| super().__init__(name=name, file=file) |
|
|
|
|
| ComponentTypes = { |
| "plain": Plain, |
| "text": Plain, |
| "face": Face, |
| "record": Record, |
| "video": Video, |
| "at": At, |
| "rps": RPS, |
| "dice": Dice, |
| "shake": Shake, |
| "anonymous": Anonymous, |
| "share": Share, |
| "contact": Contact, |
| "location": Location, |
| "music": Music, |
| "image": Image, |
| "reply": Reply, |
| "redbag": RedBag, |
| "poke": Poke, |
| "forward": Forward, |
| "node": Node, |
| "nodes": Nodes, |
| "xml": Xml, |
| "json": Json, |
| "cardimage": CardImage, |
| "tts": TTS, |
| "unknown": Unknown, |
| "file": File, |
| } |
|
|