"""SQLite data-access helpers for counters, their entries and analytics.

Every public function swallows database errors: the failure is logged with a
traceback and a neutral value is returned (``None`` for most reads, an empty
DataFrame for ``get_counters``, nothing for writes).
"""

import logging
from typing import Optional

import pandas as pd
import streamlit as st
from sqlalchemy.sql import text

from enums import CounterType

logger = logging.getLogger(__name__)

connection = st.connection("sqlite")

# Ask SQLite to enforce foreign keys.
# NOTE(review): in SQLite this PRAGMA applies to a single connection only;
# confirm that every pooled connection used later has it enabled too.
with connection.session as configure_session:
    configure_session.execute(text('PRAGMA foreign_keys=ON'))


_ANALYTICS_SQL = '''
    WITH RECURSIVE timeseries(d) AS (
        VALUES(date(:end_date))
        UNION ALL
        SELECT date(d, '-1 day') as d
        FROM timeseries
        WHERE d > date(:end_date, '-30 days')
    ),
    stats AS (
        SELECT date(timestamp) as d, counter_id, sum(increment) as count
        FROM entries
        group by counter_id, date(timestamp)
    )
    select s.d as date,
        case when counter_id is null then json_object() else json_group_object(name, count) end as counters
    FROM timeseries s
    left outer join stats t on s.d = t.d
    left join counters c on t.counter_id = c.id
    GROUP by s.d
'''

_DAILY_SQL = '''
    WITH RECURSIVE timeseries(d) AS (
        VALUES(date(:end_date))
        UNION ALL
        SELECT date(d, '-1 day') as d
        FROM timeseries
        WHERE d > date(:end_date, '-7 days')
    ),
    stats AS (
        SELECT date(timestamp) as d, sum(increment) as count
        FROM entries
        where counter_id = :id
        group by date(timestamp)
    )
    SELECT t.d as "date", coalesce(s.count, 0) as count
    FROM timeseries as t
    LEFT JOIN stats as s on s.d = t.d
'''

# Entries are bucketed by the Sunday that ends their week (the same dates the
# timeseries generates) instead of by the bare '%W' week number. A bare week
# number would merge identically numbered weeks of different years and split
# a week that straddles New Year.
_WEEKLY_SQL = '''
    WITH RECURSIVE timeseries(d) AS (
        VALUES(date(:end_date, 'weekday 0'))
        UNION ALL
        SELECT date(d, '-7 day')
        FROM timeseries
        WHERE d > date(:end_date, '-30 days')
    ),
    stats AS (
        SELECT date(timestamp, 'weekday 0') as d, sum(increment) as count
        FROM entries
        where counter_id = :id
        group by date(timestamp, 'weekday 0')
    )
    SELECT strftime('%W', t.d) as "week", coalesce(s.count, 0) as count
    FROM timeseries as t
    LEFT JOIN stats as s on s.d = t.d
'''

# The recursion stops at the first day of end_date's month, so the series is
# always January..current month. (Comparing against "end_date - 1 month"
# skipped the current month on the 1st and overshot on month-end dates.)
# '||' is used for concatenation because concat() needs SQLite >= 3.44.
_MONTHLY_SQL = '''
    WITH RECURSIVE timeseries(d) AS (
        VALUES( date(:end_date, 'start of year'))
        UNION ALL
        SELECT date(d, '+1 month')
        FROM timeseries
        WHERE d < date(:end_date, 'start of month')
    ),
    months AS (
        SELECT strftime('%m',d) as m, strftime('%Y',d) as y
        FROM timeseries
    ),
    stats AS (
        SELECT strftime('%m', timestamp) as m, strftime('%Y', timestamp) as y, sum(increment) as count
        FROM entries
        where counter_id = :id
        group by strftime('%m', timestamp), strftime('%Y', timestamp)
    )
    SELECT m.m || ', ' || m.y as "month", coalesce(s.count, 0) as count
    FROM months as m
    LEFT JOIN stats as s on s.m = m.m and s.y = m.y
'''

# The recursion stops at January 1st of end_date's year, so the series always
# holds five years. (Comparing against "end_date - 1 year" dropped the current
# year when end_date itself was January 1st.)
_YEARLY_SQL = '''
    WITH RECURSIVE timeseries(d) AS (
        VALUES( date(:end_date, 'start of year', '-4 years'))
        UNION ALL
        SELECT date(d, '+1 year')
        FROM timeseries
        WHERE d < date(:end_date, 'start of year')
    ),
    years AS (
        SELECT strftime('%Y',d) as y
        FROM timeseries
    ),
    stats AS (
        SELECT strftime('%Y', timestamp) as y, sum(increment) as count
        FROM entries
        where counter_id = :id
        group by strftime('%Y', timestamp)
    )
    SELECT m.y as "year", coalesce(s.count, 0) as count
    FROM years as m
    LEFT JOIN stats as s on s.y = m.y
'''


def _execute_write(statement: str, params: dict) -> None:
    """Run one write statement in its own session and commit it.

    On any exception the error is logged with its traceback and the session
    is rolled back; nothing is raised to the caller.
    """
    with connection.session as session:
        try:
            session.execute(text(statement), params)
            session.commit()
        except Exception:
            logger.exception("Database write failed: %s", statement)
            session.rollback()


def _run_query(sql: str, **query_kwargs):
    """Return ``connection.query(sql, **query_kwargs)``, or ``None`` on error.

    Keyword arguments (``params``, ``ttl``) are forwarded untouched so each
    caller keeps its own caching behaviour.
    """
    try:
        return connection.query(sql, **query_kwargs)
    except Exception:
        logger.exception("Database query failed")
        return None


def create_counter(title: str, counter_type: CounterType, counter_color) -> None:
    """Insert a new row into ``counters``.

    Args:
        title: Stored in the ``name`` column.
        counter_type: Bound as-is to the ``type`` column.
        counter_color: Bound as-is to the ``color`` column.
    """
    _execute_write(
        'INSERT INTO counters (name, type, color) VALUES (:title, :type, :color)',
        {'title': title, 'type': counter_type, 'color': counter_color},
    )


def get_counters() -> pd.DataFrame:
    """Return ``id``, ``name``, ``type`` and ``color`` of every counter.

    If the query fails, an empty DataFrame with those four columns is
    returned so callers always receive a DataFrame.
    """
    counters = _run_query('SELECT id, name, type, color FROM counters', ttl=0)
    if counters is None:
        return pd.DataFrame(columns=['id', 'name', 'type', 'color'])
    return counters


def increment_counter(counter_id: int) -> None:
    """Insert one row into ``entries`` for the given counter."""
    _execute_write(
        'INSERT INTO entries (counter_id) VALUES (:id)',
        {'id': counter_id},
    )


def remove_counter(counter_id: int) -> None:
    """Delete the ``counters`` row with the given id."""
    _execute_write(
        'DELETE FROM counters WHERE id = :id',
        {'id': counter_id},
    )


def get_counter(counter_id: int) -> Optional[pd.Series]:
    """Return the first ``counters`` row matching ``counter_id``.

    Returns ``None`` when no row matches or when the query fails.
    """
    rows = _run_query(
        'SELECT * FROM counters WHERE id = :id',
        params={'id': counter_id},
        ttl=0,
    )
    # An unknown id is an ordinary outcome, not an error worth logging.
    if rows is None or rows.empty:
        return None
    return rows.iloc[0]


def get_analytics(end_date: str = 'now') -> Optional[pd.DataFrame]:
    """Return per-day totals for all counters over the days up to ``end_date``.

    The result has one row per day going back 30 days from ``end_date``, with
    columns ``date`` and ``counters``; ``counters`` is a JSON object mapping
    counter name to that day's summed increment (an empty object for days
    without entries). Returns ``None`` if the query fails.
    """
    return _run_query(_ANALYTICS_SQL, params={"end_date": end_date}, ttl=0)


def get_daily_analytics(counter_id: int, end_date: str = 'now') -> Optional[pd.DataFrame]:
    """Return daily totals of one counter going back 7 days from ``end_date``.

    Columns are ``date`` and ``count``; days without entries report 0.
    Returns ``None`` if the query fails.
    """
    return _run_query(
        _DAILY_SQL,
        params={'id': counter_id, "end_date": end_date},
        ttl=0,
    )


def get_weekly_analytics(counter_id: int, end_date: str = 'now') -> Optional[pd.DataFrame]:
    """Return weekly totals of one counter for the weeks up to ``end_date``.

    Weeks end on Sunday and reach back until 30 days before ``end_date`` are
    covered. Columns are ``week`` (the ``%W`` week number of the week-ending
    Sunday) and ``count``; weeks without entries report 0. Returns ``None``
    if the query fails.
    """
    return _run_query(
        _WEEKLY_SQL,
        params={'id': counter_id, "end_date": end_date},
        ttl=0,
    )


def get_monthly_analytics(counter_id: int, end_date: str = 'now') -> Optional[pd.DataFrame]:
    """Return monthly totals of one counter from January to ``end_date``'s month.

    Columns are ``month`` (formatted ``"MM, YYYY"``) and ``count``; months
    without entries report 0. Returns ``None`` if the query fails.
    """
    return _run_query(
        _MONTHLY_SQL,
        params={'id': counter_id, "end_date": end_date},
        ttl=0,
    )


def get_yearly_analytics(counter_id: int, end_date: str = 'now') -> Optional[pd.DataFrame]:
    """Return yearly totals of one counter for ``end_date``'s year and the 4 before.

    Columns are ``year`` and ``count``; years without entries report 0.
    Returns ``None`` if the query fails.
    """
    return _run_query(
        _YEARLY_SQL,
        params={'id': counter_id, "end_date": end_date},
        ttl=0,
    )


def get_colors(palette_id: int) -> Optional[pd.DataFrame]:
    """Return columns ``color1``..``color5`` of the palette with ``palette_id``.

    No ``ttl`` is passed, so the connection's default caching applies.
    Returns ``None`` if the query fails.
    """
    return _run_query(
        '''SELECT color1,color2,color3,color4,color5 FROM color_palettes WHERE id = :id''',
        params={'id': palette_id},
    )