Module secfsdstools.x_examples.automation.memory_optimized_daily_automation

This module shows how to automate additional steps after the usual update process (downloading new zip files, transforming them to parquet, and indexing them).

This example has a low memory footprint, so it is also safe to use on systems with 16 GB of memory. It also activates daily processing.

If you update the configuration as defined below, you will get the following datasets:

  • for quarterly data provided and parsed by the SEC:

    • single filtered and joined bags for every stmt (BS, IS, CF, ..) containing the data from all available zip files.
    • a single filtered and joined bag containing the data from all available zip files.
    • single standardized bags for BS, IS, CF containing the data from all available zip files.
  • for daily data provided by the SEC and parsed locally with the secdaily package (not yet available as quarterly data):

    • single filtered and joined bags for every stmt (BS, IS, CF, ..) containing daily parsed data.
    • a single filtered and joined bag containing all daily parsed data.
    • single standardized bags for BS, IS, CF containing the data from all daily parsed data.
  • combined quarterly and daily data:

    • single filtered and joined bags for every stmt (BS, IS, CF, ..) from all available data
    • a single filtered and joined bag containing the data from all available data.
    • single standardized bags for BS, IS, CF containing the data from all available data.

Moreover, these files will be updated automatically as soon as a new quarterly zip file or new daily data becomes available on the SEC website.
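Once the pipeline has run, the resulting datasets can be loaded directly from the configured directories. The following is a minimal sketch, assuming the JoinedDataBag and StandardizedBag load API of secfsdstools (the module paths are assumptions, and the example paths are taken from the configuration shown further below; verify both against your installed version):

from secfsdstools.d_container.databagmodel import JoinedDataBag
from secfsdstools.f_standardize.standardizing import StandardizedBag

# combined quarterly and daily data, all statements joined into a single bag
all_joined = JoinedDataBag.load("C:/data/sec/automated/_3_all/_2_joined")
print(all_joined.sub_df.shape, all_joined.pre_num_df.shape)

# combined standardized balance sheet data
bs_standardized = StandardizedBag.load("C:/data/sec/automated/_3_all/_3_standardized_by_stmt/BS")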

You can configure this function in the secfsdstools configuration file by adding a postupdateprocesses definition. For instance, to use this example, add the postupdateprocesses definition as shown below:

[DEFAULT]
downloaddirectory = ...
dbdirectory = ...
parquetdirectory = ...
useragentemail = ...
autoupdate = True
keepzipfiles = False

postupdateprocesses=secfsdstools.x_examples.automation.memory_optimized_daily_automation.define_extra_processes

# activate daily processing
dailyprocessing = True

To use it, you also need to add the additional configuration entries shown below:

[Filter]
filtered_quarterly_joined_by_stmt_dir = C:/data/sec/automated/_1_by_quarter/_1_filtered_joined_by_stmt
filtered_daily_joined_by_stmt_dir = C:/data/sec/automated/_1_by_day/_1_filtered_joined_by_stmt
parallelize = True

[Standardizer]
standardized_quarterly_by_stmt_dir = C:/data/sec/automated/_1_by_quarter/_2_standardized_by_stmt
standardized_daily_by_stmt_dir = C:/data/sec/automated/_1_by_day/_2_standardized_by_stmt

[Concat]
concat_quarterly_joined_by_stmt_dir = C:/data/sec/automated/_2_all_quarter/_1_joined_by_stmt
concat_daily_joined_by_stmt_dir = C:/data/sec/automated/_2_all_day/_1_joined_by_stmt

concat_quarterly_joined_all_dir = C:/data/sec/automated/_2_all_quarter/_2_joined
concat_daily_joined_all_dir = C:/data/sec/automated/_2_all_day/_2_joined

concat_quarterly_standardized_by_stmt_dir = C:/data/sec/automated/_2_all_quarter/_3_standardized_by_stmt
concat_daily_standardized_by_stmt_dir = C:/data/sec/automated/_2_all_day/_3_standardized_by_stmt

concat_all_joined_by_stmt_dir = C:/data/sec/automated/_3_all/_1_joined_by_stmt
concat_all_joined_dir = C:/data/sec/automated/_3_all/_2_joined
concat_all_standardized_by_stmt_dir = C:/data/sec/automated/_3_all/_3_standardized_by_stmt

(A complete configuration file using the "define_extra_processes" function is available as memory_optimized_daily_automation_config.cfg in the same package as this module.)

This example adds the following main steps to the usual update process.

  • for quarterly data provided and parsed by the SEC, as well as daily data not yet part of the quarterly data:

    First, it creates a joined bag for every zip file, keeps only 10-K and 10-Q reports, and applies the filters ReportPeriodRawFilter, MainCoregRawFilter, USDOnlyRawFilter, and OfficialTagsOnlyRawFilter (a manual sketch of this filtering is shown after this list). Furthermore, the data is split by stmt. Location: filtered_[quarterly/daily]_joined_by_stmt_dir. Note: setting "parallelize" in the config to False makes the initial loading slower but uses less memory.

    Second, it produces standardized bags for BS, IS, CF for every zip file based on the filtered data from the previous step. These bags are stored under the path defined as standardized_[quarterly/daily]_by_stmt_dir.

    Third, it creates a single joined bag for every statement (balance sheet, income statement, cash flow, …) based on the filtered data from the first step. These bags are stored under the path defined as concat_[quarterly/daily]_joined_by_stmt_dir.

    Fourth, it will create a single bag with all data from all different statements by merging the bags from the previous step into a single big bag. This bag will be stored under the path defined as concat_[quarterly/daily]_joined_all_dir.

    Fifth, it will create single standardized bags for BS, IS, CF from the results in the second step. These bags will be stored under the path defined as concat_[quarterly/daily]_standardized_by_stmt_dir.

  • combined quarterly and daily data:

    First, it creates a single joined bag for every statement (balance sheet, income statement, cash flow, …) containing quarterly and daily data. These bags are stored under the path defined as concat_all_joined_by_stmt_dir.

    Second, it will create a single bag with all data from all different statements by merging the bags from the previous step into a single big bag. This bag will be stored under the path defined as concat_all_joined_dir.

    Third, it will create single standardized bags for BS, IS, CF containing both quarterly and daily data. These bags will be stored under the path defined as concat_all_standardized_by_stmt_dir.
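The filtering in the first step corresponds roughly to the manual sketch below. It assumes the ZipCollector and raw-filter API of secfsdstools (the module paths, the forms_filter parameter, and the file name 2024q1.zip are illustrative assumptions; verify them against your installed version):

from secfsdstools.e_collector.zipcollecting import ZipCollector
from secfsdstools.e_filter.rawfiltering import (
    MainCoregRawFilter,
    OfficialTagsOnlyRawFilter,
    ReportPeriodRawFilter,
    USDOnlyRawFilter,
)

# collect a single quarterly zip file, keeping only 10-K and 10-Q reports
raw_bag = ZipCollector.get_zip_by_name(name="2024q1.zip", forms_filter=["10-K", "10-Q"]).collect()

# apply the same raw filters the FilterProcess uses, then join the pre and num data
joined_bag = (raw_bag[ReportPeriodRawFilter()]
              [MainCoregRawFilter()]
              [USDOnlyRawFilter()]
              [OfficialTagsOnlyRawFilter()]
              .join())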

All these steps use basic implementations of the AbstractProcess class from the secfsdstools.g_pipelines package.

Furthermore, all these steps check whether something changed since the last run and are only executed if something did change (for instance, if new data became available).

Also have a look at the notebook 08_00_automation_basics.
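If you want to append your own steps to such a pipeline, the pattern is the same as in the ClearDailyDataProcess class below: subclass AbstractProcess and implement process(). A minimal, hypothetical sketch (NotifyProcess, define_my_extra_processes, and the marker file path are invented for illustration; it assumes that implementing process() is sufficient, as it is for ClearDailyDataProcess):

from typing import List

from secfsdstools.a_config.configmodel import Configuration
from secfsdstools.c_automation.task_framework import AbstractProcess


class NotifyProcess(AbstractProcess):
    """Hypothetical extra step: write a marker file once the pipeline has finished."""

    def __init__(self, marker_file: str):
        super().__init__()
        self.marker_file = marker_file

    def process(self):
        # runs after the preceding steps of the pipeline
        with open(self.marker_file, "w", encoding="utf-8") as marker:
            marker.write("post-update pipeline finished")


def define_my_extra_processes(configuration: Configuration) -> List[AbstractProcess]:
    # referenced from the config via a postupdateprocesses entry pointing to this function
    return [NotifyProcess(marker_file="C:/data/sec/automated/finished.txt")]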

Expand source code
# pylint: disable=C0301
"""
This module shows how to automate additional steps after the usual update process
(downloading new zip files, transforming them to parquet, and indexing them).

This example has a low memory footprint, so it is also safe to use on systems with 16 GB
of memory. It also activates daily processing.

If you update the configuration as defined below, you will get the following datasets:

- for quarterly data provided and parsed by the SEC:
    - single filtered and joined bags for every stmt (BS, IS, CF, ..) containing the data from
      all available zip files.
    - a single filtered and joined bag containing the data from all available zip files.
    - single standardized bags for BS, IS, CF containing the data from all available zip files.

- for daily data provided by the SEC and parsed locally with the `secdaily` package
  (not yet available as quarterly data):
    - single filtered and joined bags for every stmt (BS, IS, CF, ..) containing daily parsed data.
    - a single filtered and joined bag containing all daily parsed data.
    - single standardized bags for BS, IS, CF containing the data from all daily parsed data.

- combined quarterly and daily data:
    - single filtered and joined bags for every stmt (BS, IS, CF, ..) from all available data
    - a single filtered and joined bag containing the data from all available data.
    - single standardized bags for BS, IS, CF containing the data from all available data.


Moreover, these files will be updated automatically as soon as a new quarterly zip file or new daily data
becomes available on the SEC website.

You can configure this function in the secfsdstools configuration file by adding
a postupdateprocesses definition. For instance, to use this example,
add the postupdateprocesses definition as shown below:

<pre>
[DEFAULT]
downloaddirectory = ...
dbdirectory = ...
parquetdirectory = ...
useragentemail = ...
autoupdate = True
keepzipfiles = False

postupdateprocesses=secfsdstools.x_examples.automation.memory_optimized_daily_automation.define_extra_processes

# activate daily processing
dailyprocessing = True

</pre>

To use it, you also need to add the additional configuration entries shown below:

<pre>
[Filter]
filtered_quarterly_joined_by_stmt_dir = C:/data/sec/automated/_1_by_quarter/_1_filtered_joined_by_stmt
filtered_daily_joined_by_stmt_dir = C:/data/sec/automated/_1_by_day/_1_filtered_joined_by_stmt
parallelize = True

[Standardizer]
standardized_quarterly_by_stmt_dir = C:/data/sec/automated/_1_by_quarter/_2_standardized_by_stmt
standardized_daily_by_stmt_dir = C:/data/sec/automated/_1_by_day/_2_standardized_by_stmt

[Concat]
concat_quarterly_joined_by_stmt_dir = C:/data/sec/automated/_2_all_quarter/_1_joined_by_stmt
concat_daily_joined_by_stmt_dir = C:/data/sec/automated/_2_all_day/_1_joined_by_stmt

concat_quarterly_joined_all_dir = C:/data/sec/automated/_2_all_quarter/_2_joined
concat_daily_joined_all_dir = C:/data/sec/automated/_2_all_day/_2_joined

concat_quarterly_standardized_by_stmt_dir = C:/data/sec/automated/_2_all_quarter/_3_standardized_by_stmt
concat_daily_standardized_by_stmt_dir = C:/data/sec/automated/_2_all_day/_3_standardized_by_stmt

concat_all_joined_by_stmt_dir = C:/data/sec/automated/_3_all/_1_joined_by_stmt
concat_all_joined_dir = C:/data/sec/automated/_3_all/_2_joined
concat_all_standardized_by_stmt_dir = C:/data/sec/automated/_3_all/_3_standardized_by_stmt

</pre>

(A complete configuration file using the "define_extra_processes" function is available as
 memory_optimized_daily_automation_config.cfg in the same package as this module.)

This example adds the following main steps to the usual update process.

- for quarterly data provided and parsed by the SEC, as well as daily data not yet part of the quarterly data:

    First, it creates a joined bag for every zip file, keeps only 10-K and 10-Q reports,
    and applies the filters ReportPeriodRawFilter, MainCoregRawFilter, USDOnlyRawFilter,
    and OfficialTagsOnlyRawFilter. Furthermore, the data is split by stmt.
    Location: filtered_[quarterly/daily]_joined_by_stmt_dir.
    Note: setting "parallelize" in the config to False makes the initial loading slower
    but uses less memory.

    Second, it produces standardized bags for BS, IS, CF for every zip file based on the filtered
    data from the previous step. These bags are stored under the path defined as
    `standardized_[quarterly/daily]_by_stmt_dir`.

    Third, it creates a single joined bag for every statement (balance sheet, income statement,
    cash flow, ...) based on the filtered data from the first step.
    These bags are stored under the path defined as `concat_[quarterly/daily]_joined_by_stmt_dir`.

    Fourth, it will create a single bag with all data from all different statements by merging the
    bags from the previous step into a single big bag. This bag will be stored under the path defined
    as `concat_[quarterly/daily]_joined_all_dir`.

    Fifth, it will create single standardized bags for BS, IS, CF from the results in the second step.
    These bags will be stored under the path defined as `concat_[quarterly/daily]_standardized_by_stmt_dir`.

- combined quarterly and daily data:

    First, it creates a single joined bag for every statement (balance sheet, income statement,
    cash flow, ...) containing quarterly and daily data.
    These bags are stored under the path defined as `concat_all_joined_by_stmt_dir`.

    Second, it will create a single bag with all data from all different statements by merging the
    bags from the previous step into a single big bag. This bag will be stored under the path defined
    as `concat_all_joined_dir`.

    Third, it will create single standardized bags for BS, IS, CF containing both quarterly and daily data.
    These bags will be stored under the path defined as `concat_all_standardized_by_stmt_dir`.


All these steps use basic implementations of the AbstractProcess class from the
secfsdstools.g_pipelines package.

Furthermore, all these steps check whether something changed since the last run and are only executed
if something did change (for instance, if new data became available).

Also have a look at the notebook 08_00_automation_basics.

"""
import shutil
from pathlib import Path
from typing import List

from secfsdstools.a_config.configmodel import Configuration
from secfsdstools.c_automation.task_framework import AbstractProcess, LoggingProcess
from secfsdstools.g_pipelines.concat_process import (
    ConcatByChangedTimestampProcess,
    ConcatByNewSubfoldersProcess,
    ConcatMultiRootByChangedTimestampProcess,
)
from secfsdstools.g_pipelines.filter_process import FilterProcess
from secfsdstools.g_pipelines.standardize_process import StandardizeProcess


class ClearDailyDataProcess(AbstractProcess):
    """
    Since daily processed data is removed as soon as it is contained in a quarterly zip file,
    we also need to remove the corresponding data from the automated datasets.

    This process does this for the filtered and standardized daily data, which are created in the first step
    of processing the daily data.

    The subsequent daily processing steps always create the datasets fresh from all available daily data.
    Hence, we only need to clean the filtered and standardized daily data.
    """

    def __init__(self, filtered_daily_joined_by_stmt_dir: str, standardized_daily_by_stmt_dir: str):
        """
        Constructor.
        Args:
            filtered_daily_joined_by_stmt_dir: directory containing the filtered daily data
            standardized_daily_by_stmt_dir: directory containing the standardized daily data
        """
        super().__init__()
        self.filtered_daily_joined_by_stmt_path = Path(filtered_daily_joined_by_stmt_dir) / "daily"
        self.standardized_daily_by_stmt_path = Path(standardized_daily_by_stmt_dir)

    def clear_directory(self, cut_off_day: int, root_dir_path: Path):
        """
        removes all directories under root_dir_path that are older than the cut_off_day.
        """

        cut_off_file_name = f"{cut_off_day}.zip"

        if root_dir_path.exists():
            for dir_path in root_dir_path.iterdir():
                if dir_path.is_dir() and dir_path.name < cut_off_file_name:
                    shutil.rmtree(dir_path)

    def process(self):
        """
        Execute the process.
        """
        # DailyPreparationProcess writes the calculated cut_off_day into the context
        # and we have to make sure that we clean up the "postprocessing" daily data as well.
        cut_off_day = self.context["cut_off_day"]

        self.clear_directory(cut_off_day=cut_off_day, root_dir_path=self.filtered_daily_joined_by_stmt_path)
        self.clear_directory(cut_off_day=cut_off_day, root_dir_path=self.standardized_daily_by_stmt_path)


# pylint: disable=too-many-locals
def define_extra_processes(configuration: Configuration) -> List[AbstractProcess]:
    """
    example definition of an additional pipeline. It adds several steps to process
    quarterly and daily data, as well as combining both. See the documentation of this
    module for more details.

    All these steps have a low memory footprint, so they should run without problems even
    on a 16 GB machine.

    Please have a look at the notebook 08_00_automation_basics for further details.

    Args:
        configuration: the configuration

    Returns:
        List[AbstractProcess]: List with the defined process steps

    """

    filtered_quarterly_joined_by_stmt_dir = configuration.get_parser().get(
        section="Filter", option="filtered_quarterly_joined_by_stmt_dir"
    )
    filtered_daily_joined_by_stmt_dir = configuration.get_parser().get(
        section="Filter", option="filtered_daily_joined_by_stmt_dir"
    )

    # use getboolean so that a configured value of False is honored (get() would return the truthy string "False")
    filter_parallelize = configuration.get_parser().getboolean(section="Filter", option="parallelize", fallback=True)

    standardized_quarterly_by_stmt_dir = configuration.get_parser().get(
        section="Standardizer", option="standardized_quarterly_by_stmt_dir"
    )
    standardized_daily_by_stmt_dir = configuration.get_parser().get(
        section="Standardizer", option="standardized_daily_by_stmt_dir"
    )

    concat_quarterly_joined_by_stmt_dir = configuration.get_parser().get(
        section="Concat", option="concat_quarterly_joined_by_stmt_dir"
    )
    concat_daily_joined_by_stmt_dir = configuration.get_parser().get(
        section="Concat", option="concat_daily_joined_by_stmt_dir"
    )

    concat_quarterly_joined_all_dir = configuration.get_parser().get(
        section="Concat", option="concat_quarterly_joined_all_dir"
    )
    concat_daily_joined_all_dir = configuration.get_parser().get(section="Concat", option="concat_daily_joined_all_dir")

    concat_quarterly_standardized_by_stmt_dir = configuration.get_parser().get(
        section="Concat", option="concat_quarterly_standardized_by_stmt_dir"
    )
    concat_daily_standardized_by_stmt_dir = configuration.get_parser().get(
        section="Concat", option="concat_daily_standardized_by_stmt_dir"
    )

    concat_all_joined_by_stmt_dir = configuration.get_parser().get(
        section="Concat", option="concat_all_joined_by_stmt_dir"
    )

    concat_all_joined_dir = configuration.get_parser().get(section="Concat", option="concat_all_joined_dir")

    concat_all_standardized_by_stmt_dir = configuration.get_parser().get(
        section="Concat", option="concat_all_standardized_by_stmt_dir"
    )

    processes: List[AbstractProcess] = []

    # QUARTERLY DATA Processing
    processes.append(LoggingProcess(title="Post Update Processes For Quarterly Data Started", lines=[]))

    processes.append(
        # 1. Filter, join, and save by stmt
        FilterProcess(
            db_dir=configuration.db_dir,
            target_dir=filtered_quarterly_joined_by_stmt_dir,
            bag_type="joined",
            save_by_stmt=True,
            execute_serial=not filter_parallelize,
            file_type="quarter",
        )
    )

    processes.append(
        # 2. Standardize the data for every quarter
        StandardizeProcess(
            root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter", target_dir=standardized_quarterly_by_stmt_dir
        ),
    )

    processes.extend(
        [
            # 3. building datasets with all entries by stmt
            ConcatByNewSubfoldersProcess(
                root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter",
                target_dir=f"{concat_quarterly_joined_by_stmt_dir}/BS",
                pathfilter="*/BS",
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter",
                target_dir=f"{concat_quarterly_joined_by_stmt_dir}/CF",
                pathfilter="*/CF",
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter",
                target_dir=f"{concat_quarterly_joined_by_stmt_dir}/CI",
                pathfilter="*/CI",
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter",
                target_dir=f"{concat_quarterly_joined_by_stmt_dir}/CP",
                pathfilter="*/CP",
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter",
                target_dir=f"{concat_quarterly_joined_by_stmt_dir}/EQ",
                pathfilter="*/EQ",
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter",
                target_dir=f"{concat_quarterly_joined_by_stmt_dir}/IS",
                pathfilter="*/IS",
            ),
        ]
    )

    # 4. create a single joined bag with all the data filtered and joined
    processes.append(
        ConcatByChangedTimestampProcess(
            root_dir=concat_quarterly_joined_by_stmt_dir,
            target_dir=concat_quarterly_joined_all_dir,
        )
    )

    # 5. concatenate the standardized bags by stmt (BS, IS, CF).
    processes.extend(
        [
            ConcatByNewSubfoldersProcess(
                root_dir=standardized_quarterly_by_stmt_dir,
                target_dir=f"{concat_quarterly_standardized_by_stmt_dir}/BS",
                pathfilter="*/BS",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=standardized_quarterly_by_stmt_dir,
                target_dir=f"{concat_quarterly_standardized_by_stmt_dir}/CF",
                pathfilter="*/CF",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=standardized_quarterly_by_stmt_dir,
                target_dir=f"{concat_quarterly_standardized_by_stmt_dir}/IS",
                pathfilter="*/IS",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
        ]
    )

    # DAILY DATA Processing
    processes.append(LoggingProcess(title="Post Update Processes For Daily Data Started", lines=[]))

    # clean daily data covered now by quarterly data
    processes.append(
        ClearDailyDataProcess(
            filtered_daily_joined_by_stmt_dir=filtered_daily_joined_by_stmt_dir,
            standardized_daily_by_stmt_dir=standardized_daily_by_stmt_dir,
        )
    )

    # 1. Filter, join, and save by stmt
    processes.append(
        FilterProcess(
            db_dir=configuration.db_dir,
            target_dir=filtered_daily_joined_by_stmt_dir,
            bag_type="joined",
            save_by_stmt=True,
            execute_serial=not filter_parallelize,
            file_type="daily",
        )
    )

    processes.append(
        # 2. Standardize the data for daily data
        StandardizeProcess(
            root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily", target_dir=standardized_daily_by_stmt_dir
        ),
    )

    processes.extend(
        [
            # 3. building datasets with all entries by stmt for daily data
            ConcatByChangedTimestampProcess(
                root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily",
                target_dir=f"{concat_daily_joined_by_stmt_dir}/BS",
                pathfilter="*/BS",
            ),
            ConcatByChangedTimestampProcess(
                root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily",
                target_dir=f"{concat_daily_joined_by_stmt_dir}/CF",
                pathfilter="*/CF",
            ),
            ConcatByChangedTimestampProcess(
                root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily",
                target_dir=f"{concat_daily_joined_by_stmt_dir}/CI",
                pathfilter="*/CI",
            ),
            ConcatByChangedTimestampProcess(
                root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily",
                target_dir=f"{concat_daily_joined_by_stmt_dir}/CP",
                pathfilter="*/CP",
            ),
            ConcatByChangedTimestampProcess(
                root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily",
                target_dir=f"{concat_daily_joined_by_stmt_dir}/EQ",
                pathfilter="*/EQ",
            ),
            ConcatByChangedTimestampProcess(
                root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily",
                target_dir=f"{concat_daily_joined_by_stmt_dir}/IS",
                pathfilter="*/IS",
            ),
        ]
    )

    # 4. create a single joined bag with all the data filtered and joined for daily data
    processes.append(
        ConcatByChangedTimestampProcess(
            root_dir=concat_daily_joined_by_stmt_dir,
            target_dir=concat_daily_joined_all_dir,
        )
    )

    # 5. concatenate the standardized bags by stmt (BS, IS, CF) for daily data.
    processes.extend(
        [
            ConcatByNewSubfoldersProcess(
                root_dir=standardized_daily_by_stmt_dir,
                target_dir=f"{concat_daily_standardized_by_stmt_dir}/BS",
                pathfilter="*/BS",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=standardized_daily_by_stmt_dir,
                target_dir=f"{concat_daily_standardized_by_stmt_dir}/CF",
                pathfilter="*/CF",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=standardized_daily_by_stmt_dir,
                target_dir=f"{concat_daily_standardized_by_stmt_dir}/IS",
                pathfilter="*/IS",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
        ]
    )

    # Concat daily and quarter together
    processes.append(
        LoggingProcess(title="Post Update Processes To Combine Quarterly And Daily Data Started", lines=[])
    )

    # 1. concat joined_by_statement
    processes.extend(
        [
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_quarterly_joined_by_stmt_dir, concat_daily_joined_by_stmt_dir],
                target_dir=f"{concat_all_joined_by_stmt_dir}/BS",
                pathfilter="BS",
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_quarterly_joined_by_stmt_dir, concat_daily_joined_by_stmt_dir],
                target_dir=f"{concat_all_joined_by_stmt_dir}/CF",
                pathfilter="CF",
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_quarterly_joined_by_stmt_dir, concat_daily_joined_by_stmt_dir],
                target_dir=f"{concat_all_joined_by_stmt_dir}/CI",
                pathfilter="CI",
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_quarterly_joined_by_stmt_dir, concat_daily_joined_by_stmt_dir],
                target_dir=f"{concat_all_joined_by_stmt_dir}/CP",
                pathfilter="CP",
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_quarterly_joined_by_stmt_dir, concat_daily_joined_by_stmt_dir],
                target_dir=f"{concat_all_joined_by_stmt_dir}/EQ",
                pathfilter="EQ",
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_quarterly_joined_by_stmt_dir, concat_daily_joined_by_stmt_dir],
                target_dir=f"{concat_all_joined_by_stmt_dir}/IS",
                pathfilter="IS",
            ),
        ]
    )

    # 2. concat joined
    processes.append(
        ConcatMultiRootByChangedTimestampProcess(
            root_dirs=[concat_daily_joined_all_dir, concat_quarterly_joined_all_dir],
            pathfilter="",
            target_dir=concat_all_joined_dir,
        )
    )

    # 3. concat standardized by statement
    processes.extend(
        [
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_daily_standardized_by_stmt_dir, concat_quarterly_standardized_by_stmt_dir],
                target_dir=f"{concat_all_standardized_by_stmt_dir}/BS",
                pathfilter="BS",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_daily_standardized_by_stmt_dir, concat_quarterly_standardized_by_stmt_dir],
                target_dir=f"{concat_all_standardized_by_stmt_dir}/CF",
                pathfilter="CF",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_daily_standardized_by_stmt_dir, concat_quarterly_standardized_by_stmt_dir],
                target_dir=f"{concat_all_standardized_by_stmt_dir}/IS",
                pathfilter="IS",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
        ]
    )
    return processes

Functions

def define_extra_processes(configuration: Configuration) -> List[AbstractProcess]

example definition of an additional pipeline. It adds several steps to process quarterly and daily data, as well as combining both. See the documentation of this module for more details.

All these steps have a low memory footprint, so they should run without problems even on a 16 GB machine.

Please have a look at the notebook 08_00_automation_basics for further details.

Args

configuration
the configuration

Returns

List[AbstractProcess]
List with the defined process steps
Expand source code
def define_extra_processes(configuration: Configuration) -> List[AbstractProcess]:
    """
    example definition of an additional pipeline. It adds several steps to process
    quarterly and daily data, as well as combining both. See the documentation of this
    module for more details.

    All these steps have a low memory footprint, so they should run without problems even
    on a 16 GB machine.

    Please have a look at the notebook 08_00_automation_basics for further details.

    Args:
        configuration: the configuration

    Returns:
        List[AbstractProcess]: List with the defined process steps

    """

    filtered_quarterly_joined_by_stmt_dir = configuration.get_parser().get(
        section="Filter", option="filtered_quarterly_joined_by_stmt_dir"
    )
    filtered_daily_joined_by_stmt_dir = configuration.get_parser().get(
        section="Filter", option="filtered_daily_joined_by_stmt_dir"
    )

    # use getboolean so that a configured value of False is honored (get() would return the truthy string "False")
    filter_parallelize = configuration.get_parser().getboolean(section="Filter", option="parallelize", fallback=True)

    standardized_quarterly_by_stmt_dir = configuration.get_parser().get(
        section="Standardizer", option="standardized_quarterly_by_stmt_dir"
    )
    standardized_daily_by_stmt_dir = configuration.get_parser().get(
        section="Standardizer", option="standardized_daily_by_stmt_dir"
    )

    concat_quarterly_joined_by_stmt_dir = configuration.get_parser().get(
        section="Concat", option="concat_quarterly_joined_by_stmt_dir"
    )
    concat_daily_joined_by_stmt_dir = configuration.get_parser().get(
        section="Concat", option="concat_daily_joined_by_stmt_dir"
    )

    concat_quarterly_joined_all_dir = configuration.get_parser().get(
        section="Concat", option="concat_quarterly_joined_all_dir"
    )
    concat_daily_joined_all_dir = configuration.get_parser().get(section="Concat", option="concat_daily_joined_all_dir")

    concat_quarterly_standardized_by_stmt_dir = configuration.get_parser().get(
        section="Concat", option="concat_quarterly_standardized_by_stmt_dir"
    )
    concat_daily_standardized_by_stmt_dir = configuration.get_parser().get(
        section="Concat", option="concat_daily_standardized_by_stmt_dir"
    )

    concat_all_joined_by_stmt_dir = configuration.get_parser().get(
        section="Concat", option="concat_all_joined_by_stmt_dir"
    )

    concat_all_joined_dir = configuration.get_parser().get(section="Concat", option="concat_all_joined_dir")

    concat_all_standardized_by_stmt_dir = configuration.get_parser().get(
        section="Concat", option="concat_all_standardized_by_stmt_dir"
    )

    processes: List[AbstractProcess] = []

    # QUARTERLY DATA Processing
    processes.append(LoggingProcess(title="Post Update Processes For Quarterly Data Started", lines=[]))

    processes.append(
        # 1. Filter, join, and save by stmt
        FilterProcess(
            db_dir=configuration.db_dir,
            target_dir=filtered_quarterly_joined_by_stmt_dir,
            bag_type="joined",
            save_by_stmt=True,
            execute_serial=not filter_parallelize,
            file_type="quarter",
        )
    )

    processes.append(
        # 2. Standardize the data for every quarter
        StandardizeProcess(
            root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter", target_dir=standardized_quarterly_by_stmt_dir
        ),
    )

    processes.extend(
        [
            # 3. building datasets with all entries by stmt
            ConcatByNewSubfoldersProcess(
                root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter",
                target_dir=f"{concat_quarterly_joined_by_stmt_dir}/BS",
                pathfilter="*/BS",
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter",
                target_dir=f"{concat_quarterly_joined_by_stmt_dir}/CF",
                pathfilter="*/CF",
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter",
                target_dir=f"{concat_quarterly_joined_by_stmt_dir}/CI",
                pathfilter="*/CI",
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter",
                target_dir=f"{concat_quarterly_joined_by_stmt_dir}/CP",
                pathfilter="*/CP",
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter",
                target_dir=f"{concat_quarterly_joined_by_stmt_dir}/EQ",
                pathfilter="*/EQ",
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=f"{filtered_quarterly_joined_by_stmt_dir}/quarter",
                target_dir=f"{concat_quarterly_joined_by_stmt_dir}/IS",
                pathfilter="*/IS",
            ),
        ]
    )

    # 4. create a single joined bag with all the data filtered and joined
    processes.append(
        ConcatByChangedTimestampProcess(
            root_dir=concat_quarterly_joined_by_stmt_dir,
            target_dir=concat_quarterly_joined_all_dir,
        )
    )

    # 5. concatenate the standardized bags by stmt (BS, IS, CF).
    processes.extend(
        [
            ConcatByNewSubfoldersProcess(
                root_dir=standardized_quarterly_by_stmt_dir,
                target_dir=f"{concat_quarterly_standardized_by_stmt_dir}/BS",
                pathfilter="*/BS",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=standardized_quarterly_by_stmt_dir,
                target_dir=f"{concat_quarterly_standardized_by_stmt_dir}/CF",
                pathfilter="*/CF",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=standardized_quarterly_by_stmt_dir,
                target_dir=f"{concat_quarterly_standardized_by_stmt_dir}/IS",
                pathfilter="*/IS",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
        ]
    )

    # DAILY DATA Processing
    processes.append(LoggingProcess(title="Post Update Processes For Daily Data Started", lines=[]))

    # clean daily data covered now by quarterly data
    processes.append(
        ClearDailyDataProcess(
            filtered_daily_joined_by_stmt_dir=filtered_daily_joined_by_stmt_dir,
            standardized_daily_by_stmt_dir=standardized_daily_by_stmt_dir,
        )
    )

    # 1. Filter, join, and save by stmt
    processes.append(
        FilterProcess(
            db_dir=configuration.db_dir,
            target_dir=filtered_daily_joined_by_stmt_dir,
            bag_type="joined",
            save_by_stmt=True,
            execute_serial=not filter_parallelize,
            file_type="daily",
        )
    )

    processes.append(
        # 2. Standardize the data for daily data
        StandardizeProcess(
            root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily", target_dir=standardized_daily_by_stmt_dir
        ),
    )

    processes.extend(
        [
            # 3. building datasets with all entries by stmt for daily data
            ConcatByChangedTimestampProcess(
                root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily",
                target_dir=f"{concat_daily_joined_by_stmt_dir}/BS",
                pathfilter="*/BS",
            ),
            ConcatByChangedTimestampProcess(
                root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily",
                target_dir=f"{concat_daily_joined_by_stmt_dir}/CF",
                pathfilter="*/CF",
            ),
            ConcatByChangedTimestampProcess(
                root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily",
                target_dir=f"{concat_daily_joined_by_stmt_dir}/CI",
                pathfilter="*/CI",
            ),
            ConcatByChangedTimestampProcess(
                root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily",
                target_dir=f"{concat_daily_joined_by_stmt_dir}/CP",
                pathfilter="*/CP",
            ),
            ConcatByChangedTimestampProcess(
                root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily",
                target_dir=f"{concat_daily_joined_by_stmt_dir}/EQ",
                pathfilter="*/EQ",
            ),
            ConcatByChangedTimestampProcess(
                root_dir=f"{filtered_daily_joined_by_stmt_dir}/daily",
                target_dir=f"{concat_daily_joined_by_stmt_dir}/IS",
                pathfilter="*/IS",
            ),
        ]
    )

    # 4. create a single joined bag with all the data filtered and joined for daily data
    processes.append(
        ConcatByChangedTimestampProcess(
            root_dir=concat_daily_joined_by_stmt_dir,
            target_dir=concat_daily_joined_all_dir,
        )
    )

    # 5. concatenate the standardized bags by stmt (BS, IS, CF) for daily data.
    processes.extend(
        [
            ConcatByNewSubfoldersProcess(
                root_dir=standardized_daily_by_stmt_dir,
                target_dir=f"{concat_daily_standardized_by_stmt_dir}/BS",
                pathfilter="*/BS",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=standardized_daily_by_stmt_dir,
                target_dir=f"{concat_daily_standardized_by_stmt_dir}/CF",
                pathfilter="*/CF",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
            ConcatByNewSubfoldersProcess(
                root_dir=standardized_daily_by_stmt_dir,
                target_dir=f"{concat_daily_standardized_by_stmt_dir}/IS",
                pathfilter="*/IS",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
        ]
    )

    # Concat daily and quarter together
    processes.append(
        LoggingProcess(title="Post Update Processes To Combine Quarterly And Daily Data Started", lines=[])
    )

    # 1. concat joined_by_statement
    processes.extend(
        [
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_quarterly_joined_by_stmt_dir, concat_daily_joined_by_stmt_dir],
                target_dir=f"{concat_all_joined_by_stmt_dir}/BS",
                pathfilter="BS",
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_quarterly_joined_by_stmt_dir, concat_daily_joined_by_stmt_dir],
                target_dir=f"{concat_all_joined_by_stmt_dir}/CF",
                pathfilter="CF",
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_quarterly_joined_by_stmt_dir, concat_daily_joined_by_stmt_dir],
                target_dir=f"{concat_all_joined_by_stmt_dir}/CI",
                pathfilter="CI",
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_quarterly_joined_by_stmt_dir, concat_daily_joined_by_stmt_dir],
                target_dir=f"{concat_all_joined_by_stmt_dir}/CP",
                pathfilter="CP",
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_quarterly_joined_by_stmt_dir, concat_daily_joined_by_stmt_dir],
                target_dir=f"{concat_all_joined_by_stmt_dir}/EQ",
                pathfilter="EQ",
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_quarterly_joined_by_stmt_dir, concat_daily_joined_by_stmt_dir],
                target_dir=f"{concat_all_joined_by_stmt_dir}/IS",
                pathfilter="IS",
            ),
        ]
    )

    # 2. concat joined
    processes.append(
        ConcatMultiRootByChangedTimestampProcess(
            root_dirs=[concat_daily_joined_all_dir, concat_quarterly_joined_all_dir],
            pathfilter="",
            target_dir=concat_all_joined_dir,
        )
    )

    # 3. concat standardized by statement
    processes.extend(
        [
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_daily_standardized_by_stmt_dir, concat_quarterly_standardized_by_stmt_dir],
                target_dir=f"{concat_all_standardized_by_stmt_dir}/BS",
                pathfilter="BS",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_daily_standardized_by_stmt_dir, concat_quarterly_standardized_by_stmt_dir],
                target_dir=f"{concat_all_standardized_by_stmt_dir}/CF",
                pathfilter="CF",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
            ConcatMultiRootByChangedTimestampProcess(
                root_dirs=[concat_daily_standardized_by_stmt_dir, concat_quarterly_standardized_by_stmt_dir],
                target_dir=f"{concat_all_standardized_by_stmt_dir}/IS",
                pathfilter="IS",
                in_memory=True,  # Standardized Bag only work with in_memory
            ),
        ]
    )
    return processes
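The function is normally invoked by the update process itself via the postupdateprocesses entry in the configuration file. For experimentation it can also be called directly; a minimal sketch, assuming ConfigurationManager.read_config_file() from secfsdstools.a_config.configmgt (an assumption; verify this call against your installed version):

from secfsdstools.a_config.configmgt import ConfigurationManager
from secfsdstools.x_examples.automation.memory_optimized_daily_automation import define_extra_processes

# read the secfsdstools configuration file, including the [Filter], [Standardizer], and [Concat] sections
configuration = ConfigurationManager.read_config_file()

# build the pipeline definition and inspect the resulting steps
processes = define_extra_processes(configuration)
print(f"{len(processes)} post-update steps defined")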

Classes

class ClearDailyDataProcess (filtered_daily_joined_by_stmt_dir: str, standardized_daily_by_stmt_dir: str)

Since daily processed data is removed as soon as it is contained in a quarterly zip file, we also need to remove the corresponding data from the automated datasets.

This process does this for the filtered and standardized daily data, which are created in the first step of processing the daily data.

The subsequent daily processing steps always create the datasets fresh from all available daily data. Hence, we only need to clean the filtered and standardized daily data.

Constructor.

Args

filtered_daily_joined_by_stmt_dir
directory containing the filtered daily data
standardized_daily_by_stmt_dir
directory containing the standardized daily data
Expand source code
class ClearDailyDataProcess(AbstractProcess):
    """
    Since daily processed data is removed as soon as it is contained in a quarterly zip file,
    we also need to remove the corresponding data from the automated datasets.

    This process does this for the filtered and standardized daily data, which are created in the first step
    of processing the daily data.

    The subsequent daily processing steps always create the datasets fresh from all available daily data.
    Hence, we only need to clean the filtered and standardized daily data.
    """

    def __init__(self, filtered_daily_joined_by_stmt_dir: str, standardized_daily_by_stmt_dir: str):
        """
        Constructor.
        Args:
            filtered_daily_joined_by_stmt_dir: directory containing the filtered daily data
            standardized_daily_by_stmt_dir: directory containing the standardized daily data
        """
        super().__init__()
        self.filtered_daily_joined_by_stmt_path = Path(filtered_daily_joined_by_stmt_dir) / "daily"
        self.standardized_daily_by_stmt_path = Path(standardized_daily_by_stmt_dir)

    def clear_directory(self, cut_off_day: int, root_dir_path: Path):
        """
        removes all directories under root_dir_path that are older than the cut_off_day.
        """

        cut_off_file_name = f"{cut_off_day}.zip"

        if root_dir_path.exists():
            for dir_path in root_dir_path.iterdir():
                if dir_path.is_dir() and dir_path.name < cut_off_file_name:
                    shutil.rmtree(dir_path)

    def process(self):
        """
        Execute the process.
        """
        # DailyPreparationProcess writes the calculated cut_off_day into the context
        # and we have to make sure that we clean up the "postprocessing" daily data as well.
        cut_off_day = self.context["cut_off_day"]

        self.clear_directory(cut_off_day=cut_off_day, root_dir_path=self.filtered_daily_joined_by_stmt_path)
        self.clear_directory(cut_off_day=cut_off_day, root_dir_path=self.standardized_daily_by_stmt_path)

Ancestors

secfsdstools.c_automation.task_framework.AbstractProcess

Methods

def clear_directory(self, cut_off_day: int, root_dir_path: pathlib.Path)

removes all directories under root_dir_path that are older than the cut_off_day.

Expand source code
def clear_directory(self, cut_off_day: int, root_dir_path: Path):
    """
    removes all directories under root_dir_path that are older than the cut_off_day.
    """

    cut_off_file_name = f"{cut_off_day}.zip"

    if root_dir_path.exists():
        for dir_path in root_dir_path.iterdir():
            if dir_path.is_dir() and dir_path.name < cut_off_file_name:
                shutil.rmtree(dir_path)
def process(self)

Execute the process.

Expand source code
def process(self):
    """
    Execute the process.
    """
    # DailyPreparationProcess writes the calculated cut_off_day into the context
    # and we have to make sure that we clean up the "postprocessing" daily data as well.
    cut_off_day = self.context["cut_off_day"]

    self.clear_directory(cut_off_day=cut_off_day, root_dir_path=self.filtered_daily_joined_by_stmt_path)
    self.clear_directory(cut_off_day=cut_off_day, root_dir_path=self.standardized_daily_by_stmt_path)
