import os from flask import Flask, request, redirect, session, make_response, render_template from onelogin.saml2.auth import OneLogin_Saml2_Auth from onelogin.saml2.utils import OneLogin_Saml2_Utils app = Flask(__name__) app.config['SAML_PATH'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config') app.config['SECRET_KEY'] = 'client-saml' # TODO call dummy-server # TODO reuse token_data display from client-oidc def init_saml_auth(req): auth = OneLogin_Saml2_Auth(req, custom_base_path=app.config['SAML_PATH']) return auth def prepare_flask_request(request): url_data = request.args.copy() url_data.update(request.form) return { 'https': 'on' if request.scheme == 'https' else 'off', 'http_host': request.host, 'server_port': request.host.split(':')[1] if ':' in request.host else '443' if request.scheme == 'https' else '80', 'script_name': request.path, 'get_data': url_data, 'post_data': request.form.copy() } @app.route("/", methods=['GET', 'POST']) def index(): req = prepare_flask_request(request) auth = init_saml_auth(req) errors = [] error_reason = None not_auth_warn = False success_slo = False attributes = False paint_logout = False if 'sso' in request.args: return redirect(auth.login()) # If AuthNRequest ID need to be stored in order to later validate it, do instead # sso_built_url = auth.login() # request.session['AuthNRequestID'] = auth.get_last_request_id() # return redirect(sso_built_url) elif 'slo' in request.args: name_id = session_index = name_id_format = name_id_nq = name_id_spnq = None if 'samlNameId' in session: name_id = session['samlNameId'] if 'samlSessionIndex' in session: session_index = session['samlSessionIndex'] if 'samlNameIdFormat' in session: name_id_format = session['samlNameIdFormat'] if 'samlNameIdNameQualifier' in session: name_id_nq = session['samlNameIdNameQualifier'] if 'samlNameIdSPNameQualifier' in session: name_id_spnq = session['samlNameIdSPNameQualifier'] return redirect(auth.logout(name_id=name_id, session_index=session_index, nq=name_id_nq, name_id_format=name_id_format, spnq=name_id_spnq)) elif 'acs' in request.args: request_id = None if 'AuthNRequestID' in session: request_id = session['AuthNRequestID'] auth.process_response(request_id=request_id) errors = auth.get_errors() not_auth_warn = not auth.is_authenticated() if len(errors) == 0: if 'AuthNRequestID' in session: del session['AuthNRequestID'] session['samlUserdata'] = auth.get_attributes() session['samlNameId'] = auth.get_nameid() session['samlNameIdFormat'] = auth.get_nameid_format() session['samlNameIdNameQualifier'] = auth.get_nameid_nq() session['samlNameIdSPNameQualifier'] = auth.get_nameid_spnq() session['samlSessionIndex'] = auth.get_session_index() self_url = OneLogin_Saml2_Utils.get_self_url(req) if 'RelayState' in request.form and self_url != request.form['RelayState']: # To avoid 'Open Redirect' attacks, before execute the redirection confirm # the value of the request.form['RelayState'] is a trusted URL. return redirect(auth.redirect_to(request.form['RelayState'])) elif auth.get_settings().is_debug_active(): error_reason = auth.get_last_error_reason() elif 'sls' in request.args: request_id = None if 'LogoutRequestID' in session: request_id = session['LogoutRequestID'] dscb = lambda: session.clear() url = auth.process_slo(request_id=request_id, delete_session_cb=dscb) errors = auth.get_errors() if len(errors) == 0: if url is not None: # To avoid 'Open Redirect' attacks, before execute the redirection confirm # the value of the url is a trusted URL. return redirect(url) else: success_slo = True elif auth.get_settings().is_debug_active(): error_reason = auth.get_last_error_reason() if 'samlUserdata' in session: paint_logout = True if len(session['samlUserdata']) > 0: attributes = session['samlUserdata'].items() return render_template( 'index.html', errors=errors, error_reason=error_reason, not_auth_warn=not_auth_warn, success_slo=success_slo, attributes=attributes, paint_logout=paint_logout ) @app.route('/metadata') def metadata(): req = prepare_flask_request(request) auth = init_saml_auth(req) settings = auth.get_settings() metadata = settings.get_sp_metadata() errors = settings.validate_metadata(metadata) if len(errors) == 0: resp = make_response(metadata, 200) resp.headers['Content-Type'] = 'text/xml' else: resp = make_response(', '.join(errors), 500) return resp if __name__ == "__main__": app.run(host='127.0.0.1', port=5000, debug=True)