File size: 5,513 Bytes
e031780
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""消息处理模块



负责消息的解析、文本提取和会话指纹生成

"""
import asyncio
import base64
import hashlib
import logging
import re
from typing import List, TYPE_CHECKING

import httpx

if TYPE_CHECKING:
    from main import Message

logger = logging.getLogger(__name__)


def get_conversation_key(messages: List[dict], client_identifier: str = "") -> str:
    """

    生成对话指纹(使用前3条消息+客户端标识,确保唯一性)



    策略:

    1. 使用前3条消息生成指纹(而非仅第1条)

    2. 加入客户端标识(IP或request_id)避免不同用户冲突

    3. 保持Session复用能力(同一用户的后续消息仍能找到同一Session)



    Args:

        messages: 消息列表

        client_identifier: 客户端标识(如IP地址或request_id),用于区分不同用户

    """
    if not messages:
        return f"{client_identifier}:empty" if client_identifier else "empty"

    # 提取前3条消息的关键信息(角色+内容)
    message_fingerprints = []
    for msg in messages[:3]:  # 只取前3条
        role = msg.get("role", "")
        content = msg.get("content", "")

        # 统一处理内容格式(字符串或数组)
        if isinstance(content, list):
            # 多模态消息:只提取文本部分
            text = extract_text_from_content(content)
        else:
            text = str(content)

        # 标准化:去除首尾空白,转小写
        text = text.strip().lower()

        # 组合角色和内容
        message_fingerprints.append(f"{role}:{text}")

    # 使用前3条消息+客户端标识生成指纹
    conversation_prefix = "|".join(message_fingerprints)
    if client_identifier:
        conversation_prefix = f"{client_identifier}|{conversation_prefix}"

    return hashlib.md5(conversation_prefix.encode()).hexdigest()


def extract_text_from_content(content) -> str:
    """

    从消息 content 中提取文本内容

    统一处理字符串和多模态数组格式

    """
    if isinstance(content, str):
        return content
    elif isinstance(content, list):
        # 多模态消息:只提取文本部分
        return "".join([x.get("text", "") for x in content if x.get("type") == "text"])
    else:
        return str(content)


async def parse_last_message(messages: List['Message'], http_client: httpx.AsyncClient, request_id: str = ""):
    """解析最后一条消息,分离文本和文件(支持图片、PDF、文档等,base64 和 URL)"""
    if not messages:
        return "", []

    last_msg = messages[-1]
    content = last_msg.content

    text_content = ""
    images = [] # List of {"mime": str, "data": str_base64} - 兼容变量名,实际支持所有文件
    image_urls = []  # 需要下载的 URL - 兼容变量名,实际支持所有文件

    if isinstance(content, str):
        text_content = content
    elif isinstance(content, list):
        for part in content:
            if part.get("type") == "text":
                text_content += part.get("text", "")
            elif part.get("type") == "image_url":
                url = part.get("image_url", {}).get("url", "")
                # 解析 Data URI: data:mime/type;base64,xxxxxx (支持所有 MIME 类型)
                match = re.match(r"data:([^;]+);base64,(.+)", url)
                if match:
                    images.append({"mime": match.group(1), "data": match.group(2)})
                elif url.startswith(("http://", "https://")):
                    image_urls.append(url)
                else:
                    logger.warning(f"[FILE] [req_{request_id}] 不支持的文件格式: {url[:30]}...")

    # 并行下载所有 URL 文件(支持图片、PDF、文档等)
    if image_urls:
        async def download_url(url: str):
            try:
                resp = await http_client.get(url, timeout=30, follow_redirects=True)
                resp.raise_for_status()
                content_type = resp.headers.get("content-type", "application/octet-stream").split(";")[0]
                # 移除图片类型限制,支持所有文件类型
                b64 = base64.b64encode(resp.content).decode()
                logger.info(f"[FILE] [req_{request_id}] URL文件下载成功: {url[:50]}... ({len(resp.content)} bytes, {content_type})")
                return {"mime": content_type, "data": b64}
            except Exception as e:
                logger.warning(f"[FILE] [req_{request_id}] URL文件下载失败: {url[:50]}... - {e}")
                return None

        results = await asyncio.gather(*[download_url(u) for u in image_urls])
        images.extend([r for r in results if r])

    return text_content, images


def build_full_context_text(messages: List['Message']) -> str:
    """仅拼接历史文本,图片只处理当次请求的"""
    prompt = ""
    for msg in messages:
        role = "User" if msg.role in ["user", "system"] else "Assistant"
        content_str = extract_text_from_content(msg.content)

        # 为多模态消息添加图片标记
        if isinstance(msg.content, list):
            image_count = sum(1 for part in msg.content if part.get("type") == "image_url")
            if image_count > 0:
                content_str += "[图片]" * image_count

        prompt += f"{role}: {content_str}\n\n"
    return prompt