Backend orchestrator workflows

How OpenQuok uses Flowcraft for integration refresh and notification email, in-process or on BullMQ workers.

Social integrations that expose refreshCron need a supervisor that waits until the stored access token approaches expiry, then calls the provider refresh path and updates the database. By default the API runs this supervisor in the same Node process using the flowcraft package. You can instead enqueue the same blueprint on BullMQ so a separate worker process executes it (see the distributed execution and BullMQ adapter guides).

Where the code lives

  • Blueprint and tick logic: orchestrator/flows/refreshTokenWorkflow.ts
  • Activity-style steps (timeout + retries on failure): orchestrator/activities/integrationRefreshActivities.ts
  • Barrel exports: orchestrator/index.ts
  • Per-flow transport and BullMQ queue names: backend/config/orchestratorFlows.ts (defaults in TypeScript). Optional runtime overrides without editing that file: ORCHESTRATOR_INTEGRATION_REFRESH_TRANSPORT and ORCHESTRATOR_NOTIFICATION_EMAIL_TRANSPORT (in_process or bullmq), merged in backend/config/GlobalConfig.ts.
  • OAuth completion starts the supervisor from RefreshIntegrationService.startRefreshWorkflow (fire-and-forget).
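The env-var override described above (env values layered over TypeScript defaults) can be sketched as follows. This is illustrative only: the shapes and helper names (defaults, parseTransport, resolveFlows) are assumptions, not the real orchestratorFlows.ts or GlobalConfig.ts code; only the two env-var names and the in_process/bullmq values come from the text.

```typescript
// Sketch: merge ORCHESTRATOR_*_TRANSPORT env overrides over TS defaults.
type Transport = "in_process" | "bullmq";

interface FlowConfig {
  enabled: boolean;
  transport: Transport;
  queue: string;
}

// Hypothetical defaults, as they might appear in orchestratorFlows.ts.
const defaults: Record<string, FlowConfig> = {
  integrationRefresh: { enabled: true, transport: "in_process", queue: "integration-refresh" },
  notificationEmail: { enabled: true, transport: "in_process", queue: "notification-email" },
};

// Accept only the two documented values; anything else keeps the default.
function parseTransport(raw: string | undefined, fallback: Transport): Transport {
  return raw === "in_process" || raw === "bullmq" ? raw : fallback;
}

export function resolveFlows(env: Record<string, string | undefined>): Record<string, FlowConfig> {
  return {
    integrationRefresh: {
      ...defaults.integrationRefresh,
      transport: parseTransport(
        env.ORCHESTRATOR_INTEGRATION_REFRESH_TRANSPORT,
        defaults.integrationRefresh.transport,
      ),
    },
    notificationEmail: {
      ...defaults.notificationEmail,
      transport: parseTransport(
        env.ORCHESTRATOR_NOTIFICATION_EMAIL_TRANSPORT,
        defaults.notificationEmail.transport,
      ),
    },
  };
}
```

An invalid value falls back to the code default rather than throwing, which keeps a typo in an env var from taking the supervisor down at startup.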

Activity-style resilience

Loads and token refresh run through helpers that mirror common durable-activity settings: at most 10 minutes per attempt, 3 attempts, a 2-minute initial delay between attempts, and a backoff coefficient of 1 (fixed delay). Retries apply when a step throws or hits the timeout; a refresh that returns false normally (a provider failure handled inside the service) is not retried.
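The retry semantics above can be sketched as a wrapper like the one below. The function and policy names are hypothetical, not the real integrationRefreshActivities.ts helpers; only the numbers (10-minute attempt timeout, 3 attempts, fixed 2-minute delay, backoff coefficient 1) come from the text.

```typescript
export interface RetryPolicy {
  attemptTimeoutMs: number;
  maxAttempts: number;
  initialDelayMs: number;
  backoffCoefficient: number;
}

const MINUTE_MS = 60_000;

// Defaults mirroring the documented settings.
export const DEFAULT_POLICY: RetryPolicy = {
  attemptTimeoutMs: 10 * MINUTE_MS, // 10 minutes max per attempt
  maxAttempts: 3,
  initialDelayMs: 2 * MINUTE_MS, // 2 minutes between attempts
  backoffCoefficient: 1, // fixed delay: retries never wait longer
};

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

export async function withActivityRetries<T>(
  step: () => Promise<T>,
  policy: RetryPolicy = DEFAULT_POLICY,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= policy.maxAttempts; attempt++) {
    let timer: ReturnType<typeof setTimeout> | undefined;
    try {
      // A timeout is treated like a thrown error and retried; a step that
      // resolves (even with false) is returned as-is and never retried.
      return await Promise.race([
        step(),
        new Promise<never>((_, reject) => {
          timer = setTimeout(() => reject(new Error("activity timeout")), policy.attemptTimeoutMs);
        }),
      ]);
    } catch (err) {
      lastError = err;
      if (attempt < policy.maxAttempts) {
        await sleep(policy.initialDelayMs * policy.backoffCoefficient ** (attempt - 1));
      }
    } finally {
      if (timer !== undefined) clearTimeout(timer);
    }
  }
  throw lastError;
}
```

Note the distinction the text draws: a step that resolves to false exits through the return path, so only exceptions and timeouts consume retry attempts.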

Behavior (one iteration)

Each loop iteration:

  1. Loads the integration by organization and id; stops if it is missing, soft-deleted, mid-OAuth step, or marked refresh_needed.
  2. Computes milliseconds until token_expiration; stops if there is no expiry or the token is already expired.
  3. Sleeps for that duration (chunked so individual timers stay within the 32-bit setTimeout limit).
  4. Reloads the row and re-applies the same guards.
  5. Runs refresh (provider token exchange + upsert). Stops if refresh fails.
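Steps 2 and 3 above can be sketched in isolation: compute the time remaining until token_expiration, then split the sleep into chunks that stay under the 32-bit setTimeout ceiling (2^31 - 1 ms, roughly 24.8 days). The row shape and helper names are illustrative, not the real refreshTokenWorkflow.ts code.

```typescript
// Node's setTimeout delay is a 32-bit signed int; larger values fire immediately.
const MAX_TIMER_MS = 2 ** 31 - 1;

interface IntegrationRow {
  token_expiration: Date | null;
}

// Returns null when the supervisor should stop: no expiry, or already expired.
export function msUntilExpiry(row: IntegrationRow, now = Date.now()): number | null {
  if (!row.token_expiration) return null;
  const remaining = row.token_expiration.getTime() - now;
  return remaining > 0 ? remaining : null;
}

// Split one long sleep into setTimeout-safe chunk durations.
export function sleepChunks(totalMs: number, maxChunkMs = MAX_TIMER_MS): number[] {
  const chunks: number[] = [];
  let remaining = totalMs;
  while (remaining > 0) {
    const chunk = Math.min(remaining, maxChunkMs);
    chunks.push(chunk);
    remaining -= chunk;
  }
  return chunks;
}
```

Chunking also gives the loop a natural point to re-check its guards between timers, which is why step 4 reloads the row after the sleep completes.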

The graph uses Flowcraft’s loop construct: a begin node enters the loop controller; the body is a single tick node that performs the steps above. Continuation is driven by the workflow key loopShouldContinue (the loop controller evaluates conditions against flat serialized state, not a nested context.* path).
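The flat-state point can be made concrete with a toy condition check. This is not the Flowcraft API, just an illustration of why the continuation key must be a top-level entry in the serialized state rather than a nested context.* path.

```typescript
// Toy loop-controller condition: a flat lookup of the continuation key.
type FlatState = Record<string, unknown>;

export function shouldContinue(state: FlatState, key = "loopShouldContinue"): boolean {
  // Only a flat key is consulted; a value nested under another object
  // (e.g. state.context.loopShouldContinue) is simply not seen.
  return state[key] === true;
}
```

A tick that writes the flag under a nested object would silently end the loop, since the flat lookup reads undefined.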

Limits vs an external workflow engine

The default in-process transport does not survive API restarts: if the API redeploys during a sleep, the supervisor for that integration is gone until the next OAuth connect or manual trigger.

With transport: bullmq for integration refresh in backend/config/orchestratorFlows.ts, run state and the job queue live in Redis, and you run a worker process alongside the API:

  • Local dev: pnpm orchestrator:worker:integration-refresh-bullmq from the repository root (or pnpm worker:integration-refresh-bullmq under backend/).
  • Production: pnpm railway:orchestrator:start:integration-refresh after pnpm railway:orchestrator:build, on an always-on host such as Railway. When notification email uses bullmq, run a second service with pnpm railway:orchestrator:start:notification-email.

That improves durability across API deploys, but each tick still performs a long in-node sleep while a worker job is active (see Flowcraft pause/sleep guidance). Tune BullMQ concurrency and monitor queue depth accordingly.

Managed Redis (for example Redis Cloud) works the same as for cache: set REDIS_* and optionally REDIS_BULLMQ_DB; see Redis cache. Workers must use the same Redis as the API. Env and deployment: Orchestrator workers.

Configuration

  • Transport, queue name, and enable flag: backend/config/orchestratorFlows.ts (integrationRefresh.enabled, transport, queue). GlobalConfig.ts copies enabled into config.bullmq.integrationRefresh.enabled, except under Jest, where it is always false. Add future flows as sibling entries in that file rather than as per-flow keys in backend/.env.development.example.
  • Jest: JEST_WORKER_ID forces the supervisor off regardless of integrationRefresh.enabled. To test the supervisor, mock config or reload modules with different orchestratorFlows settings.
  • Elsewhere: toggle integrationRefresh.enabled in code for the deployment profile you want (or split config by environment in that file).
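The Jest guard described above can be sketched as a single function. The name copyEnabledFlag is hypothetical; the behavior (JEST_WORKER_ID forces the supervisor off regardless of the configured value) is what the text documents.

```typescript
// Sketch of the GlobalConfig.ts copy of the enabled flag.
export function copyEnabledFlag(
  flowEnabled: boolean,
  env: Record<string, string | undefined>,
): boolean {
  // Jest sets JEST_WORKER_ID in every test worker process, so any test run
  // disables the supervisor no matter what orchestratorFlows.ts says.
  return env.JEST_WORKER_ID !== undefined ? false : flowEnabled;
}
```

This is why testing the supervisor requires mocking config or reloading modules, as noted above: the env check wins over the static setting.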

Testing

  • Unit: orchestrator/flows/refreshTokenWorkflow.unit.test.ts drives runRefreshTokenOrchestration with mocked activities and chunked sleep so cases finish fast. It also demonstrates Flowcraft testing utilities: InMemoryEventLogger (pass eventBus into the runner), runWithTrace, and createStepper from flowcraft/testing. For replay/CLI-style inspection against persisted events, see time-travel debugging and the CLI tool—those need a persistent event store, not just unit mocks.
  • Conventions: Repository guidance for workflow tests (Jest + flowcraft/testing, Faker, when to mock config vs call the runner directly) lives in the backend test-suites Cursor rule (.cursor/rules/backend-test-suites.mdc, section on orchestrator workflows).

BullMQ reconciliation (Flowcraft adapter)

The BullMQ adapter guide describes createBullMQReconciler: it scans Redis keys that hold workflow state, treats runs idle longer than a threshold as stalled, and re-enqueues the appropriate next jobs so a worker can continue. That is aimed at production reliability when jobs or workers disappear between steps.

The refresh-token blueprint is a tight loop with long in-node sleeps; reconciliation mainly helps when state exists in Redis but no worker is driving the next executeNode job—not when a single job is still running for hours on one worker.

BullMQ webhook endpoints (Flowcraft adapter)

The same guide describes wiring HTTP routes so external systems can POST payloads that resume workflows using Flowcraft webhook nodes (Flow.createWebhook() / wait-for-callback style graphs).

Further reading