feat(qqbot): 流式消息输出与架构重构

**流式消息**
- 新增 StreamSender 类,支持流式消息分片发送
- 实现消息队列异步处理,防止阻塞心跳
- 支持 C2C/Group 流式消息

**架构重构**
- 移除 clawdbot/moltbot 旧配置,统一为 qqbot
- 新增 upgrade-and-run.sh 一键升级脚本
- 重构 api/channel/gateway/outbound 模块
- 新增富媒体消息发送接口
This commit is contained in:
rianli
2026-01-31 19:49:27 +08:00
parent 35cb5ec1d6
commit 50422aac14
16 changed files with 2933 additions and 1501 deletions

View File

@@ -1,9 +1,11 @@
import WebSocket from "ws";
import path from "node:path";
import type { ResolvedQQBotAccount, WSPayload, C2CMessageEvent, GuildMessageEvent, GroupMessageEvent } from "./types.js";
import { getAccessToken, getGatewayUrl, sendC2CMessage, sendChannelMessage, sendGroupMessage, clearTokenCache, sendC2CImageMessage, sendGroupImageMessage } from "./api.js";
import { StreamState } from "./types.js";
import { getAccessToken, getGatewayUrl, sendC2CMessage, sendChannelMessage, sendGroupMessage, clearTokenCache, sendC2CImageMessage, sendGroupImageMessage, initApiConfig, sendC2CInputNotify } from "./api.js";
import { getQQBotRuntime } from "./runtime.js";
import { startImageServer, saveImage, saveImageFromPath, isImageServerRunning, downloadFile, type ImageServerConfig } from "./image-server.js";
import { createStreamSender } from "./outbound.js";
// QQ Bot intents - 按权限级别分组
const INTENTS = {
@@ -50,6 +52,11 @@ const IMAGE_SERVER_PORT = parseInt(process.env.QQBOT_IMAGE_SERVER_PORT || "18765
// 使用绝对路径,确保文件保存和读取使用同一目录
const IMAGE_SERVER_DIR = process.env.QQBOT_IMAGE_SERVER_DIR || path.join(process.env.HOME || "/home/ubuntu", "clawd", "qqbot-images");
// 流式消息配置
const STREAM_CHUNK_INTERVAL = 500; // 流式消息分片间隔(毫秒)
const STREAM_MIN_CHUNK_SIZE = 10; // 最小分片大小(字符)
const STREAM_KEEPALIVE_INTERVAL = 8000; // 流式心跳间隔(毫秒),需要在 10 秒内发送
export interface GatewayContext {
account: ResolvedQQBotAccount;
abortSignal: AbortSignal;
@@ -90,6 +97,7 @@ async function ensureImageServer(log?: GatewayContext["log"], publicBaseUrl?: st
/**
* 启动 Gateway WebSocket 连接(带自动重连)
* 支持流式消息发送
*/
export async function startGateway(ctx: GatewayContext): Promise<void> {
const { account, abortSignal, cfg, onReady, onError, log } = ctx;
@@ -98,6 +106,12 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
throw new Error("QQBot not configured (missing appId or clientSecret)");
}
// 初始化 API 配置markdown 支持)
initApiConfig({
markdownSupport: account.markdownSupport,
});
log?.info(`[qqbot:${account.accountId}] API config: markdownSupport=${account.markdownSupport !== false}`);
// 如果配置了公网 URL启动图床服务器
let imageServerBaseUrl: string | null = null;
if (account.imageServerBaseUrl) {
@@ -218,6 +232,17 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
log?.info(`[qqbot:${account.accountId}] Attachments: ${event.attachments.length}`);
}
// 对于 C2C 消息,先发送输入状态提示用户机器人正在输入
if (event.type === "c2c") {
try {
const token = await getAccessToken(account.appId, account.clientSecret);
await sendC2CInputNotify(token, event.senderId, event.messageId, 60);
log?.info(`[qqbot:${account.accountId}] Sent input notify to ${event.senderId}`);
} catch (err) {
log?.error(`[qqbot:${account.accountId}] Failed to send input notify: ${err}`);
}
}
pluginRuntime.channel.activity.record({
channel: "qqbot",
accountId: account.accountId,
@@ -242,7 +267,7 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
const envelopeOptions = pluginRuntime.channel.reply.resolveEnvelopeFormatOptions(cfg);
// 组装消息体,添加系统提示词
let builtinPrompt = "由于平台限制你的回复中不可以包含任何URL。";
let builtinPrompt = "";
// 只有配置了图床公网地址,才告诉 AI 可以发送图片
if (imageServerBaseUrl) {
@@ -309,7 +334,7 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
const fromAddress = event.type === "guild" ? `qqbot:channel:${event.channelId}`
: event.type === "group" ? `qqbot:group:${event.groupOpenid}`
: `qqbot:${event.senderId}`;
: `qqbot:c2c:${event.senderId}`;
const toAddress = fromAddress;
const ctxPayload = pluginRuntime.channel.reply.finalizeInboundContext({
@@ -386,22 +411,178 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
}, responseTimeout);
});
// ============ 流式消息发送器 ============
// 确定发送目标
const targetTo = event.type === "c2c" ? event.senderId
: event.type === "group" ? `group:${event.groupOpenid}`
: `channel:${event.channelId}`;
// 判断是否支持流式(仅 c2c 支持,群聊不支持流式)
const supportsStream = event.type === "c2c";
// 创建流式发送器
const streamSender = supportsStream ? createStreamSender(account, targetTo, event.messageId) : null;
let streamBuffer = ""; // 累积的全部文本(用于记录完整内容)
let lastSentLength = 0; // 上次发送时的文本长度(用于计算增量)
let lastStreamSendTime = 0; // 上次流式发送时间
let streamStarted = false; // 是否已开始流式发送
let streamEnded = false; // 流式是否已结束
let sendingLock = false; // 发送锁,防止并发发送
let pendingFullText = ""; // 待发送的完整文本(在锁定期间积累)
let keepaliveTimer: ReturnType<typeof setTimeout> | null = null; // 心跳定时器
// 清理心跳定时器
const clearKeepalive = () => {
if (keepaliveTimer) {
clearTimeout(keepaliveTimer);
keepaliveTimer = null;
}
};
// 重置心跳定时器(每次发送后调用)
const resetKeepalive = () => {
clearKeepalive();
if (streamSender && streamStarted && !streamEnded) {
keepaliveTimer = setTimeout(async () => {
// 10 秒内没有新消息,发送空分片保持连接
if (!streamEnded && !sendingLock) {
log?.info(`[qqbot:${account.accountId}] Sending keepalive empty chunk`);
sendingLock = true;
try {
// 发送空内容
await streamSender!.send("", false);
lastStreamSendTime = Date.now();
resetKeepalive(); // 继续下一个心跳
} catch (err) {
log?.error(`[qqbot:${account.accountId}] Keepalive failed: ${err}`);
} finally {
sendingLock = false;
}
}
}, STREAM_KEEPALIVE_INTERVAL);
}
};
// 流式发送函数 - 用于 onPartialReply 实时发送(增量模式)
// markdown 分片需要以 \n 结尾
const sendStreamChunk = async (text: string, isEnd: boolean): Promise<boolean> => {
if (!streamSender || streamEnded) return false;
// markdown 分片需要以 \n 结尾(除非是空内容或结束标记)
let contentToSend = text;
if (isEnd && contentToSend && !contentToSend.endsWith("\n") && !isEnd) {
contentToSend = contentToSend + "\n";
}
const result = await streamSender.send(contentToSend, isEnd);
if (result.error) {
log?.error(`[qqbot:${account.accountId}] Stream send error: ${result.error}`);
return false;
} else {
log?.debug?.(`[qqbot:${account.accountId}] Stream chunk sent, index: ${streamSender.getContext().index - 1}, isEnd: ${isEnd}, text: "${text.slice(0, 50)}..."`);
}
if (isEnd) {
streamEnded = true;
clearKeepalive();
} else {
// 发送成功后重置心跳
resetKeepalive();
}
return true;
};
// 执行一次流式发送(带锁保护)
const doStreamSend = async (fullText: string, forceEnd: boolean = false): Promise<void> => {
// 如果正在发送,记录待发送的完整文本,稍后处理
if (sendingLock) {
pendingFullText = fullText;
return;
}
sendingLock = true;
try {
// 发送当前增量
if (fullText.length > lastSentLength) {
const increment = fullText.slice(lastSentLength);
const success = await sendStreamChunk(increment, forceEnd);
if (success) {
lastSentLength = fullText.length;
lastStreamSendTime = Date.now();
streamStarted = true;
log?.info(`[qqbot:${account.accountId}] Stream partial #${streamSender!.getContext().index}, increment: ${increment.length} chars, total: ${fullText.length} chars`);
}
} else if (forceEnd && !streamEnded) {
// 没有新内容但需要结束
await sendStreamChunk("", true);
}
} finally {
sendingLock = false;
}
// 处理在锁定期间积累的内容
if (pendingFullText && pendingFullText.length > lastSentLength && !streamEnded) {
const pending = pendingFullText;
pendingFullText = "";
// 递归发送积累的内容(不强制结束)
await doStreamSend(pending, false);
}
};
// onPartialReply 回调 - 实时接收 AI 生成的文本payload.text 是累积的全文)
const handlePartialReply = async (payload: { text?: string }) => {
if (!streamSender || streamEnded) {
log?.debug?.(`[qqbot:${account.accountId}] handlePartialReply skipped: streamSender=${!!streamSender}, streamEnded=${streamEnded}`);
return;
}
const fullText = payload.text ?? "";
if (!fullText) {
log?.debug?.(`[qqbot:${account.accountId}] handlePartialReply: empty text`);
return;
}
// 始终更新累积缓冲区(即使不发送,也要记录最新内容)
streamBuffer = fullText;
hasResponse = true;
log?.debug?.(`[qqbot:${account.accountId}] handlePartialReply: fullText.length=${fullText.length}, lastSentLength=${lastSentLength}`);
// 如果没有新内容,跳过
if (fullText.length <= lastSentLength) return;
const now = Date.now();
// 控制发送频率:首次发送或间隔超过阈值
if (!streamStarted || now - lastStreamSendTime >= STREAM_CHUNK_INTERVAL) {
log?.info(`[qqbot:${account.accountId}] handlePartialReply: sending stream chunk, length=${fullText.length}`);
await doStreamSend(fullText, false);
} else {
// 不到发送时间,但记录待发送内容,确保最终会被发送
pendingFullText = fullText;
}
};
const dispatchPromise = pluginRuntime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg,
dispatcherOptions: {
responsePrefix: messagesConfig.responsePrefix,
deliver: async (payload: { text?: string; mediaUrls?: string[]; mediaUrl?: string }) => {
deliver: async (payload: { text?: string; mediaUrls?: string[]; mediaUrl?: string }, info: { kind: string }) => {
hasResponse = true;
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
log?.info(`[qqbot:${account.accountId}] deliver called, payload keys: ${Object.keys(payload).join(", ")}`);
log?.info(`[qqbot:${account.accountId}] deliver called, kind: ${info.kind}, payload keys: ${Object.keys(payload).join(", ")}`);
let replyText = payload.text ?? "";
// 更新 streamBuffer确保最终内容不会丢失
if (replyText.length > streamBuffer.length) {
streamBuffer = replyText;
}
// 收集所有图片路径
const imageUrls: string[] = [];
@@ -430,70 +611,52 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
}
}
// 如果没有文本也没有图片,跳过
if (!replyText.trim() && imageUrls.length === 0) {
log?.info(`[qqbot:${account.accountId}] Empty reply, skipping`);
return;
}
// 0. 提取 MEDIA: 前缀的本地文件路径(从文本中)
// 提取文本中的各种图片格式
// 0. 提取 MEDIA: 前缀的本地文件路径
const mediaPathRegex = /MEDIA:([^\s\n]+)/gi;
const mediaMatches = [...replyText.matchAll(mediaPathRegex)];
for (const match of mediaMatches) {
const localPath = match[1];
if (localPath && imageServerBaseUrl) {
// 将本地文件复制到图床
try {
const savedUrl = saveImageFromPath(localPath);
if (savedUrl) {
imageUrls.push(savedUrl);
log?.info(`[qqbot:${account.accountId}] Saved local image to server: ${localPath}`);
} else {
log?.error(`[qqbot:${account.accountId}] Failed to save local image (not found or not image): ${localPath}`);
}
} catch (err) {
log?.error(`[qqbot:${account.accountId}] Failed to save local image: ${err}`);
}
}
// 从文本中移除 MEDIA: 行
replyText = replyText.replace(match[0], "").trim();
}
// 0.5. 提取本地绝对文件路径/path/to/image.png 或 /path/to/image_123_png 格式)
// 支持标准扩展名和下划线替换后的扩展名
// 0.5. 提取本地绝对文件路径
const localPathRegex = /(\/[^\s\n]+?(?:\.(?:png|jpg|jpeg|gif|webp)|_(?:png|jpg|jpeg|gif|webp)(?:\s|$)))/gi;
const localPathMatches = [...replyText.matchAll(localPathRegex)];
for (const match of localPathMatches) {
let localPath = match[1].trim();
if (localPath && imageServerBaseUrl) {
// 如果是下划线格式的扩展名,转换回点格式
localPath = localPath.replace(/_(?=(?:png|jpg|jpeg|gif|webp)$)/, ".");
try {
const savedUrl = saveImageFromPath(localPath);
if (savedUrl) {
imageUrls.push(savedUrl);
log?.info(`[qqbot:${account.accountId}] Saved local path image to server: ${localPath}`);
} else {
log?.error(`[qqbot:${account.accountId}] Local path not found or not image: ${localPath}`);
}
} catch (err) {
log?.error(`[qqbot:${account.accountId}] Failed to save local path image: ${err}`);
}
}
// 从文本中移除本地路径
replyText = replyText.replace(match[0], "").trim();
}
// 1. 提取 base64 图片data:image/xxx;base64,...
// 1. 提取 base64 图片
const base64ImageRegex = /!\[([^\]]*)\]\((data:image\/[^;]+;base64,[A-Za-z0-9+/=]+)\)|(?<![(\[])(data:image\/[^;]+;base64,[A-Za-z0-9+/=]+)/gi;
const base64Matches = [...replyText.matchAll(base64ImageRegex)];
for (const match of base64Matches) {
const dataUrl = match[2] || match[3];
if (dataUrl && imageServerBaseUrl) {
// 将 base64 保存到本地图床
try {
const savedUrl = saveImage(dataUrl);
imageUrls.push(savedUrl);
@@ -502,42 +665,37 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
log?.error(`[qqbot:${account.accountId}] Failed to save base64 image: ${err}`);
}
}
// 从文本中移除 base64
replyText = replyText.replace(match[0], "").trim();
}
// 2. 提取 URL 图片Markdown 格式或纯 URL
// 2. 提取 URL 图片
const imageUrlRegex = /!\[([^\]]*)\]\((https?:\/\/[^\s)]+\.(?:png|jpg|jpeg|gif|webp)(?:\?[^\s)]*)?)\)|(?<![(\[])(https?:\/\/[^\s)]+\.(?:png|jpg|jpeg|gif|webp)(?:\?[^\s]*)?)/gi;
const urlMatches = [...replyText.matchAll(imageUrlRegex)];
for (const match of urlMatches) {
// match[2] 是 Markdown 格式的 URLmatch[3] 是纯 URL
const url = match[2] || match[3];
if (url) {
imageUrls.push(url);
}
}
// 从文本中移除图片 URL,避免被 QQ 拦截
// 从文本中移除图片 URL
let textWithoutImages = replyText;
for (const match of urlMatches) {
textWithoutImages = textWithoutImages.replace(match[0], "").trim();
}
// 处理剩余文本中的 URL 点号(只有在没有图片的情况下才替换,避免误伤
// 处理剩余文本中的 URL 点号(只有在没有图片的情况下才替换)
const hasImages = imageUrls.length > 0;
let hasReplacement = false;
if (!hasImages) {
if (!hasImages && textWithoutImages) {
const originalText = textWithoutImages;
textWithoutImages = textWithoutImages.replace(/([a-zA-Z0-9])\.([a-zA-Z0-9])/g, "$1_$2");
hasReplacement = textWithoutImages !== originalText;
if (hasReplacement && textWithoutImages.trim()) {
if (textWithoutImages !== originalText && textWithoutImages.trim()) {
textWithoutImages += "\n\n由于平台限制回复中的部分符号已被替换";
}
}
try {
// 发送图片(如果有)
// 发送图片(如果有)
for (const imageUrl of imageUrls) {
try {
await sendWithTokenRetry(async (token) => {
@@ -546,29 +704,24 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
} else if (event.type === "group" && event.groupOpenid) {
await sendGroupImageMessage(token, event.groupOpenid, imageUrl, event.messageId);
}
// 频道消息暂不支持富媒体,跳过图片
});
log?.info(`[qqbot:${account.accountId}] Sent image: ${imageUrl.slice(0, 50)}...`);
} catch (imgErr) {
log?.error(`[qqbot:${account.accountId}] Failed to send image: ${imgErr}`);
// 图片发送失败时,显示错误信息而不是 URL
const errMsg = String(imgErr).slice(0, 200);
textWithoutImages = `[图片发送失败: ${errMsg}]\n${textWithoutImages}`;
}
}
// 再发送文本(如果有)
if (textWithoutImages.trim()) {
// 只有频道和群聊消息(不支持流式)在 deliver 中发送文本
// c2c 的文本通过 onPartialReply 流式发送
if (!supportsStream && textWithoutImages.trim()) {
await sendWithTokenRetry(async (token) => {
if (event.type === "c2c") {
await sendC2CMessage(token, event.senderId, textWithoutImages, event.messageId);
} else if (event.type === "group" && event.groupOpenid) {
if (event.type === "group" && event.groupOpenid) {
await sendGroupMessage(token, event.groupOpenid, textWithoutImages, event.messageId);
} else if (event.channelId) {
await sendChannelMessage(token, event.channelId, textWithoutImages, event.messageId);
}
});
log?.info(`[qqbot:${account.accountId}] Sent text reply`);
log?.info(`[qqbot:${account.accountId}] Sent text reply (${event.type}, non-stream)`);
}
pluginRuntime.channel.activity.record({
@@ -587,6 +740,28 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
clearTimeout(timeoutId);
timeoutId = null;
}
// 清理心跳定时器
clearKeepalive();
// 如果在流式模式中出错,发送结束标记(增量模式)
if (streamSender && !streamEnded && streamBuffer) {
try {
// 等待发送锁释放
while (sendingLock) {
await new Promise(resolve => setTimeout(resolve, 50));
}
// 发送剩余增量 + 错误标记
const remainingIncrement = streamBuffer.slice(lastSentLength);
const errorIncrement = remainingIncrement + "\n\n[生成中断]";
await streamSender.end(errorIncrement);
streamEnded = true;
log?.info(`[qqbot:${account.accountId}] Stream ended due to error`);
} catch (endErr) {
log?.error(`[qqbot:${account.accountId}] Failed to end stream: ${endErr}`);
}
}
// 发送错误提示给用户,显示完整错误信息
const errMsg = String(err);
if (errMsg.includes("401") || errMsg.includes("key") || errMsg.includes("auth")) {
@@ -597,13 +772,47 @@ export async function startGateway(ctx: GatewayContext): Promise<void> {
}
},
},
replyOptions: {},
replyOptions: {
// 使用 onPartialReply 实现真正的流式消息
// 这个回调在 AI 生成过程中被实时调用
onPartialReply: supportsStream ? handlePartialReply : undefined,
// 禁用 block streaming因为我们用 onPartialReply 实现更实时的流式
disableBlockStreaming: supportsStream,
},
});
// 等待分发完成或超时
try {
await Promise.race([dispatchPromise, timeoutPromise]);
// 清理心跳定时器
clearKeepalive();
// 分发完成后,如果使用了流式且有内容,发送结束标记
if (streamSender && !streamEnded) {
// 等待发送锁释放
while (sendingLock) {
await new Promise(resolve => setTimeout(resolve, 50));
}
// 确保所有待发送内容都发送出去
// 优先使用 pendingFullText因为它可能包含最新的完整文本
const finalFullText = pendingFullText && pendingFullText.length > streamBuffer.length
? pendingFullText
: streamBuffer;
// 计算剩余未发送的增量内容
const remainingIncrement = finalFullText.slice(lastSentLength);
if (remainingIncrement || streamStarted) {
// 有剩余内容或者已开始流式,都需要发送结束标记
await streamSender.end(remainingIncrement);
streamEnded = true;
log?.info(`[qqbot:${account.accountId}] Stream completed, final increment: ${remainingIncrement.length} chars, total: ${finalFullText.length} chars, chunks: ${streamSender.getContext().index}`);
}
}
} catch (err) {
// 清理心跳定时器
clearKeepalive();
if (timeoutId) {
clearTimeout(timeoutId);
}