Backend orchestrator workflows
How OpenQuok uses Flowcraft for integration refresh and notification email, in-process or on BullMQ workers.
Social integrations that expose refreshCron need a supervisor that waits until the stored access token is close to expiry, then calls the provider refresh path and updates the database. By default the API runs this supervisor in the same Node process using the flowcraft package. Alternatively, you can enqueue the same blueprint on BullMQ so that a separate worker process executes it (see the Flowcraft distributed execution and BullMQ adapter guides).
Where the code lives
- Blueprint and tick logic: orchestrator/flows/refreshTokenWorkflow.ts
- Activity-style steps (timeout + retries on failure): orchestrator/activities/integrationRefreshActivities.ts
- Barrel exports: orchestrator/index.ts
- Per-flow transport and BullMQ queue names: backend/config/orchestratorFlows.ts (defaults in TypeScript). Optional runtime overrides without editing that file: ORCHESTRATOR_INTEGRATION_REFRESH_TRANSPORT and ORCHESTRATOR_NOTIFICATION_EMAIL_TRANSPORT (in_process or bullmq), merged in backend/config/GlobalConfig.ts.
- OAuth completion starts the supervisor from RefreshIntegrationService.startRefreshWorkflow (fire-and-forget).
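Starting the supervisor without awaiting it keeps the OAuth callback fast. A rejected promise still has to be handled somewhere, though. The sketch below shows that pattern. startFireAndForget and the RunSupervisor signature are hypothetical stand-ins, not the real shape of RefreshIntegrationService.startRefreshWorkflow.

```typescript
// Hypothetical signature; the real service method may take different arguments.
type RunSupervisor = (organizationId: string, integrationId: string) => Promise<void>;

// Starts a long-running supervisor without awaiting it so the caller (for
// example an OAuth callback) can return immediately. Wrapping the call in
// Promise.resolve().then(...) also captures a synchronous throw, and any
// failure is logged instead of becoming an unhandled promise rejection.
export function startFireAndForget(
  run: RunSupervisor,
  organizationId: string,
  integrationId: string,
  log: (message: string, error: unknown) => void = (m, e) => console.error(m, e),
): void {
  void Promise.resolve()
    .then(() => run(organizationId, integrationId))
    .catch((error: unknown) => {
      log(`refresh supervisor failed for ${organizationId}/${integrationId}`, error);
    });
}
```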
Activity-style resilience
Integration loads and token refreshes run through helpers that mirror common durable-activity settings: 10 minutes max per attempt, 3 attempts, 2 minutes initial delay between attempts, and a backoff coefficient of 1 (fixed delay). Retries apply when the step throws or hits the timeout. A normal false return from refresh (a provider failure handled inside the service) is not retried.
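Here is a minimal sketch of such a helper, assuming promise-returning steps. withActivityRetry, withTimeout, and ActivityOptions are illustrative names, not the exports of integrationRefreshActivities.ts. In this sketch a timeout only stops waiting for the attempt; it does not cancel the underlying work.

```typescript
// Illustrative names; not the actual exports of integrationRefreshActivities.ts.
export interface ActivityOptions {
  timeoutMs: number; // max duration of a single attempt
  maxAttempts: number; // total attempts, including the first
  initialDelayMs: number; // wait before the second attempt
  backoffCoefficient: number; // 1 = fixed delay between attempts
}

export const DEFAULT_ACTIVITY_OPTIONS: ActivityOptions = {
  timeoutMs: 10 * 60_000,
  maxAttempts: 3,
  initialDelayMs: 2 * 60_000,
  backoffCoefficient: 1,
};

const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Rejects if `p` does not settle within `ms`; the timer is always cleared.
async function withTimeout<T>(p: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  try {
    return await Promise.race([
      p,
      new Promise<never>((_, reject) => {
        timer = setTimeout(() => reject(new Error(`activity timed out after ${ms} ms`)), ms);
      }),
    ]);
  } finally {
    clearTimeout(timer);
  }
}

// Retries only when the step throws or times out. A resolved value
// (including `false`) is returned as-is and never retried.
export async function withActivityRetry<T>(
  step: () => Promise<T>,
  opts: ActivityOptions = DEFAULT_ACTIVITY_OPTIONS,
): Promise<T> {
  let wait = opts.initialDelayMs;
  for (let attempt = 1; ; attempt++) {
    try {
      return await withTimeout(step(), opts.timeoutMs);
    } catch (err) {
      if (attempt >= opts.maxAttempts) throw err;
      await delay(wait);
      wait *= opts.backoffCoefficient;
    }
  }
}
```

With these defaults, the worst case before giving up is 3 attempts of 10 minutes plus 2 delays of 2 minutes, about 34 minutes.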
Behavior (one iteration)
Each loop iteration:
- Loads the integration by organization and id; stops if it is missing, soft-deleted, mid-OAuth step, or marked refresh_needed.
- Computes milliseconds until token_expiration; stops if there is no expiry or the token is already expired.
- Sleeps for that duration (chunked so individual timers stay within the 32-bit setTimeout limit).
- Reloads the row and re-applies the same guards.
- Runs refresh (provider token exchange + upsert). Stops if refresh fails.
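The timing parts of one iteration might look like the sketch below, assuming the row exposes a Date expiry. IntegrationRow, its field names (deletedAt, inBetweenSteps, refreshNeeded, tokenExpiration), and the helper names are hypothetical. Node's setTimeout accepts delays up to 2,147,483,647 ms (about 24.8 days); larger values are replaced by 1 ms, which is why long waits are split.

```typescript
// Hypothetical row shape and helpers; not the project's real types.
export const MAX_TIMER_MS = 2 ** 31 - 1; // largest delay setTimeout honors

export interface IntegrationRow {
  deletedAt: Date | null; // soft delete
  inBetweenSteps: boolean; // mid-OAuth
  refreshNeeded: boolean;
  tokenExpiration: Date | null;
}

// Returns the wait in ms, or null when the supervisor should stop.
export function msUntilRefresh(row: IntegrationRow | null, now: Date): number | null {
  if (!row || row.deletedAt || row.inBetweenSteps || row.refreshNeeded) return null;
  if (!row.tokenExpiration) return null; // no expiry: stop
  const ms = row.tokenExpiration.getTime() - now.getTime();
  return ms > 0 ? ms : null; // already expired: stop
}

// Splits a long wait into timer durations no longer than maxChunk each.
export function sleepChunks(totalMs: number, maxChunk = MAX_TIMER_MS): number[] {
  const chunks: number[] = [];
  for (let left = totalMs; left > 0; left -= maxChunk) {
    chunks.push(Math.min(left, maxChunk));
  }
  return chunks;
}

// Sleeps chunk by chunk; `sleep` is injectable so tests can return instantly.
export async function chunkedSleep(
  totalMs: number,
  sleep: (ms: number) => Promise<void> = (ms) => new Promise<void>((r) => setTimeout(r, ms)),
): Promise<void> {
  for (const chunk of sleepChunks(totalMs)) await sleep(chunk);
}
```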
The graph uses Flowcraft’s loop construct: a begin node enters the loop controller; the body is a single tick node that performs the steps above. Continuation is driven by the workflow key loopShouldContinue (the loop controller evaluates conditions against flat serialized state, not a nested context.* path).
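To make the flat-key point concrete, here is a plain TypeScript stand-in for the begin → loop (tick) → finished shape. It is not Flowcraft's API. runLoop, FlatState, and the maxIterations cap are assumptions for illustration only.

```typescript
// Plain stand-in for the blueprint's shape, NOT Flowcraft's API.
type FlatState = Record<string, unknown>;
type Tick = (state: FlatState) => Promise<FlatState>;

export async function runLoop(
  begin: FlatState,
  tick: Tick,
  maxIterations = 1_000, // sketch-only safety cap
): Promise<FlatState> {
  // "begin" seeds the flat state and enters the loop.
  let state: FlatState = { ...begin, loopShouldContinue: true };
  for (let i = 0; i < maxIterations; i++) {
    // The controller reads a top-level key: state["loopShouldContinue"],
    // not a nested path such as state.context.loopShouldContinue.
    if (state["loopShouldContinue"] !== true) break;
    state = await tick(state);
  }
  return state; // "finished"
}
```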
Limits vs an external workflow engine
The default in-process transport does not survive API restarts: if the API redeploys during a sleep, the supervisor for that integration is gone until the next OAuth connect or manual trigger.
With transport: bullmq for integration refresh in backend/config/orchestratorFlows.ts, run state and the job queue live in Redis, and you run a worker process alongside the API:
- Local dev: pnpm orchestrator:worker:integration-refresh-bullmq from the repository root (or pnpm worker:integration-refresh-bullmq under backend/).
- Production: pnpm railway:orchestrator:start:integration-refresh after pnpm railway:orchestrator:build, on an always-on host such as Railway. When notification email uses bullmq, run a second service with pnpm railway:orchestrator:start:notification-email.
That improves durability across API deploys. However, each tick still performs a long in-node sleep while a worker job is active, so one run can hold a worker concurrency slot for hours (see Flowcraft pause/sleep guidance). Tune BullMQ concurrency and monitor queue depth accordingly.
Managed Redis (for example Redis Cloud) works the same as for the cache: set REDIS_* and, optionally, REDIS_BULLMQ_DB (see Redis cache). Workers must use the same Redis as the API. For env vars and deployment, see Orchestrator workers.
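One way to keep the API and workers on the same Redis is to derive the BullMQ connection from env in a single shared helper. This is a sketch with several assumptions:

- REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, and REDIS_DB are assumed members of the REDIS_* family; the project may use different names or a URL.
- bullmqConnectionFromEnv is hypothetical.
- Treating REDIS_BULLMQ_DB as a logical database index is also an assumption.

```typescript
// Hypothetical helper; env var names other than REDIS_BULLMQ_DB are assumed.
export interface RedisConnection {
  host: string;
  port: number;
  password?: string;
  db: number;
}

type Env = Record<string, string | undefined>;

// Parses an integer env value, falling back on missing or invalid input.
function intOr(value: string | undefined, fallback: number): number {
  const n = value === undefined || value === "" ? NaN : Number(value);
  return Number.isInteger(n) ? n : fallback;
}

// Both the API and each worker call this with process.env, so they resolve
// the same server; REDIS_BULLMQ_DB optionally moves BullMQ keys to another DB.
export function bullmqConnectionFromEnv(env: Env): RedisConnection {
  const cacheDb = intOr(env.REDIS_DB, 0);
  return {
    host: env.REDIS_HOST ?? "127.0.0.1",
    port: intOr(env.REDIS_PORT, 6379),
    password: env.REDIS_PASSWORD || undefined,
    db: intOr(env.REDIS_BULLMQ_DB, cacheDb),
  };
}
```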
Configuration
- Transport, queue name, and enable: backend/config/orchestratorFlows.ts (integrationRefresh.enabled, transport, queue). GlobalConfig.ts copies enabled into config.bullmq.integrationRefresh.enabled, except under Jest, where it is always false. Add future flows as sibling entries in that file instead of per-flow keys in backend/.env.development.example.
- Jest: JEST_WORKER_ID forces the supervisor off regardless of integrationRefresh.enabled. To test the supervisor, mock config or reload modules with different orchestratorFlows settings.
- Elsewhere: toggle integrationRefresh.enabled in code for the deployment profile you want (or split config by environment in that file).
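The shape and merge order described above can be sketched as follows. The field names enabled, transport, and queue come from this page. The queue strings, the notificationEmail entry's fields, the example enabled value, and resolveFlows itself are assumptions, not the contents of orchestratorFlows.ts or GlobalConfig.ts.

```typescript
// Illustrative shape only; see the assumptions listed above.
type Transport = "in_process" | "bullmq";

interface FlowConfig {
  transport: Transport;
  queue: string; // BullMQ queue name, used when transport is "bullmq"
}

interface OrchestratorFlows {
  integrationRefresh: FlowConfig & { enabled: boolean };
  notificationEmail: FlowConfig;
}

// Example defaults; enabled: true here is an arbitrary example value.
export const orchestratorFlows: OrchestratorFlows = {
  integrationRefresh: { enabled: true, transport: "in_process", queue: "integration-refresh" },
  notificationEmail: { transport: "in_process", queue: "notification-email" },
};

type Env = Record<string, string | undefined>;

// Unknown values are ignored rather than rejected (a choice of this sketch).
function parseTransport(value: string | undefined): Transport | undefined {
  return value === "in_process" || value === "bullmq" ? value : undefined;
}

// Applies the optional env transport overrides, then forces integration
// refresh off whenever JEST_WORKER_ID is set.
export function resolveFlows(flows: OrchestratorFlows, env: Env): OrchestratorFlows {
  const underJest = env.JEST_WORKER_ID !== undefined;
  return {
    integrationRefresh: {
      ...flows.integrationRefresh,
      transport:
        parseTransport(env.ORCHESTRATOR_INTEGRATION_REFRESH_TRANSPORT) ??
        flows.integrationRefresh.transport,
      enabled: underJest ? false : flows.integrationRefresh.enabled,
    },
    notificationEmail: {
      ...flows.notificationEmail,
      transport:
        parseTransport(env.ORCHESTRATOR_NOTIFICATION_EMAIL_TRANSPORT) ??
        flows.notificationEmail.transport,
    },
  };
}
```

Keeping defaults in TypeScript and limiting env to a single transport switch per flow means a deployment can move one flow to BullMQ without touching code.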
Testing
- Unit: orchestrator/flows/refreshTokenWorkflow.unit.test.ts drives runRefreshTokenOrchestration with mocked activities and chunked sleep so cases finish fast. It also demonstrates Flowcraft testing utilities: InMemoryEventLogger (pass eventBus into the runner), runWithTrace, and createStepper from flowcraft/testing. For replay/CLI-style inspection against persisted events, see time-travel debugging and the CLI tool. Those need a persistent event store, not just unit mocks.
- Conventions: Repository guidance for workflow tests (Jest + flowcraft/testing, Faker, when to mock config vs call the runner directly) lives in the backend test-suites Cursor rule (.cursor/rules/backend-test-suites.mdc, section on orchestrator workflows).
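The idea behind fast unit cases (mocked activities plus a sleep that returns immediately) can be sketched without Jest or flowcraft/testing. runSupervisor and SupervisorDeps are hypothetical and do not match runRefreshTokenOrchestration's real signature. The maxTicks cap exists only so a test cannot loop forever.

```typescript
// Hypothetical dependency-injected supervisor, for illustrating test doubles.
interface SupervisorDeps {
  isUsable: () => Promise<boolean>; // exists, not soft-deleted, not mid-OAuth, not refresh_needed
  msUntilExpiry: () => Promise<number | null>; // null: no expiry, or already expired
  sleep: (ms: number) => Promise<void>;
  refresh: () => Promise<boolean>;
}

// Returns how many successful refreshes happened before the loop stopped.
export async function runSupervisor(deps: SupervisorDeps, maxTicks: number): Promise<number> {
  let refreshed = 0;
  for (let tick = 0; tick < maxTicks; tick++) {
    if (!(await deps.isUsable())) break;
    const wait = await deps.msUntilExpiry();
    if (wait === null) break;
    await deps.sleep(wait);
    if (!(await deps.isUsable())) break; // re-check guards after the long wait
    if (!(await deps.refresh())) break; // a false refresh stops the supervisor
    refreshed++;
  }
  return refreshed;
}
```

A test then passes a sleep such as `async (ms) => { slept.push(ms); }`, which records the requested durations without waiting. It also passes counters in place of the real activities.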
BullMQ reconciliation (Flowcraft adapter)
The BullMQ adapter guide describes createBullMQReconciler: it scans Redis keys that hold workflow state, treats runs idle longer than a threshold as stalled, and re-enqueues the appropriate next jobs so a worker can continue. That is aimed at production reliability when jobs or workers disappear between steps.
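The "idle longer than a threshold" rule can be sketched as a pure function. This does not model createBullMQReconciler's real inputs, Redis key layout, or re-enqueue step. RunSnapshot and findStalledRuns are hypothetical.

```typescript
// Hypothetical snapshot of a run's persisted state.
export interface RunSnapshot {
  runId: string;
  lastUpdatedMs: number; // epoch ms of the last state write
}

// Returns ids of runs idle for strictly longer than the threshold; a
// reconciler would then re-enqueue the next job for each of them.
export function findStalledRuns(
  runs: readonly RunSnapshot[],
  nowMs: number,
  stalledThresholdMs: number,
): string[] {
  return runs
    .filter((r) => nowMs - r.lastUpdatedMs > stalledThresholdMs)
    .map((r) => r.runId);
}
```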
Applicability here
This is relevant only if you set transport: bullmq in orchestratorFlows.ts, where it is the documented way to recover runs that got stuck in Redis without an active BullMQ job. OpenQuok does not ship a cron or process that calls the reconciler yet. Adding one (same Redis and adapter instance shape as the worker) would be a follow-up if you need that guarantee in production. The refresh-token blueprint is a tight loop with long in-node sleeps. Reconciliation mainly helps when state exists in Redis but no worker is driving the next executeNode job, not when a single job is still running for hours on one worker.
BullMQ webhook endpoints (Flowcraft adapter)
The same guide describes wiring HTTP routes so external systems can POST payloads that resume workflows using Flowcraft webhook nodes (Flow.createWebhook() / wait-for-callback style graphs).
Applicability here
The refresh-token blueprint is begin → loop (tick) → finished. It does not use webhook nodes, so this feature does not apply to the current refresh supervisor. It would matter only if you add another blueprint that pauses on a webhook. Also, the @flowcraft/bullmq-adapter version used in this repo currently implements registerWebhookEndpoint by throwing ("not implemented"). The guide's webhook flow is therefore not usable with the stock adapter here until upstream adds it or you provide a custom integration.
Further reading
- Orchestrator workers: env vars, production scripts, dotenv resolution
- Railway Deployment: persistent services, CLI, railway.toml
- Fluent API
- Pausing and sleep nodes (this workflow uses an imperative delay inside tick because the wait length comes from the database at runtime)
- Runtime adapter: BullMQ (reconciliation, webhooks, worker/client setup)