Module secfsdstools.b_setup.setupdb
Creation of the database.
Expand source code
"""
Creation of the database.
"""
import glob
import logging
import os
import sqlite3
from typing import Dict
from secfsdstools.a_utils.dbutils import DB
# Directory and file name of this module; used to locate the bundled DDL scripts.
CURRENT_DIR = os.path.dirname(__file__)
CURRENT_FILE = os.path.basename(__file__)

# Folder (next to this module) that holds the versioned "*.sql" scripts.
DDL_PATH = os.path.join(CURRENT_DIR, "sql")

# Module-level logger, named after the module.
LOGGER = logging.getLogger(__name__)
class DbCreator(DB):
    """
    Creates the database by executing the versioned DDL scripts found in DDL_PATH.
    """

    def __init__(self, db_dir: str):
        """
        Args:
            db_dir: directory in which the database is stored
        """
        super().__init__(db_dir=db_dir)

    def add_column_if_not_exists(self, conn, table_name, column_name, data_type):
        """
        Adds a column to an existing table, if the column does not exist yet.

        A "duplicate column name" error is treated as the expected "already
        there" case. Any other sqlite3.OperationalError is logged and not
        re-raised, so the call stays best-effort.

        Args:
            conn: open sqlite3 connection
            table_name: name of the table to alter
            column_name: name of the column to add
            data_type: data type of the new column
        """
        # Identifiers cannot be bound as SQL parameters, so the statement has to
        # be built as text. Only pass trusted, hard-coded names to this method.
        statement = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {data_type}"

        cursor = conn.cursor()
        try:
            cursor.execute(statement)
            conn.commit()
        except sqlite3.OperationalError as exc:
            if "duplicate column name" in str(exc):
                LOGGER.info("Column '%s' already exists.", column_name)
            else:
                LOGGER.error("An error occurred: %s", exc)
        else:
            LOGGER.info("Column '%s' added successfully.", column_name)
        finally:
            cursor.close()

    def create_db(self):
        """
        Reads the DDL files from the DDL directory and executes them in
        ascending version order.

        The version is taken from the file name, which is expected to look like
        "V<number>__<description>.sql".

        Raises:
            ValueError: if the version number cannot be parsed from a file name
        """
        indexes_dict: Dict[int, str] = {}
        for sqlfile in glob.glob(f"{DDL_PATH}/*.sql"):
            LOGGER.debug("extract version from sql file %s ... ", sqlfile)
            # Parse the bare file name only, so that a "__" somewhere in the
            # installation path cannot corrupt the extracted version.
            filename = os.path.basename(sqlfile)
            index = int(filename[1:].partition('__')[0])
            LOGGER.debug(" ... extracted version: %d", index)
            indexes_dict[index] = sqlfile

        if not os.path.isdir(self.db_dir):
            LOGGER.info("creating folder for db: %s", self.db_dir)
            # exist_ok: tolerate another process creating the folder meanwhile
            os.makedirs(self.db_dir, exist_ok=True)

        conn = self.get_connection()
        try:
            curr = conn.cursor()
            for index in sorted(indexes_dict):
                sqlfile = indexes_dict[index]
                with open(sqlfile, 'r', encoding='utf8') as scriptfile:
                    script = scriptfile.read()
                LOGGER.debug("execute creation script %s", sqlfile)
                curr.executescript(script)
                conn.commit()
            # adding columns that do not exist - so far none are needed
        finally:
            # always release the connection, even if a script fails
            conn.close()
Classes
class DbCreator (db_dir: str)
-
Responsible for creating the database.
Expand source code
class DbCreator(DB):
    """
    Creates the database by executing the versioned DDL scripts found in DDL_PATH.
    """

    def __init__(self, db_dir: str):
        """
        Args:
            db_dir: directory in which the database is stored
        """
        super().__init__(db_dir=db_dir)

    def add_column_if_not_exists(self, conn, table_name, column_name, data_type):
        """
        Adds a column to an existing table, if the column does not exist yet.

        A "duplicate column name" error is treated as the expected "already
        there" case. Any other sqlite3.OperationalError is logged and not
        re-raised, so the call stays best-effort.

        Args:
            conn: open sqlite3 connection
            table_name: name of the table to alter
            column_name: name of the column to add
            data_type: data type of the new column
        """
        # Identifiers cannot be bound as SQL parameters, so the statement has to
        # be built as text. Only pass trusted, hard-coded names to this method.
        statement = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {data_type}"

        cursor = conn.cursor()
        try:
            cursor.execute(statement)
            conn.commit()
        except sqlite3.OperationalError as exc:
            if "duplicate column name" in str(exc):
                LOGGER.info("Column '%s' already exists.", column_name)
            else:
                LOGGER.error("An error occurred: %s", exc)
        else:
            LOGGER.info("Column '%s' added successfully.", column_name)
        finally:
            cursor.close()

    def create_db(self):
        """
        Reads the DDL files from the DDL directory and executes them in
        ascending version order.

        The version is taken from the file name, which is expected to look like
        "V<number>__<description>.sql".

        Raises:
            ValueError: if the version number cannot be parsed from a file name
        """
        indexes_dict: Dict[int, str] = {}
        for sqlfile in glob.glob(f"{DDL_PATH}/*.sql"):
            LOGGER.debug("extract version from sql file %s ... ", sqlfile)
            # Parse the bare file name only, so that a "__" somewhere in the
            # installation path cannot corrupt the extracted version.
            filename = os.path.basename(sqlfile)
            index = int(filename[1:].partition('__')[0])
            LOGGER.debug(" ... extracted version: %d", index)
            indexes_dict[index] = sqlfile

        if not os.path.isdir(self.db_dir):
            LOGGER.info("creating folder for db: %s", self.db_dir)
            # exist_ok: tolerate another process creating the folder meanwhile
            os.makedirs(self.db_dir, exist_ok=True)

        conn = self.get_connection()
        try:
            curr = conn.cursor()
            for index in sorted(indexes_dict):
                sqlfile = indexes_dict[index]
                with open(sqlfile, 'r', encoding='utf8') as scriptfile:
                    script = scriptfile.read()
                LOGGER.debug("execute creation script %s", sqlfile)
                curr.executescript(script)
                conn.commit()
            # adding columns that do not exist - so far none are needed
        finally:
            # always release the connection, even if a script fails
            conn.close()
Ancestors
- DB
- abc.ABC
Methods
def add_column_if_not_exists(self, conn, table_name, column_name, data_type)
-
Adds a column to an existing table if the column does not already exist.
Args
conn
- connection
table_name
- table_name
column_name
- column_name to add
data_type
- data type of the column
Expand source code
def add_column_if_not_exists(self, conn, table_name, column_name, data_type):
    """
    Adds a column to an existing table, if the column does not exist yet.

    A "duplicate column name" error is treated as the expected "already
    there" case. Any other sqlite3.OperationalError is logged and not
    re-raised, so the call stays best-effort.

    Args:
        conn: open sqlite3 connection
        table_name: name of the table to alter
        column_name: name of the column to add
        data_type: data type of the new column
    """
    # Identifiers cannot be bound as SQL parameters, so the statement has to
    # be built as text. Only pass trusted, hard-coded names to this method.
    statement = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {data_type}"

    cursor = conn.cursor()
    try:
        cursor.execute(statement)
        conn.commit()
    except sqlite3.OperationalError as exc:
        if "duplicate column name" in str(exc):
            LOGGER.info("Column '%s' already exists.", column_name)
        else:
            LOGGER.error("An error occurred: %s", exc)
    else:
        LOGGER.info("Column '%s' added successfully.", column_name)
    finally:
        cursor.close()
def create_db(self)
-
reads the ddl files from the ddl directory and creates the tables
Expand source code
def create_db(self):
    """
    Reads the DDL files from the DDL directory and executes them in
    ascending version order.

    The version is taken from the file name, which is expected to look like
    "V<number>__<description>.sql".

    Raises:
        ValueError: if the version number cannot be parsed from a file name
    """
    indexes_dict: Dict[int, str] = {}
    for sqlfile in glob.glob(f"{DDL_PATH}/*.sql"):
        LOGGER.debug("extract version from sql file %s ... ", sqlfile)
        # Parse the bare file name only, so that a "__" somewhere in the
        # installation path cannot corrupt the extracted version.
        filename = os.path.basename(sqlfile)
        index = int(filename[1:].partition('__')[0])
        LOGGER.debug(" ... extracted version: %d", index)
        indexes_dict[index] = sqlfile

    if not os.path.isdir(self.db_dir):
        LOGGER.info("creating folder for db: %s", self.db_dir)
        # exist_ok: tolerate another process creating the folder meanwhile
        os.makedirs(self.db_dir, exist_ok=True)

    conn = self.get_connection()
    try:
        curr = conn.cursor()
        for index in sorted(indexes_dict):
            sqlfile = indexes_dict[index]
            with open(sqlfile, 'r', encoding='utf8') as scriptfile:
                script = scriptfile.read()
            LOGGER.debug("execute creation script %s", sqlfile)
            curr.executescript(script)
            conn.commit()
        # adding columns that do not exist - so far none are needed
    finally:
        # always release the connection, even if a script fails
        conn.close()
Inherited members