// Package qq connects the ops assistant to the official QQ bot API over a
// WebSocket session and routes inbound messages to the ops command service.
package qq

import (
	"context"
	"errors"
	"fmt"
	"log"
	"path"
	"regexp"
	"strconv"
	"strings"
	"time"

	"ops-assistant/internal/core/ai"
	"ops-assistant/internal/core/ops"
	"ops-assistant/internal/service"
	"ops-assistant/models"

	"github.com/tencent-connect/botgo"
	"github.com/tencent-connect/botgo/dto"
	"github.com/tencent-connect/botgo/dto/message"
	"github.com/tencent-connect/botgo/event"
	"github.com/tencent-connect/botgo/openapi"
	"github.com/tencent-connect/botgo/token"
	"github.com/tidwall/gjson"
	"gorm.io/gorm"
)

// DefaultUserID is the unified user ID, so that all platforms share the same ledger.
const DefaultUserID int64 = 1

const (
	// qqPlatform is the platform key stored in the message-dedup table and
	// printed in inbound log lines.
	qqPlatform = "qqbot_official"

	// qqUnrecognizedReply is sent when neither the ops service, the keyword
	// rules nor the AI translation produced a command.
	qqUnrecognizedReply = "❌ 无法识别,请使用标准命令"

	// qqMaxListedItems caps how many zones/workers are listed in one reply.
	qqMaxListedItems = 50

	// Job polling: up to qqJobPollAttempts polls, qqJobPollInterval apart (~30s).
	qqJobPollAttempts = 15
	qqJobPollInterval = 2 * time.Second

	// qqAIReloadInterval is the minimum age of the cached AI client before
	// it is rebuilt from the database.
	qqAIReloadInterval = 3 * time.Second
)

// qqJobIDPattern extracts the numeric job ID from an ops reply such as
// "... job=42". Compiled once instead of on every call.
var qqJobIDPattern = regexp.MustCompile(`job=(\d+)`)

// QQBot holds the QQ OpenAPI client together with the services used to
// answer inbound messages.
type QQBot struct {
	api          openapi.OpenAPI
	finance      *service.FinanceService
	credentials  *token.QQBotCredentials
	db           *gorm.DB
	opsSvc       *ops.Service
	aiClient     *ai.Client
	aiAutoReload time.Time
}

// NewQQBot builds a bot from the given credentials and services. The OpenAPI
// client is not created here; it is set by Start.
func NewQQBot(db *gorm.DB, appID string, secret string, finance *service.FinanceService, opsSvc *ops.Service) *QQBot {
	return &QQBot{
		db:      db,
		finance: finance,
		opsSvc:  opsSvc,
		credentials: &token.QQBotCredentials{
			AppID:     appID,
			AppSecret: secret,
		},
		aiClient:     buildAIClient(db),
		aiAutoReload: time.Now(),
	}
}

// Start refreshes the access token, registers the message handlers and runs
// the WebSocket session manager. It returns after logging if any setup step
// fails, or when the session manager's Start call returns.
func (b *QQBot) Start(ctx context.Context) {
	tokenSource := token.NewQQBotTokenSource(b.credentials)
	if err := token.StartRefreshAccessToken(ctx, tokenSource); err != nil {
		log.Printf("❌ QQ Bot Token 刷新失败: %v", err)
		return
	}

	b.api = botgo.NewOpenAPI(b.credentials.AppID, tokenSource).WithTimeout(5 * time.Second)

	// The value returned by RegisterHandlers is intentionally discarded; the
	// explicit intent mask below is passed to the session manager instead.
	_ = event.RegisterHandlers(
		b.groupATMessageHandler(),
		b.c2cMessageHandler(),
		b.channelATMessageHandler(),
	)

	wsInfo, err := b.api.WS(ctx, nil, "")
	if err != nil {
		log.Printf("❌ QQ Bot 获取 WS 信息失败: %v", err)
		return
	}

	// NOTE(review): bits 25 and 30 presumably select group/C2C events and
	// public guild @-messages — confirm against the QQ bot intents table.
	intent := dto.Intent(1<<25 | 1<<30)

	log.Printf("🚀 QQ Bot 已启动 (WebSocket, shards=%d)", wsInfo.Shards)
	if err := botgo.NewSessionManager().Start(wsInfo, tokenSource, &intent); err != nil {
		log.Printf("❌ QQ Bot WebSocket 断开: %v", err)
	}
}

// isCommand reports whether text is exactly equal to one of keywords.
func isCommand(text string, keywords ...string) bool {
	for _, kw := range keywords {
		if text == kw {
			return true
		}
	}
	return false
}

// isDuplicate reports whether eventID has already been recorded for this
// platform, recording it when it has not. It returns false (fail open) when
// there is no database, when eventID is blank, or when the lookup fails.
func (b *QQBot) isDuplicate(eventID string) bool {
	if b.db == nil || strings.TrimSpace(eventID) == "" {
		return false
	}

	var existed models.MessageDedup
	err := b.db.Where("platform = ? AND event_id = ?", qqPlatform, eventID).First(&existed).Error
	if err == nil {
		return true
	}
	if !errors.Is(err, gorm.ErrRecordNotFound) {
		// A real query failure is not the same as "never seen": surface it,
		// but keep processing the message.
		log.Printf("⚠️ QQ 消息去重查询失败: %v", err)
	}

	record := models.MessageDedup{Platform: qqPlatform, EventID: eventID, ProcessedAt: time.Now()}
	if err := b.db.Create(&record).Error; err != nil {
		log.Printf("⚠️ QQ 消息去重记录写入失败: %v", err)
	}
	return false
}

// processAndReply turns one inbound message into the reply text. It tries, in
// order: the ops service with the raw text, keyword rules that map common
// phrases to standard commands, and finally AI translation into a command.
// An empty result means nothing should be sent.
func (b *QQBot) processAndReply(userID string, content string) string {
	text := strings.TrimSpace(message.ETLInput(content))
	if text == "" {
		return ""
	}

	if b.opsSvc != nil {
		// Let the ops service handle explicit commands first.
		if out, handled := b.dispatch(userID, text, isTrackedCommand(text)); handled {
			return out
		}
		// Rule fallback: map common natural-language phrases to commands.
		if out, handled := b.replyByRules(userID, text); handled {
			return out
		}
	}

	// Not a command: try AI translation into a standard command.
	return b.replyByAI(userID, text)
}

// isTrackedCommand reports whether cmd belongs to a command family whose
// reply may carry a job ID that should be followed up with a result push.
func isTrackedCommand(cmd string) bool {
	return cmd == "/cpa" ||
		strings.HasPrefix(cmd, "/cpa ") ||
		strings.HasPrefix(cmd, "/cf ") ||
		strings.HasPrefix(cmd, "/mail ")
}

// dispatch passes cmd to the ops service. When the service handled it and
// track is set, a job ID found in the reply starts a background watcher that
// pushes the job result to userID. The caller must ensure b.opsSvc is non-nil.
func (b *QQBot) dispatch(userID, cmd string, track bool) (string, bool) {
	handled, out := b.opsSvc.Handle(DefaultUserID, cmd)
	if !handled {
		return "", false
	}
	if track {
		if jobID := parseJobID(out); jobID > 0 {
			go b.waitAndPushJobResult(userID, jobID, cmd)
		}
	}
	return out, true
}

// replyByRules maps well-known phrases to standard commands. The second
// result is false when no rule matched or the ops service declined the mapped
// command, in which case the caller falls through to AI translation. The
// caller must ensure b.opsSvc is non-nil.
func (b *QQBot) replyByRules(userID, text string) (string, bool) {
	// Normalize by dropping all whitespace and lowercasing, so that
	// "CF Zones", "cf zones" and "cfzones" all match the same rule.
	norm := strings.ToLower(strings.Join(strings.Fields(text), ""))

	mentionsZones := strings.Contains(norm, "域名") || strings.Contains(norm, "站点")
	mentionsDNS := strings.Contains(norm, "解析") || strings.Contains(norm, "dns")
	if mentionsZones && !mentionsDNS {
		if out, handled := b.dispatch(userID, "/cf zones", true); handled {
			return out, true
		}
	}

	switch norm {
	case "cpa状态", "cpastatus":
		return b.dispatch(userID, "/cpa status", true)
	case "功能", "菜单", "帮助", "help", "你能做什么", "你会什么":
		// The help reply is plain text; there is no job to follow up.
		return b.dispatch(userID, "/help", false)
	case "cf状态", "cf配置", "cf配置状态", "cloudflare状态", "cloudflare配置":
		return b.dispatch(userID, "/cf status", true)
	case "cf域名", "cf账号域名", "cfzones", "cloudflare域名", "cloudflare站点",
		"站点列表", "域名列表", "我的域名", "域名清单":
		return b.dispatch(userID, "/cf zones", true)
	case "cf解析", "cf记录", "解析记录", "dns记录", "dns列表", "列解析", "列记录":
		// NOTE(review): the usage hint ends right after "list"; it looks like
		// a zone-id placeholder was lost — confirm the intended text.
		return "❌ 缺少 zone_id,请用:/cf dns list ", true
	case "cfworkers", "workers列表", "workerslist", "列workers":
		return b.dispatch(userID, "/cf workers list", true)
	}
	return "", false
}

// replyByAI asks the AI client to translate text into a standard command and
// runs that command. Only the command output is returned, never the
// translation itself.
func (b *QQBot) replyByAI(userID, text string) string {
	// Rebuild the client from the database once the cached one is stale.
	if time.Since(b.aiAutoReload) > qqAIReloadInterval {
		b.aiClient = buildAIClient(b.db)
		b.aiAutoReload = time.Now()
	}
	if b.aiClient == nil {
		return qqUnrecognizedReply
	}

	cmd, err := b.aiClient.Suggest(text)
	if err != nil {
		if strings.Contains(err.Error(), "rate limited") {
			return "⚠️ AI 服务繁忙,请稍后再试或使用标准命令"
		}
		return qqUnrecognizedReply
	}

	cmd = strings.TrimSpace(cmd)
	if cmd == "" || cmd == "FAIL" || !strings.HasPrefix(cmd, "/") || b.opsSvc == nil {
		return qqUnrecognizedReply
	}
	if out, handled := b.dispatch(userID, cmd, isTrackedCommand(cmd)); handled {
		return out
	}
	return qqUnrecognizedReply
}

// parseJobID extracts the job ID from the first "job=<digits>" occurrence in
// out. It returns 0 when there is no match or the number is not a positive int.
func parseJobID(out string) uint {
	m := qqJobIDPattern.FindStringSubmatch(out)
	if len(m) < 2 {
		return 0
	}
	n, err := strconv.Atoi(m[1])
	if err != nil || n <= 0 {
		return 0
	}
	return uint(n)
}

// waitAndPushJobResult polls the job until it leaves the pending/running
// states (giving up after qqJobPollAttempts polls) and then sends the
// formatted result to userID as a C2C message. Intended to run in its own
// goroutine.
func (b *QQBot) waitAndPushJobResult(userID string, jobID uint, cmd string) {
	if b.db == nil || b.api == nil {
		return
	}
	if !b.awaitJobFinished(jobID) {
		return
	}

	msg := formatJobResult(b.db, jobID, cmd)
	if strings.TrimSpace(msg) == "" {
		return
	}
	if _, err := b.api.PostC2CMessage(context.Background(), userID, dto.MessageToCreate{Content: msg}); err != nil {
		log.Printf("QQ 推送任务结果失败: %v", err)
	}
}

// awaitJobFinished reports whether the job was observed in a state other than
// pending/running within the polling budget. Failed lookups count as
// "not finished yet" and are retried.
func (b *QQBot) awaitJobFinished(jobID uint) bool {
	for i := 0; i < qqJobPollAttempts; i++ {
		time.Sleep(qqJobPollInterval)

		// Use a fresh struct per poll so a failed lookup cannot leave stale
		// fields from an earlier iteration behind.
		var job models.OpsJob
		if err := b.db.First(&job, jobID).Error; err != nil {
			continue
		}
		if job.Status == "pending" || job.Status == "running" {
			continue
		}
		return job.ID != 0
	}
	return false
}

// formatJobResult renders the user-facing summary for a job, choosing a
// runbook-specific formatter when one exists. It returns "" when the job
// cannot be loaded.
func formatJobResult(db *gorm.DB, jobID uint, cmd string) string {
	var job models.OpsJob
	if err := db.First(&job, jobID).Error; err != nil {
		return ""
	}

	switch job.Runbook {
	case "cpa_status":
		return formatCPAStatusResult(db, jobID, job.Status)
	case "cpa_usage_backup":
		return formatCPAUsageBackupResult(db, jobID)
	case "cf_zones":
		return formatCFZonesResult(db, jobID, cmd)
	case "cf_workers_list":
		return formatCFWorkersResult(db, jobID, cmd)
	}
	return fmt.Sprintf("📦 %s 结果:%s (job=%d)", strings.TrimSpace(cmd), job.Status, jobID)
}

// qqLoadJobSteps returns the steps of a job ordered by id. A query failure is
// logged and yields whatever was loaded (normally nothing), so formatters
// degrade to their generic output.
func qqLoadJobSteps(db *gorm.DB, jobID uint) []models.OpsJobStep {
	var steps []models.OpsJobStep
	if err := db.Where("job_id = ?", jobID).Order("id asc").Find(&steps).Error; err != nil {
		log.Printf("⚠️ QQ 读取任务步骤失败 (job=%d): %v", jobID, err)
	}
	return steps
}

// qqFirstStepStdout returns the trimmed stdout tail of the first step whose
// StepID equals stepID, or "" when there is no such step.
func qqFirstStepStdout(db *gorm.DB, jobID uint, stepID string) string {
	for _, st := range qqLoadJobSteps(db, jobID) {
		if st.StepID == stepID {
			return strings.TrimSpace(st.StdoutTail)
		}
	}
	return ""
}

// qqFirstArray returns the first non-empty JSON array found in raw under the
// given keys, tried in order.
func qqFirstArray(raw string, keys ...string) []gjson.Result {
	for _, key := range keys {
		if arr := gjson.Get(raw, key).Array(); len(arr) > 0 {
			return arr
		}
	}
	return nil
}

// formatCFZonesResult lists the zones printed by the "list_zones" step as
// "- name (id)" lines, showing at most qqMaxListedItems entries.
func formatCFZonesResult(db *gorm.DB, jobID uint, cmd string) string {
	title := strings.TrimSpace(cmd)

	raw := qqFirstStepStdout(db, jobID, "list_zones")
	if raw == "" {
		return fmt.Sprintf("📦 %s 结果:success (job=%d)", title, jobID)
	}

	zones := qqFirstArray(raw, "zones", "result")
	if len(zones) == 0 {
		return fmt.Sprintf("📦 %s 结果:success (job=%d)\n(no zones)", title, jobID)
	}

	limit := len(zones)
	if limit > qqMaxListedItems {
		limit = qqMaxListedItems
	}

	lines := make([]string, 0, limit+2)
	lines = append(lines, fmt.Sprintf("✅ %s 完成 (job=%d)", title, jobID))
	for _, zone := range zones[:limit] {
		name := zone.Get("name").String()
		id := zone.Get("id").String()
		switch {
		case name == "" && id == "":
			// Nothing printable for this entry.
		case id != "":
			lines = append(lines, fmt.Sprintf("- %s (%s)", name, id))
		default:
			lines = append(lines, fmt.Sprintf("- %s", name))
		}
	}
	if len(zones) > limit {
		lines = append(lines, fmt.Sprintf("... 共 %d 个,已展示前 %d 个", len(zones), limit))
	}
	return strings.Join(lines, "\n")
}

// formatCFWorkersResult lists the workers printed by the "list_workers" step,
// one per line, showing at most qqMaxListedItems entries. Each entry is
// labelled by its "id", falling back to "name".
func formatCFWorkersResult(db *gorm.DB, jobID uint, cmd string) string {
	title := strings.TrimSpace(cmd)

	raw := qqFirstStepStdout(db, jobID, "list_workers")
	if raw == "" {
		return fmt.Sprintf("📦 %s 结果:success (job=%d)", title, jobID)
	}

	workers := qqFirstArray(raw, "workers", "result")
	header := fmt.Sprintf("✅ %s 完成 (job=%d)", title, jobID)
	if len(workers) == 0 {
		return header + "\n(no workers)"
	}

	limit := len(workers)
	if limit > qqMaxListedItems {
		limit = qqMaxListedItems
	}

	lines := make([]string, 0, limit+2)
	lines = append(lines, header)
	for _, worker := range workers[:limit] {
		label := worker.Get("id").String()
		if label == "" {
			label = worker.Get("name").String()
		}
		if label == "" {
			continue
		}
		lines = append(lines, fmt.Sprintf("- %s", label))
	}
	if len(workers) > limit {
		lines = append(lines, fmt.Sprintf("... 共 %d 个,已展示前 %d 个", len(workers), limit))
	}
	return strings.Join(lines, "\n")
}

// formatCPAUsageBackupResult reports the backup file written by the
// "export_and_package" step, taken from the first output line that starts
// with "backup=". Without such a line only the success notice is returned.
func formatCPAUsageBackupResult(db *gorm.DB, jobID uint) string {
	done := fmt.Sprintf("✅ /cpa usage backup 执行成功(job=%d)", jobID)

	raw := qqFirstStepStdout(db, jobID, "export_and_package")
	if raw == "" {
		return done
	}

	backup := ""
	for _, line := range strings.Split(raw, "\n") {
		if strings.HasPrefix(line, "backup=") {
			backup = strings.TrimSpace(strings.TrimPrefix(line, "backup="))
			break
		}
	}
	if backup == "" {
		return done
	}

	return fmt.Sprintf("✅ /cpa usage backup 执行成功(job=%d)\n📦 备份文件:%s\n📁 路径:%s",
		jobID, path.Base(backup), path.Dir(backup)+"/")
}

// formatCPAStatusResult summarizes the "service_status" and "usage_snapshot"
// steps of a job. When a step ID occurs more than once the last occurrence
// wins. The status parameter is currently not used in the output.
func formatCPAStatusResult(db *gorm.DB, jobID uint, status string) string {
	var svc, usage string
	for _, st := range qqLoadJobSteps(db, jobID) {
		switch st.StepID {
		case "service_status":
			svc = strings.TrimSpace(st.StdoutTail)
		case "usage_snapshot":
			usage = strings.TrimSpace(st.StdoutTail)
		}
	}

	requests := gjson.Get(usage, "usage.total_requests").String()
	if requests == "" {
		requests = "-"
	}
	tokens := gjson.Get(usage, "usage.total_tokens").String()
	if tokens == "" {
		tokens = "-"
	}

	return fmt.Sprintf("✅ /cpa status 完成 (job=%d)\nservice=%s\nrequests=%s\ntokens=%s",
		jobID, svc, requests, tokens)
}

// qqEventID builds the dedup key "qq:<kind>:<id>". It returns "" for a blank
// id so that isDuplicate skips dedup instead of collapsing every message
// without an ID onto one shared key.
func qqEventID(kind, id string) string {
	id = strings.TrimSpace(id)
	if id == "" {
		return ""
	}
	return "qq:" + kind + ":" + id
}

// handleInbound runs the steps shared by all message handlers: dedup, inbound
// logging and reply generation. It returns "" when nothing should be sent.
func (b *QQBot) handleInbound(eventID, chatID, userID, content string) string {
	if b.isDuplicate(eventID) {
		return ""
	}
	log.Printf("📩 inbound platform=qqbot_official event=%s chat=%s user=%s text=%q",
		eventID, chatID, userID, strings.TrimSpace(message.ETLInput(content)))
	return b.processAndReply(userID, content)
}

// channelATMessageHandler answers @-messages in guild channels. Send failures
// are logged and never returned to the event dispatcher.
func (b *QQBot) channelATMessageHandler() event.ATMessageEventHandler {
	return func(ev *dto.WSPayload, data *dto.WSATMessageData) error {
		reply := b.handleInbound(qqEventID("channel", data.ID), data.ChannelID, data.Author.ID, data.Content)
		if reply == "" {
			return nil
		}
		_, err := b.api.PostMessage(context.Background(), data.ChannelID, &dto.MessageToCreate{MsgID: data.ID, Content: reply})
		if err != nil {
			log.Printf("QQ频道消息发送失败: %v", err)
		}
		return nil
	}
}

// groupATMessageHandler answers @-messages in QQ groups. Send failures are
// logged and never returned to the event dispatcher.
func (b *QQBot) groupATMessageHandler() event.GroupATMessageEventHandler {
	return func(ev *dto.WSPayload, data *dto.WSGroupATMessageData) error {
		reply := b.handleInbound(qqEventID("group", data.ID), data.GroupID, data.Author.ID, data.Content)
		if reply == "" {
			return nil
		}
		_, err := b.api.PostGroupMessage(context.Background(), data.GroupID, dto.MessageToCreate{MsgID: data.ID, Content: reply})
		if err != nil {
			log.Printf("QQ群消息发送失败: %v", err)
		}
		return nil
	}
}

// c2cMessageHandler answers private (C2C) messages; the author ID doubles as
// the chat ID. Send failures are logged and never returned to the event
// dispatcher.
func (b *QQBot) c2cMessageHandler() event.C2CMessageEventHandler {
	return func(ev *dto.WSPayload, data *dto.WSC2CMessageData) error {
		reply := b.handleInbound(qqEventID("c2c", data.ID), data.Author.ID, data.Author.ID, data.Content)
		if reply == "" {
			return nil
		}
		_, err := b.api.PostC2CMessage(context.Background(), data.Author.ID, dto.MessageToCreate{MsgID: data.ID, Content: reply})
		if err != nil {
			log.Printf("QQ私聊消息发送失败: %v", err)
		}
		return nil
	}
}