Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 50 additions & 26 deletions apps/sim/lib/webhooks/polling/google-drive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,12 @@ export const googleDrivePollingHandler: PollingProviderHandler = {

const config = webhookData.providerConfig as unknown as GoogleDriveWebhookConfig

// First poll: get startPageToken and seed state
// First poll (or re-seed after 410): seed page token, preserve any existing known file IDs.
if (!config.pageToken) {
const startPageToken = await getStartPageToken(accessToken, config, requestId, logger)

await updateWebhookProviderConfig(
webhookId,
{ pageToken: startPageToken, knownFileIds: [] },
{ pageToken: startPageToken, knownFileIds: config.knownFileIds ?? [] },
logger
)
await markWebhookSuccess(webhookId, logger)
Expand All @@ -105,7 +104,6 @@ export const googleDrivePollingHandler: PollingProviderHandler = {
return 'success'
}

// Fetch changes since last pageToken
const { changes, newStartPageToken } = await fetchChanges(
accessToken,
config,
Expand All @@ -120,7 +118,6 @@ export const googleDrivePollingHandler: PollingProviderHandler = {
return 'success'
}

// Filter changes client-side (folder, MIME type, trashed)
const filteredChanges = filterChanges(changes, config)

if (!filteredChanges.length) {
Expand All @@ -145,11 +142,6 @@ export const googleDrivePollingHandler: PollingProviderHandler = {
logger
)

// Update state: new pageToken and rolling knownFileIds.
// Newest IDs are placed first so that when the set exceeds MAX_KNOWN_FILE_IDS,
// the oldest (least recently seen) IDs are evicted. Recent files are more
// likely to be modified again, so keeping them prevents misclassifying a
// repeat modification as a "created" event.
const existingKnownIds = config.knownFileIds || []
const mergedKnownIds = [...new Set([...newKnownFileIds, ...existingKnownIds])].slice(
0,
Expand Down Expand Up @@ -180,13 +172,38 @@ export const googleDrivePollingHandler: PollingProviderHandler = {
)
return 'success'
} catch (error) {
if (error instanceof Error && error.name === 'DrivePageTokenInvalidError') {
await updateWebhookProviderConfig(webhookId, { pageToken: undefined }, logger)
await markWebhookSuccess(webhookId, logger)
logger.warn(
`[${requestId}] Drive page token invalid for webhook ${webhookId}, re-seeding on next poll`
)
return 'success'
}
if (error instanceof Error && error.name === 'DriveRateLimitError') {
await markWebhookSuccess(webhookId, logger)
logger.warn(
`[${requestId}] Drive API rate limited for webhook ${webhookId}, skipping to retry next poll cycle`
)
return 'success'
}
logger.error(`[${requestId}] Error processing Google Drive webhook ${webhookId}:`, error)
await markWebhookFailed(webhookId, logger)
return 'failure'
}
},
}

/** Drive error `reason` values that indicate a quota/rate-limit 403 rather than a permission error. */
const DRIVE_RATE_LIMIT_REASONS = new Set(['rateLimitExceeded', 'userRateLimitExceeded'])

/**
 * Returns true only for quota/rate-limit 403s, not permission errors.
 *
 * Every entry of `error.errors` is inspected, so a rate-limit reason is
 * recognised even when it is not the first entry in the list.
 *
 * @param status - HTTP status code of the failed Drive API response.
 * @param errorData - Parsed JSON error body (an empty object when parsing failed).
 * @returns `true` when `status` is 403 and any error entry carries a reason
 *   listed in `DRIVE_RATE_LIMIT_REASONS`; `false` otherwise.
 */
function isDriveRateLimitError(status: number, errorData: Record<string, unknown>): boolean {
  if (status !== 403) return false

  const errors = (errorData as { error?: { errors?: unknown } })?.error?.errors
  // A malformed body (missing or non-array `errors`) is never treated as a rate limit.
  if (!Array.isArray(errors)) return false

  return errors.some((entry: unknown) => {
    const reason = (entry as { reason?: unknown } | null | undefined)?.reason
    return typeof reason === 'string' && DRIVE_RATE_LIMIT_REASONS.has(reason)
  })
}

async function getStartPageToken(
accessToken: string,
config: GoogleDriveWebhookConfig,
Expand All @@ -204,9 +221,15 @@ async function getStartPageToken(
})

if (!response.ok) {
const status = response.status
const errorData = await response.json().catch(() => ({}))
if (status === 429 || isDriveRateLimitError(status, errorData)) {
const err = new Error(`Drive API rate limit (${status}): ${JSON.stringify(errorData)}`)
err.name = 'DriveRateLimitError'
throw err
}
throw new Error(
`Failed to get startPageToken: ${response.status} - ${JSON.stringify(errorData)}`
`Failed to get Drive start page token: ${status} - ${JSON.stringify(errorData)}`
)
}

Expand All @@ -227,7 +250,6 @@ async function fetchChanges(
const maxFiles = config.maxFilesPerPoll || MAX_FILES_PER_POLL
let pages = 0

// eslint-disable-next-line no-constant-condition
while (true) {
pages++
const params = new URLSearchParams({
Expand All @@ -248,8 +270,19 @@ async function fetchChanges(
})

if (!response.ok) {
const status = response.status
const errorData = await response.json().catch(() => ({}))
throw new Error(`Failed to fetch changes: ${response.status} - ${JSON.stringify(errorData)}`)
if (status === 410) {
const err = new Error('Drive page token is no longer valid')
err.name = 'DrivePageTokenInvalidError'
throw err
}
if (status === 429 || isDriveRateLimitError(status, errorData)) {
const err = new Error(`Drive API rate limit (${status}): ${JSON.stringify(errorData)}`)
err.name = 'DriveRateLimitError'
throw err
}
throw new Error(`Failed to fetch Drive changes: ${status} - ${JSON.stringify(errorData)}`)
}

const data = await response.json()
Expand All @@ -274,12 +307,9 @@ async function fetchChanges(
currentPageToken = data.nextPageToken as string
}

// When allChanges exceeds maxFiles (multi-page overshoot), resume mid-list via lastNextPageToken.
// Otherwise resume from newStartPageToken (end of change list) or lastNextPageToken (MAX_PAGES hit).
const slicingOccurs = allChanges.length > maxFiles
// Drive API guarantees exactly one of nextPageToken or newStartPageToken per response.
// Slicing case: prefer lastNextPageToken (mid-list resume); fall back to newStartPageToken
// (guaranteed on final page when hasMore was false). Non-slicing case: prefer newStartPageToken
// (guaranteed when loop exhausted all pages); fall back to lastNextPageToken (when loop exited
// early due to MAX_PAGES with hasMore still true).
const resumeToken = slicingOccurs
? (lastNextPageToken ?? newStartPageToken!)
: (newStartPageToken ?? lastNextPageToken!)
Expand All @@ -292,26 +322,21 @@ function filterChanges(
config: GoogleDriveWebhookConfig
): DriveChangeEntry[] {
return changes.filter((change) => {
// Always include removals (deletions)
if (change.removed) return true

const file = change.file
if (!file) return false

// Exclude trashed files
if (file.trashed) return false

// Folder filter: check if file is in the specified folder
const folderId = config.folderId || config.manualFolderId
if (folderId) {
if (!file.parents || !file.parents.includes(folderId)) {
return false
}
}

// MIME type filter
if (config.mimeTypeFilter) {
// Support prefix matching (e.g., "image/" matches "image/png", "image/jpeg")
if (config.mimeTypeFilter.endsWith('/')) {
if (!file.mimeType.startsWith(config.mimeTypeFilter)) {
return false
Expand Down Expand Up @@ -339,7 +364,6 @@ async function processChanges(
const knownFileIdsSet = new Set(config.knownFileIds || [])

for (const change of changes) {
// Determine event type before idempotency to avoid caching filter decisions
let eventType: 'created' | 'modified' | 'deleted'
if (change.removed) {
eventType = 'deleted'
Expand All @@ -349,12 +373,12 @@ async function processChanges(
eventType = 'modified'
}

// Track file as known regardless of filter (for future create/modify distinction)
// Track file as known regardless of filter so future changes are correctly classified
if (!change.removed) {
newKnownFileIds.push(change.fileId)
}

// Client-side event type filter — skip before idempotency so filtered events aren't cached
// Apply event type filter before idempotency so filtered events aren't cached
const filter = config.eventTypeFilter
if (filter) {
const skip = filter === 'created_or_modified' ? eventType === 'deleted' : eventType !== filter
Expand Down
Loading