Scheduler jobs in backend

Heroku Scheduler for Flask


The foundation of our service is scheduled jobs that we plan to use for regular Salesforce backup. There are multiple ways how to implement it like Celery, RQ, APScheduler, but all these methods demand additional background service, that can be too expensive solution for our prototype. I found maybe the best solution for Heroku based on free Heroku Scheduler described in this article - Create a Background Worker on a Flask App (on Heroku) without Redis

In this case the only think we need is to turn our app to script that can be run with app context. We did it already when added scrip for user creation in one of last posts (https://angular-python-salesforce.blogspot.com/2020/03/user-authentification-with-flask-login.html).

To store scheduler jobs configuration and results we need to create few models in DB. It will be Task and TaskRun models.

task.py
from models import db, OutputMixin
from flask_login import UserMixin
from sqlalchemy.dialects.postgresql import JSONB


class Task(UserMixin, OutputMixin, db.Model):

    __tablename__ = 'tasks'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(255), nullable=False)
    status = db.Column(db.String(255), nullable=False)  # stop, in progress, wait
    last_run_at = db.Column(db.DateTime, nullable=True)
    next_run_at = db.Column(db.DateTime, nullable=True)
    last_run_status = db.Column(db.String(255), nullable=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'), nullable=False)

    config = db.Column(JSONB, nullable=False)
    # config example:
    # { "trigger_value": "...(str)", "trigger_type": "...(str)" }

task_run.py
from models import db, OutputMixin
from flask_login import UserMixin
from sqlalchemy.dialects.postgresql import JSONB


class TaskRun(UserMixin, OutputMixin, db.Model):

    __tablename__ = 'task_runs'

    id = db.Column(db.Integer, primary_key=True)
    task_id = db.Column(db.Integer, db.ForeignKey('tasks.id', ondelete='CASCADE'), nullable=False)
    status = db.Column(db.String(255), nullable=False)  # success, fail, in progress
    started_at = db.Column(db.DateTime, nullable=False)
    results = db.Column(JSONB, nullable=True)

Now create migration and apply it to create tables in DB

flask db migrate -m "Added Task and TaskRun model"
flask db upgrade

I prefer to store Task.config and TaskRun.results as schema-less JSONB structure as I'm not sure yet what exact data we will need in the future.

Now let's create script that will be run from Heroku Scheruler and should process all tasks. For beta release all tasks will be processed in the same scheduler context, but next we can update it to start separate processes for each tasks.

As we dive more and more to Flask Command Line Interface let's create separate folder for all our scripts and update old create-user script to match new structure.

in ./backend/scripts create two new files:

auth.py
import click
from models import User, db
from flask import Blueprint
from werkzeug.security import generate_password_hash

auth_scripts = Blueprint('auth_scripts', __name__, cli_group='auth')


@auth_scripts.cli.command("create_user")
@click.argument("username")
@click.argument("email")
@click.argument("password")
def create_user_cli(username, email, password):

    # EXAMPLE: flask auth create_user dmn1 dmn1@dmn1.com 1111

    new_user = User(username=username, email=email, password=generate_password_hash(password, method='sha256'))
    db.session.add(new_user)
    db.session.commit()
    print(new_user)

task.py
from flask import Blueprint
from models import Task

task_scripts = Blueprint('task_scripts', __name__, cli_group='task')


@task_scripts.cli.command("run_scheduler")
def run_scheduler():

    # EXAMPLE: flask task run_scheduler

    print('SCHEDULER STARTED')

    tasks = Task.query.all()
    print('==== Tasks:')
    print(tasks)

    print('SCHEDULER FINISHED')

And few updates in ./backend/app.py
...

# Scripts
from scripts.auth import auth_scripts  # NOQA
app.register_blueprint(auth_scripts)

from scripts.task import task_scripts  # NOQA
app.register_blueprint(task_scripts)

...

Test start for Scheduler
flask task run_scheduler

(project-RO4ptYog) bash-3.2$ flask task run_scheduler
SCHEDULER STARTED
2020-04-19 15:59:15,710 INFO sqlalchemy.engine.base.Engine select version()
2020-04-19 15:59:15,710 INFO sqlalchemy.engine.base.Engine {}
2020-04-19 15:59:15,713 INFO sqlalchemy.engine.base.Engine select current_schema()
2020-04-19 15:59:15,713 INFO sqlalchemy.engine.base.Engine {}
2020-04-19 15:59:15,716 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
2020-04-19 15:59:15,716 INFO sqlalchemy.engine.base.Engine {}
2020-04-19 15:59:15,718 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1
2020-04-19 15:59:15,718 INFO sqlalchemy.engine.base.Engine {}
2020-04-19 15:59:15,719 INFO sqlalchemy.engine.base.Engine show standard_conforming_strings
2020-04-19 15:59:15,719 INFO sqlalchemy.engine.base.Engine {}
2020-04-19 15:59:15,721 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2020-04-19 15:59:15,722 INFO sqlalchemy.engine.base.Engine SELECT tasks.id AS tasks_id, tasks.name AS tasks_name, tasks.status AS tasks_status, tasks.last_run_at AS tasks_last_run_at, tasks.next_run_at AS tasks_next_run_at, tasks.last_run_status AS tasks_last_run_status, tasks.user_id AS tasks_user_id, tasks.config AS tasks_config 
FROM tasks
2020-04-19 15:59:15,722 INFO sqlalchemy.engine.base.Engine {}
==== Tasks:
[<Task 7>]
SCHEDULER FINISHED
2020-04-19 15:59:15,729 INFO sqlalchemy.engine.base.Engine ROLLBACK

Next time we will create UI to show the list of Tasks and create/edit Task.


Comments

Popular posts from this blog

HTTPS in local environment for Angular + Flask project.

Salesforce Authentication 2 (Token Validation and Refresh)

User authentification with Flask-Login