Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions apps/sim/app/api/schedules/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import { and, eq, isNull, lt, lte, ne, not, or, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { verifyCronAuth } from '@/lib/auth/internal'
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import { getJobQueue } from '@/lib/core/async-jobs'
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { generateRequestId } from '@/lib/core/utils/request'
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
import {
Expand Down Expand Up @@ -153,7 +154,7 @@ export async function GET(request: NextRequest) {
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`
)

if (shouldExecuteInline()) {
if (!isBullMQEnabled() && !isTriggerDevEnabled) {
try {
await jobQueue.startJob(jobId)
const output = await executeScheduleJob(payload)
Expand Down
15 changes: 12 additions & 3 deletions apps/sim/app/api/workflows/[id]/execute/route.async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,24 @@ vi.mock('@/lib/core/async-jobs', () => ({
completeJob: vi.fn(),
markJobFailed: vi.fn(),
}),
shouldExecuteInline: vi.fn().mockReturnValue(false),
shouldUseBullMQ: vi.fn().mockReturnValue(true),
}))

vi.mock('@/lib/core/bullmq', () => ({
createBullMQJobData: vi.fn((payload: unknown, metadata?: unknown) => ({ payload, metadata })),
createBullMQJobData: vi.fn((payload: unknown, metadata?: unknown) => ({
payload,
metadata: metadata ?? {},
})),
isBullMQEnabled: vi.fn().mockReturnValue(true),
}))

vi.mock('@/lib/core/workspace-dispatch', () => ({
DispatchQueueFullError: class DispatchQueueFullError extends Error {
statusCode = 503
constructor(scope: string, depth: number, limit: number) {
super(`${scope} queue at capacity (${depth}/${limit})`)
this.name = 'DispatchQueueFullError'
}
},
enqueueWorkspaceDispatch: mockEnqueueWorkspaceDispatch,
waitForDispatchJob: vi.fn(),
}))
Expand Down
13 changes: 7 additions & 6 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { AuthType, checkHybridAuth, hasExternalApiCredentials } from '@/lib/auth/hybrid'
import { admissionRejectedResponse, tryAdmit } from '@/lib/core/admission/gate'
import { getJobQueue, shouldExecuteInline, shouldUseBullMQ } from '@/lib/core/async-jobs'
import { createBullMQJobData } from '@/lib/core/bullmq'
import { getJobQueue } from '@/lib/core/async-jobs'
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import {
createTimeoutAbortController,
getTimeoutErrorMessage,
Expand Down Expand Up @@ -209,7 +210,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
}

try {
const useBullMQ = shouldUseBullMQ()
const useBullMQ = isBullMQEnabled()
const jobQueue = useBullMQ ? null : await getJobQueue()
const jobId = useBullMQ
? await enqueueWorkspaceDispatch({
Expand Down Expand Up @@ -238,7 +239,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
jobId,
})

if (shouldExecuteInline() && jobQueue) {
if (!isBullMQEnabled() && !isTriggerDevEnabled && jobQueue) {
const inlineJobQueue = jobQueue
void (async () => {
try {
Expand Down Expand Up @@ -792,7 +793,7 @@ async function handleExecutePost(

const executionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {}

if (shouldUseBullMQ() && !INLINE_TRIGGER_TYPES.has(triggerType)) {
if (isBullMQEnabled() && !INLINE_TRIGGER_TYPES.has(triggerType)) {
try {
const dispatchJobId = await enqueueDirectWorkflowExecution(
{
Expand Down Expand Up @@ -992,7 +993,7 @@ async function handleExecutePost(
}

if (shouldUseDraftState) {
const shouldDispatchViaQueue = shouldUseBullMQ() && !INLINE_TRIGGER_TYPES.has(triggerType)
const shouldDispatchViaQueue = isBullMQEnabled() && !INLINE_TRIGGER_TYPES.has(triggerType)
if (shouldDispatchViaQueue) {
const metadata: ExecutionMetadata = {
requestId,
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/executor/execution/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class ExecutionSnapshot {
data.workflow,
data.input,
data.workflowVariables,
data.selectedOutputs,
Array.isArray(data.selectedOutputs) ? data.selectedOutputs : [],
data.state
)
}
Expand Down
12 changes: 0 additions & 12 deletions apps/sim/lib/core/async-jobs/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,6 @@ export async function getInlineJobQueue(): Promise<JobQueueBackend> {
return cachedInlineBackend
}

/**
* Checks if jobs should be executed inline in-process.
* Database fallback is the only mode that still relies on inline execution.
*/
export function shouldExecuteInline(): boolean {
return getAsyncBackendType() === 'database'
}

export function shouldUseBullMQ(): boolean {
return isBullMQEnabled()
}

/**
* Resets the cached backend (useful for testing)
*/
Expand Down
2 changes: 0 additions & 2 deletions apps/sim/lib/core/async-jobs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ export {
getInlineJobQueue,
getJobQueue,
resetJobQueueCache,
shouldExecuteInline,
shouldUseBullMQ,
} from './config'
export type {
AsyncBackendType,
Expand Down
4 changes: 2 additions & 2 deletions apps/sim/lib/core/bullmq/connection.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { ConnectionOptions } from 'bullmq'
import { env, isTruthy } from '@/lib/core/config/env'
import { env } from '@/lib/core/config/env'

export function isBullMQEnabled(): boolean {
return isTruthy(env.CONCURRENCY_CONTROL_ENABLED) && Boolean(env.REDIS_URL)
return Boolean(env.REDIS_URL)
}

export function getBullMQConnectionOptions(): ConnectionOptions {
Expand Down
28 changes: 14 additions & 14 deletions apps/sim/lib/core/bullmq/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@ function getQueueDefaultOptions(type: JobType) {
return {
attempts: 3,
backoff: { type: 'exponential' as const, delay: 1000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 1000 },
removeOnFail: { age: 2 * 60 * 60, count: 500 },
}
case 'webhook-execution':
return {
attempts: 2,
backoff: { type: 'exponential' as const, delay: 2000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 3 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 1000 },
removeOnFail: { age: 2 * 60 * 60, count: 500 },
}
case 'schedule-execution':
return {
attempts: 2,
backoff: { type: 'exponential' as const, delay: 5000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 3 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 1000 },
removeOnFail: { age: 2 * 60 * 60, count: 500 },
}
}
}
Expand All @@ -69,8 +69,8 @@ function createNamedQueue(
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 5000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 500 },
removeOnFail: { age: 2 * 60 * 60, count: 200 },
},
})
case KNOWLEDGE_DOCUMENT_PROCESSING_QUEUE:
Expand All @@ -79,26 +79,26 @@ function createNamedQueue(
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 500 },
removeOnFail: { age: 2 * 60 * 60, count: 200 },
},
})
case MOTHERSHIP_JOB_EXECUTION_QUEUE:
return new Queue(name, {
connection: getBullMQConnectionOptions(),
defaultJobOptions: {
attempts: 1,
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 500 },
removeOnFail: { age: 2 * 60 * 60, count: 200 },
},
})
case WORKSPACE_NOTIFICATION_DELIVERY_QUEUE:
return new Queue(name, {
connection: getBullMQConnectionOptions(),
defaultJobOptions: {
attempts: 1,
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 500 },
removeOnFail: { age: 2 * 60 * 60, count: 200 },
},
})
}
Expand Down
1 change: 0 additions & 1 deletion apps/sim/lib/core/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ export const env = createEnv({
FREE_PLAN_LOG_RETENTION_DAYS: z.string().optional(), // Log retention days for free plan users

// Admission & Burst Protection
CONCURRENCY_CONTROL_ENABLED: z.string().optional().default('false'), // Set to 'true' to enable BullMQ-based concurrency control (default: inline execution)
ADMISSION_GATE_MAX_INFLIGHT: z.string().optional().default('500'), // Max concurrent in-flight execution requests per pod
DISPATCH_MAX_QUEUE_PER_WORKSPACE: z.string().optional().default('1000'), // Max queued dispatch jobs per workspace
DISPATCH_MAX_QUEUE_GLOBAL: z.string().optional().default('50000'), // Max queued dispatch jobs globally
Expand Down
2 changes: 2 additions & 0 deletions apps/sim/lib/core/workspace-dispatch/memory-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ export class MemoryWorkspaceDispatchStorage implements WorkspaceDispatchStorageA
status: 'completed',
completedAt: Date.now(),
output,
bullmqPayload: undefined,
}))
}

Expand All @@ -482,6 +483,7 @@ export class MemoryWorkspaceDispatchStorage implements WorkspaceDispatchStorageA
status: 'failed',
completedAt: Date.now(),
error,
bullmqPayload: undefined,
}))
}

Expand Down
75 changes: 51 additions & 24 deletions apps/sim/lib/core/workspace-dispatch/redis-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
const logger = createLogger('WorkspaceDispatchRedisStore')

const DISPATCH_PREFIX = 'workspace-dispatch:v1'
const JOB_TTL_SECONDS = 48 * 60 * 60
const JOB_TTL_SECONDS = 2 * 60 * 60
const SEQUENCE_KEY = `${DISPATCH_PREFIX}:sequence`
const ACTIVE_WORKSPACES_KEY = `${DISPATCH_PREFIX}:workspaces`
const GLOBAL_DEPTH_KEY = `${DISPATCH_PREFIX}:global-depth`
Expand All @@ -27,7 +27,6 @@ local sequenceKey = ARGV[7]
local activeWorkspacesKey = ARGV[8]
local jobPrefix = ARGV[9]
local workspacePrefix = ARGV[10]
local jobTtlSeconds = tonumber(ARGV[11])

local function laneKey(lane)
return workspacePrefix .. workspaceId .. ':lane:' .. lane
Expand Down Expand Up @@ -121,17 +120,6 @@ if selectedRecord == nil then
end

redis.call('ZADD', leaseKey(), leaseExpiresAt, leaseId)
selectedRecord.status = 'admitting'
selectedRecord.lease = {
workspaceId = workspaceId,
leaseId = leaseId
}
if selectedRecord.metadata == nil then
selectedRecord.metadata = {}
end
selectedRecord.metadata.dispatchLeaseExpiresAt = leaseExpiresAt

redis.call('SET', jobPrefix .. selectedId, cjson.encode(selectedRecord), 'EX', jobTtlSeconds)
redis.call('ZREM', laneKey(selectedLane), selectedId)

local hasPending, minReadyAt = workspaceHasPending()
Expand All @@ -146,7 +134,7 @@ end

return cjson.encode({
type = 'admitted',
record = selectedRecord,
jobId = selectedId,
leaseId = leaseId,
leaseExpiresAt = leaseExpiresAt
})
Expand Down Expand Up @@ -321,21 +309,58 @@ export class RedisWorkspaceDispatchStorage implements WorkspaceDispatchStorageAd
SEQUENCE_KEY,
ACTIVE_WORKSPACES_KEY,
`${DISPATCH_PREFIX}:job:`,
`${DISPATCH_PREFIX}:workspace:`,
String(JOB_TTL_SECONDS)
`${DISPATCH_PREFIX}:workspace:`
)

const parsed = JSON.parse(String(raw)) as WorkspaceDispatchClaimResult
switch (parsed.type) {
case WORKSPACE_DISPATCH_CLAIM_RESULTS.ADMITTED:
case WORKSPACE_DISPATCH_CLAIM_RESULTS.DELAYED:
interface LuaClaimResponse {
type: string
jobId?: string
leaseId?: string
leaseExpiresAt?: number
nextReadyAt?: number
}

const lua: LuaClaimResponse = JSON.parse(String(raw))
switch (lua.type) {
case WORKSPACE_DISPATCH_CLAIM_RESULTS.LIMIT_REACHED:
return { type: WORKSPACE_DISPATCH_CLAIM_RESULTS.LIMIT_REACHED }
case WORKSPACE_DISPATCH_CLAIM_RESULTS.EMPTY:
return parsed
return { type: WORKSPACE_DISPATCH_CLAIM_RESULTS.EMPTY }
case WORKSPACE_DISPATCH_CLAIM_RESULTS.DELAYED:
return {
type: WORKSPACE_DISPATCH_CLAIM_RESULTS.DELAYED,
nextReadyAt: lua.nextReadyAt ?? Date.now(),
}
case WORKSPACE_DISPATCH_CLAIM_RESULTS.ADMITTED: {
const record = await this.getDispatchJobRecord(lua.jobId!)
if (!record) {
await this.redis.zrem(workspaceLeaseKey(workspaceId), lua.leaseId!).catch(() => undefined)
logger.warn('Claimed job record expired before status update, lease released', {
jobId: lua.jobId,
})
return { type: WORKSPACE_DISPATCH_CLAIM_RESULTS.EMPTY }
}

const updatedRecord: WorkspaceDispatchJobRecord = {
...record,
status: 'admitting',
lease: { workspaceId, leaseId: lua.leaseId! },
metadata: {
...record.metadata,
dispatchLeaseExpiresAt: lua.leaseExpiresAt!,
},
}
await this.saveDispatchJob(updatedRecord)

return {
type: WORKSPACE_DISPATCH_CLAIM_RESULTS.ADMITTED,
record: updatedRecord,
leaseId: lua.leaseId!,
leaseExpiresAt: lua.leaseExpiresAt!,
}
}
default:
throw new Error(
`Unknown dispatch claim result: ${String((parsed as { type?: string }).type)}`
)
throw new Error(`Unknown dispatch claim result: ${String(lua.type)}`)
}
}

Expand Down Expand Up @@ -536,6 +561,7 @@ export class RedisWorkspaceDispatchStorage implements WorkspaceDispatchStorageAd
status: 'completed',
completedAt: Date.now(),
output,
bullmqPayload: undefined,
}))
await this.redis.decr(GLOBAL_DEPTH_KEY).catch(() => undefined)
}
Expand All @@ -546,6 +572,7 @@ export class RedisWorkspaceDispatchStorage implements WorkspaceDispatchStorageAd
status: 'failed',
completedAt: Date.now(),
error,
bullmqPayload: undefined,
}))
await this.redis.decr(GLOBAL_DEPTH_KEY).catch(() => undefined)
}
Expand Down
Loading
Loading