Learning to Use AWS Lambda Functions: Cron Jobs (plus the Pub-Sub Pattern)

Scheduled tasks are a crucial part of almost any backend today. For example, an investment platform typically needs to send periodic trading reports to the users who invested through it.

These tasks are easy to build into the server as cron jobs, but doing so causes problems: the jobs add noise to the application logs, they are hard to trigger manually for debugging, and so on.

That is why I decided to move our cron job logic to AWS Lambda functions on my current team, and the results so far have been very positive. We can easily monitor each cron job's logs on CloudWatch and trigger a job manually without touching our main application.

Originally, the example above was implemented with the node-cron library inside the main app, which is written with Express.

import "source-map-support/register";
import config from "./app/config";
import app from "./app";
import cron from "node-cron";
import { sendMonthlyReport } from "./app/jobs";

if (config.app.env === "production") {
  cron.schedule("0 10 28 * *", sendMonthlyReport); // run on the 28th day of every month
}

app.listen(config.app.port, () => console.log(`App listening on port ${config.app.port}`));

Now, with Lambda functions built using the Serverless Framework, this logic is separated from the main application.

// handler.ts
import "source-map-support/register";
import { sendMonthlyReports } from "./functions";

// The exported name must match the handler path in serverless.yml
export const send_monthly_reports = sendMonthlyReports;

// sendMonthlyReport.ts
import { APIGatewayProxyHandler } from "aws-lambda";

export const sendMonthlyReports: APIGatewayProxyHandler = async (_, context) => {
  try {
    // query invested users
    // loop through each user
    // generate pdf report
    // send email to each user with attached file
    return { statusCode: 200, body: JSON.stringify({ message: "Success" }) };
    ...

# serverless.yml
functions:
  send_monthly_reports:
    handler: handler.send_monthly_reports
    events:
      - schedule:
          name: ${self:custom.stage}-send-monthly-reports
          rate: cron(0 3 1 * ? *) # 03:00 UTC = 10 AM Vietnam time, 1st day of each month
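The schedule expression is evaluated in UTC, which is why 10 AM Vietnam time (UTC+7) shows up as hour 3. As a rough illustration of that conversion, here is a minimal sketch. `monthlyCronUtc` is a hypothetical helper written for this post, not part of the Serverless Framework, and it assumes a whole-hour offset with no daylight saving time.

```typescript
// Build an AWS-style monthly cron expression (evaluated in UTC) from a local wall-clock time.
// Assumption: utcOffsetHours is a whole number of hours with no daylight saving (true for Vietnam, UTC+7).
export function monthlyCronUtc(
  localHour: number,
  localMinute: number,
  dayOfMonth: number,
  utcOffsetHours: number
): string {
  const utcHour = localHour - utcOffsetHours;
  if (utcHour < 0 || utcHour > 23) {
    // Crossing midnight would also shift the day of month; this sketch refuses instead of guessing.
    throw new RangeError("local time crosses a UTC day boundary; adjust the day manually");
  }
  // Field order: minutes hours day-of-month month day-of-week year
  return `cron(${localMinute} ${utcHour} ${dayOfMonth} * ? *)`;
}

// 10:00 on the 1st of each month in Vietnam
const vietnamSchedule = monthlyCronUtc(10, 0, 1, 7); // "cron(0 3 1 * ? *)"
```

The result matches the `rate` value used in the configuration above.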

So far so good: we can monitor and maintain the cron job logic separately from the main application. However, this leads to another problem: the execution time limit of a Lambda function (at most 15 minutes per invocation).

This kind of workflow can take a long time to complete and can hardly be finished within a single invocation of such a function. For example:

  • Query users who invested in the month & get trading data
  • Loop through each user: generate a PDF report with headless Chrome and send an email with SendGrid
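
To see why a single invocation may not be enough, here is a rough back-of-the-envelope check. The helper names are illustrative, and the user count and per-user duration in the usage note are hypothetical figures, not measurements from our system. The 900-second ceiling is Lambda's maximum configurable timeout.

```typescript
// Lambda's maximum configurable timeout is 15 minutes.
const LAMBDA_MAX_TIMEOUT_SECONDS = 900;

// Estimated total runtime when users are processed one after another.
export function estimateSeconds(userCount: number, secondsPerUser: number): number {
  return userCount * secondsPerUser;
}

// True when the sequential loop is expected to finish within the timeout.
export function fitsInOneInvocation(
  userCount: number,
  secondsPerUser: number,
  timeoutSeconds: number = LAMBDA_MAX_TIMEOUT_SECONDS
): boolean {
  return estimateSeconds(userCount, secondsPerUser) <= timeoutSeconds;
}

// Largest number of users one invocation could handle sequentially.
export function maxUsersPerInvocation(
  secondsPerUser: number,
  timeoutSeconds: number = LAMBDA_MAX_TIMEOUT_SECONDS
): number {
  return Math.floor(timeoutSeconds / secondsPerUser);
}
```

With a hypothetical 1,000 users at 5 seconds each (PDF rendering plus email), the loop would need about 5,000 seconds, far above the 900-second ceiling, and one invocation could cover at most 180 users.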

Therefore, we came up with a pub-sub pattern using the AWS SNS service to solve this issue. The workflow now looks like this:

  • Query users who invested in the month & get trading data
  • Loop through each user: send the necessary data as a message to an SNS topic (i.e. one message per user)
  • Another Lambda function subscribes to that topic and consumes each message to carry out the task

// sendMonthlyReport.ts
export const sendMonthlyReports: APIGatewayProxyHandler = async (_, context) => {
  try {
    const userIds = await getInvestedUserIds();
    for (const userId of userIds) {
      // publish one message per user to the SNS topic
      await publishSNS({ userId }, "SendUserReportTopic");
    }
    ...

// subscriberReport.ts
export const subscriberReport: SNSHandler = async (event, context) => {
  let fileName = "monthlyWealthReport";
  try {
    logger(`[SNS] Message received: ${event.Records[0].Sns.MessageId}`);
    const message: { userId: string } =
      event.Records[0].Sns.Message && JSON.parse(event.Records[0].Sns.Message);
    // logic to generate report, upload to S3, send email, etc.
    ...

# serverless.yml
functions:
  send_monthly_reports:
    handler: handler.send_monthly_reports
    events:
      - schedule:
          name: ${self:custom.stage}-send-monthly-reports
          rate: cron(0 3 1 * ? *) # 03:00 UTC = 10 AM Vietnam time, 1st day of each month
  sub_send_monthly_report_by_user_id:
    handler: handler.sub_send_monthly_report_by_user_id # send wealth report for user - Subscriber
    events:
      - sns: ${self:custom.stage}-SendEmailWealthReport
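
The two sides of this pattern can also be sketched without any AWS dependency, which makes them easy to test locally. This is a minimal sketch under stated assumptions, not our production code: the `Publish` type stands in for the `publishSNS` helper used above (whose implementation is not shown here), `fanOut` and `parseUserIds` are hypothetical names, and the event interfaces only mirror the `event.Records[i].Sns` fields the subscriber reads.

```typescript
// Minimal shape of the SNS event fields used here (mirrors event.Records[i].Sns in the subscriber).
interface SnsRecord {
  Sns: { MessageId: string; Message: string };
}
interface SnsEvent {
  Records: SnsRecord[];
}

// Hypothetical publisher signature; in the article this role is played by publishSNS.
type Publish = (payload: { userId: string }, topic: string) => Promise<void>;

// Publisher side: one message per user, collecting failures instead of aborting the whole run.
export async function fanOut(userIds: string[], topic: string, publish: Publish): Promise<string[]> {
  const failed: string[] = [];
  for (const userId of userIds) {
    try {
      await publish({ userId }, topic);
    } catch {
      failed.push(userId); // keep going so one bad publish does not block the other users
    }
  }
  return failed;
}

// Subscriber side: read the userId from every record, skipping malformed messages.
export function parseUserIds(event: SnsEvent): string[] {
  const ids: string[] = [];
  for (const record of event.Records) {
    try {
      const message: unknown = JSON.parse(record.Sns.Message);
      if (
        typeof message === "object" &&
        message !== null &&
        typeof (message as { userId?: unknown }).userId === "string"
      ) {
        ids.push((message as { userId: string }).userId);
      }
    } catch {
      // Message was not valid JSON: ignore this record.
    }
  }
  return ids;
}
```

Passing the publisher in as a parameter is a design choice for this sketch: the scheduled function can be exercised with a fake publisher, and the list of failed user IDs can be logged or retried later.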

Finally, with Lambda functions and the SNS service, we can build a simple pub-sub workflow for async tasks such as sending mass emails to users, with separate logs and easy usage monitoring. From this base, we can keep improving the workflow, for example by adding a queue with AWS SQS for tasks that must be executed in order.