Module secfsdstools.g_pipelines.pipeline_utils
Util functions used in pipeline tasks
"""
Util functions used in pipeline tasks
"""
import logging
from pathlib import Path
from typing import List
from secfsdstools.d_container.databagmodel import RawDataBag, JoinedDataBag
from secfsdstools.f_standardize.standardizing import StandardizedBag
LOGGER = logging.getLogger(__name__)
def concat_bags(paths_to_concat: List[Path], target_path: Path,
                drop_duplicates_sub_df: bool = True):
    """
    Concatenates all the bags in paths_to_concat into the target_path directory.

    The logic checks the type of the bag (Raw, Joined, or Standardized) and handles it
    accordingly. All paths in paths_to_concat must point to bags of the same type.

    Args:
        paths_to_concat (List[Path]) : list of paths to read the dataframes from
        target_path (Path) : path to write the concatenated data to
        drop_duplicates_sub_df (bool, True) : whether the final sub_df should be checked
            for duplicates
    """
    if len(paths_to_concat) == 0:
        # nothing to do
        return

    LOGGER.info("Concat in memory - number of paths: %d - target: %s",
                len(paths_to_concat), target_path)

    if RawDataBag.is_rawbag_path(paths_to_concat[0]):
        all_bags = [RawDataBag.load(str(path)) for path in paths_to_concat]
        all_bag: RawDataBag = RawDataBag.concat(all_bags,
                                                drop_duplicates_sub_df=drop_duplicates_sub_df)
        all_bag.save(target_path=str(target_path))
    elif JoinedDataBag.is_joinedbag_path(paths_to_concat[0]):
        all_bags = [JoinedDataBag.load(str(path)) for path in paths_to_concat]
        all_bag: JoinedDataBag = JoinedDataBag.concat(all_bags,
                                                      drop_duplicates_sub_df=drop_duplicates_sub_df)
        all_bag.save(target_path=str(target_path))
    elif StandardizedBag.is_standardizebag_path(paths_to_concat[0]):
        all_bags = [StandardizedBag.load(str(path)) for path in paths_to_concat]
        all_bag: StandardizedBag = StandardizedBag.concat(all_bags)
        all_bag.save(target_path=str(target_path))
    else:
        raise ValueError("bag_type must be either raw, joined, or standardized")
def concat_bags_filebased(paths_to_concat: List[Path],
                          target_path: Path,
                          drop_duplicates_sub_df: bool = False):
    """
    Concatenates all the bags in paths_to_concat into the target_path directory,
    working on the filesystem instead of loading the bags into memory.

    The logic checks the type of the bag (Raw or Joined) and handles it accordingly.
    All paths in paths_to_concat must point to bags of the same type.

    Args:
        paths_to_concat (List[Path]) : list of paths to read the dataframes from
        target_path (Path) : path to write the concatenated data to
        drop_duplicates_sub_df (bool, False) : whether the final sub_df should be checked
            for duplicates
    """
    if len(paths_to_concat) == 0:
        # nothing to do
        return

    LOGGER.info("Concat on filesystem - number of paths: %d - target: %s",
                len(paths_to_concat), target_path)

    if RawDataBag.is_rawbag_path(paths_to_concat[0]):
        RawDataBag.concat_filebased(paths_to_concat=paths_to_concat,
                                    target_path=target_path,
                                    drop_duplicates_sub_df=drop_duplicates_sub_df)
    elif JoinedDataBag.is_joinedbag_path(paths_to_concat[0]):
        JoinedDataBag.concat_filebased(paths_to_concat=paths_to_concat,
                                       target_path=target_path,
                                       drop_duplicates_sub_df=drop_duplicates_sub_df)
    else:
        raise ValueError("bag_type must be either raw or joined")
Functions
def concat_bags(paths_to_concat: List[pathlib.Path], target_path: pathlib.Path, drop_duplicates_sub_df: bool = True)
-
Concatenates all the bags in paths_to_concat into the target_path directory.
The logic checks the type of the bag (Raw, Joined, or Standardized) and handles it accordingly. All paths in paths_to_concat must point to bags of the same type.
Args
paths_to_concat (List[Path]) : list of paths to read the dataframes from
target_path (Path) : path to write the concatenated data to
drop_duplicates_sub_df (bool, True) : whether the final sub_df should be checked for duplicates
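A minimal usage sketch of the in-memory variant. The directory paths below are hypothetical; in a real pipeline they would point to bag directories produced by earlier pipeline steps, and all of them must contain bags of the same type:

from pathlib import Path

from secfsdstools.g_pipelines.pipeline_utils import concat_bags

# hypothetical quarterly joined-bag directories created by an earlier step
quarter_paths = [Path("data/joined/2022q1"), Path("data/joined/2022q2")]

# loads every bag into memory, concatenates them, and saves the result
# to the target directory
concat_bags(paths_to_concat=quarter_paths,
            target_path=Path("data/joined/2022_h1"),
            drop_duplicates_sub_df=True)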
def concat_bags_filebased(paths_to_concat: List[pathlib.Path], target_path: pathlib.Path, drop_duplicates_sub_df: bool = False)
-
Concatenates all the bags in paths_to_concat into the target_path directory, working on the filesystem instead of loading the bags into memory.
The logic checks the type of the bag (Raw or Joined) and handles it accordingly. All paths in paths_to_concat must point to bags of the same type.
Args
paths_to_concat (List[Path]) : list of paths to read the dataframes from
target_path (Path) : path to write the concatenated data to
drop_duplicates_sub_df (bool, False) : whether the final sub_df should be checked for duplicates
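A minimal sketch of the file-based variant, which concatenates on the filesystem rather than loading everything into memory. The paths are again hypothetical, and raw bags are assumed purely for illustration; note that this variant does not support StandardizedBag paths:

from pathlib import Path

from secfsdstools.g_pipelines.pipeline_utils import concat_bags_filebased

# hypothetical raw-bag directories; all paths must point to bags of the same type
raw_paths = [Path("data/raw/2021q4"), Path("data/raw/2022q1")]

# concatenates directly on the filesystem instead of loading all bags into memory
concat_bags_filebased(paths_to_concat=raw_paths,
                      target_path=Path("data/raw/combined"),
                      drop_duplicates_sub_df=False)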