Module secfsdstools.e_collector.basecollector
Collector Base Class
Expand source code
"""
Collector Base Class
"""
import os
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple, Union
import pandas as pd
from secfsdstools.a_utils.constants import NUM_TXT, PRE_TXT, SUB_TXT
from secfsdstools.d_container.databagmodel import RawDataBag
class BaseCollector(ABC):
"""
Base class for Collector implementations
"""
def __init__(self, datapath: str,
stmt_filter: Optional[List[str]] = None,
tag_filter: Optional[List[str]] = None):
self.datapath = datapath
self.stmt_filter = stmt_filter
self.tag_filter = tag_filter
def _read_df_from_raw_parquet(self,
file: str,
filters=None) -> pd.DataFrame:
return pd.read_parquet(os.path.join(self.datapath, f'{file}.parquet'),
filters=filters)
def _get_pre_num_filters(self,
adshs: Optional[List[str]],
stmts: Optional[List[str]],
tags: Optional[List[str]]):
pre_filter = []
num_filter = []
if adshs:
adsh_filter_expression = ('adsh', 'in', adshs)
pre_filter.append(adsh_filter_expression)
num_filter.append(adsh_filter_expression)
if stmts:
pre_filter.append(('stmt', 'in', stmts))
if tags:
tag_filter_expression = ('tag', 'in', tags)
pre_filter.append(tag_filter_expression)
num_filter.append(tag_filter_expression)
return pre_filter, num_filter
def _collect(self, sub_df_filter: Tuple[str, str, Union[str, List[str]]]) -> RawDataBag:
sub_df = self._read_df_from_raw_parquet(file=SUB_TXT,
filters=[sub_df_filter] if sub_df_filter else None)
adshs = sub_df.adsh.to_list()
pre_filter, num_filter = self._get_pre_num_filters(adshs=adshs,
stmts=self.stmt_filter,
tags=self.tag_filter)
pre_df = self._read_df_from_raw_parquet(
file=PRE_TXT, filters=pre_filter if pre_filter else None
)
num_df = self._read_df_from_raw_parquet(
file=NUM_TXT, filters=num_filter if num_filter else None
)
# pandas pivot works better if coreg is not nan, so we set it here to a simple dash
num_df.loc[num_df.coreg.isna(), 'coreg'] = ''
return RawDataBag.create(sub_df=sub_df, pre_df=pre_df, num_df=num_df)
@abstractmethod
def collect(self) -> RawDataBag:
"""
collects the data and returns a Databag
Returns:
RawDataBag: the collected Data
"""
Classes
class BaseCollector (datapath: str, stmt_filter: Optional[List[str]] = None, tag_filter: Optional[List[str]] = None)
-
Base class for Collector implementations
Expand source code
class BaseCollector(ABC): """ Base class for Collector implementations """ def __init__(self, datapath: str, stmt_filter: Optional[List[str]] = None, tag_filter: Optional[List[str]] = None): self.datapath = datapath self.stmt_filter = stmt_filter self.tag_filter = tag_filter def _read_df_from_raw_parquet(self, file: str, filters=None) -> pd.DataFrame: return pd.read_parquet(os.path.join(self.datapath, f'{file}.parquet'), filters=filters) def _get_pre_num_filters(self, adshs: Optional[List[str]], stmts: Optional[List[str]], tags: Optional[List[str]]): pre_filter = [] num_filter = [] if adshs: adsh_filter_expression = ('adsh', 'in', adshs) pre_filter.append(adsh_filter_expression) num_filter.append(adsh_filter_expression) if stmts: pre_filter.append(('stmt', 'in', stmts)) if tags: tag_filter_expression = ('tag', 'in', tags) pre_filter.append(tag_filter_expression) num_filter.append(tag_filter_expression) return pre_filter, num_filter def _collect(self, sub_df_filter: Tuple[str, str, Union[str, List[str]]]) -> RawDataBag: sub_df = self._read_df_from_raw_parquet(file=SUB_TXT, filters=[sub_df_filter] if sub_df_filter else None) adshs = sub_df.adsh.to_list() pre_filter, num_filter = self._get_pre_num_filters(adshs=adshs, stmts=self.stmt_filter, tags=self.tag_filter) pre_df = self._read_df_from_raw_parquet( file=PRE_TXT, filters=pre_filter if pre_filter else None ) num_df = self._read_df_from_raw_parquet( file=NUM_TXT, filters=num_filter if num_filter else None ) # pandas pivot works better if coreg is not nan, so we set it here to a simple dash num_df.loc[num_df.coreg.isna(), 'coreg'] = '' return RawDataBag.create(sub_df=sub_df, pre_df=pre_df, num_df=num_df) @abstractmethod def collect(self) -> RawDataBag: """ collects the data and returns a Databag Returns: RawDataBag: the collected Data """
Ancestors
- abc.ABC
Subclasses
Methods
def collect(self) ‑> RawDataBag
-
collects the data and returns a Databag
Returns
RawDataBag
- the collected Data
Expand source code
@abstractmethod def collect(self) -> RawDataBag: """ collects the data and returns a Databag Returns: RawDataBag: the collected Data """