217 lines
5.9 KiB
TypeScript
217 lines
5.9 KiB
TypeScript
import { createScopedLogger } from '@/lib/logging/core'
|
|
import { prisma } from '@/lib/prisma'
|
|
import { addTaskJob } from '@/lib/task/queues'
|
|
import { resolveTaskLocaleFromBody } from '@/lib/task/resolve-locale'
|
|
import { markTaskFailed } from '@/lib/task/service'
|
|
import { publishTaskEvent } from '@/lib/task/publisher'
|
|
import { TASK_EVENT_TYPE, TASK_TYPE, type TaskType } from '@/lib/task/types'
|
|
|
|
const INTERVAL_MS = Number.parseInt(process.env.WATCHDOG_INTERVAL_MS || '30000', 10) || 30000
|
|
const HEARTBEAT_TIMEOUT_MS = Number.parseInt(process.env.TASK_HEARTBEAT_TIMEOUT_MS || '90000', 10) || 90000
|
|
const TASK_TYPE_SET: ReadonlySet<string> = new Set(Object.values(TASK_TYPE))
|
|
const logger = createScopedLogger({
|
|
module: 'watchdog',
|
|
action: 'watchdog.tick',
|
|
})
|
|
|
|
function toTaskType(value: string): TaskType | null {
|
|
if (TASK_TYPE_SET.has(value)) {
|
|
return value as TaskType
|
|
}
|
|
return null
|
|
}
|
|
|
|
function toTaskPayload(value: unknown): Record<string, unknown> | null {
|
|
if (value && typeof value === 'object' && !Array.isArray(value)) {
|
|
return value as Record<string, unknown>
|
|
}
|
|
return null
|
|
}
|
|
|
|
async function recoverQueuedTasks() {
|
|
const rows = await prisma.task.findMany({
|
|
where: {
|
|
status: 'queued',
|
|
enqueuedAt: null,
|
|
},
|
|
take: 100,
|
|
orderBy: { createdAt: 'asc' },
|
|
})
|
|
|
|
for (const task of rows) {
|
|
const taskType = toTaskType(task.type)
|
|
if (!taskType) {
|
|
logger.error({
|
|
action: 'watchdog.reenqueue_invalid_type',
|
|
message: `invalid task type: ${task.type}`,
|
|
taskId: task.id,
|
|
projectId: task.projectId,
|
|
userId: task.userId,
|
|
errorCode: 'INVALID_PARAMS',
|
|
retryable: false,
|
|
})
|
|
continue
|
|
}
|
|
try {
|
|
const locale = resolveTaskLocaleFromBody(task.payload)
|
|
if (!locale) {
|
|
await markTaskFailed(task.id, 'TASK_LOCALE_REQUIRED', 'task locale is missing')
|
|
logger.error({
|
|
action: 'watchdog.reenqueue_locale_missing',
|
|
message: 'task locale is missing',
|
|
taskId: task.id,
|
|
projectId: task.projectId,
|
|
userId: task.userId,
|
|
errorCode: 'TASK_LOCALE_REQUIRED',
|
|
retryable: false,
|
|
})
|
|
continue
|
|
}
|
|
|
|
await addTaskJob({
|
|
taskId: task.id,
|
|
type: taskType,
|
|
locale,
|
|
projectId: task.projectId,
|
|
episodeId: task.episodeId,
|
|
targetType: task.targetType,
|
|
targetId: task.targetId,
|
|
payload: toTaskPayload(task.payload),
|
|
userId: task.userId,
|
|
})
|
|
await prisma.task.update({
|
|
where: { id: task.id },
|
|
data: {
|
|
enqueuedAt: new Date(),
|
|
enqueueAttempts: { increment: 1 },
|
|
lastEnqueueError: null,
|
|
},
|
|
})
|
|
logger.info({
|
|
action: 'watchdog.reenqueue',
|
|
message: 'watchdog re-enqueued queued task',
|
|
taskId: task.id,
|
|
projectId: task.projectId,
|
|
userId: task.userId,
|
|
details: {
|
|
type: task.type,
|
|
targetType: task.targetType,
|
|
targetId: task.targetId,
|
|
},
|
|
})
|
|
} catch (error: unknown) {
|
|
const message = error instanceof Error ? error.message : 're-enqueue failed'
|
|
await prisma.task.update({
|
|
where: { id: task.id },
|
|
data: {
|
|
enqueueAttempts: { increment: 1 },
|
|
lastEnqueueError: message,
|
|
},
|
|
})
|
|
logger.error({
|
|
action: 'watchdog.reenqueue_failed',
|
|
message,
|
|
taskId: task.id,
|
|
projectId: task.projectId,
|
|
userId: task.userId,
|
|
errorCode: 'EXTERNAL_ERROR',
|
|
retryable: true,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
async function cleanupZombieProcessingTasks() {
|
|
const timeoutAt = new Date(Date.now() - HEARTBEAT_TIMEOUT_MS)
|
|
const rows = await prisma.task.findMany({
|
|
where: {
|
|
status: 'processing',
|
|
heartbeatAt: { lt: timeoutAt },
|
|
},
|
|
take: 100,
|
|
})
|
|
|
|
for (const task of rows) {
|
|
if ((task.attempt || 0) >= (task.maxAttempts || 5)) {
|
|
await markTaskFailed(task.id, 'WATCHDOG_TIMEOUT', 'Task heartbeat timeout')
|
|
await publishTaskEvent({
|
|
taskId: task.id,
|
|
projectId: task.projectId,
|
|
userId: task.userId,
|
|
type: TASK_EVENT_TYPE.FAILED,
|
|
payload: { reason: 'watchdog_timeout' },
|
|
})
|
|
logger.error({
|
|
action: 'watchdog.fail_timeout',
|
|
message: 'watchdog marked task as failed due to heartbeat timeout',
|
|
taskId: task.id,
|
|
projectId: task.projectId,
|
|
userId: task.userId,
|
|
errorCode: 'WATCHDOG_TIMEOUT',
|
|
retryable: true,
|
|
})
|
|
continue
|
|
}
|
|
|
|
await prisma.task.update({
|
|
where: { id: task.id },
|
|
data: {
|
|
status: 'queued',
|
|
enqueuedAt: null,
|
|
heartbeatAt: null,
|
|
startedAt: null,
|
|
},
|
|
})
|
|
await publishTaskEvent({
|
|
taskId: task.id,
|
|
projectId: task.projectId,
|
|
userId: task.userId,
|
|
type: TASK_EVENT_TYPE.CREATED,
|
|
payload: { reason: 'watchdog_requeue' },
|
|
})
|
|
logger.warn({
|
|
action: 'watchdog.requeue_processing',
|
|
message: 'watchdog re-queued stalled processing task',
|
|
taskId: task.id,
|
|
projectId: task.projectId,
|
|
userId: task.userId,
|
|
retryable: true,
|
|
})
|
|
}
|
|
}
|
|
|
|
async function tick() {
|
|
const startedAt = Date.now()
|
|
try {
|
|
await recoverQueuedTasks()
|
|
await cleanupZombieProcessingTasks()
|
|
logger.info({
|
|
action: 'watchdog.tick.ok',
|
|
message: 'watchdog tick completed',
|
|
durationMs: Date.now() - startedAt,
|
|
})
|
|
} catch (error: unknown) {
|
|
const message = error instanceof Error ? error.message : 'watchdog tick failed'
|
|
logger.error({
|
|
action: 'watchdog.tick.failed',
|
|
message,
|
|
durationMs: Date.now() - startedAt,
|
|
errorCode: 'INTERNAL_ERROR',
|
|
retryable: true,
|
|
})
|
|
}
|
|
}
|
|
|
|
logger.info({
|
|
action: 'watchdog.started',
|
|
message: 'watchdog started',
|
|
details: {
|
|
intervalMs: INTERVAL_MS,
|
|
heartbeatTimeoutMs: HEARTBEAT_TIMEOUT_MS,
|
|
},
|
|
})
|
|
void tick()
|
|
setInterval(() => {
|
|
void tick()
|
|
}, INTERVAL_MS)
|