fix: harden ops runbooks and execution
This commit is contained in:
60
internal/core/ops/retry.go
Normal file
60
internal/core/ops/retry.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package ops
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"ops-assistant/internal/core/runbook"
|
||||
"ops-assistant/models"
|
||||
|
||||
"gorm.io/driver/sqlite"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func decodeInputJSON(raw string, out *map[string]string) error {
|
||||
if strings.TrimSpace(raw) == "" {
|
||||
return nil
|
||||
}
|
||||
return json.Unmarshal([]byte(raw), out)
|
||||
}
|
||||
|
||||
func RetryJobWithDB(db *gorm.DB, baseDir string, jobID uint) (uint, error) {
|
||||
if db == nil {
|
||||
return 0, errors.New("db is nil")
|
||||
}
|
||||
var old models.OpsJob
|
||||
if err := db.First(&old, jobID).Error; err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if strings.TrimSpace(old.Status) != "failed" {
|
||||
return 0, errors.New("only failed jobs can retry")
|
||||
}
|
||||
|
||||
inputs := map[string]string{}
|
||||
if strings.TrimSpace(old.InputJSON) != "" {
|
||||
_ = decodeInputJSON(old.InputJSON, &inputs)
|
||||
}
|
||||
|
||||
meta := runbook.NewMeta()
|
||||
meta.Target = old.Target
|
||||
meta.RiskLevel = old.RiskLevel
|
||||
meta.RequestID = old.RequestID + "-retry"
|
||||
meta.ConfirmHash = old.ConfirmHash
|
||||
|
||||
exec := runbook.NewExecutor(db, filepath.Join(baseDir, "runbooks"))
|
||||
newID, _, err := exec.RunWithInputsAndMeta(old.Command, old.Runbook, old.Operator, inputs, meta)
|
||||
if err != nil {
|
||||
return newID, err
|
||||
}
|
||||
return newID, nil
|
||||
}
|
||||
|
||||
func RetryJob(dbPath, baseDir string, jobID uint) (uint, error) {
|
||||
db, err := gorm.Open(sqlite.Open(dbPath), &gorm.Config{})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return RetryJobWithDB(db, baseDir, jobID)
|
||||
}
|
||||
32
internal/core/runbook/cancel.go
Normal file
32
internal/core/runbook/cancel.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package runbook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var jobCancelMap sync.Map
|
||||
|
||||
func registerJobCancel(jobID uint, cancel context.CancelFunc) {
|
||||
jobCancelMap.Store(jobID, cancel)
|
||||
}
|
||||
|
||||
func clearJobCancel(jobID uint) {
|
||||
if v, ok := jobCancelMap.Load(jobID); ok {
|
||||
if cancel, ok2 := v.(context.CancelFunc); ok2 {
|
||||
cancel()
|
||||
}
|
||||
jobCancelMap.Delete(jobID)
|
||||
}
|
||||
}
|
||||
|
||||
func CancelJob(jobID uint) bool {
|
||||
if v, ok := jobCancelMap.Load(jobID); ok {
|
||||
if cancel, ok2 := v.(context.CancelFunc); ok2 {
|
||||
cancel()
|
||||
}
|
||||
jobCancelMap.Delete(jobID)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
387
internal/core/runbook/executor.go
Normal file
387
internal/core/runbook/executor.go
Normal file
@@ -0,0 +1,387 @@
|
||||
package runbook
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"ops-assistant/internal/core/ecode"
|
||||
"ops-assistant/models"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type Executor struct {
|
||||
db *gorm.DB
|
||||
runbookDir string
|
||||
}
|
||||
|
||||
func NewExecutor(db *gorm.DB, runbookDir string) *Executor {
|
||||
return &Executor{db: db, runbookDir: runbookDir}
|
||||
}
|
||||
|
||||
func (e *Executor) Run(commandText, runbookName string, operator int64) (uint, string, error) {
|
||||
return e.RunWithInputsAndMeta(commandText, runbookName, operator, map[string]string{}, NewMeta())
|
||||
}
|
||||
|
||||
func (e *Executor) RunWithInputs(commandText, runbookName string, operator int64, inputs map[string]string) (uint, string, error) {
|
||||
return e.RunWithInputsAndMeta(commandText, runbookName, operator, inputs, NewMeta())
|
||||
}
|
||||
|
||||
func (e *Executor) RunWithInputsAndMeta(commandText, runbookName string, operator int64, inputs map[string]string, meta RunMeta) (uint, string, error) {
|
||||
started := time.Now()
|
||||
inputJSON := "{}"
|
||||
if b, err := json.Marshal(inputs); err == nil {
|
||||
inputJSON = string(b)
|
||||
}
|
||||
job := models.OpsJob{
|
||||
Command: commandText,
|
||||
Runbook: runbookName,
|
||||
Operator: operator,
|
||||
Target: strings.TrimSpace(meta.Target),
|
||||
RiskLevel: strings.TrimSpace(meta.RiskLevel),
|
||||
RequestID: strings.TrimSpace(meta.RequestID),
|
||||
ConfirmHash: strings.TrimSpace(meta.ConfirmHash),
|
||||
InputJSON: inputJSON,
|
||||
Status: "pending",
|
||||
StartedAt: started,
|
||||
}
|
||||
if job.RiskLevel == "" {
|
||||
job.RiskLevel = "low"
|
||||
}
|
||||
if err := e.db.Create(&job).Error; err != nil {
|
||||
return 0, "", err
|
||||
}
|
||||
|
||||
release := acquireTargetLock(job.Target)
|
||||
defer release()
|
||||
|
||||
job.Status = "running"
|
||||
_ = e.db.Save(&job).Error
|
||||
|
||||
specPath := filepath.Join(e.runbookDir, runbookName+".yaml")
|
||||
data, err := os.ReadFile(specPath)
|
||||
if err != nil {
|
||||
e.finishJob(&job, "failed", "runbook not found")
|
||||
return job.ID, "", err
|
||||
}
|
||||
spec, err := Parse(data)
|
||||
if err != nil {
|
||||
e.finishJob(&job, "failed", "runbook parse failed")
|
||||
return job.ID, "", err
|
||||
}
|
||||
|
||||
outputs := map[string]string{}
|
||||
ctx := map[string]string{}
|
||||
|
||||
jobCtx, jobCancel := context.WithCancel(context.Background())
|
||||
registerJobCancel(job.ID, jobCancel)
|
||||
defer clearJobCancel(job.ID)
|
||||
for k, v := range inputs {
|
||||
ctx["inputs."+k] = v
|
||||
}
|
||||
if t := strings.TrimSpace(os.Getenv("CPA_MANAGEMENT_BASE")); t != "" {
|
||||
ctx["env.cpa_management_base"] = t
|
||||
} else {
|
||||
var sset models.AppSetting
|
||||
if err := e.db.Where("key = ?", "cpa_management_base").First(&sset).Error; err == nil {
|
||||
if strings.TrimSpace(sset.Value) != "" {
|
||||
ctx["env.cpa_management_base"] = strings.TrimSpace(sset.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
if t := strings.TrimSpace(os.Getenv("CPA_MANAGEMENT_TOKEN")); t != "" {
|
||||
ctx["env.cpa_management_token"] = t
|
||||
} else {
|
||||
var sset models.AppSetting
|
||||
if err := e.db.Where("key = ?", "cpa_management_token").First(&sset).Error; err == nil {
|
||||
if strings.TrimSpace(sset.Value) != "" {
|
||||
ctx["env.cpa_management_token"] = strings.TrimSpace(sset.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Cloudflare settings
|
||||
if t := strings.TrimSpace(os.Getenv("CF_ACCOUNT_ID")); t != "" {
|
||||
ctx["env_cf_account_id"] = t
|
||||
} else {
|
||||
var sset models.AppSetting
|
||||
if err := e.db.Where("key = ?", "cf_account_id").First(&sset).Error; err == nil {
|
||||
if strings.TrimSpace(sset.Value) != "" {
|
||||
ctx["env_cf_account_id"] = strings.TrimSpace(sset.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
if t := strings.TrimSpace(os.Getenv("CF_API_EMAIL")); t != "" {
|
||||
ctx["env_cf_api_email"] = t
|
||||
} else {
|
||||
var sset models.AppSetting
|
||||
if err := e.db.Where("key = ?", "cf_api_email").First(&sset).Error; err == nil {
|
||||
if strings.TrimSpace(sset.Value) != "" {
|
||||
ctx["env_cf_api_email"] = strings.TrimSpace(sset.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
if t := strings.TrimSpace(os.Getenv("CF_API_TOKEN")); t != "" {
|
||||
ctx["env_cf_api_token"] = t
|
||||
} else {
|
||||
var sset models.AppSetting
|
||||
if err := e.db.Where("key = ?", "cf_api_token").First(&sset).Error; err == nil {
|
||||
if strings.TrimSpace(sset.Value) != "" {
|
||||
ctx["env_cf_api_token"] = strings.TrimSpace(sset.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// inject input env vars for runbook steps
|
||||
for k, v := range inputs {
|
||||
if strings.TrimSpace(v) != "" {
|
||||
ctx["env.INPUT_"+strings.ToUpper(k)] = v
|
||||
}
|
||||
}
|
||||
for _, st := range spec.Steps {
|
||||
if isJobCancelled(e.db, job.ID) {
|
||||
e.finishJob(&job, "cancelled", ecode.Tag(ecode.ErrJobCancelled, "cancelled by user"))
|
||||
return job.ID, "", fmt.Errorf(ecode.Tag(ecode.ErrJobCancelled, "cancelled by user"))
|
||||
}
|
||||
|
||||
rendered := renderStep(st, ctx)
|
||||
step := models.OpsJobStep{JobID: job.ID, StepID: rendered.ID, Action: rendered.Action, Status: "running", StartedAt: time.Now()}
|
||||
_ = e.db.Create(&step).Error
|
||||
|
||||
timeout := meta.timeoutOrDefault()
|
||||
rc, stdout, stderr, runErr := e.execStep(jobCtx, rendered, outputs, timeout)
|
||||
step.RC = rc
|
||||
step.StdoutTail = tail(stdout, 1200)
|
||||
step.StderrTail = tail(stderr, 1200)
|
||||
step.EndedAt = time.Now()
|
||||
if runErr != nil || rc != 0 {
|
||||
step.Status = "failed"
|
||||
_ = e.db.Save(&step).Error
|
||||
e.finishJob(&job, "failed", fmt.Sprintf("%s: step=%s failed", ecode.ErrStepFailed, rendered.ID))
|
||||
if runErr == nil {
|
||||
runErr = fmt.Errorf("rc=%d", rc)
|
||||
}
|
||||
return job.ID, "", fmt.Errorf(ecode.Tag(ecode.ErrStepFailed, fmt.Sprintf("step %s failed: %v", rendered.ID, runErr)))
|
||||
}
|
||||
step.Status = "success"
|
||||
_ = e.db.Save(&step).Error
|
||||
outputs[rendered.ID] = stdout
|
||||
ctx["steps."+rendered.ID+".output"] = stdout
|
||||
}
|
||||
|
||||
e.finishJob(&job, "success", "ok")
|
||||
return job.ID, "ok", nil
|
||||
}
|
||||
|
||||
func (e *Executor) execStep(parent context.Context, st Step, outputs map[string]string, timeout time.Duration) (int, string, string, error) {
|
||||
switch st.Action {
|
||||
case "ssh.exec":
|
||||
target := strings.TrimSpace(fmt.Sprintf("%v", st.With["target"]))
|
||||
cmdText := strings.TrimSpace(fmt.Sprintf("%v", st.With["command"]))
|
||||
if target == "" || cmdText == "" {
|
||||
return 1, "", "missing target/command", fmt.Errorf("missing target/command")
|
||||
}
|
||||
resolved := resolveTarget(e.db, target)
|
||||
if !resolved.Found {
|
||||
return 1, "", "invalid target", fmt.Errorf("invalid target: %s", target)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(parent, timeout)
|
||||
defer cancel()
|
||||
args := []string{"-p", strconv.Itoa(resolved.Port), resolved.User + "@" + resolved.Host, cmdText}
|
||||
cmd := exec.CommandContext(ctx, "ssh", args...)
|
||||
var outb, errb bytes.Buffer
|
||||
cmd.Stdout = &outb
|
||||
cmd.Stderr = &errb
|
||||
err := cmd.Run()
|
||||
rc := 0
|
||||
if err != nil {
|
||||
rc = 1
|
||||
if ctx.Err() == context.DeadlineExceeded {
|
||||
return rc, strings.TrimSpace(outb.String()), strings.TrimSpace(errb.String()), fmt.Errorf(ecode.Tag(ecode.ErrStepTimeout, "ssh step timeout"))
|
||||
}
|
||||
}
|
||||
return rc, strings.TrimSpace(outb.String()), strings.TrimSpace(errb.String()), err
|
||||
|
||||
case "shell.exec":
|
||||
cmdText := strings.TrimSpace(fmt.Sprintf("%v", st.With["command"]))
|
||||
if cmdText == "" {
|
||||
return 1, "", "missing command", fmt.Errorf("missing command")
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(parent, timeout)
|
||||
defer cancel()
|
||||
cmd := exec.CommandContext(ctx, "bash", "-lc", cmdText)
|
||||
var outb, errb bytes.Buffer
|
||||
cmd.Stdout = &outb
|
||||
cmd.Stderr = &errb
|
||||
err := cmd.Run()
|
||||
rc := 0
|
||||
if err != nil {
|
||||
rc = 1
|
||||
if ctx.Err() == context.DeadlineExceeded {
|
||||
return rc, strings.TrimSpace(outb.String()), strings.TrimSpace(errb.String()), fmt.Errorf(ecode.Tag(ecode.ErrStepTimeout, "shell step timeout"))
|
||||
}
|
||||
}
|
||||
return rc, strings.TrimSpace(outb.String()), strings.TrimSpace(errb.String()), err
|
||||
|
||||
case "assert.json":
|
||||
sourceStep := strings.TrimSpace(fmt.Sprintf("%v", st.With["source_step"]))
|
||||
if sourceStep == "" {
|
||||
return 1, "", "missing source_step", fmt.Errorf("missing source_step")
|
||||
}
|
||||
raw, ok := outputs[sourceStep]
|
||||
if !ok {
|
||||
return 1, "", "source step output not found", fmt.Errorf("source step output not found: %s", sourceStep)
|
||||
}
|
||||
|
||||
var payload any
|
||||
if err := json.Unmarshal([]byte(raw), &payload); err != nil {
|
||||
return 1, "", "invalid json", err
|
||||
}
|
||||
|
||||
rules := parseRequiredPaths(st.With["required_paths"])
|
||||
if len(rules) == 0 {
|
||||
return 1, "", "required_paths empty", fmt.Errorf("required_paths empty")
|
||||
}
|
||||
for _, p := range rules {
|
||||
if _, ok := lookupPath(payload, p); !ok {
|
||||
return 1, "", "json path not found: " + p, fmt.Errorf("json path not found: %s", p)
|
||||
}
|
||||
}
|
||||
return 0, "assert ok", "", nil
|
||||
|
||||
case "sleep":
|
||||
ms := 1000
|
||||
if v, ok := st.With["ms"]; ok {
|
||||
switch t := v.(type) {
|
||||
case int:
|
||||
ms = t
|
||||
case int64:
|
||||
ms = int(t)
|
||||
case float64:
|
||||
ms = int(t)
|
||||
case string:
|
||||
if n, err := strconv.Atoi(strings.TrimSpace(t)); err == nil {
|
||||
ms = n
|
||||
}
|
||||
}
|
||||
}
|
||||
if ms < 0 {
|
||||
ms = 0
|
||||
}
|
||||
time.Sleep(time.Duration(ms) * time.Millisecond)
|
||||
return 0, fmt.Sprintf("slept %dms", ms), "", nil
|
||||
|
||||
default:
|
||||
return 1, "", "unsupported action", fmt.Errorf("unsupported action: %s", st.Action)
|
||||
}
|
||||
}
|
||||
|
||||
func renderStep(st Step, ctx map[string]string) Step {
|
||||
out := st
|
||||
out.ID = renderString(out.ID, ctx)
|
||||
out.Action = renderString(out.Action, ctx)
|
||||
if out.With == nil {
|
||||
return out
|
||||
}
|
||||
m := make(map[string]any, len(out.With))
|
||||
for k, v := range out.With {
|
||||
switch t := v.(type) {
|
||||
case string:
|
||||
m[k] = renderString(t, ctx)
|
||||
case []any:
|
||||
arr := make([]any, 0, len(t))
|
||||
for _, it := range t {
|
||||
if s, ok := it.(string); ok {
|
||||
arr = append(arr, renderString(s, ctx))
|
||||
} else {
|
||||
arr = append(arr, it)
|
||||
}
|
||||
}
|
||||
m[k] = arr
|
||||
default:
|
||||
m[k] = v
|
||||
}
|
||||
}
|
||||
out.With = m
|
||||
return out
|
||||
}
|
||||
|
||||
func renderString(s string, ctx map[string]string) string {
|
||||
res := s
|
||||
for k, v := range ctx {
|
||||
res = strings.ReplaceAll(res, "${"+k+"}", v)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func parseRequiredPaths(v any) []string {
|
||||
res := []string{}
|
||||
switch t := v.(type) {
|
||||
case []any:
|
||||
for _, it := range t {
|
||||
res = append(res, strings.TrimSpace(fmt.Sprintf("%v", it)))
|
||||
}
|
||||
case []string:
|
||||
for _, it := range t {
|
||||
res = append(res, strings.TrimSpace(it))
|
||||
}
|
||||
}
|
||||
out := make([]string, 0, len(res))
|
||||
for _, p := range res {
|
||||
if p != "" {
|
||||
out = append(out, p)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func lookupPath(root any, path string) (any, bool) {
|
||||
parts := strings.Split(path, ".")
|
||||
cur := root
|
||||
for _, part := range parts {
|
||||
part = strings.TrimSpace(part)
|
||||
if part == "" {
|
||||
return nil, false
|
||||
}
|
||||
m, ok := cur.(map[string]any)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
next, exists := m[part]
|
||||
if !exists {
|
||||
return nil, false
|
||||
}
|
||||
cur = next
|
||||
}
|
||||
return cur, true
|
||||
}
|
||||
|
||||
func (e *Executor) finishJob(job *models.OpsJob, status, summary string) {
|
||||
job.Status = status
|
||||
job.Summary = summary
|
||||
job.EndedAt = time.Now()
|
||||
_ = e.db.Save(job).Error
|
||||
}
|
||||
|
||||
func isJobCancelled(db *gorm.DB, jobID uint) bool {
|
||||
var j models.OpsJob
|
||||
if err := db.Select("status").First(&j, jobID).Error; err != nil {
|
||||
return false
|
||||
}
|
||||
return strings.EqualFold(strings.TrimSpace(j.Status), "cancelled")
|
||||
}
|
||||
|
||||
func tail(s string, max int) string {
|
||||
s = strings.TrimSpace(s)
|
||||
if len(s) <= max {
|
||||
return s
|
||||
}
|
||||
return s[len(s)-max:]
|
||||
}
|
||||
37
internal/core/runbook/targets.go
Normal file
37
internal/core/runbook/targets.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package runbook
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"ops-assistant/models"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type ResolvedTarget struct {
|
||||
Found bool
|
||||
User string
|
||||
Host string
|
||||
Port int
|
||||
}
|
||||
|
||||
func resolveTarget(db *gorm.DB, name string) ResolvedTarget {
|
||||
trim := strings.TrimSpace(name)
|
||||
if trim == "" {
|
||||
return ResolvedTarget{}
|
||||
}
|
||||
var t models.OpsTarget
|
||||
if err := db.Where("name = ? AND enabled = ?", trim, true).First(&t).Error; err != nil {
|
||||
return ResolvedTarget{}
|
||||
}
|
||||
user := strings.TrimSpace(t.User)
|
||||
host := strings.TrimSpace(t.Host)
|
||||
port := t.Port
|
||||
if user == "" || host == "" {
|
||||
return ResolvedTarget{}
|
||||
}
|
||||
if port <= 0 {
|
||||
port = 22
|
||||
}
|
||||
return ResolvedTarget{Found: true, User: user, Host: host, Port: port}
|
||||
}
|
||||
Reference in New Issue
Block a user