Skip to content
Snippets Groups Projects
Select Git revision
  • 2967596de2dee1acabdb89aac2175f557c55e591
  • master default protected
2 results

models.py

Blame
  • Forked from Jean-Marie Place / SCODOC_R6A06
    98 commits behind the upstream repository.
    models.py 32.88 KiB
    # -*- coding: UTF-8 -*
    
    """Users and Roles models for ScoDoc
    """
    
    import base64
    from datetime import datetime, timedelta
    import os
    import re
    from time import time
    from typing import Optional
    
    import cracklib  # pylint: disable=import-error
    
    from flask import current_app, flash, g
    from flask_login import UserMixin, AnonymousUserMixin
    from sqlalchemy.exc import (
        IntegrityError,
        DataError,
        DatabaseError,
        OperationalError,
        ProgrammingError,
        StatementError,
        InterfaceError,
    )
    
    from werkzeug.security import generate_password_hash, check_password_hash
    
    import jwt
    
    from app import db, email, log, login
    from app.models import Departement, ScoDocModel
    from app.models import SHORT_STR_LEN, USERNAME_STR_LEN
    from app.models.config import ScoDocSiteConfig
    from app.scodoc.sco_exceptions import ScoValueError
    from app.scodoc.sco_permissions import Permission
    from app.scodoc.sco_roles_default import SCO_ROLES_DEFAULTS
    import app.scodoc.sco_utils as scu
    
    # A login is a non-empty run of ASCII letters, digits, "@", backslash,
    # "-", "_" and "." (anchored at both ends).
    VALID_LOGIN_EXP = re.compile(r"^[a-zA-Z0-9@\\\-_\.]+$")
    # Default value of `expires_in` for password-reset tokens.
    DEFAULT_RESET_TOKEN_DURATION = 24 * 60 * 60  # seconds (default 24h)
    
    
    def is_valid_password(cleartxt) -> bool:
        """Tell whether a clear-text password is acceptable.

        The password is rejected when the configuration defines a positive
        ``MIN_PASSWORD_LENGTH`` and the password is shorter than that, or when
        ``cracklib.FascistCheck`` raises ``ValueError`` for it.

        Returns:
            True if both checks pass, False otherwise.
        """
        if hasattr(scu.CONFIG, "MIN_PASSWORD_LENGTH"):
            min_length = scu.CONFIG.MIN_PASSWORD_LENGTH
            if min_length > 0 and len(cleartxt) < min_length:
                return False  # invalid: too short
        try:
            cracklib.FascistCheck(cleartxt)
        except ValueError:
            return False
        return True
    
    
    def is_valid_user_name(user_name: str) -> bool:
        """Check that `user_name` (aka login) is valid.

        A valid login is non-empty, at least 2 characters long, strictly
        shorter than ``USERNAME_STR_LEN`` and matches ``VALID_LOGIN_EXP``.

        Returns:
            A real ``bool`` (never the underlying match object, empty string
            or None), so the result is safe to serialize or compare with
            ``is True`` / ``is False``.
        """
        return bool(
            user_name
            and 2 <= len(user_name) < USERNAME_STR_LEN
            and VALID_LOGIN_EXP.match(user_name)
        )
    
    
    class User(UserMixin, ScoDocModel):
        """ScoDoc users, handled by Flask / SQLAlchemy.

        Holds the login, identity (name, e-mails, department), the CAS related
        settings, the password hashes, the API token and the roles of a user.
        """

        id = db.Column(db.Integer, primary_key=True)
        # user_name: the login (unique)
        user_name = db.Column(db.String(USERNAME_STR_LEN), index=True, unique=True)
        "le login"
        # email: address used by ScoDoc
        email = db.Column(db.String(120))
        "email à utiliser par ScoDoc"
        # email_institutionnel: address within the institution, optional
        email_institutionnel = db.Column(db.String(120))
        "email dans l'établissement, facultatif"
        nom = db.Column(db.String(USERNAME_STR_LEN))
        prenom = db.Column(db.String(USERNAME_STR_LEN))
        # dept: acronym of the user's department
        dept = db.Column(db.String(SHORT_STR_LEN), index=True)
        "acronyme du département de l'utilisateur"
        # active: when false, the account is disabled
        active = db.Column(db.Boolean, default=True, index=True)
        "si faux, compte utilisateur désactivé"
        # cas_id: uid on the CAS (id, mail or another attribute, depending on
        # config.cas_attribute_id); unique, may be NULL
        cas_id = db.Column(db.Text(), index=True, unique=True, nullable=True)
        "uid sur le CAS (id, mail ou autre attribut, selon config.cas_attribute_id)"
        # cas_allow_login: may the user log in through the CAS?
        cas_allow_login = db.Column(
            db.Boolean, default=False, server_default="false", nullable=False
        )
        "Peut-on se logguer via le CAS ?"
        # cas_allow_scodoc_login: when CAS is enabled and cas_id is set, may the
        # user log in directly on ScoDoc? (see can_login_using_scodoc)
        cas_allow_scodoc_login = db.Column(
            db.Boolean, default=False, server_default="false", nullable=False
        )
        """Si CAS activé et cas_id renseigné, peut-on se logguer sur ScoDoc directement ?
        (le rôle ScoSuperAdmin peut toujours, mettre à True pour les utilisateur API)
        """
        # cas_last_login: date of the last login through the CAS
        cas_last_login = db.Column(db.DateTime, nullable=True)
        """date du dernier login via CAS"""
        # edt_id: timetable identifier (uniqueness not enforced)
        edt_id = db.Column(db.Text(), index=True, nullable=True)
        "identifiant emplois du temps (unicité non imposée)"
        password_hash = db.Column(db.Text())  # modern hashes can be very long
        # Legacy ScoDoc 7 password, cleared by set_password()
        password_scodoc7 = db.Column(db.String(42))
        last_seen = db.Column(db.DateTime, default=datetime.now)
        date_modif_passwd = db.Column(db.DateTime, default=datetime.now)
        date_created = db.Column(db.DateTime, default=datetime.now)
        date_expiration = db.Column(db.DateTime, default=None)
        passwd_must_be_changed = db.Column(
            db.Boolean, nullable=False, server_default="false", default=False
        )
        # passwd_temp: obsolete field; check_password() disables the account
        # when a login is attempted while it is true
        passwd_temp = db.Column(db.Boolean, default=False)
        """champ obsolete. Si connexion alors que passwd_temp est vrai,
        efface mot de passe et redirige vers accueil."""
        # API token and its expiration date (see get_token / check_token)
        token = db.Column(db.Text(), index=True, unique=True)
        token_expiration = db.Column(db.DateTime)

        # Define the back reference from User to ModuleImpl
        modimpls = db.relationship("ModuleImpl", back_populates="responsable")
        # Read-only view of the roles, through the user_role association table
        roles = db.relationship("Role", secondary="user_role", viewonly=True)
        # Exposes the Permission class as an attribute of User
        Permission = Permission

        # Department whose acronym equals `dept` (single object, loaded lazily)
        _departement = db.relationship(
            "Departement",
            foreign_keys=[Departement.acronym],
            primaryjoin=(dept == Departement.acronym),
            lazy="select",
            passive_deletes="all",
            uselist=False,
        )
    
        def __init__(self, **kwargs):
            """Create a user; the `user_name` keyword argument (str) is mandatory.

            Missing or falsy `nom` / `prenom` are replaced by empty strings.
            If the new user has no role and its e-mail equals the configured
            ``SCODOC_ADMIN_MAIL``, the "SuperAdmin" role is added and the
            session is committed.

            Raises:
                ValueError: `user_name` is missing or invalid.
            """
            self.roles = []
            self.user_roles = []
            # Validate the login before handing anything to the model
            if "user_name" not in kwargs:
                raise ValueError("missing user_name argument")
            login_name = kwargs["user_name"]
            if not is_valid_user_name(login_name):
                raise ValueError(f"invalid user_name: {login_name}")
            # Names are stored as "" rather than None
            for name_field in ("nom", "prenom"):
                kwargs[name_field] = kwargs.get(name_field) or ""
            super().__init__(**kwargs)
            # Only a role-less user carrying the admin e-mail becomes super-admin
            if self.roles or not self.email:
                return
            if self.email != current_app.config["SCODOC_ADMIN_MAIL"]:
                return
            admin_role = Role.query.filter_by(name="SuperAdmin").first()
            assert admin_role
            self.add_role(admin_role, None)
            db.session.commit()
    
        def __repr__(self):
            return f"""<User {self.user_name} id={self.id} dept={self.dept}{
                ' (inactive)' if not self.active else ''}>"""
    
        def __str__(self):
            return self.user_name
    
        @classmethod
        def get_user(cls, user_id: int | str, accept_none=False):
            """Get user by id, user_name or User instance, ou 404 (ou None si accept_none)
            If user_id == -1, returns None (without exception)
            """
            query = None
            if isinstance(user_id, str):
                query = db.session.query(cls).filter_by(user_name=user_id)
            elif isinstance(user_id, int):
                if user_id == -1:
                    return None
                query = db.session.query(cls).filter_by(id=user_id)
            elif isinstance(user_id, User):
                return user_id
            else:
                raise ValueError("invalid user_id")
            return query.first_or_404() if not accept_none else query.first()
    
        def can_login_using_scodoc(self) -> bool:
            """True if the user may (try to) log in with the local ScoDoc account.

            Password validity and account activity are checked elsewhere.

            Rules, in order:
            - administrators (see `is_administrator`) are always allowed;
            - when CAS is not enabled, everybody is allowed;
            - otherwise `cas_allow_scodoc_login` is required;
            - and the login is refused when CAS is forced while the user has a
              `cas_id` and `cas_allow_login` set.
            """
            if self.is_administrator():
                return True
            if not ScoDocSiteConfig.is_cas_enabled():
                return True

            if not self.cas_allow_scodoc_login:
                log(f"auth: {self.user_name}: cas enabled, scodoc login not allowed")
                return False

            must_use_cas = (
                ScoDocSiteConfig.is_cas_forced() and self.cas_id and self.cas_allow_login
            )
            if must_use_cas:
                log(
                    f"auth: {self.user_name} (cas_id='{self.cas_id}'): "
                    "cas forced and cas_id set: scodoc login not allowed"
                )
                return False

            return True
    
        def set_password(self, password: str):
            """Set (or, with an empty `password`, remove) the user's password.

            The modification date is updated only if the stored hash changed.
            The legacy ScoDoc 7 password and the `passwd_temp` and
            `passwd_must_be_changed` flags are always cleared.
            """
            log(f"set_password({self})")
            old_hash = self.password_hash
            self.password_hash = generate_password_hash(password) if password else None
            if old_hash != self.password_hash:
                self.date_modif_passwd = datetime.now()
            # Setting a password erases the legacy one and resets the flags
            self.password_scodoc7 = None
            self.passwd_temp = False
            self.passwd_must_be_changed = False
    
        def check_password(self, password: str) -> bool:
            """Check the given password against the current one.

            Returns True if the password matched, False otherwise. The login is
            also refused for inactive accounts, for accounts flagged
            `passwd_temp` (which get deactivated and notified), and when
            `can_login_using_scodoc` is false.
            """
            if not self.active:
                # Inactive users cannot log in
                current_app.logger.warning(
                    f"auth: login attempt from inactive account {self}"
                )
                return False

            if self.passwd_temp:
                # Old, non migrated ScoDoc 7 account: deactivate it for safety
                current_app.logger.warning(f"auth: desactivating legacy account {self}")
                self.active = False
                self.passwd_temp = True
                db.session.add(self)
                db.session.commit()
                send_notif_desactivation_user(self)
                return False

            if not self.can_login_using_scodoc():
                return False

            if self.password_hash:
                return check_password_hash(self.password_hash, password)
            if self.password_scodoc7:
                # Special case: user freshly migrated from ScoDoc7
                return self._migrate_scodoc7_password(password)
            # A user without any password cannot log in
            return False
    
        def _migrate_scodoc7_password(self, password) -> bool:
            """Check `password` against the legacy ScoDoc 7 one and rehash it.

            On success the password is stored with `set_password`, the legacy
            value is erased, the session is committed and True is returned.
            Otherwise nothing changes and False is returned.
            """
            if not scu.check_scodoc7_password(self.password_scodoc7, password):
                return False
            current_app.logger.warning(
                f"auth: migrating legacy ScoDoc7 password for {self}"
            )
            self.set_password(password)
            self.password_scodoc7 = None
            db.session.add(self)
            db.session.commit()
            return True
    
        def get_reset_password_token(
            self, expires_in=DEFAULT_RESET_TOKEN_DURATION
        ) -> str | None:
            """Build a token allowing the user to reset the password.

            The token is a HS256 JWT signed with the application SECRET_KEY,
            carrying the user id and an expiration `expires_in` seconds from
            now (24 hours by default).

            Returns None when `can_login_using_scodoc` is false, i.e. when the
            CAS configuration forbids the ScoDoc login for this user.
            """
            if not self.can_login_using_scodoc():
                return None
            payload = {"reset_password": self.id, "exp": time() + expires_in}
            secret = current_app.config["SECRET_KEY"]
            return jwt.encode(payload, secret, algorithm="HS256")
    
        @staticmethod
        def verify_reset_password_token(token):
            """Check a password-reset token and return the matching user.

            Args:
                token: the JWT produced by `get_reset_password_token`.

            Returns:
                The User whose id is stored in the token, or None if the token
                cannot be decoded, is expired, or lacks the expected fields.
            """
            try:
                payload = jwt.decode(
                    token, current_app.config["SECRET_KEY"], algorithms=["HS256"]
                )
            except jwt.exceptions.ExpiredSignatureError:
                log("verify_reset_password_token: token expired")
                return None
            except Exception as exc:  # pylint: disable=broad-except
                # Any other decoding failure: log the offending token and reason
                log(f"verify_reset_password_token: checking token '{token}'")
                log(f"verify_reset_password_token: {exc}")
                return None
            try:
                user_id = payload["reset_password"]
                expire = float(payload["exp"])
            except (TypeError, KeyError, ValueError):
                # Missing field or non numeric expiration
                return None
            # Double check, normally useless since decode() already did it
            if time() > expire:
                log(f"verify_reset_password_token: token expired for uid={user_id}")
                return None
            return db.session.get(User, user_id)
    
        def sort_key(self) -> tuple:
            "sort key"
            return (
                (self.nom or "").upper(),
                (self.prenom or "").upper(),
                (self.user_name or "").upper(),
            )
    
        def to_dict(self, include_email=True):
            """The user as a dict, with additional computed fields.

            Dates are given as ISO strings followed by "Z" (None when unset).
            The e-mail addresses are included only if `include_email` is true.
            """

            def iso_z(moment):
                "ISO text of `moment` followed by 'Z', or None when unset"
                return moment.isoformat() + "Z" if moment else None

            data = {
                "date_expiration": iso_z(self.date_expiration),
                "date_modif_passwd": iso_z(self.date_modif_passwd),
                "passwd_must_be_changed": self.passwd_must_be_changed,
                "date_created": iso_z(self.date_created),
                "dept": self.dept,
                "id": self.id,
                "active": self.active,
                "cas_id": self.cas_id,
                "cas_allow_login": self.cas_allow_login,
                "cas_allow_scodoc_login": self.cas_allow_scodoc_login,
                "cas_last_login": iso_z(self.cas_last_login),
                "edt_id": self.edt_id,
                "status_txt": "actif" if self.active else "fermé",
                "last_seen": iso_z(self.last_seen),
                "nom": self.nom or "",
                "prenom": self.prenom or "",
                "roles_string": self.get_roles_string(),  # eg "Ens_RT, Ens_Info"
                "user_name": self.user_name,
                # Computed fields:
                "nom_fmt": self.get_nom_fmt(),
                "prenom_fmt": self.get_prenom_fmt(),
                "nomprenom": self.get_nomprenom(),
                "prenomnom": self.get_prenomnom(),
                "nomplogin": self.get_nomplogin(),
                "nomcomplet": self.get_nomcomplet(),
            }
            if include_email:
                data.update(
                    email=self.email or "",
                    email_institutionnel=self.email_institutionnel or "",
                )
            return data
    
        @classmethod
        def convert_dict_fields(cls, args: dict) -> dict:
            """Convert fields in the given dict. No other side effect.
            args: dict with args in application.
            returns: dict to store in model's db.
            Convert boolean values to bools.
            """
            args_dict = args
            # Dates
            if "date_expiration" in args:
                date_expiration = args.get("date_expiration")
                if isinstance(date_expiration, str):
                    args["date_expiration"] = (
                        datetime.fromisoformat(date_expiration) if date_expiration else None
                    )
            # booléens:
            for field in ("active", "cas_allow_login", "cas_allow_scodoc_login"):
                if field in args:
                    args_dict[field] = scu.to_bool(args.get(field))
    
            # chaines ne devant pas être NULLs
            for field in ("nom", "prenom"):
                if field in args:
                    args[field] = args[field] or ""
    
            # chaines ne devant pas être vides mais au contraire null (unicité)
            if "cas_id" in args:
                args["cas_id"] = args["cas_id"] or None
    
            return args_dict
    
        def from_dict(self, args: dict, new_user=False):
            """Set this user's attributes from the values of `args`.

            Args:
                args: attribute values. Special keys:
                    - "roles_string": roles encoded like "Ens_RT, Secr_CJ";
                      replaces the current roles;
                    - "date_expiration": a datetime object;
                    - "cas_id": may be missing, None or blank, all meaning
                      "no CAS id given".
                new_user: when true, "user_name" and "password" are also read
                    from `args` (the login of an existing user is never
                    changed here, see `change_user_name`).

            Raises:
                ValueError: `new_user` is set and "user_name" is invalid.
                ScoValueError: the resulting CAS id is already used by
                    another user.

            Permissions are not checked here.
            """
            if new_user:
                if "user_name" in args:
                    # never change name of existing users
                    # (see change_user_name method to do that)
                    if not is_valid_user_name(args["user_name"]):
                        raise ValueError(f"invalid user_name: {args['user_name']}")
                    self.user_name = args["user_name"]
                if "password" in args:
                    self.set_password(args["password"])

            # Roles: roles_string is "Ens_RT, Secr_RT, ..."
            if "roles_string" in args:
                self.user_roles = []
                for r_d in args["roles_string"].split(","):
                    if r_d:
                        role, dept = UserRole.role_dept_from_string(r_d)
                        self.add_role(role, dept)

            # email_institutionnel may not be unique, but check and warn the user
            email_institutionnel = args.get("email_institutionnel")
            if email_institutionnel and email_institutionnel != self.email_institutionnel:
                nb_with_this_mail = (
                    db.session.query(User)
                    .filter_by(email_institutionnel=email_institutionnel)
                    .count()
                )
                if nb_with_this_mail > 0:
                    log(
                        "User.from_dict: plusieurs utilisateurs avec ce mail institutionnel"
                    )
                    flash(
                        "Attention: plusieurs utilisateurs avec ce mail institutionnel",
                        "warning",
                    )

            super().from_dict(
                args, excluded={"cas_id", "user_name", "roles_string", "roles"}
            )

            # `cas_id` may legitimately be None here (convert_dict_fields turns
            # blank values into None), so guard before calling strip().
            # NOTE(review): a missing "cas_id" key also yields None and so
            # resets self.cas_id below — confirm this is intended.
            new_cas_id = (args.get("cas_id") or "").strip() or None
            if ScoDocSiteConfig.cas_uid_use_scodoc():
                new_cas_id = self.user_name
            else:
                # Set cas_id using regexp if configured:
                exp = ScoDocSiteConfig.get("cas_uid_from_mail_regexp")
                if exp and self.email_institutionnel:
                    cas_id = ScoDocSiteConfig.extract_cas_id(self.email_institutionnel)
                    if cas_id:
                        new_cas_id = cas_id
            if new_cas_id != self.cas_id:
                # cas_id is unique: refuse a value already taken by someone else
                existing: User = (
                    db.session.query(User).filter_by(cas_id=new_cas_id).first()
                    if new_cas_id is not None
                    else None
                )
                if not existing:
                    self.cas_id = new_cas_id
                else:
                    msg = f"""CAS id invalide pour {self.user_name
                        }, déjà utilisé par {existing.user_name}"""
                    log(f"User.from_dict: {msg}")
                    raise ScoValueError(msg)
    
        def get_token(self, expires_in=3600):
            "Un jeton pour cet user. Stocké en base, non commité."
            now = datetime.now()
            if self.token and self.token_expiration > now + timedelta(seconds=60):
                return self.token
            self.token = base64.b64encode(os.urandom(24)).decode("utf-8")
            self.token_expiration = now + timedelta(seconds=expires_in)
            db.session.add(self)
            return self.token
    
        def revoke_token(self):
            "Révoque le jeton de cet utilisateur"
            self.token_expiration = datetime.now() - timedelta(seconds=1)
    
        @staticmethod
        def check_token(token):
            """Retreive user for given token, check token's validity
            and returns the user object.
            """
            user = User.query.filter_by(token=token).first()
            if user is None or user.token_expiration < datetime.now():
                return None
            return user
    
        def get_dept_id(self) -> int:
            "returns user's department id, or None"
            if self.dept:
                return self._departement.id
            return None
    
        def get_emails(self):
            "List mail adresses to contact this user"
            mails = []
            if self.email:
                mails.append(self.email)
            if self.email_institutionnel:
                mails.append(self.email_institutionnel)
            return mails
    
        # Permissions management:
        def has_permission(self, perm: int, dept: str = False):
            """Check if user has permission `perm` in given `dept` (acronym).
            Similar to Zope ScoDoc7 `has_permission``
    
            Args:
                perm: integer, one of the value defined in Permission class.
                dept: dept id (eg 'RT'), default to current departement.
            """
            if not self.active:
                return False
            if dept is False:
                dept = g.scodoc_dept
            # les role liés à ce département, et les roles avec dept=None (super-admin)
            roles_in_dept = (
                UserRole.query.filter_by(user_id=self.id)
                .filter(
                    (UserRole.dept == dept)
                    | (UserRole.dept == None)  # pylint: disable=C0121
                )
                .all()
            )
            for user_role in roles_in_dept:
                if user_role.role.has_permission(perm):
                    return True
            return False
    
        # Role management
        def add_role(self, role: "Role", dept: str):
            """Add a role to this user, in the given department.

            :param role: Role to add.
            :param dept: department acronym stored with the association.
            :raises ScoValueError: `role` is not a Role instance.
            """
            if isinstance(role, Role):
                association = UserRole(user=self, role=role, dept=dept)
                db.session.add(association)
                self.user_roles.append(association)
                return
            raise ScoValueError("add_role: rôle invalide")
    
        def add_roles(self, roles: "list[Role]", dept: str):
            """Add roles to this user.
            :param roles: Roles to add.
            """
            for role in roles:
                self.add_role(role, dept)
    
        def set_roles(self, roles, dept):
            "set roles in the given dept"
            self.user_roles = []
            for r in roles:
                if isinstance(r, Role):
                    self.add_role(r, dept)
    
        def get_roles(self):
            "iterator on my roles"
            for role in self.roles:
                yield role
    
        def get_roles_string(self):
            """string repr. of user's roles (with depts)
            e.g. "Ens_RT, Ens_Info, Secr_CJ"
            """
            return ", ".join(
                f"{r.role.name or ''}_{r.dept or ''}"
                for r in self.user_roles
                if r is not None
            )
    
        def get_depts_with_permission(self, permission: int) -> list[str]:
            """List the department acronyms in which this user has `permission`.

            The "acronym" None stands for "all departments".
            If several permissions (several bits) are given, it is an "or":
            departments where the user has at least one of them are listed.
            """
            matching_roles = (
                UserRole.query.filter_by(user=self)
                .join(Role)
                .filter(Role.permissions.op("&")(permission) != 0)
            )
            return [granted.dept for granted in matching_roles]
    
        def is_administrator(self):
            "True if i'm an active SuperAdmin"
            return self.active and self.has_permission(Permission.ScoSuperAdmin, dept=None)
    
        # Some useful strings:
        def get_nomplogin(self):
            """Formatted name, first name and login, e.g. "Dupont Pierre (dupont)".

            When the user has no name, the upper-cased login is used instead.
            """
            if self.nom:
                nom = scu.format_nom(self.nom)
            else:
                nom = self.user_name.upper()
            prenom = scu.format_prenom(self.prenom)
            return f"{nom} {prenom} ({self.user_name})"
    
        @staticmethod
        def get_user_from_nomplogin(nomplogin: str) -> Optional["User"]:
            """Returns User instance from the string "Dupont Pierre (dupont)"
            or None if user does not exist
            """
            match = re.match(r".*\((.*)\)", nomplogin.strip())
            if match:
                user_name = match.group(1)
                u = User.query.filter_by(user_name=user_name).first()
                if u:
                    return u
            return None
    
        def get_nom_fmt(self):
            """Nom formaté: "Martin" """
            if self.nom:
                return scu.format_nom(self.nom, uppercase=False)
            else:
                return self.user_name
    
        def get_prenom_fmt(self):
            """First name, as formatted by `scu.format_prenom`"""
            formatted = scu.format_prenom(self.prenom)
            return formatted
    
        def get_nomprenom(self):
            """Formatted name followed by the abbreviated first name,
            e.g. "Viennet E."
            """
            prenom_abbrv = scu.abbrev_prenom(scu.format_prenom(self.prenom))
            parts = (self.get_nom_fmt(), prenom_abbrv)
            return " ".join(parts).strip()
    
        def get_prenomnom(self):
            """Abbreviated first name followed by the name, e.g. "J.-C. Dupont" """
            prenom_abbrv = scu.abbrev_prenom(scu.format_prenom(self.prenom))
            parts = (prenom_abbrv, self.get_nom_fmt())
            return " ".join(parts).strip()
    
        def get_nomcomplet(self):
            "Full first name and name, separated by a space (not stripped)"
            prenom = scu.format_prenom(self.prenom)
            return " ".join((prenom, self.get_nom_fmt()))
    
        # nomnoacc était le nom en minuscules sans accents (inutile)
    
        def change_user_name(self, new_user_name: str):
            """Change the login and update every table that stores it.

            The login is copied (not as a foreign key) in
            BulAppreciations.author, EntrepriseHistorique.authenticated_user,
            EtudAnnotation.author, ScolarNews.authenticated_user and
            Scolog.authenticated_user: all are updated, then the session is
            committed. On a database error the session is rolled back and the
            exception is raised again.

            Raises:
                ValueError: the new login (after stripping) is invalid or
                    already used.
            """
            # Safety check
            new_user_name = new_user_name.strip()
            if not is_valid_user_name(new_user_name):
                raise ValueError("invalid user_name")
            if User.query.filter_by(user_name=new_user_name).count() > 0:
                raise ValueError("invalid user_name")

            from app.models import (
                BulAppreciations,
                EtudAnnotation,
                ScolarNews,
                Scolog,
            )
            from app.entreprises.models import EntrepriseHistorique

            # (model, column holding the login), in update order
            login_columns = (
                (BulAppreciations, BulAppreciations.author),
                (EntrepriseHistorique, EntrepriseHistorique.authenticated_user),
                (EtudAnnotation, EtudAnnotation.author),
                (ScolarNews, ScolarNews.authenticated_user),
                (Scolog, Scolog.authenticated_user),
            )
            try:
                for model, column in login_columns:
                    db.session.query(model).filter(column == self.user_name).update(
                        {column: new_user_name}
                    )
                # And update ourself:
                self.user_name = new_user_name
                db.session.add(self)
                db.session.commit()
            except (
                IntegrityError,
                DataError,
                DatabaseError,
                OperationalError,
                ProgrammingError,
                StatementError,
                InterfaceError,
            ) as exc:
                db.session.rollback()
                raise exc
    
    
    class AnonymousUser(AnonymousUserMixin):
        """Our anonymous (not logged in) user: it holds no permission at all."""

        def is_administrator(self):
            """Always False: the anonymous user is not an administrator."""
            return False

        def has_permission(self, perm, dept=None):  # pylint: disable=unused-argument
            """Always False: the anonymous user has no permission,
            whatever `perm` and `dept` are.
            """
            return False
    
    
    # Register AnonymousUser as the anonymous user class of the login manager
    login.anonymous_user = AnonymousUser
    
    
    class Role(ScoDocModel):
        """Roles for ScoDoc.

        A role has a unique name and a set of permissions stored as bits of
        the `permissions` integer.
        """

        id = db.Column(db.Integer, primary_key=True)
        # Unique role name
        name = db.Column(db.String(64), unique=True, nullable=False, index=True)
        default = db.Column(db.Boolean, default=False, index=True)
        # Bit field of permissions (see add_permission / has_permission)
        permissions = db.Column(db.BigInteger)  # 64 bits
        # Read-only view of the users, through the user_role association table
        users = db.relationship("User", secondary="user_role", viewonly=True)
    
        def __init__(self, **kwargs):
            "Create a role; `permissions` is set to 0 (none) when not given"
            super().__init__(**kwargs)
            if self.permissions is None:
                self.permissions = 0
    
        def __repr__(self):
            "Debug representation: name and permissions as Permission.NBITS binary digits"
            nbits = Permission.NBITS
            visible_bits = self.permissions & ((1 << nbits) - 1)
            return f"<Role {self.name} perm={visible_bits:0{nbits}b}>"
    
        def __str__(self):
            "Role name followed by the comma separated names of its permissions"
            names = ", ".join(Permission.permissions_names(self.permissions))
            return f"{self.name}: perm={names}"
    
        def to_dict(self) -> dict:
            "The role as a dict; permissions are given by their names"
            # "role_name" rather than "name", to be consistent with
            # partition_name, etc.
            return dict(
                id=self.id,
                role_name=self.name,
                permissions=Permission.permissions_names(self.permissions),
            )
    
        def add_permission(self, perm: int):
            "Add permission to role"
            self.permissions |= perm
    
        def remove_permission(self, perm: int):
            "Remove permission from role"
            self.permissions = self.permissions & ~perm
    
        def reset_permissions(self):
            "Remove all permissions from role"
            self.permissions = 0
    
        def get_named_permissions(self) -> list[str]:
            "Names of the permissions associated to this role"
            names = Permission.permissions_names(self.permissions)
            return names
    
        def set_named_permissions(self, permission_names: list[str]):
            """Set permissions, given as a list of permissions names.
            Raises ScoValueError if invalid permission."""
            self.permissions = 0
            for permission_name in permission_names:
                permission = Permission.get_by_name(permission_name)
                if permission is None:
                    raise ScoValueError("set_named_permissions: invalid permission name")
                self.permissions |= permission
    
        def has_permission(self, perm: int) -> bool:
            "True if role as this permission"
            return self.permissions & perm == perm
    
        @staticmethod
        def reset_standard_roles_permissions(reset_permissions=True):
            """Create default roles if missing, then, if reset_permissions,
            reset their permissions to default values.
            """
            Role.reset_roles_permissions(
                SCO_ROLES_DEFAULTS, reset_permissions=reset_permissions
            )
    
        @staticmethod
        def reset_roles_permissions(roles_perms: dict[str, tuple], reset_permissions=True):
            """Ajoute les permissions aux roles
            roles_perms : { "role_name" : (permission, ...) }
            reset_permissions : si vrai efface permissions déja existantes
            Si le role n'existe pas, il est (re) créé.
            """
            default_role = "Observateur"
            for role_name, permissions in roles_perms.items():
                role = Role.query.filter_by(name=role_name).first()
                if role is None:
                    role = Role(name=role_name)
                    role.default = role.name == default_role
                    db.session.add(role)
                if reset_permissions:
                    role.reset_permissions()
                    for perm in permissions:
                        role.add_permission(perm)
                    db.session.add(role)
    
            db.session.commit()
    
        @staticmethod
        def ensure_standard_roles():
            """Create default roles if missing"""
            Role.reset_standard_roles_permissions(reset_permissions=False)
    
        @staticmethod
        def get_named_role(name):
            """Returns existing role with given name, or None."""
            return Role.query.filter_by(name=name).first()
    
    
    class UserRole(ScoDocModel):
        """Link between a user and a role, for a given department.

        When dept is None, the role is valid in every department
        (e.g. super admin).
        """

        id = db.Column(db.Integer, primary_key=True)
        user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
        role_id = db.Column(db.Integer, db.ForeignKey("role.id"))
        dept = db.Column(db.String(64))  # department acronym, or NULL
        user = db.relationship(
            User, backref=db.backref("user_roles", cascade="all, delete-orphan")
        )
        role = db.relationship(
            Role, backref=db.backref("user_roles", cascade="all, delete-orphan")
        )

        def __repr__(self):
            return "<UserRole u={} r={} dept={}>".format(self.user, self.role, self.dept)

        @staticmethod
        def role_dept_from_string(role_dept: str):
            """Parse a string of the form "Role_Dept".

            Returns the tuple (role, dept), where role is a Role instance and
            dept is a string, or None when the part after the "_" is blank.
            Raises ScoValueError if the string contains no "_" or if no role
            has the given name.
            """
            # Cut at the first "_" only: the dept itself may contain a "_"
            role_name, separator, dept_part = role_dept.strip().partition("_")
            if not separator:
                current_app.logger.warning(
                    f"auth: role_dept_from_string:  Invalid role_dept '{role_dept}'"
                )
                raise ScoValueError("Invalid role_dept")
            # A blank department is represented by None
            dept = dept_part.strip() or None

            role = Role.get_named_role(role_name)
            if role is None:
                raise ScoValueError(f"role {role_name} does not exists")
            return (role, dept)
    
    
    def get_super_admin():
        """Return the admin user (or the first one found, if there are several).

        Used by the unit tests and by the migration script.
        Fails on an assertion if the "SuperAdmin" role or such a user is missing.
        """
        super_admin_role = Role.query.filter_by(name="SuperAdmin").first()
        assert super_admin_role
        # Users linked, through UserRole, to the SuperAdmin role
        is_super_admin = (UserRole.user_id == User.id) & (
            UserRole.role_id == super_admin_role.id
        )
        first_admin = User.query.join(UserRole).filter(is_super_admin).first()
        assert first_admin
        return first_admin
    
    
    def send_notif_desactivation_user(user: User):
        """Send a notification mail to the admin and to the addresses of the
        deactivated account.

        The admin address is read from the SCODOC_ADMIN_MAIL configuration
        entry; when it is not set, only the addresses of the user are used.
        Returns the text of the message.
        """
        recipients = list(user.get_emails())
        admin_mail = current_app.config.get("SCODOC_ADMIN_MAIL")
        # config.get() gives None when the entry is missing: do not put an
        # invalid (None) address in the recipients list.
        if admin_mail:
            recipients.append(admin_mail)
        lines = [
            f"""Le compte ScoDoc '{user.user_name}' associé à votre adresse <{user.email}>""",
            """a été désactivé par le système car son mot de passe n'était pas valide.\n""",
            """Contactez votre responsable pour le ré-activer.\n""",
            """Ceci est un message automatique, ne pas répondre.""",
        ]
        txt = "\n".join(lines)
        email.send_email(
            f"ScoDoc: désactivation automatique du compte {user.user_name}",
            email.get_from_addr(),
            recipients,
            txt,
        )
        return txt