# kiroproxy/kiro_proxy/core/flow_monitor.py
# Author: KiroProxy User
# Commit: 0edbd7b — chore: repo cleanup and maintenance
"""Flow Monitor - LLM 流量监控
记录完整的请求/响应数据,支持查询、过滤、导出。
"""
import json
import time
import uuid
from pathlib import Path
from dataclasses import dataclass, field, asdict
from typing import Optional, List, Dict, Any
from datetime import datetime, timezone
from collections import deque
from enum import Enum
class FlowState(str, Enum):
    """Lifecycle state of a monitored LLM flow."""
    PENDING = "pending"  # request recorded, awaiting response
    STREAMING = "streaming"  # streaming response in progress
    COMPLETED = "completed"  # finished successfully
    ERROR = "error"  # failed with an error
@dataclass
class Message:
    """A single chat message parsed from the request body."""
    role: str  # one of: user/assistant/system/tool
    content: Any  # str or a list of content blocks
    name: Optional[str] = None  # tool name
    tool_call_id: Optional[str] = None  # links a tool result back to its call
@dataclass
class TokenUsage:
    """Token usage counters for a single LLM call."""
    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0

    @property
    def total_tokens(self) -> int:
        """Combined input + output token count (cache counters excluded)."""
        return sum((self.input_tokens, self.output_tokens))
@dataclass
class FlowRequest:
    """Captured HTTP request plus fields parsed from its JSON body."""
    method: str
    path: str
    headers: Dict[str, str]
    body: Dict[str, Any]
    # Fields parsed out of `body` for quick access:
    model: str = ""
    messages: List[Message] = field(default_factory=list)
    system: str = ""
    tools: List[Dict] = field(default_factory=list)
    stream: bool = False
    max_tokens: int = 0
    temperature: float = 1.0
@dataclass
class FlowResponse:
    """Captured HTTP response plus fields parsed from its payload."""
    status_code: int
    headers: Dict[str, str] = field(default_factory=dict)
    body: Any = None
    # Fields parsed from the response payload:
    content: str = ""  # accumulated assistant text
    tool_calls: List[Dict] = field(default_factory=list)
    stop_reason: str = ""
    usage: TokenUsage = field(default_factory=TokenUsage)
    # Streaming bookkeeping:
    chunks: List[str] = field(default_factory=list)  # raw chunks, in arrival order
    chunk_count: int = 0
@dataclass
class FlowError:
    """Error details attached to a failed flow."""
    type: str  # e.g. rate_limit_error, api_error
    message: str
    status_code: int = 0
    raw: str = ""  # raw error payload (truncated by the caller)
@dataclass
class FlowTiming:
    """Wall-clock timestamps for a flow, with derived latency metrics."""
    created_at: float = 0
    first_byte_at: Optional[float] = None
    completed_at: Optional[float] = None

    def _elapsed_ms(self, end: Optional[float]) -> Optional[float]:
        # Shared delta computation; None when either endpoint is unset/zero.
        if end and self.created_at:
            return (end - self.created_at) * 1000
        return None

    @property
    def ttfb_ms(self) -> Optional[float]:
        """Time to first byte, in milliseconds (None if not yet observed)."""
        return self._elapsed_ms(self.first_byte_at)

    @property
    def duration_ms(self) -> Optional[float]:
        """Total duration, in milliseconds (None if not yet completed)."""
        return self._elapsed_ms(self.completed_at)
@dataclass
class LLMFlow:
    """A complete monitored LLM request/response exchange."""
    id: str
    state: FlowState
    # Routing info
    protocol: str  # anthropic, openai, gemini
    account_id: Optional[str] = None
    account_name: Optional[str] = None
    # Request / response
    request: Optional[FlowRequest] = None
    response: Optional[FlowResponse] = None
    error: Optional[FlowError] = None
    # Timing
    timing: FlowTiming = field(default_factory=FlowTiming)
    # Metadata
    tags: List[str] = field(default_factory=list)
    notes: str = ""
    bookmarked: bool = False
    # Retry info
    retry_count: int = 0
    parent_flow_id: Optional[str] = None

    def to_dict(self) -> dict:
        """Return a summary dict (metadata only, no request/response bodies)."""
        summary = {
            "id": self.id,
            "state": self.state.value,
            "protocol": self.protocol,
            "account_id": self.account_id,
            "account_name": self.account_name,
            "timing": {
                "created_at": self.timing.created_at,
                "first_byte_at": self.timing.first_byte_at,
                "completed_at": self.timing.completed_at,
                "ttfb_ms": self.timing.ttfb_ms,
                "duration_ms": self.timing.duration_ms,
            },
            "tags": self.tags,
            "notes": self.notes,
            "bookmarked": self.bookmarked,
            "retry_count": self.retry_count,
        }
        req = self.request
        if req:
            summary["request"] = {
                "method": req.method,
                "path": req.path,
                "model": req.model,
                "stream": req.stream,
                "message_count": len(req.messages),
                "has_tools": bool(req.tools),
                "has_system": bool(req.system),
            }
        resp = self.response
        if resp:
            summary["response"] = {
                "status_code": resp.status_code,
                "content_length": len(resp.content),
                "has_tool_calls": bool(resp.tool_calls),
                "stop_reason": resp.stop_reason,
                "chunk_count": resp.chunk_count,
                "usage": asdict(resp.usage),
            }
        if self.error:
            summary["error"] = asdict(self.error)
        return summary

    def to_full_dict(self) -> dict:
        """Return the summary dict augmented with full request/response payloads."""
        full = self.to_dict()
        req = self.request
        if req:
            full["request"].update(
                headers=req.headers,
                body=req.body,
                messages=[asdict(m) if hasattr(m, '__dataclass_fields__') else m for m in req.messages],
                system=req.system,
                tools=req.tools,
            )
        resp = self.response
        if resp:
            full["response"].update(
                headers=resp.headers,
                body=resp.body,
                content=resp.content,
                tool_calls=resp.tool_calls,
                chunks=resp.chunks[-10:],  # keep only the last 10 chunks
            )
        return full
class FlowStore:
    """In-memory store of LLMFlow records.

    Keeps at most `max_flows` entries in a ring buffer (oldest evicted
    first) plus an id -> flow map for O(1) lookup, and maintains lifetime
    token counters and aggregate statistics.
    """

    def __init__(self, max_flows: int = 500, persist_dir: Optional[Path] = None):
        self.flows: deque[LLMFlow] = deque(maxlen=max_flows)
        self.flow_map: Dict[str, LLMFlow] = {}
        # NOTE(review): persist_dir is stored but never used in this module —
        # persistence appears unimplemented; confirm before relying on it.
        self.persist_dir = persist_dir
        self.max_flows = max_flows
        # Lifetime counters (never decremented when old flows are evicted).
        self.total_flows = 0
        self.total_tokens_in = 0
        self.total_tokens_out = 0

    def add(self, flow: LLMFlow):
        """Add a flow, evicting the oldest entry when the buffer is full."""
        if len(self.flows) >= self.max_flows:
            # The deque (maxlen) drops its leftmost item automatically on
            # append; mirror that eviction in the id -> flow map so evicted
            # entries do not leak.
            oldest = self.flows[0]
            self.flow_map.pop(oldest.id, None)
        self.flows.append(flow)
        self.flow_map[flow.id] = flow
        self.total_flows += 1

    def get(self, flow_id: str) -> Optional[LLMFlow]:
        """Return the flow with the given id, or None if unknown/evicted."""
        return self.flow_map.get(flow_id)

    def update(self, flow_id: str, **kwargs):
        """Set existing attributes on a flow; unknown ids/attributes are ignored."""
        flow = self.flow_map.get(flow_id)
        if flow:
            for key, value in kwargs.items():
                if hasattr(flow, key):
                    setattr(flow, key, value)

    def query(
        self,
        protocol: Optional[str] = None,
        model: Optional[str] = None,
        account_id: Optional[str] = None,
        state: Optional[FlowState] = None,
        has_error: Optional[bool] = None,
        bookmarked: Optional[bool] = None,
        min_duration_ms: Optional[float] = None,
        max_duration_ms: Optional[float] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        search: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[LLMFlow]:
        """Return flows (newest first) matching all given filters.

        Flows without a recorded duration pass the duration filters;
        `search` does a case-insensitive substring match against the
        request body JSON and the response content.
        """
        # Hoist the loop-invariant lowercasing out of the per-flow work.
        search_lower = search.lower() if search else None
        results = []
        for flow in reversed(self.flows):
            if protocol and flow.protocol != protocol:
                continue
            if model and flow.request and flow.request.model != model:
                continue
            if account_id and flow.account_id != account_id:
                continue
            if state and flow.state != state:
                continue
            if has_error is not None:
                if has_error and not flow.error:
                    continue
                if not has_error and flow.error:
                    continue
            if bookmarked is not None and flow.bookmarked != bookmarked:
                continue
            if min_duration_ms and flow.timing.duration_ms and flow.timing.duration_ms < min_duration_ms:
                continue
            if max_duration_ms and flow.timing.duration_ms and flow.timing.duration_ms > max_duration_ms:
                continue
            if start_time and flow.timing.created_at < start_time:
                continue
            if end_time and flow.timing.created_at > end_time:
                continue
            if search_lower:
                found = (
                    flow.request is not None
                    and search_lower in json.dumps(flow.request.body).lower()
                ) or (
                    flow.response is not None
                    and search_lower in flow.response.content.lower()
                )
                if not found:
                    continue
            results.append(flow)
        return results[offset:offset + limit]

    def get_stats(self) -> dict:
        """Return aggregate statistics over the flows currently buffered."""
        completed = [f for f in self.flows if f.state == FlowState.COMPLETED]
        errors = [f for f in self.flows if f.state == FlowState.ERROR]
        # Per-model breakdown.
        model_stats: Dict[str, Dict[str, int]] = {}
        for f in self.flows:
            if not f.request:
                continue
            model = f.request.model or "unknown"
            stats = model_stats.setdefault(
                model, {"count": 0, "errors": 0, "tokens_in": 0, "tokens_out": 0}
            )
            stats["count"] += 1
            if f.error:
                stats["errors"] += 1
            if f.response and f.response.usage:
                stats["tokens_in"] += f.response.usage.input_tokens
                stats["tokens_out"] += f.response.usage.output_tokens
        # Average latency over completed flows that recorded a duration.
        durations = [f.timing.duration_ms for f in completed if f.timing.duration_ms]
        avg_duration = sum(durations) / len(durations) if durations else 0
        return {
            "total_flows": self.total_flows,
            "active_flows": len(self.flows),
            "completed": len(completed),
            "errors": len(errors),
            "error_rate": f"{len(errors) / max(1, len(self.flows)) * 100:.1f}%",
            "avg_duration_ms": round(avg_duration, 2),
            "total_tokens_in": self.total_tokens_in,
            "total_tokens_out": self.total_tokens_out,
            "by_model": model_stats,
        }

    def export_jsonl(self, flows: List[LLMFlow]) -> str:
        """Export flows (full payloads) as newline-delimited JSON."""
        return "\n".join(
            json.dumps(f.to_full_dict(), ensure_ascii=False) for f in flows
        )

    def export_markdown(self, flow: LLMFlow) -> str:
        """Render a single flow as a human-readable Markdown report."""
        lines = [
            f"# Flow {flow.id}",
            "",
            f"- **Protocol**: {flow.protocol}",
            f"- **State**: {flow.state.value}",
            f"- **Account**: {flow.account_name or flow.account_id or 'N/A'}",
            f"- **Created**: {datetime.fromtimestamp(flow.timing.created_at).isoformat()}",
        ]
        if flow.timing.duration_ms:
            lines.append(f"- **Duration**: {flow.timing.duration_ms:.0f}ms")
        if flow.request:
            lines.extend([
                "",
                "## Request",
                "",
                f"- **Model**: {flow.request.model}",
                f"- **Stream**: {flow.request.stream}",
                f"- **Messages**: {len(flow.request.messages)}",
            ])
            if flow.request.system:
                lines.extend(["", "### System", "", f"```\n{flow.request.system}\n```"])
            lines.extend(["", "### Messages", ""])
            for msg in flow.request.messages:
                content = msg.content if isinstance(msg.content, str) else json.dumps(msg.content, ensure_ascii=False)
                # Truncate long messages to keep the report readable.
                lines.append(f"**{msg.role}**: {content[:500]}{'...' if len(content) > 500 else ''}")
                lines.append("")
        if flow.response:
            lines.extend([
                "## Response",
                "",
                f"- **Status**: {flow.response.status_code}",
                f"- **Stop Reason**: {flow.response.stop_reason}",
            ])
            if flow.response.usage:
                lines.append(f"- **Tokens**: {flow.response.usage.input_tokens} in / {flow.response.usage.output_tokens} out")
            if flow.response.content:
                lines.extend(["", "### Content", "", f"```\n{flow.response.content[:2000]}\n```"])
        if flow.error:
            lines.extend([
                "",
                "## Error",
                "",
                f"- **Type**: {flow.error.type}",
                f"- **Message**: {flow.error.message}",
            ])
        return "\n".join(lines)
class FlowMonitor:
    """High-level facade for recording and inspecting LLM flows."""

    def __init__(self, max_flows: int = 500):
        self.store = FlowStore(max_flows=max_flows)

    def create_flow(
        self,
        protocol: str,
        method: str,
        path: str,
        headers: Dict[str, str],
        body: Dict[str, Any],
        account_id: Optional[str] = None,
        account_name: Optional[str] = None,
    ) -> str:
        """Record an incoming request and return the new flow's id.

        Parses model/stream/system/tools/messages out of `body`, strips the
        Authorization header, and stores the flow in PENDING state.
        """
        flow_id = uuid.uuid4().hex[:12]
        # Redact credentials; everything else is stored verbatim.
        # NOTE(review): only "authorization" is stripped — other sensitive
        # headers (e.g. x-api-key, cookie) pass through; confirm intent.
        safe_headers = {k: v for k, v in headers.items() if k.lower() not in ["authorization"]}
        request = FlowRequest(
            method=method,
            path=path,
            headers=safe_headers,
            body=body,
            model=body.get("model", ""),
            stream=body.get("stream", False),
            system=body.get("system", ""),
            tools=body.get("tools", []),
            max_tokens=body.get("max_tokens", 0),
            temperature=body.get("temperature", 1.0),
        )
        for msg in body.get("messages", []):
            request.messages.append(Message(
                role=msg.get("role", "user"),
                content=msg.get("content", ""),
                name=msg.get("name"),
                tool_call_id=msg.get("tool_call_id"),
            ))
        flow = LLMFlow(
            id=flow_id,
            state=FlowState.PENDING,
            protocol=protocol,
            account_id=account_id,
            account_name=account_name,
            request=request,
            timing=FlowTiming(created_at=time.time()),
        )
        self.store.add(flow)
        return flow_id

    def start_streaming(self, flow_id: str):
        """Mark the flow as streaming and record time-to-first-byte."""
        flow = self.store.get(flow_id)
        if flow:
            flow.state = FlowState.STREAMING
            flow.timing.first_byte_at = time.time()
            if not flow.response:
                flow.response = FlowResponse(status_code=200)

    def add_chunk(self, flow_id: str, chunk: str):
        """Append a streamed chunk to the flow's response content."""
        flow = self.store.get(flow_id)
        if flow and flow.response:
            flow.response.chunks.append(chunk)
            flow.response.chunk_count += 1
            flow.response.content += chunk

    def complete_flow(
        self,
        flow_id: str,
        status_code: int,
        content: str = "",
        tool_calls: Optional[List[Dict]] = None,
        stop_reason: str = "",
        usage: Optional[TokenUsage] = None,
        headers: Optional[Dict[str, str]] = None,
    ):
        """Mark the flow completed and fill in the final response fields.

        An explicit `content` overrides streamed content; token usage, when
        provided, is also folded into the store's lifetime counters.
        """
        flow = self.store.get(flow_id)
        if not flow:
            return
        flow.state = FlowState.COMPLETED
        flow.timing.completed_at = time.time()
        if not flow.response:
            flow.response = FlowResponse(status_code=status_code)
        flow.response.status_code = status_code
        # Keep accumulated streamed content when no final content is given.
        flow.response.content = content or flow.response.content
        flow.response.tool_calls = tool_calls or []
        flow.response.stop_reason = stop_reason
        flow.response.headers = headers or {}
        if usage:
            flow.response.usage = usage
            self.store.total_tokens_in += usage.input_tokens
            self.store.total_tokens_out += usage.output_tokens

    def fail_flow(self, flow_id: str, error_type: str, message: str, status_code: int = 0, raw: str = ""):
        """Mark the flow as failed and attach error details."""
        flow = self.store.get(flow_id)
        if not flow:
            return
        flow.state = FlowState.ERROR
        flow.timing.completed_at = time.time()
        flow.error = FlowError(
            type=error_type,
            message=message,
            status_code=status_code,
            raw=raw[:1000],  # cap stored payload size
        )

    def bookmark_flow(self, flow_id: str, bookmarked: bool = True):
        """Set or clear the bookmark on a flow."""
        flow = self.store.get(flow_id)
        if flow:
            flow.bookmarked = bookmarked

    def add_note(self, flow_id: str, note: str):
        """Replace the flow's note text."""
        flow = self.store.get(flow_id)
        if flow:
            flow.notes = note

    def add_tag(self, flow_id: str, tag: str):
        """Add a tag to the flow (duplicates ignored)."""
        flow = self.store.get(flow_id)
        if flow and tag not in flow.tags:
            flow.tags.append(tag)

    def get_flow(self, flow_id: str) -> Optional[LLMFlow]:
        """Return the flow with the given id, if it is still stored."""
        return self.store.get(flow_id)

    def query(self, **kwargs) -> List[LLMFlow]:
        """Proxy to FlowStore.query."""
        return self.store.query(**kwargs)

    def get_stats(self) -> dict:
        """Proxy to FlowStore.get_stats."""
        return self.store.get_stats()

    def export(self, flow_ids: Optional[List[str]] = None, format: str = "jsonl") -> str:
        """Export selected flows (all buffered flows when `flow_ids` is None).

        `format` may be "jsonl", "markdown" (single flow only), or anything
        else for an indented JSON summary list.
        """
        if flow_ids:
            # One lookup per id (the original called store.get twice per id).
            flows = [f for f in map(self.store.get, flow_ids) if f]
        else:
            flows = list(self.store.flows)
        if format == "jsonl":
            return self.store.export_jsonl(flows)
        elif format == "markdown" and len(flows) == 1:
            return self.store.export_markdown(flows[0])
        else:
            return json.dumps([f.to_dict() for f in flows], ensure_ascii=False, indent=2)
# Module-level singleton shared by the rest of the application.
flow_monitor = FlowMonitor(max_flows=500)