diff --git a/apps/sim/lib/webhooks/polling/google-drive.ts b/apps/sim/lib/webhooks/polling/google-drive.ts index d57c19fa50..6dcdd9cdff 100644 --- a/apps/sim/lib/webhooks/polling/google-drive.ts +++ b/apps/sim/lib/webhooks/polling/google-drive.ts @@ -89,13 +89,12 @@ export const googleDrivePollingHandler: PollingProviderHandler = { const config = webhookData.providerConfig as unknown as GoogleDriveWebhookConfig - // First poll: get startPageToken and seed state + // First poll (or re-seed after 410): seed page token, preserve any existing known file IDs. if (!config.pageToken) { const startPageToken = await getStartPageToken(accessToken, config, requestId, logger) - await updateWebhookProviderConfig( webhookId, - { pageToken: startPageToken, knownFileIds: [] }, + { pageToken: startPageToken, knownFileIds: config.knownFileIds ?? [] }, logger ) await markWebhookSuccess(webhookId, logger) @@ -105,7 +104,6 @@ export const googleDrivePollingHandler: PollingProviderHandler = { return 'success' } - // Fetch changes since last pageToken const { changes, newStartPageToken } = await fetchChanges( accessToken, config, @@ -120,7 +118,6 @@ export const googleDrivePollingHandler: PollingProviderHandler = { return 'success' } - // Filter changes client-side (folder, MIME type, trashed) const filteredChanges = filterChanges(changes, config) if (!filteredChanges.length) { @@ -145,11 +142,6 @@ export const googleDrivePollingHandler: PollingProviderHandler = { logger ) - // Update state: new pageToken and rolling knownFileIds. - // Newest IDs are placed first so that when the set exceeds MAX_KNOWN_FILE_IDS, - // the oldest (least recently seen) IDs are evicted. Recent files are more - // likely to be modified again, so keeping them prevents misclassifying a - // repeat modification as a "created" event. const existingKnownIds = config.knownFileIds || [] const mergedKnownIds = [...new Set([...newKnownFileIds, ...existingKnownIds])].slice( 0, @@ -180,6 +172,21 @@ export const googleDrivePollingHandler: PollingProviderHandler = { ) return 'success' } catch (error) { + if (error instanceof Error && error.name === 'DrivePageTokenInvalidError') { + await updateWebhookProviderConfig(webhookId, { pageToken: undefined }, logger) + await markWebhookSuccess(webhookId, logger) + logger.warn( + `[${requestId}] Drive page token invalid for webhook ${webhookId}, re-seeding on next poll` + ) + return 'success' + } + if (error instanceof Error && error.name === 'DriveRateLimitError') { + await markWebhookSuccess(webhookId, logger) + logger.warn( + `[${requestId}] Drive API rate limited for webhook ${webhookId}, skipping to retry next poll cycle` + ) + return 'success' + } logger.error(`[${requestId}] Error processing Google Drive webhook ${webhookId}:`, error) await markWebhookFailed(webhookId, logger) return 'failure' @@ -187,6 +194,16 @@ export const googleDrivePollingHandler: PollingProviderHandler = { }, } +const DRIVE_RATE_LIMIT_REASONS = new Set(['rateLimitExceeded', 'userRateLimitExceeded']) + +/** Returns true only for quota/rate-limit 403s, not permission errors. */ +function isDriveRateLimitError(status: number, errorData: Record): boolean { + if (status !== 403) return false + const reason = (errorData as { error?: { errors?: { reason?: string }[] } })?.error?.errors?.[0] + ?.reason + return reason !== undefined && DRIVE_RATE_LIMIT_REASONS.has(reason) +} + async function getStartPageToken( accessToken: string, config: GoogleDriveWebhookConfig, @@ -204,9 +221,15 @@ async function getStartPageToken( }) if (!response.ok) { + const status = response.status const errorData = await response.json().catch(() => ({})) + if (status === 429 || isDriveRateLimitError(status, errorData)) { + const err = new Error(`Drive API rate limit (${status}): ${JSON.stringify(errorData)}`) + err.name = 'DriveRateLimitError' + throw err + } throw new Error( - `Failed to get startPageToken: ${response.status} - ${JSON.stringify(errorData)}` + `Failed to get Drive start page token: ${status} - ${JSON.stringify(errorData)}` ) } @@ -227,7 +250,6 @@ async function fetchChanges( const maxFiles = config.maxFilesPerPoll || MAX_FILES_PER_POLL let pages = 0 - // eslint-disable-next-line no-constant-condition while (true) { pages++ const params = new URLSearchParams({ @@ -248,8 +270,19 @@ async function fetchChanges( }) if (!response.ok) { + const status = response.status const errorData = await response.json().catch(() => ({})) - throw new Error(`Failed to fetch changes: ${response.status} - ${JSON.stringify(errorData)}`) + if (status === 410) { + const err = new Error('Drive page token is no longer valid') + err.name = 'DrivePageTokenInvalidError' + throw err + } + if (status === 429 || isDriveRateLimitError(status, errorData)) { + const err = new Error(`Drive API rate limit (${status}): ${JSON.stringify(errorData)}`) + err.name = 'DriveRateLimitError' + throw err + } + throw new Error(`Failed to fetch Drive changes: ${status} - ${JSON.stringify(errorData)}`) } const data = await response.json() @@ -274,12 +307,9 @@ async function fetchChanges( currentPageToken = data.nextPageToken as string } + // When allChanges exceeds maxFiles (multi-page overshoot), resume mid-list via lastNextPageToken. + // Otherwise resume from newStartPageToken (end of change list) or lastNextPageToken (MAX_PAGES hit). const slicingOccurs = allChanges.length > maxFiles - // Drive API guarantees exactly one of nextPageToken or newStartPageToken per response. - // Slicing case: prefer lastNextPageToken (mid-list resume); fall back to newStartPageToken - // (guaranteed on final page when hasMore was false). Non-slicing case: prefer newStartPageToken - // (guaranteed when loop exhausted all pages); fall back to lastNextPageToken (when loop exited - // early due to MAX_PAGES with hasMore still true). const resumeToken = slicingOccurs ? (lastNextPageToken ?? newStartPageToken!) : (newStartPageToken ?? lastNextPageToken!) @@ -292,16 +322,13 @@ function filterChanges( config: GoogleDriveWebhookConfig ): DriveChangeEntry[] { return changes.filter((change) => { - // Always include removals (deletions) if (change.removed) return true const file = change.file if (!file) return false - // Exclude trashed files if (file.trashed) return false - // Folder filter: check if file is in the specified folder const folderId = config.folderId || config.manualFolderId if (folderId) { if (!file.parents || !file.parents.includes(folderId)) { @@ -309,9 +336,7 @@ function filterChanges( } } - // MIME type filter if (config.mimeTypeFilter) { - // Support prefix matching (e.g., "image/" matches "image/png", "image/jpeg") if (config.mimeTypeFilter.endsWith('/')) { if (!file.mimeType.startsWith(config.mimeTypeFilter)) { return false @@ -339,7 +364,6 @@ async function processChanges( const knownFileIdsSet = new Set(config.knownFileIds || []) for (const change of changes) { - // Determine event type before idempotency to avoid caching filter decisions let eventType: 'created' | 'modified' | 'deleted' if (change.removed) { eventType = 'deleted' @@ -349,12 +373,12 @@ async function processChanges( eventType = 'modified' } - // Track file as known regardless of filter (for future create/modify distinction) + // Track file as known regardless of filter so future changes are correctly classified if (!change.removed) { newKnownFileIds.push(change.fileId) } - // Client-side event type filter — skip before idempotency so filtered events aren't cached + // Apply event type filter before idempotency so filtered events aren't cached const filter = config.eventTypeFilter if (filter) { const skip = filter === 'created_or_modified' ? eventType === 'deleted' : eventType !== filter