Module secfsdstools.g_pipelines.pipeline_utils

Util functions used in pipeline tasks

Expand source code
"""
Util functions used in pipeline tasks
"""
from pathlib import Path
from typing import List

from secfsdstools.d_container.databagmodel import RawDataBag, JoinedDataBag, is_rawbag_path, \
    is_joinedbag_path


def concat_bags(paths_to_concat: List[Path], target_path: Path):
    """
    Concatenates all the bags stored under paths_to_concat into a single bag and saves
    the result to target_path.

    The bag type (Raw or Joined) is detected from the first path only and is then used
    to load every path. Therefore, all paths in paths_to_concat must be of the same type.

    Args:
        paths_to_concat (List[Path]): paths of the bags that shall be concatenated.
            If it is empty, nothing is done.
        target_path (Path): path to which the concatenated bag is saved.

    Returns:
        None

    Raises:
        ValueError: if the first path is neither recognized as a raw bag
            nor as a joined bag.
    """
    if not paths_to_concat:
        # nothing to do
        return

    # the first entry decides which bag implementation is used for all paths
    first_path = paths_to_concat[0]
    if is_rawbag_path(first_path):
        bag_class = RawDataBag
    elif is_joinedbag_path(first_path):
        bag_class = JoinedDataBag
    else:
        raise ValueError(f"bag type of '{first_path}' must be either raw or joined")

    # load and save expect string paths, hence the explicit str() conversion
    all_bags = [bag_class.load(str(path)) for path in paths_to_concat]
    all_bag = bag_class.concat(all_bags, drop_duplicates_sub_df=True)
    all_bag.save(target_path=str(target_path))

Functions

def concat_bags(paths_to_concat: List[pathlib.Path], target_path: pathlib.Path)

Concatenates all the bags in paths_to_concat into a single bag and saves it to target_path.

The logic checks the type of the bag (Raw or Joined) and handles it accordingly. The type is determined from the first path only, so all paths in paths_to_concat must be of the same type.

Args

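paths_to_concat (List[Path]): paths of the bags to concatenate; if empty, nothing is done.

target_path (Path): path to which the concatenated bag is saved.

Returns: None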

Expand source code
def concat_bags(paths_to_concat: List[Path], target_path: Path):
    """
    Concatenates all the bags stored under paths_to_concat into a single bag and saves
    the result to target_path.

    The bag type (Raw or Joined) is detected from the first path only and is then used
    to load every path. Therefore, all paths in paths_to_concat must be of the same type.

    Args:
        paths_to_concat (List[Path]): paths of the bags that shall be concatenated.
            If it is empty, nothing is done.
        target_path (Path): path to which the concatenated bag is saved.

    Returns:
        None

    Raises:
        ValueError: if the first path is neither recognized as a raw bag
            nor as a joined bag.
    """
    if not paths_to_concat:
        # nothing to do
        return

    # the first entry decides which bag implementation is used for all paths
    first_path = paths_to_concat[0]
    if is_rawbag_path(first_path):
        bag_class = RawDataBag
    elif is_joinedbag_path(first_path):
        bag_class = JoinedDataBag
    else:
        raise ValueError(f"bag type of '{first_path}' must be either raw or joined")

    # load and save expect string paths, hence the explicit str() conversion
    all_bags = [bag_class.load(str(path)) for path in paths_to_concat]
    all_bag = bag_class.concat(all_bags, drop_duplicates_sub_df=True)
    all_bag.save(target_path=str(target_path))