Module secfsdstools.d_container.databagmodel

Defines the container that keeps the data of sub.txt, num.txt, and pre.txt together.
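
For orientation, a minimal usage sketch (the directory name is a placeholder and is assumed to contain the parquet files of a previously saved RawDataBag):

from secfsdstools.d_container.databagmodel import RawDataBag

# assumption: ./bags/2023q1 contains sub.txt.parquet, pre.txt.parquet, num.txt.parquet
raw_bag = RawDataBag.load("./bags/2023q1", forms_filter=["10-K"])

joined_bag = raw_bag.join()   # inner join of pre and num on adsh, tag, version
print(raw_bag.statistics())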

Source code
"""
Defines the container that keeps the data of sub.txt, num.txt, and pre.txt together.
"""
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, TypeVar, Generic, Optional

import pandas as pd

from secfsdstools.a_utils.constants import SUB_TXT, PRE_TXT, NUM_TXT, PRE_NUM_TXT
from secfsdstools.a_utils.fileutils import check_dir, concat_parquet_files
from secfsdstools.d_container.filter import FilterBase
from secfsdstools.d_container.presentation import Presenter

RAW = TypeVar('RAW', bound='RawDataBag')
JOINED = TypeVar('JOINED', bound='JoinedDataBag')
T = TypeVar('T')

LOGGER = logging.getLogger(__name__)


def get_pre_num_filters(adshs: Optional[List[str]],
                        stmts: Optional[List[str]],
                        tags: Optional[List[str]]):
    """ creates filter definitions to be directly applied to num and pre files. """

    pre_filter = []
    num_filter = []

    if adshs:
        adsh_filter_expression = ('adsh', 'in', adshs)
        pre_filter.append(adsh_filter_expression)
        num_filter.append(adsh_filter_expression)

    if stmts:
        pre_filter.append(('stmt', 'in', stmts))

    if tags:
        tag_filter_expression = ('tag', 'in', tags)
        pre_filter.append(tag_filter_expression)
        num_filter.append(tag_filter_expression)

    return pre_filter, num_filter


def concat_bags_file_based_internal(paths_to_concat: List[Path],
                                    target_path: Path,
                                    file_list: List[str],
                                    drop_duplicates_sub_df: bool = False):
    """
    Helper method to concat files of multiple bags into a new bag-directory without actually
    loading the data and therefore having a low memory footprint.

    Args:
        paths_to_concat: list of paths that we want to concat
        target_path:  target path where the concatenated data will be stored; the necessary
                      directories will be created
        file_list:  list of filenames to be concatenated, without SUB_TXT. So this is either
                    ['pre.txt', 'num.txt'] or ['pre_num.txt'].
        drop_duplicates_sub_df: indicates whether duplicates have to be dropped from the sub_df.
                                If true, the data of the sub.txt files must be read into memory.
                                This has to be true, for instance, if you have separate bags
                                for BS, IS, and CF and want to concat them, since they
                                all contain the same data in sub.txt.


    """

    target_path.mkdir(parents=True, exist_ok=True)
    if not drop_duplicates_sub_df:
        file_list.append(SUB_TXT)

    for file_name in file_list:
        target_path_file = str(target_path / f'{file_name}.parquet')
        paths_to_concat_file = [str(p / f'{file_name}.parquet') for p in paths_to_concat]
        concat_parquet_files(paths_to_concat_file, target_path_file)

    # if we have to drop the duplicates, we need to read the data for the sub_df into memory
    if drop_duplicates_sub_df:
        sub_dfs: List[pd.DataFrame] = []
        for path_to_concat in paths_to_concat:
            sub_dfs.append(pd.read_parquet(path_to_concat / f'{SUB_TXT}.parquet'))

        sub_df = pd.concat(sub_dfs, ignore_index=True)
        sub_df.drop_duplicates(inplace=True)
        sub_df.to_parquet(target_path / f'{SUB_TXT}.parquet')


class DataBagBase(Generic[T]):
    """
    Base class for the DataBag types
    """

    def __getitem__(self, bagfilter: FilterBase[T]) -> T:
        """
        forwards to the filter method, so that filters can be chained using a simple
        bracket syntax: bag[filter1][filter2] is equal to bag.filter(filter1).filter(filter2)

        Args:
            bagfilter: the filter to be applied

        Returns:
            T: the databag with the filtered content
        """

        return self.filter(bagfilter)

    def filter(self, bagfilter: FilterBase[T]) -> T:
        """
        applies a filter to the bag and produces a new bag with the filtered content.
        instead of calling filter directly, you can also use the bracket syntax:
        bag[filter1][filter2] is equal to bag.filter(filter1).filter(filter2)

        Args:
            bagfilter: the filter to be applied

        Returns:
            T: the databag with the filtered content
        """
        return bagfilter.filter(self)

    def present(self, presenter: Presenter[T]) -> pd.DataFrame:
        """
        apply a presenter
        """
        return presenter.present(self)

    @staticmethod
    def load_sub_df_by_filter(target_path: str,
                              ciks: Optional[List[int]] = None,
                              adshs: Optional[List[str]] = None,
                              forms: Optional[List[str]] = None) -> pd.DataFrame:
        """
        loads the sub.txt dataframe from the target_path, directly applying the
        defined filters during loading.

        Args:
            target_path: root_path with the parquet files for sub, pre, and num
            ciks: optional list of cik numbers to filter for during loading
            forms: optional list of forms (10-K, 10-Q) to filter for during loading
            adshs: optional list of adshs to filter for during loading

        Returns:
            pd.DataFrame: the loaded sub_df content
        """

        sub_filters: List = []
        if ciks:
            sub_filters.append(('cik', 'in', ciks))
        if adshs:
            sub_filters.append(('adsh', 'in', adshs))
        elif forms:
            sub_filters.append(('form', 'in', forms))

        if sub_filters:
            LOGGER.info("apply sub_df filter: %s", sub_filters)

        sub_df = pd.read_parquet(os.path.join(target_path, f'{SUB_TXT}.parquet'),
                                 filters=sub_filters if sub_filters else None)

        return sub_df


class JoinedDataBag(DataBagBase[JOINED]):
    """
    the DataBag in which the pre.txt and the num.txt are joined based on the
    adsh, tag, and version.
    """

    @classmethod
    def create(cls, sub_df: pd.DataFrame, pre_num_df: pd.DataFrame) -> JOINED:
        """
        create a new JoinedDataBag.

        Args:
            sub_df: sub.txt dataframe

            pre_num_df: joined pre.txt and num.txt dataframe

        Returns:
            JoinedDataBag: new instance of JoinedDataBag
        """
        return JoinedDataBag(sub_df=sub_df, pre_num_df=pre_num_df)

    def __init__(self, sub_df: pd.DataFrame, pre_num_df: pd.DataFrame):
        """
        constructor.
        Args:
            sub_df: sub.txt dataframe
            pre_num_df: joined pre.txt and num.txt dataframe
        """
        self.sub_df = sub_df
        self.pre_num_df = pre_num_df

    def get_sub_copy(self) -> pd.DataFrame:
        """
        Returns a copy of the sub dataframe.

        Returns:
            pd.DataFrame: copy of the sub dataframe.
        """
        return self.sub_df.copy()

    def get_pre_num_copy(self) -> pd.DataFrame:
        """
        Returns a copy of the joined pre_num dataframe.

        Returns:
            pd.DataFrame: copy of joined pre_num dataframe.
        """
        return self.pre_num_df.copy()

    def copy_bag(self) -> JOINED:
        """
        creates a bag with new copies of the internal dataframes.

        Returns:
            JoinedDataBag: new instance of JoinedDataBag
        """
        return JoinedDataBag.create(sub_df=self.sub_df.copy(),
                                    pre_num_df=self.pre_num_df.copy())

    def save(self, target_path: str):
        """
        Stores the bag under the given directory.
        The directory has to exist and must be empty.

        Args:
            target_path: the directory under which the parquet files for sub and pre_num
                  will be created

        """
        check_dir(target_path)

        self.sub_df.to_parquet(os.path.join(target_path, f'{SUB_TXT}.parquet'))
        self.pre_num_df.to_parquet(os.path.join(target_path, f'{PRE_NUM_TXT}.parquet'))

    @staticmethod
    def load(target_path: str,
             ciks_filter: Optional[List[int]] = None,
             adshs_filter: Optional[List[str]] = None,
             forms_filter: Optional[List[str]] = None,
             stmt_filter: Optional[List[str]] = None,
             tag_filter: Optional[List[str]] = None) -> JOINED:
        """
        Loads the content of a JoinedDataBag from the specified location.

        There are optional filters for ciks, adshs, forms, stmts, and tags that are
        applied directly during the load process and are therefore more efficient and
        less memory consuming than loading the data first and applying the filters afterwards.

        This is especially useful when you have concatenated data from different
        zip files into a single bag.

        Args:
            target_path: root_path with the parquet files for sub and pre_num
            ciks_filter: optional list of cik numbers to filter for during loading
            forms_filter: optional list of forms (10-K, 10-Q) to filter for during loading
            adshs_filter: optional list of adshs to filter for during loading
            stmt_filter: optional list of stmts (BS, IS, CF, ..) to filter for during loading
            tag_filter: optional list of tags to filter for during loading

        Returns:
            JoinedDataBag: the loaded databag
        """
        sub_df = DataBagBase.load_sub_df_by_filter(
            target_path=target_path, adshs=adshs_filter, forms=forms_filter, ciks=ciks_filter
        )

        # if the forms and/or ciks filter was applied, overwrite the adshs list,
        # since these are the adsh values that we need to filter for in the pre_num dataframe
        if forms_filter or ciks_filter:
            adshs_filter = sub_df.adsh.to_list()

        pre_num_filter = []

        filter_log_str: List[str] = []

        if adshs_filter:
            pre_num_filter.append(('adsh', 'in', adshs_filter))

            # the list of adshs could be quite huge, so we trim the message that we log
            # to max 100 characters
            log_part = str(('adsh', 'in', adshs_filter))
            if len(log_part) > 100:
                log_part = log_part[:100] + "...)"
            filter_log_str.append(log_part)

        if stmt_filter:
            pre_num_filter.append(('stmt', 'in', stmt_filter))
            filter_log_str.append(str(('stmt', 'in', stmt_filter)))

        if tag_filter:
            pre_num_filter.append(('tag', 'in', tag_filter))
            filter_log_str.append(str(('tag', 'in', tag_filter)))

        if len(pre_num_filter) > 0:
            LOGGER.info("apply pre_num_df filter: %s", filter_log_str)

        pre_num_df = pd.read_parquet(os.path.join(target_path, f'{PRE_NUM_TXT}.parquet'),
                                     filters=pre_num_filter if pre_num_filter else None)

        return JoinedDataBag.create(sub_df=sub_df, pre_num_df=pre_num_df)

    @staticmethod
    def concat(bags: List[JOINED], drop_duplicates_sub_df: bool = False) -> JOINED:
        """
        Merges multiple bags into one bag.
        Note: concat does not check whether DataBags containing the same reports are merged.

        Args:
            bags: List of bags to be merged
            drop_duplicates_sub_df: set to True, if you want to remove duplicates in the sub_df

        Returns:
            JoinedDataBag: a Bag with the merged content

        """
        sub_dfs = [db.sub_df for db in bags]
        pre_num_dfs = [db.pre_num_df for db in bags]

        sub_df = pd.concat(sub_dfs, ignore_index=True)
        pre_num_df = pd.concat(pre_num_dfs, ignore_index=True)

        if drop_duplicates_sub_df:
            sub_df.drop_duplicates(inplace=True)

        return JoinedDataBag.create(sub_df=sub_df,
                                    pre_num_df=pre_num_df)

    @staticmethod
    def concat_filebased(paths_to_concat: List[Path],
                         target_path: Path,
                         drop_duplicates_sub_df: bool = False):
        """
        Concatenates all the bags in paths_to_concat into the target_path directory.

        It works directly on the files and does not load the data into memory.

        Args:
            paths_to_concat (List[Path]): list of paths to read the dataframes from
            target_path (Path): path to write the concatenated data to
            drop_duplicates_sub_df (bool, False): indicates whether duplicates have to be
                                dropped from the sub_df.
                                If true, the data of the sub.txt files must be read into memory.
                                This has to be true, for instance, if you have separate bags
                                for BS, IS, and CF and want to concat them, since they
                                all contain the same data in sub.txt.
        """
        if len(paths_to_concat) == 0:
            # nothing to do
            return

        concat_bags_file_based_internal(
            paths_to_concat=paths_to_concat,
            target_path=target_path,
            file_list=[PRE_NUM_TXT],
            drop_duplicates_sub_df=drop_duplicates_sub_df
        )

    @staticmethod
    def is_joinedbag_path(path: Path) -> bool:
        """ Check whether the provided path contains the files of a JoinedDatabag. """
        return (path / "pre_num.txt.parquet").exists()


@dataclass
class RawDataBagStats:
    """
    Contains simple statistics of a report.
    """
    num_entries: int
    pre_entries: int
    number_of_reports: int
    reports_per_form: Dict[str, int]
    reports_per_period_date: Dict[int, int]


class RawDataBag(DataBagBase[RAW]):
    """
    Container class to keep the data for sub.txt, pre.txt, and num.txt together.
    """

    @classmethod
    def create(cls, sub_df: pd.DataFrame, pre_df: pd.DataFrame, num_df: pd.DataFrame) -> RAW:
        """
        create method for RawDataBag
        Args:
            sub_df(pd.DataFrame): sub.txt dataframe
            pre_df(pd.DataFrame): pre.txt dataframe
            num_df(pd.DataFrame): num.txt dataframe

        Returns:
            RawDataBag:
        """
        return RawDataBag(sub_df=sub_df, pre_df=pre_df, num_df=num_df)

    def __init__(self, sub_df: pd.DataFrame, pre_df: pd.DataFrame, num_df: pd.DataFrame):
        self.sub_df = sub_df
        self.pre_df = pre_df
        self.num_df = num_df

    def copy_bag(self):
        """
        creates a bag with new copies of the internal dataframes.

        Returns:
            RawDataBag: new instance of RawDataBag
        """

        return RawDataBag.create(sub_df=self.sub_df.copy(),
                                 pre_df=self.pre_df.copy(),
                                 num_df=self.num_df.copy())

    def get_sub_copy(self) -> pd.DataFrame:
        """
        Returns a copy of the sub.txt dataframe.

        Returns:
            pd.DataFrame: copy of the sub.txt dataframe.
        """
        return self.sub_df.copy()

    def get_pre_copy(self) -> pd.DataFrame:
        """
        Returns a copy of the pre.txt dataframe.

        Returns:
            pd.DataFrame: copy of the pre.txt dataframe.
        """
        return self.pre_df.copy()

    def get_num_copy(self) -> pd.DataFrame:
        """
        Returns a copy of the num.txt dataframe.

        Returns:
            pd.DataFrame: copy of the num.txt dataframe.
        """
        return self.num_df.copy()

    def join(self) -> JoinedDataBag:
        """
        merges the raw data of pre and num together.

        Returns:
            JoinedDataBag: the DataBag where pre and num are merged

        """

        # merge num and pre together. only rows in num are considered for which entries in pre exist
        pre_num_df = pd.merge(self.num_df,
                              self.pre_df,
                              on=['adsh', 'tag',
                                  'version'])  # don't produce index_x and index_y columns

        return JoinedDataBag.create(sub_df=self.sub_df, pre_num_df=pre_num_df)

    def statistics(self) -> RawDataBagStats:
        """
        calculate a few simple statistics of a report.
        - number of entries in the num-file
        - number of entries in the pre-file
        - number of reports in the zip-file (equals number of entries in sub-file)
        - number of reports per form (10-K, 10-Q, ...)
        - number of reports per period date (counts per value in the period column of sub-file)

        Returns:
            RawDataBagStats: instance with basic report infos
        """

        num_entries = len(self.num_df)
        pre_entries = len(self.pre_df)
        number_of_reports = len(self.sub_df)
        reports_per_period_date: Dict[int, int] = self.sub_df.period.value_counts().to_dict()
        reports_per_form: Dict[str, int] = self.sub_df.form.value_counts().to_dict()

        return RawDataBagStats(num_entries=num_entries,
                               pre_entries=pre_entries,
                               number_of_reports=number_of_reports,
                               reports_per_form=reports_per_form,
                               reports_per_period_date=reports_per_period_date
                               )

    def save(self, target_path: str):
        """
        Stores the bag under the given directory.
        The directory has to exist and must be empty.

        Args:
            target_path: the directory under which the three parquet files for sub.txt,
                  pre.txt, and num.txt will be created
        """
        check_dir(target_path)

        self.sub_df.to_parquet(os.path.join(target_path, f'{SUB_TXT}.parquet'))
        self.pre_df.to_parquet(os.path.join(target_path, f'{PRE_TXT}.parquet'))
        self.num_df.to_parquet(os.path.join(target_path, f'{NUM_TXT}.parquet'))

    @staticmethod
    def load(target_path: str,
             ciks_filter: Optional[List[int]] = None,
             adshs_filter: Optional[List[str]] = None,
             forms_filter: Optional[List[str]] = None,
             stmt_filter: Optional[List[str]] = None,
             tag_filter: Optional[List[str]] = None) -> RAW:
        """
        Loads the content of a RawDataBag from the specified location.

        There are optional filters for ciks, adshs, forms, stmts, and tags that are
        applied directly during the load process and are therefore more efficient and
        less memory consuming than loading the data first and applying the filters afterwards.

        This is especially useful when you have concatenated data from different
        zip files into a single bag.

        Note: adshs_filter and forms_filter are mutually exclusive;
        adshs_filter takes precedence.

        Args:
            target_path: root_path with the parquet files for sub, pre, and num
            ciks_filter: optional list of cik numbers to filter for during loading
            forms_filter: optional list of forms (10-K, 10-Q) to filter for during loading
            adshs_filter: optional list of adshs to filter for during loading
            stmt_filter: optional list of stmts (BS, IS, CF, ..) to filter for during loading
            tag_filter: optional list of tags to filter for during loading

        Returns:
            RawDataBag: the loaded databag
        """
        sub_df = DataBagBase.load_sub_df_by_filter(
            target_path=target_path, adshs=adshs_filter, forms=forms_filter, ciks=ciks_filter
        )

        # if the forms and/or ciks filter was applied, overwrite the adshs list,
        # since these are the adsh values that we need to filter for in the pre and num dataframes
        if forms_filter or ciks_filter:
            adshs_filter = sub_df.adsh.to_list()

        pre_filter, num_filter = get_pre_num_filters(adshs=adshs_filter,
                                                     stmts=stmt_filter,
                                                     tags=tag_filter)

        if len(num_filter) > 0:
            LOGGER.info("apply num_df filter: %s", num_filter)

        if len(pre_filter) > 0:
            LOGGER.info("apply pre_df filter: %s", pre_filter)

        pre_df = pd.read_parquet(os.path.join(target_path, f'{PRE_TXT}.parquet'),
                                 filters=pre_filter if pre_filter else None)

        num_df = pd.read_parquet(os.path.join(target_path, f'{NUM_TXT}.parquet'),
                                 filters=num_filter if num_filter else None)

        return RawDataBag.create(sub_df=sub_df, pre_df=pre_df, num_df=num_df)

    @staticmethod
    def concat(bags: List[RAW], drop_duplicates_sub_df: bool = False) -> RAW:
        """
        Merges multiple bags into one bag.
        Note: concat does not check whether DataBags containing the same reports are merged.

        Args:
            bags: List of bags to be merged
            drop_duplicates_sub_df: set to True, if you want to remove duplicates in the sub_df

        Returns:
            RawDataBag: a Bag with the merged content

        """
        sub_dfs = [db.sub_df for db in bags]
        pre_dfs = [db.pre_df for db in bags]
        num_dfs = [db.num_df for db in bags]

        sub_df = pd.concat(sub_dfs, ignore_index=True)
        pre_df = pd.concat(pre_dfs, ignore_index=True)
        num_df = pd.concat(num_dfs, ignore_index=True)

        if drop_duplicates_sub_df:
            sub_df.drop_duplicates(inplace=True)

        return RawDataBag.create(sub_df=sub_df,
                                 pre_df=pre_df,
                                 num_df=num_df)

    @staticmethod
    def concat_filebased(paths_to_concat: List[Path],
                         target_path: Path,
                         drop_duplicates_sub_df: bool = False):
        """
        Concatenates all the bags in paths_to_concat into the target_path directory.

        It works directly on the files and does not load the data into memory.

        Args:
            paths_to_concat (List[Path]): list of paths to read the dataframes from
            target_path (Path): path to write the concatenated data to
            drop_duplicates_sub_df (bool, False): indicates whether duplicates have to be
                                dropped from the sub_df.
                                If true, the data of the sub.txt files must be read into memory.
                                This has to be true, for instance, if you have separate bags
                                for BS, IS, and CF and want to concat them, since they
                                all contain the same data in sub.txt.
        """
        if len(paths_to_concat) == 0:
            # nothing to do
            return

        concat_bags_file_based_internal(
            paths_to_concat=paths_to_concat,
            target_path=target_path,
            file_list=[PRE_TXT, NUM_TXT],
            drop_duplicates_sub_df=drop_duplicates_sub_df
        )

    @staticmethod
    def is_rawbag_path(path: Path) -> bool:
        """ Check whether the provided path contains the files of a RawDatabag. """
        return (path / "num.txt.parquet").exists()

Functions

def concat_bags_file_based_internal(paths_to_concat: List[pathlib.Path], target_path: pathlib.Path, file_list: List[str], drop_duplicates_sub_df: bool = False)

Helper method to concat files of multiple bags into a new bag-directory without actually loading the data and therefore having a low memory footprint.

Args

paths_to_concat
list of paths that we want to concat
target_path
target path where the concatenated data will be stored; the necessary directories will be created
file_list
list of filenames to be concatenated, without SUB_TXT. So this is either ['pre.txt', 'num.txt'] or ['pre_num.txt'].
drop_duplicates_sub_df
indicates whether duplicates have to be dropped from the sub_df. If true, the data of the sub.txt files must be read into memory. This has to be true, for instance, if you have separate bags for BS, IS, and CF and want to concat them, since they all contain the same data in sub.txt.
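
A sketch of how this helper might be called for raw bags; the paths are placeholders and are assumed to contain the parquet files of saved RawDataBags:

from pathlib import Path
from secfsdstools.a_utils.constants import PRE_TXT, NUM_TXT
from secfsdstools.d_container.databagmodel import concat_bags_file_based_internal

# assumption: ./bags/2023q1 and ./bags/2023q2 each hold a saved RawDataBag
concat_bags_file_based_internal(
    paths_to_concat=[Path("./bags/2023q1"), Path("./bags/2023q2")],
    target_path=Path("./bags/2023h1"),
    file_list=[PRE_TXT, NUM_TXT],  # the helper appends SUB_TXT itself when needed
    drop_duplicates_sub_df=False,
)
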
def get_pre_num_filters(adshs: Optional[List[str]], stmts: Optional[List[str]], tags: Optional[List[str]])

creates filter definitions to be directly applied to num and pre files.
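
The returned lists contain pyarrow-style filter tuples as accepted by the filters argument of pandas.read_parquet. A small sketch (the adsh value is a placeholder):

from secfsdstools.d_container.databagmodel import get_pre_num_filters

pre_filter, num_filter = get_pre_num_filters(
    adshs=["0000000000-23-000001"], stmts=["BS"], tags=["Assets"])

# pre_filter: [('adsh', 'in', [...]), ('stmt', 'in', ['BS']), ('tag', 'in', ['Assets'])]
# num_filter: [('adsh', 'in', [...]), ('tag', 'in', ['Assets'])]  (num.txt has no stmt column)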


Classes

class DataBagBase (*args, **kwds)

Base class for the DataBag types


Ancestors

  • typing.Generic

Subclasses

  • JoinedDataBag
  • RawDataBag

Static methods

def load_sub_df_by_filter(target_path: str, ciks: Optional[List[int]] = None, adshs: Optional[List[str]] = None, forms: Optional[List[str]] = None) ‑> pandas.core.frame.DataFrame

loads the sub.txt dataframe from the target_path, directly applying the defined filters during loading.

Args

target_path
root_path with the parquet files for sub, pre, and num
ciks
optional list of cik numbers to filter for during loading
forms
optional list of forms (10-K, 10-Q) to filter for during loading
adshs
optional list of adshs to filter for during loading

Returns

pd.DataFrame
the loaded sub_df content
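
For illustration, a sketch of a direct call; the path is a placeholder:

from secfsdstools.d_container.databagmodel import DataBagBase

# load only the 10-K submissions of a single company (cik 320193 is Apple)
sub_df = DataBagBase.load_sub_df_by_filter(
    target_path="./bags/2023q1",
    ciks=[320193],
    forms=["10-K"],
)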


Methods

def filter(self, bagfilter: FilterBase[~T]) ‑> ~T

applies a filter to the bag and produces a new bag with the filtered content. Instead of calling filter directly, you can also use the bracket syntax: bag[filter1][filter2] is equal to bag.filter(filter1).filter(filter2)

Args

bagfilter
the pathfilter to be applied

Returns

T
the databag with the filtered content
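
A sketch of the chaining syntax with a hand-rolled filter. The only contract assumed here is the one visible in this module, namely that a filter's filter(bag) method returns a new bag; the class below is purely illustrative:

from secfsdstools.d_container.filter import FilterBase
from secfsdstools.d_container.databagmodel import RawDataBag

class SingleAdshRawFilter(FilterBase[RawDataBag]):
    """Hypothetical filter that keeps only the rows of a single report."""
    def __init__(self, adsh: str):
        self.adsh = adsh

    def filter(self, databag: RawDataBag) -> RawDataBag:
        return RawDataBag.create(
            sub_df=databag.sub_df[databag.sub_df.adsh == self.adsh],
            pre_df=databag.pre_df[databag.pre_df.adsh == self.adsh],
            num_df=databag.num_df[databag.num_df.adsh == self.adsh],
        )

# raw_bag: a RawDataBag, e.g. from RawDataBag.load(...)
# bag[f] is equivalent to bag.filter(f)
filtered_bag = raw_bag[SingleAdshRawFilter("0000000000-23-000001")]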
def present(self, presenter: Presenter[~T]) ‑> pandas.core.frame.DataFrame

apply a presenter
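
A sketch with a hand-rolled presenter. The only contract assumed is the one used here, namely that presenter.present(bag) returns a DataFrame; the class below is purely illustrative:

import pandas as pd
from secfsdstools.d_container.presentation import Presenter
from secfsdstools.d_container.databagmodel import JoinedDataBag

class SubOnlyPresenter(Presenter[JoinedDataBag]):
    """Hypothetical presenter that returns a few columns of the sub dataframe."""
    def present(self, databag: JoinedDataBag) -> pd.DataFrame:
        return databag.sub_df[["adsh", "cik", "name", "form", "period"]]

# joined_bag: a JoinedDataBag instance
result_df = joined_bag.present(SubOnlyPresenter())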

class JoinedDataBag (sub_df: pandas.core.frame.DataFrame, pre_num_df: pandas.core.frame.DataFrame)

the DataBag in which the pre.txt and the num.txt are joined based on the adsh, tag, and version.

constructor.

Args

sub_df
sub.txt dataframe
pre_num_df
joined pre.txt and num.txt dataframe

Ancestors

  • DataBagBase
  • typing.Generic

Static methods

def concat(bags: List[~JOINED], drop_duplicates_sub_df: bool = False) ‑> ~JOINED

Merges multiple bags into one bag. Note: concat does not check whether DataBags containing the same reports are merged.

Args

bags
List of bags to be merged
drop_duplicates_sub_df
set to True, if you want to remove duplicates in the sub_df

Returns

JoinedDataBag
a Bag with the merged content
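
A sketch, assuming bag_bs and bag_is are JoinedDataBag instances that were filtered from the same source and therefore share the same sub_df rows:

from secfsdstools.d_container.databagmodel import JoinedDataBag

merged = JoinedDataBag.concat([bag_bs, bag_is], drop_duplicates_sub_df=True)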
def concat_filebased(paths_to_concat: List[pathlib.Path], target_path: pathlib.Path, drop_duplicates_sub_df: bool = False)

Concatenates all the bags in paths_to_concat into the target_path directory.

It works directly on the files and does not load the data into memory.

Args

paths_to_concat : List[Path]
list of paths to read the dataframes from
target_path : Path
path to write the concatenated data to
drop_duplicates_sub_df : bool, False
indicates whether duplicates have to be dropped from the sub_df. If true, the data of the sub.txt files must be read into memory. This has to be true, for instance, if you have separate bags for BS, IS, and CF and want to concat them, since they all contain the same data in sub.txt.
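
A sketch, assuming each source directory contains a saved JoinedDataBag; the paths are placeholders:

from pathlib import Path
from secfsdstools.d_container.databagmodel import JoinedDataBag

JoinedDataBag.concat_filebased(
    paths_to_concat=[Path("./joined/bs"), Path("./joined/is"), Path("./joined/cf")],
    target_path=Path("./joined/all"),
    drop_duplicates_sub_df=True,  # the per-statement bags share the same sub.txt rows
)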

def create(sub_df: pandas.core.frame.DataFrame, pre_num_df: pandas.core.frame.DataFrame) ‑> ~JOINED

create a new JoinedDataBag.

Args

sub_df
sub.txt dataframe
pre_num_df
joined pre.txt and num.txt dataframe

Returns

JoinedDataBag
new instance of JoinedDataBag
def is_joinedbag_path(path: pathlib.Path) ‑> bool

Check whether the provided path contains the files of a JoinedDataBag.

def load(target_path: str, ciks_filter: Optional[List[int]] = None, adshs_filter: Optional[List[str]] = None, forms_filter: Optional[List[str]] = None, stmt_filter: Optional[List[str]] = None, tag_filter: Optional[List[str]] = None) ‑> ~JOINED

Loads the content of a JoinedDataBag from the specified location.

There are optional filters for ciks, adshs, forms, stmts, and tags that are
applied directly during the load process and are therefore more efficient and
less memory consuming than loading the data first and applying the filters afterwards.

This is especially useful when you have concatenated data from different
zip files into a single bag.

Args

target_path
root_path with the parquet files for sub and pre_num
ciks_filter
optional list of cik numbers to filter for during loading
forms_filter
optional list of forms (10-K, 10-Q) to filter for during loading
adshs_filter
optional list of adshs to filter for during loading
stmt_filter
optional list of stmts (BS, IS, CF, ..) to filter for during loading
tag_filter
optional list of tags to filter for during loading

Returns

JoinedDataBag
the loaded databag
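
A sketch of a filtered load; the path and the filter values are placeholders:

from secfsdstools.d_container.databagmodel import JoinedDataBag

joined_bag = JoinedDataBag.load(
    "./joined/all",
    forms_filter=["10-K"],
    stmt_filter=["BS"],
    tag_filter=["Assets", "Liabilities"],
)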

Methods

def copy_bag(self) ‑> ~JOINED

creates a bag with new copies of the internal dataframes.

Returns

JoinedDataBag
new instance of JoinedDataBag
def get_pre_num_copy(self) ‑> pandas.core.frame.DataFrame

Returns a copy of the joined pre_num dataframe.

Returns

pd.DataFrame
copy of joined pre_num dataframe.
def get_sub_copy(self) ‑> pandas.core.frame.DataFrame

Returns a copy of the sub dataframe.

Returns

pd.DataFrame
copy of the sub dataframe.
def save(self, target_path: str)

Stores the bag under the given directory. The directory has to exist and must be empty.

Args

target_path
the directory under which the parquet files for sub and pre_num will be created
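
A sketch of a save call; save() expects the target directory to already exist and be empty:

import os
from secfsdstools.d_container.databagmodel import JoinedDataBag

target = "./joined/filtered"
os.makedirs(target, exist_ok=True)
joined_bag.save(target)  # joined_bag: a JoinedDataBag instance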

Inherited members

  • DataBagBase: filter, load_sub_df_by_filter, present

class RawDataBag (sub_df: pandas.core.frame.DataFrame, pre_df: pandas.core.frame.DataFrame, num_df: pandas.core.frame.DataFrame)

Container class to keep the data for sub.txt, pre.txt, and num.txt together.
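
A sketch of the typical round trip; the path is a placeholder:

from secfsdstools.d_container.databagmodel import RawDataBag

raw_bag = RawDataBag.load("./bags/2023q1", stmt_filter=["BS", "IS"])

stats = raw_bag.statistics()
print(stats.number_of_reports, stats.reports_per_form)

joined_bag = raw_bag.join()  # inner join of pre and num on adsh, tag, version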

Expand source code
class RawDataBag(DataBagBase[RAW]):
    """
    Container class to keep the data for sub.txt, pre.txt, and num.txt together.
    """

    @classmethod
    def create(cls, sub_df: pd.DataFrame, pre_df: pd.DataFrame, num_df: pd.DataFrame) -> RAW:
        """
        create method for RawDataBag
        Args:
            sub_df(pd.DataFrame): sub.txt dataframe
            pre_df(pd.DataFrame): pre.txt dataframe
            num_df(pd.DataFrame): num.txt dataframe

        Returns:
            RawDataBag:
        """
        return RawDataBag(sub_df=sub_df, pre_df=pre_df, num_df=num_df)

    def __init__(self, sub_df: pd.DataFrame, pre_df: pd.DataFrame, num_df: pd.DataFrame):
        self.sub_df = sub_df
        self.pre_df = pre_df
        self.num_df = num_df

    def copy_bag(self):
        """
        creates a bag with new copies of the internal dataframes.

        Returns:
            RawDataBag: new instance of JoinedDataBag
        """

        return RawDataBag.create(sub_df=self.sub_df.copy(),
                                 pre_df=self.pre_df.copy(),
                                 num_df=self.num_df.copy())

    def get_sub_copy(self) -> pd.DataFrame:
        """
        Returns a copy of the sub.txt dataframe.

        Returns:
            pd.DataFrame: copy of the sub.txt dataframe.
        """
        return self.sub_df.copy()

    def get_pre_copy(self) -> pd.DataFrame:
        """
        Returns a copy of the pre.txt dataframe.

        Returns:
            pd.DataFrame: copy of the pre.txt dataframe.
        """
        return self.pre_df.copy()

    def get_num_copy(self) -> pd.DataFrame:
        """
        Returns a copy of the num.txt dataframe.

        Returns:
            pd.DataFrame: copy of the num.txt dataframe.
        """
        return self.num_df.copy()

    def join(self) -> JoinedDataBag:
        """
        merges the raw data of pre and num together.

        Returns:
            JoinedDataBag: the DataBag where pre and num are merged

        """

        # merge num and pre together. only rows in num are considered for which entries in pre exist
        pre_num_df = pd.merge(self.num_df,
                              self.pre_df,
                              on=['adsh', 'tag',
                                  'version'])  # don't produce index_x and index_y columns

        return JoinedDataBag.create(sub_df=self.sub_df, pre_num_df=pre_num_df)

    def statistics(self) -> RawDataBagStats:
        """
        calculate a few simple statistics of a report.
        - number of entries in the num-file
        - number of entries in the pre-file
        - number of reports in the zip-file (equals number of entries in sub-file)
        - number of reports per form (10-K, 10-Q, ...)
        - number of reports per period date (counts per value in the period column of sub-file)

        Returns:
            RawDataBagStats: instance with basic report infos
        """

        num_entries = len(self.num_df)
        pre_entries = len(self.pre_df)
        number_of_reports = len(self.sub_df)
        reports_per_period_date: Dict[int, int] = self.sub_df.period.value_counts().to_dict()
        reports_per_form: Dict[str, int] = self.sub_df.form.value_counts().to_dict()

        return RawDataBagStats(num_entries=num_entries,
                               pre_entries=pre_entries,
                               number_of_reports=number_of_reports,
                               reports_per_form=reports_per_form,
                               reports_per_period_date=reports_per_period_date
                               )

    def save(self, target_path: str):
        """
        Stores the bag under the given directory.
        The directory has to exist and must be empty.

        Args:
            target_path: the directory under which three parquet files for sub_txt, pre_text,
                  and num_txt will be created
        """
        check_dir(target_path)

        self.sub_df.to_parquet(os.path.join(target_path, f'{SUB_TXT}.parquet'))
        self.pre_df.to_parquet(os.path.join(target_path, f'{PRE_TXT}.parquet'))
        self.num_df.to_parquet(os.path.join(target_path, f'{NUM_TXT}.parquet'))

    @staticmethod
    def load(target_path: str,
             ciks_filter: Optional[List[int]] = None,
             adshs_filter: Optional[List[str]] = None,
             forms_filter: Optional[List[str]] = None,
             stmt_filter: Optional[List[str]] = None,
             tag_filter: Optional[List[str]] = None) -> RAW:
        """
            Loads the content of the current bag at the specified location.

            There are optional filters for adshs, forms, stmts and tags, that are
            applied directly during the load process and hence are more efficient and
            less memory consuming than loading the data and then applying filters.

            This makes especially sense, when you concatenated together data from different
            zip files.

            Note: the adsh are mutally exclusive and adsh has the higher precedence.

        Args:
            target_path: root_path with the parquet files for sub, pre, and num
            ciks_filter: optional list of cik numbers to filter for during loading
            forms_filter: optional list of forms (10-K, 10-Q) to filter for during loading
            adshs_filter: optional list of adhs to filter during the laoding
            stmt_filter: optional list of stmts (BS, IS, CF, ..) to filter during the loading
            tag_filter: optional list of tags to filter during the loading

        Returns:
            RawDataBag: the loaded Databag
        """
        sub_df = DataBagBase.load_sub_df_by_filter(
            target_path=target_path, adshs=adshs_filter, forms=forms_filter, ciks=ciks_filter
        )

        # if the forms and/or ciks filter was applied, overwrite the adshs list,
        # since these are the adsh values that we should filter for in the pre and num dataframes
        if forms_filter or ciks_filter:
            adshs_filter = sub_df.adsh.to_list()

        pre_filter, num_filter = get_pre_num_filters(adshs=adshs_filter,
                                                     stmts=stmt_filter,
                                                     tags=tag_filter)

        if len(num_filter) > 0:
            LOGGER.info("apply num_df filter: %s", num_filter)

        if len(pre_filter) > 0:
            LOGGER.info("apply pre_df filter: %s", pre_filter)

        pre_df = pd.read_parquet(os.path.join(target_path, f'{PRE_TXT}.parquet'),
                                 filters=pre_filter if pre_filter else None)

        num_df = pd.read_parquet(os.path.join(target_path, f'{NUM_TXT}.parquet'),
                                 filters=num_filter if num_filter else None)

        return RawDataBag.create(sub_df=sub_df, pre_df=pre_df, num_df=num_df)

    @staticmethod
    def concat(bags: List[RAW], drop_duplicates_sub_df: bool = False) -> RAW:
        """
        Merges multiple bags together into one bag.
        Note: concat does not check whether DataBags containing the same reports are merged together.

        Args:
            bags: List of bags to be merged
            drop_duplicates_sub_df: set to True, if you want to remove duplicates in the sub_df

        Returns:
            RawDataBag: a Bag with the merged content

        """
        sub_dfs = [db.sub_df for db in bags]
        pre_dfs = [db.pre_df for db in bags]
        num_dfs = [db.num_df for db in bags]

        sub_df = pd.concat(sub_dfs, ignore_index=True)
        pre_df = pd.concat(pre_dfs, ignore_index=True)
        num_df = pd.concat(num_dfs, ignore_index=True)

        if drop_duplicates_sub_df:
            sub_df.drop_duplicates(inplace=True)

        return RawDataBag.create(sub_df=sub_df,
                                 pre_df=pre_df,
                                 num_df=num_df)

    @staticmethod
    def concat_filebased(paths_to_concat: List[Path],
                         target_path: Path,
                         drop_duplicates_sub_df: bool = False):
        """
        Concatenates all the bags in paths_to_concat into the target_path directory.

        It works directly on the files and does not load the data into memory.


        Args:
            paths_to_concat (List[Path]) : list of paths to read the dataframes from
            target_path (Path) : path to write the concatenated data to
            drop_duplicates_sub_df (bool, False): indicates whether drop duplicates
                                has to be applied on the sub_df.
                                If true, the data for the sub.txt files must be read into memory.
                                This has to be true, for instance if you have separate bags
                                for BS, IS, and CF and want to concat them. In this case, they
                                all have the same data in sub.txt.

        """
        if len(paths_to_concat) == 0:
            # nothing to do
            return

        concat_bags_file_based_internal(
            paths_to_concat=paths_to_concat,
            target_path=target_path,
            file_list=[PRE_TXT, NUM_TXT],
            drop_duplicates_sub_df=drop_duplicates_sub_df
        )

    @staticmethod
    def is_rawbag_path(path: Path) -> bool:
        """ Check whether the provided path contains the files of a RawDatabag. """
        return (path / "num.txt.parquet").exists()

Ancestors

Static methods

def concat(bags: List[~RAW], drop_duplicates_sub_df: bool = False) ‑> ~RAW

Merges multiple bags together into one bag. Note: concat does not check whether DataBags containing the same reports are merged together.

Args

bags
List of bags to be merged
drop_duplicates_sub_df
set to True, if you want to remove duplicates in the sub_df

Returns

RawDataBag
a Bag with the merged content
Expand source code
@staticmethod
def concat(bags: List[RAW], drop_duplicates_sub_df: bool = False) -> RAW:
    """
    Merges multiple bags together into one bag.
    Note: concat does not check whether DataBags containing the same reports are merged together.

    Args:
        bags: List of bags to be merged
        drop_duplicates_sub_df: set to True, if you want to remove duplicates in the sub_df

    Returns:
        RawDataBag: a Bag with the merged content

    """
    sub_dfs = [db.sub_df for db in bags]
    pre_dfs = [db.pre_df for db in bags]
    num_dfs = [db.num_df for db in bags]

    sub_df = pd.concat(sub_dfs, ignore_index=True)
    pre_df = pd.concat(pre_dfs, ignore_index=True)
    num_df = pd.concat(num_dfs, ignore_index=True)

    if drop_duplicates_sub_df:
        sub_df.drop_duplicates(inplace=True)

    return RawDataBag.create(sub_df=sub_df,
                             pre_df=pre_df,
                             num_df=num_df)
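
A minimal in-memory sketch of concatenating two bags; the load paths are hypothetical and assumed to point to previously saved RawDataBags:

from secfsdstools.d_container.databagmodel import RawDataBag

# hypothetical paths to two previously saved bags
bag_q1 = RawDataBag.load("./data/2023q1")
bag_q2 = RawDataBag.load("./data/2023q2")

# concatenate in memory; duplicate rows in sub_df are removed on request
merged = RawDataBag.concat([bag_q1, bag_q2], drop_duplicates_sub_df=True)
print(len(merged.sub_df))
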
def concat_filebased(paths_to_concat: List[pathlib.Path], target_path: pathlib.Path, drop_duplicates_sub_df: bool = False)

Concatenates all the bags in paths_to_concat into the target_path directory.

It works directly on the files and does not load the data into memory.

Args

paths_to_concat (List[Path]) : list of paths to read the dataframes from
target_path (Path) : path to write the concatenated data to
drop_duplicates_sub_df : bool, False
indicates whether drop duplicates has to be applied on the sub_df. If true, the data for the sub.txt files must be read into memory. This has to be true, for instance, if you have separate bags for BS, IS, and CF and want to concat them; in this case, they all have the same data in sub.txt.

Expand source code
@staticmethod
def concat_filebased(paths_to_concat: List[Path],
                     target_path: Path,
                     drop_duplicates_sub_df: bool = False):
    """
    Concatenates all the bags in paths_to_concat into the target_path directory.

    It works directly on the files and does not load the data into memory.


    Args:
        paths_to_concat (List[Path]) : list of paths to read the dataframes from
        target_path (Path) : path to write the concatenated data to
        drop_duplicates_sub_df (bool, False): indicates whether drop duplicates
                            has to be applied on the sub_df.
                            If true, the data for the sub.txt files must be read into memory.
                            This has to be true, for instance if you have separate bags
                            for BS, IS, and CF and want to concat them. In this case, they
                            all have the same data in sub.txt.

    """
    if len(paths_to_concat) == 0:
        # nothing to do
        return

    concat_bags_file_based_internal(
        paths_to_concat=paths_to_concat,
        target_path=target_path,
        file_list=[PRE_TXT, NUM_TXT],
        drop_duplicates_sub_df=drop_duplicates_sub_df
    )
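
A minimal sketch of the file-based variant; all directories are hypothetical, and the target directory is created if it does not exist:

from pathlib import Path

from secfsdstools.d_container.databagmodel import RawDataBag

# hypothetical source and target directories
RawDataBag.concat_filebased(
    paths_to_concat=[Path("./data/2023q1"), Path("./data/2023q2")],
    target_path=Path("./data/2023h1"),
    drop_duplicates_sub_df=False,  # set True if the bags share the same sub.txt content
)
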
def create(sub_df: pandas.core.frame.DataFrame, pre_df: pandas.core.frame.DataFrame, num_df: pandas.core.frame.DataFrame) ‑> ~RAW

create method for RawDataBag

Args

sub_df (pd.DataFrame)
sub.txt dataframe
pre_df (pd.DataFrame)
pre.txt dataframe
num_df (pd.DataFrame)
num.txt dataframe

Returns

RawDataBag
the created instance

Expand source code
@classmethod
def create(cls, sub_df: pd.DataFrame, pre_df: pd.DataFrame, num_df: pd.DataFrame) -> RAW:
    """
    create method for RawDataBag
    Args:
        sub_df(pd.DataFrame): sub.txt dataframe
        pre_df(pd.DataFrame): pre.txt dataframe
        num_df(pd.DataFrame): num.txt dataframe

    Returns:
        RawDataBag: the created instance
    """
    return RawDataBag(sub_df=sub_df, pre_df=pre_df, num_df=num_df)
def is_rawbag_path(path: pathlib.Path) ‑> bool

Check whether the provided path contains the files of a RawDataBag.

Expand source code
@staticmethod
def is_rawbag_path(path: Path) -> bool:
    """ Check whether the provided path contains the files of a RawDatabag. """
    return (path / "num.txt.parquet").exists()
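
A short sketch (hypothetical path) showing how the check can guard a load:

from pathlib import Path

from secfsdstools.d_container.databagmodel import RawDataBag

path = Path("./data/2023q1")  # hypothetical directory
if RawDataBag.is_rawbag_path(path):
    bag = RawDataBag.load(str(path))
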
def load(target_path: str, ciks_filter: Optional[List[int]] = None, adshs_filter: Optional[List[str]] = None, forms_filter: Optional[List[str]] = None, stmt_filter: Optional[List[str]] = None, tag_filter: Optional[List[str]] = None) ‑> ~RAW

Loads the content of a bag from the specified location.

There are optional filters for adshs, forms, stmts, and tags that are
applied directly during the load process and hence are more efficient and
less memory consuming than loading the data and applying filters afterwards.

This is especially useful when you have concatenated data from different
zip files.

Note: adshs_filter and the ciks_filter/forms_filter are mutually exclusive; adshs_filter takes precedence.

Args

target_path
root_path with the parquet files for sub, pre, and num
ciks_filter
optional list of cik numbers to filter for during loading
forms_filter
optional list of forms (10-K, 10-Q) to filter for during loading
adshs_filter
optional list of adshs to filter during the loading
stmt_filter
optional list of stmts (BS, IS, CF, ..) to filter during the loading
tag_filter
optional list of tags to filter during the loading

Returns

RawDataBag
the loaded Databag
Expand source code
@staticmethod
def load(target_path: str,
         ciks_filter: Optional[List[int]] = None,
         adshs_filter: Optional[List[str]] = None,
         forms_filter: Optional[List[str]] = None,
         stmt_filter: Optional[List[str]] = None,
         tag_filter: Optional[List[str]] = None) -> RAW:
    """
        Loads the content of a bag from the specified location.

        There are optional filters for adshs, forms, stmts, and tags that are
        applied directly during the load process and hence are more efficient and
        less memory consuming than loading the data and applying filters afterwards.

        This is especially useful when you have concatenated data from different
        zip files.

        Note: adshs_filter and the ciks_filter/forms_filter are mutually
        exclusive; adshs_filter takes precedence.

    Args:
        target_path: root_path with the parquet files for sub, pre, and num
        ciks_filter: optional list of cik numbers to filter for during loading
        forms_filter: optional list of forms (10-K, 10-Q) to filter for during loading
        adshs_filter: optional list of adshs to filter during the loading
        stmt_filter: optional list of stmts (BS, IS, CF, ..) to filter during the loading
        tag_filter: optional list of tags to filter during the loading

    Returns:
        RawDataBag: the loaded Databag
    """
    sub_df = DataBagBase.load_sub_df_by_filter(
        target_path=target_path, adshs=adshs_filter, forms=forms_filter, ciks=ciks_filter
    )

    # if the forms and/or ciks filter was applied, overwrite the adshs list,
    # since these are the adsh values that we should filter for in the pre and num dataframes
    if forms_filter or ciks_filter:
        adshs_filter = sub_df.adsh.to_list()

    pre_filter, num_filter = get_pre_num_filters(adshs=adshs_filter,
                                                 stmts=stmt_filter,
                                                 tags=tag_filter)

    if len(num_filter) > 0:
        LOGGER.info("apply num_df filter: %s", num_filter)

    if len(pre_filter) > 0:
        LOGGER.info("apply pre_df filter: %s", pre_filter)

    pre_df = pd.read_parquet(os.path.join(target_path, f'{PRE_TXT}.parquet'),
                             filters=pre_filter if pre_filter else None)

    num_df = pd.read_parquet(os.path.join(target_path, f'{NUM_TXT}.parquet'),
                             filters=num_filter if num_filter else None)

    return RawDataBag.create(sub_df=sub_df, pre_df=pre_df, num_df=num_df)
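
A minimal usage sketch of the filtered load; the directory is hypothetical and assumed to contain a previously saved RawDataBag:

from secfsdstools.d_container.databagmodel import RawDataBag

bag = RawDataBag.load(
    "./data/2023q1",                       # hypothetical bag directory
    forms_filter=["10-K"],                 # keep only 10-K filings in sub_df
    stmt_filter=["BS"],                    # keep only balance-sheet rows in pre_df
    tag_filter=["Assets", "Liabilities"],  # applied to both pre_df and num_df
)
print(bag.sub_df.shape, bag.pre_df.shape, bag.num_df.shape)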

Methods

def copy_bag(self)

creates a bag with new copies of the internal dataframes.

Returns

RawDataBag
new instance of RawDataBag
Expand source code
def copy_bag(self):
    """
    creates a bag with new copies of the internal dataframes.

    Returns:
        RawDataBag: new instance of RawDataBag
    """

    return RawDataBag.create(sub_df=self.sub_df.copy(),
                             pre_df=self.pre_df.copy(),
                             num_df=self.num_df.copy())
def get_num_copy(self) ‑> pandas.core.frame.DataFrame

Returns a copy of the num.txt dataframe.

Returns

pd.DataFrame
copy of the num.txt dataframe.
Expand source code
def get_num_copy(self) -> pd.DataFrame:
    """
    Returns a copy of the num.txt dataframe.

    Returns:
        pd.DataFrame: copy of the num.txt dataframe.
    """
    return self.num_df.copy()
def get_pre_copy(self) ‑> pandas.core.frame.DataFrame

Returns a copy of the pre.txt dataframe.

Returns

pd.DataFrame
copy of the pre.txt dataframe.
Expand source code
def get_pre_copy(self) -> pd.DataFrame:
    """
    Returns a copy of the pre.txt dataframe.

    Returns:
        pd.DataFrame: copy of the pre.txt dataframe.
    """
    return self.pre_df.copy()
def get_sub_copy(self) ‑> pandas.core.frame.DataFrame

Returns a copy of the sub.txt dataframe.

Returns

pd.DataFrame
copy of the sub.txt dataframe.
Expand source code
def get_sub_copy(self) -> pd.DataFrame:
    """
    Returns a copy of the sub.txt dataframe.

    Returns:
        pd.DataFrame: copy of the sub.txt dataframe.
    """
    return self.sub_df.copy()
def join(self) ‑> JoinedDataBag

merges the raw data of pre and num together.

Returns

JoinedDataBag
the DataBag where pre and num are merged
Expand source code
def join(self) -> JoinedDataBag:
    """
    merges the raw data of pre and num together.

    Returns:
        JoinedDataBag: the DataBag where pre and num are merged

    """

    # merge num and pre together (inner join): only rows whose (adsh, tag, version)
    # combination appears in both num and pre are kept
    pre_num_df = pd.merge(self.num_df,
                          self.pre_df,
                          on=['adsh', 'tag',
                              'version'])  # don't produce index_x and index_y columns

    return JoinedDataBag.create(sub_df=self.sub_df, pre_num_df=pre_num_df)
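
A minimal sketch (hypothetical path); the joined bag is assumed to expose the merged dataframe as pre_num_df, matching the JoinedDataBag.create call above:

from secfsdstools.d_container.databagmodel import RawDataBag

raw_bag = RawDataBag.load("./data/2023q1")  # hypothetical path
joined_bag = raw_bag.join()                 # inner join of num_df and pre_df
print(joined_bag.pre_num_df.head())         # pre_num_df attribute assumed from create()
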
def save(self, target_path: str)

Stores the bag under the given directory. The directory has to exist and must be empty.

Args

target_path
the directory under which three parquet files for sub_txt, pre_txt, and num_txt will be created
Expand source code
def save(self, target_path: str):
    """
    Stores the bag under the given directory.
    The directory has to exist and must be empty.

    Args:
        target_path: the directory under which three parquet files for sub_txt, pre_txt,
              and num_txt will be created
    """
    check_dir(target_path)

    self.sub_df.to_parquet(os.path.join(target_path, f'{SUB_TXT}.parquet'))
    self.pre_df.to_parquet(os.path.join(target_path, f'{PRE_TXT}.parquet'))
    self.num_df.to_parquet(os.path.join(target_path, f'{NUM_TXT}.parquet'))
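
A minimal round-trip sketch; both directories are hypothetical, and the target has to exist and be empty:

import os

from secfsdstools.d_container.databagmodel import RawDataBag

bag = RawDataBag.load("./data/2023q1")  # hypothetical source bag
target = "./data/2023q1_copy"           # hypothetical target; has to exist and be empty
os.makedirs(target, exist_ok=True)
bag.save(target)
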
def statistics(self) ‑> RawDataBagStats

calculate a few simple statistics of a report.
- number of entries in the num-file
- number of entries in the pre-file
- number of reports in the zip-file (equals number of entries in sub-file)
- number of reports per form (10-K, 10-Q, ...)
- number of reports per period date (counts per value in the period column of sub-file)

Returns

RawDataBagStats
instance with basic report infos
Expand source code
def statistics(self) -> RawDataBagStats:
    """
    calculate a few simple statistics of a report.
    - number of entries in the num-file
    - number of entries in the pre-file
    - number of reports in the zip-file (equals number of entries in sub-file)
    - number of reports per form (10-K, 10-Q, ...)
    - number of reports per period date (counts per value in the period column of sub-file)

    Returns:
        RawDataBagStats: instance with basic report infos
    """

    num_entries = len(self.num_df)
    pre_entries = len(self.pre_df)
    number_of_reports = len(self.sub_df)
    reports_per_period_date: Dict[int, int] = self.sub_df.period.value_counts().to_dict()
    reports_per_form: Dict[str, int] = self.sub_df.form.value_counts().to_dict()

    return RawDataBagStats(num_entries=num_entries,
                           pre_entries=pre_entries,
                           number_of_reports=number_of_reports,
                           reports_per_form=reports_per_form,
                           reports_per_period_date=reports_per_period_date
                           )
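
A minimal sketch with a hypothetical path and illustrative counts:

from secfsdstools.d_container.databagmodel import RawDataBag

bag = RawDataBag.load("./data/2023q1")  # hypothetical path
stats = bag.statistics()
print(stats.number_of_reports)
print(stats.reports_per_form)  # illustrative output: {'10-K': 1200, '10-Q': 3400}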

Inherited members

class RawDataBagStats (num_entries: int, pre_entries: int, number_of_reports: int, reports_per_form: Dict[str, int], reports_per_period_date: Dict[int, int])

Contains simple statistics of a report.

Expand source code
@dataclass
class RawDataBagStats:
    """
    Contains simple statistics of a report.
    """
    num_entries: int
    pre_entries: int
    number_of_reports: int
    reports_per_form: Dict[str, int]
    reports_per_period_date: Dict[int, int]

Class variables

var num_entries : int
var number_of_reports : int
var pre_entries : int
var reports_per_form : Dict[str, int]
var reports_per_period_date : Dict[int, int]