fix: meeting ended trigger for webhooks and zapier sometimes not working (#10946)

Co-authored-by: mohammed gehad <mohammed.gehad.1998@gmail.com>
Co-authored-by: Monto <138862352+monto7926@users.noreply.github.com>
Co-authored-by: Carina Wollendorfer <30310907+CarinaWolli@users.noreply.github.com>
This commit is contained in:
Monto 2023-08-31 01:17:42 +02:00 committed by GitHub
parent bc9aeef710
commit 25684f9040
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 173 additions and 42 deletions

View File

@ -0,0 +1,23 @@
name: Cron - webhookTriggers
on:
# "Scheduled workflows run on the latest commit on the default or base branch."
# — https://docs.github.com/en/actions/learn-github-actions/events-that-trigger-workflows#schedule
schedule:
# Runs “every 5 minutes” (see https://crontab.guru)
- cron: "*/5 * * * *"
jobs:
cron-webhookTriggers:
env:
APP_URL: ${{ secrets.APP_URL }}
CRON_API_KEY: ${{ secrets.CRON_API_KEY }}
runs-on: ubuntu-latest
steps:
- name: cURL request
if: ${{ env.APP_URL && env.CRON_API_KEY }}
run: |
curl ${{ secrets.APP_URL }}/api/cron/webhookTriggers \
-X POST \
-H 'content-type: application/json' \
-H 'authorization: ${{ secrets.CRON_API_KEY }}' \
--fail

View File

@ -0,0 +1 @@
export { default } from "@calcom/features/webhooks/lib/cron";

View File

@ -2,8 +2,8 @@ import type { Prisma } from "@prisma/client";
import type { NextApiRequest, NextApiResponse } from "next";
import { v4 } from "uuid";
import { scheduleTrigger } from "@calcom/app-store/zapier/lib/nodeScheduler";
import findValidApiKey from "@calcom/features/ee/api-keys/lib/findValidApiKey";
import { scheduleTrigger } from "@calcom/features/webhooks/lib/scheduleTrigger";
import { defaultHandler, defaultResponder } from "@calcom/lib/server";
import prisma from "@calcom/prisma";
import { BookingStatus, WebhookTriggerEvents } from "@calcom/prisma/enums";

View File

@ -5,7 +5,6 @@ import appStore from "@calcom/app-store";
import { getCalendar } from "@calcom/app-store/_utils/getCalendar";
import { FAKE_DAILY_CREDENTIAL } from "@calcom/app-store/dailyvideo/lib/VideoApiAdapter";
import { DailyLocationType } from "@calcom/app-store/locations";
import { cancelScheduledJobs } from "@calcom/app-store/zapier/lib/nodeScheduler";
import { deleteMeeting, updateMeeting } from "@calcom/core/videoClient";
import dayjs from "@calcom/dayjs";
import { sendCancelledEmails, sendCancelledSeatEmails } from "@calcom/emails";
@ -16,6 +15,7 @@ import { sendCancelledReminders } from "@calcom/features/ee/workflows/lib/remind
import { deleteScheduledSMSReminder } from "@calcom/features/ee/workflows/lib/reminders/smsReminderManager";
import { deleteScheduledWhatsappReminder } from "@calcom/features/ee/workflows/lib/reminders/whatsappReminderManager";
import getWebhooks from "@calcom/features/webhooks/lib/getWebhooks";
import { cancelScheduledJobs } from "@calcom/features/webhooks/lib/scheduleTrigger";
import type { EventTypeInfo } from "@calcom/features/webhooks/lib/sendPayload";
import sendPayload from "@calcom/features/webhooks/lib/sendPayload";
import { isPrismaObjOrUndefined, parseRecurringEvent } from "@calcom/lib";

View File

@ -1,12 +1,12 @@
import type { Prisma, Workflow, WorkflowsOnEventTypes, WorkflowStep } from "@prisma/client";
import { scheduleTrigger } from "@calcom/app-store/zapier/lib/nodeScheduler";
import type { EventManagerUser } from "@calcom/core/EventManager";
import EventManager from "@calcom/core/EventManager";
import { sendScheduledEmails } from "@calcom/emails";
import { isEventTypeOwnerKYCVerified } from "@calcom/features/ee/workflows/lib/isEventTypeOwnerKYCVerified";
import { scheduleWorkflowReminders } from "@calcom/features/ee/workflows/lib/reminders/reminderScheduler";
import getWebhooks from "@calcom/features/webhooks/lib/getWebhooks";
import { scheduleTrigger } from "@calcom/features/webhooks/lib/scheduleTrigger";
import type { EventTypeInfo } from "@calcom/features/webhooks/lib/sendPayload";
import sendPayload from "@calcom/features/webhooks/lib/sendPayload";
import { getTeamIdFromEventType } from "@calcom/lib/getTeamIdFromEventType";

View File

@ -19,7 +19,6 @@ import {
} from "@calcom/app-store/locations";
import type { EventTypeAppsList } from "@calcom/app-store/utils";
import { getAppFromSlug } from "@calcom/app-store/utils";
import { cancelScheduledJobs, scheduleTrigger } from "@calcom/app-store/zapier/lib/nodeScheduler";
import EventManager from "@calcom/core/EventManager";
import { getEventName } from "@calcom/core/event";
import { getUserAvailability } from "@calcom/core/getUserAvailability";
@ -48,6 +47,7 @@ import {
import { getFullName } from "@calcom/features/form-builder/utils";
import type { GetSubscriberOptions } from "@calcom/features/webhooks/lib/getWebhooks";
import getWebhooks from "@calcom/features/webhooks/lib/getWebhooks";
import { cancelScheduledJobs, scheduleTrigger } from "@calcom/features/webhooks/lib/scheduleTrigger";
import { isPrismaObjOrUndefined, parseRecurringEvent } from "@calcom/lib";
import { getVideoCallUrlFromCalEvent } from "@calcom/lib/CalEventParser";
import { checkRateLimitAndThrowError } from "@calcom/lib/checkRateLimitAndThrowError";

View File

@ -0,0 +1,96 @@
/* Cron job for scheduled webhook events triggers */
import type { NextApiRequest, NextApiResponse } from "next";
import dayjs from "@calcom/dayjs";
import { defaultHandler } from "@calcom/lib/server";
import prisma from "@calcom/prisma";
async function handler(req: NextApiRequest, res: NextApiResponse) {
const apiKey = req.headers.authorization || req.query.apiKey;
if (process.env.CRON_API_KEY !== apiKey) {
res.status(401).json({ message: "Not authenticated" });
return;
}
// get jobs that should be run
const jobsToRun = await prisma.webhookScheduledTriggers.findMany({
where: {
startAfter: {
lte: dayjs().toISOString(),
},
},
});
// run jobs
for (const job of jobsToRun) {
try {
await fetch(job.subscriberUrl, {
method: "POST",
body: job.payload,
});
} catch (error) {
console.log(`Error running webhook trigger (retry count: ${job.retryCount}): ${error}`);
// if job fails, retry again for 5 times.
if (job.retryCount < 5) {
await prisma.webhookScheduledTriggers.update({
where: {
id: job.id,
},
data: {
retryCount: {
increment: 1,
},
startAfter: dayjs()
.add(5 * (job.retryCount + 1), "minutes")
.toISOString(),
},
});
return res.json({ ok: false });
}
}
const parsedJobPayload = JSON.parse(job.payload) as {
id: number; // booking id
endTime: string;
scheduledJobs: string[];
triggerEvent: string;
};
// clean finished job
await prisma.webhookScheduledTriggers.delete({
where: {
id: job.id,
},
});
const booking = await prisma.booking.findUnique({
where: { id: parsedJobPayload.id },
select: { id: true, scheduledJobs: true },
});
if (!booking) {
console.log("Error finding booking in webhook trigger:", parsedJobPayload);
return res.json({ ok: false });
}
//remove scheduled job from bookings once triggered
const updatedScheduledJobs = booking.scheduledJobs.filter((scheduledJob) => {
return scheduledJob !== job.jobName;
});
await prisma.booking.update({
where: {
id: booking.id,
},
data: {
scheduledJobs: updatedScheduledJobs,
},
});
}
res.json({ ok: true });
}
export default defaultHandler({
POST: Promise.resolve({ default: handler }),
});

View File

@ -1,5 +1,3 @@
import schedule from "node-schedule";
import prisma from "@calcom/prisma";
import { WebhookTriggerEvents } from "@calcom/prisma/enums";
@ -9,45 +7,32 @@ export async function scheduleTrigger(
subscriber: { id: string; appId: string | null }
) {
try {
//schedule job to call subscriber url at the end of meeting
// FIXME: in-process scheduling - job will vanish on server crash / restart
const job = schedule.scheduleJob(
`${subscriber.appId}_${subscriber.id}`,
booking.endTime,
async function () {
const body = JSON.stringify({ triggerEvent: WebhookTriggerEvents.MEETING_ENDED, ...booking });
await fetch(subscriberUrl, {
method: "POST",
body,
});
const payload = JSON.stringify({ triggerEvent: WebhookTriggerEvents.MEETING_ENDED, ...booking });
const jobName = `${subscriber.appId}_${subscriber.id}`;
//remove scheduled job from bookings once triggered
const updatedScheduledJobs = booking.scheduledJobs.filter((scheduledJob) => {
return scheduledJob !== `${subscriber.appId}_${subscriber.id}`;
});
await prisma.booking.update({
where: {
id: booking.id,
},
data: {
scheduledJobs: updatedScheduledJobs,
},
});
}
);
// add scheduled job to database
const createTrigger = prisma.webhookScheduledTriggers.create({
data: {
jobName,
payload,
startAfter: booking.endTime,
subscriberUrl,
},
});
//add scheduled job name to booking
await prisma.booking.update({
const updateBooking = prisma.booking.update({
where: {
id: booking.id,
},
data: {
scheduledJobs: {
push: job.name,
push: jobName,
},
},
});
await prisma.$transaction([createTrigger, updateBooking]);
} catch (error) {
console.error("Error cancelling scheduled jobs", error);
}
@ -64,16 +49,20 @@ export async function cancelScheduledJobs(
const promises = booking.scheduledJobs.map(async (scheduledJob) => {
if (appId) {
if (scheduledJob.startsWith(appId)) {
if (schedule.scheduledJobs[scheduledJob]) {
schedule.scheduledJobs[scheduledJob].cancel();
}
await prisma.webhookScheduledTriggers.deleteMany({
where: {
jobName: scheduledJob,
},
});
scheduledJobs = scheduledJobs?.filter((job) => scheduledJob !== job) || [];
}
} else {
//if no specific appId given, delete all scheduled jobs of booking
if (schedule.scheduledJobs[scheduledJob]) {
schedule.scheduledJobs[scheduledJob].cancel();
}
await prisma.webhookScheduledTriggers.deleteMany({
where: {
jobName: scheduledJob,
},
});
scheduledJobs = [];
}

View File

@ -0,0 +1,12 @@
-- CreateTable
CREATE TABLE "WebhookScheduledTriggers" (
"id" SERIAL NOT NULL,
"jobName" TEXT NOT NULL,
"subscriberUrl" TEXT NOT NULL,
"payload" TEXT NOT NULL,
"startAfter" TIMESTAMP(3) NOT NULL,
"retryCount" INTEGER NOT NULL DEFAULT 0,
"createdAt" TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "WebhookScheduledTriggers_pkey" PRIMARY KEY ("id")
);

View File

@ -807,6 +807,16 @@ model WorkflowReminder {
@@index([seatReferenceId])
}
model WebhookScheduledTriggers {
id Int @id @default(autoincrement())
jobName String
subscriberUrl String
payload String
startAfter DateTime
retryCount Int @default(0)
createdAt DateTime? @default(now())
}
enum WorkflowTemplates {
REMINDER
CUSTOM

View File

@ -1,10 +1,10 @@
import z from "zod";
import { getCalendar } from "@calcom/app-store/_utils/getCalendar";
import { cancelScheduledJobs } from "@calcom/app-store/zapier/lib/nodeScheduler";
import { DailyLocationType } from "@calcom/core/location";
import { sendCancelledEmails } from "@calcom/emails";
import { getCalEventResponses } from "@calcom/features/bookings/lib/getCalEventResponses";
import { cancelScheduledJobs } from "@calcom/features/webhooks/lib/scheduleTrigger";
import { isPrismaObjOrUndefined, parseRecurringEvent } from "@calcom/lib";
import getPaymentAppData from "@calcom/lib/getPaymentAppData";
import { deletePayment } from "@calcom/lib/payment/deletePayment";

View File

@ -2,7 +2,6 @@ import type { BookingReference, EventType } from "@prisma/client";
import type { TFunction } from "next-i18next";
import { getCalendar } from "@calcom/app-store/_utils/getCalendar";
import { cancelScheduledJobs } from "@calcom/app-store/zapier/lib/nodeScheduler";
import { CalendarEventBuilder } from "@calcom/core/builders/CalendarEvent/builder";
import { CalendarEventDirector } from "@calcom/core/builders/CalendarEvent/director";
import { deleteMeeting } from "@calcom/core/videoClient";
@ -13,6 +12,7 @@ import { deleteScheduledWhatsappReminder } from "@calcom/ee/workflows/lib/remind
import { sendRequestRescheduleEmail } from "@calcom/emails";
import { getCalEventResponses } from "@calcom/features/bookings/lib/getCalEventResponses";
import getWebhooks from "@calcom/features/webhooks/lib/getWebhooks";
import { cancelScheduledJobs } from "@calcom/features/webhooks/lib/scheduleTrigger";
import sendPayload from "@calcom/features/webhooks/lib/sendPayload";
import { isPrismaObjOrUndefined } from "@calcom/lib";
import { getTeamIdFromEventType } from "@calcom/lib/getTeamIdFromEventType";