package runbook

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"time"
	"unicode/utf8"

	"ops-assistant/internal/core/ecode"
	"ops-assistant/models"

	"gorm.io/gorm"
)

// Executor runs runbook specs loaded from runbookDir and records each run as an
// OpsJob row (plus one OpsJobStep row per executed step) through db.
type Executor struct {
	db         *gorm.DB
	runbookDir string
}

// NewExecutor returns an Executor that reads "<name>.yaml" runbooks from
// runbookDir and persists job state with db.
func NewExecutor(db *gorm.DB, runbookDir string) *Executor {
	return &Executor{db: db, runbookDir: runbookDir}
}

// Run executes runbookName with no inputs and default metadata (NewMeta()).
// Return values are those of RunWithInputsAndMeta.
func (e *Executor) Run(commandText, runbookName string, operator int64) (uint, string, error) {
	return e.RunWithInputsAndMeta(commandText, runbookName, operator, map[string]string{}, NewMeta())
}

// RunWithInputs executes runbookName with the given inputs and default metadata
// (NewMeta()). Return values are those of RunWithInputsAndMeta.
func (e *Executor) RunWithInputs(commandText, runbookName string, operator int64, inputs map[string]string) (uint, string, error) {
	return e.RunWithInputsAndMeta(commandText, runbookName, operator, inputs, NewMeta())
}

// runbookSettingVars lists the template variables filled from an environment
// variable, falling back to the AppSetting row with settingKey.
// The variable names are matched verbatim by ${...} placeholders, so the CPA
// entries ("env." prefix) and the Cloudflare entries ("env_" prefix) must keep
// their current spelling or runbooks referencing them will stop resolving.
var runbookSettingVars = []struct {
	varName    string
	envName    string
	settingKey string
}{
	{"env.cpa_management_base", "CPA_MANAGEMENT_BASE", "cpa_management_base"},
	{"env.cpa_management_token", "CPA_MANAGEMENT_TOKEN", "cpa_management_token"},
	{"env_cf_account_id", "CF_ACCOUNT_ID", "cf_account_id"},
	{"env_cf_api_email", "CF_API_EMAIL", "cf_api_email"},
	{"env_cf_api_token", "CF_API_TOKEN", "cf_api_token"},
}

// RunWithInputsAndMeta executes the runbook "<runbookDir>/<runbookName>.yaml"
// step by step and records the run as an OpsJob.
//
// It returns the job ID (0 if the job row could not be created), "ok" on
// success, and an error when the runbook cannot be loaded, a step fails, or the
// job is cancelled. Steps run sequentially and execution stops at the first
// failing step. The lock returned by acquireTargetLock(meta.Target) is held for
// the whole run.
func (e *Executor) RunWithInputsAndMeta(commandText, runbookName string, operator int64, inputs map[string]string, meta RunMeta) (uint, string, error) {
	started := time.Now()
	if inputs == nil {
		// Keep InputJSON "{}" (not "null") for callers passing a nil map.
		inputs = map[string]string{}
	}
	inputJSON := "{}"
	if b, err := json.Marshal(inputs); err == nil {
		inputJSON = string(b)
	}
	job := models.OpsJob{
		Command:     commandText,
		Runbook:     runbookName,
		Operator:    operator,
		Target:      strings.TrimSpace(meta.Target),
		RiskLevel:   strings.TrimSpace(meta.RiskLevel),
		RequestID:   strings.TrimSpace(meta.RequestID),
		ConfirmHash: strings.TrimSpace(meta.ConfirmHash),
		InputJSON:   inputJSON,
		Status:      "pending",
		StartedAt:   started,
	}
	if job.RiskLevel == "" {
		job.RiskLevel = "low"
	}
	if err := e.db.Create(&job).Error; err != nil {
		return 0, "", err
	}

	release := acquireTargetLock(job.Target)
	defer release()

	job.Status = "running"
	// Best-effort progress update; the terminal status is written by finishJob.
	_ = e.db.Save(&job).Error

	specPath := filepath.Join(e.runbookDir, runbookName+".yaml")
	if !runbookPathWithin(e.runbookDir, specPath) {
		// runbookName must not escape runbookDir (e.g. "../../etc/x").
		e.finishJob(&job, "failed", "invalid runbook name")
		return job.ID, "", fmt.Errorf("invalid runbook name: %q", runbookName)
	}
	data, err := os.ReadFile(specPath)
	if err != nil {
		e.finishJob(&job, "failed", "runbook not found")
		return job.ID, "", err
	}
	spec, err := Parse(data)
	if err != nil {
		e.finishJob(&job, "failed", "runbook parse failed")
		return job.ID, "", err
	}

	jobCtx, jobCancel := context.WithCancel(context.Background())
	defer jobCancel() // release the context even when no one cancels the job
	registerJobCancel(job.ID, jobCancel)
	defer clearJobCancel(job.ID)

	vars := e.buildTemplateVars(inputs)
	outputs := map[string]string{}
	timeout := meta.timeoutOrDefault()

	for _, st := range spec.Steps {
		if jobCtx.Err() != nil || isJobCancelled(e.db, job.ID) {
			return job.ID, "", e.cancelJob(&job)
		}
		rendered := renderStep(st, vars)
		step := models.OpsJobStep{JobID: job.ID, StepID: rendered.ID, Action: rendered.Action, Status: "running", StartedAt: time.Now()}
		// Best-effort audit row; a failed insert must not abort the job.
		_ = e.db.Create(&step).Error

		rc, stdout, stderr, runErr := e.execStep(jobCtx, rendered, outputs, timeout)
		step.RC = rc
		step.StdoutTail = tail(stdout, 1200)
		step.StderrTail = tail(stderr, 1200)
		step.EndedAt = time.Now()

		if runErr != nil || rc != 0 {
			step.Status = "failed"
			_ = e.db.Save(&step).Error
			// A step killed because the job was cancelled must not overwrite the
			// job's "cancelled" status with "failed".
			if jobCtx.Err() != nil || isJobCancelled(e.db, job.ID) {
				return job.ID, "", e.cancelJob(&job)
			}
			e.finishJob(&job, "failed", fmt.Sprintf("%s: step=%s failed", ecode.ErrStepFailed, rendered.ID))
			if runErr == nil {
				runErr = fmt.Errorf("rc=%d", rc)
			}
			// errors.New, not fmt.Errorf: the message may contain '%' from runErr.
			return job.ID, "", errors.New(ecode.Tag(ecode.ErrStepFailed, fmt.Sprintf("step %s failed: %v", rendered.ID, runErr)))
		}

		step.Status = "success"
		_ = e.db.Save(&step).Error
		outputs[rendered.ID] = stdout
		vars["steps."+rendered.ID+".output"] = stdout
	}

	e.finishJob(&job, "success", "ok")
	return job.ID, "ok", nil
}

// buildTemplateVars assembles the variables available to ${...} placeholders:
// "inputs.<name>" for every input, "env.INPUT_<NAME>" for every input whose
// value is not blank, and each runbookSettingVars entry whose value is non-empty.
func (e *Executor) buildTemplateVars(inputs map[string]string) map[string]string {
	vars := make(map[string]string, 2*len(inputs)+len(runbookSettingVars))
	for k, v := range inputs {
		vars["inputs."+k] = v
		if strings.TrimSpace(v) != "" {
			vars["env.INPUT_"+strings.ToUpper(k)] = v
		}
	}
	for _, s := range runbookSettingVars {
		if v := e.settingValue(s.envName, s.settingKey); v != "" {
			vars[s.varName] = v
		}
	}
	return vars
}

// settingValue returns the trimmed value of environment variable envName or,
// when that is blank, the trimmed Value of the AppSetting row whose key is
// settingKey. It returns "" when neither is available (lookup errors included).
func (e *Executor) settingValue(envName, settingKey string) string {
	if v := strings.TrimSpace(os.Getenv(envName)); v != "" {
		return v
	}
	var setting models.AppSetting
	if err := e.db.Where("key = ?", settingKey).First(&setting).Error; err != nil {
		return ""
	}
	return strings.TrimSpace(setting.Value)
}

// cancelJob marks job as cancelled and returns the error reported to the caller.
func (e *Executor) cancelJob(job *models.OpsJob) error {
	msg := ecode.Tag(ecode.ErrJobCancelled, "cancelled by user")
	e.finishJob(job, "cancelled", msg)
	return errors.New(msg)
}

// runbookPathWithin reports whether path (as produced by filepath.Join, hence
// cleaned) stays inside dir.
func runbookPathWithin(dir, path string) bool {
	rel, err := filepath.Rel(dir, path)
	if err != nil {
		return false
	}
	return rel != ".." && !strings.HasPrefix(rel, ".."+string(filepath.Separator))
}

// execStep runs a single rendered step and returns (rc, stdout, stderr, err).
// A step counts as failed when err != nil or rc != 0. outputs holds the stdout
// of previously successful steps keyed by step ID (read by "assert.json").
// timeout bounds "ssh.exec" and "shell.exec"; all actions stop early when
// parent is cancelled.
func (e *Executor) execStep(parent context.Context, st Step, outputs map[string]string, timeout time.Duration) (int, string, string, error) {
	switch st.Action {
	case "ssh.exec":
		target := stepArgString(st.With, "target")
		cmdText := stepArgString(st.With, "command")
		if target == "" || cmdText == "" {
			return 1, "", "missing target/command", errors.New("missing target/command")
		}
		resolved := resolveTarget(e.db, target)
		if !resolved.Found {
			return 1, "", "invalid target", fmt.Errorf("invalid target: %s", target)
		}
		// NOTE(security): ssh hands cmdText to the remote user's shell, and values
		// substituted from inputs are not escaped — callers must validate inputs.
		args := []string{"-p", strconv.Itoa(resolved.Port), resolved.User + "@" + resolved.Host, cmdText}
		return runStepCommand(parent, timeout, "ssh step timeout", "ssh", args...)
	case "shell.exec":
		cmdText := stepArgString(st.With, "command")
		if cmdText == "" {
			return 1, "", "missing command", errors.New("missing command")
		}
		// NOTE(security): cmdText is interpreted by bash with unescaped inputs.
		return runStepCommand(parent, timeout, "shell step timeout", "bash", "-lc", cmdText)
	case "assert.json":
		sourceStep := stepArgString(st.With, "source_step")
		if sourceStep == "" {
			return 1, "", "missing source_step", errors.New("missing source_step")
		}
		raw, ok := outputs[sourceStep]
		if !ok {
			return 1, "", "source step output not found", fmt.Errorf("source step output not found: %s", sourceStep)
		}
		var payload any
		if err := json.Unmarshal([]byte(raw), &payload); err != nil {
			return 1, "", "invalid json", err
		}
		paths := parseRequiredPaths(st.With["required_paths"])
		if len(paths) == 0 {
			return 1, "", "required_paths empty", errors.New("required_paths empty")
		}
		for _, p := range paths {
			if _, found := lookupPath(payload, p); !found {
				return 1, "", "json path not found: " + p, fmt.Errorf("json path not found: %s", p)
			}
		}
		return 0, "assert ok", "", nil
	case "sleep":
		ms := 1000
		if v, ok := st.With["ms"]; ok {
			switch t := v.(type) {
			case int:
				ms = t
			case int64:
				ms = int(t)
			case float64:
				ms = int(t)
			case string:
				if n, err := strconv.Atoi(strings.TrimSpace(t)); err == nil {
					ms = n
				}
			}
		}
		if ms < 0 {
			ms = 0
		}
		timer := time.NewTimer(time.Duration(ms) * time.Millisecond)
		defer timer.Stop()
		// Honour job cancellation instead of blocking for the full duration.
		select {
		case <-parent.Done():
			return 1, "", "sleep interrupted", parent.Err()
		case <-timer.C:
			return 0, fmt.Sprintf("slept %dms", ms), "", nil
		}
	default:
		return 1, "", "unsupported action", fmt.Errorf("unsupported action: %s", st.Action)
	}
}

// stepArgString returns the trimmed string form of args[key], or "" when the key
// is absent or nil (fmt's "%v" would otherwise produce the literal "<nil>").
func stepArgString(args map[string]any, key string) string {
	v, ok := args[key]
	if !ok || v == nil {
		return ""
	}
	return strings.TrimSpace(fmt.Sprintf("%v", v))
}

// runStepCommand runs name with args under a timeout derived from parent and
// returns (rc, trimmed stdout, trimmed stderr, err). On timeout err carries
// ecode.ErrStepTimeout with timeoutMsg. rc is the process exit code when it
// exited non-zero, and 1 for any other failure.
//
// NOTE(review): if the command spawns children that keep stdout/stderr open,
// Run can block past the timeout until they exit; consider cmd.WaitDelay
// (Go 1.20+) once the module's Go version allows it.
func runStepCommand(parent context.Context, timeout time.Duration, timeoutMsg, name string, args ...string) (int, string, string, error) {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	cmd := exec.CommandContext(ctx, name, args...)
	var outb, errb bytes.Buffer
	cmd.Stdout = &outb
	cmd.Stderr = &errb
	err := cmd.Run()
	stdout := strings.TrimSpace(outb.String())
	stderr := strings.TrimSpace(errb.String())
	if err == nil {
		return 0, stdout, stderr, nil
	}
	if errors.Is(ctx.Err(), context.DeadlineExceeded) {
		return 1, stdout, stderr, errors.New(ecode.Tag(ecode.ErrStepTimeout, timeoutMsg))
	}
	rc := 1
	var exitErr *exec.ExitError
	if errors.As(err, &exitErr) && exitErr.ExitCode() > 0 {
		rc = exitErr.ExitCode()
	}
	return rc, stdout, stderr, err
}

// renderStep returns a copy of st whose ID, Action, string values in With, and
// string elements of []any values in With have their ${...} placeholders
// substituted from vars. Other With values are copied unchanged.
func renderStep(st Step, vars map[string]string) Step {
	out := st
	out.ID = renderString(out.ID, vars)
	out.Action = renderString(out.Action, vars)
	if out.With == nil {
		return out
	}
	m := make(map[string]any, len(out.With))
	for k, v := range out.With {
		switch t := v.(type) {
		case string:
			m[k] = renderString(t, vars)
		case []any:
			arr := make([]any, 0, len(t))
			for _, it := range t {
				if s, ok := it.(string); ok {
					arr = append(arr, renderString(s, vars))
				} else {
					arr = append(arr, it)
				}
			}
			m[k] = arr
		default:
			m[k] = v
		}
	}
	out.With = m
	return out
}

// renderString replaces every "${key}" in s whose key exists in vars with its
// value, in a single left-to-right pass. Unknown placeholders are left as-is.
// Substituted values are not re-scanned, so the result is deterministic and
// text coming from step outputs cannot expand further placeholders (such as
// secret-bearing settings).
func renderString(s string, vars map[string]string) string {
	if !strings.Contains(s, "${") {
		return s
	}
	var b strings.Builder
	b.Grow(len(s))
	rest := s
	for {
		start := strings.Index(rest, "${")
		if start < 0 {
			break
		}
		end := strings.IndexByte(rest[start+2:], '}')
		if end < 0 {
			break
		}
		key := rest[start+2 : start+2+end]
		if v, ok := vars[key]; ok {
			b.WriteString(rest[:start])
			b.WriteString(v)
			rest = rest[start+2+end+1:]
			continue
		}
		// Unknown key: emit "${" literally and keep scanning right after it, so a
		// nested placeholder like "${${inputs.x}}" still has its inner part resolved.
		b.WriteString(rest[:start+2])
		rest = rest[start+2:]
	}
	b.WriteString(rest)
	return b.String()
}

// parseRequiredPaths converts a required_paths value ([]any or []string) into
// a list of trimmed, non-empty dotted paths. Nil elements and any other input
// type yield no paths.
func parseRequiredPaths(v any) []string {
	var raw []string
	switch t := v.(type) {
	case []any:
		for _, it := range t {
			if it != nil {
				raw = append(raw, fmt.Sprintf("%v", it))
			}
		}
	case []string:
		raw = t
	}
	out := make([]string, 0, len(raw))
	for _, p := range raw {
		if p = strings.TrimSpace(p); p != "" {
			out = append(out, p)
		}
	}
	return out
}

// lookupPath walks root (a value decoded by encoding/json) along the
// dot-separated object keys in path. It reports false if any segment is empty,
// an intermediate value is not a JSON object, or a key is missing.
func lookupPath(root any, path string) (any, bool) {
	parts := strings.Split(path, ".")
	cur := root
	for _, part := range parts {
		part = strings.TrimSpace(part)
		if part == "" {
			return nil, false
		}
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		next, exists := m[part]
		if !exists {
			return nil, false
		}
		cur = next
	}
	return cur, true
}

// finishJob sets the job's terminal status, summary and end time and saves it.
// The save is best-effort: its error is deliberately ignored so callers can
// still report the run's own outcome.
func (e *Executor) finishJob(job *models.OpsJob, status, summary string) {
	job.Status = status
	job.Summary = summary
	job.EndedAt = time.Now()
	_ = e.db.Save(job).Error
}

// isJobCancelled reports whether the persisted status of jobID is "cancelled"
// (case-insensitive, surrounding spaces ignored). Lookup errors report false.
func isJobCancelled(db *gorm.DB, jobID uint) bool {
	var j models.OpsJob
	if err := db.Select("status").First(&j, jobID).Error; err != nil {
		return false
	}
	return strings.EqualFold(strings.TrimSpace(j.Status), "cancelled")
}

// tail returns at most the last limit bytes of the trimmed s. The cut is moved
// forward to a rune boundary so a multi-byte UTF-8 character is never split.
// A non-positive limit yields "".
func tail(s string, limit int) string {
	s = strings.TrimSpace(s)
	if limit <= 0 {
		return ""
	}
	if len(s) <= limit {
		return s
	}
	start := len(s) - limit
	for start < len(s) && !utf8.RuneStart(s[start]) {
		start++
	}
	return s[start:]
}