File size: 7,407 Bytes
ac029f2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# --- browser_utils/script_manager.py ---
# 油猴脚本管理模块 - 动态挂载和注入脚本功能

import os
import json
import logging
from typing import Dict, List, Optional, Any
from playwright.async_api import Page as AsyncPage

logger = logging.getLogger("AIStudioProxyServer")

class ScriptManager:
    """油猴脚本管理器 - 负责动态加载和注入脚本"""
    
    def __init__(self, script_dir: str = "browser_utils"):
        self.script_dir = script_dir
        self.loaded_scripts: Dict[str, str] = {}
        self.model_configs: Dict[str, List[Dict[str, Any]]] = {}
        
    def load_script(self, script_name: str) -> Optional[str]:
        """加载指定的JavaScript脚本文件"""
        script_path = os.path.join(self.script_dir, script_name)
        
        if not os.path.exists(script_path):
            logger.error(f"脚本文件不存在: {script_path}")
            return None
            
        try:
            with open(script_path, 'r', encoding='utf-8') as f:
                script_content = f.read()
                self.loaded_scripts[script_name] = script_content
                logger.info(f"成功加载脚本: {script_name}")
                return script_content
        except Exception as e:
            logger.error(f"加载脚本失败 {script_name}: {e}")
            return None
    
    def load_model_config(self, config_path: str) -> Optional[List[Dict[str, Any]]]:
        """加载模型配置文件"""
        if not os.path.exists(config_path):
            logger.warning(f"模型配置文件不存在: {config_path}")
            return None
            
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config_data = json.load(f)
                models = config_data.get('models', [])
                self.model_configs[config_path] = models
                logger.info(f"成功加载模型配置: {len(models)} 个模型")
                return models
        except Exception as e:
            logger.error(f"加载模型配置失败 {config_path}: {e}")
            return None
    
    def generate_dynamic_script(self, base_script: str, models: List[Dict[str, Any]], 

                              script_version: str = "dynamic") -> str:
        """基于模型配置动态生成脚本内容"""
        try:
            # 构建模型列表的JavaScript代码
            models_js = "const MODELS_TO_INJECT = [\n"
            for model in models:
                name = model.get('name', '')
                display_name = model.get('displayName', model.get('display_name', ''))
                description = model.get('description', f'Model injected by script {script_version}')
                
                # 如果displayName中没有包含版本信息,添加版本信息
                if f"(Script {script_version})" not in display_name:
                    display_name = f"{display_name} (Script {script_version})"
                
                models_js += f"""       {{

          name: '{name}',

          displayName: `{display_name}`,

          description: `{description}`

       }},\n"""
            
            models_js += "    ];"
            
            # 替换脚本中的模型定义部分
            # 查找模型定义的开始和结束标记
            start_marker = "const MODELS_TO_INJECT = ["
            end_marker = "];"
            
            start_idx = base_script.find(start_marker)
            if start_idx == -1:
                logger.error("未找到模型定义开始标记")
                return base_script
                
            # 找到对应的结束标记
            bracket_count = 0
            end_idx = start_idx + len(start_marker)
            found_end = False
            
            for i in range(end_idx, len(base_script)):
                if base_script[i] == '[':
                    bracket_count += 1
                elif base_script[i] == ']':
                    if bracket_count == 0:
                        end_idx = i + 1
                        found_end = True
                        break
                    bracket_count -= 1
            
            if not found_end:
                logger.error("未找到模型定义结束标记")
                return base_script
            
            # 替换模型定义部分
            new_script = (base_script[:start_idx] + 
                         models_js + 
                         base_script[end_idx:])
            
            # 更新版本号
            new_script = new_script.replace(
                f'const SCRIPT_VERSION = "v1.6";',
                f'const SCRIPT_VERSION = "{script_version}";'
            )
            
            logger.info(f"成功生成动态脚本,包含 {len(models)} 个模型")
            return new_script
            
        except Exception as e:
            logger.error(f"生成动态脚本失败: {e}")
            return base_script
    
    async def inject_script_to_page(self, page: AsyncPage, script_content: str, 

                                  script_name: str = "injected_script") -> bool:
        """将脚本注入到页面中"""
        try:
            # 移除UserScript头部信息,因为我们是直接注入而不是通过油猴
            cleaned_script = self._clean_userscript_headers(script_content)
            
            # 注入脚本
            await page.add_init_script(cleaned_script)
            logger.info(f"成功注入脚本到页面: {script_name}")
            return True
            
        except Exception as e:
            logger.error(f"注入脚本到页面失败 {script_name}: {e}")
            return False
    
    def _clean_userscript_headers(self, script_content: str) -> str:
        """清理UserScript头部信息"""
        lines = script_content.split('\n')
        cleaned_lines = []
        in_userscript_block = False
        
        for line in lines:
            if line.strip().startswith('// ==UserScript=='):
                in_userscript_block = True
                continue
            elif line.strip().startswith('// ==/UserScript=='):
                in_userscript_block = False
                continue
            elif in_userscript_block:
                continue
            else:
                cleaned_lines.append(line)
        
        return '\n'.join(cleaned_lines)
    
    async def setup_model_injection(self, page: AsyncPage,

                                  script_name: str = "more_modles.js") -> bool:
        """设置模型注入 - 直接注入油猴脚本"""

        # 检查脚本文件是否存在
        script_path = os.path.join(self.script_dir, script_name)
        if not os.path.exists(script_path):
            # 脚本文件不存在,静默跳过注入
            return False

        logger.info("开始设置模型注入...")

        # 加载油猴脚本
        script_content = self.load_script(script_name)
        if not script_content:
            return False

        # 直接注入原始脚本(不修改内容)
        return await self.inject_script_to_page(page, script_content, script_name)


# 全局脚本管理器实例
script_manager = ScriptManager()