From e1602858f48fc6258ebfd6948ee8cd529599371f Mon Sep 17 00:00:00 2001 From: Kiran K Date: Fri, 22 Mar 2024 17:19:23 +0530 Subject: [PATCH] Cron jobs support (#2430) * Use random id instead of os.hostname * Remove unused import * add eventLockKey * add eventLockKey * Check for expiry when acquiring lock (wip) * Add cron interval config * Add timeout functionality to process events * Fix index value overwrite * Fix bulk delete * Remove console.log * directory sync add cron * Refactor event processing and add cron scheduling * Remove await * Refactor directory sync worker * Remove unused import statement * Fix lockKey initialization in EventProcessor constructor * cleanup * take a callback for DSync as NPM option (WIP) * Fix the unit test * Fix the webhooks test * Remove unused import statement * Fix type * fixed cron, cleaned up x-access-token. TODO: Fix webhooks batch cron * grouped dsync index names in one place * ensure the cron ticks regularly, use setInterval instead of setTimeout * do the renewal inside EventLock * lint fix * improved locking and added check for lock in Google dsync cron * locking per cron * refactor --------- Co-authored-by: Deepak Prabhakara --- .env.example | 2 + lib/env.ts | 6 + npm/src/cron/lock.ts | 108 ++++++++++++++ npm/src/directory-sync/batch-events/lock.ts | 82 ----------- npm/src/directory-sync/batch-events/queue.ts | 84 ++++++----- npm/src/directory-sync/index.ts | 60 ++++---- npm/src/directory-sync/non-scim/index.ts | 132 +++++++++++++----- npm/src/directory-sync/request.ts | 23 ++- npm/src/directory-sync/scim/Groups.ts | 38 ++--- npm/src/directory-sync/scim/Users.ts | 20 ++- .../scim/WebhookEventsLogger.ts | 23 +-- npm/src/directory-sync/scim/utils.ts | 7 + npm/src/directory-sync/types.ts | 4 +- npm/src/directory-sync/utils.ts | 6 +- npm/src/typings.ts | 4 + npm/test/dsync/batch/webhooks.test.ts | 10 +- npm/test/dsync/google_api.test.ts | 45 +++--- npm/test/dsync/webhooks.test.ts | 42 ++---- pages/api/admin/terminus/models/[id]/index.ts | 2 - pages/api/admin/terminus/roles.ts | 1 - pages/api/scim/v2.0/[...directory].ts | 5 +- pages/api/v1/dsync/cron/process-events.ts | 4 +- pages/api/v1/dsync/cron/sync-google.ts | 4 +- 23 files changed, 423 insertions(+), 289 deletions(-) create mode 100644 npm/src/cron/lock.ts delete mode 100644 npm/src/directory-sync/batch-events/lock.ts diff --git a/.env.example b/.env.example index 2afa1b6c5..ef95f7528 100644 --- a/.env.example +++ b/.env.example @@ -92,10 +92,12 @@ WEBHOOK_SECRET= # Directory sync webhook event batch size (Eg: 50) DSYNC_WEBHOOK_BATCH_SIZE= +DSYNC_WEBHOOK_BATCH_CRON_INTERVAL= # Google workspace directory sync DSYNC_GOOGLE_CLIENT_ID= DSYNC_GOOGLE_CLIENT_SECRET= +DSYNC_GOOGLE_CRON_INTERVAL= # Only applicable for BoxyHQ SaaS deployments BOXYHQ_HOSTED=0 diff --git a/lib/env.ts b/lib/env.ts index e22e19b80..b345c7ba6 100644 --- a/lib/env.ts +++ b/lib/env.ts @@ -93,6 +93,9 @@ const jacksonOptions: JacksonOption = { webhookBatchSize: process.env.DSYNC_WEBHOOK_BATCH_SIZE ? Number(process.env.DSYNC_WEBHOOK_BATCH_SIZE) : undefined, + webhookBatchCronInterval: process.env.DSYNC_WEBHOOK_BATCH_CRON_INTERVAL + ? Number(process.env.DSYNC_WEBHOOK_BATCH_CRON_INTERVAL) + : undefined, debugWebhooks: process.env.DSYNC_DEBUG_WEBHOOKS === 'true', providers: { google: { @@ -100,6 +103,9 @@ const jacksonOptions: JacksonOption = { clientSecret: process.env.DSYNC_GOOGLE_CLIENT_SECRET || process.env.GOOGLE_CLIENT_SECRET || '', authorizePath: googleDSyncAuthorizePath, callbackPath: googleDSyncCallbackPath, + cronInterval: process.env.DSYNC_GOOGLE_CRON_INTERVAL + ? Number(process.env.DSYNC_GOOGLE_CRON_INTERVAL) + : undefined, }, }, }, diff --git a/npm/src/cron/lock.ts b/npm/src/cron/lock.ts new file mode 100644 index 000000000..2e8212fae --- /dev/null +++ b/npm/src/cron/lock.ts @@ -0,0 +1,108 @@ +import { randomUUID } from 'crypto'; +import type { Storable } from '../typings'; +import { eventLockTTL } from '../directory-sync/utils'; + +const lockRenewalInterval = (eventLockTTL / 2) * 1000; +const instanceKey = randomUUID(); + +interface Lock { + key: string; + created_at: string; +} + +interface LockParams { + lockStore: Storable; + key: string; +} + +export class CronLock { + private lockStore: Storable; + private key: string; + private intervalId: NodeJS.Timeout | undefined; + + constructor({ key, lockStore }: LockParams) { + this.lockStore = lockStore; + this.key = key; + } + + public async acquire() { + try { + const lock = await this.get(); + + if (lock && !this.isExpired(lock)) { + return lock.key === instanceKey; + } + + await this.add(); + + // Renew the lock periodically + if (!this.intervalId) { + this.intervalId = setInterval(async () => { + this.renew(); + }, lockRenewalInterval); + } + + return true; + } catch (e: any) { + console.error(`Error acquiring lock for ${instanceKey}: ${e}`); + return false; + } + } + + private async renew() { + try { + const lock = await this.get(); + + if (!lock) { + return; + } + + if (lock.key != instanceKey) { + return; + } + + await this.add(); + } catch (e: any) { + console.error(`Error renewing lock for ${instanceKey}: ${e}`); + } + } + + private async add() { + const record = { + key: instanceKey, + created_at: new Date().toISOString(), + }; + + await this.lockStore.put(this.key, record); + } + + private async get() { + return (await this.lockStore.get(this.key)) as Lock; + } + + public async release() { + if (this.intervalId) { + clearInterval(this.intervalId); + } + + const lock = await this.get(); + + if (!lock) { + return; + } + + if (lock.key != instanceKey) { + return; + } + + await this.lockStore.delete(this.key); + } + + private isExpired(lock: Lock) { + const lockDate = new Date(lock.created_at); + const currentDate = new Date(); + const diffSeconds = (currentDate.getTime() - lockDate.getTime()) / 1000; + + return diffSeconds > eventLockTTL; + } +} diff --git a/npm/src/directory-sync/batch-events/lock.ts b/npm/src/directory-sync/batch-events/lock.ts deleted file mode 100644 index 7114745bd..000000000 --- a/npm/src/directory-sync/batch-events/lock.ts +++ /dev/null @@ -1,82 +0,0 @@ -import type { Storable } from '../../typings'; - -interface Lock { - key: string; - created_at: string; -} - -interface LockParams { - lockStore: Storable; -} - -export class EventLock { - private lockStore: Storable; - - constructor({ lockStore }: LockParams) { - this.lockStore = lockStore; - } - - public async acquire(key: string) { - try { - const lock = await this.get(); - - if (lock) { - return lock.key === key; - } - - await this.add(key); - - return true; - } catch (e: any) { - console.error(`Error acquiring lock for ${key}: ${e}`); - return false; - } - } - - public async renew(key: string) { - try { - const lock = await this.get(); - - if (!lock) { - return; - } - - if (lock.key != key) { - return; - } - - await this.add(key); - } catch (e: any) { - console.error(`Error renewing lock for ${key}: ${e}`); - } - } - - async add(key: string) { - const record = { - key, - created_at: new Date().toISOString(), - }; - - await this.lockStore.put(key, record); - } - - async get() { - const { data } = (await this.lockStore.getAll()) as { data: Lock[] }; - - return data.length > 0 ? data[0] : null; - } - - async release(key: string) { - const lock = await this.get(); - - if (!lock) { - return; - } - - if (lock.key != key) { - return; - } - - await this.lockStore.delete(key); - } -} diff --git a/npm/src/directory-sync/batch-events/queue.ts b/npm/src/directory-sync/batch-events/queue.ts index d3613d090..022590453 100644 --- a/npm/src/directory-sync/batch-events/queue.ts +++ b/npm/src/directory-sync/batch-events/queue.ts @@ -1,4 +1,3 @@ -import os from 'os'; import _ from 'lodash'; import { randomUUID } from 'crypto'; @@ -8,14 +7,14 @@ import type { IDirectoryConfig, Storable, JacksonOption, - EventLock, + CronLock, IWebhookEventsLogger, } from '../../typings'; -import { eventLockTTL } from '../utils'; import { sendPayloadToWebhook } from '../../event/webhook'; import { isConnectionActive } from '../../controller/utils'; import { JacksonError } from '../../controller/error'; import * as metrics from '../../opentelemetry/metrics'; +import { indexNames } from '../scim/utils'; enum EventStatus { PENDING = 'PENDING', @@ -34,21 +33,21 @@ interface QueuedEvent { interface DirectoryEventsParams { opts: JacksonOption; eventStore: Storable; - eventLock: EventLock; + eventLock: CronLock; directories: IDirectoryConfig; webhookLogs: IWebhookEventsLogger; } let isJobRunning = false; -const lockKey = os.hostname(); -const lockRenewalInterval = (eventLockTTL / 2) * 1000; +let intervalId: NodeJS.Timeout; export class EventProcessor { private eventStore: Storable; - private eventLock: EventLock; + private eventLock: CronLock; private opts: JacksonOption; private directories: IDirectoryConfig; private webhookLogs: IWebhookEventsLogger; + private cronInterval: number | undefined; constructor({ opts, eventStore, eventLock, directories, webhookLogs }: DirectoryEventsParams) { this.opts = opts; @@ -56,6 +55,12 @@ export class EventProcessor { this.eventStore = eventStore; this.directories = directories; this.webhookLogs = webhookLogs; + this.cronInterval = this.opts.dsync?.webhookBatchCronInterval; + + if (this.cronInterval) { + this.scheduleWorker = this.scheduleWorker.bind(this); + this.scheduleWorker(); + } } // Push the new event to the database @@ -72,7 +77,7 @@ export class EventProcessor { const index = [ { - name: 'directoryId', + name: indexNames.directoryId, value: event.directory_id, }, ]; @@ -82,28 +87,8 @@ export class EventProcessor { return record; } - // Process the events and send them to the webhooks as a batch - public async process() { - if (isJobRunning) { - return; - } - - if (!(await this.eventLock.acquire(lockKey))) { - return; - } - - isJobRunning = true; - - // Renew the lock periodically - const intervalId = setInterval(async () => { - this.eventLock.renew(lockKey); - }, lockRenewalInterval); - - const batchSize = this.opts.dsync?.webhookBatchSize; - - if (!batchSize) { - throw new JacksonError('Batch size not defined'); - } + private async _process() { + const batchSize = this.opts.dsync?.webhookBatchSize || 50; // eslint-disable-next-line no-constant-condition while (true) { @@ -111,8 +96,7 @@ export class EventProcessor { const eventsCount = events.length; if (eventsCount === 0) { - clearInterval(intervalId); - await this.eventLock.release(lockKey); + await this.eventLock.release(); break; } @@ -183,8 +167,32 @@ export class EventProcessor { } } } + } + + // Process the events and send them to the webhooks as a batch + public async process() { + if (isJobRunning) { + console.info('A batch process is already running, skipping.'); + return; + } + + if (!(await this.eventLock.acquire())) { + return; + } + + isJobRunning = true; + + try { + this._process(); + } catch (e: any) { + console.error(' Error processing webhooks batch:', e); + } isJobRunning = false; + + if (this.cronInterval) { + this.scheduleWorker(); + } } // Fetch next batch of events from the database @@ -260,4 +268,16 @@ export class EventProcessor { metrics.increment('dsyncEventsBatchFailed'); console.error('All events in the batch have failed. Please check the system.'); } + + public async scheduleWorker() { + if (!this.cronInterval) { + return; + } + + if (intervalId) { + clearInterval(intervalId); + } + + intervalId = setInterval(() => this.process(), this.cronInterval * 1000); + } } diff --git a/npm/src/directory-sync/index.ts b/npm/src/directory-sync/index.ts index a84205098..a2cb2a03f 100644 --- a/npm/src/directory-sync/index.ts +++ b/npm/src/directory-sync/index.ts @@ -1,4 +1,4 @@ -import type { JacksonOption, IEventController, EventCallback, DB } from '../typings'; +import type { JacksonOption, IEventController, DB } from '../typings'; import { DirectoryConfig } from './scim/DirectoryConfig'; import { DirectoryUsers } from './scim/DirectoryUsers'; import { DirectoryGroups } from './scim/DirectoryGroups'; @@ -8,11 +8,11 @@ import { getDirectorySyncProviders } from './scim/utils'; import { RequestHandler } from './request'; import { WebhookEventsLogger } from './scim/WebhookEventsLogger'; import { newGoogleProvider } from './non-scim/google'; -import { startSync } from './non-scim'; +import { SyncProviders } from './non-scim'; import { storeNamespacePrefix } from '../controller/utils'; -import { eventLockTTL, handleEventCallback } from './utils'; +import { eventLockKey, eventLockTTL, googleLockKey, handleEventCallback } from './utils'; import { EventProcessor } from './batch-events/queue'; -import { EventLock } from './batch-events/lock'; +import { CronLock } from '../cron/lock'; const directorySync = async (params: { db: DB; opts: JacksonOption; eventController: IEventController }) => { const { db, opts, eventController } = params; @@ -31,7 +31,6 @@ const directorySync = async (params: { db: DB; opts: JacksonOption; eventControl const directoryUsers = new DirectoryUsers({ directories, users }); const directoryGroups = new DirectoryGroups({ directories, users, groups }); - const requestHandler = new RequestHandler(directoryUsers, directoryGroups); // Fetch the supported providers const getProviders = () => { @@ -43,7 +42,8 @@ const directorySync = async (params: { db: DB; opts: JacksonOption; eventControl // Batch send events const eventStore = db.store(storeNamespacePrefix.dsync.events); const lockStore = db.store(storeNamespacePrefix.dsync.lock, eventLockTTL); - const eventLock = new EventLock({ lockStore }); + const eventLock = new CronLock({ key: eventLockKey, lockStore }); + const googleLock = new CronLock({ key: googleLockKey, lockStore }); const eventProcessor = new EventProcessor({ opts, eventStore, @@ -52,6 +52,35 @@ const directorySync = async (params: { db: DB; opts: JacksonOption; eventControl webhookLogs, }); + // Internal callback handles sending webhooks + const internalCallback = await handleEventCallback({ + opts, + directories, + webhookLogs, + eventProcessor, + }); + + // Use the provided callback (Embedded) or fallback to the internal callback (Hosted) + const _callback = opts.dsync?.callback || internalCallback; + + // SCIM handler + const requestHandler = new RequestHandler({ + directoryUsers, + directoryGroups, + eventCallback: _callback, + }); + + // Google sync handler + const syncProviders = new SyncProviders({ + userController: users, + groupController: groups, + opts, + directories, + requestHandler, + eventCallback: _callback, + eventLock: googleLock, + }); + return { users, groups, @@ -60,27 +89,10 @@ const directorySync = async (params: { db: DB; opts: JacksonOption; eventControl requests: requestHandler, providers: getProviders, events: { - callback: await handleEventCallback({ - opts, - directories, - webhookLogs, - eventProcessor, - }), batch: eventProcessor, }, google: googleProvider.oauth, - sync: async (callback: EventCallback) => { - return await startSync( - { - userController: users, - groupController: groups, - opts, - directories, - requestHandler, - }, - callback - ); - }, + sync: syncProviders.startSync.bind(syncProviders), }; }; diff --git a/npm/src/directory-sync/non-scim/index.ts b/npm/src/directory-sync/non-scim/index.ts index bb202a79a..4cb82b999 100644 --- a/npm/src/directory-sync/non-scim/index.ts +++ b/npm/src/directory-sync/non-scim/index.ts @@ -6,6 +6,7 @@ import type { IRequestHandler, JacksonOption, EventCallback, + CronLock, } from '../../typings'; import { SyncUsers } from './syncUsers'; import { SyncGroups } from './syncGroups'; @@ -17,46 +18,109 @@ interface SyncParams { opts: JacksonOption; directories: IDirectoryConfig; requestHandler: IRequestHandler; + eventCallback: EventCallback; + eventLock: CronLock; } -// Method to start the directory sync process -// This method will be called by the directory sync cron job -export const startSync = async (params: SyncParams, callback: EventCallback) => { - const { userController, groupController, opts, directories, requestHandler } = params; +let isJobRunning = false; +let intervalId: NodeJS.Timeout; - const { directory: provider } = newGoogleProvider({ directories, opts }); +export class SyncProviders { + private userController: IUsers; + private groupController: IGroups; + private directories: IDirectoryConfig; + private requestHandler: IRequestHandler; + private opts: JacksonOption; + private cronInterval: number | undefined; + private eventCallback: EventCallback; + private eventLock: CronLock; - const startTime = Date.now(); + constructor({ + userController, + groupController, + opts, + directories, + requestHandler, + eventCallback, + eventLock, + }: SyncParams) { + this.userController = userController; + this.groupController = groupController; + this.directories = directories; + this.requestHandler = requestHandler; + this.eventCallback = eventCallback; + this.opts = opts; + this.cronInterval = this.opts.dsync?.providers?.google.cronInterval; + this.eventLock = eventLock; - console.info('Starting the sync process'); - - const allDirectories = await provider.getDirectories(); - - if (allDirectories.length === 0) { - console.info('No directories found. Skipping the sync process'); - return; - } - - try { - for (const directory of allDirectories) { - const params = { - directory, - userController, - groupController, - provider, - requestHandler, - callback, - }; - - await new SyncUsers(params).sync(); - await new SyncGroups(params).sync(); - await new SyncGroupMembers(params).sync(); + if (this.cronInterval) { + this.scheduleSync = this.scheduleSync.bind(this); + this.scheduleSync(); } - } catch (e: any) { - console.error(e); } - const endTime = Date.now(); + // Start the sync process + public async startSync() { + if (isJobRunning) { + console.info('A sync process is already running, skipping.'); + return; + } - console.info(`Sync process completed in ${(endTime - startTime) / 1000} seconds`); -}; + if (!(await this.eventLock.acquire())) { + return; + } + + isJobRunning = true; + + const { directory: provider } = newGoogleProvider({ directories: this.directories, opts: this.opts }); + + const startTime = Date.now(); + + try { + const allDirectories = await provider.getDirectories(); + + console.info(`Starting the sync process for ${allDirectories.length} directories`); + + for (const directory of allDirectories) { + const params = { + directory, + provider, + userController: this.userController, + groupController: this.groupController, + requestHandler: this.requestHandler, + callback: this.eventCallback, + }; + + await new SyncUsers(params).sync(); + await new SyncGroups(params).sync(); + await new SyncGroupMembers(params).sync(); + } + } catch (e: any) { + console.error(' Error processing Google sync:', e); + } + + await this.eventLock.release(); + + const endTime = Date.now(); + console.info(`Sync process completed in ${(endTime - startTime) / 1000} seconds`); + + isJobRunning = false; + + if (this.cronInterval) { + this.scheduleSync(); + } + } + + // Schedule the next sync process + private scheduleSync() { + if (!this.cronInterval) { + return; + } + + if (intervalId) { + clearInterval(intervalId); + } + + intervalId = setInterval(() => this.startSync(), this.cronInterval * 1000); + } +} diff --git a/npm/src/directory-sync/request.ts b/npm/src/directory-sync/request.ts index ecca1aa8d..09baae14a 100644 --- a/npm/src/directory-sync/request.ts +++ b/npm/src/directory-sync/request.ts @@ -6,19 +6,30 @@ import type { DirectorySyncRequest, } from '../typings'; +interface RequestHandlerParams { + directoryUsers: IDirectoryUsers; + directoryGroups: IDirectoryGroups; + eventCallback: EventCallback; +} + export class RequestHandler { - constructor( - private directoryUsers: IDirectoryUsers, - private directoryGroups: IDirectoryGroups - ) {} + private directoryUsers: IDirectoryUsers; + private directoryGroups: IDirectoryGroups; + private eventCallback: EventCallback; + + constructor({ directoryUsers, directoryGroups, eventCallback }: RequestHandlerParams) { + this.directoryUsers = directoryUsers; + this.directoryGroups = directoryGroups; + this.eventCallback = eventCallback; + } async handle(request: DirectorySyncRequest, callback?: EventCallback): Promise { const resourceType = request.resourceType.toLowerCase(); if (resourceType === 'users') { - return await this.directoryUsers.handleRequest(request, callback); + return await this.directoryUsers.handleRequest(request, callback || this.eventCallback); } else if (resourceType === 'groups') { - return await this.directoryGroups.handleRequest(request, callback); + return await this.directoryGroups.handleRequest(request, callback || this.eventCallback); } return { status: 404, data: {} }; diff --git a/npm/src/directory-sync/scim/Groups.ts b/npm/src/directory-sync/scim/Groups.ts index 6a199623c..c04fbc19d 100644 --- a/npm/src/directory-sync/scim/Groups.ts +++ b/npm/src/directory-sync/scim/Groups.ts @@ -4,11 +4,7 @@ import type { Group, DatabaseStore, PaginationParams, Response, GroupMembership import * as dbutils from '../../db/utils'; import { apiError, JacksonError } from '../../controller/error'; import { Base } from './Base'; - -const indexNames = { - directoryIdDisplayname: 'directoryIdDisplayname', - directoryId: 'directoryId', -}; +import { indexNames } from './utils'; interface CreateGroupParams { directoryId: string; @@ -171,7 +167,7 @@ export class Groups extends Base { user_id: userId, }, { - name: 'groupId', + name: indexNames.groupId, value: groupId, } ); @@ -310,7 +306,7 @@ export class Groups extends Base { try { const { data } = (await this.store('members').getByIndex( { - name: 'groupId', + name: indexNames.groupId, value: groupId, }, pageOffset, @@ -331,14 +327,16 @@ export class Groups extends Base { // Delete all groups from a directory async deleteAll(directoryId: string) { - const index = { - name: indexNames.directoryId, - value: directoryId, - }; - // eslint-disable-next-line no-constant-condition while (true) { - const { data: groups } = await this.store('groups').getByIndex(index, 0, this.bulkDeleteBatchSize); + const { data: groups } = await this.store('groups').getByIndex( + { + name: indexNames.directoryId, + value: directoryId, + }, + 0, + this.bulkDeleteBatchSize + ); if (!groups || groups.length === 0) { break; @@ -356,14 +354,16 @@ export class Groups extends Base { // Remove all users from a group public async removeAllUsers(groupId: string) { - const index = { - name: 'groupId', - value: groupId, - }; - // eslint-disable-next-line no-constant-condition while (true) { - const { data: members } = await this.store('members').getByIndex(index, 0, this.bulkDeleteBatchSize); + const { data: members } = await this.store('members').getByIndex( + { + name: indexNames.groupId, + value: groupId, + }, + 0, + this.bulkDeleteBatchSize + ); if (!members || members.length === 0) { break; diff --git a/npm/src/directory-sync/scim/Users.ts b/npm/src/directory-sync/scim/Users.ts index 541f6d4d0..13aaa1a2f 100644 --- a/npm/src/directory-sync/scim/Users.ts +++ b/npm/src/directory-sync/scim/Users.ts @@ -2,11 +2,7 @@ import type { User, DatabaseStore, PaginationParams, Response } from '../../typi import { apiError, JacksonError } from '../../controller/error'; import { Base } from './Base'; import { keyFromParts } from '../../db/utils'; - -const indexNames = { - directoryIdUsername: 'directoryIdUsername', - directoryId: 'directoryId', -}; +import { indexNames } from './utils'; /** * @swagger @@ -215,14 +211,16 @@ export class Users extends Base { // Delete all users from a directory async deleteAll(directoryId: string) { - const index = { - name: indexNames.directoryId, - value: directoryId, - }; - // eslint-disable-next-line no-constant-condition while (true) { - const { data: users } = await this.store('users').getByIndex(index, 0, this.bulkDeleteBatchSize); + const { data: users } = await this.store('users').getByIndex( + { + name: indexNames.directoryId, + value: directoryId, + }, + 0, + this.bulkDeleteBatchSize + ); if (!users || users.length === 0) { break; diff --git a/npm/src/directory-sync/scim/WebhookEventsLogger.ts b/npm/src/directory-sync/scim/WebhookEventsLogger.ts index f5a901a02..cb67320c9 100644 --- a/npm/src/directory-sync/scim/WebhookEventsLogger.ts +++ b/npm/src/directory-sync/scim/WebhookEventsLogger.ts @@ -8,7 +8,8 @@ import type { PaginationParams, } from '../../typings'; import { Base } from './Base'; -import { webhookEventTTL } from '../utils'; +import { webhookLogsTTL } from '../utils'; +import { indexNames } from './utils'; type GetAllParams = PaginationParams & { directoryId?: string; @@ -81,7 +82,7 @@ export class WebhookEventsLogger extends Base { }; await this.eventStore().put(id, log, { - name: 'directoryId', + name: indexNames.directoryId, value: directory.id, }); @@ -130,7 +131,7 @@ export class WebhookEventsLogger extends Base { if (directoryId) { const index = { - name: 'directoryId', + name: indexNames.directoryId, value: directoryId, }; @@ -148,14 +149,16 @@ export class WebhookEventsLogger extends Base { // Delete all event logs for a directory async deleteAll(directoryId: string) { - const index = { - name: 'directoryId', - value: directoryId, - }; - // eslint-disable-next-line no-constant-condition while (true) { - const { data: events } = await this.eventStore().getByIndex(index, 0, this.bulkDeleteBatchSize); + const { data: events } = await this.eventStore().getByIndex( + { + name: indexNames.directoryId, + value: directoryId, + }, + 0, + this.bulkDeleteBatchSize + ); if (!events || events.length === 0) { break; @@ -167,6 +170,6 @@ export class WebhookEventsLogger extends Base { // Get the store for the events private eventStore() { - return this.store('logs', webhookEventTTL); + return this.store('logs', webhookLogsTTL); } } diff --git a/npm/src/directory-sync/scim/utils.ts b/npm/src/directory-sync/scim/utils.ts index b87736e25..0dc275d76 100644 --- a/npm/src/directory-sync/scim/utils.ts +++ b/npm/src/directory-sync/scim/utils.ts @@ -3,6 +3,13 @@ import _ from 'lodash'; import { DirectorySyncProviders } from '../../typings'; import type { DirectoryType, User, UserPatchOperation, GroupPatchOperation } from '../../typings'; +export const indexNames = { + directoryIdUsername: 'directoryIdUsername', + directoryIdDisplayname: 'directoryIdDisplayname', + directoryId: 'directoryId', + groupId: 'groupId', +}; + const parseUserRoles = (roles: string | string[]) => { if (typeof roles === 'string') { return roles.split(','); diff --git a/npm/src/directory-sync/types.ts b/npm/src/directory-sync/types.ts index b6e2e7df7..5cf5ef004 100644 --- a/npm/src/directory-sync/types.ts +++ b/npm/src/directory-sync/types.ts @@ -8,7 +8,7 @@ import { WebhookEventsLogger } from './scim/WebhookEventsLogger'; import { ApiError } from '../typings'; import { RequestHandler } from './request'; import { EventProcessor } from './batch-events/queue'; -import { EventLock as Lock } from './batch-events/lock'; +import { CronLock as Lock } from '../cron/lock'; export type IDirectorySyncController = Awaited>; export type IDirectoryConfig = InstanceType; @@ -19,7 +19,7 @@ export type IGroups = InstanceType; export type IWebhookEventsLogger = InstanceType; export type IRequestHandler = InstanceType; export type IEventProcessor = InstanceType; -export type EventLock = InstanceType; +export type CronLock = InstanceType; export type DirectorySyncEventType = | 'user.created' diff --git a/npm/src/directory-sync/utils.ts b/npm/src/directory-sync/utils.ts index 1d3d4f625..ec3c076a0 100644 --- a/npm/src/directory-sync/utils.ts +++ b/npm/src/directory-sync/utils.ts @@ -14,8 +14,10 @@ import { sendPayloadToWebhook } from '../event/webhook'; import { transformEventPayload } from './scim/transform'; import { JacksonError } from '../controller/error'; -export const eventLockTTL = 6; -export const webhookEventTTL = 7 * 24 * 60 * 60; +export const eventLockTTL = 30; +export const webhookLogsTTL = 7 * 24 * 60 * 60; +export const eventLockKey = 'dsync-event-lock'; +export const googleLockKey = 'dsync-google-lock'; interface Payload { directory: Directory; diff --git a/npm/src/typings.ts b/npm/src/typings.ts index 217becff3..4835d23f0 100644 --- a/npm/src/typings.ts +++ b/npm/src/typings.ts @@ -7,6 +7,7 @@ export * from './directory-sync/types'; export * from './event/types'; import db from './db/db'; +import { EventCallback } from './typings'; export type DB = Awaited>; @@ -456,6 +457,7 @@ export interface JacksonOption { webhook?: Webhook; dsync?: { webhookBatchSize?: number; + webhookBatchCronInterval?: number; debugWebhooks?: boolean; providers?: { google: { @@ -463,8 +465,10 @@ export interface JacksonOption { clientSecret: string; authorizePath: string; callbackPath: string; + cronInterval?: number; }; }; + callback?: EventCallback; }; /** The number of days a setup link is valid for. Defaults to 3 days. */ diff --git a/npm/test/dsync/batch/webhooks.test.ts b/npm/test/dsync/batch/webhooks.test.ts index 95ba668cd..0d50a91b6 100644 --- a/npm/test/dsync/batch/webhooks.test.ts +++ b/npm/test/dsync/batch/webhooks.test.ts @@ -56,13 +56,11 @@ tap.before(async () => { log_webhook_events: true, }); - const eventCallback = directorySync.events.callback; - // Add some users to generate events - await directorySync.requests.handle(requests.create(directory1, users[0]), eventCallback); - await directorySync.requests.handle(requests.create(directory1, users[1]), eventCallback); - await directorySync.requests.handle(requests.create(directory2, users[1]), eventCallback); - await directorySync.requests.handle(requests.create(directory2, users[0]), eventCallback); + await directorySync.requests.handle(requests.create(directory1, users[0])); + await directorySync.requests.handle(requests.create(directory1, users[1])); + await directorySync.requests.handle(requests.create(directory2, users[1])); + await directorySync.requests.handle(requests.create(directory2, users[0])); }); tap.teardown(async () => { diff --git a/npm/test/dsync/google_api.test.ts b/npm/test/dsync/google_api.test.ts index 56fe0ef55..734840f9c 100644 --- a/npm/test/dsync/google_api.test.ts +++ b/npm/test/dsync/google_api.test.ts @@ -1,6 +1,6 @@ import tap from 'tap'; import nock from 'nock'; -import type { DirectorySyncEvent } from '@boxyhq/saml-jackson'; +import type { DirectorySyncEvent, JacksonOption } from '@boxyhq/saml-jackson'; import { jacksonOptions } from '../utils'; import { IDirectorySyncController, DirectoryType } from '../../src/typings'; @@ -148,8 +148,19 @@ const mockGroupMembersAPI = (groupKey: string, members: any[]) => { .reply(200, { members }); }; +let events: DirectorySyncEvent[] = []; + tap.before(async () => { - directorySyncController = (await (await import('../../src/index')).default(jacksonOptions)) + const options: JacksonOption = { + ...jacksonOptions, + dsync: { + callback: async (event: DirectorySyncEvent) => { + events.push(event); + }, + }, + }; + + directorySyncController = (await (await import('../../src/index')).default(options)) .directorySyncController; await directorySyncController.directories.create(directoryPayload); @@ -160,7 +171,7 @@ tap.teardown(async () => { }); tap.test('Sync 1', async (t) => { - const events: DirectorySyncEvent[] = []; + events = []; // Mock necessary API calls mockUsersAPI(fakeGoogleDirectory.users); @@ -168,9 +179,7 @@ tap.test('Sync 1', async (t) => { mockGroupMembersAPI('engineering', fakeGoogleDirectory.members.engineering); mockGroupMembersAPI('sales', fakeGoogleDirectory.members.sales); - await directorySyncController.sync(async (event: DirectorySyncEvent) => { - events.push(event); - }); + await directorySyncController.sync(); nock.cleanAll(); @@ -223,7 +232,7 @@ tap.test('Sync 1', async (t) => { }); tap.test('Sync 2', async (t) => { - const events: DirectorySyncEvent[] = []; + events = []; // Update user fakeGoogleDirectory.users[0].name.givenName = 'Eliza Updated'; @@ -236,9 +245,7 @@ tap.test('Sync 2', async (t) => { mockGroupMembersAPI('engineering', fakeGoogleDirectory.members.engineering); mockGroupMembersAPI('sales', fakeGoogleDirectory.members.sales); - await directorySyncController.sync(async (event: DirectorySyncEvent) => { - events.push(event); - }); + await directorySyncController.sync(); nock.cleanAll(); @@ -256,7 +263,7 @@ tap.test('Sync 2', async (t) => { }); tap.test('Sync 3', async (t) => { - const events: DirectorySyncEvent[] = []; + events = []; // Delete the last user const deleteUser = fakeGoogleDirectory.users.pop(); @@ -271,9 +278,7 @@ tap.test('Sync 3', async (t) => { mockGroupsAPI(fakeGoogleDirectory.groups); mockGroupMembersAPI('engineering', fakeGoogleDirectory.members.engineering); - await directorySyncController.sync(async (event: DirectorySyncEvent) => { - events.push(event); - }); + await directorySyncController.sync(); nock.cleanAll(); @@ -291,7 +296,7 @@ tap.test('Sync 3', async (t) => { }); tap.test('Sync 4', async (t) => { - const events: DirectorySyncEvent[] = []; + events = []; // Add new user const newUser = { @@ -321,9 +326,7 @@ tap.test('Sync 4', async (t) => { mockGroupMembersAPI('engineering', fakeGoogleDirectory.members.engineering); mockGroupMembersAPI('marketing', fakeGoogleDirectory.members.marketing); - await directorySyncController.sync(async (event: DirectorySyncEvent) => { - events.push(event); - }); + await directorySyncController.sync(); nock.cleanAll(); @@ -350,7 +353,7 @@ tap.test('Sync 4', async (t) => { }); tap.test('Sync 5', async (t) => { - const events: DirectorySyncEvent[] = []; + events = []; // Remove elizasmith from the engineering group fakeGoogleDirectory.members.engineering.shift(); @@ -369,9 +372,7 @@ tap.test('Sync 5', async (t) => { mockGroupMembersAPI('engineering', fakeGoogleDirectory.members.engineering); mockGroupMembersAPI('marketing', fakeGoogleDirectory.members.marketing); - await directorySyncController.sync(async (event: DirectorySyncEvent) => { - events.push(event); - }); + await directorySyncController.sync(); nock.cleanAll(); diff --git a/npm/test/dsync/webhooks.test.ts b/npm/test/dsync/webhooks.test.ts index dc5636d6c..2f910ce47 100644 --- a/npm/test/dsync/webhooks.test.ts +++ b/npm/test/dsync/webhooks.test.ts @@ -1,4 +1,4 @@ -import { IDirectorySyncController, Directory, DirectorySyncEvent, EventCallback } from '../../src/typings'; +import { IDirectorySyncController, Directory, DirectorySyncEvent } from '../../src/typings'; import tap from 'tap'; import groups from './data/groups'; import users from './data/users'; @@ -12,8 +12,6 @@ import { createSignatureString } from '../../src/event/webhook'; let directorySync: IDirectorySyncController; let directory: Directory; -let eventCallback: EventCallback; - const fakeDirectory = getFakeDirectory(); const webhook: Directory['webhook'] = { @@ -47,8 +45,6 @@ tap.before(async () => { directorySync.webhookLogs.setTenantAndProduct(directory.tenant, directory.product); directorySync.users.setTenantAndProduct(directory.tenant, directory.product); - - eventCallback = directorySync.events.callback; }); tap.teardown(async () => { @@ -80,7 +76,7 @@ tap.test('Webhook Events /', async (t) => { }); // Create a user - await directorySync.requests.handle(usersRequest.create(directory, users[0]), eventCallback); + await directorySync.requests.handle(usersRequest.create(directory, users[0])); const events = await directorySync.webhookLogs.getAll(); @@ -102,7 +98,7 @@ tap.test('Webhook Events /', async (t) => { }); // Create a user - await directorySync.requests.handle(usersRequest.create(directory, users[0]), eventCallback); + await directorySync.requests.handle(usersRequest.create(directory, users[0])); const events = await directorySync.webhookLogs.getAll(); @@ -116,7 +112,7 @@ tap.test('Webhook Events /', async (t) => { t.test('Should be able to get an event by id', async (t) => { // Create a user - await directorySync.requests.handle(usersRequest.create(directory, users[0]), eventCallback); + await directorySync.requests.handle(usersRequest.create(directory, users[0])); const logs = await directorySync.webhookLogs.getAll(); @@ -132,20 +128,17 @@ tap.test('Webhook Events /', async (t) => { // Create the user const { data: createdUser } = await directorySync.requests.handle( - usersRequest.create(directory, users[0]), - eventCallback + usersRequest.create(directory, users[0]) ); // Update the user const { data: updatedUser } = await directorySync.requests.handle( - usersRequest.updateById(directory, createdUser.id, users[0]), - eventCallback + usersRequest.updateById(directory, createdUser.id, users[0]) ); // Delete the user const { data: deletedUser } = await directorySync.requests.handle( - usersRequest.deleteById(directory, createdUser.id), - eventCallback + usersRequest.deleteById(directory, createdUser.id) ); mock.verify(); @@ -184,20 +177,17 @@ tap.test('Webhook Events /', async (t) => { // Create the group const { data: createdGroup } = await directorySync.requests.handle( - groupRequest.create(directory, groups[0]), - eventCallback + groupRequest.create(directory, groups[0]) ); // Update the group const { data: updatedGroup } = await directorySync.requests.handle( - groupRequest.updateById(directory, createdGroup.id, groups[0]), - eventCallback + groupRequest.updateById(directory, createdGroup.id, groups[0]) ); // Delete the group const { data: deletedGroup } = await directorySync.requests.handle( - groupRequest.deleteById(directory, createdGroup.id), - eventCallback + groupRequest.deleteById(directory, createdGroup.id) ); mock.verify(); @@ -235,20 +225,17 @@ tap.test('Webhook Events /', async (t) => { // Create the user const { data: createdUser } = await directorySync.requests.handle( - usersRequest.create(directory, users[0]), - eventCallback + usersRequest.create(directory, users[0]) ); // Create the group const { data: createdGroup } = await directorySync.requests.handle( - groupRequest.create(directory, groups[0]), - eventCallback + groupRequest.create(directory, groups[0]) ); // Add the user to the group await directorySync.requests.handle( - groupRequest.addMembers(directory, createdGroup.id, [{ value: createdUser.id }]), - eventCallback + groupRequest.addMembers(directory, createdGroup.id, [{ value: createdUser.id }]) ); // Remove the user from the group @@ -258,8 +245,7 @@ tap.test('Webhook Events /', async (t) => { createdGroup.id, [{ value: createdUser.id }], `members[value eq "${createdUser.id}"]` - ), - eventCallback + ) ); mock.verify(); diff --git a/pages/api/admin/terminus/models/[id]/index.ts b/pages/api/admin/terminus/models/[id]/index.ts index 1c6b8fb4f..cd6f9d6fb 100644 --- a/pages/api/admin/terminus/models/[id]/index.ts +++ b/pages/api/admin/terminus/models/[id]/index.ts @@ -20,7 +20,6 @@ const getModel = async (req: NextApiRequest, res: NextApiResponse) => { const { data } = await axios.get(getTerminusUrl(id), { headers: { Authorization: `api-key ${terminusOptions.adminToken}`, - 'x-access-token': terminusOptions.adminToken, // TODO: Remove this }, }); @@ -36,7 +35,6 @@ const saveModel = async (req: NextApiRequest, res: NextApiResponse) => { const { data } = await axios.post(getTerminusUrl(id), req.body, { headers: { Authorization: `api-key ${terminusOptions.adminToken}`, - 'x-access-token': terminusOptions.adminToken, // TODO: Remove this }, }); diff --git a/pages/api/admin/terminus/roles.ts b/pages/api/admin/terminus/roles.ts index 639952f1a..6280247ee 100644 --- a/pages/api/admin/terminus/roles.ts +++ b/pages/api/admin/terminus/roles.ts @@ -13,7 +13,6 @@ const getRoles = async (req: NextApiRequest, res: NextApiResponse) => { const { data } = await axios.get(`${terminusOptions.hostUrl}/v1/manage/roles`, { headers: { Authorization: `api-key ${terminusOptions.adminToken}`, - 'x-access-token': terminusOptions.adminToken, }, }); diff --git a/pages/api/scim/v2.0/[...directory].ts b/pages/api/scim/v2.0/[...directory].ts index 63de2bc16..0bab47695 100644 --- a/pages/api/scim/v2.0/[...directory].ts +++ b/pages/api/scim/v2.0/[...directory].ts @@ -25,10 +25,7 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse) }, }; - const { status, data } = await directorySyncController.requests.handle( - request, - directorySyncController.events.callback - ); + const { status, data } = await directorySyncController.requests.handle(request); return res.status(status).json(data); } diff --git a/pages/api/v1/dsync/cron/process-events.ts b/pages/api/v1/dsync/cron/process-events.ts index 0f2ab593c..599401987 100644 --- a/pages/api/v1/dsync/cron/process-events.ts +++ b/pages/api/v1/dsync/cron/process-events.ts @@ -7,9 +7,9 @@ const handler = async (req: NextApiRequest, res: NextApiResponse) => { try { const { directorySyncController } = await jackson(); - await directorySyncController.events.batch.process(); + directorySyncController.events.batch.process(); - res.status(200).json({ message: 'Processing completed' }); + res.json({ message: 'Processing started' }); } catch (e: any) { res.status(500).json({ message: e.message || 'Processing failed' }); } diff --git a/pages/api/v1/dsync/cron/sync-google.ts b/pages/api/v1/dsync/cron/sync-google.ts index a4cf3e26c..2f84eab92 100644 --- a/pages/api/v1/dsync/cron/sync-google.ts +++ b/pages/api/v1/dsync/cron/sync-google.ts @@ -7,9 +7,9 @@ const handler = async (req: NextApiRequest, res: NextApiResponse) => { try { const { directorySyncController } = await jackson(); - await directorySyncController.sync(directorySyncController.events.callback); + directorySyncController.sync(); - res.status(200).json({ message: 'Sync completed' }); + res.json({ message: 'Sync started' }); } catch (e: any) { res.status(500).json({ message: e.message || 'Sync failed' }); }