"""Demo OIDC client: a Flask UI that logs users in against a Keycloak realm.

The app runs the authorization-code flow through Authlib, stores the
resulting tokens in HTTP-only cookies, keeps the user's name/email in the
Flask session, and renders the decoded ID token on the index page.
"""
import urllib.parse

import jwt
import requests
from authlib.integrations.base_client import OAuthError
from authlib.integrations.flask_client import OAuth
from flask import Flask, request, redirect, session, render_template, url_for, make_response

# TODO add client export json in project & git

app = Flask(__name__)
# key used to encode session cookie
# NOTE(review): hard-coded and guessable; load it from the environment or a
# secret store before using this outside a local demo.
app.config['SECRET_KEY'] = 'client-oidc'

idp_url = "https://id.vilanet.fr/realms/vilanet"
client_url = "http://localhost:5001"
client_id = "client-oidc"
# NOTE(review): client secret committed in source; rotate it and inject it
# at deploy time.
client_secret = "BqWWnuj5JkgZZWEaXuR8bprEx53lqGxC"
resource_server_url = 'http://localhost:5002'

# Cookies that carry the tokens returned by the IdP. The names double as the
# keys looked up in the token response.
_TOKEN_COOKIES = ('access_token', 'refresh_token', 'id_token')
# Domain attribute used when setting the token cookies; deletion must use the
# same value or the browser may keep the original cookie.
_COOKIE_DOMAIN = 'localhost'
# Upper bound (seconds) for outbound calls to the IdP so a stalled IdP cannot
# block a request indefinitely.
_IDP_TIMEOUT_SECONDS = 10

# To manage OIDC flow for UI client
oauth = OAuth(app=app)
oauth.register(
    name="keycloak",
    client_id=client_id,
    client_secret=client_secret,
    server_metadata_url=f'{idp_url}/.well-known/openid-configuration',
    client_kwargs={'scope': 'openid', 'code_challenge_method': 'S256'}
)

# Built once so fetched signing keys can be reused across requests; the
# constructor itself performs no network I/O.
_jwks_client = jwt.PyJWKClient(f"{idp_url}/protocol/openid-connect/certs")


def _redirect_to_service(path):
    """Redirect to ``path`` on the resource server, passing this client's URL as ``callbackUrl``."""
    query = urllib.parse.urlencode({'callbackUrl': client_url})
    return make_response(redirect(f"{resource_server_url}{path}?{query}"))


@app.route("/service1")
def service1():
    """Redirect the browser to the resource server's ``service1`` endpoint."""
    return _redirect_to_service("/api/v1/service1")


@app.route("/service2")
def service2():
    """Redirect the browser to the resource server's ``service2`` endpoint."""
    return _redirect_to_service("/api/v1/service2")


@app.route("/auth")
def auth():
    """Handle the OIDC redirect: exchange the code for tokens and store them.

    Each token present in the response is stored in an HTTP-only cookie, and
    the ``name``/``email`` userinfo claims (when present) are copied into the
    session. On an OAuth error the browser is redirected to ``/`` with the
    error text in the ``error`` query parameter.
    """
    try:
        token_response = oauth.keycloak.authorize_access_token()
    except OAuthError as e:
        return redirect('/?' + urllib.parse.urlencode({'error': str(e)}))

    response = make_response(redirect('/'))
    for cookie_name in _TOKEN_COOKIES:
        # .get(): a token the IdP did not return is skipped rather than
        # failing the whole login with a KeyError.
        token = token_response.get(cookie_name)
        if token:
            response.set_cookie(cookie_name, token, httponly=True, domain=_COOKIE_DOMAIN)

    userinfo = token_response.get('userinfo')
    if userinfo:
        for claim in ('name', 'email'):
            if claim in userinfo:
                session[f'userinfo_{claim}'] = userinfo[claim]
    return response


def _logout():
    """Propagate logout to the IdP (best effort) and clear local login state."""
    refresh_token = request.cookies.get('refresh_token')
    if refresh_token is not None:
        # propagate logout to Keycloak
        try:
            requests.post(
                f'{idp_url}/protocol/openid-connect/logout',
                data={
                    "client_id": client_id,
                    "client_secret": client_secret,
                    "refresh_token": refresh_token,
                },
                timeout=_IDP_TIMEOUT_SECONDS,
            )
        except requests.RequestException:
            # The local logout below must still happen when the IdP is
            # unreachable; record the failure instead of aborting.
            app.logger.exception("Could not propagate logout to the identity provider")

    response = make_response(redirect('/'))
    for cookie_name in _TOKEN_COOKIES:
        response.delete_cookie(cookie_name, domain=_COOKIE_DOMAIN)
    session.pop('userinfo_name', None)
    session.pop('userinfo_email', None)
    return response


@app.route("/", methods=['GET', 'POST'])
def index():
    """Render the home page, or start login/logout depending on the query string.

    * ``?sso`` starts the authorization-code flow against the IdP.
    * ``?slo`` logs out at the IdP and clears cookies and session data.
    * ``?error=...`` displays the given error message.

    Otherwise the page shows the user info kept in the session and, when an
    ``id_token`` cookie is present, the claims of the verified ID token.
    """
    auth_error = False
    token_data = False

    if 'sso' in request.args:
        return oauth.keycloak.authorize_redirect(url_for('auth', _external=True))
    if 'slo' in request.args:
        return _logout()
    if 'error' in request.args:
        auth_error = request.args['error']

    userinfo_name = session.get('userinfo_name', False)
    userinfo_email = session.get('userinfo_email', False)

    # it is OK to use ID token to display user info on client side
    # it is not OK to use access token on client side
    id_token = request.cookies.get('id_token')
    if id_token is not None:
        try:
            # The key lookup is inside the try so a malformed cookie or an
            # unreachable JWKS endpoint is reported on the page instead of
            # producing a 500.
            signing_key = _jwks_client.get_signing_key_from_jwt(id_token)
            # The signature is verified against the IdP's key; with
            # verification enabled PyJWT also rejects expired tokens.
            token_data = jwt.decode(
                id_token,
                signing_key.key,
                algorithms=["RS256"],
                audience=client_id,
                options={'verify_aud': True}
            )
            auth_error = False
        except Exception as e:
            # Deliberately broad: any failure here is shown to the user as
            # the authentication error rather than raised.
            auth_error = e

    return render_template(
        'index.html',
        auth_error=auth_error,
        userinfo_name=userinfo_name,
        userinfo_email=userinfo_email,
        token_data=token_data,
    )


if __name__ == "__main__":
    # NOTE(review): debug=True enables the interactive debugger; keep this
    # for local development only.
    app.run(host='127.0.0.1', port=5001, debug=True)