This commit is contained in:
sliverp
2026-01-29 10:11:31 +08:00
parent c0688b1a24
commit 7098f9f007

View File

@@ -1,6 +1,6 @@
import WebSocket from "ws";
import type { ResolvedQQBotAccount, WSPayload, C2CMessageEvent, GuildMessageEvent, GroupMessageEvent } from "./types.js";
import { getAccessToken, getGatewayUrl, sendC2CMessage, sendChannelMessage, sendGroupMessage } from "./api.js";
import { getAccessToken, getGatewayUrl, sendC2CMessage, sendChannelMessage, sendGroupMessage, clearTokenCache } from "./api.js";
import { getQQBotRuntime } from "./runtime.js";
// QQ Bot intents
@@ -10,6 +10,10 @@ const INTENTS = {
GROUP_AND_C2C: 1 << 25, // 群聊和 C2C 私聊
};
// 重连配置
const RECONNECT_DELAYS = [1000, 2000, 5000, 10000, 30000, 60000]; // 递增延迟
const MAX_RECONNECT_ATTEMPTS = 100;
export interface GatewayContext {
account: ResolvedQQBotAccount;
abortSignal: AbortSignal;
@@ -24,7 +28,7 @@ export interface GatewayContext {
}
/**
* 启动 Gateway WebSocket 连接
* 启动 Gateway WebSocket 连接(带自动重连)
*/
export async function startGateway(ctx: GatewayContext): Promise<void> {
const { account, abortSignal, cfg, onReady, onError, log } = ctx;
@@ -33,256 +37,404 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
throw new Error("QQBot not configured (missing appId or clientSecret)");
}
const pluginRuntime = getQQBotRuntime();
const accessToken = await getAccessToken(account.appId, account.clientSecret);
const gatewayUrl = await getGatewayUrl(accessToken);
log?.info(`[qqbot:${account.accountId}] Connecting to ${gatewayUrl}`);
const ws = new WebSocket(gatewayUrl);
let reconnectAttempts = 0;
let isAborted = false;
let currentWs: WebSocket | null = null;
let heartbeatInterval: ReturnType<typeof setInterval> | null = null;
let sessionId: string | null = null;
let lastSeq: number | null = null;
abortSignal.addEventListener("abort", () => {
isAborted = true;
cleanup();
});
const cleanup = () => {
if (heartbeatInterval) {
clearInterval(heartbeatInterval);
heartbeatInterval = null;
}
if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) {
ws.close();
if (currentWs && (currentWs.readyState === WebSocket.OPEN || currentWs.readyState === WebSocket.CONNECTING)) {
currentWs.close();
}
currentWs = null;
};
abortSignal.addEventListener("abort", cleanup);
// 处理收到的消息
const handleMessage = async (event: {
type: "c2c" | "guild" | "dm" | "group";
senderId: string;
senderName?: string;
content: string;
messageId: string;
timestamp: string;
channelId?: string;
guildId?: string;
groupOpenid?: string;
}) => {
log?.info(`[qqbot:${account.accountId}] Processing message from ${event.senderId}: ${event.content}`);
pluginRuntime.channel.activity.record({
channel: "qqbot",
accountId: account.accountId,
direction: "inbound",
});
const isGroup = event.type === "guild" || event.type === "group";
const peerId = event.type === "guild" ? `channel:${event.channelId}`
: event.type === "group" ? `group:${event.groupOpenid}`
: event.senderId;
const route = pluginRuntime.channel.routing.resolveAgentRoute({
cfg,
channel: "qqbot",
accountId: account.accountId,
peer: {
kind: isGroup ? "group" : "dm",
id: peerId,
},
});
const envelopeOptions = pluginRuntime.channel.reply.resolveEnvelopeFormatOptions(cfg);
const body = pluginRuntime.channel.reply.formatInboundEnvelope({
channel: "QQBot",
from: event.senderName ?? event.senderId,
timestamp: new Date(event.timestamp).getTime(),
body: event.content,
chatType: isGroup ? "group" : "direct",
sender: {
id: event.senderId,
name: event.senderName,
},
envelope: envelopeOptions,
});
const fromAddress = event.type === "guild" ? `qqbot:channel:${event.channelId}`
: event.type === "group" ? `qqbot:group:${event.groupOpenid}`
: `qqbot:${event.senderId}`;
const toAddress = fromAddress;
const ctxPayload = pluginRuntime.channel.reply.finalizeInboundContext({
Body: body,
RawBody: event.content,
CommandBody: event.content,
From: fromAddress,
To: toAddress,
SessionKey: route.sessionKey,
AccountId: route.accountId,
ChatType: isGroup ? "group" : "direct",
SenderId: event.senderId,
SenderName: event.senderName,
Provider: "qqbot",
Surface: "qqbot",
MessageSid: event.messageId,
Timestamp: new Date(event.timestamp).getTime(),
OriginatingChannel: "qqbot",
OriginatingTo: toAddress,
// QQBot 特有字段
QQChannelId: event.channelId,
QQGuildId: event.guildId,
QQGroupOpenid: event.groupOpenid,
});
// 分发到 AI 系统
try {
const messagesConfig = pluginRuntime.channel.reply.resolveEffectiveMessagesConfig(cfg, route.agentId);
await pluginRuntime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg,
dispatcherOptions: {
responsePrefix: messagesConfig.responsePrefix,
deliver: async (payload: { text?: string }) => {
const replyText = payload.text ?? "";
if (!replyText.trim()) return;
try {
if (event.type === "c2c") {
await sendC2CMessage(accessToken, event.senderId, replyText, event.messageId);
} else if (event.type === "group" && event.groupOpenid) {
await sendGroupMessage(accessToken, event.groupOpenid, replyText, event.messageId);
} else if (event.channelId) {
await sendChannelMessage(accessToken, event.channelId, replyText, event.messageId);
}
log?.info(`[qqbot:${account.accountId}] Sent reply`);
pluginRuntime.channel.activity.record({
channel: "qqbot",
accountId: account.accountId,
direction: "outbound",
});
} catch (err) {
log?.error(`[qqbot:${account.accountId}] Send failed: ${err}`);
}
},
onError: (err: unknown) => {
log?.error(`[qqbot:${account.accountId}] Dispatch error: ${err}`);
},
},
replyOptions: {},
});
} catch (err) {
log?.error(`[qqbot:${account.accountId}] Message processing failed: ${err}`);
}
const getReconnectDelay = () => {
const idx = Math.min(reconnectAttempts, RECONNECT_DELAYS.length - 1);
return RECONNECT_DELAYS[idx];
};
ws.on("open", () => {
log?.info(`[qqbot:${account.accountId}] WebSocket connected`);
});
const scheduleReconnect = () => {
if (isAborted || reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {
log?.error(`[qqbot:${account.accountId}] Max reconnect attempts reached or aborted`);
return;
}
ws.on("message", async (data) => {
try {
const payload = JSON.parse(data.toString()) as WSPayload;
const { op, d, s, t } = payload;
const delay = getReconnectDelay();
reconnectAttempts++;
log?.info(`[qqbot:${account.accountId}] Reconnecting in ${delay}ms (attempt ${reconnectAttempts})`);
if (s) lastSeq = s;
log?.debug?.(`[qqbot:${account.accountId}] Received op=${op} t=${t}`);
switch (op) {
case 10: // Hello
log?.info(`[qqbot:${account.accountId}] Hello received, starting heartbeat`);
// Identify
ws.send(
JSON.stringify({
op: 2,
d: {
token: `QQBot ${accessToken}`,
intents: INTENTS.PUBLIC_GUILD_MESSAGES | INTENTS.DIRECT_MESSAGE | INTENTS.GROUP_AND_C2C,
shard: [0, 1],
},
})
);
// Heartbeat
const interval = (d as { heartbeat_interval: number }).heartbeat_interval;
heartbeatInterval = setInterval(() => {
ws.send(JSON.stringify({ op: 1, d: lastSeq }));
}, interval);
break;
case 0: // Dispatch
if (t === "READY") {
log?.info(`[qqbot:${account.accountId}] Ready`);
onReady?.(d);
} else if (t === "C2C_MESSAGE_CREATE") {
const event = d as C2CMessageEvent;
await handleMessage({
type: "c2c",
senderId: event.author.user_openid,
content: event.content,
messageId: event.id,
timestamp: event.timestamp,
});
} else if (t === "AT_MESSAGE_CREATE") {
const event = d as GuildMessageEvent;
await handleMessage({
type: "guild",
senderId: event.author.id,
senderName: event.author.username,
content: event.content,
messageId: event.id,
timestamp: event.timestamp,
channelId: event.channel_id,
guildId: event.guild_id,
});
} else if (t === "DIRECT_MESSAGE_CREATE") {
const event = d as GuildMessageEvent;
await handleMessage({
type: "dm",
senderId: event.author.id,
senderName: event.author.username,
content: event.content,
messageId: event.id,
timestamp: event.timestamp,
guildId: event.guild_id,
});
} else if (t === "GROUP_AT_MESSAGE_CREATE") {
const event = d as GroupMessageEvent;
await handleMessage({
type: "group",
senderId: event.author.member_openid,
content: event.content,
messageId: event.id,
timestamp: event.timestamp,
groupOpenid: event.group_openid,
});
}
break;
case 11: // Heartbeat ACK
log?.debug?.(`[qqbot:${account.accountId}] Heartbeat ACK`);
break;
case 9: // Invalid Session
log?.error(`[qqbot:${account.accountId}] Invalid session`);
onError?.(new Error("Invalid session"));
cleanup();
break;
setTimeout(() => {
if (!isAborted) {
connect();
}
}, delay);
};
const connect = async () => {
try {
cleanup();
// 刷新 token可能过期了
clearTokenCache();
const accessToken = await getAccessToken(account.appId, account.clientSecret);
const gatewayUrl = await getGatewayUrl(accessToken);
log?.info(`[qqbot:${account.accountId}] Connecting to ${gatewayUrl}`);
const ws = new WebSocket(gatewayUrl);
currentWs = ws;
const pluginRuntime = getQQBotRuntime();
// 处理收到的消息
const handleMessage = async (event: {
type: "c2c" | "guild" | "dm" | "group";
senderId: string;
senderName?: string;
content: string;
messageId: string;
timestamp: string;
channelId?: string;
guildId?: string;
groupOpenid?: string;
}) => {
log?.info(`[qqbot:${account.accountId}] Processing message from ${event.senderId}: ${event.content}`);
pluginRuntime.channel.activity.record({
channel: "qqbot",
accountId: account.accountId,
direction: "inbound",
});
const isGroup = event.type === "guild" || event.type === "group";
const peerId = event.type === "guild" ? `channel:${event.channelId}`
: event.type === "group" ? `group:${event.groupOpenid}`
: event.senderId;
const route = pluginRuntime.channel.routing.resolveAgentRoute({
cfg,
channel: "qqbot",
accountId: account.accountId,
peer: {
kind: isGroup ? "group" : "dm",
id: peerId,
},
});
const envelopeOptions = pluginRuntime.channel.reply.resolveEnvelopeFormatOptions(cfg);
const body = pluginRuntime.channel.reply.formatInboundEnvelope({
channel: "QQBot",
from: event.senderName ?? event.senderId,
timestamp: new Date(event.timestamp).getTime(),
body: event.content,
chatType: isGroup ? "group" : "direct",
sender: {
id: event.senderId,
name: event.senderName,
},
envelope: envelopeOptions,
});
const fromAddress = event.type === "guild" ? `qqbot:channel:${event.channelId}`
: event.type === "group" ? `qqbot:group:${event.groupOpenid}`
: `qqbot:${event.senderId}`;
const toAddress = fromAddress;
const ctxPayload = pluginRuntime.channel.reply.finalizeInboundContext({
Body: body,
RawBody: event.content,
CommandBody: event.content,
From: fromAddress,
To: toAddress,
SessionKey: route.sessionKey,
AccountId: route.accountId,
ChatType: isGroup ? "group" : "direct",
SenderId: event.senderId,
SenderName: event.senderName,
Provider: "qqbot",
Surface: "qqbot",
MessageSid: event.messageId,
Timestamp: new Date(event.timestamp).getTime(),
OriginatingChannel: "qqbot",
OriginatingTo: toAddress,
QQChannelId: event.channelId,
QQGuildId: event.guildId,
QQGroupOpenid: event.groupOpenid,
});
// 发送错误提示的辅助函数
const sendErrorMessage = async (errorText: string) => {
try {
const token = await getAccessToken(account.appId, account.clientSecret);
if (event.type === "c2c") {
await sendC2CMessage(token, event.senderId, errorText, event.messageId);
} else if (event.type === "group" && event.groupOpenid) {
await sendGroupMessage(token, event.groupOpenid, errorText, event.messageId);
} else if (event.channelId) {
await sendChannelMessage(token, event.channelId, errorText, event.messageId);
}
} catch (sendErr) {
log?.error(`[qqbot:${account.accountId}] Failed to send error message: ${sendErr}`);
}
};
try {
const messagesConfig = pluginRuntime.channel.reply.resolveEffectiveMessagesConfig(cfg, route.agentId);
// 每次发消息前刷新 token
const freshToken = await getAccessToken(account.appId, account.clientSecret);
// 追踪是否有响应
let hasResponse = false;
const responseTimeout = 30000; // 30秒超时
let timeoutId: ReturnType<typeof setTimeout> | null = null;
const timeoutPromise = new Promise<void>((_, reject) => {
timeoutId = setTimeout(() => {
if (!hasResponse) {
reject(new Error("Response timeout"));
}
}, responseTimeout);
});
const dispatchPromise = pluginRuntime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg,
dispatcherOptions: {
responsePrefix: messagesConfig.responsePrefix,
deliver: async (payload: { text?: string }) => {
hasResponse = true;
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
const replyText = payload.text ?? "";
if (!replyText.trim()) return;
try {
if (event.type === "c2c") {
await sendC2CMessage(freshToken, event.senderId, replyText, event.messageId);
} else if (event.type === "group" && event.groupOpenid) {
await sendGroupMessage(freshToken, event.groupOpenid, replyText, event.messageId);
} else if (event.channelId) {
await sendChannelMessage(freshToken, event.channelId, replyText, event.messageId);
}
log?.info(`[qqbot:${account.accountId}] Sent reply`);
pluginRuntime.channel.activity.record({
channel: "qqbot",
accountId: account.accountId,
direction: "outbound",
});
} catch (err) {
log?.error(`[qqbot:${account.accountId}] Send failed: ${err}`);
}
},
onError: async (err: unknown) => {
log?.error(`[qqbot:${account.accountId}] Dispatch error: ${err}`);
hasResponse = true;
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
// 发送错误提示给用户
const errMsg = String(err);
if (errMsg.includes("401") || errMsg.includes("key") || errMsg.includes("auth")) {
await sendErrorMessage("[ClawdBot] 大模型 API Key 可能无效,请检查配置");
} else {
await sendErrorMessage(`[ClawdBot] 处理消息时出错: ${errMsg.slice(0, 100)}`);
}
},
},
replyOptions: {},
});
// 等待分发完成或超时
try {
await Promise.race([dispatchPromise, timeoutPromise]);
} catch (err) {
if (timeoutId) {
clearTimeout(timeoutId);
}
if (!hasResponse) {
log?.error(`[qqbot:${account.accountId}] No response within timeout`);
await sendErrorMessage("[ClawdBot] 未收到响应,请检查大模型 API Key 是否正确配置");
}
}
} catch (err) {
log?.error(`[qqbot:${account.accountId}] Message processing failed: ${err}`);
await sendErrorMessage(`[ClawdBot] 处理消息失败: ${String(err).slice(0, 100)}`);
}
};
ws.on("open", () => {
log?.info(`[qqbot:${account.accountId}] WebSocket connected`);
reconnectAttempts = 0; // 连接成功,重置重试计数
});
ws.on("message", async (data) => {
try {
const payload = JSON.parse(data.toString()) as WSPayload;
const { op, d, s, t } = payload;
if (s) lastSeq = s;
log?.debug?.(`[qqbot:${account.accountId}] Received op=${op} t=${t}`);
switch (op) {
case 10: // Hello
log?.info(`[qqbot:${account.accountId}] Hello received`);
// 如果有 session_id尝试 Resume
if (sessionId && lastSeq !== null) {
log?.info(`[qqbot:${account.accountId}] Attempting to resume session ${sessionId}`);
ws.send(JSON.stringify({
op: 6, // Resume
d: {
token: `QQBot ${accessToken}`,
session_id: sessionId,
seq: lastSeq,
},
}));
} else {
// 新连接,发送 Identify
ws.send(JSON.stringify({
op: 2,
d: {
token: `QQBot ${accessToken}`,
intents: INTENTS.PUBLIC_GUILD_MESSAGES | INTENTS.DIRECT_MESSAGE | INTENTS.GROUP_AND_C2C,
shard: [0, 1],
},
}));
}
// 启动心跳
const interval = (d as { heartbeat_interval: number }).heartbeat_interval;
if (heartbeatInterval) clearInterval(heartbeatInterval);
heartbeatInterval = setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ op: 1, d: lastSeq }));
log?.debug?.(`[qqbot:${account.accountId}] Heartbeat sent`);
}
}, interval);
break;
case 0: // Dispatch
if (t === "READY") {
const readyData = d as { session_id: string };
sessionId = readyData.session_id;
log?.info(`[qqbot:${account.accountId}] Ready, session: ${sessionId}`);
onReady?.(d);
} else if (t === "RESUMED") {
log?.info(`[qqbot:${account.accountId}] Session resumed`);
} else if (t === "C2C_MESSAGE_CREATE") {
const event = d as C2CMessageEvent;
await handleMessage({
type: "c2c",
senderId: event.author.user_openid,
content: event.content,
messageId: event.id,
timestamp: event.timestamp,
});
} else if (t === "AT_MESSAGE_CREATE") {
const event = d as GuildMessageEvent;
await handleMessage({
type: "guild",
senderId: event.author.id,
senderName: event.author.username,
content: event.content,
messageId: event.id,
timestamp: event.timestamp,
channelId: event.channel_id,
guildId: event.guild_id,
});
} else if (t === "DIRECT_MESSAGE_CREATE") {
const event = d as GuildMessageEvent;
await handleMessage({
type: "dm",
senderId: event.author.id,
senderName: event.author.username,
content: event.content,
messageId: event.id,
timestamp: event.timestamp,
guildId: event.guild_id,
});
} else if (t === "GROUP_AT_MESSAGE_CREATE") {
const event = d as GroupMessageEvent;
await handleMessage({
type: "group",
senderId: event.author.member_openid,
content: event.content,
messageId: event.id,
timestamp: event.timestamp,
groupOpenid: event.group_openid,
});
}
break;
case 11: // Heartbeat ACK
log?.debug?.(`[qqbot:${account.accountId}] Heartbeat ACK`);
break;
case 7: // Reconnect
log?.info(`[qqbot:${account.accountId}] Server requested reconnect`);
cleanup();
scheduleReconnect();
break;
case 9: // Invalid Session
const canResume = d as boolean;
log?.error(`[qqbot:${account.accountId}] Invalid session, can resume: ${canResume}`);
if (!canResume) {
sessionId = null;
lastSeq = null;
}
cleanup();
scheduleReconnect();
break;
}
} catch (err) {
log?.error(`[qqbot:${account.accountId}] Message parse error: ${err}`);
}
});
ws.on("close", (code, reason) => {
log?.info(`[qqbot:${account.accountId}] WebSocket closed: ${code} ${reason.toString()}`);
cleanup();
// 非正常关闭则重连
if (!isAborted && code !== 1000) {
scheduleReconnect();
}
});
ws.on("error", (err) => {
log?.error(`[qqbot:${account.accountId}] WebSocket error: ${err.message}`);
onError?.(err);
});
} catch (err) {
log?.error(`[qqbot:${account.accountId}] Message parse error: ${err}`);
log?.error(`[qqbot:${account.accountId}] Connection failed: ${err}`);
scheduleReconnect();
}
});
};
ws.on("close", (code, reason) => {
log?.info(`[qqbot:${account.accountId}] WebSocket closed: ${code} ${reason}`);
cleanup();
});
ws.on("error", (err) => {
log?.error(`[qqbot:${account.accountId}] WebSocket error: ${err.message}`);
onError?.(err);
});
// 开始连接
await connect();
// 等待 abort 信号
return new Promise((resolve) => {