Task schedule configuration (Cron-like)
./backend/controllers/task.py
...
@task_bp.route('/start', methods=['POST'])
@login_required
def start_tasks():
task_payload = request.get_json()
task = Task.query.filter(Task.id == task_payload['id'],
Task.user_id == current_user.id).first_or_404()
if not task.config or not croniter.is_valid(task.config['trigger_value']):
abort(400, 'Invalid Trigger Setup')
task.status = 'waiting'
base = datetime.now().replace(microsecond=0)
croniter_iter = croniter(task.config['trigger_value'], base)
next_run_at = croniter_iter.get_next(datetime)
task.next_run_at = next_run_at
db.session.commit()
return jsonify({'status': 'ok'})
@task_bp.route('/stop', methods=['POST'])
@login_required
def stop_tasks():
task_payload = request.get_json()
task = Task.query.filter(Task.id == task_payload['id'],
Task.user_id == current_user.id).first_or_404()
task.status = 'stop'
task.next_run_at = None
db.session.commit()
return jsonify({'status': 'ok'})
...
Comments
Post a Comment