Skip to content
Snippets Groups Projects
Select Git revision
  • 952d2e2bd895c617e90a82fe1c8739ecbd5625b6
  • master default protected
2 results

routing.py

Blame
  • Forked from Jean-Marie Place / SCODOC_R6A06
    Source project has a limited visibility.
    routing.py 8.37 KiB
    """
    Routes for CAS authentication
    Modified for ScoDoc
    """
    
    import re
    import ssl
    from urllib.error import URLError
    from urllib.request import urlopen
    from xml.parsers.expat import ExpatError
    
    import flask
    from flask import current_app, request
    from xmltodict import parse
    
    from .cas_urls import create_cas_login_url
    from .cas_urls import create_cas_logout_url
    from .cas_urls import create_cas_validate_url
    
    
    # Flask blueprint named "cas"; the login and logout views in this module
    # are registered on it through the @blueprint.route decorators.
    blueprint = flask.Blueprint("cas", __name__)
    
    
    @blueprint.route("/login/")
    def login():
        """
        Entry point of the CAS login flow, and callback target for the CAS.

        The route is hit twice during a login:

        1. by the user, who is redirected to the CAS login page;
        2. by the CAS, which sends the user back here with a `ticket`
           query parameter once the credentials have been accepted.

        The ticket is stored in the session (under `CAS_TOKEN_SESSION_KEY`)
        and checked with `validate()`. On success, `validate()` stores the
        username under `CAS_USERNAME_SESSION_KEY` and the attributes under
        `CAS_ATTRIBUTES_SESSION_KEY`, and the user is redirected, in order of
        preference, to:

        - the URL saved in the session as `CAS_AFTER_LOGIN_SESSION_URL`,
        - the `origin` query parameter,
        - the endpoint named by the `CAS_AFTER_LOGIN` config entry.

        On failure the stored ticket is dropped and the user is sent to the
        CAS login page again.

        Returns a plain text message (no redirect) when `CAS_SERVER` is not
        configured.
        """
        # Optional hook letting the application (re)load its CAS settings.
        configure = current_app.config.get("CAS_CONFIGURATION_FUNCTION")
        if configure:
            configure(current_app)

        config = current_app.config
        if "CAS_SERVER" not in config:
            current_app.logger.info("cas_login: no configuration")
            return "CAS configuration missing"

        token_key = config["CAS_TOKEN_SESSION_KEY"]
        session = flask.session
        query = flask.request.args

        # Default destination: the CAS login page, with this very route as
        # the service URL the CAS must call back.
        target = create_cas_login_url(
            config["CAS_SERVER"],
            config["CAS_LOGIN_ROUTE"],
            flask.url_for(
                ".login",
                origin=session.get("CAS_AFTER_LOGIN_SESSION_URL"),
                _external=True,
            ),
        )

        # Coming back from the CAS: remember the ticket it handed us.
        if "ticket" in query:
            session[token_key] = query["ticket"]

        if token_key in session:
            if not validate(session[token_key]):
                # Rejected ticket: forget it and go through the CAS again.
                session.pop(token_key, None)
            elif "CAS_AFTER_LOGIN_SESSION_URL" in session:
                target = session.pop("CAS_AFTER_LOGIN_SESSION_URL")
            elif query.get("origin"):
                target = query["origin"]
            else:
                target = flask.url_for(config["CAS_AFTER_LOGIN"])

        current_app.logger.debug(f"cas.login: redirecting to {target}")

        return flask.redirect(target)
    
    
    @blueprint.route("/logout/")
    def logout():
        """
        Log the user out: clear the CAS data from the session, then redirect.

        Removed from the session: the username, the attributes, the CAS
        ticket and the `CAS_EDT_ID` entry.

        Redirection target:

        - no `CAS_SERVER` configured: the root URL of the application;
        - `CAS_AFTER_LOGOUT` and `CAS_LOGOUT_ROUTE` both set: the CAS logout
          URL, with the after-logout destination passed along;
        - otherwise: the URL built by `create_cas_logout_url` with no route.
        """
        # Optional hook letting the application (re)load its CAS settings.
        configure = current_app.config.get("CAS_CONFIGURATION_FUNCTION")
        if configure:
            configure(current_app)

        config = current_app.config
        # All three config keys are looked up before anything is removed.
        keys_to_clear = (
            config["CAS_USERNAME_SESSION_KEY"],
            config["CAS_ATTRIBUTES_SESSION_KEY"],
            config["CAS_TOKEN_SESSION_KEY"],  # added by EV
            "CAS_EDT_ID",  # added by EV
        )
        for session_key in keys_to_clear:
            flask.session.pop(session_key, None)

        after_logout = config.get("CAS_AFTER_LOGOUT")
        logout_route = config.get("CAS_LOGOUT_ROUTE")
        server = config.get("CAS_SERVER")

        if not server:
            target = request.root_url
        elif after_logout and logout_route:
            # A value starting with "http" is used verbatim as destination;
            # anything else is taken as a Flask endpoint name.
            if after_logout.startswith("http"):
                destination = after_logout
            else:
                destination = flask.url_for(after_logout, _external=True)
            target = create_cas_logout_url(server, logout_route, destination)
        else:
            target = create_cas_logout_url(server, None)

        current_app.logger.debug(f"cas.logout: redirecting to {target}")
        return flask.redirect(target)
    
    
    def validate(ticket) -> bool:
        """
        Validate a CAS ticket against the CAS server.

        The validation URL is built from the `CAS_SERVER` and
        `CAS_VALIDATE_ROUTE` config entries and fetched with a GET request.
        The XML answer is parsed and considered valid when it contains a
        `cas:authenticationSuccess` element inside `cas:serviceResponse`.

        On success the session is updated:

        - `CAS_USERNAME_SESSION_KEY`: the validated username (`cas:user`);
        - `CAS_ATTRIBUTES_SESSION_KEY`: the `cas:attributes` dictionary
          (a `cas:memberOf` string of the form "[a, b]" is turned into a list);
        - `CAS_EDT_ID`: first group matched in the raw XML by the site-config
          regexp `cas_edt_id_from_xml_regexp`, when that regexp is set.

        Errors (bad SSL certificate, unreachable server, malformed answer) are
        logged and reported through the optional `CAS_ERROR_CALLBACK` config
        entry, called with a user-facing message. The callback is expected to
        raise; if it is absent or returns, False is returned.

        Args:
            ticket: the service ticket received from the CAS.

        Returns:
            True if the ticket is valid, False otherwise.
        """
        from app.models.config import ScoDocSiteConfig

        cas_username_session_key = current_app.config["CAS_USERNAME_SESSION_KEY"]
        cas_attributes_session_key = current_app.config["CAS_ATTRIBUTES_SESSION_KEY"]
        cas_error_callback = current_app.config.get("CAS_ERROR_CALLBACK")

        def report_error(message: str) -> None:
            # The callback is optional (read with .get): only call it when set,
            # so that a missing callback does not turn an error into a TypeError.
            if cas_error_callback:
                cas_error_callback(message)

        current_app.logger.debug(f"validating token {ticket}")

        cas_validate_url = create_cas_validate_url(
            current_app.config["CAS_SERVER"],
            current_app.config["CAS_VALIDATE_ROUTE"],
            flask.url_for(
                ".login",
                origin=flask.session.get("CAS_AFTER_LOGIN_SESSION_URL"),
                _external=True,
            ),
            ticket,
        )

        current_app.logger.debug(f"Making GET request to {cas_validate_url}")

        if current_app.config.get("CAS_SSL_VERIFY"):
            # NOTE(review): ssl.SSLContext() without a protocol argument is
            # deprecated in recent Python versions and presumably leaves
            # hostname checking disabled - confirm before changing, as
            # existing deployments may rely on it.
            ssl_context = ssl.SSLContext()
            ssl_context.verify_mode = ssl.CERT_REQUIRED
            ca_data = current_app.config.get("CAS_SSL_CERTIFICATE", "")
            try:
                ssl_context.load_verify_locations(cadata=ca_data)
            except (ssl.SSLError, ValueError, TypeError):
                # TypeError: certificate configured as None or as a non-string.
                current_app.logger.error("CAS : error loading SSL cert.")
                report_error("erreur chargement certificat SSL CAS (PEM)")
                return False
        else:
            ssl_context = None

        xml_from_dict = {}
        is_valid = False
        # Bound before the request so that the error handler below can always
        # log it, even when urlopen itself fails (e.g. malformed URL).
        xmldump = ""

        try:
            # Context manager: the HTTP response is closed once read.
            with urlopen(cas_validate_url, context=ssl_context) as response:
                xmldump = response.read().strip().decode("utf8", "ignore")
            xml_from_dict = parse(xmldump)
            is_valid = (
                "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"]
            )
        except (ValueError, ExpatError, KeyError) as exc:
            current_app.logger.error(f"CAS returned unexpected result: {exc}")
            current_app.logger.error(f"Received data from CAS server:\n{xmldump}\n\n")
            report_error("réponse invalide du serveur CAS")
            return False  # reached only if the callback is absent or returns
        except URLError:
            current_app.logger.error("CAS : error validating token: check SSL certificate")
            report_error("erreur connexion au serveur CAS: vérifiez le certificat SSL")
            return False  # reached only if the callback is absent or returns
        except Exception as exc:
            current_app.logger.error(f"CAS : unkown error validating token: {exc}")
            report_error("erreur décodage réponse CAS")
            return False  # reached only if the callback is absent or returns

        if not is_valid:
            current_app.logger.debug("invalid")
            return False

        current_app.logger.debug("valid")
        auth_success = xml_from_dict["cas:serviceResponse"]["cas:authenticationSuccess"]
        username = auth_success["cas:user"]
        attributes = auth_success.get("cas:attributes", {})

        if attributes and "cas:memberOf" in attributes:
            member_of = attributes["cas:memberOf"]
            if isinstance(member_of, str):
                # "[group1, group2]" -> ["group1", "group2"]
                attributes["cas:memberOf"] = [
                    group.strip(" ")
                    for group in member_of.lstrip("[").rstrip("]").split(",")
                ]

        # Extract auxiliary information (used for edt_id)
        exp = ScoDocSiteConfig.get("cas_edt_id_from_xml_regexp")
        if exp:
            m = re.search(exp, xmldump)
            if m and len(m.groups()) > 0:
                cas_edt_id = m.group(1)
                if cas_edt_id:
                    flask.session["CAS_EDT_ID"] = cas_edt_id

        flask.session[cas_username_session_key] = username
        flask.session[cas_attributes_session_key] = attributes

        return True