2
0

first commit

This commit is contained in:
2024-08-09 00:39:27 +02:00
commit 79688abe2e
5698 changed files with 497838 additions and 0 deletions

View File

@@ -0,0 +1,91 @@
# Tasker
Tasker: "One who performs a task, as a day-laborer."
Task: "A function to be performed; an objective."
## What is it?
Introduces a new pattern called Tasker which may be switched out in the future for other third party services.
Also introduces a base `InternalTasker` which doesn't require third party dependencies and should work out of the box (by configuring a proper cron).
## Why is this needed?
The Tasker pattern is needed to streamline the execution of non-critical tasks in an application, providing a structured approach to task scheduling, execution, retrying, and cancellation. Here's why it's necessary:
1. **Offloading non-critical tasks**: There are tasks that don't need to be executed immediately on the main thread, such as sending emails, generating reports, or performing periodic maintenance tasks. Offloading these tasks to a separate queue or thread improves the responsiveness and efficiency of the main application.
2. **Retry mechanism**: Not all tasks succeed on the first attempt due to errors or external dependencies. This pattern incorporates a retry mechanism, which allows failed tasks to be retried automatically for a specified number of attempts. This improves the robustness of the system by handling temporary failures gracefully.
3. **Scheduled task execution**: Some tasks need to be executed at a specific time or after a certain delay. The Tasker pattern facilitates scheduling tasks for future execution, ensuring they are performed at the designated time without manual intervention.
4. **Task cancellation**: Occasionally, it's necessary to cancel a scheduled task due to changing requirements or user actions. The Tasker pattern supports task cancellation, enabling previously scheduled tasks to be revoked or removed from the queue before execution.
5. **Flexible implementation**: The Tasker pattern allows for flexibility in implementation by providing a base structure (`InternalTasker`) that can be extended or replaced with third-party services (`TriggerDevTasker`, `AwsSqsTasker`, etc.). This modularity ensures that the task execution mechanism can be adapted to suit different application requirements or environments.
Overall, the Tasker pattern enhances the reliability, performance, and maintainability by managing non-critical tasks in a systematic and efficient manner. It abstracts away the complexities of task execution, allowing developers to focus on core application logic while ensuring timely and reliable execution of background tasks.
## How does it work?
Since the Tasker is a pattern on itself, it will depend on the actual implementation. For example, a `TriggerDevTasker` will work very differently from an `AwsSqsTasker`.
For simplicity sake will explain how the `InternalTasker` works:
- Instead of running a non-critical task you schedule using the tasker:
```diff
const examplePayload = { example: "payload" };
- await sendWebhook(examplePayload);
+ await tasker.create("sendWebhook", JSON.stringify(examplePayload));
```
- This will create a new task to be run on the next processing of the task queue.
- Then on the next cron run it will be picked up and executed:
```ts
// /app/api/tasks/cron/route.ts
import tasker from "@calcom/features/tasker";
export async function GET() {
// authenticate the call...
await tasker.processQueue();
return Response.json({ success: true });
}
```
- By default, the cron will run each minute and will pick the next 100 tasks to be executed.
- If the tasks succeeds, it will be marked as `suceededAt: new Date()`. If if fails, the `attempts` prop will increase by 1 and will be retried on the next cron run.
- If `attempts` reaches `maxAttemps`, it will be considered a failed and won't be retried again.
- By default, tasks will be attempted up to 3 times. This can be overridden when creating a task.
- From here we can either keep a record of executed tasks, or we can setup another cron to cleanup all successful and failed tasks:
```ts
// /app/api/tasks/cleanup/route.ts
import tasker from "@calcom/features/tasker";
export async function GET() {
// authenticate the call...
await tasker.cleanup();
return Response.json({ success: true });
}
```
- This will delete all failed and successful tasks.
- A task is just a simple function receives a payload:
```ts
type TaskHandler = (payload: string) => Promise<void>;
```
## How to contribute?
You can contribute by either expanding the `InternalTasker` or creating new Taskers. To see how to add new Taskers, see the `tasker-factory.ts` file.
You can also take some inspiration by looking into previous attempts to add various Message Queue pull requests:
- [feat: Messaging Bus Implementation using AWS SQS OSSHack Challenge](https://github.com/calcom/cal.com/pull/12663)
- [feat: add opt-in ready-to-deploy message queue (QStash+Next.js functions)](https://github.com/calcom/cal.com/pull/12658)
- [feat: Implement A Message Queuing System](https://github.com/calcom/cal.com/pull/12655)
- [Message Queuing System](https://github.com/calcom/cal.com/pull/12654)
- [feat: Message Queuing System using Trigger.dev](https://github.com/calcom/cal.com/pull/12641)

View File

@@ -0,0 +1,13 @@
import type { NextRequest } from "next/server";
import { NextResponse } from "next/server";
import tasker from "..";
export async function GET(request: NextRequest) {
const authHeader = request.headers.get("authorization");
if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
return new Response("Unauthorized", { status: 401 });
}
await tasker.cleanup();
return NextResponse.json({ success: true });
}

View File

@@ -0,0 +1,13 @@
import type { NextRequest } from "next/server";
import { NextResponse } from "next/server";
import tasker from "..";
export async function GET(request: NextRequest) {
const authHeader = request.headers.get("authorization");
if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
return new Response("Unauthorized", { status: 401 });
}
await tasker.processQueue();
return NextResponse.json({ success: true });
}

View File

@@ -0,0 +1,14 @@
import type { Tasker } from "./tasker";
import { getTasker } from "./tasker-factory";
const globalForTasker = global as unknown as {
tasker: Tasker;
};
export const tasker = globalForTasker.tasker || getTasker();
if (process.env.NODE_ENV !== "production") {
globalForTasker.tasker = tasker;
}
export default tasker;

View File

@@ -0,0 +1,38 @@
import { Task } from "./repository";
import { type Tasker, type TaskTypes } from "./tasker";
import tasksMap from "./tasks";
/**
* This is the default internal Tasker that uses the Task repository to create tasks.
* It doens't have any external dependencies and is suitable for most use cases.
* To use a different Tasker, you can create a new class that implements the Tasker interface.
* Then, you can use the TaskerFactory to select the new Tasker.
*/
export class InternalTasker implements Tasker {
async create(type: TaskTypes, payload: string): Promise<string> {
return Task.create(type, payload);
}
async processQueue(): Promise<void> {
const tasks = await Task.getNextBatch();
const tasksPromises = tasks.map(async (task) => {
const taskHandlerGetter = tasksMap[task.type as keyof typeof tasksMap];
if (!taskHandlerGetter) throw new Error(`Task handler not found for type ${task.type}`);
const taskHandler = await taskHandlerGetter();
return taskHandler(task.payload)
.then(async () => {
await Task.succeed(task.id);
})
.catch(async (error) => {
await Task.retry(task.id, error instanceof Error ? error.message : "Unknown error");
});
});
const settled = await Promise.allSettled(tasksPromises);
const failed = settled.filter((result) => result.status === "rejected");
const succeded = settled.filter((result) => result.status === "fulfilled");
console.info({ failed, succeded });
}
async cleanup(): Promise<void> {
const count = await Task.cleanup();
console.info(`Cleaned up ${count} tasks`);
}
}

View File

@@ -0,0 +1,19 @@
import { type Tasker, type TaskTypes } from "./tasker";
/**
* RedisTasker is a tasker that uses Redis as a backend.
* WIP: This is a work in progress and is not fully implemented yet.
**/
export class RedisTasker implements Tasker {
async create(type: TaskTypes, payload: string): Promise<string> {
throw new Error("Method not implemented.");
}
processQueue(): Promise<void> {
throw new Error("Method not implemented.");
}
cleanup(): Promise<void> {
throw new Error("Method not implemented.");
}
}

View File

@@ -0,0 +1,148 @@
import { type Prisma } from "@prisma/client";
import db from "@calcom/prisma";
import { type TaskTypes } from "./tasker";
const whereSucceeded: Prisma.TaskWhereInput = {
succeededAt: { not: null },
};
const whereMaxAttemptsReached: Prisma.TaskWhereInput = {
attempts: {
equals: {
// @ts-expect-error prisma is tripping: '_ref' does not exist in type 'FieldRef<"Task", "Int">'
_ref: "maxAttempts",
_container: "Task",
},
},
};
const whereUpcomingTasks: Prisma.TaskWhereInput = {
// Get only tasks that have not succeeded yet
succeededAt: null,
// Get only tasks that are scheduled to run now or in the past
scheduledAt: {
lt: new Date(),
},
// Get only tasks where maxAttemps has not been reached
attempts: {
lt: {
// @ts-expect-error prisma is tripping: '_ref' does not exist in type 'FieldRef<"Task", "Int">'
_ref: "maxAttempts",
_container: "Task",
},
},
};
export class Task {
static async create(
type: TaskTypes,
payload: string,
options: { scheduledAt?: Date; maxAttempts?: number } = {}
) {
const { scheduledAt, maxAttempts } = options;
const newTask = await db.task.create({
data: {
payload,
type,
scheduledAt,
maxAttempts,
},
});
return newTask.id;
}
static async getNextBatch() {
return db.task.findMany({
where: whereUpcomingTasks,
orderBy: {
scheduledAt: "asc",
},
take: 100,
});
}
static async getAll() {
return db.task.findMany();
}
static async getFailed() {
return db.task.findMany({
where: whereMaxAttemptsReached,
});
}
static async getSucceeded() {
return db.task.findMany({
where: whereSucceeded,
});
}
static async count() {
return db.task.count();
}
static async countUpcoming() {
return db.task.count({
where: whereUpcomingTasks,
});
}
static async countFailed() {
return db.task.count({
where: whereMaxAttemptsReached,
});
}
static async countSucceeded() {
return db.task.count({
where: whereSucceeded,
});
}
static async retry(taskId: string, lastError?: string) {
return db.task.update({
where: {
id: taskId,
},
data: {
attempts: { increment: 1 },
lastError,
},
});
}
static async succeed(taskId: string) {
return db.task.update({
where: {
id: taskId,
},
data: {
attempts: { increment: 1 },
succeededAt: new Date(),
},
});
}
static async cancel(taskId: string) {
return db.task.delete({
where: {
id: taskId,
},
});
}
static async cleanup() {
return db.task.deleteMany({
where: {
OR: [
// Get tasks that have succeeded
whereSucceeded,
// Get tasks where maxAttemps has been reached
whereMaxAttemptsReached,
],
},
});
}
}

View File

@@ -0,0 +1,27 @@
import { InternalTasker } from "./internal-tasker";
// import { RedisTasker } from "./redis-tasker";
import { type Tasker, type TaskerTypes } from "./tasker";
/**
* This is a factory class that creates Taskers.
* The TaskerFactory is useful when you want to use a different Tasker in different environments.
* For example, you can use the InternalTasker in development and the AWSSQSTasker in production.
*/
export class TaskerFactory {
createTasker(type?: TaskerTypes): Tasker {
// TODO: Add more alternative Taskers in the future:
// RedisTasker, TriggerDevTasker, TemporalIOTasker, AWSSQSTasker, etc.
// TODO: Uncomment the following line when RedisTasker is implemented.
// if (type === "redis") return new RedisTasker();
// For now, we only have the InternalTasker.
if (type === "internal") return new InternalTasker();
// Default to InternalTasker
return new InternalTasker();
}
}
/** Shorthand for getting the default Tasker */
export function getTasker() {
const taskerFactory = new TaskerFactory();
return taskerFactory.createTasker();
}

View File

@@ -0,0 +1,9 @@
export type TaskerTypes = "internal" | "redis";
export type TaskTypes = "sendEmail" | "sendWebhook" | "sendSms";
export type TaskHandler = (payload: string) => Promise<void>;
export interface Tasker {
/** Create a new task with the given type and payload. */
create(type: TaskTypes, payload: string): Promise<string>;
processQueue(): Promise<void>;
cleanup(): Promise<void>;
}

View File

@@ -0,0 +1,14 @@
import type { TaskHandler, TaskTypes } from "../tasker";
/**
* This is a map of all the tasks that the Tasker can handle.
* The keys are the TaskTypes and the values are the task handlers.
* The task handlers are imported dynamically to avoid circular dependencies.
*/
const tasks: Record<TaskTypes, () => Promise<TaskHandler>> = {
sendEmail: () => import("./sendEmail").then((module) => module.sendEmail),
sendWebhook: () => import("./sendWebook").then((module) => module.sendWebhook),
sendSms: () => Promise.resolve(() => Promise.reject(new Error("Not implemented"))),
};
export default tasks;

View File

@@ -0,0 +1,25 @@
import { z } from "zod";
const sendEmailPayloadSchema = z.object({
/** */
to: z.string(),
/** The email template to send */
template: z.string(),
payload: z.string(),
});
export async function sendEmail(payload: string): Promise<void> {
try {
const parsedPayload = sendEmailPayloadSchema.parse(JSON.parse(payload));
console.log(parsedPayload);
const emails = await import("@calcom/emails");
const email = emails[parsedPayload.template as keyof typeof emails];
if (!email) throw new Error("Invalid email template");
// @ts-expect-error - TODO bring back email type safety
await email(parsedPayload.to);
} catch (error) {
// ... handle error
console.error(error);
throw error;
}
}

View File

@@ -0,0 +1,29 @@
import { z } from "zod";
import sendPayload from "@calcom/features/webhooks/lib/sendPayload";
const sendWebhookPayloadSchema = z.object({
secretKey: z.string().nullable(),
triggerEvent: z.string(),
createdAt: z.string(),
webhook: z.object({
subscriberUrl: z.string().url(),
appId: z.string().nullable(),
payloadTemplate: z.string().nullable(),
}),
// TODO: Define the data schema
data: z.any(),
});
export async function sendWebhook(payload: string): Promise<void> {
try {
const { secretKey, triggerEvent, createdAt, webhook, data } = sendWebhookPayloadSchema.parse(
JSON.parse(payload)
);
await sendPayload(secretKey, triggerEvent, createdAt, webhook, data);
} catch (error) {
// ... handle error
console.error(error);
throw error;
}
}