
Agenda and Agendash for Scheduling in Node.js with MongoDB


Hello friends, how are you? Here we begin an adventurous journey with Agenda for scheduling in Node.js.

There are various libraries in the land of npm for scheduling tasks in Node, such as cron, alongside the plain setTimeout() and setInterval() functions, which cover most scheduling use cases. The major problems you may encounter are:

  • What if the server restarts?
  • What if the user wants a notification before a scheduled process runs?

One solution is to use a database for the scheduled tasks. Agenda is comparatively easy to use as well as powerful if your database is MongoDB. You can compare some job queue solutions in the table below.

 

Feature            Bull             Kue    Bee       Agenda
Backend            redis            redis  redis     mongo
Priorities
Concurrency
Delayed jobs
Global events
Rate Limiter
Pause/Resume
Sandboxed worker
Repeatable jobs
Atomic ops
Persistence
UI
Optimized for      Jobs / Messages  Jobs   Messages  Jobs

 

source: https://www.npmjs.com/package/agenda
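To see why persistence answers the server-restart question raised earlier, here is a minimal sketch of the idea. It does not use Agenda itself: a plain array stands in for the jobs collection, and findDueJobs is a hypothetical helper written for this illustration. The one thing taken from Agenda is that each stored job carries a nextRunAt date.

```javascript
// Each record stands in for a document in a jobs collection.
// Because the due time is stored as data, it survives a process restart,
// unlike a timer created with setTimeout().
function findDueJobs(jobs, now) {
  return jobs
    .filter(job => job.nextRunAt instanceof Date && job.nextRunAt <= now)
    .sort((a, b) => a.nextRunAt - b.nextRunAt);
}

const stored = [
  { name: 'send-reminder', nextRunAt: new Date('2024-01-01T10:00:00Z') },
  { name: 'cleanup', nextRunAt: new Date('2024-01-01T12:00:00Z') },
];

// Pretend the server was down at 10:00 and came back at 10:30.
const afterRestart = new Date('2024-01-01T10:30:00Z');
console.log(findDueJobs(stored, afterRestart).map(job => job.name));
// prints [ 'send-reminder' ]
```

The overdue job is still found after the restart because its schedule lives in storage rather than in the memory of the process that created it.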

 

Let's get started.

 

Agenda can run in a single process that both defines and runs jobs. Alternatively, Agenda instances pointing at the same collection can be used from multiple files: a consumer that defines and runs the jobs, and a producer that schedules them.

Let us install Agenda and Agendash. Agendash is a useful add-on for Agenda that shows the jobs in a browser UI.

npm install agenda agendash --save

We are using Express to create the server. Here is the code in a single file.

const app = require('express')();
const Agenda = require('agenda');
const Agendash = require('agendash');

(async function run() {
    let configureMongoDBObj = {
        db: {
            address: 'mongodb://127.0.0.1/agenda', 
            collection: 'jobs', 
            options:{
                useNewUrlParser: true 
            }
        }
    };
    
    let agenda = new Agenda(configureMongoDBObj);
    agenda.define('JOB_ONE', (job, done) => {
        console.log('job one fired here');
        done();
    });

    agenda.define('JOB_TWO', (job, done) => {
        console.log('JOB TWO fired here')
        done();
    });

    agenda.define('JOB_THREE', (job, done) => {
        console.log('JOB three in every minute here')
        done();
    });
    //agendash UI config
    app.use('/dash', Agendash(agenda));
    await agenda.start();
    //schedule once
    await agenda.schedule('in 1 minute', 'JOB_ONE');
    await agenda.schedule('in 2 minutes', 'JOB_ONE');
    await agenda.schedule('in 3 minutes', 'JOB_TWO');
    await agenda.schedule('in 4 minutes', 'JOB_THREE');
    //repeat every minute (the name must match a defined job)
    await agenda.every('1 minute', 'JOB_THREE');

})().catch(err => console.error(err));

app.listen(2810, ()=> console.log('listening on the port 2810'));

 

Let's open localhost:2810/dash in the browser to see the dashboard. It shows each job's schedule, its run times, and other important details.
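Since the job processor runs inside the server process, it helps to stop it cleanly when the server shuts down, so jobs that were in progress are unlocked for the next start. The following is a minimal sketch, assuming agenda is a started Agenda instance. registerGracefulShutdown is a hypothetical helper name, and the proc parameter exists only so the function can be exercised without the real process object.

```javascript
// Stop Agenda before the process exits so running jobs are released.
// `agenda` is assumed to be a started Agenda instance; `proc` defaults
// to the real Node.js process object.
function registerGracefulShutdown(agenda, proc = process) {
  const shutdown = async () => {
    await agenda.stop(); // stops job processing and unlocks running jobs
    proc.exit(0);
  };
  proc.once('SIGTERM', shutdown);
  proc.once('SIGINT', shutdown);
  return shutdown;
}

// Usage inside the run() function above, after agenda.start():
//   registerGracefulShutdown(agenda);
```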

 

 

 

Consumer and Producer

 

Let us look at the consumer and producer concept in Agenda. It lets us define jobs in a single file and schedule them from elsewhere in the project. Here I have used classes for the consumer and the producer. Let's create a folder named configuration with an index file to configure Agenda.

configuration/index.js

const Agenda = require('agenda');
let configureMongoDBObj = {
    db: {
        address: 'mongodb://127.0.0.1/agendaDemoFor', 
        collection: 'jobs', 
        options:{
            useNewUrlParser: true 
        }
    }
};
let agenda = new Agenda(configureMongoDBObj);
module.exports = agenda;

We create a consumer directory with an index file that defines the jobs.

consumer/index.js

let agenda = require('../configuration/index');
let SequenceMessageWorkFlow = require('../../businessWorkflow/sequenceMessage');
let FetchAutoPostWorkFlow = require('../../businessWorkflow/fetchAutoPosts');

class Consumer {
    constructor() {
        this.run()
            .catch(error => {
                console.error(error)
            });
    }

    async run() {
        try {
            agenda.define('JOB_PROCESS_SEQUENCE_MESSAGES', (job, done) => {
                new SequenceMessageWorkFlow(job.attrs.data);
                done();
            });

            
            agenda.define('JOB_PROCESS_FETCH_RSS_AUTOPOSTS', (job, done) => {
                new FetchAutoPostWorkFlow(job.attrs);
                done();
            });

            // Wait for the MongoDB connection before starting the job processor.
            await new Promise(resolve => agenda.once('ready', resolve));
            await agenda.start();

        } catch (error) {
            console.log(error)
        }
    }
}
module.exports = Consumer;
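The handlers above call done() right after constructing the workflow object, so Agenda cannot tell whether the work actually succeeded. One possible refinement is sketched below, assuming the work can be expressed as a function that receives the job data and may return a promise. toAgendaHandler and processSequenceMessage are hypothetical names, not part of Agenda. Passing an error to done() is how a callback-style handler reports a failed job.

```javascript
// Wrap a unit of work so that done() is called only after it finishes,
// and done(err) is called if it throws or rejects.
function toAgendaHandler(work) {
  return (job, done) => {
    let result;
    try {
      result = work(job.attrs.data); // data passed when the job was scheduled
    } catch (err) {
      done(err); // synchronous failure
      return;
    }
    // Handles both plain return values and promises.
    Promise.resolve(result).then(() => done(), err => done(err));
  };
}

// Hypothetical usage in the consumer:
//   agenda.define('JOB_PROCESS_SEQUENCE_MESSAGES',
//       toAgendaHandler(processSequenceMessage));

// Trying the wrapper with a fake job object:
const fakeJob = { attrs: { data: { sequenceId: 'abc' } } };
toAgendaHandler(async data => console.log('processing', data.sequenceId))(
  fakeJob,
  err => console.log(err ? 'failed' : 'finished')
);
```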

 

Likewise, we create a producer directory with an index.js file that exports a class for scheduling jobs.

let agenda = require('../configuration/index');

class Producer {
    //job once func
    runOnceOnSchedule(scheduleTime, jobName, jobParams){
        agenda.schedule(scheduleTime, jobName, jobParams)
        .then(()=>console.log('Job scheduled once: %s %s - %j', scheduleTime, jobName, jobParams))
        .catch((err)=>console.log(err));  
    }

    //repeat job func
    runEverySchedule(scheduleTime, jobName, jobParams) { 
        agenda.every(scheduleTime, jobName, jobParams)
        .then(()=>console.log('Every schedule'))
        .catch((err)=>console.log(err));
    }

    //cancel and re-add
    reAddSchedule(scheduleTime , jobName) {
        agenda.cancel({name: jobName}).then(()=> {
            console.log('Delete schedule');
            this.runEverySchedule(scheduleTime, jobName );
        }).catch((err)=>console.log(err));
    }

}
module.exports = Producer;

 

This producer can be called from anywhere in the code. Let us look at an example of how it can be called from a controller.

let Producer = require('../CNQ/producer');
let P = new Producer();
// seq and SequenceMessage come from the surrounding controller code
let dataParams = {sequenceId: seq._id, sequenceMessageId: SequenceMessage._id,  flowns: SequenceMessage.namespace}; 
P.runOnceOnSchedule("in 20 minutes",'JOB_PROCESS_SEQUENCE_MESSAGES',dataParams);

This code schedules the job called 'JOB_PROCESS_SEQUENCE_MESSAGES' to run 20 minutes from now, and the consumer runs it when the time comes. In addition to this, an important aspect of Agenda is modifying a selected job. To do that, we can use the Mongoose ODM to target the jobs collection. Blending Mongoose with Agenda makes the scheduler more powerful and makes it easy to modify, delete, and cancel tasks. Let us look at how to use Mongoose with Agenda.

First of all, let us install Mongoose.

npm install mongoose --save

Now configure Mongoose with the Agenda jobs collection.

const mongoose = require('mongoose');
const Schema = mongoose.Schema;

// The third argument binds the model to Agenda's existing "jobs" collection.
const Job = mongoose.model("Job", new Schema({
    _pageId : {type:String}
}), "jobs");
module.exports = Job;

Agenda stores various attributes on each job document by default. With Mongoose we can target any of these attributes and apply custom modifications, and the changes land in the same collection that Agenda reads.
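As an illustration of such a modification, here is a minimal sketch that moves a job's next run time through a Mongoose model. rescheduleJob is a hypothetical helper. It assumes the model's schema declares Agenda's name and nextRunAt fields (or is created with strict: false), because Mongoose otherwise drops paths that are not in the schema. The model is passed in as a parameter so the function does not depend on any particular file layout.

```javascript
// Move the next run of every job with the given name to `when`.
// `JobModel` is assumed to be a Mongoose model bound to Agenda's jobs
// collection whose schema includes `name` and `nextRunAt`.
function rescheduleJob(JobModel, jobName, when) {
  if (!(when instanceof Date) || Number.isNaN(when.getTime())) {
    throw new TypeError('when must be a valid Date');
  }
  // updateOne returns a Mongoose query, which the caller can await.
  return JobModel.updateOne(
    { name: jobName },
    { $set: { nextRunAt: when } }
  );
}

// Hypothetical usage from an async function:
//   const Job = require('./path/to/the/job/model');
//   await rescheduleJob(Job, 'JOB_PROCESS_SEQUENCE_MESSAGES',
//       new Date(Date.now() + 60 * 60 * 1000)); // one hour from now
```

Agenda reads the updated document the next time it checks the collection for due jobs, so no restart is needed for the change to take effect.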

 

 

Conclusion

 

Agenda makes scheduling in Node more powerful and easier. We don't need to write custom functions or deal with operating-system internals to process tasks. It can be used to create, modify, and delete schedules, and much more. For more details, see the official GitHub page of Agenda. If you have any questions, please let me know in the comment section. Thank you!

 

 

Feature Image Credit:

Photo by Estée Janssens on Unsplash