feat(qqbot): 定时提醒技能与主动消息系统

**新增提醒技能**
- 新增 skills/qqbot-cron/SKILL.md 定时提醒技能定义
- 支持一次性提醒(--at)和周期性提醒(--cron)
- 支持设置、查询、取消提醒操作

**主动消息系统**
- 新增 src/proactive.ts 主动消息发送模块
- 新增 src/known-users.ts 已知用户管理
- 新增 src/session-store.ts 会话存储
- 支持主动向用户/群组发送消息

**工具脚本**
- 新增 scripts/proactive-api-server.ts 主动消息API服务
This commit is contained in:
rianli
2026-02-01 17:10:55 +08:00
parent 50422aac14
commit a3e87f2f37
15 changed files with 4639 additions and 52 deletions

View File

@@ -2,7 +2,9 @@ import WebSocket from "ws";
import path from "node:path";
import type { ResolvedQQBotAccount, WSPayload, C2CMessageEvent, GuildMessageEvent, GroupMessageEvent } from "./types.js";
import { StreamState } from "./types.js";
import { getAccessToken, getGatewayUrl, sendC2CMessage, sendChannelMessage, sendGroupMessage, clearTokenCache, sendC2CImageMessage, sendGroupImageMessage, initApiConfig, sendC2CInputNotify } from "./api.js";
import { getAccessToken, getGatewayUrl, sendC2CMessage, sendChannelMessage, sendGroupMessage, clearTokenCache, sendC2CImageMessage, sendGroupImageMessage, initApiConfig, startBackgroundTokenRefresh, stopBackgroundTokenRefresh } from "./api.js";
import { loadSession, saveSession, clearSession, type SessionState } from "./session-store.js";
import { recordKnownUser, flushKnownUsers } from "./known-users.js";
import { getQQBotRuntime } from "./runtime.js";
import { startImageServer, saveImage, saveImageFromPath, isImageServerRunning, downloadFile, type ImageServerConfig } from "./image-server.js";
import { createStreamSender } from "./outbound.js";
@@ -55,7 +57,79 @@ const IMAGE_SERVER_DIR = process.env.QQBOT_IMAGE_SERVER_DIR || path.join(process
// 流式消息配置
const STREAM_CHUNK_INTERVAL = 500; // 流式消息分片间隔(毫秒)
const STREAM_MIN_CHUNK_SIZE = 10; // 最小分片大小(字符)
const STREAM_KEEPALIVE_INTERVAL = 8000; // 流式心跳间隔(毫秒),需要在 10 秒内发送
const STREAM_KEEPALIVE_FIRST_DELAY = 3000; // 首次状态保持延迟毫秒openclaw 3s 内未回复时发送
const STREAM_KEEPALIVE_GAP = 10000; // 状态保持消息之间的间隔(毫秒)
const STREAM_KEEPALIVE_MAX_PER_CHUNK = 2; // 每 2 个消息分片之间最多发送的状态保持消息数量
const STREAM_MAX_DURATION = 3 * 60 * 1000; // 流式消息最大持续时间(毫秒),超过 3 分钟自动结束
// 消息队列配置(异步处理,防止阻塞心跳)
const MESSAGE_QUEUE_SIZE = 1000; // 最大队列长度
const MESSAGE_QUEUE_WARN_THRESHOLD = 800; // 队列告警阈值
// ============ 消息回复限流器 ============
// 同一 message_id 1小时内最多回复 4 次超过1小时需降级为主动消息
const MESSAGE_REPLY_LIMIT = 4;
const MESSAGE_REPLY_TTL = 60 * 60 * 1000; // 1小时
interface MessageReplyRecord {
count: number;
firstReplyAt: number;
}
const messageReplyTracker = new Map<string, MessageReplyRecord>();
/**
* 检查是否可以回复该消息(限流检查)
* @param messageId 消息ID
* @returns { allowed: boolean, remaining: number } allowed=是否允许回复remaining=剩余次数
*/
function checkMessageReplyLimit(messageId: string): { allowed: boolean; remaining: number } {
const now = Date.now();
const record = messageReplyTracker.get(messageId);
// 清理过期记录(定期清理,避免内存泄漏)
if (messageReplyTracker.size > 10000) {
for (const [id, rec] of messageReplyTracker) {
if (now - rec.firstReplyAt > MESSAGE_REPLY_TTL) {
messageReplyTracker.delete(id);
}
}
}
if (!record) {
return { allowed: true, remaining: MESSAGE_REPLY_LIMIT };
}
// 检查是否过期
if (now - record.firstReplyAt > MESSAGE_REPLY_TTL) {
messageReplyTracker.delete(messageId);
return { allowed: true, remaining: MESSAGE_REPLY_LIMIT };
}
// 检查是否超过限制
const remaining = MESSAGE_REPLY_LIMIT - record.count;
return { allowed: remaining > 0, remaining: Math.max(0, remaining) };
}
/**
* 记录一次消息回复
* @param messageId 消息ID
*/
function recordMessageReply(messageId: string): void {
const now = Date.now();
const record = messageReplyTracker.get(messageId);
if (!record) {
messageReplyTracker.set(messageId, { count: 1, firstReplyAt: now });
} else {
// 检查是否过期,过期则重新计数
if (now - record.firstReplyAt > MESSAGE_REPLY_TTL) {
messageReplyTracker.set(messageId, { count: 1, firstReplyAt: now });
} else {
record.count++;
}
}
}
export interface GatewayContext {
account: ResolvedQQBotAccount;
@@ -70,6 +144,22 @@ export interface GatewayContext {
};
}
/**
* 消息队列项类型(用于异步处理消息,防止阻塞心跳)
*/
interface QueuedMessage {
type: "c2c" | "guild" | "dm" | "group";
senderId: string;
senderName?: string;
content: string;
messageId: string;
timestamp: string;
channelId?: string;
guildId?: string;
groupOpenid?: string;
attachments?: Array<{ content_type: string; url: string; filename?: string }>;
}
/**
* 启动图床服务器
*/
@@ -137,6 +227,74 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
let intentLevelIndex = 0; // 当前尝试的权限级别索引
let lastSuccessfulIntentLevel = -1; // 上次成功的权限级别
// ============ P1-2: 尝试从持久化存储恢复 Session ============
const savedSession = loadSession(account.accountId);
if (savedSession) {
sessionId = savedSession.sessionId;
lastSeq = savedSession.lastSeq;
intentLevelIndex = savedSession.intentLevelIndex;
lastSuccessfulIntentLevel = savedSession.intentLevelIndex;
log?.info(`[qqbot:${account.accountId}] Restored session from storage: sessionId=${sessionId}, lastSeq=${lastSeq}, intentLevel=${intentLevelIndex}`);
}
// ============ 消息队列(异步处理,防止阻塞心跳) ============
const messageQueue: QueuedMessage[] = [];
let messageProcessorRunning = false;
let messagesProcessed = 0; // 统计已处理消息数
/**
* 将消息加入队列(非阻塞)
*/
const enqueueMessage = (msg: QueuedMessage): void => {
if (messageQueue.length >= MESSAGE_QUEUE_SIZE) {
// 队列满了,丢弃最旧的消息
const dropped = messageQueue.shift();
log?.error(`[qqbot:${account.accountId}] Message queue full, dropping oldest message from ${dropped?.senderId}`);
}
if (messageQueue.length >= MESSAGE_QUEUE_WARN_THRESHOLD) {
log?.info(`[qqbot:${account.accountId}] Message queue size: ${messageQueue.length}/${MESSAGE_QUEUE_SIZE}`);
}
messageQueue.push(msg);
log?.debug?.(`[qqbot:${account.accountId}] Message enqueued, queue size: ${messageQueue.length}`);
};
/**
* 启动消息处理循环(独立于 WS 消息循环)
*/
const startMessageProcessor = (handleMessageFn: (msg: QueuedMessage) => Promise<void>): void => {
if (messageProcessorRunning) return;
messageProcessorRunning = true;
const processLoop = async () => {
while (!isAborted) {
if (messageQueue.length === 0) {
// 队列为空,等待一小段时间
await new Promise(resolve => setTimeout(resolve, 50));
continue;
}
const msg = messageQueue.shift()!;
try {
await handleMessageFn(msg);
messagesProcessed++;
} catch (err) {
// 捕获处理异常,防止影响队列循环
log?.error(`[qqbot:${account.accountId}] Message processor error: ${err}`);
}
}
messageProcessorRunning = false;
log?.info(`[qqbot:${account.accountId}] Message processor stopped`);
};
// 异步启动,不阻塞调用者
processLoop().catch(err => {
log?.error(`[qqbot:${account.accountId}] Message processor crashed: ${err}`);
messageProcessorRunning = false;
});
log?.info(`[qqbot:${account.accountId}] Message processor started`);
};
abortSignal.addEventListener("abort", () => {
isAborted = true;
if (reconnectTimer) {
@@ -144,6 +302,10 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
reconnectTimer = null;
}
cleanup();
// P1-1: 停止后台 Token 刷新
stopBackgroundTokenRefresh();
// P1-3: 保存已知用户数据
flushKnownUsers();
});
const cleanup = () => {
@@ -232,16 +394,9 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
log?.info(`[qqbot:${account.accountId}] Attachments: ${event.attachments.length}`);
}
// 对于 C2C 消息,先发送输入状态提示用户机器人正在输入
if (event.type === "c2c") {
try {
const token = await getAccessToken(account.appId, account.clientSecret);
await sendC2CInputNotify(token, event.senderId, event.messageId, 60);
log?.info(`[qqbot:${account.accountId}] Sent input notify to ${event.senderId}`);
} catch (err) {
log?.error(`[qqbot:${account.accountId}] Failed to send input notify: ${err}`);
}
}
// 流式消息开关(默认启用,仅 c2c 支持)
const streamEnabled = account.streamEnabled !== false;
log?.debug?.(`[qqbot:${account.accountId}] Stream enabled: ${streamEnabled}`);
pluginRuntime.channel.activity.record({
channel: "qqbot",
@@ -269,6 +424,51 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
// 组装消息体,添加系统提示词
let builtinPrompt = "";
// ============ 用户标识信息(用于定时提醒和主动消息) ============
const isGroupChat = event.type === "group";
const targetAddress = isGroupChat ? `group:${event.groupOpenid}` : event.senderId;
builtinPrompt += `
【当前用户信息】
- 用户 openid: ${event.senderId}
- 用户昵称: ${event.senderName || "未知"}
- 消息类型: ${isGroupChat ? "群聊" : "私聊"}
- 当前消息 message_id: ${event.messageId}${isGroupChat ? `
- 群组 group_openid: ${event.groupOpenid}` : ""}
【定时提醒能力】
你可以帮助用户设置定时提醒。使用 openclaw cron 命令:
示例5分钟后提醒用户喝水
\`\`\`bash
openclaw cron add \\
--name "提醒喝水-${event.senderName || "用户"}" \\
--at "5m" \\
--session isolated \\
--message "💧 该喝水啦!" \\
--deliver \\
--channel qqbot \\
--to "${targetAddress}" \\
--reply-to "${event.messageId}" \\
--delete-after-run
\`\`\`
关键参数说明:
- \`--to\`: 目标地址(当前用户: ${targetAddress}
- \`--reply-to\`: 回复消息ID当前消息: ${event.messageId},使提醒能引用原消息)
- \`--at\`: 一次性定时任务的触发时间
- 相对时间格式:数字+单位,如 \`5m\`5分钟\`1h\`1小时\`2d\`2天【注意不要加 + 号】
- 绝对时间格式ISO 8601 带时区,如 \`2026-02-01T14:00:00+08:00\`
- \`--cron\`: 周期性任务(如 \`0 8 * * *\` 每天早上8点
- \`--tz "Asia/Shanghai"\`: 周期任务务必设置时区
- \`--delete-after-run\`: 一次性任务必须添加此参数
- \`--message\`: 消息内容(必填,不能为空!对应 QQ API 的 markdown.content 字段)
⚠️ 重要注意事项:
1. --at 参数格式:相对时间用 \`5m\`\`1h\` 等(不要加 + 号!);绝对时间用完整 ISO 格式
2. 定时提醒消息不支持流式发送,命令中不要添加 --stream 参数
3. --message 参数必须有实际内容,不能为空字符串`;
// 只有配置了图床公网地址,才告诉 AI 可以发送图片
if (imageServerBaseUrl) {
builtinPrompt += `
@@ -400,7 +600,7 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
// 追踪是否有响应
let hasResponse = false;
const responseTimeout = 30000; // 30秒超时
const responseTimeout = 60000; // 60秒超时1分钟
let timeoutId: ReturnType<typeof setTimeout> | null = null;
const timeoutPromise = new Promise<void>((_, reject) => {
@@ -417,19 +617,25 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
: event.type === "group" ? `group:${event.groupOpenid}`
: `channel:${event.channelId}`;
// 判断是否支持流式(仅 c2c 支持,群聊不支持流式)
const supportsStream = event.type === "c2c";
// 判断是否支持流式(仅 c2c 支持,群聊不支持流式,且需要开关启用
const supportsStream = event.type === "c2c" && streamEnabled;
log?.info(`[qqbot:${account.accountId}] Stream support: ${supportsStream} (type=${event.type}, enabled=${streamEnabled})`);
// 创建流式发送器
const streamSender = supportsStream ? createStreamSender(account, targetTo, event.messageId) : null;
let streamBuffer = ""; // 累积的全部文本(用于记录完整内容)
let lastSentLength = 0; // 上次发送时的文本长度(用于计算增量)
let lastSentText = ""; // 上次发送时的完整文本(用于检测新段落)
let currentSegmentStart = 0; // 当前段落在 streamBuffer 中的起始位置
let lastStreamSendTime = 0; // 上次流式发送时间
let streamStarted = false; // 是否已开始流式发送
let streamEnded = false; // 流式是否已结束
let streamStartTime = 0; // 流式消息开始时间(用于超时检查)
let sendingLock = false; // 发送锁,防止并发发送
let pendingFullText = ""; // 待发送的完整文本(在锁定期间积累)
let keepaliveTimer: ReturnType<typeof setTimeout> | null = null; // 心跳定时器
let keepaliveCountSinceLastChunk = 0; // 自上次分片以来发送的状态保持消息数量
let lastChunkSendTime = 0; // 上次分片发送时间(用于判断是否需要发送状态保持)
// 清理心跳定时器
const clearKeepalive = () => {
@@ -440,26 +646,78 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
};
// 重置心跳定时器(每次发送后调用)
const resetKeepalive = () => {
// isContentChunk: 是否为内容分片(非状态保持消息)
const resetKeepalive = (isContentChunk: boolean = false) => {
clearKeepalive();
// 如果是内容分片,重置状态保持计数器和时间
if (isContentChunk) {
keepaliveCountSinceLastChunk = 0;
lastChunkSendTime = Date.now();
}
if (streamSender && streamStarted && !streamEnded) {
// 计算下次状态保持消息的延迟时间
// - 首次3sSTREAM_KEEPALIVE_FIRST_DELAY
// - 后续10sSTREAM_KEEPALIVE_GAP
const delay = keepaliveCountSinceLastChunk === 0
? STREAM_KEEPALIVE_FIRST_DELAY
: STREAM_KEEPALIVE_GAP;
keepaliveTimer = setTimeout(async () => {
// 10 秒内没有新消息,发送空分片保持连接
// 检查流式消息是否超时(超过 3 分钟自动结束)
const elapsed = Date.now() - streamStartTime;
if (elapsed >= STREAM_MAX_DURATION) {
log?.info(`[qqbot:${account.accountId}] Stream timeout after ${Math.round(elapsed / 1000)}s, auto ending stream`);
if (!streamEnded && !sendingLock) {
sendingLock = true;
try {
// 发送结束标记
await streamSender!.send("", true);
streamEnded = true;
clearKeepalive();
} catch (err) {
log?.error(`[qqbot:${account.accountId}] Stream auto-end failed: ${err}`);
} finally {
sendingLock = false;
}
}
return; // 超时后不再继续心跳
}
// 检查是否已达到每2个分片之间的最大状态保持消息数量
if (keepaliveCountSinceLastChunk >= STREAM_KEEPALIVE_MAX_PER_CHUNK) {
log?.debug?.(`[qqbot:${account.accountId}] Max keepalive reached (${keepaliveCountSinceLastChunk}/${STREAM_KEEPALIVE_MAX_PER_CHUNK}), waiting for next content chunk`);
// 不再发送状态保持,但继续监控超时
resetKeepalive(false);
return;
}
// 检查距上次分片是否超过 3s
const timeSinceLastChunk = Date.now() - lastChunkSendTime;
if (timeSinceLastChunk < STREAM_KEEPALIVE_FIRST_DELAY) {
// 还未到发送状态保持的时机,继续等待
resetKeepalive(false);
return;
}
// 发送状态保持消息
if (!streamEnded && !sendingLock) {
log?.info(`[qqbot:${account.accountId}] Sending keepalive empty chunk`);
log?.info(`[qqbot:${account.accountId}] Sending keepalive #${keepaliveCountSinceLastChunk + 1} (elapsed: ${Math.round(elapsed / 1000)}s, since chunk: ${Math.round(timeSinceLastChunk / 1000)}s)`);
sendingLock = true;
try {
// 发送空内容
await streamSender!.send("", false);
lastStreamSendTime = Date.now();
resetKeepalive(); // 继续下一个心跳
keepaliveCountSinceLastChunk++;
resetKeepalive(false); // 继续下一个状态保持(非内容分片)
} catch (err) {
log?.error(`[qqbot:${account.accountId}] Keepalive failed: ${err}`);
} finally {
sendingLock = false;
}
}
}, STREAM_KEEPALIVE_INTERVAL);
}, delay);
}
};
@@ -486,8 +744,9 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
streamEnded = true;
clearKeepalive();
} else {
// 发送成功后重置心跳
resetKeepalive();
// 发送成功后重置心跳,如果是有内容的分片则重置计数器
const isContentChunk = text.length > 0;
resetKeepalive(isContentChunk);
}
return true;
};
@@ -505,11 +764,17 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
// 发送当前增量
if (fullText.length > lastSentLength) {
const increment = fullText.slice(lastSentLength);
// 首次发送前,先设置流式状态和开始时间
if (!streamStarted) {
streamStarted = true;
streamStartTime = Date.now();
log?.info(`[qqbot:${account.accountId}] Stream started, max duration: ${STREAM_MAX_DURATION / 1000}s`);
}
const success = await sendStreamChunk(increment, forceEnd);
if (success) {
lastSentLength = fullText.length;
lastSentText = fullText; // 记录完整发送文本,用于检测新段落
lastStreamSendTime = Date.now();
streamStarted = true;
log?.info(`[qqbot:${account.accountId}] Stream partial #${streamSender!.getContext().index}, increment: ${increment.length} chars, total: ${fullText.length} chars`);
}
} else if (forceEnd && !streamEnded) {
@@ -530,6 +795,8 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
};
// onPartialReply 回调 - 实时接收 AI 生成的文本payload.text 是累积的全文)
// 注意agent 在一次对话中可能产生多个回复段落(如思考、工具调用后继续回复)
// 每个新段落的 text 会从头开始累积,需要检测并处理
const handlePartialReply = async (payload: { text?: string }) => {
if (!streamSender || streamEnded) {
log?.debug?.(`[qqbot:${account.accountId}] handlePartialReply skipped: streamSender=${!!streamSender}, streamEnded=${streamEnded}`);
@@ -542,11 +809,39 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
return;
}
// 始终更新累积缓冲区(即使不发送,也要记录最新内容)
streamBuffer = fullText;
hasResponse = true;
log?.debug?.(`[qqbot:${account.accountId}] handlePartialReply: fullText.length=${fullText.length}, lastSentLength=${lastSentLength}`);
// 检测是否是新段落:
// 1. lastSentText 不为空(说明已经发送过内容)
// 2. 当前文本不是以 lastSentText 开头(说明不是同一段落的增量)
// 3. 当前文本长度小于 lastSentLength说明文本被重置了
const isNewSegment = lastSentText.length > 0 &&
(fullText.length < lastSentLength || !fullText.startsWith(lastSentText.slice(0, Math.min(10, lastSentText.length))));
if (isNewSegment) {
// 新段落开始,将之前的内容追加到 streamBuffer并重置发送位置
log?.info(`[qqbot:${account.accountId}] New segment detected! lastSentLength=${lastSentLength}, newTextLength=${fullText.length}, lastSentText="${lastSentText.slice(0, 20)}...", newText="${fullText.slice(0, 20)}..."`);
// 记录当前段落在 streamBuffer 中的起始位置
currentSegmentStart = streamBuffer.length;
// 追加换行分隔符(如果前面有内容且不以换行结尾)
if (streamBuffer.length > 0 && !streamBuffer.endsWith("\n")) {
streamBuffer += "\n\n";
currentSegmentStart = streamBuffer.length;
}
// 重置发送位置,从新段落开始发送
lastSentLength = 0;
lastSentText = "";
}
// 更新当前段落内容到 streamBuffer
// streamBuffer = 之前的段落内容 + 当前段落的完整内容
const beforeCurrentSegment = streamBuffer.slice(0, currentSegmentStart);
streamBuffer = beforeCurrentSegment + fullText;
log?.debug?.(`[qqbot:${account.accountId}] handlePartialReply: fullText.length=${fullText.length}, lastSentLength=${lastSentLength}, streamBuffer.length=${streamBuffer.length}, isNewSegment=${isNewSegment}`);
// 如果没有新内容,跳过
if (fullText.length <= lastSentLength) return;
@@ -578,9 +873,15 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
let replyText = payload.text ?? "";
// 更新 streamBuffer,确保最终内容不会丢失
if (replyText.length > streamBuffer.length) {
streamBuffer = replyText;
// 更新当前段落内容到 streamBuffer
// deliver 中的 replyText 是当前段落的完整文本
if (replyText.length > 0) {
const beforeCurrentSegment = streamBuffer.slice(0, currentSegmentStart);
const newStreamBuffer = beforeCurrentSegment + replyText;
if (newStreamBuffer.length > streamBuffer.length) {
streamBuffer = newStreamBuffer;
log?.debug?.(`[qqbot:${account.accountId}] deliver: updated streamBuffer, replyText=${replyText.length}, total=${streamBuffer.length}`);
}
}
// 收集所有图片路径
@@ -796,18 +1097,18 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
}
// 确保所有待发送内容都发送出去
// 优先使用 pendingFullText因为它可能包含最新完整文本
const finalFullText = pendingFullText && pendingFullText.length > streamBuffer.length
// 当前段落的最新完整文本
const currentSegmentText = pendingFullText && pendingFullText.length > (streamBuffer.length - currentSegmentStart)
? pendingFullText
: streamBuffer;
: streamBuffer.slice(currentSegmentStart);
// 计算剩余未发送的增量内容
const remainingIncrement = finalFullText.slice(lastSentLength);
// 计算当前段落剩余未发送的增量内容
const remainingIncrement = currentSegmentText.slice(lastSentLength);
if (remainingIncrement || streamStarted) {
// 有剩余内容或者已开始流式,都需要发送结束标记
await streamSender.end(remainingIncrement);
streamEnded = true;
log?.info(`[qqbot:${account.accountId}] Stream completed, final increment: ${remainingIncrement.length} chars, total: ${finalFullText.length} chars, chunks: ${streamSender.getContext().index}`);
log?.info(`[qqbot:${account.accountId}] Stream completed, final increment: ${remainingIncrement.length} chars, total streamBuffer: ${streamBuffer.length} chars, chunks: ${streamSender.getContext().index}`);
}
}
} catch (err) {
@@ -832,6 +1133,12 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
isConnecting = false; // 连接完成,释放锁
reconnectAttempts = 0; // 连接成功,重置重试计数
lastConnectTime = Date.now(); // 记录连接时间
// 启动消息处理器(异步处理,防止阻塞心跳)
startMessageProcessor(handleMessage);
// P1-1: 启动后台 Token 刷新
startBackgroundTokenRefresh(account.appId, account.clientSecret, {
log: log as { info: (msg: string) => void; error: (msg: string) => void; debug?: (msg: string) => void },
});
});
ws.on("message", async (data) => {
@@ -840,7 +1147,20 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
const payload = JSON.parse(rawData) as WSPayload;
const { op, d, s, t } = payload;
if (s) lastSeq = s;
if (s) {
lastSeq = s;
// P1-2: 更新持久化存储中的 lastSeq节流保存
if (sessionId) {
saveSession({
sessionId,
lastSeq,
lastConnectedAt: lastConnectTime,
intentLevelIndex: lastSuccessfulIntentLevel >= 0 ? lastSuccessfulIntentLevel : intentLevelIndex,
accountId: account.accountId,
savedAt: Date.now(),
});
}
}
log?.debug?.(`[qqbot:${account.accountId}] Received op=${op} t=${t}`);
@@ -894,12 +1214,39 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
lastSuccessfulIntentLevel = intentLevelIndex;
const successLevel = INTENT_LEVELS[intentLevelIndex];
log?.info(`[qqbot:${account.accountId}] Ready with ${successLevel.description}, session: ${sessionId}`);
// P1-2: 保存新的 Session 状态
saveSession({
sessionId,
lastSeq,
lastConnectedAt: Date.now(),
intentLevelIndex,
accountId: account.accountId,
savedAt: Date.now(),
});
onReady?.(d);
} else if (t === "RESUMED") {
log?.info(`[qqbot:${account.accountId}] Session resumed`);
// P1-2: 更新 Session 连接时间
if (sessionId) {
saveSession({
sessionId,
lastSeq,
lastConnectedAt: Date.now(),
intentLevelIndex: lastSuccessfulIntentLevel >= 0 ? lastSuccessfulIntentLevel : intentLevelIndex,
accountId: account.accountId,
savedAt: Date.now(),
});
}
} else if (t === "C2C_MESSAGE_CREATE") {
const event = d as C2CMessageEvent;
await handleMessage({
// P1-3: 记录已知用户
recordKnownUser({
openid: event.author.user_openid,
type: "c2c",
accountId: account.accountId,
});
// 使用消息队列异步处理,防止阻塞心跳
enqueueMessage({
type: "c2c",
senderId: event.author.user_openid,
content: event.content,
@@ -909,7 +1256,14 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
});
} else if (t === "AT_MESSAGE_CREATE") {
const event = d as GuildMessageEvent;
await handleMessage({
// P1-3: 记录已知用户(频道用户)
recordKnownUser({
openid: event.author.id,
type: "c2c", // 频道用户按 c2c 类型存储
nickname: event.author.username,
accountId: account.accountId,
});
enqueueMessage({
type: "guild",
senderId: event.author.id,
senderName: event.author.username,
@@ -922,7 +1276,14 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
});
} else if (t === "DIRECT_MESSAGE_CREATE") {
const event = d as GuildMessageEvent;
await handleMessage({
// P1-3: 记录已知用户(频道私信用户)
recordKnownUser({
openid: event.author.id,
type: "c2c",
nickname: event.author.username,
accountId: account.accountId,
});
enqueueMessage({
type: "dm",
senderId: event.author.id,
senderName: event.author.username,
@@ -934,7 +1295,14 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
});
} else if (t === "GROUP_AT_MESSAGE_CREATE") {
const event = d as GroupMessageEvent;
await handleMessage({
// P1-3: 记录已知用户(群组用户)
recordKnownUser({
openid: event.author.member_openid,
type: "group",
groupOpenid: event.group_openid,
accountId: account.accountId,
});
enqueueMessage({
type: "group",
senderId: event.author.member_openid,
content: event.content,
@@ -964,6 +1332,8 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
if (!canResume) {
sessionId = null;
lastSeq = null;
// P1-2: 清除持久化的 Session
clearSession(account.accountId);
// 尝试降级到下一个权限级别
if (intentLevelIndex < INTENT_LEVELS.length - 1) {