172 lines
4.7 KiB
Go
172 lines
4.7 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"errors"
|
|
"log"
|
|
"time"
|
|
|
|
"asset-tracker/internal/metrics"
|
|
"asset-tracker/internal/model"
|
|
|
|
"github.com/robfig/cron/v3"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
func StartReminderScan(db *gorm.DB) *cron.Cron {
|
|
c := cron.New(cron.WithSeconds())
|
|
|
|
_, err := c.AddFunc("0 */5 * * * *", func() {
|
|
runReminderScan(db)
|
|
})
|
|
if err != nil {
|
|
log.Printf("[scheduler] add reminder scan job failed: %v", err)
|
|
return c
|
|
}
|
|
|
|
_, err = c.AddFunc("0 10 2 * * *", func() {
|
|
runCompensationScan(db)
|
|
})
|
|
if err != nil {
|
|
log.Printf("[scheduler] add compensation job failed: %v", err)
|
|
return c
|
|
}
|
|
|
|
c.Start()
|
|
log.Println("[scheduler] reminder scan started: every 5 minutes, compensation daily 02:10")
|
|
return c
|
|
}
|
|
|
|
func runReminderScan(db *gorm.DB) {
|
|
now := time.Now().UTC()
|
|
windowEnd := now.Add(5 * time.Minute)
|
|
|
|
var pending []model.Reminder
|
|
qStart := time.Now()
|
|
err := db.Where("status = ? AND remind_at <= ?", "pending", windowEnd).Order("status asc, remind_at asc").Limit(200).Find(&pending).Error
|
|
metrics.ObserveDB("scan_pending", "reminders", err == nil, time.Since(qStart))
|
|
if err != nil {
|
|
log.Printf("[scheduler] pending reminder query error: %v", err)
|
|
return
|
|
}
|
|
|
|
for _, r := range pending {
|
|
processReminder(db, r, now)
|
|
}
|
|
|
|
var failed []model.Reminder
|
|
qStart = time.Now()
|
|
err = db.Where("status = ? AND next_retry_at IS NOT NULL AND next_retry_at <= ?", "failed", now).Order("status asc, next_retry_at asc").Limit(200).Find(&failed).Error
|
|
metrics.ObserveDB("scan_failed", "reminders", err == nil, time.Since(qStart))
|
|
if err != nil {
|
|
log.Printf("[scheduler] failed reminder query error: %v", err)
|
|
return
|
|
}
|
|
for _, r := range failed {
|
|
processReminder(db, r, now)
|
|
}
|
|
}
|
|
|
|
func processReminder(db *gorm.DB, r model.Reminder, now time.Time) {
|
|
claim := db.Model(&model.Reminder{}).Where("id = ? AND status IN ?", r.ID, []string{"pending", "failed"}).Updates(map[string]interface{}{
|
|
"status": "sending",
|
|
"last_error": "",
|
|
"next_retry_at": nil,
|
|
})
|
|
if claim.Error != nil {
|
|
log.Printf("[scheduler] claim reminder id=%d failed: %v", r.ID, claim.Error)
|
|
return
|
|
}
|
|
if claim.RowsAffected == 0 {
|
|
return
|
|
}
|
|
|
|
if err := deliverReminder(r); err != nil {
|
|
metrics.ReminderSendTotal.WithLabelValues("failed").Inc()
|
|
retryCount := r.RetryCount + 1
|
|
if retryCount >= maxRetryCount() {
|
|
_ = db.Transaction(func(tx *gorm.DB) error {
|
|
if err := tx.Model(&model.Reminder{}).Where("id = ?", r.ID).Updates(map[string]interface{}{
|
|
"status": "failed",
|
|
"retry_count": retryCount,
|
|
"last_error": "retry limit reached: " + err.Error(),
|
|
"next_retry_at": nil,
|
|
}).Error; err != nil {
|
|
return err
|
|
}
|
|
dl := model.ReminderDeadLetter{
|
|
ReminderID: r.ID,
|
|
UserID: r.UserID,
|
|
AssetID: r.AssetID,
|
|
RemindAt: r.RemindAt,
|
|
Channel: r.Channel,
|
|
Status: "failed",
|
|
RetryCount: retryCount,
|
|
LastError: "retry limit reached: " + err.Error(),
|
|
}
|
|
return tx.Where("reminder_id = ?", r.ID).FirstOrCreate(&dl).Error
|
|
})
|
|
return
|
|
}
|
|
metrics.ReminderRetryTotal.Inc()
|
|
next := now.Add(retryDelay(retryCount))
|
|
_ = db.Model(&model.Reminder{}).Where("id = ?", r.ID).Updates(map[string]interface{}{
|
|
"status": "failed",
|
|
"retry_count": retryCount,
|
|
"next_retry_at": &next,
|
|
"last_error": err.Error(),
|
|
}).Error
|
|
return
|
|
}
|
|
|
|
metrics.ReminderSendTotal.WithLabelValues("sent").Inc()
|
|
sentAt := now
|
|
_ = db.Model(&model.Reminder{}).Where("id = ?", r.ID).Updates(map[string]interface{}{
|
|
"status": "sent",
|
|
"sent_at": &sentAt,
|
|
"last_error": "",
|
|
"next_retry_at": nil,
|
|
}).Error
|
|
}
|
|
|
|
func runCompensationScan(db *gorm.DB) {
|
|
now := time.Now().UTC()
|
|
cutoff := now.Add(-10 * time.Minute)
|
|
var missed []model.Reminder
|
|
qStart := time.Now()
|
|
err := db.Where("status = ? AND remind_at <= ?", "pending", cutoff).Order("remind_at asc").Limit(500).Find(&missed).Error
|
|
metrics.ObserveDB("compensation_scan", "reminders", err == nil, time.Since(qStart))
|
|
if err != nil {
|
|
log.Printf("[scheduler] compensation query error: %v", err)
|
|
return
|
|
}
|
|
if len(missed) > 0 {
|
|
log.Printf("[scheduler] compensation scan found %d pending overdue reminders", len(missed))
|
|
}
|
|
for _, r := range missed {
|
|
processReminder(db, r, now)
|
|
}
|
|
}
|
|
|
|
func deliverReminder(r model.Reminder) error {
|
|
if r.Channel != "in_app" {
|
|
return errors.New("unsupported channel")
|
|
}
|
|
log.Printf("[reminder] user=%d asset=%d channel=%s remind_at=%s dedupe=%s", r.UserID, r.AssetID, r.Channel, r.RemindAt.Format(time.RFC3339), r.DedupeKey)
|
|
return nil
|
|
}
|
|
|
|
// retryDelay returns how long to wait before the next delivery attempt, given
// the number of attempts that have failed so far: five minutes after the
// first failure, thirty minutes after the second, and two hours for every
// other value (including zero and negative counts).
func retryDelay(retryCount int) time.Duration {
	if retryCount == 1 {
		return 5 * time.Minute
	}
	if retryCount == 2 {
		return 30 * time.Minute
	}
	return 2 * time.Hour
}
|
|
|
|
// maxRetryCount returns the retry-count ceiling for reminder delivery: once a
// reminder's failed-attempt count reaches this value, processReminder stops
// scheduling retries for it.
func maxRetryCount() int {
	const limit = 8
	return limit
}
|