Module secfsdstools.g_pipelines.concat_process

Module that combines the content from several input folders into a single DataBag.

Expand source code
"""
Module that combines the content from several input folders into a single DataBag.
"""
from pathlib import Path
from typing import List

from secfsdstools.c_automation.automation_utils import delete_temp_folders
from secfsdstools.c_automation.task_framework import CheckByNewSubfoldersMergeBaseTask, \
    CheckByTimestampMergeBaseTask, AbstractTask, \
    AbstractThreadProcess
from secfsdstools.g_pipelines.pipeline_utils import concat_bags


class ConcatIfNewSubfolderTask(CheckByNewSubfoldersMergeBaseTask):
    """
    Task that concatenates the subfolders of a root_path whenever a new subfolder was added.
    """

    def __init__(self,
                 root_path: Path,
                 pathfilter: str,
                 target_path: Path):
        """
        Selects subfolders of the root_path (via the pathfilter string) and concatenates
        them into a single DataBag (either Raw or Joined) stored in the target_path.

        The pathfilter string defines which subfolder level actually contains the data
        to be concatenated.

        E.g. with a pathfilter of just "*" and a root_path that looks like
        <pre>
        root_path
            2010q1.zip
            2010q2.zip
            ...
            2024q3.zip
        </pre>
        the subfolders are expected to contain the databags directly.<br>
        With a pathfilter of "*/BS" and a root_path that looks like
        <pre>
        root_path
            2010q1.zip
              BS
              CF
              IS
            2010q2.zip
              BS
              CF
              IS
            ...
            2024q3.zip
              BS
              CF
              IS
        </pre>

        all BS subfolders are concatenated into the target path.<br>

        The logic is executed if new subfolders are available: when a new folder
        appears in the root_path, it is added to the content that already exists in the
        target_path. So, if in the example above the subfolders 2010q1.zip ... 2024q3.zip
        were already concatenated in a previous run and the next run detects the subfolder
        2024q4.zip, the new subfolder is added to the existing content in the target_path.

        This is tracked by writing the names of the processed subfolders into a meta.inf
        file in the target_path.

        Args:
            root_path: root path to read the data from
            pathfilter: pathfilter string that selects the subfolders of the root_path
                        which have to be processed
            target_path: path to which the results have to be written
        """
        super().__init__(root_path=root_path, pathfilter=pathfilter, target_path=target_path)

    def __str__(self) -> str:
        return (
            f"ConcatIfNewSubfolderTask(root_path: {self.root_path}, "
            f"pathfilter: {self.filter})"
        )

    def do_execution(self,
                     paths_to_process: List[Path],
                     target_path: Path,
                     tmp_path: Path):
        """
        Concatenates the new bags (together with an already existing result, if there is
        one) and stores the outcome in the tmp_path.

        Args:
            paths_to_process: list of paths/folders that have to be processed
            target_path: the path where the result of the previous run was written
            tmp_path: path to which the new result has to be written
        """
        if target_path.exists():
            # a result of a previous run exists, so it has to be part of the new result
            bags_to_merge = paths_to_process + [target_path]
        else:
            bags_to_merge = paths_to_process

        # the merged bag is written to the tmp_path
        concat_bags(paths_to_concat=bags_to_merge, target_path=tmp_path)


class ConcatIfChangedTimestampTask(CheckByTimestampMergeBaseTask):
    """
    Task that concatenates the subfolders of a root_path if anything changed in any
    subfolder since the last run.
    """

    def __init__(self,
                 root_path: Path,
                 pathfilter: str,
                 target_path: Path):
        """
        Selects subfolders of the root_path (via the pathfilter string) and concatenates
        them into a single DataBag (either Raw or Joined) stored in the target_path.

        The pathfilter string defines which subfolder level actually contains the data
        to be concatenated.

        E.g. with a pathfilter of just "*" and a root_path that looks like
        <pre>
        root_path
            2010q1.zip
            2010q2.zip
            ...
            2024q3.zip
        </pre>
        the subfolders are expected to contain the databags directly.<br>
        With a pathfilter of "*/BS" and a root_path that looks like
        <pre>
        root_path
            2010q1.zip
              BS
              CF
              IS
            2010q2.zip
              BS
              CF
              IS
            ...
            2024q3.zip
              BS
              CF
              IS
        </pre>

        all BS subfolders are concatenated into the target path.<br>

        The logic is executed if a file/folder in the root_path has changed since the
        last processing.

        This is tracked by writing the actual last modification timestamp of the root_path
        into the meta.inf file of the target_path.

        Args:
            root_path: root path to read the data from
            pathfilter: pathfilter string that selects the subfolders of the root_path
                        which have to be processed
            target_path: path to which the results have to be written
        """
        super().__init__(root_path=root_path, pathfilter=pathfilter, target_path=target_path)

    def __str__(self) -> str:
        return (
            f"ConcatIfChangedTimestampTask(root_path: {self.root_path},"
            f"pathfilter: {self.filter})"
        )

    def do_execution(self,
                     paths_to_process: List[Path],
                     tmp_path: Path):
        """
        Concatenates the bags found in paths_to_process and stores the result in the tmp_path.

        Args:
            paths_to_process: list of paths/folders that have to be processed
            tmp_path: path to which the result has to be written
        """
        concat_bags(
            paths_to_concat=paths_to_process,
            target_path=tmp_path,
        )


class ConcatByChangedTimestampProcess(AbstractThreadProcess):
    """
    Process that concatenates the raw or joined databags of a root_dir and stores the
    result in the target_dir, provided something changed in any of the subfolders.
    """

    def __init__(self,
                 root_dir: str,
                 target_dir: str,
                 pathfilter: str = "*",
                 ):
        """
        Constructor.

        Args:
            root_dir: directory containing the bags to be concatenated
            target_dir: directory to which the concatenated result has to be written
            pathfilter: pathfilter string used to select the bags/subfolders to be
                    processed. Default is "*".
        """
        super().__init__(
            execute_serial=False,
            chunksize=0,
        )

        # the directories are handed in as strings and kept as Path objects
        self.root_path = Path(root_dir)
        self.target_path = Path(target_dir)
        self.filter = pathfilter

    def pre_process(self):
        """
        Runs before the actual processing and removes leftover temp folders (e.g. from a
        failed previous run) by calling delete_temp_folders on the parent directory of
        the target path.
        """
        parent_of_target = self.target_path.parent
        delete_temp_folders(root_path=parent_of_target)

    def calculate_tasks(self) -> List[AbstractTask]:
        """
        Creates the task that executes the actual logic.
        This process consists of a single task, so at most one task is returned.

        Returns:
            List[AbstractTask]: list holding the task, or an empty list if the task
            has no paths to process
        """
        candidate = ConcatIfChangedTimestampTask(
            root_path=self.root_path,
            pathfilter=self.filter,
            target_path=self.target_path,
        )

        # single-task process: hand the task out only if it really has work to do
        if len(candidate.paths_to_process) == 0:
            return []
        return [candidate]


class ConcatByNewSubfoldersProcess(AbstractThreadProcess):
    """
    Process that concatenates the raw or joined databags of a root_dir and stores the
    result in the target_dir, provided a new bag/subfolder was added to the root_dir.
    """

    def __init__(self,
                 root_dir: str,
                 target_dir: str,
                 pathfilter: str = "*",
                 ):
        """
        Constructor.

        Args:
            root_dir: directory containing the bags to be concatenated
            target_dir: directory to which the concatenated result has to be written
            pathfilter: pathfilter string used to select the bags/subfolders to be
                    processed. Default is "*".
        """
        super().__init__(
            execute_serial=False,
            chunksize=0,
        )

        # the directories are handed in as strings and kept as Path objects
        self.root_path = Path(root_dir)
        self.target_path = Path(target_dir)
        self.filter = pathfilter

    def pre_process(self):
        """
        Runs before the actual processing and removes leftover temp folders (e.g. from a
        failed previous run) by calling delete_temp_folders on the parent directory of
        the target path.
        """
        parent_of_target = self.target_path.parent
        delete_temp_folders(root_path=parent_of_target)

    def calculate_tasks(self) -> List[AbstractTask]:
        """
        Creates the task that executes the actual logic.
        This process consists of a single task, so at most one task is returned.

        Returns:
            List[AbstractTask]: list holding the task, or an empty list if the task
            has no paths to process
        """
        candidate = ConcatIfNewSubfolderTask(
            root_path=self.root_path,
            pathfilter=self.filter,
            target_path=self.target_path,
        )

        # single-task process: hand the task out only if it really has work to do
        if len(candidate.paths_to_process) == 0:
            return []
        return [candidate]

Classes

class ConcatByChangedTimestampProcess (root_dir: str, target_dir: str, pathfilter: str = '*')

Process implementation that concatenates raw or joined databags in a root_dir if something changed in any of the subfolders and saves the result in the target_dir.

Constructor.

Args

root_dir
root_dir which contains the bags to be concatenated
target_dir
target_dir to which the concatenated result has to be written
pathfilter
pathfilter string to apply to select the bags/subfolders to be processed. default is "*".
Expand source code
class ConcatByChangedTimestampProcess(AbstractThreadProcess):
    """
    Process that concatenates the raw or joined databags of a root_dir and stores the
    result in the target_dir, provided something changed in any of the subfolders.
    """

    def __init__(self,
                 root_dir: str,
                 target_dir: str,
                 pathfilter: str = "*",
                 ):
        """
        Constructor.

        Args:
            root_dir: directory containing the bags to be concatenated
            target_dir: directory to which the concatenated result has to be written
            pathfilter: pathfilter string used to select the bags/subfolders to be
                    processed. Default is "*".
        """
        super().__init__(
            execute_serial=False,
            chunksize=0,
        )

        # the directories are handed in as strings and kept as Path objects
        self.root_path = Path(root_dir)
        self.target_path = Path(target_dir)
        self.filter = pathfilter

    def pre_process(self):
        """
        Runs before the actual processing and removes leftover temp folders (e.g. from a
        failed previous run) by calling delete_temp_folders on the parent directory of
        the target path.
        """
        parent_of_target = self.target_path.parent
        delete_temp_folders(root_path=parent_of_target)

    def calculate_tasks(self) -> List[AbstractTask]:
        """
        Creates the task that executes the actual logic.
        This process consists of a single task, so at most one task is returned.

        Returns:
            List[AbstractTask]: list holding the task, or an empty list if the task
            has no paths to process
        """
        candidate = ConcatIfChangedTimestampTask(
            root_path=self.root_path,
            pathfilter=self.filter,
            target_path=self.target_path,
        )

        # single-task process: hand the task out only if it really has work to do
        if len(candidate.paths_to_process) == 0:
            return []
        return [candidate]

Ancestors

Methods

def calculate_tasks(self) ‑> List[AbstractTask]

Calculates the tasks that actually execute the logic. This is a single task process, so just a single task is being created.

Returns

List[AbstractTask]
Task to be executed
Expand source code
def calculate_tasks(self) -> List[AbstractTask]:
    """
    Creates the task that executes the actual logic.
    This process consists of a single task, so at most one task is returned.

    Returns:
        List[AbstractTask]: list holding the task, or an empty list if the task
        has no paths to process
    """
    candidate = ConcatIfChangedTimestampTask(
        root_path=self.root_path,
        pathfilter=self.filter,
        target_path=self.target_path,
    )

    # single-task process: hand the task out only if it really has work to do
    if len(candidate.paths_to_process) == 0:
        return []
    return [candidate]
def pre_process(self)

Executed before the actual process and cleans up any "tmp" folders within the target, in case of a failing previous processing.

Expand source code
def pre_process(self):
    """
    Runs before the actual processing and removes leftover temp folders (e.g. from a
    failed previous run) by calling delete_temp_folders on the parent directory of
    the target path.
    """
    parent_of_target = self.target_path.parent
    delete_temp_folders(root_path=parent_of_target)

Inherited members

class ConcatByNewSubfoldersProcess (root_dir: str, target_dir: str, pathfilter: str = '*')

Process implementation that concatenates raw or joined databags in a root_dir if a new bag/subfolder was added to the root_dir and saves the result in the target_dir.

Constructor.

Args

root_dir
root_dir which contains the bags to be concatenated
target_dir
target_dir to which the concatenated result has to be written
pathfilter
pathfilter string to apply to select the bags/subfolders to be processed. default is "*".
Expand source code
class ConcatByNewSubfoldersProcess(AbstractThreadProcess):
    """
    Process that concatenates the raw or joined databags of a root_dir and stores the
    result in the target_dir, provided a new bag/subfolder was added to the root_dir.
    """

    def __init__(self,
                 root_dir: str,
                 target_dir: str,
                 pathfilter: str = "*",
                 ):
        """
        Constructor.

        Args:
            root_dir: directory containing the bags to be concatenated
            target_dir: directory to which the concatenated result has to be written
            pathfilter: pathfilter string used to select the bags/subfolders to be
                    processed. Default is "*".
        """
        super().__init__(
            execute_serial=False,
            chunksize=0,
        )

        # the directories are handed in as strings and kept as Path objects
        self.root_path = Path(root_dir)
        self.target_path = Path(target_dir)
        self.filter = pathfilter

    def pre_process(self):
        """
        Runs before the actual processing and removes leftover temp folders (e.g. from a
        failed previous run) by calling delete_temp_folders on the parent directory of
        the target path.
        """
        parent_of_target = self.target_path.parent
        delete_temp_folders(root_path=parent_of_target)

    def calculate_tasks(self) -> List[AbstractTask]:
        """
        Creates the task that executes the actual logic.
        This process consists of a single task, so at most one task is returned.

        Returns:
            List[AbstractTask]: list holding the task, or an empty list if the task
            has no paths to process
        """
        candidate = ConcatIfNewSubfolderTask(
            root_path=self.root_path,
            pathfilter=self.filter,
            target_path=self.target_path,
        )

        # single-task process: hand the task out only if it really has work to do
        if len(candidate.paths_to_process) == 0:
            return []
        return [candidate]

Ancestors

Methods

def calculate_tasks(self) ‑> List[AbstractTask]

Calculates the tasks that actually execute the logic. This is a single task process, so just a single task is being created.

Returns

List[AbstractTask]
Task to be executed
Expand source code
def calculate_tasks(self) -> List[AbstractTask]:
    """
    Creates the task that executes the actual logic.
    This process consists of a single task, so at most one task is returned.

    Returns:
        List[AbstractTask]: list holding the task, or an empty list if the task
        has no paths to process
    """
    candidate = ConcatIfNewSubfolderTask(
        root_path=self.root_path,
        pathfilter=self.filter,
        target_path=self.target_path,
    )

    # single-task process: hand the task out only if it really has work to do
    if len(candidate.paths_to_process) == 0:
        return []
    return [candidate]
def pre_process(self)

Executed before the actual process and cleans up any "tmp" folders within the target, in case of a failing previous processing.

Expand source code
def pre_process(self):
    """
    Runs before the actual processing and removes leftover temp folders (e.g. from a
    failed previous run) by calling delete_temp_folders on the parent directory of
    the target path.
    """
    parent_of_target = self.target_path.parent
    delete_temp_folders(root_path=parent_of_target)

Inherited members

class ConcatIfChangedTimestampTask (root_path: pathlib.Path, pathfilter: str, target_path: pathlib.Path)

Concats subfolders in root_path if something did change in any subfolder since the last run.

Takes subfolders from the root_path (with applied pathfilter string) and concatenates them into a single DataBag (either Raw or Joined) into the target_path.

The pathfilter string defines which subfolder level actually contains the data to be concatenated.

E.g. if the pathfilter is just a "*" and the root_path looks like

root_path
    2010q1.zip
    2010q2.zip
    ...
    2024q3.zip

it would expect that the subfolders directly contain the databags.
If the pathfilter is defined as "*/BS" and the root_path looks like

root_path
    2010q1.zip
      BS
      CF
      IS
    2010q2.zip
      BS
      CF
      IS
    ...
    2024q3.zip
      BS
      CF
      IS

It would concatenate all BS subfolders into the target path.

The logic will be executed, if a file/folder in the root_path has changed since the last processing.

This is being done by writing the actual last modification timestamp of the root_path into the meta.inf file of the target_path.

Args

root_path
root path to read the data from
pathfilter
pathfilter string that defines which subfolders in the root_path have to be selected
target_path
path to where the results have to be written
Expand source code
class ConcatIfChangedTimestampTask(CheckByTimestampMergeBaseTask):
    """
    Task that concatenates the subfolders of a root_path if anything changed in any
    subfolder since the last run.
    """

    def __init__(self,
                 root_path: Path,
                 pathfilter: str,
                 target_path: Path):
        """
        Selects subfolders of the root_path (via the pathfilter string) and concatenates
        them into a single DataBag (either Raw or Joined) stored in the target_path.

        The pathfilter string defines which subfolder level actually contains the data
        to be concatenated.

        E.g. with a pathfilter of just "*" and a root_path that looks like
        <pre>
        root_path
            2010q1.zip
            2010q2.zip
            ...
            2024q3.zip
        </pre>
        the subfolders are expected to contain the databags directly.<br>
        With a pathfilter of "*/BS" and a root_path that looks like
        <pre>
        root_path
            2010q1.zip
              BS
              CF
              IS
            2010q2.zip
              BS
              CF
              IS
            ...
            2024q3.zip
              BS
              CF
              IS
        </pre>

        all BS subfolders are concatenated into the target path.<br>

        The logic is executed if a file/folder in the root_path has changed since the
        last processing.

        This is tracked by writing the actual last modification timestamp of the root_path
        into the meta.inf file of the target_path.

        Args:
            root_path: root path to read the data from
            pathfilter: pathfilter string that selects the subfolders of the root_path
                        which have to be processed
            target_path: path to which the results have to be written
        """
        super().__init__(root_path=root_path, pathfilter=pathfilter, target_path=target_path)

    def __str__(self) -> str:
        return (
            f"ConcatIfChangedTimestampTask(root_path: {self.root_path},"
            f"pathfilter: {self.filter})"
        )

    def do_execution(self,
                     paths_to_process: List[Path],
                     tmp_path: Path):
        """
        Concatenates the bags found in paths_to_process and stores the result in the tmp_path.

        Args:
            paths_to_process: list of paths/folders that have to be processed
            tmp_path: path to which the result has to be written
        """
        concat_bags(
            paths_to_concat=paths_to_process,
            target_path=tmp_path,
        )

Ancestors

Methods

def do_execution(self, paths_to_process: List[pathlib.Path], tmp_path: pathlib.Path)

concats the bags in the paths_to_process and saves it in the tmp_path.

Args

paths_to_process
lists of paths/folders that have to be processed
tmp_path
path to where a result has to be written
Expand source code
def do_execution(self,
                 paths_to_process: List[Path],
                 tmp_path: Path):
    """
    Concatenates the bags found in paths_to_process and stores the result in the tmp_path.

    Args:
        paths_to_process: list of paths/folders that have to be processed
        tmp_path: path to which the result has to be written
    """
    concat_bags(
        paths_to_concat=paths_to_process,
        target_path=tmp_path,
    )

Inherited members

class ConcatIfNewSubfolderTask (root_path: pathlib.Path, pathfilter: str, target_path: pathlib.Path)

Concats subfolders in root_path if a new subfolder was added.

Takes subfolders from the root_path (with applied pathfilter string) and concatenates them into a single DataBag (either Raw or Joined) into the target_path.

The pathfilter string defines which subfolder level actually contains the data to be concatenated.

E.g. if the pathfilter is just a "*" and the root_path looks like

root_path
    2010q1.zip
    2010q2.zip
    ...
    2024q3.zip

it would expect that the subfolders directly contain the databags.
If the pathfilter is defined as "*/BS" and the root_path looks like

root_path
    2010q1.zip
      BS
      CF
      IS
    2010q2.zip
      BS
      CF
      IS
    ...
    2024q3.zip
      BS
      CF
      IS

It would concatenate all BS subfolders into the target path.

The logic is executed if new subfolders are available. Meaning, if a new folder appears in the root_path, it will be added to the already existing content in the target_path. So, if in the above example the subfolders 2010q1.zip … 2024q3.zip were already concatenated in a previous run and in the next run the subfolder 2024q4.zip is detected, it will add the new subfolder to the existing content in the target_path.

This is being done by writing the name of the processed subfolders into a meta.inf file in the target_path.

Args

root_path
root path to read the data from
pathfilter
pathfilter string that defines which subfolders in the root_path have to be selected
target_path
path to where the results have to be written
Expand source code
class ConcatIfNewSubfolderTask(CheckByNewSubfoldersMergeBaseTask):
    """
    Task that concatenates the subfolders of a root_path whenever a new subfolder was added.
    """

    def __init__(self,
                 root_path: Path,
                 pathfilter: str,
                 target_path: Path):
        """
        Selects subfolders of the root_path (via the pathfilter string) and concatenates
        them into a single DataBag (either Raw or Joined) stored in the target_path.

        The pathfilter string defines which subfolder level actually contains the data
        to be concatenated.

        E.g. with a pathfilter of just "*" and a root_path that looks like
        <pre>
        root_path
            2010q1.zip
            2010q2.zip
            ...
            2024q3.zip
        </pre>
        the subfolders are expected to contain the databags directly.<br>
        With a pathfilter of "*/BS" and a root_path that looks like
        <pre>
        root_path
            2010q1.zip
              BS
              CF
              IS
            2010q2.zip
              BS
              CF
              IS
            ...
            2024q3.zip
              BS
              CF
              IS
        </pre>

        all BS subfolders are concatenated into the target path.<br>

        The logic is executed if new subfolders are available: when a new folder
        appears in the root_path, it is added to the content that already exists in the
        target_path. So, if in the example above the subfolders 2010q1.zip ... 2024q3.zip
        were already concatenated in a previous run and the next run detects the subfolder
        2024q4.zip, the new subfolder is added to the existing content in the target_path.

        This is tracked by writing the names of the processed subfolders into a meta.inf
        file in the target_path.

        Args:
            root_path: root path to read the data from
            pathfilter: pathfilter string that selects the subfolders of the root_path
                        which have to be processed
            target_path: path to which the results have to be written
        """
        super().__init__(root_path=root_path, pathfilter=pathfilter, target_path=target_path)

    def __str__(self) -> str:
        return (
            f"ConcatIfNewSubfolderTask(root_path: {self.root_path}, "
            f"pathfilter: {self.filter})"
        )

    def do_execution(self,
                     paths_to_process: List[Path],
                     target_path: Path,
                     tmp_path: Path):
        """
        Concatenates the new bags (together with an already existing result, if there is
        one) and stores the outcome in the tmp_path.

        Args:
            paths_to_process: list of paths/folders that have to be processed
            target_path: the path where the result of the previous run was written
            tmp_path: path to which the new result has to be written
        """
        if target_path.exists():
            # a result of a previous run exists, so it has to be part of the new result
            bags_to_merge = paths_to_process + [target_path]
        else:
            bags_to_merge = paths_to_process

        # the merged bag is written to the tmp_path
        concat_bags(paths_to_concat=bags_to_merge, target_path=tmp_path)

Ancestors

Inherited members