Module secfsdstools.u_usecases.bulk_loading

Contains basic logic that produces separate databags for each of the main financial statements (BS, CF, IS), containing data from all available zip files.

The same logic is also contained and further explained in the 06_bulk_data_processing_deep_dive.ipynb notebook. Please have a look at that notebook for a detailed explanation of the logic.
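
For a quick start, a minimal sketch of how this module is typically driven (assuming the SEC data has already been downloaded and indexed as shown in the notebook):

from secfsdstools.u_usecases.bulk_loading import (
    create_datasets_for_main_statements_parallel,
    create_datasets_for_main_statements_serial,
)

# with roughly 16 GB of free memory available, the parallel variant is the faster option
create_datasets_for_main_statements_parallel(base_path="./set/parallel/")

# on machines with less memory, the serial variant trades speed for a smaller footprint
# create_datasets_for_main_statements_serial(target_path="set/serial/", tmp_path="set/tmp")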

"""
Contains basic logic that produces separate databags for each of the main financial
statements (BS, CF, IS), containing data from all available zip files.

The same logic is also contained and further explained in the
06_bulk_data_processing_deep_dive.ipynb notebook.
Please have a look at that notebook for a detailed explanation of the logic.
"""
import os
from glob import glob
from typing import Callable, Optional
from typing import List

from secfsdstools.a_config.configmgt import ConfigurationManager
from secfsdstools.c_index.indexdataaccess import ParquetDBIndexingAccessor
from secfsdstools.d_container.databagmodel import RawDataBag, JoinedDataBag
from secfsdstools.e_collector.zipcollecting import ZipCollector


def default_postloadfilter(databag: RawDataBag) -> RawDataBag:
    """
    defines a default post-load filter method that can be used by ZipCollectors.
    It combines the filters:
            ReportPeriodRawFilter, MainCoregRawFilter, OfficialTagsOnlyRawFilter, USDOnlyRawFilter
    """
    # pylint: disable=C0415
    from secfsdstools.e_filter.rawfiltering import ReportPeriodRawFilter, MainCoregRawFilter, \
        OfficialTagsOnlyRawFilter, USDOnlyRawFilter

    return databag[ReportPeriodRawFilter()][MainCoregRawFilter()][OfficialTagsOnlyRawFilter()][
        USDOnlyRawFilter()]


def save_databag(databag: RawDataBag, base_path: str, sub_path: str) -> JoinedDataBag:
    """
    helper method to save the RawDataBag and the joined version of it under a certain base_path
    and sub_path.

    the target path for the rawdatabag is <base_path>/<sub_path>/raw.
    the target path for the joineddatabag is <base_path>/<sub_path>/joined.

    Args:
        databag: databag to be saved
        base_path: base path under which the data will be stored
        sub_path: sub path under which the data will be stored

    Returns:
        JoinedDataBag: the joined databag that was created during the save process
    """

    target_path_raw = os.path.join(base_path, sub_path, 'raw')
    print(f"store rawdatabag under {target_path_raw}")
    os.makedirs(target_path_raw, exist_ok=True)
    databag.save(target_path_raw)

    target_path_joined = os.path.join(base_path, sub_path, 'joined')
    os.makedirs(target_path_joined, exist_ok=True)
    print("create joined databag")
    joined_databag = databag.join()

    print(f"store joineddatabag under {target_path_joined}")
    joined_databag.save(target_path_joined)
    return joined_databag


def load_all_financial_statements_parallel(
        financial_statement: str,
        post_load_filter: Optional[Callable[[RawDataBag], RawDataBag]] = None) -> RawDataBag:
    """
    loads the data for a certain statement (e.g. BS, CF, IS, ...) from all available zip files
    and returns a single RawDataBag with all the information.
    It filters for 10-K and 10-Q reports.

    Args:
        financial_statement (str): the statement you want to read the data for "BS", "CF", "IS"
        post_load_filter (Callable, optional): a post_load_filter method that is applied after
         loading of every zip file

    Returns:
        RawDataBag: the databag with the read data

    """

    collector: ZipCollector = ZipCollector.get_all_zips(forms_filter=["10-K", "10-Q"],
                                                        stmt_filter=[financial_statement],
                                                        post_load_filter=post_load_filter)
    return collector.collect()


def create_datasets_for_main_statements_parallel(base_path: str = "./set/parallel/"):
    """
    Creates the raw and joined datasets for all the three main statements: BS, CF, IS.

    The data from the different zip files are loaded in parallel. Therefore, about 16GB
    of free memory is needed.

    the created folder hierarchy looks as follows:
    <pre>
        - <base_path>
          - BS
            - raw
            - joined
          - CF
            - raw
            - joined
          - IS
            - raw
            - joined
    </pre>
    """

    for statement_to_load in ["BS", "CF", "IS"]:
        print("load data for ", statement_to_load)
        rawdatabag = load_all_financial_statements_parallel(
            financial_statement=statement_to_load,
            post_load_filter=default_postloadfilter
        )
        save_databag(databag=rawdatabag, base_path=base_path, sub_path=statement_to_load)


def read_all_zip_names() -> List[str]:
    """
    Returns a list with all available zip-file names.

    Returns:
        List[str]: list with the names of the available zip files
    """
    configuration = ConfigurationManager.read_config_file()
    dbaccessor = ParquetDBIndexingAccessor(db_dir=configuration.db_dir)

    # exclude 2009q1.zip, since it is empty and causes an error when it is read with a filter
    return [x.fileName for x in dbaccessor.read_all_indexfileprocessing() if
            not x.fullPath.endswith("2009q1.zip")]


def build_tmp_set(financial_statement: str,
                  file_names: List[str],
                  base_path: str = "set/tmp/",
                  post_load_filter: Optional[Callable[[RawDataBag], RawDataBag]] = None):
    """
    This function reads the data in sequence from the provided list of zip file names.
    It filters according to the defined financial_statement and stores the data in
    specific subfolders.

    the folder structure will look like
    <base_path>/<file_name>/<financial_statement>/raw
    <base_path>/<file_name>/<financial_statement>/joined

    Args:
        financial_statement (str): the statement you want to read the data for "BS", "CF", "IS"
        post_load_filter (Callable, optional): a post_load_filter method that is applied after
         loading of every zip file
        file_names (List[str]): List with the filenames to be processed
        base_path (str): base_path under which the processed data is saved.
    """

    for file_name in file_names:
        collector = ZipCollector.get_zip_by_name(name=file_name,
                                                 forms_filter=["10-K", "10-Q"],
                                                 stmt_filter=[financial_statement],
                                                 post_load_filter=post_load_filter)

        rawdatabag = collector.collect()

        target_path = os.path.join(base_path, file_name)
        # saving the raw databag, joining and saving the joined databag
        save_databag(databag=rawdatabag, base_path=target_path, sub_path=financial_statement)


def create_rawdatabag(financial_statement: str,
                      tmp_path: str = "set/tmp/",
                      target_path: str = "set/serial/"):
    """
    Concatenates the preprocessed, per-statement rawdatabags into a single databag.

    Args:
        financial_statement: the statement for which data has to be concatenated.
        tmp_path: the path where the temporary files are stored
        target_path: the target path of the dataset
    """
    raw_files = glob(f"{tmp_path}/*/{financial_statement}/raw/", recursive=True)
    raw_databags = [RawDataBag.load(file) for file in raw_files]
    raw_databag = RawDataBag.concat(raw_databags)
    target_path_raw = os.path.join(target_path, financial_statement, 'raw')
    print(f"store rawdatabag under {target_path_raw}")
    os.makedirs(target_path_raw, exist_ok=True)
    raw_databag.save(target_path_raw)


def create_joineddatabag(financial_statement: str,
                         tmp_path: str = "set/tmp/",
                         target_path: str = "set/serial/"):
    """
    Concatenates the preprocessed, per-statement joineddatabags into a single databag.

    Args:
        financial_statement: the statement for which data has to be concatenated.
        tmp_path: the path where the temporary files are stored
        target_path: the target path of the dataset
    """

    joined_files = glob(f"{tmp_path}/*/{financial_statement}/joined/", recursive=True)
    joined_databags = [JoinedDataBag.load(file) for file in joined_files]
    joined_databag = JoinedDataBag.concat(joined_databags)
    target_path_joined = os.path.join(target_path, financial_statement, 'joined')
    print(f"store joineddatabag under {target_path_joined}")
    os.makedirs(target_path_joined, exist_ok=True)
    joined_databag.save(target_path_joined)


def create_datasets_for_main_statements_serial(target_path: str = "set/parallel/",
                                               tmp_path: str = "set/tmp"):
    """
    Creates the rawdatabag and joineddatabag for all three main financial statements (BS, CF, IS).

    It is done in a serial manner and therefore needs fewer resources than the parallel approach.

    the created folder hierarchy looks as follows:
    <pre>
        - <target_path>
          - BS
            - raw
            - joined
          - CF
            - raw
            - joined
          - IS
            - raw
            - joined
    </pre>
    """
    file_names = read_all_zip_names()

    # create the temporary datasets
    # Note: calling build_tmp_set doesn't need a lot of memory
    build_tmp_set(financial_statement="BS", file_names=file_names,
                  post_load_filter=default_postloadfilter, base_path=tmp_path)
    build_tmp_set(financial_statement="IS", file_names=file_names,
                  post_load_filter=default_postloadfilter, base_path=tmp_path)
    build_tmp_set(financial_statement="CF", file_names=file_names,
                  post_load_filter=default_postloadfilter, base_path=tmp_path)

    # Note: calling the create_rawdatabag needs about 8-10 GB of free memory.
    #       the memory should be garbage collected between the different create_rawdatabag calls
    create_rawdatabag(financial_statement="BS", target_path=target_path, tmp_path=tmp_path)
    create_rawdatabag(financial_statement="IS", target_path=target_path, tmp_path=tmp_path)
    create_rawdatabag(financial_statement="CF", target_path=target_path, tmp_path=tmp_path)

    # Note: calling create_joineddatabag needs about 4 GB of free memory.
    #       the memory should be garbage collected between the different create_joineddatabag calls
    create_joineddatabag(financial_statement="BS", target_path=target_path, tmp_path=tmp_path)
    create_joineddatabag(financial_statement="IS", target_path=target_path, tmp_path=tmp_path)
    create_joineddatabag(financial_statement="CF", target_path=target_path, tmp_path=tmp_path)


if __name__ == '__main__':
    print("depending on your hardware resources, run the parallel or the serial logic.",
          "Just uncommented the desired line of code..")
    # create_datasets_for_main_statements_parallel()
    # create_datasets_for_main_statements_serial()

Functions

def build_tmp_set(financial_statement: str, file_names: List[str], base_path: str = 'set/tmp/', post_load_filter: Optional[Callable[[RawDataBag], RawDataBag]] = None)

This function reads the data in sequence from the provided list of zip file names. It filters according to the defined financial_statement and stores the data in specific subfolders.

the folder structure will look like <base_path>/<file_name>/<financial_statement>/raw and <base_path>/<file_name>/<financial_statement>/joined

Args

financial_statement : str
the statement you want to read the data for "BS", "CF", "IS"
post_load_filter : Callable, optional
a post_load_filter method that is applied after loading each zip file
file_names : List[str]
List with the filenames to be processed
base_path : str
base path under which the processed data is saved.
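
For example, to process just a hypothetical subset of quarters (the zip names follow the <year>q<quarter>.zip pattern):

from secfsdstools.u_usecases.bulk_loading import build_tmp_set, default_postloadfilter

build_tmp_set(financial_statement="BS",
              file_names=["2022q1.zip", "2022q2.zip"],  # hypothetical subset
              base_path="set/tmp/",
              post_load_filter=default_postloadfilter)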
def create_datasets_for_main_statements_parallel(base_path: str = './set/parallel/')

Creates the raw and joined datasets for all the three main statements: BS, CF, IS.

The data from the different zip files are loaded in parallel. Therefore, about 16GB of free memory is needed.

the created folder hierarchy looks as follows:

    - <base_path>
      - BS
        - raw
        - joined
      - CF
        - raw
        - joined
      - IS
        - raw
        - joined
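
After a run, the bags can be reloaded straight from the folder layout shown above; a sketch, assuming the default base_path:

from secfsdstools.d_container.databagmodel import JoinedDataBag, RawDataBag
from secfsdstools.u_usecases.bulk_loading import create_datasets_for_main_statements_parallel

create_datasets_for_main_statements_parallel(base_path="./set/parallel/")

# reload, e.g., the balance sheet data in both representations
bs_raw = RawDataBag.load("./set/parallel/BS/raw")
bs_joined = JoinedDataBag.load("./set/parallel/BS/joined")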
def create_datasets_for_main_statements_serial(target_path: str = 'set/parallel/', tmp_path: str = 'set/tmp')

Creates the rawdatabag and joineddatabag for all three main financial statements (BS, CF, IS).

It is done in a serial manner and therefore needs fewer resources than the parallel approach.

the created folder hierarchy looks as follows:

    - <target_path>
      - BS
        - raw
        - joined
      - CF
        - raw
        - joined
      - IS
        - raw
        - joined
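
The serial variant writes the same final layout under target_path, while the per-zip intermediates stay under tmp_path; a sketch that cleans them up afterwards (assuming they are no longer needed):

import shutil

from secfsdstools.u_usecases.bulk_loading import create_datasets_for_main_statements_serial

create_datasets_for_main_statements_serial(target_path="set/serial/", tmp_path="set/tmp")

# the temporary per-zip bags can be removed once the final bags exist
shutil.rmtree("set/tmp", ignore_errors=True)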
def create_joineddatabag(financial_statement: str, tmp_path: str = 'set/tmp/', target_path: str = 'set/serial/')

Concatenates the preprocessed, per-statement joineddatabags into a single databag.

Args

financial_statement
the statement for which data has to be concatenated.
tmp_path
the path where the temporary files are stored
target_path
the target path of the dataset
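
A sketch of concatenating and then reloading the joined balance sheet data (assuming build_tmp_set has populated set/tmp/ as shown above):

from secfsdstools.d_container.databagmodel import JoinedDataBag
from secfsdstools.u_usecases.bulk_loading import create_joineddatabag

create_joineddatabag(financial_statement="BS", tmp_path="set/tmp/", target_path="set/serial/")
bs_joined = JoinedDataBag.load("set/serial/BS/joined")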
def create_rawdatabag(financial_statement: str, tmp_path: str = 'set/tmp/', target_path: str = 'set/serial/')

Concatenates the preprocessed, per-statement rawdatabags into a single databag.

Args

financial_statement
the statement for which data has to be concatenated.
tmp_path
the path where the temporary files are stored
target_path
the target path of the dataset
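
Analogously for the raw representation (again assuming set/tmp/ was populated by build_tmp_set):

from secfsdstools.d_container.databagmodel import RawDataBag
from secfsdstools.u_usecases.bulk_loading import create_rawdatabag

create_rawdatabag(financial_statement="BS", tmp_path="set/tmp/", target_path="set/serial/")
bs_raw = RawDataBag.load("set/serial/BS/raw")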
def default_postloadfilter(databag: RawDataBag) ‑> RawDataBag

defines a default post-load filter method that can be used by ZipCollectors. It combines the filters: ReportPeriodRawFilter, MainCoregRawFilter, OfficialTagsOnlyRawFilter, USDOnlyRawFilter
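
A custom post-load filter follows the same pattern of chaining filters via the bag's index operator; for instance, a variant that keeps all currencies and tags (a sketch using filters shipped with the package):

from secfsdstools.d_container.databagmodel import RawDataBag
from secfsdstools.e_filter.rawfiltering import MainCoregRawFilter, ReportPeriodRawFilter

def my_postloadfilter(databag: RawDataBag) -> RawDataBag:
    # keep only the main company's data for the actual report period,
    # but do not restrict to official tags or USD values
    return databag[ReportPeriodRawFilter()][MainCoregRawFilter()]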

def load_all_financial_statements_parallel(financial_statement: str, post_load_filter: Optional[Callable[[RawDataBag], RawDataBag]] = None) ‑> RawDataBag

loads the data for a certain statement (e.g. BS, CF, IS, …) from all available zip files and returns a single RawDataBag with all the information. It filters for 10-K and 10-Q reports.

Args

financial_statement : str
the statement you want to read the data for "BS", "CF", "IS"
post_load_filter : Callable, optional
a post_load_filter method that is applied after loading each zip file

Returns

RawDataBag
the databag with the read data
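
For example, loading all balance sheet data with the default filter chain (a sketch; it assumes the RawDataBag exposes the sub/pre/num tables as pandas DataFrames named sub_df, pre_df, and num_df):

from secfsdstools.u_usecases.bulk_loading import (
    default_postloadfilter,
    load_all_financial_statements_parallel,
)

bs_bag = load_all_financial_statements_parallel(
    financial_statement="BS",
    post_load_filter=default_postloadfilter,
)
print(bs_bag.num_df.shape)  # assumed attribute: the num facts of all loaded BS reports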
def read_all_zip_names() ‑> List[str]

Returns a list with all available zip-file names.

Returns

List[str]
list with the names of the available zip files
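
A typical use is to feed the result straight into build_tmp_set:

from secfsdstools.u_usecases.bulk_loading import (
    build_tmp_set, default_postloadfilter, read_all_zip_names,
)

file_names = read_all_zip_names()
build_tmp_set(financial_statement="CF", file_names=file_names,
              base_path="set/tmp/", post_load_filter=default_postloadfilter)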
def save_databag(databag: RawDataBag, base_path: str, sub_path: str) ‑> JoinedDataBag

helper method to save the RawDataBag and the joined version of it under a certain base_path and sub_path.

the target path for the rawdatabag is <base_path>/<sub_path>/raw; the target path for the joineddatabag is <base_path>/<sub_path>/joined.

Args

databag
databag to be saved
base_path
base path under which the data will be stored
sub_path
sub path under which the data will be stored

Returns

JoinedDataBag
the joined databag that was created during the save process
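
A sketch of saving a bag and reloading both representations later (reusing load_all_financial_statements_parallel to obtain a RawDataBag):

from secfsdstools.d_container.databagmodel import JoinedDataBag, RawDataBag
from secfsdstools.u_usecases.bulk_loading import (
    default_postloadfilter,
    load_all_financial_statements_parallel,
    save_databag,
)

bag = load_all_financial_statements_parallel("BS", post_load_filter=default_postloadfilter)
joined = save_databag(databag=bag, base_path="./set", sub_path="BS")

# both representations can be reloaded directly from disk
bs_raw = RawDataBag.load("./set/BS/raw")
bs_joined = JoinedDataBag.load("./set/BS/joined")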