|
|
"""Flow Monitor - LLM 流量监控 |
|
|
|
|
|
记录完整的请求/响应数据,支持查询、过滤、导出。 |
|
|
""" |
|
|
import json |
|
|
import time |
|
|
import uuid |
|
|
from pathlib import Path |
|
|
from dataclasses import dataclass, field, asdict |
|
|
from typing import Optional, List, Dict, Any |
|
|
from datetime import datetime, timezone |
|
|
from collections import deque |
|
|
from enum import Enum |
|
|
|
|
|
|
|
|
class FlowState(str, Enum):
    """Lifecycle state of a flow.

    Inherits from ``str`` so values compare and JSON-serialize as plain
    strings (see ``LLMFlow.to_dict``, which uses ``state.value``).
    """

    PENDING = "pending"        # created, no response byte received yet
    STREAMING = "streaming"    # first byte arrived, chunks still coming
    COMPLETED = "completed"    # finished successfully
    ERROR = "error"            # failed; details live in LLMFlow.error
|
|
|
|
|
|
|
|
@dataclass
class Message:
    """A single chat message extracted from the request body."""

    role: str                           # e.g. "user" / "assistant" — taken from the request as-is
    content: Any                        # plain string or structured content blocks
    name: Optional[str] = None          # optional sender name, if present in the request
    tool_call_id: Optional[str] = None  # links a tool result message back to its call
|
|
|
|
|
|
|
|
@dataclass
class TokenUsage:
    """Token usage counters for one request/response pair."""

    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0

    @property
    def total_tokens(self) -> int:
        """Combined input + output tokens (cache counters are not included)."""
        return sum((self.input_tokens, self.output_tokens))
|
|
|
|
|
|
|
|
@dataclass
class FlowRequest:
    """Captured request data.

    The first four fields are the raw HTTP-level request; the remaining
    fields are convenience values parsed out of the JSON body (populated
    by ``FlowMonitor.create_flow``).
    """

    # Raw HTTP request.
    method: str
    path: str
    headers: Dict[str, str]   # stored with the Authorization header stripped
    body: Dict[str, Any]

    # Parsed from the body.
    model: str = ""
    messages: List[Message] = field(default_factory=list)
    system: str = ""
    tools: List[Dict] = field(default_factory=list)
    stream: bool = False
    max_tokens: int = 0
    temperature: float = 1.0
|
|
|
|
|
|
|
|
@dataclass
class FlowResponse:
    """Captured response data."""

    # Raw HTTP response.
    status_code: int
    headers: Dict[str, str] = field(default_factory=dict)
    body: Any = None

    # Parsed / accumulated content.
    content: str = ""                                   # full text (accumulated from chunks when streaming)
    tool_calls: List[Dict] = field(default_factory=list)
    stop_reason: str = ""
    usage: TokenUsage = field(default_factory=TokenUsage)

    # Streaming bookkeeping (filled by FlowMonitor.add_chunk).
    chunks: List[str] = field(default_factory=list)
    chunk_count: int = 0
|
|
|
|
|
|
|
|
@dataclass
class FlowError:
    """Error details for a failed flow."""

    type: str             # error classification supplied by the caller
    message: str          # human-readable description
    status_code: int = 0  # HTTP status, 0 when not applicable
    raw: str = ""         # raw error payload (truncated to 1000 chars by fail_flow)
|
|
|
|
|
|
|
|
@dataclass
class FlowTiming:
    """Wall-clock timestamps (epoch seconds) for a single flow."""

    created_at: float = 0
    first_byte_at: Optional[float] = None
    completed_at: Optional[float] = None

    @property
    def ttfb_ms(self) -> Optional[float]:
        """Time to first byte in milliseconds; None until both stamps are set."""
        # Note: a zero/unset created_at is treated as "not recorded".
        if not (self.first_byte_at and self.created_at):
            return None
        return 1000 * (self.first_byte_at - self.created_at)

    @property
    def duration_ms(self) -> Optional[float]:
        """Total duration in milliseconds; None until the flow completes."""
        if not (self.completed_at and self.created_at):
            return None
        return 1000 * (self.completed_at - self.created_at)
|
|
|
|
|
|
|
|
@dataclass
class LLMFlow:
    """A complete LLM request flow: request, response/error, timing, metadata."""

    id: str               # short hex id (see FlowMonitor.create_flow)
    state: FlowState

    # Routing metadata.
    protocol: str
    account_id: Optional[str] = None
    account_name: Optional[str] = None

    # Payloads; at most one of response/error is expected to be meaningful.
    request: Optional[FlowRequest] = None
    response: Optional[FlowResponse] = None
    error: Optional[FlowError] = None

    timing: FlowTiming = field(default_factory=FlowTiming)

    # User-facing annotations.
    tags: List[str] = field(default_factory=list)
    notes: str = ""
    bookmarked: bool = False

    # Retry linkage.
    retry_count: int = 0
    parent_flow_id: Optional[str] = None

    def to_dict(self) -> dict:
        """Summary dict: metadata plus sizes/flags, without request/response bodies.

        Use ``to_full_dict`` when the payloads themselves are needed.
        """
        d = {
            "id": self.id,
            "state": self.state.value,
            "protocol": self.protocol,
            "account_id": self.account_id,
            "account_name": self.account_name,
            "timing": {
                "created_at": self.timing.created_at,
                "first_byte_at": self.timing.first_byte_at,
                "completed_at": self.timing.completed_at,
                "ttfb_ms": self.timing.ttfb_ms,
                "duration_ms": self.timing.duration_ms,
            },
            "tags": self.tags,
            "notes": self.notes,
            "bookmarked": self.bookmarked,
            "retry_count": self.retry_count,
        }

        if self.request:
            # Only summary fields — counts and booleans, not the payloads.
            d["request"] = {
                "method": self.request.method,
                "path": self.request.path,
                "model": self.request.model,
                "stream": self.request.stream,
                "message_count": len(self.request.messages),
                "has_tools": bool(self.request.tools),
                "has_system": bool(self.request.system),
            }

        if self.response:
            d["response"] = {
                "status_code": self.response.status_code,
                "content_length": len(self.response.content),
                "has_tool_calls": bool(self.response.tool_calls),
                "stop_reason": self.response.stop_reason,
                "chunk_count": self.response.chunk_count,
                "usage": asdict(self.response.usage),
            }

        if self.error:
            d["error"] = asdict(self.error)

        return d

    def to_full_dict(self) -> dict:
        """Full dict: the summary from ``to_dict`` plus request/response bodies."""
        d = self.to_dict()

        if self.request:
            d["request"]["headers"] = self.request.headers
            d["request"]["body"] = self.request.body
            # Messages may be Message dataclasses or raw dicts; serialize both.
            d["request"]["messages"] = [asdict(m) if hasattr(m, '__dataclass_fields__') else m for m in self.request.messages]
            d["request"]["system"] = self.request.system
            d["request"]["tools"] = self.request.tools

        if self.response:
            d["response"]["headers"] = self.response.headers
            d["response"]["body"] = self.response.body
            d["response"]["content"] = self.response.content
            d["response"]["tool_calls"] = self.response.tool_calls
            # Only the last 10 raw chunks are kept, to bound export size.
            d["response"]["chunks"] = self.response.chunks[-10:]

        return d
|
|
|
|
|
|
|
|
class FlowStore:
    """Bounded in-memory flow storage with query, stats and export helpers."""

    def __init__(self, max_flows: int = 500, persist_dir: Optional[Path] = None):
        # deque(maxlen=...) drops the oldest flow automatically on append.
        self.flows: deque[LLMFlow] = deque(maxlen=max_flows)
        self.flow_map: Dict[str, LLMFlow] = {}  # id -> flow, O(1) lookup
        # NOTE(review): persist_dir is stored but never used in this class —
        # confirm whether persistence is implemented elsewhere or pending.
        self.persist_dir = persist_dir
        self.max_flows = max_flows

        # Lifetime counters; survive eviction from the deque.
        self.total_flows = 0
        self.total_tokens_in = 0
        self.total_tokens_out = 0

    def add(self, flow: LLMFlow):
        """Add a flow, mirroring the deque's eviction in flow_map."""
        if len(self.flows) >= self.max_flows:
            # The deque will silently drop flows[0] on the append below;
            # remove its map entry first so flow_map does not leak.
            old = self.flows[0]
            if old.id in self.flow_map:
                del self.flow_map[old.id]

        self.flows.append(flow)
        self.flow_map[flow.id] = flow
        self.total_flows += 1

    def get(self, flow_id: str) -> Optional[LLMFlow]:
        """Return the flow with this id, or None if unknown/evicted."""
        return self.flow_map.get(flow_id)

    def update(self, flow_id: str, **kwargs):
        """Set existing attributes on a flow; unknown ids/attrs are ignored."""
        flow = self.flow_map.get(flow_id)
        if flow:
            for k, v in kwargs.items():
                if hasattr(flow, k):
                    setattr(flow, k, v)

    def query(
        self,
        protocol: Optional[str] = None,
        model: Optional[str] = None,
        account_id: Optional[str] = None,
        state: Optional[FlowState] = None,
        has_error: Optional[bool] = None,
        bookmarked: Optional[bool] = None,
        min_duration_ms: Optional[float] = None,
        max_duration_ms: Optional[float] = None,
        start_time: Optional[float] = None,
        end_time: Optional[float] = None,
        search: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[LLMFlow]:
        """Query flows, newest first, applying all given filters (AND).

        Note: filters guarded by truthiness (protocol, model, duration,
        time bounds, search) are treated as "unset" when falsy; only
        ``has_error`` and ``bookmarked`` distinguish False from None.
        Flows without a recorded duration pass the duration filters.
        """
        results = []

        for flow in reversed(self.flows):  # newest first
            if protocol and flow.protocol != protocol:
                continue
            if model and flow.request and flow.request.model != model:
                continue
            if account_id and flow.account_id != account_id:
                continue
            if state and flow.state != state:
                continue
            if has_error is not None:
                if has_error and not flow.error:
                    continue
                if not has_error and flow.error:
                    continue
            if bookmarked is not None and flow.bookmarked != bookmarked:
                continue
            if min_duration_ms and flow.timing.duration_ms and flow.timing.duration_ms < min_duration_ms:
                continue
            if max_duration_ms and flow.timing.duration_ms and flow.timing.duration_ms > max_duration_ms:
                continue
            if start_time and flow.timing.created_at < start_time:
                continue
            if end_time and flow.timing.created_at > end_time:
                continue
            if search:
                # Case-insensitive substring match over request body and
                # response content.
                # NOTE(review): json.dumps may raise TypeError if the body
                # holds non-serializable values — confirm bodies are plain JSON.
                found = False
                if flow.request and search.lower() in json.dumps(flow.request.body).lower():
                    found = True
                if flow.response and search.lower() in flow.response.content.lower():
                    found = True
                if not found:
                    continue

            results.append(flow)

        return results[offset:offset + limit]

    def get_stats(self) -> dict:
        """Aggregate statistics over the flows currently retained."""
        completed = [f for f in self.flows if f.state == FlowState.COMPLETED]
        errors = [f for f in self.flows if f.state == FlowState.ERROR]

        # Per-model breakdown: request count, errors and token usage.
        model_stats = {}
        for f in self.flows:
            if f.request:
                model = f.request.model or "unknown"
                if model not in model_stats:
                    model_stats[model] = {"count": 0, "errors": 0, "tokens_in": 0, "tokens_out": 0}
                model_stats[model]["count"] += 1
                if f.error:
                    model_stats[model]["errors"] += 1
                if f.response and f.response.usage:
                    model_stats[model]["tokens_in"] += f.response.usage.input_tokens
                    model_stats[model]["tokens_out"] += f.response.usage.output_tokens

        # Average duration over completed flows that have a duration.
        durations = [f.timing.duration_ms for f in completed if f.timing.duration_ms]
        avg_duration = sum(durations) / len(durations) if durations else 0

        return {
            "total_flows": self.total_flows,        # lifetime count
            "active_flows": len(self.flows),        # currently retained
            "completed": len(completed),
            "errors": len(errors),
            # Error rate is over retained flows only, not lifetime.
            "error_rate": f"{len(errors) / max(1, len(self.flows)) * 100:.1f}%",
            "avg_duration_ms": round(avg_duration, 2),
            "total_tokens_in": self.total_tokens_in,
            "total_tokens_out": self.total_tokens_out,
            "by_model": model_stats,
        }

    def export_jsonl(self, flows: List[LLMFlow]) -> str:
        """Export flows as JSONL (one full-dict JSON object per line)."""
        lines = []
        for f in flows:
            lines.append(json.dumps(f.to_full_dict(), ensure_ascii=False))
        return "\n".join(lines)

    def export_markdown(self, flow: LLMFlow) -> str:
        """Render a single flow as a human-readable Markdown report."""
        lines = [
            f"# Flow {flow.id}",
            "",
            f"- **Protocol**: {flow.protocol}",
            f"- **State**: {flow.state.value}",
            f"- **Account**: {flow.account_name or flow.account_id or 'N/A'}",
            # NOTE(review): fromtimestamp uses the local timezone — confirm
            # whether UTC was intended here.
            f"- **Created**: {datetime.fromtimestamp(flow.timing.created_at).isoformat()}",
        ]

        if flow.timing.duration_ms:
            lines.append(f"- **Duration**: {flow.timing.duration_ms:.0f}ms")

        if flow.request:
            lines.extend([
                "",
                "## Request",
                "",
                f"- **Model**: {flow.request.model}",
                f"- **Stream**: {flow.request.stream}",
                f"- **Messages**: {len(flow.request.messages)}",
            ])

            if flow.request.system:
                lines.extend(["", "### System", "", f"```\n{flow.request.system}\n```"])

            lines.extend(["", "### Messages", ""])
            for msg in flow.request.messages:
                # Truncate long message content to keep the report readable.
                content = msg.content if isinstance(msg.content, str) else json.dumps(msg.content, ensure_ascii=False)
                lines.append(f"**{msg.role}**: {content[:500]}{'...' if len(content) > 500 else ''}")
                lines.append("")

        if flow.response:
            lines.extend([
                "## Response",
                "",
                f"- **Status**: {flow.response.status_code}",
                f"- **Stop Reason**: {flow.response.stop_reason}",
            ])

            if flow.response.usage:
                lines.append(f"- **Tokens**: {flow.response.usage.input_tokens} in / {flow.response.usage.output_tokens} out")

            if flow.response.content:
                # Content is capped at 2000 chars in the report.
                lines.extend(["", "### Content", "", f"```\n{flow.response.content[:2000]}\n```"])

        if flow.error:
            lines.extend([
                "",
                "## Error",
                "",
                f"- **Type**: {flow.error.type}",
                f"- **Message**: {flow.error.message}",
            ])

        return "\n".join(lines)
|
|
|
|
|
|
|
|
class FlowMonitor:
    """Flow monitor.

    High-level facade over ``FlowStore``: creates a flow from an incoming
    request, records streaming progress, and finalizes it as completed or
    failed. All lookups by id silently no-op when the flow is unknown.
    """

    def __init__(self, max_flows: int = 500):
        self.store = FlowStore(max_flows=max_flows)

    def create_flow(
        self,
        protocol: str,
        method: str,
        path: str,
        headers: Dict[str, str],
        body: Dict[str, Any],
        account_id: Optional[str] = None,
        account_name: Optional[str] = None,
    ) -> str:
        """Create a new flow from an incoming request; return its short id."""
        flow_id = uuid.uuid4().hex[:12]

        # Strip credentials before persisting headers.
        # NOTE(review): only "authorization" is redacted; other sensitive
        # headers (e.g. "x-api-key", "cookie") would still be stored — confirm.
        request = FlowRequest(
            method=method,
            path=path,
            headers={k: v for k, v in headers.items() if k.lower() not in ["authorization"]},
            body=body,
            model=body.get("model", ""),
            stream=body.get("stream", False),
            system=body.get("system", ""),
            tools=body.get("tools", []),
            max_tokens=body.get("max_tokens", 0),
            temperature=body.get("temperature", 1.0),
        )

        # Normalize raw message dicts into Message dataclasses.
        for msg in body.get("messages", []):
            request.messages.append(Message(
                role=msg.get("role", "user"),
                content=msg.get("content", ""),
                name=msg.get("name"),
                tool_call_id=msg.get("tool_call_id"),
            ))

        flow = LLMFlow(
            id=flow_id,
            state=FlowState.PENDING,
            protocol=protocol,
            account_id=account_id,
            account_name=account_name,
            request=request,
            timing=FlowTiming(created_at=time.time()),
        )

        self.store.add(flow)
        return flow_id

    def start_streaming(self, flow_id: str):
        """Mark streaming as started and record the first-byte timestamp."""
        flow = self.store.get(flow_id)
        if flow:
            flow.state = FlowState.STREAMING
            flow.timing.first_byte_at = time.time()
            if not flow.response:
                # Placeholder response; status is finalized in complete_flow.
                flow.response = FlowResponse(status_code=200)

    def add_chunk(self, flow_id: str, chunk: str):
        """Append a streamed chunk and accumulate it into the content."""
        flow = self.store.get(flow_id)
        if flow and flow.response:
            flow.response.chunks.append(chunk)
            flow.response.chunk_count += 1
            flow.response.content += chunk

    def complete_flow(
        self,
        flow_id: str,
        status_code: int,
        content: str = "",
        tool_calls: Optional[List[Dict]] = None,  # fixed: was annotated List[Dict] with None default
        stop_reason: str = "",
        usage: Optional[TokenUsage] = None,
        headers: Optional[Dict[str, str]] = None,  # fixed: was annotated Dict[str, str] with None default
    ):
        """Finalize a flow as COMPLETED and record response details.

        When ``content`` is empty, any content accumulated via streaming
        is kept. Token usage, if provided, is added to the store totals.
        """
        flow = self.store.get(flow_id)
        if not flow:
            return

        flow.state = FlowState.COMPLETED
        flow.timing.completed_at = time.time()

        if not flow.response:
            flow.response = FlowResponse(status_code=status_code)

        flow.response.status_code = status_code
        flow.response.content = content or flow.response.content
        flow.response.tool_calls = tool_calls or []
        flow.response.stop_reason = stop_reason
        flow.response.headers = headers or {}

        if usage:
            flow.response.usage = usage
            self.store.total_tokens_in += usage.input_tokens
            self.store.total_tokens_out += usage.output_tokens

    def fail_flow(self, flow_id: str, error_type: str, message: str, status_code: int = 0, raw: str = ""):
        """Finalize a flow as ERROR; the raw payload is capped at 1000 chars."""
        flow = self.store.get(flow_id)
        if not flow:
            return

        flow.state = FlowState.ERROR
        flow.timing.completed_at = time.time()
        flow.error = FlowError(
            type=error_type,
            message=message,
            status_code=status_code,
            raw=raw[:1000],
        )

    def bookmark_flow(self, flow_id: str, bookmarked: bool = True):
        """Set or clear the bookmark on a flow."""
        flow = self.store.get(flow_id)
        if flow:
            flow.bookmarked = bookmarked

    def add_note(self, flow_id: str, note: str):
        """Set the note on a flow (replaces any existing note)."""
        flow = self.store.get(flow_id)
        if flow:
            flow.notes = note

    def add_tag(self, flow_id: str, tag: str):
        """Add a tag to a flow; duplicates are ignored."""
        flow = self.store.get(flow_id)
        if flow and tag not in flow.tags:
            flow.tags.append(tag)

    def get_flow(self, flow_id: str) -> Optional[LLMFlow]:
        """Return the flow with this id, or None."""
        return self.store.get(flow_id)

    def query(self, **kwargs) -> List[LLMFlow]:
        """Query flows; see FlowStore.query for supported filters."""
        return self.store.query(**kwargs)

    def get_stats(self) -> dict:
        """Return aggregate statistics from the store."""
        return self.store.get_stats()

    def export(self, flow_ids: Optional[List[str]] = None, format: str = "jsonl") -> str:
        """Export flows as "jsonl", "markdown" (single flow only), or JSON.

        With ``flow_ids``, unknown ids are skipped; otherwise all retained
        flows are exported. Any unmatched format falls back to summary JSON.
        """
        if flow_ids:
            # Single lookup per id (previously get() was called twice per id).
            flows = [f for fid in flow_ids if (f := self.store.get(fid)) is not None]
        else:
            flows = list(self.store.flows)

        if format == "jsonl":
            return self.store.export_jsonl(flows)
        elif format == "markdown" and len(flows) == 1:
            return self.store.export_markdown(flows[0])
        else:
            return json.dumps([f.to_dict() for f in flows], ensure_ascii=False, indent=2)
|
|
|
|
|
|
|
|
|
|
|
# Module-level singleton shared by the rest of the application.
flow_monitor = FlowMonitor(max_flows=500)
|
|
|