Module secfsdstools.x_examples.automation.automation

This module shows how to automate additional steps after the usual update process (which downloads new zip files, transforms them to parquet, and indexes them).

You can configure this function in the secfsdstools configuration file by adding a postupdateprocesses definition. For instance, to use this example, add the postupdateprocesses definition as shown below:

[DEFAULT]
downloaddirectory = ...
dbdirectory = ...
parquetdirectory = ...
useragentemail = ...
autoupdate = True
keepzipfiles = False
postupdateprocesses=secfsdstools.x_examples.automation.automation.define_extra_processes
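The postupdateprocesses value is a dotted path that the library resolves to a callable. Purely as an illustration (not necessarily how secfsdstools implements it), such a dotted path can be resolved with the standard library like this:

```python
import importlib


def resolve_dotted_path(dotted: str):
    """Split 'package.module.attribute' at the last dot, import the module,
    and look up the attribute on it."""
    module_path, _, attr_name = dotted.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


# stdlib stand-in for a dotted path like
# secfsdstools.x_examples.automation.automation.define_extra_processes
basename = resolve_dotted_path("os.path.basename")
print(basename("a/b.txt"))  # b.txt
```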

To use it, you also need to add the additional configuration entries shown below:

[Filter]
filtered_dir_by_stmt_joined = C:/data/sec/automated/_1_filtered_by_stmt_joined

[Concat]
concat_dir_by_stmt_joined = C:/data/sec/automated/_2_concat_by_stmt_joined

[Standardizer]
standardized_dir = C:/data/sec/automated/_3_standardized

; [SingleBag]
; singlebag_dir = C:/data/sec/automated/_4_single_bag

(A complete configuration file using the "define_extra_processes" function is available in the file automation_config.cfg, in the same package as this module.)

This example adds four main steps to the usual update process.

First, it creates a joined bag for every zip file, filters it for 10-K and 10-Q reports only, and also applies the filters ReportPeriodRawFilter, MainCoregRawFilter, USDOnlyRawFilter, and OfficialTagsOnlyRawFilter. The filtered joined bag is stored under the path defined as filtered_dir_by_stmt_joined. Furthermore, the data is also split by stmt.

Second, it creates a single joined bag for every statement (balance sheet, income statement, cash flow, cover page, …) that contains the data from all zip files, i.e., from all available quarters. These bags are stored under the path defined as concat_dir_by_stmt_joined.
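Internally, these concat steps select the per-statement subfolders with glob-style pathfilter patterns such as "*/BS" (one BS folder per quarter). A small stdlib sketch of how such a pattern matches, using hypothetical quarter folder names:

```python
import pathlib
import tempfile

# miniature version of a <quarter>/<stmt> layout (folder names are hypothetical)
root = pathlib.Path(tempfile.mkdtemp())
for quarter in ["2023q1", "2023q2"]:
    for stmt in ["BS", "IS", "CF"]:
        (root / quarter / stmt).mkdir(parents=True)

# the pattern "*/BS" matches the BS subfolder of every quarter folder
bs_dirs = sorted(p.relative_to(root).as_posix() for p in root.glob("*/BS"))
print(bs_dirs)  # ['2023q1/BS', '2023q2/BS']
```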

Third, it standardizes the data for the balance sheet, income statement, and cash flow statement, and stores the standardized bags under the path defined as standardized_dir.

The fourth step is optional and only executed if the configuration file contains an entry for singlebag_dir. If it does, a single joined bag is created by concatenating all the bags from the second step, resulting in one bag that contains the data from all available zip files, i.e., quarters.
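Making a step optional this way relies on configparser's fallback argument: if the [SingleBag] section or its option is missing, the read returns the fallback instead of raising, and the empty string signals "step disabled". A minimal stdlib illustration:

```python
import configparser

config = configparser.ConfigParser()
config.read_string("""
[Standardizer]
standardized_dir = /data/sec/automated/_3_standardized
""")

# a present option is returned as configured
standardized_dir = config.get(section="Standardizer", option="standardized_dir")

# an absent section would raise NoSectionError; fallback="" turns that into
# an empty string, which the pipeline interprets as "skip the single-bag step"
singlebag_dir = config.get(section="SingleBag", option="singlebag_dir", fallback="")

print(standardized_dir)     # /data/sec/automated/_3_standardized
print(singlebag_dir == "")  # True
```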

All of these steps use basic implementations of the AbstractProcess class from the secfsdstools.g_pipelines package.
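The real interface is defined in secfsdstools.c_automation.task_framework; the sketch below only illustrates the general pattern of collecting process steps behind a common base class (all names here are hypothetical stand-ins, not the library's API):

```python
from abc import ABC, abstractmethod
from typing import List


class BaseStep(ABC):
    """Hypothetical stand-in for a process base class such as AbstractProcess."""

    @abstractmethod
    def process(self) -> None:
        ...


class RecordingStep(BaseStep):
    """Hypothetical step that just records that it ran."""

    def __init__(self, name: str):
        self.name = name
        self.executed = False

    def process(self) -> None:
        self.executed = True


def run_pipeline(steps: List[BaseStep]) -> None:
    # the update process would call each configured step in order
    for step in steps:
        step.process()


steps = [RecordingStep("filter"), RecordingStep("concat"), RecordingStep("standardize")]
run_pipeline(steps)
print([s.executed for s in steps])  # [True, True, True]
```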

Furthermore, all these steps check whether something changed since the last run and are only executed if something did change (for instance, if a new zip file became available).
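As a rough illustration of that kind of change detection, here is a simplified sketch based on file modification times; the library's actual bookkeeping may work differently:

```python
import os
import pathlib
import tempfile


def newest_mtime(root: pathlib.Path) -> float:
    """Most recent modification time of any file under root (0.0 if there is none)."""
    return max((f.stat().st_mtime for f in root.rglob("*") if f.is_file()), default=0.0)


def needs_rerun(input_dir: pathlib.Path, last_run_mtime: float) -> bool:
    """A step has to run again if any input file changed after the recorded run."""
    return newest_mtime(input_dir) > last_run_mtime


# demo with a temporary directory standing in for a parquet folder
work = pathlib.Path(tempfile.mkdtemp())
(work / "q1.parquet").write_text("data")
last_run = newest_mtime(work)               # state recorded after a successful run
print(needs_rerun(work, last_run))          # False: nothing changed

new_file = work / "q2.parquet"              # a new file appears, e.g. a new quarter
new_file.write_text("data")
os.utime(new_file, (last_run + 10, last_run + 10))  # make the change visible in mtime
print(needs_rerun(work, last_run))          # True: the step runs again
```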

Also have a look at the notebook 08_00_automation_basics.

Expand source code
"""
This module shows how to automate additional steps after the usual update process
(which downloads new zip files, transforms them to parquet, and indexes them).

You can configure this function in the secfsdstools configuration file by adding
a postupdateprocesses definition. For instance, to use this example,
add the postupdateprocesses definition as shown below:

<pre>
[DEFAULT]
downloaddirectory = ...
dbdirectory = ...
parquetdirectory = ...
useragentemail = ...
autoupdate = True
keepzipfiles = False
postupdateprocesses=secfsdstools.x_examples.automation.automation.define_extra_processes
</pre>

To use it, you also need to add the additional configuration entries shown below:

<pre>
[Filter]
filtered_dir_by_stmt_joined = C:/data/sec/automated/_1_filtered_by_stmt_joined

[Concat]
concat_dir_by_stmt_joined = C:/data/sec/automated/_2_concat_by_stmt_joined

[Standardizer]
standardized_dir = C:/data/sec/automated/_3_standardized

; [SingleBag]
; singlebag_dir = C:/data/sec/automated/_4_single_bag
</pre>

(A complete configuration file using the "define_extra_processes" function is available in the
 file automation_config.cfg, in the same package as this module.)

This example adds four main steps to the usual update process.

First, it creates a joined bag for every zip file, filters it for 10-K and 10-Q reports only,
and also applies the filters ReportPeriodRawFilter, MainCoregRawFilter, USDOnlyRawFilter,
and OfficialTagsOnlyRawFilter. The filtered joined bag is stored under the path defined as
filtered_dir_by_stmt_joined. Furthermore, the data is also split by stmt.

Second, it creates a single joined bag for every statement (balance sheet, income statement,
cash flow, cover page, ...) that contains the data from all zip files, i.e., from all
available quarters. These bags are stored under the path defined as concat_dir_by_stmt_joined.

Third, it standardizes the data for the balance sheet, income statement, and cash flow
statement, and stores the standardized bags under the path defined as standardized_dir.

The fourth step is optional and only executed if the configuration file contains an entry
for singlebag_dir. If it does, a single joined bag is created by concatenating all the bags
from the second step, resulting in one bag that contains the data from all available
zip files, i.e., quarters.

All of these steps use basic implementations of the AbstractProcess class from the
secfsdstools.g_pipelines package.

Furthermore, all these steps check whether something changed since the last run and are only
executed if something did change (for instance, if a new zip file became available).

Also have a look at the notebook 08_00_automation_basics.

"""
from typing import List

from secfsdstools.a_config.configmodel import Configuration
from secfsdstools.c_automation.task_framework import AbstractProcess
from secfsdstools.g_pipelines.concat_process import ConcatByNewSubfoldersProcess, \
    ConcatByChangedTimestampProcess
from secfsdstools.g_pipelines.filter_process import FilterProcess
from secfsdstools.g_pipelines.standardize_process import StandardizeProcess


def define_extra_processes(configuration: Configuration) -> List[AbstractProcess]:
    """
    Example definition of an additional pipeline.
    It adds process steps that:
    1. filter for 10-K and 10-Q reports, also apply the filters
       ReportPeriodRawFilter, MainCoregRawFilter, USDOnlyRawFilter, and OfficialTagsOnlyRawFilter,
       then join the data and split it up by stmt (BS, IS, CF, ...).
       This is done for every zip file individually.
    2. concat all the stmts together, so that there is one file for every stmt containing all
       the available data.
    3. standardize the data for BS, IS, CF.
    4. optional and only executed if singlebag_dir is configured in the configuration:
       concat all the bags from step 2 together into a single bag.

    Please have a look at the notebook 08_00_automation_basics for further details.

    Args:
        configuration: the configuration

    Returns:
        List[AbstractProcess]: List with the defined process steps

    """

    joined_by_stmt_dir = configuration.config_parser.get(section="Filter",
                                                         option="filtered_dir_by_stmt_joined")

    concat_by_stmt_dir = configuration.config_parser.get(section="Concat",
                                                         option="concat_dir_by_stmt_joined")

    standardized_dir = configuration.config_parser.get(section="Standardizer",
                                                       option="standardized_dir")

    singlebag_dir = configuration.config_parser.get(section="SingleBag",
                                                    option="singlebag_dir",
                                                    fallback="")

    processes: List[AbstractProcess] = []

    processes.append(
        # 1. Filter, join, and save by stmt
        FilterProcess(db_dir=configuration.db_dir,
                      target_dir=joined_by_stmt_dir,
                      bag_type="joined",
                      save_by_stmt=True,
                      execute_serial=configuration.no_parallel_processing
                      )
    )

    processes.extend([
        # 2. building datasets with all entries by stmt:
        # one concat process per statement type, each collecting the matching
        # subfolder (e.g. "*/BS") from every quarter
        ConcatByNewSubfoldersProcess(root_dir=f"{joined_by_stmt_dir}/quarter",
                                     target_dir=f"{concat_by_stmt_dir}/{stmt}",
                                     pathfilter=f"*/{stmt}")
        for stmt in ["BS", "CF", "CI", "CP", "EQ", "IS"]
    ])

    processes.append(
        # 3. Standardize the data
        StandardizeProcess(root_dir=concat_by_stmt_dir,
                           target_dir=standardized_dir)
    )

    # 4. create a single joined bag with all the data, if it is defined
    if singlebag_dir != "":
        processes.append(
            ConcatByChangedTimestampProcess(
                root_dir=f"{concat_by_stmt_dir}/",
                target_dir=f"{singlebag_dir}/all",
            )
        )

    return processes

Functions

def define_extra_processes(configuration: Configuration) -> List[AbstractProcess]

Example definition of an additional pipeline. It adds process steps that:

1. filter for 10-K and 10-Q reports, also apply the filters ReportPeriodRawFilter, MainCoregRawFilter, USDOnlyRawFilter, and OfficialTagsOnlyRawFilter, then join the data and split it up by stmt (BS, IS, CF, …). This is done for every zip file individually.
2. concat all the stmts together, so that there is one file for every stmt containing all the available data.
3. standardize the data for BS, IS, CF.
4. optional and only executed if singlebag_dir is configured in the configuration: concat all the bags from step 2 together into a single bag.

Please have a look at the notebook 08_00_automation_basics for further details.

Args

configuration
the configuration

Returns

List[AbstractProcess]
List with the defined process steps