Module secfsdstools.e_collector.multireportcollecting

Reads several reports from different files in parallel

Expand source code
"""
Reads several reports from different files in parallel
"""
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional, List, Dict

from secfsdstools.e_collector.basecollector import BaseCollector

from secfsdstools.a_config.configmgt import ConfigurationManager
from secfsdstools.a_config.configmodel import Configuration
from secfsdstools.a_utils.parallelexecution import ParallelExecutor
from secfsdstools.c_index.indexdataaccess import IndexReport, ParquetDBIndexingAccessor
from secfsdstools.d_container.databagmodel import RawDataBag

class MultiReportCollector:
    """
    Collects several reports, which may be stored in different files,
    and provides their data in a single RawDataBag.

    Instances are usually created with one of the factory methods
    get_reports_by_adshs() or get_reports_by_indexreports().
    """

    @classmethod
    def get_reports_by_adshs(cls, adshs: List[str],
                             stmt_filter: Optional[List[str]] = None,
                             tag_filter: Optional[List[str]] = None,
                             configuration: Optional[Configuration] = None
                             ) -> "MultiReportCollector":
        """
        Creates the MultiReportCollector instance for a certain list of adshs.

        The IndexReport entries for the adshs are read with a ParquetDBIndexingAccessor
        that is created for the db_dir of the configuration.
        If no configuration is passed, it reads the config from the config file.

        Args:
            adshs (List[str]): List with unique report ids to load
            stmt_filter (List[str], optional, default=None):
                List of stmts that should be read (BS, IS, ...)
            tag_filter (List[str], optional, default=None):
                List of tags that should be read (Assets, Liabilities, ...)
            configuration (Configuration, optional, default=None):
                Optional configuration object

        Returns:
            MultiReportCollector: instance of MultiReportCollector
        """
        if configuration is None:
            configuration = ConfigurationManager.read_config_file()

        dbaccessor = ParquetDBIndexingAccessor(db_dir=configuration.db_dir)
        index_reports = dbaccessor.read_index_reports_for_adshs(adshs=adshs)

        # instantiate via cls, so that a subclass receives an instance of its own type
        return cls(index_reports=index_reports,
                   stmt_filter=stmt_filter,
                   tag_filter=tag_filter)

    @classmethod
    def get_reports_by_indexreports(cls,
                                    index_reports: List[IndexReport],
                                    stmt_filter: Optional[List[str]] = None,
                                    tag_filter: Optional[List[str]] = None,
                                    ) -> "MultiReportCollector":
        """
        Creates the MultiReportCollector instance based on IndexReport instances.

        Args:
            index_reports (List[IndexReport]): instances of IndexReport
            stmt_filter (List[str], optional, default=None):
                List of stmts that should be read (BS, IS, ...)
            tag_filter (List[str], optional, default=None):
                List of tags that should be read (Assets, Liabilities, ...)

        Returns:
            MultiReportCollector: instance of MultiReportCollector
        """
        # instantiate via cls, so that a subclass receives an instance of its own type
        return cls(index_reports=index_reports,
                   stmt_filter=stmt_filter,
                   tag_filter=tag_filter)

    def __init__(self, index_reports: List[IndexReport],
                 stmt_filter: Optional[List[str]] = None,
                 tag_filter: Optional[List[str]] = None):
        """
        Stores the reports to load and the filters to apply while loading.

        Args:
            index_reports (List[IndexReport]): the reports that should be collected
            stmt_filter (List[str], optional, default=None):
                passed as stmt_filter to every BaseCollector that is created
            tag_filter (List[str], optional, default=None):
                passed as tag_filter to every BaseCollector that is created
        """
        super().__init__()
        self.index_reports = index_reports
        self.stmt_filter = stmt_filter
        self.tag_filter = tag_filter

    def _multi_collect(self) -> RawDataBag:
        """
        Reads the defined index_reports and concats the content in a single DataBag.

        The reports are grouped by their originFile. For every group, one BaseCollector
        is created on the fullPath of the group and its read is restricted to the adshs
        of the group. The groups are processed with a ParallelExecutor.

        Returns:
            RawDataBag: a single DataBag containing all the collected reports
        """
        # organize by originfile, so that one unit of work covers all reports of a file
        reports_per_file: Dict[str, List[IndexReport]] = defaultdict(list)
        for report in self.index_reports:
            reports_per_file[report.originFile].append(report)

        def get_entries() -> List[List[IndexReport]]:
            # the result is a list of list of IndexReports. Every IndexReport list has the same
            # originFile and therefore also the same fullPath.
            return list(reports_per_file.values())

        def process_element(element: List[IndexReport]) -> RawDataBag:
            # the received list only contains reports that are stored in the same file, so
            # they all have the same fullPath.
            datapath = element[0].fullPath
            adshs = [x.adsh for x in element]

            collector = BaseCollector(datapath=datapath,
                                      stmt_filter=self.stmt_filter,
                                      tag_filter=self.tag_filter)

            # (column, operator, values) tuple that limits the read to the requested reports
            adsh_filter = ('adsh', 'in', adshs)

            return collector.basecollect(sub_df_filter=adsh_filter)

        def post_process(parts: List[RawDataBag]) -> List[RawDataBag]:
            # do nothing
            return parts

        # The unit of work is a file group, not a single report: if all reports are stored
        # in the same file there is exactly one element and nothing to parallelize.
        execute_serial = len(reports_per_file) == 1

        # NOTE(review): chunksize=0 presumably means "do not split the entries into chunks"
        # - confirm against ParallelExecutor.
        executor = ParallelExecutor(chunksize=0, execute_serial=execute_serial)

        executor.set_get_entries_function(get_entries)
        executor.set_process_element_function(process_element)
        executor.set_post_process_chunk_function(post_process)

        # we ignore the missing, since get_entries always returns the whole list
        collected_reports: List[RawDataBag]
        collected_reports, _ = executor.execute()

        return RawDataBag.concat(collected_reports)

    def collect(self) -> RawDataBag:
        """
        Collects the data and returns a RawDataBag.

        Returns:
            RawDataBag: the collected data
        """
        return self._multi_collect()

Classes

class MultiReportCollector (index_reports: List[IndexReport], stmt_filter: Optional[List[str]] = None, tag_filter: Optional[List[str]] = None)

The MultiReport Reader can read reports from different zip files and provide their data in a single RawDataBag.

Expand source code
class MultiReportCollector:
    """
    Collects several reports, which may be stored in different files,
    and provides their data in a single RawDataBag.

    Instances are usually created with one of the factory methods
    get_reports_by_adshs() or get_reports_by_indexreports().
    """

    @classmethod
    def get_reports_by_adshs(cls, adshs: List[str],
                             stmt_filter: Optional[List[str]] = None,
                             tag_filter: Optional[List[str]] = None,
                             configuration: Optional[Configuration] = None
                             ) -> "MultiReportCollector":
        """
        Creates the MultiReportCollector instance for a certain list of adshs.

        The IndexReport entries for the adshs are read with a ParquetDBIndexingAccessor
        that is created for the db_dir of the configuration.
        If no configuration is passed, it reads the config from the config file.

        Args:
            adshs (List[str]): List with unique report ids to load
            stmt_filter (List[str], optional, default=None):
                List of stmts that should be read (BS, IS, ...)
            tag_filter (List[str], optional, default=None):
                List of tags that should be read (Assets, Liabilities, ...)
            configuration (Configuration, optional, default=None):
                Optional configuration object

        Returns:
            MultiReportCollector: instance of MultiReportCollector
        """
        if configuration is None:
            configuration = ConfigurationManager.read_config_file()

        dbaccessor = ParquetDBIndexingAccessor(db_dir=configuration.db_dir)
        index_reports = dbaccessor.read_index_reports_for_adshs(adshs=adshs)

        # instantiate via cls, so that a subclass receives an instance of its own type
        return cls(index_reports=index_reports,
                   stmt_filter=stmt_filter,
                   tag_filter=tag_filter)

    @classmethod
    def get_reports_by_indexreports(cls,
                                    index_reports: List[IndexReport],
                                    stmt_filter: Optional[List[str]] = None,
                                    tag_filter: Optional[List[str]] = None,
                                    ) -> "MultiReportCollector":
        """
        Creates the MultiReportCollector instance based on IndexReport instances.

        Args:
            index_reports (List[IndexReport]): instances of IndexReport
            stmt_filter (List[str], optional, default=None):
                List of stmts that should be read (BS, IS, ...)
            tag_filter (List[str], optional, default=None):
                List of tags that should be read (Assets, Liabilities, ...)

        Returns:
            MultiReportCollector: instance of MultiReportCollector
        """
        # instantiate via cls, so that a subclass receives an instance of its own type
        return cls(index_reports=index_reports,
                   stmt_filter=stmt_filter,
                   tag_filter=tag_filter)

    def __init__(self, index_reports: List[IndexReport],
                 stmt_filter: Optional[List[str]] = None,
                 tag_filter: Optional[List[str]] = None):
        """
        Stores the reports to load and the filters to apply while loading.

        Args:
            index_reports (List[IndexReport]): the reports that should be collected
            stmt_filter (List[str], optional, default=None):
                passed as stmt_filter to every BaseCollector that is created
            tag_filter (List[str], optional, default=None):
                passed as tag_filter to every BaseCollector that is created
        """
        super().__init__()
        self.index_reports = index_reports
        self.stmt_filter = stmt_filter
        self.tag_filter = tag_filter

    def _multi_collect(self) -> RawDataBag:
        """
        Reads the defined index_reports and concats the content in a single DataBag.

        The reports are grouped by their originFile. For every group, one BaseCollector
        is created on the fullPath of the group and its read is restricted to the adshs
        of the group. The groups are processed with a ParallelExecutor.

        Returns:
            RawDataBag: a single DataBag containing all the collected reports
        """
        # organize by originfile, so that one unit of work covers all reports of a file
        reports_per_file: Dict[str, List[IndexReport]] = defaultdict(list)
        for report in self.index_reports:
            reports_per_file[report.originFile].append(report)

        def get_entries() -> List[List[IndexReport]]:
            # the result is a list of list of IndexReports. Every IndexReport list has the same
            # originFile and therefore also the same fullPath.
            return list(reports_per_file.values())

        def process_element(element: List[IndexReport]) -> RawDataBag:
            # the received list only contains reports that are stored in the same file, so
            # they all have the same fullPath.
            datapath = element[0].fullPath
            adshs = [x.adsh for x in element]

            collector = BaseCollector(datapath=datapath,
                                      stmt_filter=self.stmt_filter,
                                      tag_filter=self.tag_filter)

            # (column, operator, values) tuple that limits the read to the requested reports
            adsh_filter = ('adsh', 'in', adshs)

            return collector.basecollect(sub_df_filter=adsh_filter)

        def post_process(parts: List[RawDataBag]) -> List[RawDataBag]:
            # do nothing
            return parts

        # The unit of work is a file group, not a single report: if all reports are stored
        # in the same file there is exactly one element and nothing to parallelize.
        execute_serial = len(reports_per_file) == 1

        # NOTE(review): chunksize=0 presumably means "do not split the entries into chunks"
        # - confirm against ParallelExecutor.
        executor = ParallelExecutor(chunksize=0, execute_serial=execute_serial)

        executor.set_get_entries_function(get_entries)
        executor.set_process_element_function(process_element)
        executor.set_post_process_chunk_function(post_process)

        # we ignore the missing, since get_entries always returns the whole list
        collected_reports: List[RawDataBag]
        collected_reports, _ = executor.execute()

        return RawDataBag.concat(collected_reports)

    def collect(self) -> RawDataBag:
        """
        Collects the data and returns a RawDataBag.

        Returns:
            RawDataBag: the collected data
        """
        return self._multi_collect()

Static methods

def get_reports_by_adshs(adshs: List[str], stmt_filter: Optional[List[str]] = None, tag_filter: Optional[List[str]] = None, configuration: Optional[Configuration] = None)

creates the MultiReportCollector instance for a certain list of adshs.

if no configuration is passed, it reads the config from the config file

Args

adshs : List[str]
List with unique report ids to load

stmt_filter : List[str], optional, default=None
List of stmts that should be read (BS, IS, …)

tag_filter : List[str], optional, default=None
List of tags that should be read (Assets, Liabilities, …)

configuration : Configuration optional, default=None
Optional configuration object

Returns

MultiReportCollector
instance of MultiReportCollector
Expand source code
@classmethod
def get_reports_by_adshs(cls, adshs: List[str],
                         stmt_filter: Optional[List[str]] = None,
                         tag_filter: Optional[List[str]] = None,
                         configuration: Optional[Configuration] = None):
    """
    Creates the MultiReportCollector instance for a certain list of adshs.

    The IndexReport entries for the adshs are read with a ParquetDBIndexingAccessor
    that is created for the db_dir of the configuration.
    If no configuration is passed, it reads the config from the config file.

    Args:
        adshs (List[str]): List with unique report ids to load
        stmt_filter (List[str], optional, default=None):
            List of stmts that should be read (BS, IS, ...)
        tag_filter (List[str], optional, default=None):
            List of tags that should be read (Assets, Liabilities, ...)
        configuration (Configuration, optional, default=None):
            Optional configuration object

    Returns:
        MultiReportCollector: instance of MultiReportCollector
    """
    if configuration is None:
        configuration = ConfigurationManager.read_config_file()

    dbaccessor = ParquetDBIndexingAccessor(db_dir=configuration.db_dir)
    index_reports = dbaccessor.read_index_reports_for_adshs(adshs=adshs)

    # instantiate via cls, so that a subclass receives an instance of its own type
    return cls(index_reports=index_reports,
               stmt_filter=stmt_filter,
               tag_filter=tag_filter)
def get_reports_by_indexreports(index_reports: List[IndexReport], stmt_filter: Optional[List[str]] = None, tag_filter: Optional[List[str]] = None)

creates the MultiReportCollector instance based on IndexReport instances

Args

index_reports : List[IndexReport]
instances of IndexReport

stmt_filter : List[str], optional, default=None
List of stmts that should be read (BS, IS, …)

tag_filter : List[str], optional, default=None
List of tags that should be read (Assets, Liabilities, …)

Returns

MultiReportCollector
instance of MultiReportCollector
Expand source code
@classmethod
def get_reports_by_indexreports(cls,
                                index_reports: List[IndexReport],
                                stmt_filter: Optional[List[str]] = None,
                                tag_filter: Optional[List[str]] = None,
                                ):
    """
    Creates the MultiReportCollector instance based on IndexReport instances.

    Args:
        index_reports (List[IndexReport]): instances of IndexReport
        stmt_filter (List[str], optional, default=None):
            List of stmts that should be read (BS, IS, ...)
        tag_filter (List[str], optional, default=None):
            List of tags that should be read (Assets, Liabilities, ...)

    Returns:
        MultiReportCollector: instance of MultiReportCollector
    """
    # instantiate via cls, so that a subclass receives an instance of its own type
    return cls(index_reports=index_reports,
               stmt_filter=stmt_filter,
               tag_filter=tag_filter)

Methods

def collect(self) ‑> RawDataBag

collects the data and returns a RawDataBag

Returns

RawDataBag
the collected Data
Expand source code
def collect(self) -> RawDataBag:
    """
    Runs the collection for the configured reports.

    This is the public entry point; the work itself is done by _multi_collect().

    Returns:
        RawDataBag: the databag returned by _multi_collect()
    """
    collected = self._multi_collect()
    return collected