Module secfsdstools.a_utils.dbutils

Basic DB handling functionality

Expand source code
"""
Basic DB handling functionality
"""

import logging
import os
import sqlite3
from abc import ABC
from dataclasses import Field
from typing import List, TypeVar, Tuple, Optional

import pandas as pd

# Type variable referenced by the return annotation of DB.execute_fetchall_typed.
T = TypeVar("T")  # pylint: disable=W0621

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)


# noinspection SqlResolve
class DB(ABC):
    """
    Base class for DB handling. Provides some basic functionality.

    All helpers work on the sqlite file 'secfsdstools.db' that is located
    inside the configured db directory.
    """

    def __init__(self, db_dir="db/"):
        self.db_dir = db_dir
        self.database = os.path.join(self.db_dir, 'secfsdstools.db')

    def db_file_exists(self) -> bool:
        """
        Tells whether the configured db file is present.

        Returns:
            bool: True if the path stored in self.database exists
        """
        db_path = self.database
        return os.path.exists(db_path)

    def table_exists(self, table_name: str) -> bool:
        """
        Checks whether a table exists.

        Args:
            table_name (str): name of the table to look for
        Returns:
            bool: True if the table is present, False if the table or the
            db file itself is missing
        """
        if not self.db_file_exists():
            return False

        # The name is embedded in a single-quoted SQL literal. Doubling the
        # embedded single quotes prevents a name containing a quote from
        # terminating the literal early (broken or altered query).
        escaped_name = table_name.replace("'", "''")

        # using the sqlite_master table to check whether the table exists
        sql = ("SELECT name FROM sqlite_master "
               f"WHERE type='table' AND name='{escaped_name}'")
        return len(self.execute_fetchall(sql)) > 0

    def get_connection(self) -> sqlite3.Connection:
        """
        Opens a new connection to the configured db file.

        Returns:
            sqlite3.Connection: sqlite3 connection instance
        """
        connection = sqlite3.connect(self.database)
        return connection

    def execute_read_as_df(self, sql: str) -> pd.DataFrame:
        """
        Runs the select statement on a fresh connection and loads the
        result directly into a pandas dataframe.

        Args:
             sql (str): Select String
        Returns:
            pd.DataFrame: the result of the query
        """
        connection = self.get_connection()
        try:
            LOGGER.debug("execute %s", sql)
            frame = pd.read_sql_query(sql, connection)
        finally:
            # the connection is only needed for this single read
            connection.close()
        return frame

    def execute_fetchall(self, sql: str) -> List[Tuple]:
        """
        Runs the statement on a fresh connection and returns all result rows.

        Args:
             sql (str): sql statement
        Returns:
            List[Tuple]: list with one tuple per row
        """
        connection = self.get_connection()
        try:
            LOGGER.debug("execute %s", sql)
            cursor = connection.execute(sql)
            rows = cursor.fetchall()
        finally:
            connection.close()
        return rows

    def execute_fetchall_typed(self, sql: str, T) -> List[T]:  # pylint: disable=W0621,C0103
        """
        Fetches all rows of the sql statement and wraps every row into an
        instance of the provided type.
        Note: every selected column is passed as a keyword argument to T, so
        all selected columns have to exist with the same name in T.

        Args:
             sql (str): sql string
             T: type class
        Returns:
             List[T]: list of instances of the type
        """
        connection = self.get_connection()
        try:
            LOGGER.debug("execute %s", sql)
            # sqlite3.Row gives access to the column names of every row
            connection.row_factory = sqlite3.Row
            cursor = connection.cursor()
            cursor.execute(sql)
            typed_rows = []
            for row in cursor.fetchall():
                typed_rows.append(T(**dict(row)))
            return typed_rows
        finally:
            connection.close()

    def execute_single(self, sql: str, conn: sqlite3.Connection):
        """
        Executes one sql statement without parameters on the given
        connection. This method neither commits nor closes the connection.

        Args:
             sql (str): sql string, not parameterized
             conn (sqlite3.Connection): connection to use
        """
        LOGGER.debug("execute %s", sql)
        conn.execute(sql)

    def execute_many(self, sql: str, params: List[Tuple], conn: sqlite3.Connection):
        """
        Executes a parameterized statement once for every tuple in the
        params list. This method neither commits nor closes the connection.

        Args:
             sql (str): parameterized statement
             params (List[Tuple]): list with tuples containing the parameters
             conn (sqlite3.Connection): connection to use
        """
        LOGGER.debug("execute %s", sql)
        conn.executemany(sql, params)

    def append_df_to_table(self, table_name: str, dataframe: pd.DataFrame,
                           conn: sqlite3.Connection):
        """
        Appends the rows of a dataframe to a table. The column names of the
        dataframe and the table have to match. The dataframe index is not
        written.

        Args:
             table_name (str): name of the table to append the data
             dataframe (pd.DataFrame):  the df with the data
             conn (sqlite3.Connection): connection to use
        """
        dataframe.to_sql(name=table_name, con=conn, if_exists="append", index=False)

    def create_insert_statement_for_dataclass(self, table_name: str, data) -> str:
        """
        Creates the insert sql statement based on the fields of a dataclass.

        String values are wrapped in single quotes and embedded single quotes
        are doubled, None is written as NULL, every other value is written
        as its str() representation.

        Args:
             table_name (str): name of the table to insert into
             data: object of the dataclass
        Returns:
            str: 'insert into' statement
        """
        # Local import keeps this method self-contained; dataclasses.fields
        # is the public accessor for the fields of a dataclass instance.
        from dataclasses import fields as dataclass_fields

        column_list = []
        value_list = []
        for field in dataclass_fields(data):
            value = getattr(data, field.name)
            column_list.append(f"'{field.name}'")
            if value is None:
                value_list.append("NULL")
            elif field.type in (str, "str") or isinstance(value, str):
                # "str" covers annotations that are stored as strings
                escaped_value = str(value).replace("'", "''")
                value_list.append(f"'{escaped_value}'")
            else:
                value_list.append(str(value))

        column_str = ', '.join(column_list)
        value_str = ', '.join(value_list)
        return f"INSERT INTO {table_name} ({column_str}) VALUES ({value_str})"


class DBStateAcessor(DB):
    """
    Helper class to write and read values into the status table
    """
    STATUS_TABLE_NAME = 'status'
    KEY_COL_NAME = 'keyName'
    VALUE_COL_NAME = 'value'

    def set_key(self, key: str, value: str):
        """
        Sets the provided key to the provided value. An existing entry is
        updated, otherwise a new entry is inserted.

        Args:
            key: key as string
            value: value as string
        """
        update_sql = f"""UPDATE {DBStateAcessor.STATUS_TABLE_NAME}
                         SET {DBStateAcessor.VALUE_COL_NAME} = ?
                         WHERE {DBStateAcessor.KEY_COL_NAME} = ?"""
        insert_sql = f"""INSERT INTO {DBStateAcessor.STATUS_TABLE_NAME}
                         ({DBStateAcessor.KEY_COL_NAME}, {DBStateAcessor.VALUE_COL_NAME})
                         VALUES (?, ?)"""

        # python 3.7 uses sqlite 3.21, which does not support the upsert functionality
        # with ON CONFLICT DO UPDATE SET
        # so we try the update first and insert only if no row was touched.
        # Checking the number of updated rows (instead of the truthiness of the
        # stored value) also handles entries whose current value is empty.
        conn = self.get_connection()
        try:
            # "with conn" commits on success and rolls back on an exception,
            # it does not close the connection
            with conn:
                LOGGER.debug("execute %s", update_sql)
                cursor = conn.execute(update_sql, (value, key))
                if cursor.rowcount == 0:
                    LOGGER.debug("execute %s", insert_sql)
                    conn.execute(insert_sql, (key, value))
        finally:
            conn.close()

    def get_key(self, key: str) -> Optional[str]:
        """
        Reads the value of key from the status table or returns None if the key is not present
        Args:
            key: key to read

        Returns:
            str: the stored value or None
        """
        # the key is bound as a parameter, so quotes inside the key are harmless
        sql = f"""SELECT {DBStateAcessor.VALUE_COL_NAME}
                   FROM  {DBStateAcessor.STATUS_TABLE_NAME}
                   WHERE {DBStateAcessor.KEY_COL_NAME} = ?"""
        conn = self.get_connection()
        try:
            LOGGER.debug("execute %s", sql)
            row = conn.execute(sql, (key,)).fetchone()
        finally:
            conn.close()

        return None if row is None else row[0]

Classes

class DB (db_dir='db/')

Base class for DB handling. Provides some basic functionality.

Expand source code
class DB(ABC):
    """
    Base class for DB handling. Provides some basic functionality.

    All helpers work on the sqlite file 'secfsdstools.db' that is located
    inside the configured db directory.
    """

    def __init__(self, db_dir="db/"):
        self.db_dir = db_dir
        self.database = os.path.join(self.db_dir, 'secfsdstools.db')

    def db_file_exists(self) -> bool:
        """
        Tells whether the configured db file is present.

        Returns:
            bool: True if the path stored in self.database exists
        """
        db_path = self.database
        return os.path.exists(db_path)

    def table_exists(self, table_name: str) -> bool:
        """
        Checks whether a table exists.

        Args:
            table_name (str): name of the table to look for
        Returns:
            bool: True if the table is present, False if the table or the
            db file itself is missing
        """
        if not self.db_file_exists():
            return False

        # The name is embedded in a single-quoted SQL literal. Doubling the
        # embedded single quotes prevents a name containing a quote from
        # terminating the literal early (broken or altered query).
        escaped_name = table_name.replace("'", "''")

        # using the sqlite_master table to check whether the table exists
        sql = ("SELECT name FROM sqlite_master "
               f"WHERE type='table' AND name='{escaped_name}'")
        return len(self.execute_fetchall(sql)) > 0

    def get_connection(self) -> sqlite3.Connection:
        """
        Opens a new connection to the configured db file.

        Returns:
            sqlite3.Connection: sqlite3 connection instance
        """
        connection = sqlite3.connect(self.database)
        return connection

    def execute_read_as_df(self, sql: str) -> pd.DataFrame:
        """
        Runs the select statement on a fresh connection and loads the
        result directly into a pandas dataframe.

        Args:
             sql (str): Select String
        Returns:
            pd.DataFrame: the result of the query
        """
        connection = self.get_connection()
        try:
            LOGGER.debug("execute %s", sql)
            frame = pd.read_sql_query(sql, connection)
        finally:
            # the connection is only needed for this single read
            connection.close()
        return frame

    def execute_fetchall(self, sql: str) -> List[Tuple]:
        """
        Runs the statement on a fresh connection and returns all result rows.

        Args:
             sql (str): sql statement
        Returns:
            List[Tuple]: list with one tuple per row
        """
        connection = self.get_connection()
        try:
            LOGGER.debug("execute %s", sql)
            cursor = connection.execute(sql)
            rows = cursor.fetchall()
        finally:
            connection.close()
        return rows

    def execute_fetchall_typed(self, sql: str, T) -> List[T]:  # pylint: disable=W0621,C0103
        """
        Fetches all rows of the sql statement and wraps every row into an
        instance of the provided type.
        Note: every selected column is passed as a keyword argument to T, so
        all selected columns have to exist with the same name in T.

        Args:
             sql (str): sql string
             T: type class
        Returns:
             List[T]: list of instances of the type
        """
        connection = self.get_connection()
        try:
            LOGGER.debug("execute %s", sql)
            # sqlite3.Row gives access to the column names of every row
            connection.row_factory = sqlite3.Row
            cursor = connection.cursor()
            cursor.execute(sql)
            typed_rows = []
            for row in cursor.fetchall():
                typed_rows.append(T(**dict(row)))
            return typed_rows
        finally:
            connection.close()

    def execute_single(self, sql: str, conn: sqlite3.Connection):
        """
        Executes one sql statement without parameters on the given
        connection. This method neither commits nor closes the connection.

        Args:
             sql (str): sql string, not parameterized
             conn (sqlite3.Connection): connection to use
        """
        LOGGER.debug("execute %s", sql)
        conn.execute(sql)

    def execute_many(self, sql: str, params: List[Tuple], conn: sqlite3.Connection):
        """
        Executes a parameterized statement once for every tuple in the
        params list. This method neither commits nor closes the connection.

        Args:
             sql (str): parameterized statement
             params (List[Tuple]): list with tuples containing the parameters
             conn (sqlite3.Connection): connection to use
        """
        LOGGER.debug("execute %s", sql)
        conn.executemany(sql, params)

    def append_df_to_table(self, table_name: str, dataframe: pd.DataFrame,
                           conn: sqlite3.Connection):
        """
        Appends the rows of a dataframe to a table. The column names of the
        dataframe and the table have to match. The dataframe index is not
        written.

        Args:
             table_name (str): name of the table to append the data
             dataframe (pd.DataFrame):  the df with the data
             conn (sqlite3.Connection): connection to use
        """
        dataframe.to_sql(name=table_name, con=conn, if_exists="append", index=False)

    def create_insert_statement_for_dataclass(self, table_name: str, data) -> str:
        """
        Creates the insert sql statement based on the fields of a dataclass.

        String values are wrapped in single quotes and embedded single quotes
        are doubled, None is written as NULL, every other value is written
        as its str() representation.

        Args:
             table_name (str): name of the table to insert into
             data: object of the dataclass
        Returns:
            str: 'insert into' statement
        """
        # Local import keeps this method self-contained; dataclasses.fields
        # is the public accessor for the fields of a dataclass instance.
        from dataclasses import fields as dataclass_fields

        column_list = []
        value_list = []
        for field in dataclass_fields(data):
            value = getattr(data, field.name)
            column_list.append(f"'{field.name}'")
            if value is None:
                value_list.append("NULL")
            elif field.type in (str, "str") or isinstance(value, str):
                # "str" covers annotations that are stored as strings
                escaped_value = str(value).replace("'", "''")
                value_list.append(f"'{escaped_value}'")
            else:
                value_list.append(str(value))

        column_str = ', '.join(column_list)
        value_str = ', '.join(value_list)
        return f"INSERT INTO {table_name} ({column_str}) VALUES ({value_str})"

Ancestors

  • abc.ABC

Subclasses

Methods

def append_df_to_table(self, table_name: str, dataframe: pandas.core.frame.DataFrame, conn: sqlite3.Connection)

add the content of a df to the table. The name of the columns in df and table have to match

Args

table_name : str
name of the table to append the data
dataframe : pd.DataFrame
the df with the data
conn : sqlite3.Connection
connection to use
Expand source code
def append_df_to_table(self, table_name: str, dataframe: pd.DataFrame,
                       conn: sqlite3.Connection):
    """
    Appends the rows of a dataframe to a table. The column names of the
    dataframe and the table have to match. The dataframe index is not
    written.

    Args:
         table_name (str): name of the table to append the data
         dataframe (pd.DataFrame):  the df with the data
         conn (sqlite3.Connection): connection to use
    """
    dataframe.to_sql(name=table_name, con=conn, if_exists="append", index=False)
def create_insert_statement_for_dataclass(self, table_name: str, data) ‑> str

creates the insert sql statement based on the fields of a dataclass

Args

table_name : str
name of the table to insert into
data
object of the dataclass

Returns

str
'insert into' statement
Expand source code
def create_insert_statement_for_dataclass(self, table_name: str, data) -> str:
    """
    Creates the insert sql statement based on the fields of a dataclass.

    String values are wrapped in single quotes and embedded single quotes
    are doubled, None is written as NULL, every other value is written
    as its str() representation.

    Args:
         table_name (str): name of the table to insert into
         data: object of the dataclass
    Returns:
        str: 'insert into' statement
    """
    # Local import keeps this method self-contained; dataclasses.fields
    # is the public accessor for the fields of a dataclass instance.
    from dataclasses import fields as dataclass_fields

    column_list = []
    value_list = []
    for field in dataclass_fields(data):
        value = getattr(data, field.name)
        column_list.append(f"'{field.name}'")
        if value is None:
            value_list.append("NULL")
        elif field.type in (str, "str") or isinstance(value, str):
            # "str" covers annotations that are stored as strings
            escaped_value = str(value).replace("'", "''")
            value_list.append(f"'{escaped_value}'")
        else:
            value_list.append(str(value))

    column_str = ', '.join(column_list)
    value_str = ', '.join(value_list)
    return f"INSERT INTO {table_name} ({column_str}) VALUES ({value_str})"
def db_file_exists(self) ‑> bool

Checks if the configured db file is actually present.

Returns

bool
returns True, if the dbfile was found
Expand source code
def db_file_exists(self) -> bool:
    """
    Tells whether the configured db file is present.

    Returns:
        bool: True if the path stored in self.database exists
    """
    db_path = self.database
    return os.path.exists(db_path)
def execute_fetchall(self, sql: str) ‑> List[Tuple]

returns all results of the sql

Args

sql : str
sql statement

Returns

List[Tuple]
list with tuples
Expand source code
def execute_fetchall(self, sql: str) -> List[Tuple]:
    """
    Runs the statement on a fresh connection and returns all result rows.

    Args:
         sql (str): sql statement
    Returns:
        List[Tuple]: list with one tuple per row
    """
    connection = self.get_connection()
    try:
        LOGGER.debug("execute %s", sql)
        cursor = connection.execute(sql)
        rows = cursor.fetchall()
    finally:
        connection.close()
    return rows
def execute_fetchall_typed(self, sql: str, T) ‑> List[~T]

fetches all data of the sql statement and directly wraps it into the provided type. Note all selected columns in the sql have to exist with the same name in the dataclass of type T.

Args

sql : str
sql string
T
type class

Returns

List[T]
list of instances of the type
Expand source code
def execute_fetchall_typed(self, sql: str, T) -> List[T]:  # pylint: disable=W0621,C0103
    """
    Fetches all rows of the sql statement and wraps every row into an
    instance of the provided type.
    Note: every selected column is passed as a keyword argument to T, so
    all selected columns have to exist with the same name in T.

    Args:
         sql (str): sql string
         T: type class
    Returns:
         List[T]: list of instances of the type
    """
    connection = self.get_connection()
    try:
        LOGGER.debug("execute %s", sql)
        # sqlite3.Row gives access to the column names of every row
        connection.row_factory = sqlite3.Row
        cursor = connection.cursor()
        cursor.execute(sql)
        typed_rows = []
        for row in cursor.fetchall():
            typed_rows.append(T(**dict(row)))
        return typed_rows
    finally:
        connection.close()
def execute_many(self, sql: str, params: List[Tuple], conn: sqlite3.Connection)

executes a parameterized statement for every tuple in the params list

Args

sql : str
parameterized statement
params : List[Tuple]
list with tuples containing the parameters
conn : sqlite3.Connection
connection to use
Expand source code
def execute_many(self, sql: str, params: List[Tuple], conn: sqlite3.Connection):
    """
    Executes a parameterized statement once for every tuple in the
    params list. This method neither commits nor closes the connection.

    Args:
         sql (str): parameterized statement
         params (List[Tuple]): list with tuples containing the parameters
         conn (sqlite3.Connection): connection to use
    """
    LOGGER.debug("execute %s", sql)
    conn.executemany(sql, params)
def execute_read_as_df(self, sql: str) ‑> pandas.core.frame.DataFrame

directly read the content into a pandas dataframe

Args

sql : str
Select String

Returns

pd.DataFrame
dataframe holding the result of the select statement
Expand source code
def execute_read_as_df(self, sql: str) -> pd.DataFrame:
    """
    Runs the select statement on a fresh connection and loads the
    result directly into a pandas dataframe.

    Args:
         sql (str): Select String
    Returns:
        pd.DataFrame: the result of the query
    """
    connection = self.get_connection()
    try:
        LOGGER.debug("execute %s", sql)
        frame = pd.read_sql_query(sql, connection)
    finally:
        # the connection is only needed for this single read
        connection.close()
    return frame
def execute_single(self, sql: str, conn: sqlite3.Connection)

executes a single sql statement without any parameters.

Args

sql : str
sql string, not parameterized
conn : sqlite3.Connection
connection to use
Expand source code
def execute_single(self, sql: str, conn: sqlite3.Connection):
    """
    Executes one sql statement without parameters on the given
    connection. This method neither commits nor closes the connection.

    Args:
         sql (str): sql string, not parameterized
         conn (sqlite3.Connection): connection to use
    """
    LOGGER.debug("execute %s", sql)
    conn.execute(sql)
def get_connection(self) ‑> sqlite3.Connection

creates a connection to the db.

Returns

sqlite3.Connection
sqlite3 connection instance
Expand source code
def get_connection(self) -> sqlite3.Connection:
    """
    Opens a new connection to the configured db file.

    Returns:
        sqlite3.Connection: sqlite3 connection instance
    """
    connection = sqlite3.connect(self.database)
    return connection
def table_exists(self, table_name: str) ‑> bool

Checks whether a table exists.

Returns

bool
True if the table is present
Expand source code
def table_exists(self, table_name: str) -> bool:
    """
    Checks whether a table exists.

    Args:
        table_name (str): name of the table to look for
    Returns:
        bool: True if the table is present, False if the table or the
        db file itself is missing
    """
    if not self.db_file_exists():
        return False

    # The name is embedded in a single-quoted SQL literal. Doubling the
    # embedded single quotes prevents a name containing a quote from
    # terminating the literal early (broken or altered query).
    escaped_name = table_name.replace("'", "''")

    # using the sqlite_master table to check whether the table exists
    sql = ("SELECT name FROM sqlite_master "
           f"WHERE type='table' AND name='{escaped_name}'")
    return len(self.execute_fetchall(sql)) > 0
class DBStateAcessor (db_dir='db/')

Helper class to write and read values into the status table

Expand source code
class DBStateAcessor(DB):
    """
    Helper class to write and read values into the status table
    """
    STATUS_TABLE_NAME = 'status'
    KEY_COL_NAME = 'keyName'
    VALUE_COL_NAME = 'value'

    def set_key(self, key: str, value: str):
        """
        Sets the provided key to the provided value. An existing entry is
        updated, otherwise a new entry is inserted.

        Args:
            key: key as string
            value: value as string
        """
        update_sql = f"""UPDATE {DBStateAcessor.STATUS_TABLE_NAME}
                         SET {DBStateAcessor.VALUE_COL_NAME} = ?
                         WHERE {DBStateAcessor.KEY_COL_NAME} = ?"""
        insert_sql = f"""INSERT INTO {DBStateAcessor.STATUS_TABLE_NAME}
                         ({DBStateAcessor.KEY_COL_NAME}, {DBStateAcessor.VALUE_COL_NAME})
                         VALUES (?, ?)"""

        # python 3.7 uses sqlite 3.21, which does not support the upsert functionality
        # with ON CONFLICT DO UPDATE SET
        # so we try the update first and insert only if no row was touched.
        # Checking the number of updated rows (instead of the truthiness of the
        # stored value) also handles entries whose current value is empty.
        conn = self.get_connection()
        try:
            # "with conn" commits on success and rolls back on an exception,
            # it does not close the connection
            with conn:
                LOGGER.debug("execute %s", update_sql)
                cursor = conn.execute(update_sql, (value, key))
                if cursor.rowcount == 0:
                    LOGGER.debug("execute %s", insert_sql)
                    conn.execute(insert_sql, (key, value))
        finally:
            conn.close()

    def get_key(self, key: str) -> Optional[str]:
        """
        Reads the value of key from the status table or returns None if the key is not present
        Args:
            key: key to read

        Returns:
            str: the stored value or None
        """
        # the key is bound as a parameter, so quotes inside the key are harmless
        sql = f"""SELECT {DBStateAcessor.VALUE_COL_NAME}
                   FROM  {DBStateAcessor.STATUS_TABLE_NAME}
                   WHERE {DBStateAcessor.KEY_COL_NAME} = ?"""
        conn = self.get_connection()
        try:
            LOGGER.debug("execute %s", sql)
            row = conn.execute(sql, (key,)).fetchone()
        finally:
            conn.close()

        return None if row is None else row[0]

Ancestors

  • DB
  • abc.ABC

Class variables

var KEY_COL_NAME
var STATUS_TABLE_NAME
var VALUE_COL_NAME

Methods

def get_key(self, key: str) ‑> Optional[str]

Reads the value of key from the status table or returns None if the key is not present

Args

key
key to read

Returns

str
the stored value or None
Expand source code
def get_key(self, key: str) -> Optional[str]:
    """
    Reads the value of key from the status table or returns None if the key is not present
    Args:
        key: key to read

    Returns:
        str: the stored value or None
    """
    # the key is bound as a parameter, so quotes inside the key are harmless
    sql = f"""SELECT {DBStateAcessor.VALUE_COL_NAME}
               FROM  {DBStateAcessor.STATUS_TABLE_NAME}
               WHERE {DBStateAcessor.KEY_COL_NAME} = ?"""
    conn = self.get_connection()
    try:
        LOGGER.debug("execute %s", sql)
        row = conn.execute(sql, (key,)).fetchone()
    finally:
        conn.close()

    return None if row is None else row[0]
def set_key(self, key: str, value: str)

Sets the provided key to the provided value.

Args

key
key as string
value
value as string
Expand source code
def set_key(self, key: str, value: str):
    """
    Sets the provided key to the provided value. An existing entry is
    updated, otherwise a new entry is inserted.

    Args:
        key: key as string
        value: value as string
    """
    update_sql = f"""UPDATE {DBStateAcessor.STATUS_TABLE_NAME}
                     SET {DBStateAcessor.VALUE_COL_NAME} = ?
                     WHERE {DBStateAcessor.KEY_COL_NAME} = ?"""
    insert_sql = f"""INSERT INTO {DBStateAcessor.STATUS_TABLE_NAME}
                     ({DBStateAcessor.KEY_COL_NAME}, {DBStateAcessor.VALUE_COL_NAME})
                     VALUES (?, ?)"""

    # python 3.7 uses sqlite 3.21, which does not support the upsert functionality
    # with ON CONFLICT DO UPDATE SET
    # so we try the update first and insert only if no row was touched.
    # Checking the number of updated rows (instead of the truthiness of the
    # stored value) also handles entries whose current value is empty.
    conn = self.get_connection()
    try:
        # "with conn" commits on success and rolls back on an exception,
        # it does not close the connection
        with conn:
            LOGGER.debug("execute %s", update_sql)
            cursor = conn.execute(update_sql, (value, key))
            if cursor.rowcount == 0:
                LOGGER.debug("execute %s", insert_sql)
                conn.execute(insert_sql, (key, value))
    finally:
        conn.close()

Inherited members