feat: add WeChat QR code login and AGP WebSocket channel plugin

- Auth module: WeChat OAuth2 scan-to-login flow with terminal QR code
- Token persistence to ~/.openclaw/wechat-access-auth.json (chmod 600)
- Token resolution: config > saved state > interactive login
- Invite code verification (configurable bypass)
- Production/test environment support
- AGP WebSocket client with heartbeat, reconnect, wake detection
- Message handler: Agent dispatch with streaming text and tool calls
- Random device GUID generation (persisted, no real machine ID)
This commit is contained in:
HenryXiaoYang
2026-03-10 02:29:06 +08:00
commit ba754ccc31
33 changed files with 14992 additions and 0 deletions

40
websocket/index.ts Normal file
View File

@@ -0,0 +1,40 @@
// ============================================
// WebSocket 模块导出
// ============================================
// 类型定义
export type {
AGPEnvelope,
AGPMethod,
ContentBlock,
ToolCall,
ToolCallKind,
ToolCallStatus,
ToolLocation,
PromptPayload,
CancelPayload,
UpdatePayload,
UpdateType,
PromptResponsePayload,
StopReason,
PromptMessage,
CancelMessage,
UpdateMessage,
PromptResponseMessage,
WebSocketClientConfig,
ConnectionState,
WebSocketClientCallbacks,
} from "./types.js";
// WebSocket 客户端
export { WechatAccessWebSocketClient } from "./websocket-client.js";
// 消息处理器
export { handlePrompt, handleCancel } from "./message-handler.js";
// 消息适配器
export {
extractTextFromContent,
promptPayloadToFuwuhaoMessage,
buildWebSocketMessageContext,
} from "./message-adapter.js";

View File

@@ -0,0 +1,116 @@
/**
* @file message-adapter.ts
* @description AGP 协议消息与 OpenClaw 内部格式之间的适配器
*
* 设计思路:
* WebSocket 通道AGP 协议)和 HTTP 通道(微信服务号 Webhook使用不同的消息格式
* 但底层的 Agent 路由、会话管理、消息处理逻辑是完全相同的。
* 此适配器将 AGP 消息转换为 OpenClaw 内部的 FuwuhaoMessage 格式,
* 从而复用 HTTP 通道已有的 buildMessageContext 逻辑,避免重复实现。
*
* 转换链路:
* AGP PromptPayload → FuwuhaoMessage → MsgContextOpenClaw 内部格式)
*/
import type { PromptPayload, ContentBlock } from "./types.js";
import type { FuwuhaoMessage } from "../http/types.js";
import { getWecomRuntime } from "../common/runtime.js";
import { buildMessageContext } from "../common/message-context.js";
// ============================================
// 消息适配器
// ============================================
// 负责 AGP 协议消息与 OpenClaw 内部格式之间的转换
/**
* 从 ContentBlock 数组中提取纯文本
* @param content - AGP 协议的内容块数组(每个块有 type 和 text 字段)
* @returns 合并后的纯文本字符串(多个文本块用换行符连接)
* @description
* AGP 协议的消息内容是结构化的 ContentBlock 数组,支持多种类型(目前只有 text
* 此函数将所有 text 类型的块提取出来,合并为一个纯文本字符串。
*
* 处理步骤:
* 1. filter: 过滤出 type === "text" 的块(忽略未来可能新增的其他类型)
* 2. map: 提取每个块的 text 字段
* 3. join: 用换行符连接多个文本块
*
* 示例:
* ```
* extractTextFromContent([
* { type: "text", text: "你好" },
* { type: "text", text: "请帮我写代码" }
* ])
* // 返回:"你好\n请帮我写代码"
* ```
*/
export const extractTextFromContent = (content: ContentBlock[]): string => {
return content
.filter((block) => block.type === "text")
.map((block) => block.text)
.join("\n");
};
/**
* 将 AGP session.prompt 载荷转换为 FuwuhaoMessage 格式
* @param payload - AGP 协议的 prompt 载荷(包含 session_id、prompt_id、content 等)
* @param userId - 用户 ID来自 AGP 信封的 user_id 字段)
* @returns OpenClaw 内部的 FuwuhaoMessage 格式
* @description
* FuwuhaoMessage 是 OpenClaw 为微信服务号定义的内部消息格式,
* 与微信服务号 Webhook 推送的消息格式保持一致。
* 通过将 AGP 消息转换为此格式,可以复用 HTTP 通道的所有处理逻辑。
*
* 字段映射:
* - msgtype: 固定为 "text"(当前只支持文本消息)
* - MsgId: 使用 prompt_id 作为消息 ID保证唯一性
* - Content: 从 ContentBlock 数组提取的纯文本
* - FromUserName: 发送者 ID来自 AGP 信封的 user_id
* - ToUserName: 固定为 "fuwuhao_bot"(接收方标识)
* - CreateTime: 当前时间戳秒级Math.floor(Date.now() / 1000)
*
* `Date.now()` 返回毫秒级时间戳,除以 1000 并取整得到秒级时间戳,
* 与微信服务号 Webhook 的 CreateTime 字段格式一致。
*/
export const promptPayloadToFuwuhaoMessage = (
payload: PromptPayload,
userId: string
): FuwuhaoMessage => {
const textContent = extractTextFromContent(payload.content);
return {
msgtype: "text",
MsgId: payload.prompt_id, // 使用 prompt_id 作为消息唯一 ID
Content: textContent,
FromUserName: userId,
ToUserName: "fuwuhao_bot",
CreateTime: Math.floor(Date.now() / 1000), // 秒级时间戳
};
};
/**
* 构建 WebSocket 消息的完整上下文
* @param payload - AGP 协议的 prompt 载荷
* @param userId - 用户 ID
* @returns 消息上下文对象,包含:
* - ctx: MsgContext — OpenClaw 内部消息上下文(含路由、会话等信息)
* - route: 路由信息agentId、accountId、sessionKey 等)
* - storePath: 会话存储文件路径
* @description
* 这是适配器的核心函数,完成两步转换:
* 1. AGP PromptPayload → FuwuhaoMessage通过 promptPayloadToFuwuhaoMessage
* 2. FuwuhaoMessage → MsgContext通过 buildMessageContext复用 HTTP 通道逻辑)
*
* buildMessageContext 内部会:
* - 根据消息的 FromUserName 和 ToUserName 确定路由(选择哪个 Agent
* - 计算 sessionKey用于关联历史对话
* - 确定 storePath会话历史存储位置
* - 构建完整的 MsgContext包含所有 Agent 处理所需的上下文信息)
*
* 通过这种适配方式WebSocket 通道和 HTTP 通道共享同一套路由和会话管理逻辑,
* 确保两个通道的行为完全一致。
*/
export const buildWebSocketMessageContext = (payload: PromptPayload, userId: string) => {
const message = promptPayloadToFuwuhaoMessage(payload, userId);
return buildMessageContext(message);
};

View File

@@ -0,0 +1,612 @@
/**
* @file message-handler.ts
* @description WebSocket 消息处理器
*
* 负责处理从 AGP 服务端收到的下行消息,核心流程:
* 1. 收到 session.prompt → 调用 OpenClaw Agent 处理用户指令
* 2. 通过 runtime.events.onAgentEvent 监听 Agent 的流式输出
* 3. 将流式输出实时通过 WebSocket 推送给服务端session.update
* 4. Agent 处理完成后发送最终结果session.promptResponse
* 5. 收到 session.cancel → 中断正在处理的 Turn
*/
import type {
PromptMessage,
CancelMessage,
ContentBlock,
ToolCall,
} from "./types.js";
import { onAgentEvent, type AgentEventPayload } from "../common/agent-events.js";
import type { WechatAccessWebSocketClient } from "./websocket-client.js";
/** 内容安全审核拦截标记,由 content-security 插件的 fetch 拦截器嵌入伪 SSE 响应中 */
const SECURITY_BLOCK_MARKER = "<!--CONTENT_SECURITY_BLOCK-->";
/** 安全拦截后返回给微信用户的通用提示文本(不暴露具体拦截原因) */
const SECURITY_BLOCK_USER_MESSAGE = "抱歉,我无法处理该任务,让我们换个任务试试看?";
/**
* `getWecomRuntime` 返回 OpenClaw 框架注入的运行时实例PluginRuntime
* 运行时提供了访问框架核心功能的统一入口,包括:
* - runtime.config.loadConfig():读取 openclaw 配置文件(~/.openclaw/config.json
* - runtime.events.onAgentEvent():订阅 Agent 运行时事件(流式输出、工具调用等)
* - runtime.channel.session会话元数据管理记录用户会话信息
* - runtime.channel.activity渠道活动统计记录收发消息次数
* - runtime.channel.reply消息回复调度调用 Agent 并分发回复)
*/
import { getWecomRuntime } from "../common/runtime.js";
import {
extractTextFromContent,
buildWebSocketMessageContext,
} from "./message-adapter.js";
// ============================================
// WebSocket 消息处理器
// ============================================
// 接收 AGP 下行消息 → 调用 OpenClaw Agent → 发送 AGP 上行消息
/**
* 活跃的 Prompt Turn 追踪器
* @description
* 每个正在处理中的用户请求Turn都会在 activeTurns Map 中注册一条记录。
* 用于支持取消操作:收到 session.cancel 时,通过 promptId 找到对应的 Turn
* 将其标记为已取消,并取消 Agent 事件订阅。
*/
interface ActiveTurn {
sessionId: string;
promptId: string;
/** 是否已被取消标志位Agent 事件回调中检查此值决定是否继续处理) */
cancelled: boolean;
/**
* Agent 事件取消订阅函数。
* `runtime.events.onAgentEvent()` 返回一个函数,调用该函数可以取消订阅,
* 停止接收后续的 Agent 事件(类似 EventEmitter 的 removeListener
*/
unsubscribe?: () => void;
}
/**
* 当前活跃的 Turn 映射promptId → ActiveTurn
* @description
* 使用 Map 而非对象,因为 Map 的 key 可以是任意类型,且有更好的增删性能。
* promptId 是服务端分配的唯一 Turn ID用于关联 prompt 和 cancel 消息。
*/
const activeTurns = new Map<string, ActiveTurn>();
/**
* 处理 session.prompt 消息 — 接收用户指令并调用 Agent
* @param message - AGP session.prompt 消息(包含用户指令内容)
* @param client - WebSocket 客户端实例(用于发送上行消息回服务端)
* @description
* 完整处理流程:
*
* ```
* 服务端 → session.prompt
* ↓
* 1. 注册 ActiveTurn支持后续取消
* ↓
* 2. getWecomRuntime() 获取运行时
* ↓
* 3. runtime.config.loadConfig() 读取配置
* ↓
* 4. buildWebSocketMessageContext() 构建消息上下文(路由、会话路径等)
* ↓
* 5. runtime.channel.session.recordSessionMetaFromInbound() 记录会话元数据
* ↓
* 6. runtime.channel.activity.record() 记录入站活动统计
* ↓
* 7. runtime.events.onAgentEvent() 订阅 Agent 流式事件
* ↓
* 8. runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher() 调用 Agent
* ↓ Agent 运行期间,步骤 7 的回调持续触发)
* ├── assistant 流 → client.sendMessageChunk() → session.update(message_chunk)
* └── tool 流 → client.sendToolCall/sendToolCallUpdate() → session.update(tool_call)
* ↓
* 9. client.sendPromptResponse() → session.promptResponse最终结果
* ```
*/
export const handlePrompt = async (
message: PromptMessage,
client: WechatAccessWebSocketClient
): Promise<void> => {
const { payload } = message;
const { session_id: sessionId, prompt_id: promptId } = payload;
const userId = message.user_id ?? "";
const guid = message.guid ?? "";
//message {
// msg_id: '9b842a47-c07d-4307-974f-42a4f8eeecb4',
// guid: '0ef9cc5e5dcb7ca068b0fb9982352c33',
// user_id: '3730000',
// method: 'session.prompt',
// payload: {
// session_id: '384f885b-4387-4f2b-9233-89a5fe6f94ee',
// prompt_id: 'ca694ac8-35e3-4e8b-9ecc-88efd4324515',
// agent_app: 'agent_demo',
// content: [ [Object] ]
// }
// }
const textContent = extractTextFromContent(payload.content);
console.log("[wechat-access-ws] 收到 prompt:", payload);
// ============================================
// 1. 注册活跃 Turn
// ============================================
// 在 activeTurns Map 中注册此次请求,以便 handleCancel 能找到并取消它
const turn: ActiveTurn = {
sessionId,
promptId,
cancelled: false,
};
activeTurns.set(promptId, turn);
try {
/**
* getWecomRuntime() 返回 OpenClaw 框架的运行时实例PluginRuntime
* 这是一个单例,在插件初始化时由 setWecomRuntime(api.runtime) 注入。
* 如果未初始化就调用会抛出错误。
*/
const runtime = getWecomRuntime();
/**
* runtime.config.loadConfig() 同步读取 OpenClaw 配置文件。
* 配置文件通常位于 ~/.openclaw/config.json包含
* - Agent 配置(模型、系统提示词等)
* - 渠道配置(各渠道的账号信息)
* - 会话存储路径等
* 返回的 cfg 对象在后续的 dispatchReplyWithBufferedBlockDispatcher 中使用。
*/
const cfg = runtime.config.loadConfig();
// ============================================
// 2. 构建消息上下文
// ============================================
/**
* buildWebSocketMessageContext() 将 AGP 消息转换为 OpenClaw 内部的消息上下文格式。
* 返回值包含:
* - ctx: MsgContext — 消息上下文(包含 From、To、SessionKey、AgentId 等字段)
* - route: 路由信息agentId、accountId、sessionKey 等)
* - storePath: 会话存储文件路径(如 ~/.openclaw/sessions/agent-xxx.json
*
* 这样可以复用 HTTP 通道的路由和会话管理逻辑,保持一致性。
*/
const { ctx, route, storePath } = buildWebSocketMessageContext(payload, userId);
console.log("[wechat-access-ws] 路由信息:", {
sessionKey: route.sessionKey,
agentId: route.agentId,
accountId: route.accountId,
});
// ============================================
// 3. 记录会话元数据
// ============================================
/**
* runtime.channel.session.recordSessionMetaFromInbound() 将本次消息的元数据
* 写入会话存储文件storePath 指向的 JSON 文件)。
* 元数据包括:用户 ID、渠道类型、最后活跃时间等。
* 这些数据用于会话管理、上下文恢复等功能。
*
* 使用 void + .catch() 的原因:
* - void: 明确表示不等待此 Promise不阻塞主流程
* - .catch(): 捕获错误并打印日志,避免未处理的 Promise rejection
* 会话元数据写入失败不影响消息处理,所以不需要 await。
*/
void runtime.channel.session
.recordSessionMetaFromInbound({
storePath,
sessionKey: (ctx.SessionKey as string) ?? route.sessionKey,
ctx,
})
.catch((err: unknown) => {
console.log(`[wechat-access-ws] 记录会话元数据失败: ${String(err)}`);
});
// ============================================
// 4. 记录入站活动
// ============================================
/**
* runtime.channel.activity.record() 记录渠道活动统计数据。
* direction: "inbound" 表示这是一条收到的消息(用户 → 系统)。
* 这些统计数据用于 OpenClaw 控制台的活动监控面板。
*/
runtime.channel.activity.record({
channel: "wechat-access",
accountId: route.accountId ?? "default",
direction: "inbound",
});
// ============================================
// 5. 订阅 Agent 事件(流式输出)
// ============================================
/**
* runtime.events.onAgentEvent() 注册一个全局 Agent 事件监听器。
* 当 Agent 运行时会通过事件总线EventEmitter广播各种事件。
*
* AgentEventPayload 结构:
* {
* runId: string; // Agent 运行实例 ID
* seq: number; // 事件序号(严格递增,用于检测丢失事件)
* stream: string; // 事件流类型(见下方说明)
* ts: number; // 时间戳(毫秒)
* data: Record<string, unknown>; // 事件数据(不同 stream 有不同结构)
* sessionKey?: string; // 关联的会话 key
* }
*
* stream 类型说明:
* - "assistant": AI 助手的文本输出流
* data.delta: 增量文本(本次新增的部分)
* data.text: 累积文本(从开始到现在的完整文本)
* - "tool": 工具调用流
* data.phase: 阶段("start" | "update" | "result"
* data.name: 工具名称(如 "read_file"、"write"
* data.toolCallId: 工具调用唯一 ID
* data.args: 工具参数phase=start 时)
* data.result: 工具执行结果phase=result 时)
* data.isError: 是否执行失败phase=result 时)
* - "lifecycle": 生命周期事件start/end/error
* - "compaction": 上下文压缩事件
*
* 返回值是取消订阅函数,调用后停止接收事件。
* 注意:这是全局事件总线,所有 Agent 运行的事件都会触发此回调,
* 但目前没有按 runId 过滤(因为同一时间通常只有一个 Agent 在运行)。
*/
let lastEmittedText = ""; // 记录已发送的累积文本,用于计算增量
let toolCallCounter = 0; // 工具调用计数器,用于生成备用 toolCallId
// await 确保 SDK 加载完成、监听器真正挂载后,再调用 dispatchReply
// 否则 Agent 产生事件时监听器还未注册,导致所有事件丢失
const unsubscribe = await onAgentEvent((evt: AgentEventPayload) => {
// 如果 Turn 已被取消,忽略后续事件(不再向服务端推送)
if (turn.cancelled) return;
// 过滤非本 Turn 的事件,避免并发多个 prompt 时事件串流
if (evt.sessionKey && evt.sessionKey !== route.sessionKey) return;
const data = evt.data as Record<string, unknown>;
// --- 处理流式文本assistant 流)---
if (evt.stream === "assistant") {
/**
* Agent 生成文本时,事件总线会持续触发 assistant 流事件。
* 每个事件包含:
* - data.delta: 本次新增的文本片段(增量)
* - data.text: 从开始到现在的完整文本(累积)
*
* 优先使用 delta增量因为它直接就是需要发送的内容。
* 如果没有 delta某些 AI 提供商只提供累积文本),
* 则通过 text.slice(lastEmittedText.length) 手动计算增量。
*/
const delta = data.delta as string | undefined;
const text = data.text as string | undefined;
let textToSend = delta;
if (!textToSend && text && text !== lastEmittedText) {
// 手动计算增量:新的累积文本 - 已发送的累积文本 = 本次增量
textToSend = text.slice(lastEmittedText.length);
lastEmittedText = text;
} else if (delta) {
lastEmittedText += delta;
}
// 检测安全审核拦截标记:如果流式文本中包含拦截标记,停止向用户推送
// 拦截标记由 content-security 插件的 fetch 拦截器注入伪 SSE 响应
if (textToSend && textToSend.includes(SECURITY_BLOCK_MARKER)) {
console.warn("[wechat-access-ws] 流式文本中检测到安全审核拦截标记,停止推送");
turn.cancelled = true; // 标记为已取消,阻止后续流式事件继续推送
return;
}
if (lastEmittedText.includes(SECURITY_BLOCK_MARKER)) {
console.warn("[wechat-access-ws] 累积文本中检测到安全审核拦截标记,停止推送");
turn.cancelled = true;
return;
}
if (textToSend) {
// 将增量文本作为 session.update(message_chunk) 发送给服务端
client.sendMessageChunk(sessionId, promptId, {
type: "text",
text: textToSend,
}, guid, userId);
}
return;
}
// --- 处理工具调用事件tool 流)---
if (evt.stream === "tool") {
/**
* 工具调用有三个阶段phase
* - "start": 工具开始执行(发送 tool_callstatus=in_progress
* - "update": 工具执行中有中间结果(发送 tool_call_updatestatus=in_progress
* - "result": 工具执行完成(发送 tool_call_updatestatus=completed/failed
*
* toolCallId 是工具调用的唯一标识,用于关联同一次工具调用的多个事件。
* 如果 Agent 没有提供 toolCallId则用计数器生成一个备用 ID。
*/
const phase = data.phase as string | undefined;
const toolName = data.name as string | undefined;
const toolCallId = (data.toolCallId as string) || `tc-${++toolCallCounter}`;
if (phase === "start") {
// 工具开始执行:通知服务端展示工具调用状态(进行中)
const toolCall: ToolCall = {
tool_call_id: toolCallId,
title: toolName,
kind: mapToolKind(toolName), // 根据工具名推断工具类型read/edit/search 等)
status: "in_progress",
};
client.sendToolCall(sessionId, promptId, toolCall, guid, userId);
} else if (phase === "update") {
// 工具执行中有中间结果(如读取文件的部分内容)
const toolCall: ToolCall = {
tool_call_id: toolCallId,
title: toolName,
status: "in_progress",
content: data.text
? [{ type: "text" as const, text: data.text as string }]
: undefined,
};
client.sendToolCallUpdate(sessionId, promptId, toolCall, guid, userId);
} else if (phase === "result") {
// 工具执行完成:更新状态为 completed 或 failed
const isError = data.isError as boolean | undefined;
const toolCall: ToolCall = {
tool_call_id: toolCallId,
title: toolName,
status: isError ? "failed" : "completed",
// 将工具执行结果作为内容块附加(可选,用于展示)
content: data.result
? [{ type: "text" as const, text: data.result as string }]
: undefined,
};
client.sendToolCallUpdate(sessionId, promptId, toolCall, guid, userId);
}
return;
}
});
// 将取消订阅函数保存到 Turn 记录中,以便 handleCancel 调用
turn.unsubscribe = unsubscribe;
// ============================================
// 6. 调用 Agent 处理消息
// ============================================
/**
* runtime.channel.reply.resolveEffectiveMessagesConfig() 解析当前 Agent 的消息配置。
* 返回值包含:
* - responsePrefix: 回复前缀(如果配置了的话)
* - 其他消息格式配置
* 参数 route.agentId 指定要查询哪个 Agent 的配置。
*/
const messagesConfig = runtime.channel.reply.resolveEffectiveMessagesConfig(
cfg,
route.agentId
);
let finalText: string | null = null;
/**
* runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher() 是核心调用。
* 它完成以下工作:
* 1. 根据 ctx消息上下文和 cfg配置确定使用哪个 Agent
* 2. 加载该 Agent 的历史会话记录(上下文)
* 3. 调用 AI 模型生成回复(流式)
* 4. 在生成过程中,通过事件总线广播 assistant/tool 流事件(步骤 5 的回调会收到)
* 5. 将生成的回复通过 dispatcherOptions.deliver 回调交付
* 6. 保存本次对话到会话历史
*
* "BufferedBlockDispatcher" 的含义:
* - Buffered: 将流式输出缓冲后再交付(避免过于频繁的回调)
* - Block: 按块(段落/句子)分割回复
* - Dispatcher: 负责将回复分发给 deliver 回调
*
* 返回值 { queuedFinal } 包含最终排队的回复内容(此处未使用,通过 deliver 回调获取)。
*
* 注意:此函数是 async 的,会等待 Agent 完全处理完毕才 resolve。
* 在等待期间,步骤 5 注册的 onAgentEvent 回调会持续被触发(流式推送)。
*/
const { queuedFinal } = await runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx,
cfg,
dispatcherOptions: {
responsePrefix: messagesConfig.responsePrefix,
/**
* deliver 回调:当 Agent 生成了一个完整的回复块时调用。
* @param payload - 回复内容text、mediaUrl 等)
* @param info - 回复元信息kind: "final" | "chunk" | "error" 等)
*
* 这里主要用于:
* 1. 捕获最终回复文本finalText
* 2. 记录出站活动统计
*
* 注意:流式文本已经通过 onAgentEvent 的 assistant 流实时推送了,
* 这里的 deliver 是最终汇总的回调,用于获取完整的最终文本。
*/
deliver: async (
payload: {
text?: string;
mediaUrl?: string;
mediaUrls?: string[];
isError?: boolean;
channelData?: unknown;
},
info: { kind: string }
) => {
if (turn.cancelled) return;
console.log(`[wechat-access-ws] Agent ${info.kind} 回复:`, payload.text?.slice(0, 50));
// 保存最终回复文本,用于构建 session.promptResponse 的 content
// 不限制 kind只要有 text 就更新final/chunk 都可能携带完整文本)
if (payload.text) {
// 检测安全审核拦截标记:如果回复文本包含拦截标记,
// 替换为通用安全提示,不向用户暴露具体拦截原因和内部标记
if (payload.text.includes(SECURITY_BLOCK_MARKER)) {
console.warn("[wechat-access-ws] deliver 回复中检测到安全审核拦截标记,替换为安全提示");
finalText = SECURITY_BLOCK_USER_MESSAGE;
} else {
finalText = payload.text;
}
}
// 记录出站活动统计(每次 deliver 都算一次出站)
runtime.channel.activity.record({
channel: "wechat-access",
accountId: route.accountId ?? "default",
direction: "outbound",
});
},
onError: (err: unknown, info: { kind: string }) => {
console.error(`[wechat-access-ws] Agent ${info.kind} 回复失败:`, err);
},
},
replyOptions: {},
});
// ============================================
// 7. 发送最终结果
// ============================================
// Agent 处理完成,取消事件订阅并清理 Turn 记录
unsubscribe();
activeTurns.delete(promptId);
if (turn.cancelled) {
// 如果在 Agent 处理期间收到了 cancel 消息,发送 cancelled 响应
client.sendPromptResponse({
session_id: sessionId,
prompt_id: promptId,
stop_reason: "cancelled",
}, guid, userId);
return;
}
// 构建最终内容块(如果有文本回复的话)
// 优先用 deliver 回调收到的 finalText兜底用流式事件累积的 lastEmittedText
let replyText = finalText || (lastEmittedText.trim() ? lastEmittedText : null);
// 最后一道防线:检查最终回复文本是否包含安全拦截标记
// 正常情况下 deliver 回调和流式事件中已经处理过了,这里是兜底
if (replyText && replyText.includes(SECURITY_BLOCK_MARKER)) {
console.warn("[wechat-access-ws] 最终回复文本中检测到安全审核拦截标记,替换为安全提示");
replyText = SECURITY_BLOCK_USER_MESSAGE;
}
const responseContent: ContentBlock[] = replyText
? [{ type: "text", text: replyText }]
: [];
// 发送 session.promptResponse告知服务端本次 Turn 已正常完成
client.sendPromptResponse({
session_id: sessionId,
prompt_id: promptId,
stop_reason: "end_turn",
content: responseContent,
}, guid, userId);
console.log("[wechat-access-ws] prompt 处理完成:", { promptId, hasReply: !!replyText, finalText: !!finalText, lastEmittedText: lastEmittedText.length });
} catch (err) {
// ============================================
// 错误处理
// ============================================
console.error("[wechat-access-ws] prompt 处理失败:", err);
// 清理活跃 Turn取消事件订阅从 Map 中移除)
const currentTurn = activeTurns.get(promptId);
currentTurn?.unsubscribe?.();
activeTurns.delete(promptId);
// 发送错误响应,告知服务端本次 Turn 因错误终止
client.sendPromptResponse({
session_id: sessionId,
prompt_id: promptId,
stop_reason: "error",
error: err instanceof Error ? err.message : String(err),
}, guid, userId);
}
};
/**
* 处理 session.cancel 消息 — 取消正在处理的 Prompt Turn
* @param message - AGP session.cancel 消息
* @param client - WebSocket 客户端实例
* @description
* 取消流程:
* 1. 通过 promptId 在 activeTurns Map 中查找对应的 Turn
* 2. 将 turn.cancelled 标记为 truehandlePrompt 中的 onAgentEvent 回调会检查此标志)
* 3. 调用 turn.unsubscribe() 停止接收后续 Agent 事件
* 4. 从 activeTurns 中移除此 Turn
* 5. 发送 session.promptResponsestop_reason: "cancelled"
*
* 注意:取消操作是"尽力而为"的Agent 可能已经处理完毕,
* 此时 activeTurns 中找不到对应 Turn但仍然发送 cancelled 响应。
*/
export const handleCancel = (
message: CancelMessage,
client: WechatAccessWebSocketClient
): void => {
const { session_id: sessionId, prompt_id: promptId } = message.payload;
console.log("[wechat-access-ws] 收到 cancel:", { sessionId, promptId });
const turn = activeTurns.get(promptId);
if (!turn) {
console.warn(`[wechat-access-ws] 未找到活跃 Turn: ${promptId}`);
// 即使找不到对应 Turn可能已处理完毕也发送 cancelled 响应
// 确保服务端收到明确的结束信号
client.sendPromptResponse({
session_id: sessionId,
prompt_id: promptId,
stop_reason: "cancelled",
});
return;
}
// 标记为已取消handlePrompt 中的 onAgentEvent 回调会检查此标志,
// 一旦为 true后续的流式事件都会被忽略不再向服务端推送
turn.cancelled = true;
// 取消 Agent 事件订阅,停止接收后续事件
// 可选链 ?.() 是因为 unsubscribe 可能还未赋值Turn 刚注册但还未到步骤 5
turn.unsubscribe?.();
activeTurns.delete(promptId);
// 发送 cancelled 响应
client.sendPromptResponse({
session_id: sessionId,
prompt_id: promptId,
stop_reason: "cancelled",
});
console.log("[wechat-access-ws] Turn 已取消:", promptId);
};
// ============================================
// 辅助函数
// ============================================
/**
* 将工具名称映射为 AGP 协议的 ToolCallKind
* @param toolName - 工具名称(如 "read_file"、"write"、"grep_search" 等)
* @returns ToolCallKind 枚举值,用于服务端展示不同类型的工具调用图标
* @description
* 通过关键词匹配推断工具类型,映射规则:
* - read/get/view → "read"(读取操作)
* - write/edit/replace → "edit"(编辑操作)
* - delete/remove → "delete"(删除操作)
* - search/find/grep → "search"(搜索操作)
* - fetch/request/http → "fetch"(网络请求)
* - think/reason → "think"(思考/推理)
* - exec/run/terminal → "execute"(执行命令)
* - 其他 → "other"
*/
const mapToolKind = (toolName?: string): ToolCall["kind"] => {
if (!toolName) return "other";
const name = toolName.toLowerCase();
if (name.includes("read") || name.includes("get") || name.includes("view")) return "read";
if (name.includes("write") || name.includes("edit") || name.includes("replace")) return "edit";
if (name.includes("delete") || name.includes("remove")) return "delete";
if (name.includes("search") || name.includes("find") || name.includes("grep")) return "search";
if (name.includes("fetch") || name.includes("request") || name.includes("http")) return "fetch";
if (name.includes("think") || name.includes("reason")) return "think";
if (name.includes("exec") || name.includes("run") || name.includes("terminal")) return "execute";
return "other";
};

290
websocket/types.ts Normal file
View File

@@ -0,0 +1,290 @@
/**
* @file types.ts
* @description AGP (Agent Gateway Protocol) 协议类型定义
*
* AGP 是 OpenClaw 与外部服务(如微信服务号后端)之间的 WebSocket 通信协议。
* 所有消息都使用统一的「信封Envelope」格式通过 method 字段区分消息类型。
*
* 消息方向:
* 下行(服务端 → 客户端session.prompt、session.cancel
* 上行(客户端 → 服务端session.update、session.promptResponse
*
* 基于 websocket.md 协议文档定义
*/
// ============================================
// AGP 消息信封
// ============================================
/**
* AGP 统一消息信封
* 所有 WebSocket 消息(上行和下行)均使用此格式
*/
export interface AGPEnvelope<T = unknown> {
/** 全局唯一消息 IDUUID用于幂等去重 */
msg_id: string;
/** 设备唯一标识(下行消息携带,上行消息需原样回传) */
guid?: string;
/** 用户 ID下行消息携带上行消息需原样回传 */
user_id?: string;
/** 消息类型 */
method: AGPMethod;
/** 消息载荷 */
payload: T;
}
// ============================================
// Method 枚举
// ============================================
/**
* AGP 消息方法枚举
* - session.prompt: 下发用户指令(服务端 → 客户端)
* - session.cancel: 取消 Prompt Turn服务端 → 客户端)
* - session.update: 流式中间更新(客户端 → 服务端)
* - session.promptResponse: 最终结果(客户端 → 服务端)
*/
export type AGPMethod =
| "session.prompt"
| "session.cancel"
| "session.update"
| "session.promptResponse"
| "ping";
// ============================================
// 通用数据结构
// ============================================
/**
* 内容块
* 当前仅支持 text 类型
*/
export interface ContentBlock {
type: "text";
text: string;
}
/**
* 工具调用状态枚举
*/
export type ToolCallStatus = "pending" | "in_progress" | "completed" | "failed";
/**
* 工具调用类型枚举
*/
export type ToolCallKind = "read" | "edit" | "delete" | "execute" | "search" | "fetch" | "think" | "other";
/**
* 工具操作路径
* @description 记录工具调用涉及的文件或目录路径,用于在 UI 中展示操作位置
*/
export interface ToolLocation {
/** 文件或目录的绝对路径 */
path: string;
}
/**
* 工具调用
* @description
* 描述一次工具调用的完整信息,用于在 session.update 消息中实时推送工具执行状态。
* 一次工具调用会产生多个 session.update 消息:
* 1. update_type=tool_call工具开始执行status=in_progress
* 2. update_type=tool_call_update执行中间状态status=in_progress可选
* 3. update_type=tool_call_update执行完成status=completed/failed
*/
export interface ToolCall {
/** 工具调用唯一 ID用于关联同一次工具调用的多个 update 消息 */
tool_call_id: string;
/** 工具调用标题(展示用,通常是工具名称,如 "read_file" */
title?: string;
/** 工具类型,用于 UI 展示不同的图标 */
kind?: ToolCallKind;
/** 工具调用当前状态 */
status: ToolCallStatus;
/** 工具调用结果内容phase=result 时附带,用于展示工具输出) */
content?: ContentBlock[];
/** 工具操作涉及的文件路径(可选,用于 UI 展示操作位置) */
locations?: ToolLocation[];
}
// ============================================
// 下行消息(服务端 → 客户端)
// ============================================
/**
* session.prompt 载荷 — 下发用户指令
* @description
* 服务端收到用户消息后通过此消息将用户指令下发给客户端OpenClaw Agent处理。
* 客户端处理完毕后,需要发送 session.promptResponse 作为响应。
*/
export interface PromptPayload {
/** 所属 Session ID标识一个完整的对话会话 */
session_id: string;
/** 本次 Turn 唯一 ID标识一次「用户提问 + AI 回答」的完整交互) */
prompt_id: string;
/** 目标 AI 应用标识(指定由哪个 Agent 处理此消息) */
agent_app: string;
/** 用户指令内容(结构化内容块数组,目前只支持 text 类型) */
content: ContentBlock[];
}
/**
* session.cancel 载荷 — 取消 Prompt Turn
* @description
* 用户主动取消正在处理的请求时,服务端发送此消息。
* 客户端收到后应停止 Agent 处理,并发送 stop_reason=cancelled 的 promptResponse。
*/
export interface CancelPayload {
/** 所属 Session ID */
session_id: string;
/** 要取消的 Turn ID与对应 session.prompt 的 prompt_id 一致) */
prompt_id: string;
/** 目标 AI 应用标识 */
agent_app: string;
}
// ============================================
// 上行消息(客户端 → 服务端)
// ============================================
/**
* session.update 的更新类型
* @description
* 定义 session.update 消息中 update_type 字段的可选值:
* - message_chunk: Agent 生成的增量文本片段(流式输出,每次只包含新增的部分)
* - tool_call: Agent 开始调用一个工具(通知服务端展示工具调用状态)
* - tool_call_update: 工具调用状态变更(执行中的中间结果,或执行完成/失败)
*/
export type UpdateType = "message_chunk" | "tool_call" | "tool_call_update";
/**
* session.update 载荷 — 流式中间更新
* @description
* 在 Agent 处理 session.prompt 的过程中,通过此消息实时推送中间状态。
* 服务端收到后转发给用户端,实现流式输出效果。
*
* 根据 update_type 的不同,使用不同的字段:
* - message_chunk: 使用 content 字段(单个 ContentBlock非数组
* - tool_call / tool_call_update: 使用 tool_call 字段
*/
export interface UpdatePayload {
/** 所属 Session ID */
session_id: string;
/** 所属 Turn ID与对应 session.prompt 的 prompt_id 一致) */
prompt_id: string;
/** 更新类型,决定使用 content 还是 tool_call 字段 */
update_type: UpdateType;
/**
* 文本内容块update_type=message_chunk 时使用)
* 注意:这里是单个 ContentBlock 对象,而非数组
*/
content?: ContentBlock;
/** 工具调用信息update_type=tool_call 或 tool_call_update 时使用) */
tool_call?: ToolCall;
}
/**
* 停止原因枚举
* - end_turn: 正常完成
* - cancelled: 被取消
* - refusal: AI 应用拒绝执行
* - error: 技术错误
*/
export type StopReason = "end_turn" | "cancelled" | "refusal" | "error";
/**
* session.promptResponse 载荷 — 最终结果
* @description
* Agent 处理完 session.prompt 后,必须发送此消息告知服务端本次 Turn 已结束。
* 无论正常完成、被取消还是出错,都需要发送此消息。
* 服务端收到后才会认为本次 Turn 已关闭,可以接受下一个 prompt。
*/
export interface PromptResponsePayload {
/** 所属 Session ID */
session_id: string;
/** 所属 Turn ID与对应 session.prompt 的 prompt_id 一致) */
prompt_id: string;
/** 停止原因,告知服务端 Turn 是如何结束的 */
stop_reason: StopReason;
/**
* 最终结果内容ContentBlock 数组)
* stop_reason=end_turn 时附带,包含 Agent 的完整回复文本
* stop_reason=cancelled/error 时通常为空
*/
content?: ContentBlock[];
/** 错误描述stop_reason 为 error 或 refusal 时附带,说明失败原因) */
error?: string;
}
// ============================================
// 类型别名(方便使用)
// ============================================
/** 下行session.prompt 消息 */
export type PromptMessage = AGPEnvelope<PromptPayload>;
/** 下行session.cancel 消息 */
export type CancelMessage = AGPEnvelope<CancelPayload>;
/** 上行session.update 消息 */
export type UpdateMessage = AGPEnvelope<UpdatePayload>;
/** 上行session.promptResponse 消息 */
export type PromptResponseMessage = AGPEnvelope<PromptResponsePayload>;
// ============================================
// WebSocket 客户端配置
// ============================================
/**
* WebSocket 客户端配置
* @description
* 在插件入口index.ts的 WS_CONFIG 常量中配置,传入 WechatAccessWebSocketClient 构造函数。
*/
export interface WebSocketClientConfig {
/** WebSocket 服务端地址(如 ws://21.0.62.97:8080/ */
url: string;
/** 设备唯一标识,用于服务端识别连接来源(作为 URL 查询参数传递) */
guid: string;
/** 用户账户 ID作为 URL 查询参数传递,也用于上行消息的 user_id 字段) */
userId: string;
/** 鉴权 token可选作为 URL 查询参数传递,当前服务端未校验) */
token?: string;
/**
* 重连间隔基准值(毫秒),默认 30003秒
* 实际重连间隔使用指数退避策略,此值是第一次重连的等待时间
*/
reconnectInterval?: number;
/**
* 最大重连次数,默认 0无限重连
* 设为正整数时,超过此次数后停止重连并将状态设为 disconnected
*/
maxReconnectAttempts?: number;
/**
* 心跳间隔(毫秒),默认 2400004分钟
* 应小于服务端的空闲超时时间(通常为 5 分钟),确保连接不会因空闲被断开
* 心跳使用 WebSocket 原生 ping 控制帧ws 库的 ws.ping() 方法)
*/
heartbeatInterval?: number;
/**
* 当前 openclaw gateway 监听的端口号(来自 cfg.gateway.port
* 用于日志前缀,方便区分多实例
*/
gatewayPort?: string;
}
/**
* WebSocket 连接状态
*/
export type ConnectionState = "disconnected" | "connecting" | "connected" | "reconnecting";
/**
* WebSocket 客户端事件回调
*/
export interface WebSocketClientCallbacks {
/** 连接成功 */
onConnected?: () => void;
/** 连接断开 */
onDisconnected?: (reason?: string) => void;
/** 收到 session.prompt 消息 */
onPrompt?: (message: PromptMessage) => void;
/** 收到 session.cancel 消息 */
onCancel?: (message: CancelMessage) => void;
/** 发生错误 */
onError?: (error: Error) => void;
}

View File

@@ -0,0 +1,739 @@
/**
* `randomUUID` 来自 Node.js 内置的 `node:crypto` 模块。
* 用于生成符合 RFC 4122 标准的 UUID v4 字符串,格式如:
* "550e8400-e29b-41d4-a716-446655440000"
* 每次调用都会生成一个全局唯一的随机字符串,用作消息的 msg_id。
* 注意:这是 Node.js 原生 API不需要安装任何第三方库。
*/
import { randomUUID } from "node:crypto";
import WebSocket from "ws";
import type {
AGPEnvelope,
AGPMethod,
WebSocketClientConfig,
ConnectionState,
WebSocketClientCallbacks,
PromptMessage,
CancelMessage,
UpdatePayload,
PromptResponsePayload,
ContentBlock,
ToolCall,
} from "./types.js";
// ============================================
// WebSocket 客户端核心
// ============================================
// 负责 WebSocket 连接管理、消息收发、自动重连、心跳保活
/**
* WebSocket 客户端
* @description
* 连接到 AGP WebSocket 服务端,处理双向通信:
* - 接收下行消息session.prompt / session.cancel
* - 发送上行消息session.update / session.promptResponse
* - 自动重连:连接断开后自动尝试重连(指数退避策略)
* - 心跳保活:定期发送 WebSocket 原生 ping 帧,防止服务端因空闲超时断开连接
* - 消息去重:通过 msg_id 实现幂等处理,避免重复消息被处理两次
*/
export class WechatAccessWebSocketClient {
private config: Required<Omit<WebSocketClientConfig, "token" | "gatewayPort">> & { token?: string; gatewayPort?: string };
private callbacks: WebSocketClientCallbacks;
/**
* ws 库的 WebSocket 实例。
* 类型写作 `WebSocket.WebSocket` 是因为 ws 库的默认导出是类本身,
* 而 `WebSocket.WebSocket` 是其实例类型TypeScript 类型系统的要求)。
* 未连接时为 null。
*/
private ws: WebSocket | null = null;
/** 当前连接状态 */
private state: ConnectionState = "disconnected";
/**
* 重连定时器句柄。
* `ReturnType<typeof setTimeout>` 是 TypeScript 推荐的写法,
* 可以同时兼容 Node.js返回 Timeout 对象)和浏览器(返回 number环境。
*/
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
/**
* 心跳定时器句柄。
* `ReturnType<typeof setInterval>` 同上,兼容 Node.js 和浏览器。
*/
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
/** 当前已尝试的重连次数 */
private reconnectAttempts = 0;
/**
* 已处理的消息 ID 集合(用于去重)。
* 使用 Set 而非数组,查找时间复杂度为 O(1)。
* 当消息因网络问题被重发时,通过检查 msg_id 是否已存在来避免重复处理。
*/
private processedMsgIds = new Set<string>();
/** 消息 ID 缓存定期清理定时器(防止 Set 无限增长导致内存泄漏) */
private msgIdCleanupTimer: ReturnType<typeof setInterval> | null = null;
/** 上次收到 pong 的时间戳(用于检测连接假死) */
private lastPongTime = Date.now();
/** 系统唤醒检测定时器 */
private wakeupCheckTimer: ReturnType<typeof setInterval> | null = null;
/** 唤醒检测:上次 tick 的时间戳 */
private lastTickTime = Date.now();
/** 消息 ID 缓存的最大容量,超过此值时触发清理 */
private static readonly MAX_MSG_ID_CACHE = 1000;
/** 从 config.url 中解析出端口号,用于日志前缀 */
private get port(): string {
return this.config.gatewayPort ?? 'unknown';
}
/** 带端口号的日志前缀 */
private get logPrefix(): string {
return `[wechat-access-ws:${this.port}]`;
}
constructor(config: WebSocketClientConfig, callbacks: WebSocketClientCallbacks = {}) {
this.config = {
url: config.url,
guid: config.guid ?? '',
userId: config.userId ?? '',
token: config.token,
gatewayPort: config.gatewayPort,
reconnectInterval: config.reconnectInterval ?? 3000,
maxReconnectAttempts: config.maxReconnectAttempts ?? 0,
// 默认 20s发一次心跳小于服务端 1 分钟的空闲超时时间
heartbeatInterval: config.heartbeatInterval ?? 20000,
};
this.callbacks = callbacks;
}
/**
* 启动 WebSocket 连接
* @description
* 如果当前已连接或正在连接中,则直接返回,避免重复建立连接。
* 同时启动消息 ID 缓存的定期清理任务。
*/
start = (): void => {
if (this.state === "connected" || this.state === "connecting") {
console.log(`${this.logPrefix} 已连接或正在连接,跳过`);
return;
}
this.connect();
this.startMsgIdCleanup();
};
/**
* 停止 WebSocket 连接
* @description
* 主动断开连接时调用。会:
* 1. 将状态设为 "disconnected"(阻止断开后触发自动重连)
* 2. 清理所有定时器(重连、心跳、消息 ID 清理)
* 3. 清空消息 ID 缓存
* 4. 关闭 WebSocket 连接
*/
stop = (): void => {
console.log(`${this.logPrefix} 正在停止...`);
this.state = "disconnected";
this.clearReconnectTimer();
this.clearHeartbeat();
this.clearWakeupDetection();
this.clearMsgIdCleanup();
this.processedMsgIds.clear();
if (this.ws) {
this.ws.close();
this.ws = null;
}
console.log(`${this.logPrefix} 已停止`);
};
/**
* 获取当前连接状态
* @returns "disconnected" | "connecting" | "connected" | "reconnecting"
*/
getState = (): ConnectionState => this.state;
/**
* 更新事件回调
* @description 使用对象展开合并,只更新传入的回调,保留未传入的原有回调
*/
setCallbacks = (callbacks: Partial<WebSocketClientCallbacks>): void => {
this.callbacks = { ...this.callbacks, ...callbacks };
};
/**
* 发送 session.update 消息 — 流式中间更新(文本块)
* @param sessionId - 所属 Session ID
* @param promptId - 所属 Turn ID
* @param content - 文本内容块type: "text"
* @description
* 在 Agent 生成回复的过程中,将增量文本实时推送给服务端,
* 服务端再转发给用户端展示流式输出效果。
*/
sendMessageChunk = (sessionId: string, promptId: string, content: ContentBlock, guid?: string, userId?: string): void => {
console.log(`${this.logPrefix} [sendMessageChunk] sessionId=${sessionId}, promptId=${promptId}, guid=${guid}, userId=${userId}, content=${JSON.stringify(content).substring(0, 200)}`);
const payload: UpdatePayload = {
session_id: sessionId,
prompt_id: promptId,
update_type: "message_chunk",
content,
};
this.sendEnvelope("session.update", payload, guid, userId);
};
/**
* 发送 session.update 消息 — 工具调用开始
* @param sessionId - 所属 Session ID
* @param promptId - 所属 Turn ID
* @param toolCall - 工具调用信息(包含 tool_call_id、title、kind、status
* @description
* 当 Agent 开始调用某个工具时发送,通知服务端展示工具调用状态。
*/
sendToolCall = (sessionId: string, promptId: string, toolCall: ToolCall, guid?: string, userId?: string): void => {
console.log(`${this.logPrefix} [sendToolCall] sessionId=${sessionId}, promptId=${promptId}, guid=${guid}, userId=${userId}, toolCall=${JSON.stringify(toolCall)}`);
const payload: UpdatePayload = {
session_id: sessionId,
prompt_id: promptId,
update_type: "tool_call",
tool_call: toolCall,
};
this.sendEnvelope("session.update", payload, guid, userId);
};
/**
* 发送 session.update 消息 — 工具调用状态变更
* @param sessionId - 所属 Session ID
* @param promptId - 所属 Turn ID
* @param toolCall - 更新后的工具调用信息status 变为 completed/failed
* @description
* 当工具执行完成或失败时发送,通知服务端更新工具调用的展示状态。
*/
sendToolCallUpdate = (sessionId: string, promptId: string, toolCall: ToolCall, guid?: string, userId?: string): void => {
console.log(`${this.logPrefix} [sendToolCallUpdate] sessionId=${sessionId}, promptId=${promptId}, guid=${guid}, userId=${userId}, toolCall=${JSON.stringify(toolCall)}`);
const payload: UpdatePayload = {
session_id: sessionId,
prompt_id: promptId,
update_type: "tool_call_update",
tool_call: toolCall,
};
this.sendEnvelope("session.update", payload, guid, userId);
};
/**
* 发送 session.promptResponse 消息 — 最终结果
* @param payload - 包含 stop_reason、content、error 等最终结果信息
* @description
* Agent 处理完成后发送,告知服务端本次 Turn 已结束。
* stop_reason 可以是end_turn正常完成、cancelled被取消、error出错
*/
sendPromptResponse = (payload: PromptResponsePayload, guid?: string, userId?: string): void => {
const contentPreview = payload.content ? JSON.stringify(payload.content).substring(0, 200) : '(empty)';
console.log(`${this.logPrefix} [sendPromptResponse] sessionId=${payload.session_id}, promptId=${payload.prompt_id}, stopReason=${payload.stop_reason}, guid=${guid}, userId=${userId}, content=${contentPreview}`);
this.sendEnvelope("session.promptResponse", payload, guid, userId);
};
/**
* 建立 WebSocket 连接
* @description
* 使用 ws 库的 `new WebSocket(url)` 创建连接。
* ws 库会在内部自动完成 TCP 握手和 WebSocket 升级协议HTTP Upgrade
* 连接是异步建立的,实际连接成功会触发 "open" 事件。
*/
private connect = (): void => {
// url 为空时不进行连接,避免 new URL("") 抛出 TypeError
if (!this.config.url) {
console.error(`${this.logPrefix} wsUrl 未配置,跳过连接`);
this.state = "disconnected";
return;
}
// token 为空时不进行连接,避免无效请求
if (!this.config.token) {
console.error(`${this.logPrefix} token 为空,跳过 WebSocket 连接`);
this.state = "disconnected";
return;
}
this.state = "connecting";
console.error(`${this.logPrefix} 连接配置: url=${this.config.url}, token=${this.config.token.substring(0, 6) + '...'}, guid=${this.config.guid}, userId=${this.config.userId}`);
const wsUrl = this.buildConnectionUrl();
console.error(`${this.logPrefix} 正在连接: ${wsUrl}`);
try {
// new WebSocket(url) 立即返回,不会阻塞
// 连接过程在后台异步进行,通过事件通知结果
this.ws = new WebSocket(wsUrl);
this.setupEventHandlers();
} catch (error) {
// 同步错误(如 URL 格式非法)会在这里捕获
// 异步连接失败(如服务端拒绝)会触发 "error" 事件
console.error(`${this.logPrefix} 创建连接失败:`, error);
this.handleConnectionError(error instanceof Error ? error : new Error(String(error)));
}
};
/**
* 构建 WebSocket 连接 URL
* @description
* 使用 Node.js 内置的 `URL` 类(全局可用,无需 import构建带查询参数的 URL。
* `url.searchParams.set()` 会自动对参数值进行 URL 编码encodeURIComponent
* 避免特殊字符导致的 URL 解析问题。
*
* 最终格式ws://host:port/?token={token}
*/
private buildConnectionUrl = (): string => {
const url = new URL(this.config.url);
if (this.config.token) {
url.searchParams.set("token", this.config.token);
}
return url.toString();
};
/**
* 注册 ws 库的事件监听器
* @description
* ws 库使用 Node.js EventEmitter 风格的 `.on(event, handler)` 注册事件,
* 而非浏览器的 `.addEventListener(event, handler)`。
* 两者功能相同,但回调参数类型不同:
*
* | 事件 | 浏览器原生参数 | ws 库参数 |
* |---------|----------------------|----------------------------------|
* | open | Event | 无参数 |
* | message | MessageEvent | (data: RawData, isBinary: bool) |
* | close | CloseEvent | (code: number, reason: Buffer) |
* | error | Event | (error: Error) |
* | pong | 不支持 | 无参数ws 库特有) |
*/
private setupEventHandlers = (): void => {
if (!this.ws) return;
this.ws.on("open", this.handleOpen);
this.ws.on("message", this.handleRawMessage);
this.ws.on("close", this.handleClose);
this.ws.on("error", this.handleError);
// "pong" 是 ws 库特有的事件,当收到服务端的 pong 控制帧时触发
// 浏览器原生 WebSocket API 不暴露此事件
this.ws.on("pong", this.handlePong);
};
// ============================================
// 事件处理
// ============================================
/**
* 处理连接建立事件
* @description
* ws 库的 "open" 事件在 WebSocket 握手完成后触发,此时可以开始收发消息。
* 连接成功后:
* 1. 更新状态为 "connected"
* 2. 重置重连计数器
* 3. 重置 pong 时间戳
* 4. 启动心跳定时器
* 5. 启动系统唤醒检测
* 6. 触发 onConnected 回调
*/
private handleOpen = (): void => {
console.log(`${this.logPrefix} 连接成功`);
this.state = "connected";
this.reconnectAttempts = 0;
this.lastPongTime = Date.now();
this.startHeartbeat();
this.startWakeupDetection();
this.callbacks.onConnected?.();
};
/**
* 处理收到的原始消息
* @param data - ws 库的原始消息数据,类型为 `WebSocket.RawData`
* @description
* `WebSocket.RawData` 是 ws 库定义的联合类型:`Buffer | ArrayBuffer | Buffer[]`
* - 文本消息text frame通常是 Buffer 类型
* - 二进制消息binary frame可能是 Buffer 或 ArrayBuffer
*
* 处理步骤:
* 1. 将 RawData 转为字符串Buffer.toString() 默认使用 UTF-8 编码)
* 2. JSON.parse 解析为 AGPEnvelope 对象
* 3. 检查 msg_id 去重
* 4. 根据 method 字段分发到对应的回调
*/
private handleRawMessage = (data: WebSocket.RawData): void => {
try {
// Buffer.toString() 默认 UTF-8 编码,等同于 data.toString("utf8")
// 如果 data 已经是 string 类型(理论上 ws 库不会这样,但做兼容处理)
const raw = typeof data === "string" ? data : data.toString();
const envelope = JSON.parse(raw) as AGPEnvelope;
// 消息去重:同一个 msg_id 只处理一次
// 网络不稳定时服务端可能重发消息,通过 msg_id 避免重复处理
if (this.processedMsgIds.has(envelope.msg_id)) {
console.log(`${this.logPrefix} 重复消息,跳过: ${envelope.msg_id}`);
return;
}
this.processedMsgIds.add(envelope.msg_id);
console.log(`${this.logPrefix} 收到消息: method=${envelope.method}, msg_id=${envelope.msg_id}`);
// 根据 method 字段分发消息到对应的业务处理回调
switch (envelope.method) {
case "session.prompt":
// 下行:服务端下发用户指令,需要调用 Agent 处理
this.callbacks.onPrompt?.(envelope as PromptMessage);
break;
case "session.cancel":
// 下行:服务端要求取消正在处理的 Turn
this.callbacks.onCancel?.(envelope as CancelMessage);
break;
default:
console.warn(`${this.logPrefix} 未知消息类型: ${envelope.method}`);
}
} catch (error) {
console.error(`${this.logPrefix} 消息解析失败:`, error, '原始数据:', data);
this.callbacks.onError?.(
error instanceof Error ? error : new Error(`消息解析失败: ${String(error)}`)
);
}
};
/**
* 处理连接关闭事件
* @param code - WebSocket 关闭状态码RFC 6455 定义)
* 常见值:
* - 1000: 正常关闭
* - 1001: 端点离开(如服务端重启)
* - 1006: 异常关闭(连接被强制断开,无关闭握手)
* - 1008: 策略违规(如 token 不匹配)
* @param reason - 关闭原因ws 库中类型为 `Buffer`,需要调用 `.toString()` 转为字符串
* @description
* 注意ws 库的 close 事件参数与浏览器不同:
* - 浏览器:`(event: CloseEvent)` → 通过 event.code 和 event.reason 获取
* - ws 库:`(code: number, reason: Buffer)` → 直接获取reason 是 Buffer 需要转换
*
* 只有在非主动关闭state !== "disconnected")时才触发重连,
* 避免调用 stop() 后又自动重连。
*/
private handleClose = (code: number, reason: Buffer): void => {
// Buffer.toString() 将 Buffer 转为 UTF-8 字符串
// 如果 reason 为空 BuffertoString() 返回空字符串,此时用 code 作为描述
const reasonStr = reason.toString() || `code=${code}`;
console.log(`${this.logPrefix} 连接关闭: ${reasonStr}`);
this.clearHeartbeat();
this.clearWakeupDetection();
this.ws = null;
// 仅在非主动关闭的情况下尝试重连
// 主动调用 stop() 时会先将 state 设为 "disconnected",此处就不会触发重连
if (this.state !== "disconnected") {
this.callbacks.onDisconnected?.(reasonStr);
this.scheduleReconnect();
}
};
/**
* 处理 pong 控制帧
* @description
* 当服务端收到我们发送的 ping 帧后,会自动回复一个 pong 帧。
* ws 库会触发 "pong" 事件通知我们。
* 记录收到 pong 的时间戳,供心跳定时器检测连接是否假死。
* 如果长时间未收到 pong说明连接已不可用如电脑休眠导致 TCP 断开)。
*/
private handlePong = (): void => {
this.lastPongTime = Date.now();
};
/**
* 处理连接错误事件
* @param error - ws 库直接传递 Error 对象(浏览器原生 API 传递的是 Event 对象)
* @description
* ws 库的 "error" 事件在以下情况触发:
* - 连接被拒绝(如服务端不可达)
* - TLS 握手失败
* - 消息发送失败
* 注意error 事件之后通常会紧跟 close 事件,重连逻辑在 handleClose 中处理。
*/
private handleError = (error: Error): void => {
console.error(`${this.logPrefix} 连接错误:`, error);
this.callbacks.onError?.(error);
};
/**
* 处理连接创建时的同步错误
* @description
* 当 `new WebSocket(url)` 抛出同步异常时调用(如 URL 格式非法)。
* 此时不会触发 "error" 和 "close" 事件,需要手动触发重连。
*/
private handleConnectionError = (error: Error): void => {
this.callbacks.onError?.(error);
this.scheduleReconnect();
};
/**
* 安排下一次重连
* @description
* 使用指数退避Exponential Backoff策略计算重连延迟
* delay = min(reconnectInterval × 1.5^(attempts-1), 30000)
*
* 例如 reconnectInterval=3000 时:
* 第 1 次3000ms
* 第 2 次4500ms
* 第 3 次6750ms
* 第 4 次10125ms
* 第 5 次15187ms之后趋近 30000ms 上限)
*
* 指数退避的目的:避免服务端故障时大量客户端同时重连造成雪崩效应。
*
* `setTimeout` 是 Node.js 全局函数,在指定延迟后执行一次回调。
* 返回值是 Timeout 对象Node.js或 number浏览器
* 需要保存以便后续调用 clearTimeout 取消。
*/
private scheduleReconnect = (): void => {
// 检查是否超过最大重连次数0 表示无限重连)
if (
this.config.maxReconnectAttempts > 0 &&
this.reconnectAttempts >= this.config.maxReconnectAttempts
) {
console.error(`${this.logPrefix} 已达最大重连次数 (${this.config.maxReconnectAttempts}),停止重连`);
this.state = "disconnected";
return;
}
this.state = "reconnecting";
this.reconnectAttempts++;
// 指数退避:每次重连等待时间递增,最大 25 秒
const delay = Math.min(
this.config.reconnectInterval * Math.pow(1.5, this.reconnectAttempts - 1),
25000
);
console.log(`${this.logPrefix} ${delay}ms 后尝试第 ${this.reconnectAttempts} 次重连...`);
// setTimeout 返回的句柄保存到 reconnectTimer
// 以便在 stop() 或成功连接时通过 clearTimeout 取消待执行的重连
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
this.connect();
}, delay);
};
/**
* 清除重连定时器
* @description
* `clearTimeout` 是 Node.js 全局函数,取消由 setTimeout 创建的定时器。
* 如果定时器已执行或已被取消,调用 clearTimeout 不会报错(安全操作)。
*/
private clearReconnectTimer = (): void => {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
};
// ============================================
// 心跳保活
// ============================================
/**
* 启动心跳定时器
* @description
* 使用 `setInterval` 定期发送 WebSocket ping 控制帧,并检测 pong 超时。
*
* `ws.ping()` 发送 WebSocket 协议层的 ping 控制帧opcode=0x9
* 服务端必须自动回复 pong 帧。
*
* Pong 超时检测:
* 如果超过 2 倍心跳间隔仍未收到 pong判定连接已死如休眠后 TCP 已断),
* 主动 terminate 触发 close 事件 → 自动重连。
*
* Ping 失败处理:
* 如果 ping 发送抛异常(底层 socket 已关闭),也主动 terminate 触发重连。
*/
private startHeartbeat = (): void => {
this.clearHeartbeat();
this.heartbeatTimer = setInterval(() => {
if (this.ws && this.state === "connected") {
// 检测 pong 超时:超过 2 倍心跳间隔未收到 pong判定连接已死
const pongTimeout = this.config.heartbeatInterval * 2;
if (Date.now() - this.lastPongTime > pongTimeout) {
console.warn(`${this.logPrefix} pong 超时 (${pongTimeout}ms 未收到),判定连接已死,主动断开`);
this.ws.terminate();
return;
}
try {
// ws.ping() 发送 WebSocket 原生 ping 控制帧
this.ws.ping();
} catch {
console.warn(`${this.logPrefix} 心跳发送失败,主动断开触发重连`);
this.ws?.terminate();
}
}
}, this.config.heartbeatInterval);
};
/**
* 清除心跳定时器
* @description
* `clearInterval` 是 Node.js 全局函数,停止由 setInterval 创建的定时器。
* 在连接关闭或主动停止时调用,避免向已断开的连接发送 ping。
*/
private clearHeartbeat = (): void => {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
};
// ============================================
// 系统唤醒检测
// ============================================
/**
* 启动系统唤醒检测
* @description
* 电脑休眠时 setInterval 会被冻结,唤醒后恢复。
* 利用「两次 tick 之间实际经过的时间」远大于「setInterval 设定的间隔」来检测唤醒事件。
*
* 例如CHECK_INTERVAL = 5s但实际两次 tick 间隔了 60s → 说明系统休眠了约 55s。
* 此时 TCP 连接大概率已被服务端超时关闭,需要主动 terminate 触发重连。
*
* 同时重置重连计数器,确保唤醒后有足够的重连机会。
*/
private startWakeupDetection = (): void => {
this.clearWakeupDetection();
this.lastTickTime = Date.now();
const CHECK_INTERVAL = 5000; // 每 5 秒检查一次
const WAKEUP_THRESHOLD = 15000; // 实际间隔超过 15 秒视为休眠唤醒
this.wakeupCheckTimer = setInterval(() => {
const now = Date.now();
const elapsed = now - this.lastTickTime;
this.lastTickTime = now;
if (elapsed > WAKEUP_THRESHOLD) {
console.warn(`${this.logPrefix} 检测到系统唤醒 (tick 间隔 ${elapsed}ms阈值 ${WAKEUP_THRESHOLD}ms)`);
// 重置重连计数器,给予唤醒后充足的重连机会
this.reconnectAttempts = 0;
// 如果当前连接还标记为已连接,主动断开触发重连
if (this.ws && this.state === "connected") {
console.warn(`${this.logPrefix} 唤醒后主动断开连接,触发重连`);
this.ws.terminate();
}
}
}, CHECK_INTERVAL);
};
/**
* 清除系统唤醒检测定时器
*/
private clearWakeupDetection = (): void => {
if (this.wakeupCheckTimer) {
clearInterval(this.wakeupCheckTimer);
this.wakeupCheckTimer = null;
}
};
// ============================================
// 消息发送
// ============================================
/**
* 发送 AGP 信封消息(内部通用方法)
* @param method - AGP 消息类型(如 "session.update"、"session.promptResponse"
* @param payload - 消息载荷,泛型 T 由调用方决定具体类型
* @description
* 所有上行消息都通过此方法发送,统一处理:
* 1. 检查连接状态
* 2. 构建 AGP 信封(添加 msg_id等公共字段
* 3. JSON 序列化
* 4. 调用 ws.send() 发送文本帧
*
* `ws.send(data)` 是 ws 库的发送方法:
* - 传入 string发送文本帧opcode=0x1
* - 传入 Buffer/ArrayBuffer发送二进制帧opcode=0x2
* - 这里传入 JSON 字符串,发送文本帧
*
* `randomUUID()` 为每条消息生成唯一 ID服务端可用于去重和追踪。
*/
private sendEnvelope = <T>(method: AGPMethod, payload: T, guid?: string, userId?: string): void => {
if (!this.ws || this.state !== "connected") {
console.warn(`${this.logPrefix} 无法发送消息,当前状态: ${this.state}`);
return;
}
const envelope: AGPEnvelope<T> = {
msg_id: randomUUID(),
guid: guid ?? this.config.guid,
user_id: userId ?? this.config.userId,
method,
payload,
};
try {
const data = JSON.stringify(envelope);
// ws.send() 将字符串作为 WebSocket 文本帧发送
this.ws.send(data);
// 截断过长的 JSON 日志,避免日志文件膨胀
const jsonPreview = data.length > 500 ? data.substring(0, 500) + `...(truncated, total ${data.length} chars)` : data;
console.log(`${this.logPrefix} 发送消息: method=${method}, msg_id=${envelope.msg_id}, json=${jsonPreview}`);
} catch (error) {
console.error(`${this.logPrefix} 消息发送失败:`, error);
this.callbacks.onError?.(
error instanceof Error ? error : new Error(`消息发送失败: ${String(error)}`)
);
}
};
// ============================================
// 消息 ID 缓存清理
// ============================================
/**
* 启动消息 ID 缓存定期清理任务
* @description
* `processedMsgIds` 是一个 Set会随着消息的接收不断增长。
* 如果不清理,长时间运行后会占用大量内存(内存泄漏)。
*
* 清理策略:
* - 每 5 分钟检查一次
* - 当 Set 大小超过 MAX_MSG_ID_CACHE1000时触发清理
* - 清理时保留最新的一半500 条),丢弃最旧的一半
*
* 为什么保留最新的一半而不是全部清空?
* 因为刚处理过的消息 ID 最有可能被重发,保留它们可以继续防重。
*
* `[...this.processedMsgIds]` 将 Set 转为数组,
* Set 的迭代顺序是插入顺序,所以 slice(-500) 取的是最后插入的 500 条(最新的)。
*/
private startMsgIdCleanup = (): void => {
this.clearMsgIdCleanup();
this.msgIdCleanupTimer = setInterval(() => {
if (this.processedMsgIds.size > WechatAccessWebSocketClient.MAX_MSG_ID_CACHE) {
console.log(`${this.logPrefix} 清理消息 ID 缓存: ${this.processedMsgIds.size}${WechatAccessWebSocketClient.MAX_MSG_ID_CACHE / 2}`);
// 将 Set 转为数组(保持插入顺序),取后半部分(最新的),重建 Set
const entries = [...this.processedMsgIds];
this.processedMsgIds.clear();
entries.slice(-WechatAccessWebSocketClient.MAX_MSG_ID_CACHE / 2).forEach((id) => {
this.processedMsgIds.add(id);
});
}
}, 5 * 60 * 1000); // 每 5 分钟执行一次
};
/**
* 清除消息 ID 缓存清理定时器
*/
private clearMsgIdCleanup = (): void => {
if (this.msgIdCleanupTimer) {
clearInterval(this.msgIdCleanupTimer);
this.msgIdCleanupTimer = null;
}
};
}