Scheduler jobs in backend (Local Development)

Local Development Scheduler for Flask



In the first post (https://angular-python-salesforce.blogspot.com/2020/04/scheduler-jobs-in-backend.html) I described a common approach to implementing a scheduler job. Now I'm going to make it work in a local environment for development purposes.

I've updated the CLI command structure, and the task group now contains "scheduler" and "run" scripts:
- scheduler is designed to be called from cron and starts task processing.
- run is designed to process one particular task.

./backend/scripts/task.py
import os
import subprocess
import sys
import click
from flask import Blueprint, current_app
from models import db, Task

task_scripts = Blueprint('task_scripts', __name__, cli_group='task')


def run_task_helper(task):
    # In development, process the task in a separate background process
    if current_app.config['is_development_mode']:
        print('RUN TASK LOCALLY')
        subprocess.Popen(['flask', 'task', 'run', str(task.id)])
    else:
        print('RUN TASK IN PRODUCTION ...')


@task_scripts.cli.command("scheduler")
def start_scheduler():

    # EXAMPLE: flask task scheduler

    print('SCHEDULER STARTED')

    tasks = Task.query.all()
    print('==== Tasks:')
    print(tasks)

    for task in tasks:
        run_task_helper(task)

    print('SCHEDULER FINISHED')


@task_scripts.cli.command("run")
@click.argument("task_id")
def run_task(task_id):

    # EXAMPLE: flask task run <TASK_ID>

    print('TASK PROCESSING STARTED:', task_id)

    task = Task.query.filter_by(id=task_id).first()
    print('==== Task:')
    print(task)

    if task is None:
        sys.exit('Task not found')

    task.status = 'Finished'

    db.session.commit()

    print('TASK PROCESSING FINISHED')
The main idea here is the use of subprocess.Popen. It starts a separate process in the background without blocking the calling process, which is close to what we want to see in production.
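
To illustrate the non-blocking behaviour, here is a minimal standalone sketch (not part of the project code). It assumes a stand-in child process that only sleeps, launched through sys.executable, in place of the real flask task run command; the helper name launch_background_child is hypothetical.

```python
import subprocess
import sys
import time


def launch_background_child(sleep_seconds=1.0):
    """Start a child Python process that sleeps, and return immediately."""
    # Stand-in for ['flask', 'task', 'run', str(task.id)] (assumption for this demo).
    return subprocess.Popen(
        [sys.executable, '-c', f'import time; time.sleep({sleep_seconds})']
    )


if __name__ == '__main__':
    started = time.monotonic()
    child = launch_background_child(1.0)
    # Popen does not wait for the child, so we get here right away.
    print('Popen returned after', round(time.monotonic() - started, 3), 'seconds')
    print('Child still running:', child.poll() is None)
    child.wait()  # wait explicitly only because this demo wants the exit code
    print('Child exit code:', child.returncode)
```

The scheduler never calls wait(), so it can launch one process per task and finish without waiting for any of them.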

Now, if we run flask task scheduler, it will start N task processes (one for each Task; I will add more complex logic later). This is the command we will use in Heroku Scheduler.

Also, flask task run <task_id> will be used to start task processing immediately (via a button in the UI). For this purpose I've added a new route handler:

./backend/controllers/task.py
...
@task_bp.route('/run', methods=['POST'])
@login_required
def run_tasks():

    task_payload = request.get_json()
    task = Task.query.filter_by(id=task_payload['id']).first_or_404()

    run_task_helper(task)

    return jsonify({'status': 'ok'})
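
As a rough sketch of how a client could call this handler, the snippet below builds the POST request with the standard library. The helper name build_run_request is hypothetical, and the full URL of the /run route is an assumption that depends on how task_bp is registered. Because the route is protected by login_required, a real call would also need an authenticated session, which is not shown here.

```python
import json
import urllib.request


def build_run_request(run_url, task_id):
    """Build a POST request whose JSON body matches what the handler reads."""
    # The handler reads task_payload['id'], so the body is {"id": <task_id>}.
    body = json.dumps({'id': task_id}).encode('utf-8')
    return urllib.request.Request(
        run_url,
        data=body,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )
```

Passing the result to urllib.request.urlopen would send it. The response comes back as soon as the background process has been started, not when the task has finished.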

One more interesting step for emulating a real scheduler in the local environment is this bash script, which fires our task processing roughly every 10 seconds.

./run_scheruler.sh
#!/bin/bash

echo "Starting Scheduler"

while true; do
    echo "=== Start processing ==="
    pipenv run flask task scheduler
    echo "=== Finish processing ==="
    sleep 10
done
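
For environments without bash, the same loop could be sketched in Python. This is only an illustration, not part of the project: the function name run_scheduler_loop and the max_runs parameter (useful for bounded test runs) are assumptions.

```python
import subprocess
import time


def run_scheduler_loop(command, interval_seconds=10, max_runs=None):
    """Run `command` repeatedly with a pause between runs.

    Loops forever when max_runs is None; otherwise stops after max_runs
    runs and returns how many of them exited with a non-zero code.
    """
    failures = 0
    runs = 0
    while max_runs is None or runs < max_runs:
        print('=== Start processing ===')
        # subprocess.run blocks until the scheduler command exits.
        result = subprocess.run(command)
        if result.returncode != 0:
            failures += 1
        print('=== Finish processing ===')
        runs += 1
        if max_runs is None or runs < max_runs:
            time.sleep(interval_seconds)
    return failures
```

Calling run_scheduler_loop(['pipenv', 'run', 'flask', 'task', 'scheduler']) would mirror the bash script above: it runs the scheduler, waits 10 seconds, and repeats until interrupted.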
