diff --git a/src/gateway.ts b/src/gateway.ts index 7f332fd..db3db4b 100644 --- a/src/gateway.ts +++ b/src/gateway.ts @@ -1,6 +1,6 @@ import WebSocket from "ws"; import type { ResolvedQQBotAccount, WSPayload, C2CMessageEvent, GuildMessageEvent, GroupMessageEvent } from "./types.js"; -import { getAccessToken, getGatewayUrl, sendC2CMessage, sendChannelMessage, sendGroupMessage } from "./api.js"; +import { getAccessToken, getGatewayUrl, sendC2CMessage, sendChannelMessage, sendGroupMessage, clearTokenCache } from "./api.js"; import { getQQBotRuntime } from "./runtime.js"; // QQ Bot intents @@ -10,6 +10,10 @@ const INTENTS = { GROUP_AND_C2C: 1 << 25, // 群聊和 C2C 私聊 }; +// 重连配置 +const RECONNECT_DELAYS = [1000, 2000, 5000, 10000, 30000, 60000]; // 递增延迟 +const MAX_RECONNECT_ATTEMPTS = 100; + export interface GatewayContext { account: ResolvedQQBotAccount; abortSignal: AbortSignal; @@ -24,7 +28,7 @@ export interface GatewayContext { } /** - * 启动 Gateway WebSocket 连接 + * 启动 Gateway WebSocket 连接(带自动重连) */ export async function startGateway(ctx: GatewayContext): Promise { const { account, abortSignal, cfg, onReady, onError, log } = ctx; @@ -33,256 +37,404 @@ export async function startGateway(ctx: GatewayContext): Promise { throw new Error("QQBot not configured (missing appId or clientSecret)"); } - const pluginRuntime = getQQBotRuntime(); - const accessToken = await getAccessToken(account.appId, account.clientSecret); - const gatewayUrl = await getGatewayUrl(accessToken); - - log?.info(`[qqbot:${account.accountId}] Connecting to ${gatewayUrl}`); - - const ws = new WebSocket(gatewayUrl); + let reconnectAttempts = 0; + let isAborted = false; + let currentWs: WebSocket | null = null; let heartbeatInterval: ReturnType | null = null; + let sessionId: string | null = null; let lastSeq: number | null = null; + abortSignal.addEventListener("abort", () => { + isAborted = true; + cleanup(); + }); + const cleanup = () => { if (heartbeatInterval) { clearInterval(heartbeatInterval); heartbeatInterval = null; } - if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) { - ws.close(); + if (currentWs && (currentWs.readyState === WebSocket.OPEN || currentWs.readyState === WebSocket.CONNECTING)) { + currentWs.close(); } + currentWs = null; }; - abortSignal.addEventListener("abort", cleanup); - - // 处理收到的消息 - const handleMessage = async (event: { - type: "c2c" | "guild" | "dm" | "group"; - senderId: string; - senderName?: string; - content: string; - messageId: string; - timestamp: string; - channelId?: string; - guildId?: string; - groupOpenid?: string; - }) => { - log?.info(`[qqbot:${account.accountId}] Processing message from ${event.senderId}: ${event.content}`); - - pluginRuntime.channel.activity.record({ - channel: "qqbot", - accountId: account.accountId, - direction: "inbound", - }); - - const isGroup = event.type === "guild" || event.type === "group"; - const peerId = event.type === "guild" ? `channel:${event.channelId}` - : event.type === "group" ? `group:${event.groupOpenid}` - : event.senderId; - - const route = pluginRuntime.channel.routing.resolveAgentRoute({ - cfg, - channel: "qqbot", - accountId: account.accountId, - peer: { - kind: isGroup ? "group" : "dm", - id: peerId, - }, - }); - - const envelopeOptions = pluginRuntime.channel.reply.resolveEnvelopeFormatOptions(cfg); - - const body = pluginRuntime.channel.reply.formatInboundEnvelope({ - channel: "QQBot", - from: event.senderName ?? event.senderId, - timestamp: new Date(event.timestamp).getTime(), - body: event.content, - chatType: isGroup ? "group" : "direct", - sender: { - id: event.senderId, - name: event.senderName, - }, - envelope: envelopeOptions, - }); - - const fromAddress = event.type === "guild" ? `qqbot:channel:${event.channelId}` - : event.type === "group" ? `qqbot:group:${event.groupOpenid}` - : `qqbot:${event.senderId}`; - const toAddress = fromAddress; - - const ctxPayload = pluginRuntime.channel.reply.finalizeInboundContext({ - Body: body, - RawBody: event.content, - CommandBody: event.content, - From: fromAddress, - To: toAddress, - SessionKey: route.sessionKey, - AccountId: route.accountId, - ChatType: isGroup ? "group" : "direct", - SenderId: event.senderId, - SenderName: event.senderName, - Provider: "qqbot", - Surface: "qqbot", - MessageSid: event.messageId, - Timestamp: new Date(event.timestamp).getTime(), - OriginatingChannel: "qqbot", - OriginatingTo: toAddress, - // QQBot 特有字段 - QQChannelId: event.channelId, - QQGuildId: event.guildId, - QQGroupOpenid: event.groupOpenid, - }); - - // 分发到 AI 系统 - try { - const messagesConfig = pluginRuntime.channel.reply.resolveEffectiveMessagesConfig(cfg, route.agentId); - - await pluginRuntime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ - ctx: ctxPayload, - cfg, - dispatcherOptions: { - responsePrefix: messagesConfig.responsePrefix, - deliver: async (payload: { text?: string }) => { - const replyText = payload.text ?? ""; - if (!replyText.trim()) return; - - try { - if (event.type === "c2c") { - await sendC2CMessage(accessToken, event.senderId, replyText, event.messageId); - } else if (event.type === "group" && event.groupOpenid) { - await sendGroupMessage(accessToken, event.groupOpenid, replyText, event.messageId); - } else if (event.channelId) { - await sendChannelMessage(accessToken, event.channelId, replyText, event.messageId); - } - log?.info(`[qqbot:${account.accountId}] Sent reply`); - - pluginRuntime.channel.activity.record({ - channel: "qqbot", - accountId: account.accountId, - direction: "outbound", - }); - } catch (err) { - log?.error(`[qqbot:${account.accountId}] Send failed: ${err}`); - } - }, - onError: (err: unknown) => { - log?.error(`[qqbot:${account.accountId}] Dispatch error: ${err}`); - }, - }, - replyOptions: {}, - }); - } catch (err) { - log?.error(`[qqbot:${account.accountId}] Message processing failed: ${err}`); - } + const getReconnectDelay = () => { + const idx = Math.min(reconnectAttempts, RECONNECT_DELAYS.length - 1); + return RECONNECT_DELAYS[idx]; }; - ws.on("open", () => { - log?.info(`[qqbot:${account.accountId}] WebSocket connected`); - }); + const scheduleReconnect = () => { + if (isAborted || reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) { + log?.error(`[qqbot:${account.accountId}] Max reconnect attempts reached or aborted`); + return; + } - ws.on("message", async (data) => { - try { - const payload = JSON.parse(data.toString()) as WSPayload; - const { op, d, s, t } = payload; + const delay = getReconnectDelay(); + reconnectAttempts++; + log?.info(`[qqbot:${account.accountId}] Reconnecting in ${delay}ms (attempt ${reconnectAttempts})`); - if (s) lastSeq = s; - - log?.debug?.(`[qqbot:${account.accountId}] Received op=${op} t=${t}`); - - switch (op) { - case 10: // Hello - log?.info(`[qqbot:${account.accountId}] Hello received, starting heartbeat`); - // Identify - ws.send( - JSON.stringify({ - op: 2, - d: { - token: `QQBot ${accessToken}`, - intents: INTENTS.PUBLIC_GUILD_MESSAGES | INTENTS.DIRECT_MESSAGE | INTENTS.GROUP_AND_C2C, - shard: [0, 1], - }, - }) - ); - // Heartbeat - const interval = (d as { heartbeat_interval: number }).heartbeat_interval; - heartbeatInterval = setInterval(() => { - ws.send(JSON.stringify({ op: 1, d: lastSeq })); - }, interval); - break; - - case 0: // Dispatch - if (t === "READY") { - log?.info(`[qqbot:${account.accountId}] Ready`); - onReady?.(d); - } else if (t === "C2C_MESSAGE_CREATE") { - const event = d as C2CMessageEvent; - await handleMessage({ - type: "c2c", - senderId: event.author.user_openid, - content: event.content, - messageId: event.id, - timestamp: event.timestamp, - }); - } else if (t === "AT_MESSAGE_CREATE") { - const event = d as GuildMessageEvent; - await handleMessage({ - type: "guild", - senderId: event.author.id, - senderName: event.author.username, - content: event.content, - messageId: event.id, - timestamp: event.timestamp, - channelId: event.channel_id, - guildId: event.guild_id, - }); - } else if (t === "DIRECT_MESSAGE_CREATE") { - const event = d as GuildMessageEvent; - await handleMessage({ - type: "dm", - senderId: event.author.id, - senderName: event.author.username, - content: event.content, - messageId: event.id, - timestamp: event.timestamp, - guildId: event.guild_id, - }); - } else if (t === "GROUP_AT_MESSAGE_CREATE") { - const event = d as GroupMessageEvent; - await handleMessage({ - type: "group", - senderId: event.author.member_openid, - content: event.content, - messageId: event.id, - timestamp: event.timestamp, - groupOpenid: event.group_openid, - }); - } - break; - - case 11: // Heartbeat ACK - log?.debug?.(`[qqbot:${account.accountId}] Heartbeat ACK`); - break; - - case 9: // Invalid Session - log?.error(`[qqbot:${account.accountId}] Invalid session`); - onError?.(new Error("Invalid session")); - cleanup(); - break; + setTimeout(() => { + if (!isAborted) { + connect(); } + }, delay); + }; + + const connect = async () => { + try { + cleanup(); + + // 刷新 token(可能过期了) + clearTokenCache(); + const accessToken = await getAccessToken(account.appId, account.clientSecret); + const gatewayUrl = await getGatewayUrl(accessToken); + + log?.info(`[qqbot:${account.accountId}] Connecting to ${gatewayUrl}`); + + const ws = new WebSocket(gatewayUrl); + currentWs = ws; + + const pluginRuntime = getQQBotRuntime(); + + // 处理收到的消息 + const handleMessage = async (event: { + type: "c2c" | "guild" | "dm" | "group"; + senderId: string; + senderName?: string; + content: string; + messageId: string; + timestamp: string; + channelId?: string; + guildId?: string; + groupOpenid?: string; + }) => { + log?.info(`[qqbot:${account.accountId}] Processing message from ${event.senderId}: ${event.content}`); + + pluginRuntime.channel.activity.record({ + channel: "qqbot", + accountId: account.accountId, + direction: "inbound", + }); + + const isGroup = event.type === "guild" || event.type === "group"; + const peerId = event.type === "guild" ? `channel:${event.channelId}` + : event.type === "group" ? `group:${event.groupOpenid}` + : event.senderId; + + const route = pluginRuntime.channel.routing.resolveAgentRoute({ + cfg, + channel: "qqbot", + accountId: account.accountId, + peer: { + kind: isGroup ? "group" : "dm", + id: peerId, + }, + }); + + const envelopeOptions = pluginRuntime.channel.reply.resolveEnvelopeFormatOptions(cfg); + + const body = pluginRuntime.channel.reply.formatInboundEnvelope({ + channel: "QQBot", + from: event.senderName ?? event.senderId, + timestamp: new Date(event.timestamp).getTime(), + body: event.content, + chatType: isGroup ? "group" : "direct", + sender: { + id: event.senderId, + name: event.senderName, + }, + envelope: envelopeOptions, + }); + + const fromAddress = event.type === "guild" ? `qqbot:channel:${event.channelId}` + : event.type === "group" ? `qqbot:group:${event.groupOpenid}` + : `qqbot:${event.senderId}`; + const toAddress = fromAddress; + + const ctxPayload = pluginRuntime.channel.reply.finalizeInboundContext({ + Body: body, + RawBody: event.content, + CommandBody: event.content, + From: fromAddress, + To: toAddress, + SessionKey: route.sessionKey, + AccountId: route.accountId, + ChatType: isGroup ? "group" : "direct", + SenderId: event.senderId, + SenderName: event.senderName, + Provider: "qqbot", + Surface: "qqbot", + MessageSid: event.messageId, + Timestamp: new Date(event.timestamp).getTime(), + OriginatingChannel: "qqbot", + OriginatingTo: toAddress, + QQChannelId: event.channelId, + QQGuildId: event.guildId, + QQGroupOpenid: event.groupOpenid, + }); + + // 发送错误提示的辅助函数 + const sendErrorMessage = async (errorText: string) => { + try { + const token = await getAccessToken(account.appId, account.clientSecret); + if (event.type === "c2c") { + await sendC2CMessage(token, event.senderId, errorText, event.messageId); + } else if (event.type === "group" && event.groupOpenid) { + await sendGroupMessage(token, event.groupOpenid, errorText, event.messageId); + } else if (event.channelId) { + await sendChannelMessage(token, event.channelId, errorText, event.messageId); + } + } catch (sendErr) { + log?.error(`[qqbot:${account.accountId}] Failed to send error message: ${sendErr}`); + } + }; + + try { + const messagesConfig = pluginRuntime.channel.reply.resolveEffectiveMessagesConfig(cfg, route.agentId); + + // 每次发消息前刷新 token + const freshToken = await getAccessToken(account.appId, account.clientSecret); + + // 追踪是否有响应 + let hasResponse = false; + const responseTimeout = 30000; // 30秒超时 + let timeoutId: ReturnType | null = null; + + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => { + if (!hasResponse) { + reject(new Error("Response timeout")); + } + }, responseTimeout); + }); + + const dispatchPromise = pluginRuntime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg, + dispatcherOptions: { + responsePrefix: messagesConfig.responsePrefix, + deliver: async (payload: { text?: string }) => { + hasResponse = true; + if (timeoutId) { + clearTimeout(timeoutId); + timeoutId = null; + } + + const replyText = payload.text ?? ""; + if (!replyText.trim()) return; + + try { + if (event.type === "c2c") { + await sendC2CMessage(freshToken, event.senderId, replyText, event.messageId); + } else if (event.type === "group" && event.groupOpenid) { + await sendGroupMessage(freshToken, event.groupOpenid, replyText, event.messageId); + } else if (event.channelId) { + await sendChannelMessage(freshToken, event.channelId, replyText, event.messageId); + } + log?.info(`[qqbot:${account.accountId}] Sent reply`); + + pluginRuntime.channel.activity.record({ + channel: "qqbot", + accountId: account.accountId, + direction: "outbound", + }); + } catch (err) { + log?.error(`[qqbot:${account.accountId}] Send failed: ${err}`); + } + }, + onError: async (err: unknown) => { + log?.error(`[qqbot:${account.accountId}] Dispatch error: ${err}`); + hasResponse = true; + if (timeoutId) { + clearTimeout(timeoutId); + timeoutId = null; + } + // 发送错误提示给用户 + const errMsg = String(err); + if (errMsg.includes("401") || errMsg.includes("key") || errMsg.includes("auth")) { + await sendErrorMessage("[ClawdBot] 大模型 API Key 可能无效,请检查配置"); + } else { + await sendErrorMessage(`[ClawdBot] 处理消息时出错: ${errMsg.slice(0, 100)}`); + } + }, + }, + replyOptions: {}, + }); + + // 等待分发完成或超时 + try { + await Promise.race([dispatchPromise, timeoutPromise]); + } catch (err) { + if (timeoutId) { + clearTimeout(timeoutId); + } + if (!hasResponse) { + log?.error(`[qqbot:${account.accountId}] No response within timeout`); + await sendErrorMessage("[ClawdBot] 未收到响应,请检查大模型 API Key 是否正确配置"); + } + } + } catch (err) { + log?.error(`[qqbot:${account.accountId}] Message processing failed: ${err}`); + await sendErrorMessage(`[ClawdBot] 处理消息失败: ${String(err).slice(0, 100)}`); + } + }; + + ws.on("open", () => { + log?.info(`[qqbot:${account.accountId}] WebSocket connected`); + reconnectAttempts = 0; // 连接成功,重置重试计数 + }); + + ws.on("message", async (data) => { + try { + const payload = JSON.parse(data.toString()) as WSPayload; + const { op, d, s, t } = payload; + + if (s) lastSeq = s; + + log?.debug?.(`[qqbot:${account.accountId}] Received op=${op} t=${t}`); + + switch (op) { + case 10: // Hello + log?.info(`[qqbot:${account.accountId}] Hello received`); + + // 如果有 session_id,尝试 Resume + if (sessionId && lastSeq !== null) { + log?.info(`[qqbot:${account.accountId}] Attempting to resume session ${sessionId}`); + ws.send(JSON.stringify({ + op: 6, // Resume + d: { + token: `QQBot ${accessToken}`, + session_id: sessionId, + seq: lastSeq, + }, + })); + } else { + // 新连接,发送 Identify + ws.send(JSON.stringify({ + op: 2, + d: { + token: `QQBot ${accessToken}`, + intents: INTENTS.PUBLIC_GUILD_MESSAGES | INTENTS.DIRECT_MESSAGE | INTENTS.GROUP_AND_C2C, + shard: [0, 1], + }, + })); + } + + // 启动心跳 + const interval = (d as { heartbeat_interval: number }).heartbeat_interval; + if (heartbeatInterval) clearInterval(heartbeatInterval); + heartbeatInterval = setInterval(() => { + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ op: 1, d: lastSeq })); + log?.debug?.(`[qqbot:${account.accountId}] Heartbeat sent`); + } + }, interval); + break; + + case 0: // Dispatch + if (t === "READY") { + const readyData = d as { session_id: string }; + sessionId = readyData.session_id; + log?.info(`[qqbot:${account.accountId}] Ready, session: ${sessionId}`); + onReady?.(d); + } else if (t === "RESUMED") { + log?.info(`[qqbot:${account.accountId}] Session resumed`); + } else if (t === "C2C_MESSAGE_CREATE") { + const event = d as C2CMessageEvent; + await handleMessage({ + type: "c2c", + senderId: event.author.user_openid, + content: event.content, + messageId: event.id, + timestamp: event.timestamp, + }); + } else if (t === "AT_MESSAGE_CREATE") { + const event = d as GuildMessageEvent; + await handleMessage({ + type: "guild", + senderId: event.author.id, + senderName: event.author.username, + content: event.content, + messageId: event.id, + timestamp: event.timestamp, + channelId: event.channel_id, + guildId: event.guild_id, + }); + } else if (t === "DIRECT_MESSAGE_CREATE") { + const event = d as GuildMessageEvent; + await handleMessage({ + type: "dm", + senderId: event.author.id, + senderName: event.author.username, + content: event.content, + messageId: event.id, + timestamp: event.timestamp, + guildId: event.guild_id, + }); + } else if (t === "GROUP_AT_MESSAGE_CREATE") { + const event = d as GroupMessageEvent; + await handleMessage({ + type: "group", + senderId: event.author.member_openid, + content: event.content, + messageId: event.id, + timestamp: event.timestamp, + groupOpenid: event.group_openid, + }); + } + break; + + case 11: // Heartbeat ACK + log?.debug?.(`[qqbot:${account.accountId}] Heartbeat ACK`); + break; + + case 7: // Reconnect + log?.info(`[qqbot:${account.accountId}] Server requested reconnect`); + cleanup(); + scheduleReconnect(); + break; + + case 9: // Invalid Session + const canResume = d as boolean; + log?.error(`[qqbot:${account.accountId}] Invalid session, can resume: ${canResume}`); + if (!canResume) { + sessionId = null; + lastSeq = null; + } + cleanup(); + scheduleReconnect(); + break; + } + } catch (err) { + log?.error(`[qqbot:${account.accountId}] Message parse error: ${err}`); + } + }); + + ws.on("close", (code, reason) => { + log?.info(`[qqbot:${account.accountId}] WebSocket closed: ${code} ${reason.toString()}`); + cleanup(); + + // 非正常关闭则重连 + if (!isAborted && code !== 1000) { + scheduleReconnect(); + } + }); + + ws.on("error", (err) => { + log?.error(`[qqbot:${account.accountId}] WebSocket error: ${err.message}`); + onError?.(err); + }); + } catch (err) { - log?.error(`[qqbot:${account.accountId}] Message parse error: ${err}`); + log?.error(`[qqbot:${account.accountId}] Connection failed: ${err}`); + scheduleReconnect(); } - }); + }; - ws.on("close", (code, reason) => { - log?.info(`[qqbot:${account.accountId}] WebSocket closed: ${code} ${reason}`); - cleanup(); - }); - - ws.on("error", (err) => { - log?.error(`[qqbot:${account.accountId}] WebSocket error: ${err.message}`); - onError?.(err); - }); + // 开始连接 + await connect(); // 等待 abort 信号 return new Promise((resolve) => {