Skip to content

fix(concurrency): json serialization issues, ttls, feature flag redirection#3847

Open
icecrasher321 wants to merge 9 commits intostagingfrom
fix/q-sys
Open

fix(concurrency): json serialization issues, ttls, feature flag redirection#3847
icecrasher321 wants to merge 9 commits intostagingfrom
fix/q-sys

Conversation

@icecrasher321
Copy link
Copy Markdown
Collaborator

Summary

Fixes multiple issues surfaced from prod deployment of worker based execution system.

Type of Change

  • Bug fix

Testing

To be tested in Dev Sandbox

Checklist

  • Code follows project style guidelines
  • Self-reviewed my changes
  • Tests added/updated and passing
  • No new warnings introduced
  • I confirm that I have read and agree to the terms outlined in the Contributor License Agreement (CLA)

@cursor
Copy link
Copy Markdown

cursor bot commented Mar 30, 2026

PR Summary

Medium Risk
Changes job dispatch/enqueue decision logic and Redis-backed workspace dispatch claiming/retention, which can affect execution routing, queue pressure, and job lifecycle in production.

Overview
Execution routing is simplified and made more explicit. Workflow/schedule execution routes stop using shouldExecuteInline/shouldUseBullMQ and instead gate inline in-process execution on !isBullMQEnabled() && !isTriggerDevEnabled, while BullMQ dispatch selection is consistently driven by isBullMQEnabled().

BullMQ/dispatch retention and claim semantics are tightened. BullMQ job removeOnComplete/removeOnFail defaults and Redis dispatch job TTLs are reduced to ~2 hours; the Redis claim Lua script now returns jobId only, and the TS layer reloads/updates the job record (with lease metadata) and handles expired records by releasing the lease.

Fixes serialization/cleanup edge cases. ExecutionSnapshot.fromJSON and queued execution snapshot creation now defensively coerce selectedOutputs to an array, and dispatch job completion/failure clears bullmqPayload to avoid retaining large payloads.

Written by Cursor Bugbot for commit d34d6f4. Configure here.

@vercel
Copy link
Copy Markdown

vercel bot commented Mar 30, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

1 Skipped Deployment
Project Deployment Actions Updated (UTC)
docs Skipped Skipped Mar 30, 2026 9:42pm

Request Review

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps bot commented Mar 30, 2026

Greptile Summary

This PR fixes several production issues in the worker-based (BullMQ) execution system: JSON serialization failures from Lua-side CJSON mutations, stale TTLs consuming Redis memory, and incorrect feature-flag routing that prevented BullMQ from being used when it should have been.

Key changes:

  • Feature flag simplification: isBullMQEnabled() now returns Boolean(env.REDIS_URL), replacing the previous two-flag check (CONCURRENCY_CONTROL_ENABLED && REDIS_URL). All callers now use isBullMQEnabled() directly; the shouldUseBullMQ() / shouldExecuteInline() wrappers are removed.
  • Lua serialization fix: The Lua claim script no longer mutates and re-encodes the job record (avoiding CJSON edge cases). Status is now updated via a TypeScript GETSET round-trip after the script returns.
  • bullmqPayload cleanup: Both memory and Redis stores clear the payload field (undefined) on job completion/failure to prevent large payloads from lingering.
  • TTL reduction: All BullMQ queue retention ages and the Redis dispatch store TTL are reduced from 24–48 h / 7 days to 2 hours, with added count caps.
  • Defensive Array.isArray guards: Added in snapshot.ts and queued-workflow-execution.ts to handle non-array selectedOutputs values after JSON round-trips.
  • Two P1 concerns are noted: (1) the non-atomic claim flow can silently drop a job if the TypeScript record-update fails after the Lua ZREM; (2) isBullMQEnabled() activating for any deployment with REDIS_URL could cause jobs to queue without workers if Redis is present but BullMQ workers are not deployed.

Confidence Score: 4/5

  • Safe to merge for deployments that already have BullMQ workers running, but two P1 concerns should be addressed before broader rollout.
  • The selectedOutputs and serialization fixes are correct. The TTL and feature-flag simplifications are intentional. However, two P1 issues remain: (1) the non-atomic claim flow in redis-store.ts can drop a job if the record read/write fails after the Lua script has already removed the job from its lane, with no recovery path; (2) isBullMQEnabled() now fires for any deployment with REDIS_URL, which could silently starve jobs in environments where BullMQ workers aren't running.
  • apps/sim/lib/core/workspace-dispatch/redis-store.ts (atomic claim safety) and apps/sim/lib/core/bullmq/connection.ts (implicit activation on REDIS_URL).

Important Files Changed

Filename Overview
apps/sim/lib/core/bullmq/connection.ts Removes CONCURRENCY_CONTROL_ENABLED gate — BullMQ is now enabled for any deployment with REDIS_URL, which could silently break environments that have Redis but no BullMQ workers.
apps/sim/lib/core/workspace-dispatch/redis-store.ts Moves job-record status update out of the atomic Lua script into two separate TypeScript round-trips; if either fails after the Lua ZREM, the job is dropped from its lane with no recovery path.
apps/sim/lib/core/bullmq/queues.ts All queue TTLs reduced from 1–7 days to 2 hours for both complete and fail retention; aggressive reduction may hinder post-incident debugging.
apps/sim/lib/webhooks/processor.ts Replaces shouldExecuteInline() with isBullMQEnabled() for polling-webhook routing; introduces a dead-code branch where the inner isBullMQEnabled() ternary is always true.
apps/sim/executor/execution/snapshot.ts Adds a defensive Array.isArray guard on selectedOutputs to handle non-array values from JSON deserialization — safe fix.
apps/sim/lib/workflows/executor/queued-workflow-execution.ts Same defensive Array.isArray guard applied to payload.selectedOutputs — mirrors the snapshot.ts fix correctly.
apps/sim/lib/core/async-jobs/config.ts Removes the now-redundant shouldExecuteInline() and shouldUseBullMQ() wrapper functions — callers now use isBullMQEnabled() directly.
apps/sim/lib/core/config/env.ts Removes CONCURRENCY_CONTROL_ENABLED from the env schema, consistent with the isBullMQEnabled() change.
apps/sim/lib/core/workspace-dispatch/memory-store.ts Adds bullmqPayload: undefined when completing/failing jobs to clear large payload objects from memory — good memory hygiene.
apps/sim/app/api/schedules/execute/route.ts Replaces shouldExecuteInline() with !isBullMQEnabled() for inline fallback guard — semantically equivalent and correct.
apps/sim/app/api/workflows/[id]/execute/route.ts Replaces shouldUseBullMQ() / shouldExecuteInline() with isBullMQEnabled() throughout — consistent and correct after the abstraction removal.
apps/sim/lib/core/async-jobs/index.ts Removes re-exports for the deleted helper functions — clean-up only.

Sequence Diagram

sequenceDiagram
    participant API as API Route
    participant PW as Webhook Processor
    participant BQ as isBullMQEnabled()
    participant LUA as Redis Lua Script
    participant RS as RedisStore (TypeScript)
    participant RDB as Redis

    Note over API,RDB: Workflow / Schedule / Webhook Execution (BullMQ path)

    API->>BQ: isBullMQEnabled()?
    BQ-->>API: Boolean(env.REDIS_URL)

    alt BullMQ enabled
        API->>RDB: enqueueWorkspaceDispatch(...)
        RDB-->>API: jobId
    else Inline fallback
        API->>API: getJobQueue() → inline execution
    end

    Note over LUA,RDB: Redis Dispatch Claim (new non-atomic flow)

    LUA->>RDB: ZADD leaseKey (atomic)
    LUA->>RDB: ZREM laneKey (atomic)
    LUA-->>RS: { type: admitted, jobId }

    RS->>RDB: GET jobKey(jobId)
    RDB-->>RS: record (status: pending)

    alt Record found
        RS->>RDB: SET jobKey(jobId) {status: admitting, lease: ...}
        RS-->>API: AdmittedResult + updatedRecord
    else Record missing / expired
        RS-->>API: throw Error (job already removed from lane — lost!)
    end

    Note over API,RDB: Job Completion / Failure

    API->>RDB: SET jobKey {status: completed, bullmqPayload: undefined}
    API->>RDB: DECR global-depth
Loading

Comments Outside Diff (1)

  1. apps/sim/lib/webhooks/processor.ts, line 1268-1293 (link)

    P2 Dead code inside guarded block

    Inside the if (isPolling && isBullMQEnabled()) block, the inner ternary isBullMQEnabled() ? ... : ... at line 1269 will always take the first branch — isBullMQEnabled() is unconditionally true here because we just entered the outer if. The await (await getJobQueue()).enqueue(...) fallback (lines 1287–1293) is unreachable dead code.

Reviews (1): Last reviewed commit: "memory improvements" | Re-trigger Greptile

@icecrasher321
Copy link
Copy Markdown
Collaborator Author

bugbot run

@icecrasher321
Copy link
Copy Markdown
Collaborator Author

bugbot run

Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Bugbot reviewed your changes and found no new issues!

Comment @cursor review or bugbot run to trigger another review on this PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant