import logging

import streamlit as st
from sqlalchemy.sql import text
from streamlit.user_info import UserInfoProxy

from queries.connection import connection

logger = logging.getLogger(__name__)


def _select_users(sql, params):
    """Run a ``users`` lookup and return the query result.

    NOTE(review): assumes ``connection()`` returns a Streamlit
    ``SQLConnection``, whose ``query()`` caches results indefinitely unless a
    ``ttl`` is given. The lookups in this module are repeated right after
    writes with identical SQL and params, so a cached (empty) result would
    hide the freshly written row; ``ttl=0`` forces a fresh read each time.
    """
    return connection().query(sql, params=params, ttl=0)


def find_user_by_oidc_id(oidc_user_id):
    """Return the rows of ``users`` whose ``oidc_user_id`` matches."""
    return _select_users(
        'SELECT * FROM users WHERE oidc_user_id = :id',
        {'id': oidc_user_id},
    )


def find_user_by_email(email):
    """Return the rows of ``users`` whose ``email`` matches."""
    return _select_users(
        'SELECT * FROM users WHERE email = :email',
        {'email': email},
    )


def find_default_user():
    """Return the placeholder row whose email is the literal ``'default'``."""
    return find_user_by_email('default')


def update_default_user(email, name, oidc_user_id):
    """Overwrite the placeholder ``'default'`` user with a real identity.

    Sets ``email``, ``name`` and ``oidc_user_id`` on the row whose email is
    currently ``'default'`` and commits the change.

    Raises:
        Exception: whatever the database layer raised; the session is rolled
            back before the exception is propagated.
    """
    query = text(
        "UPDATE users SET email = :email, name = :name, oidc_user_id = :user_id WHERE email = 'default'"
    )
    with connection().session as session:
        try:
            session.execute(query, {'email': email, 'name': name, 'user_id': oidc_user_id})
            # Leaving the ``with`` block only closes the session; without an
            # explicit commit the UPDATE would be discarded.
            session.commit()
        except Exception:
            session.rollback()
            raise


def create_user(email, name, oidc_user_id):
    """Insert a new user row, commit it, and return it.

    Returns:
        The result of ``find_user_by_oidc_id(oidc_user_id)`` executed after
        the insert has been committed.

    Raises:
        Exception: whatever the database layer raised; the session is rolled
            back before the exception is propagated.
    """
    query = text('INSERT INTO users (email, name, oidc_user_id) VALUES (:email, :name, :user_id)')
    with connection().session as session:
        try:
            logger.info("Creating new user %s", email)
            session.execute(query, {'email': email, 'name': name, 'user_id': oidc_user_id})
            # Persist the row before it is read back below.
            session.commit()
        except Exception:
            session.rollback()
            raise
    # Read back only after the commit so the separate query can see the row.
    return find_user_by_oidc_id(oidc_user_id)


def set_user_in_session(user: UserInfoProxy):
    """Resolve the logged-in user to a ``users`` row and store it in session state.

    Lookup order:
      1. by OIDC subject (``user.sub``);
      2. by email;
      3. the placeholder ``'default'`` user, which is then claimed by
         rewriting it with this user's details;
      4. otherwise a new row is created.

    The first row of the result is copied into ``st.session_state`` as
    ``user_id``, ``user_name``, ``user_email`` and ``user_external_id``.
    """
    email = user.email
    user_id = user.sub
    # ``name`` is optional on the user info object; fall back to None.
    name = getattr(user, 'name', None)

    user_entity = find_user_by_oidc_id(user_id)
    if user_entity.empty:
        user_entity = find_user_by_email(email)
    if user_entity.empty:
        if find_default_user().empty:
            user_entity = create_user(email, name, user_id)
        else:
            update_default_user(email, name, user_id)
            # Re-read so the session state reflects the updated row.
            user_entity = find_user_by_oidc_id(user_id)

    st.session_state.user_id = user_entity["id"][0]
    st.session_state.user_name = user_entity["name"][0]
    st.session_state.user_email = user_entity["email"][0]
    st.session_state.user_external_id = user_entity["oidc_user_id"][0]