Salesforce Authentication 2 (Token Validation and Refresh)



In the last article we added the Salesforce connection setup logic to our application. Now we need to implement the connection test logic and the token refresh flow.

The connection test flow is a simple request to the Salesforce API with the access_token. If we receive a valid response, everything works fine. Otherwise we should notify the user about problems with the Salesforce integration. As I mentioned last time, when we set up the connection and receive the response, I save it to the DB as is, because it contains a lot of useful information. Now we can use part of this response.

Here is an example of the OAuth handshake response:

{
'access_token': '00Dxxxxxxxxxx',
'refresh_token': '5Aezzzzzzzzzz',
'signature': '8KFGb34p3vMLhSRHHmJZN3Bux9tlcA1+cvgIACw2SG4=',
'scope': 'refresh_token api',
'instance_url': 'https://ap17.salesforce.com',
'id': 'https://login.salesforce.com/id/00D2x000003v71DEAQ/0052x000001S9ojAAC',
'token_type': 'Bearer',
'issued_at': '1589799677876'
}

Here id is what we are looking for. It is a link to the endpoint with the connection identity information. We will also store this information in the DB alongside the connection details (later in this article).
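The id value in the example response is an ordinary URL whose path ends with the organization ID and the user ID. As a minimal sketch, here is one way to pull both out without calling Salesforce. The helper name parse_identity_url is hypothetical and is not part of the application code.

```python
from urllib.parse import urlparse


def parse_identity_url(identity_url):
    """Split a Salesforce identity URL into (org_id, user_id).

    Hypothetical helper for illustration only.
    """
    # Expected path shape: /id/<organization id>/<user id>
    parts = urlparse(identity_url).path.strip('/').split('/')
    if len(parts) != 3 or parts[0] != 'id':
        raise ValueError('Unexpected identity URL: ' + identity_url)
    return parts[1], parts[2]


org_id, user_id = parse_identity_url(
    'https://login.salesforce.com/id/00D2x000003v71DEAQ/0052x000001S9ojAAC')
print(org_id, user_id)
```

This is only handy for logging or quick checks. The full identity information still comes from requesting the URL itself.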

@sfdc_bp.route('/test', methods=['POST'])
@login_required
def test_connection():

    task_payload = request.get_json()
    task = Task.query.filter_by(id=task_payload['id']).first_or_404()

    if not task.sfdc:
        results = {'status': False, 'message': 'SFDC config not found'}
    else:
        identity_url = task.sfdc['id']
        headers = {
            'Authorization': 'Bearer ' + task.sfdc['access_token'],
            'Content-Type': 'application/json'
        }
        resp = requests.get(identity_url,
                            headers=headers)

        try:
            # Only check for Response Status Code and response is valid json
            if resp.status_code == 200 and resp.json():
                results = {'status': True, 'message': ''}
            else:
                raise ValueError()
        except (json.decoder.JSONDecodeError, ValueError):
            print(resp.text)
            results = {'status': False, 'message': resp.text}

    return jsonify(results)
Here is an example of the response for an invalid access_token:

403

Bad_OAuth_Token


But an invalid access token doesn't mean the connection is bad. The access token may simply have expired, and then we need to refresh it. Now is the time to create a magic proxy service that processes all requests to Salesforce. It will be responsible for refreshing the access_token when needed, so we can forget about this problem.
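Before looking at the real service, the core idea can be sketched in isolation: send the request, and if the answer looks like an expired token, refresh once and send the same request again. The names call_with_refresh, FakeApi and FakeResponse are hypothetical stand-ins. Treating 401 as a refresh signal in addition to the 403 shown above is an assumption about other Salesforce endpoints.

```python
# Status codes treated as "token may have expired". 403 is what the identity
# endpoint returned in the example above; 401 is an assumption for other
# endpoints.
REFRESH_STATUS_CODES = (401, 403)


def call_with_refresh(send, refresh):
    """Call send(); if the token looks expired, refresh once and call again.

    send    -- hypothetical callable returning an object with .status_code
    refresh -- hypothetical callable that obtains and stores a new token
    """
    resp = send()
    if resp.status_code in REFRESH_STATUS_CODES:
        refresh()
        # Repeat the same request exactly once to avoid a retry loop.
        resp = send()
    return resp


class FakeResponse:
    def __init__(self, status_code):
        self.status_code = status_code


class FakeApi:
    """Stand-in for Salesforce: rejects calls until the token is refreshed."""

    def __init__(self):
        self.token_valid = False
        self.calls = 0

    def send(self):
        self.calls += 1
        return FakeResponse(200 if self.token_valid else 403)

    def refresh(self):
        self.token_valid = True


api = FakeApi()
resp = call_with_refresh(api.send, api.refresh)
print(resp.status_code, api.calls)
```

Retrying only once keeps a truly broken connection (for example a revoked refresh token) from looping forever. The second failure is simply returned to the caller.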


I've refactored the previous Salesforce integration code and created a new backend service class to manage all Salesforce-related tasks.

./backend/services/sfdc_service.py
import json
import requests
from flask import current_app
from urllib.parse import urlencode, quote_plus
from sqlalchemy.orm.attributes import flag_modified
from models import db, Task


class SfdcService:

    task: Task = None

    def __init__(self, task: Task):
        self.task = task

    def get(self, url, expected_status_code=200):

        resp = requests.get(url, headers=self.get_headers())

        if resp.status_code == 403:
            self.refresh_access_token()
            resp = requests.get(url, headers=self.get_headers())

        if resp.status_code != expected_status_code:
            print('=== RESPONSE ERROR:')
            print(resp.status_code, resp.text)
            raise ValueError('Unexpected status code: ' + str(resp.status_code) + ' ' + str(resp.text))

        return resp.text

    def post(self, url, payload, expected_status_code=200):

        if not isinstance(payload, str):
            try:
                payload = json.dumps(payload)
            except (TypeError, ValueError):
                # json.dumps raises these for payloads it cannot serialize
                raise ValueError('Invalid payload format')

        resp = requests.post(url, data=payload, headers=self.get_headers())

        if resp.status_code == 403:
            self.refresh_access_token()
            # Retry the same POST (not a GET) with the refreshed token
            resp = requests.post(url, data=payload, headers=self.get_headers())

        if resp.status_code != expected_status_code:
            print('=== RESPONSE ERROR:')
            print(resp.status_code, resp.text)
            raise ValueError('Unexpected status code: ' + str(resp.status_code) + ' ' + str(resp.text))

        return resp.text

    def refresh_access_token(self):

        print('=== REFRESH ACCESS TOKEN ===', self.task)

        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json',
        }
        payload = {
            'grant_type': 'refresh_token',
            'refresh_token': self.task.sfdc['refresh_token'],
            'client_id': current_app.config['SFDC_CONNECTED_APP_KEY'],
            'client_secret': current_app.config['SFDC_CONNECTED_APP_SECRET']
        }
        req_body = urlencode(payload, quote_via=quote_plus)

        resp = requests.post(self.task.sfdc['instance_url'] + '/services/oauth2/token',
                             headers=headers,
                             data=req_body)

        try:
            token_results = resp.json()
        except ValueError:
            # Body is not valid JSON; resp.text is a property, not a method
            raise ValueError(resp.text)

        if 'access_token' not in token_results:
            raise ValueError(resp.text)

        '''
        token_results Example:
        {
            "access_token": "00D2x000003v71D!AQwAQO8s_Vk9klLTobBMmskWjp_GOWFwwwVV3J20ZUINaeBmA_SBuXwEUxduxrk0juH8bbdMnLm1Pg8IEIbjv4ft_8VymBfn",
            "signature": "s5Qs/Ktl2plae7oKCKo0Cyn6QGJ62wNMsHCeSQuWSsA=",
            "scope": "refresh_token api",
            "instance_url": "https://ap17.salesforce.com",
            "id": "https://login.salesforce.com/id/00D2x000003v71DEAQ/0052x000001S9ojAAC",
            "token_type": "Bearer",
            "issued_at": "1589979038599"
        }
        '''  # NOQA

        # Copy each refreshed field from the matching key of the response
        self.task.sfdc['access_token'] = token_results['access_token']
        self.task.sfdc['instance_url'] = token_results['instance_url']
        self.task.sfdc['issued_at'] = token_results['issued_at']

        self.task.sfdc['identity'] = self.get_identity()

        flag_modified(self.task, 'sfdc')

        db.session.commit()

    def get_headers(self):

        if not self.task.sfdc:
            raise ValueError('Salesforce connection information not found')

        return {
            'Authorization': 'Bearer ' + self.task.sfdc['access_token'],
            'Content-Type': 'application/json'
        }

    def generate_access_token_from_code(self, code, state):

        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json',
        }
        payload = {
            'grant_type': 'authorization_code',
            'code': code,
            'client_id': current_app.config['SFDC_CONNECTED_APP_KEY'],
            'client_secret': current_app.config['SFDC_CONNECTED_APP_SECRET'],
            'redirect_uri': state['redirectUrl']
        }
        req_body = urlencode(payload, quote_via=quote_plus)

        resp = requests.post(state['loginBaseUrl'] + '/services/oauth2/token',
                             headers=headers,
                             data=req_body)

        # The response body is parsed further down, inside a try block
        '''
        token_results Example:
        {
            'access_token': '00Dxxxxxxxxxx',
            'refresh_token': '5Aezzzzzzzzzz',
            'signature': '8KFGb34p3vMLhSRHHmJZN3Bux9tlcA1+cvgIACw2SG4=',
            'scope': 'refresh_token api',
            'instance_url': 'https://ap17.salesforce.com',
            'id': 'https://login.salesforce.com/id/00D2x000003v71DEAQ/0052x000001S9ojAAC',
            'token_type': 'Bearer',
            'issued_at': '1589799677876'
        },
        {
            'error': 'invalid_grant',
            'error_description': 'expired authorization code'
        }
        '''

        try:
            token_results = resp.json()
        except ValueError:
            # Body is not valid JSON; resp.text is a property, not a method
            raise ValueError(resp.text)

        if 'access_token' not in token_results:
            raise ValueError(resp.text)

        self.task.sfdc = token_results
        token_results['identity'] = self.get_identity()

        db.session.commit()

    def get_identity(self):
        identity_url = self.task.sfdc['id']
        resp = self.get(identity_url, 200)
        try:
            identity = json.loads(resp)
            return identity
        except:  # NOQA
            raise ValueError(resp)
And the usage:

./backend/controllers/sfdc.py
import base64
import json
from flask import Blueprint, request, redirect, abort, jsonify
from flask_login import login_required, current_user
from models import Task
from services.sfdc_service import SfdcService

sfdc_bp = Blueprint('sfdc', __name__, url_prefix='/sfdc')


@sfdc_bp.route('/oauth/callback', methods=['GET'])
@login_required
def oauth_handshake():

    # /sfdc/oauth/callback?code=aPrx9pB8PA1X2QOXBj2XFIklPku.mPE7TzVdMCdcxfaw6K0Wnd5K3iiUoG_vGiqp82EBNFpXxQ%3D%3D&state=eyJ0YXNrSWQiOjIxLCJsb2dpbkJhc2VVcmwiOiJodHRwczovL2xvZ2luLnNhbGVzZm9yY2UuY29tIn0%3D

    code = request.args.get('code')
    state = json.loads(base64.b64decode(request.args.get('state')).decode('utf-8'))

    task: Task = (Task.query.filter(Task.id == state['taskId'],
                                    Task.user_id == current_user.id)
                            .first_or_404())

    try:
        SfdcService(task).generate_access_token_from_code(code, state)
    except ValueError as e:
        abort(500, e)

    return redirect('/#/tasks/' + str(task.id))


@sfdc_bp.route('/test', methods=['POST'])
@login_required
def test_connection():

    task_payload = request.get_json()
    task = Task.query.filter_by(id=task_payload['id']).first_or_404()

    if not task.sfdc:
        results = {'status': False, 'message': 'SFDC config not found'}
    else:

        try:
            SfdcService(task).get_identity()
            results = {'status': True, 'message': ''}
        except ValueError as e:
            results = {'status': False, 'message': str(e)}

    return jsonify(results)
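The callback above expects state to be base64-encoded JSON. As a minimal sketch of that round trip, here are two small helpers. The names encode_state and decode_state are hypothetical. The sample state in the callback comment carries only taskId and loginBaseUrl, while the service also reads redirectUrl, so the payload and redirect URL below are assumptions for illustration.

```python
import base64
import json


def encode_state(state):
    """Serialize a dict to base64 text for the OAuth `state` parameter.

    The result may contain '=', '+' or '/', so it still has to be
    URL-encoded when placed into a query string.
    """
    raw = json.dumps(state).encode('utf-8')
    return base64.b64encode(raw).decode('ascii')


def decode_state(value):
    """Inverse of encode_state; mirrors the decoding done in the callback."""
    return json.loads(base64.b64decode(value).decode('utf-8'))


# Hypothetical payload: the keys match what the callback and service read.
state = {
    'taskId': 21,
    'loginBaseUrl': 'https://login.salesforce.com',
    'redirectUrl': 'https://localhost:4200/sfdc/oauth/callback',
}
encoded = encode_state(state)
print(encoded)
print(decode_state(encoded) == state)

# The state value from the callback comment, after URL-decoding %3D to '='.
SAMPLE = ('eyJ0YXNrSWQiOjIxLCJsb2dpbkJhc2VVcmwiOiJodHRwczovL2xvZ2luLnNhbGVz'
          'Zm9yY2UuY29tIn0=')
print(decode_state(SAMPLE))
```

Because state travels through the browser, it should be treated as untrusted input. The callback already does this in part by looking up the task with both the task ID and the current user's ID.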

