Salesforce Authentication 2 (Token Validation and Refresh)
Connection test flow is a simple request to Salesforce API with access_token. If we receive valid response it means all works fine. In other case we should notify user about problems with Salesforce integration. I mentioned last time when we setup connection and receive response I save it to DB as is as it contains many useful information. And now we can use part of this response.
Here is example of OAuth handshake response:
{
'access_token': '00Dxxxxxxxxxx',
'refresh_token': '5Aezzzzzzzzzz',
'signature': '8KFGb34p3vMLhSRHHmJZN3Bux9tlcA1+cvgIACw2SG4=',
'scope': 'refresh_token api',
'instance_url': 'https://ap17.salesforce.com',
'id': 'https://login.salesforce.com/id/00D2x000003v71DEAQ/0052x000001S9ojAAC',
'token_type': 'Bearer',
'issued_at': '1589799677876'
}
Here id is what we a looking for. It is the link to endpoint with connection identity information. This information we also will store in DB alongside with connection details (latter in this article).
@sfdc_bp.route('/test', methods=['POST'])
@login_required
def test_connection():
task_payload = request.get_json()
task = Task.query.filter_by(id=task_payload['id']).first_or_404()
if not task.sfdc:
results = {'status': False, 'message': 'SFDC config not found'}
else:
identity_url = task.sfdc['id']
headers = {
'Authorization': 'Bearer ' + task.sfdc['access_token'],
'Content-Type': 'application/json'
}
resp = requests.get(identity_url,
headers=headers)
try:
# Only check for Response Status Code and response is valid json
if resp.status_code == 200 and resp.json():
results = {'status': True, 'message': ''}
else:
raise ValueError()
except (json.decoder.JSONDecodeError, ValueError):
print(resp.text)
results = {'status': False, 'message': resp.text}
return jsonify(results)
Here is example for invalid access_token:
403
Bad_OAuth_Token
But invalid access token doesn't mean connection is bad. Access Token can be expired at this moment and we need to refresh it. Now the time to create magic proxy service to process all requests to Salesforce. It will be responsible for access_token refresh if need it so we forget about this problem.
I've refactored previous code for Salesforce integration and created new backend service class to manage all Salesforce related tasks.
./backend/services/sfdc_service.py
import json
import requests
from flask import current_app
from urllib.parse import urlencode, quote_plus
from sqlalchemy.orm.attributes import flag_modified
from models import db, Task
class SfdcService:
task: Task = None
def __init__(self, task: Task):
self.task = task
def get(self, url, expected_status_code=200):
resp = requests.get(url, headers=self.get_headers())
if resp.status_code == 403:
self.refresh_access_token()
resp = requests.get(url, headers=self.get_headers())
if resp.status_code != expected_status_code:
print('=== RESPONSE ERROR:')
print(resp.status_code, resp.text)
raise ValueError('Unexpected status code: ' + str(resp.status_code) + ' ' + str(resp.text))
return resp.text
def post(self, url, payload, expected_status_code=200):
if not isinstance(payload, str):
try:
payload = json.dumps(payload)
except: # NOQA
raise ValueError('Invalid payload format')
resp = requests.post(url, data=payload, headers=self.get_headers())
if resp.status_code == 403:
self.refresh_access_token()
resp = requests.get(url, headers=self.get_headers())
if resp.status_code != expected_status_code:
print('=== RESPONSE ERROR:')
print(resp.status_code, resp.text)
raise ValueError('Unexpected status code: ' + str(resp.status_code) + ' ' + str(resp.text))
return resp.text
def refresh_access_token(self):
print('=== REFRESH ACCESS TOKEN ===', self.task)
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json',
}
payload = {
'grant_type': 'refresh_token',
'refresh_token': self.task.sfdc['refresh_token'],
'client_id': current_app.config['SFDC_CONNECTED_APP_KEY'],
'client_secret': current_app.config['SFDC_CONNECTED_APP_SECRET']
}
req_body = urlencode(payload, quote_via=quote_plus)
resp = requests.post(self.task.sfdc['instance_url'] + '/services/oauth2/token',
headers=headers,
data=req_body)
try:
token_results = resp.json()
except: # NOQA
raise ValueError(resp.text())
if 'access_token' not in token_results:
raise ValueError(resp.text())
'''
token_results Example:
{
"access_token": "00D2x000003v71D!AQwAQO8s_Vk9klLTobBMmskWjp_GOWFwwwVV3J20ZUINaeBmA_SBuXwEUxduxrk0juH8bbdMnLm1Pg8IEIbjv4ft_8VymBfn",
"signature": "s5Qs/Ktl2plae7oKCKo0Cyn6QGJ62wNMsHCeSQuWSsA=",
"scope": "refresh_token api",
"instance_url": "https://ap17.salesforce.com",
"id": "https://login.salesforce.com/id/00D2x000003v71DEAQ/0052x000001S9ojAAC",
"token_type": "Bearer",
"issued_at": "1589979038599"
}
''' # NOQA
self.task.sfdc['access_token'] = token_results['access_token']
self.task.sfdc['instance_url'] = token_results['access_token']
self.task.sfdc['issued_at'] = token_results['instance_url']
self.task.sfdc['identity'] = self.get_identity()
flag_modified(self.task, 'sfdc')
db.session.commit()
def get_headers(self):
if not self.task.sfdc:
raise ValueError('Salesforce connection information not found')
return {
'Authorization': 'Bearer ' + self.task.sfdc['access_token'],
'Content-Type': 'application/json'
}
def generate_access_token_from_code(self, code, state):
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json',
}
payload = {
'grant_type': 'authorization_code',
'code': code,
'client_id': current_app.config['SFDC_CONNECTED_APP_KEY'],
'client_secret': current_app.config['SFDC_CONNECTED_APP_SECRET'],
'redirect_uri': state['redirectUrl']
}
req_body = urlencode(payload, quote_via=quote_plus)
resp = requests.post(state['loginBaseUrl'] + '/services/oauth2/token',
headers=headers,
data=req_body)
token_results = resp.json()
'''
token_results Example:
{
'access_token': '00Dxxxxxxxxxx',
'refresh_token': '5Aezzzzzzzzzz',
'signature': '8KFGb34p3vMLhSRHHmJZN3Bux9tlcA1+cvgIACw2SG4=',
'scope': 'refresh_token api',
'instance_url': 'https://ap17.salesforce.com',
'id': 'https://login.salesforce.com/id/00D2x000003v71DEAQ/0052x000001S9ojAAC',
'token_type': 'Bearer',
'issued_at': '1589799677876'
},
{
'error': 'invalid_grant',
'error_description': 'expired authorization code'
}
'''
try:
token_results = resp.json()
except: # NOQA
raise ValueError(resp.text())
if 'access_token' not in token_results:
raise ValueError(resp.text())
self.task.sfdc = token_results
token_results['identity'] = self.get_identity()
db.session.commit()
def get_identity(self):
identity_url = self.task.sfdc['id']
resp = self.get(identity_url, 200)
try:
identity = json.loads(resp)
return identity
except: # NOQA
raise ValueError(resp)
And usage
./backend/controllers/sfdc.py
import base64
import json
from flask import Blueprint, request, redirect, abort, jsonify
from flask_login import login_required, current_user
from models import Task
from services.sfdc_service import SfdcService
sfdc_bp = Blueprint('sfdc', __name__, url_prefix='/sfdc')
@sfdc_bp.route('/oauth/callback', methods=['GET'])
@login_required
def oauth_handshake():
# /sfdc/oauth/callback?code=aPrx9pB8PA1X2QOXBj2XFIklPku.mPE7TzVdMCdcxfaw6K0Wnd5K3iiUoG_vGiqp82EBNFpXxQ%3D%3D&state=eyJ0YXNrSWQiOjIxLCJsb2dpbkJhc2VVcmwiOiJodHRwczovL2xvZ2luLnNhbGVzZm9yY2UuY29tIn0%3D
code = request.args.get('code')
state = json.loads(base64.b64decode(request.args.get('state')).decode('utf-8'))
task: Task = (Task.query.filter(Task.id == state['taskId'],
Task.user_id == current_user.id)
.first_or_404())
try:
SfdcService(task).generate_access_token_from_code(code, state)
except ValueError as e:
abort(500, e)
return redirect('/#/tasks/' + str(task.id))
@sfdc_bp.route('/test', methods=['POST'])
@login_required
def test_connection():
task_payload = request.get_json()
task = Task.query.filter_by(id=task_payload['id']).first_or_404()
if not task.sfdc:
results = {'status': False, 'message': 'SFDC config not found'}
else:
try:
SfdcService(task).get_identity()
results = {'status': True, 'message': ''}
except ValueError as e:
results = {'status': False, 'message': str(e)}
return jsonify(results)
Comments
Post a Comment