Module secfsdstools.c_automation.task_framework

Base classes for the Task and Process Framework. Used for downloading, transforming to parquet, and indexing the zip files from SEC, as well as for implementing customized automation tasks.
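The typical usage is to implement the Task protocol (or subclass one of the base tasks below), wrap the tasks in a process class such as AbstractThreadProcess, and run everything with execute_processes(). A minimal sketch, assuming hypothetical "in" and "out" folders; CopyTask and CopyProcess are illustrative names, not part of the library:

import shutil
from pathlib import Path
from typing import Any, List

from secfsdstools.c_automation.task_framework import (
    AbstractThreadProcess, Task, execute_processes
)


class CopyTask:
    """Illustrative task: copies one folder. Satisfies the Task protocol structurally."""

    def __init__(self, src: Path, dst: Path):
        self.src = src
        self.dst = dst

    def prepare(self):
        # make sure the parent of the destination exists
        self.dst.parent.mkdir(parents=True, exist_ok=True)

    def execute(self):
        shutil.copytree(self.src, self.dst, dirs_exist_ok=True)

    def commit(self) -> Any:
        return "success"

    def exception(self, exception) -> Any:
        return f"failed {exception}"


class CopyProcess(AbstractThreadProcess):
    """Illustrative process that creates one CopyTask per subfolder of 'in'."""

    def calculate_tasks(self) -> List[Task]:
        return [CopyTask(src=p, dst=Path("out") / p.name)
                for p in Path("in").glob("*")]


if __name__ == "__main__":
    execute_processes([CopyProcess()])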

Source code
"""
Base classes for the Task and Process Framework.
Used for downloading, transforming to parquet, and indexing the zip files from SEC, as well
as for implementing customized automation tasks.
"""
import logging
import shutil
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import List, Tuple
from typing import Protocol, Any, Dict

from secfsdstools.a_utils.parallelexecution import ThreadExecutor, ParallelExecutor
from secfsdstools.c_automation.automation_utils import get_latest_mtime


class TaskResultState(Enum):
    """
    Enum defining possible ResultStates of one task.
    """
    SUCCESS = 1
    FAILED = 2


class Task(Protocol):
    """
    Task interface.
    """

    def prepare(self):
        """ Prepare everything to execute the task.
            E.g., creation or clearing a directory. """

    def execute(self):
        """ Execution the task. """

    def commit(self) -> Any:
        """ Commit the task if the execution method is not "self-commiting". E.g.,
         If you do some file processing in the execute-method,
         but want to update a state in a table,
         you could do the update of the state in the commit method.
         """

    def exception(self, exception) -> Any:
        """ Handle the exception. """


@dataclass
class TaskResult:
    """
    Dataclass containing the result of a task.
    Contains the task, the TaskResultState and the result (either the return value from the
    commit() or the exception() method).
    """
    task: Task
    result: Any
    state: TaskResultState


class AbstractTask:
    """
    Abstract base implementation providing some commonly used basic functionality.

    It is based on reading subfolders from a root_path, which are selected by a pathfilter,
    processing the content of these folders, and writing the result into a target_path.

    The result is created in a tmp folder and is then "committed" by renaming the tmp folder
    to the target path, so that the rename provides an atomic action that acts as commit.

    It also provides a basic implementation of a "meta.inf" file that can be stored in the
    target. The idea of the meta.inf file is to give a hint of what was already processed
    from the root_path in a previous step.

    For example, the meta.inf could contain a list of subfolder names that were already
    processed. If a new subfolder appears in the root_path, the task then knows which
    subfolders still need to be processed. Another possibility is to store the timestamp of
    the data that was processed (in cases where the content of files within the subfolders
    of the root_path changes, but not the subfolders themselves). This allows checking
    whether the modification timestamp of files in the root_path is newer than the timestamp
    stored in the meta.inf file.

    """

    def __init__(self,
                 root_path: Path,
                 pathfilter: str,
                 target_path: Path):
        """
        The constructor of the AbstractTask.

        Args:
            root_path: root_path of the data to be processed
            pathfilter: a pathfilter string (e.g. "*"; as defined for Path.glob()) to select the
                    subfolders that have to be processed.
                    pathfilter could be something like "*", "*/BS", or "something/*/BS".

                    E.g., the following root_path structure and the pathfilter "*/BS"
                    would select all "BS" "sub-subfolders" within root_path:
                    <pre>
                       <root_path>
                            2010q1.zip/BS
                            2010q1.zip/IS
                            2010q1.zip/CF
                            ...
                            2024q4.zip/BS
                            2024q4.zip/IS
                            2024q4.zip/CF
                    </pre>

            target_path: the target_path to write the results to.
        """

        self.root_path = root_path
        self.target_path = target_path
        self.filter = pathfilter

        # create a list of subfolders that have to be processed, as defined by the pathfilter string
        self.filtered_paths = list(self.root_path.glob(self.filter))

        # usually, all filtered_paths have to be processed
        self.paths_to_process: List[Path] = self.filtered_paths

        # define the tmp_path
        self.tmp_path = target_path.parent / f"tmp_{target_path.name}"
        self.meta_inf_file: Path = self.target_path / "meta.inf"

        # pathfilter could be something like "*", "*/BS", or "something/*/BS",
        # but in order to fill the meta.inf file with the names that the "*" matches,
        # we need to know the position of the "*" from the end of the resulting path.
        # So if the pathfilter is just "*" the position is 0; for "*/BS" it would be 1.
        self.star_position = self._star_position_from_end(self.filter)

    @staticmethod
    def _star_position_from_end(path: str) -> int:
        """
        Gets the position of the "*" in the provided path (counted from the end).

        Examples:
            path = "a/b/c/d/*" -> returns 0
            path = "a/b/c/*/d" -> returns 1
            path = "a/b/*/c/d" -> returns 2

        Args:
            path: path with a "*" as part

        Returns:
            the position of the "*" in the path, counted from the end.
        """

        # ignore first and last /
        if path.startswith('/'):
            path = path[1:]
        if path.endswith('/'):
            path = path[:-1]

        # Split the string by '/' to get segments
        segments = path.split('/')

        # Iterate from the end and find the first segment containing '*'
        for i, segment in enumerate(reversed(segments)):
            if '*' in segment:
                return i  # Position from the end

        # If no '*' is found, return -1 to indicate an error
        return -1

    @staticmethod
    def _get_star_position_name(path: Path, star_position: int) -> str:
        """
        Gets the name of the part where the "*" is positioned in the pathfilter-string.

        Example:
             path = "a/b/c" and star_position = 0 -> returns c
             path = "a/b/c" and star_position = 1 -> returns b
             path = "a/b/c" and star_position = 2 -> returns c

        Args:
            path: path from which the name part at the star_position has to be returned
            star_position: position (counted from the end) of the part whose name
                has to be returned.

        Returns:
            str: name of the part defined by the star_position

        """
        # reverse list with [::-1]
        return path.parts[::-1][star_position]

    def read_metainf_content(self) -> List[str]:
        """
        Reads the content from the meta.inf file in an existing target_path.
        Returns:
            List[str]: the content, split by line
        """
        meta_inf_content = self.meta_inf_file.read_text(encoding="utf-8")
        return meta_inf_content.split("\n")

    def exception(self, exception) -> str:
        """
        Basic implementation of the exception method.
        It deletes the temp folder and returns a "failed" message.
        """
        shutil.rmtree(self.tmp_path, ignore_errors=True)
        return f"failed {exception}"

    def has_work_todo(self) -> bool:
        """
        Returns True if there is actual work to do, otherwise False.
        Can be overridden.
        The default implementation just checks whether the provided root_path has subfolders
        that match the provided pathfilter string.
        """
        return len(self.paths_to_process) > 0

    def prepare(self):
        """
        Basic implementation of the prepare method. Does nothing if there is nothing to
        process, otherwise creates the tmp folder.
        """
        if not self.has_work_todo():
            return

        self.tmp_path.mkdir(parents=True, exist_ok=False)

    def commit(self):
        """
        Basic implementation of the commit method.
        If nothing had to be done, it simply returns "success".
        If work was done, it removes an existing target_path and replaces it with the
        content of the tmp_path (by renaming the tmp_path to the target_path, which is an
        atomic action that either fails or succeeds).
        """
        if not self.has_work_todo():
            return "success"

        # Remove old content of target_path
        if self.target_path.exists():
            shutil.rmtree(self.target_path)

        # rename the tmp_path, so this is like an atomic action that either fails or succeeds.
        self.tmp_path.rename(self.target_path)
        return "success"

    def write_meta_inf(self, content: str):
        """
        Writes the provided content into the meta.inf file in the tmp path.
        Args:
            content: content to be written
        """
        temp_meta_inf = self.tmp_path / "meta.inf"
        temp_meta_inf.write_text(data=content, encoding="utf-8")


class CheckByTimestampMergeBaseTask(AbstractTask):
    """
    This class uses the AbstractTask to implement logic that checks if files were changed within
    the root_path since the last processing.

    It can be used as a BaseClass to implement a Task, that checks for new data to be processed
    by looking at the modification timestamp of the files in the root_path.

    It does this as follows:
    - if there is no target_path yet, it will process the content in the root_path,
      write the result in the target_path together with a meta.inf file that contains
      the newest modification timestamp of all the files in the root_path.
    - if there is a target_path, it reads the timestamp that is stored within the target_path.
      If any of the files within the root_path has a newer modification timestamp, it will
      process the data and also update the timestamp in the meta.inf file.
    """

    def __init__(self,
                 root_path: Path,
                 pathfilter: str,
                 target_path: Path):
        """
          The constructor of the CheckByTimestampMergeBaseTask.
          See also the documentation of the AbstractTask constructor.
        """
        super().__init__(
            root_path=root_path,
            pathfilter=pathfilter,
            target_path=target_path,
        )

        if self.meta_inf_file.exists():
            # if the meta_inf file exists, we expect that the first row contains the
            # latest modification timestamp of all files in the root_path, that was
            # processed the last time.
            containing_values = self.read_metainf_content()
            last_processed_timestamp = float(containing_values[0])

            # go and find the current latest modification timestamp of all files in the root_path
            current_timestamp = get_latest_mtime(self.root_path)

            # if the current_timestamp is not newer than the last_processed_timestamp,
            # the data in the root_path wasn't changed and therefore no processing has to
            # be done. We mark this by setting paths_to_process to an empty list.
            if current_timestamp <= last_processed_timestamp:
                self.paths_to_process = []

    def execute(self):
        """
        Basic implementation of the execute method.

        If there are paths_to_process, delegates the work to do_execution() and then writes
        the latest modification timestamp of the root_path into the meta.inf file.
        """
        if not self.has_work_todo():
            return

        self.do_execution(paths_to_process=self.paths_to_process,
                          tmp_path=self.tmp_path)

        meta_inf_content: str = str(get_latest_mtime(self.root_path))
        self.write_meta_inf(content=meta_inf_content)

    @abstractmethod
    def do_execution(self,
                     paths_to_process: List[Path],
                     tmp_path: Path):
        """
        Defines the logic to be executed.
        Args:
            paths_to_process: list of paths/folders that have to be processed
            tmp_path: path to where a result has to be written
        """


class CheckByNewSubfoldersMergeBaseTask(AbstractTask):
    """
    Implements the basic logic to track already processed data either by the folder structure
    of the root_path (meaning that new data that appeared as a new subfolder in the root_path
    has to be integrated into the existing content of the target_path) or by checking the
    timestamp of the latest modification within the root_path structure (meaning that the
    content of the target_path has to be recreated with the current content of the root_path).

    Both scenarios use a "meta.inf" file that contains either the names of the subfolders or
    the timestamp of the latest processed modification. This class implements the
    subfolder-based scenario.
    """

    def __init__(self,
                 root_path: Path,
                 pathfilter: str,
                 target_path: Path):
        """
        Constructor of base task.

        Args:
            root_path: root path to read the data from
            pathfilter: pathfilter string that defines which subfolders in the
                        root_path have to be selected
            target_path: path to where the results have to be written
        """
        self.all_names: Dict[str, Path]

        super().__init__(
            root_path=root_path,
            pathfilter=pathfilter,
            target_path=target_path,
        )

        # so if we have the pathfilter */BS and the directories "2010q1.zip/BS" and
        # "2010q2.zip/BS" in the root_path, the keys of all_names will be "2010q1.zip"
        # and "2010q2.zip"
        self.all_names = {self._get_star_position_name(path=p, star_position=self.star_position):
                              p for p in self.paths_to_process}

        if self.meta_inf_file.exists():
            containing_values = self.read_metainf_content()

            missing = set(self.all_names.keys()) - set(containing_values)
            self.paths_to_process = [self.all_names[name] for name in missing]

    def execute(self):
        """
        Basic implementation of the execute method.

        If there are paths_to_process, delegates the work to do_execution() and then writes
        the names of all processed subfolders into the meta.inf file.
        """
        if len(self.paths_to_process) == 0:
            return

        paths_to_process = self.paths_to_process.copy()

        # depending on the use case, new data may need to be combined with data that already
        # exists in the target. Therefore, we provide the list "paths_to_process" (the
        # subfolders that are new), the target_path (which contains the result of the last
        # processing), and the tmp_path (where the new result has to be stored).
        self.do_execution(paths_to_process=paths_to_process,
                          target_path=self.target_path,
                          tmp_path=self.tmp_path)

        meta_inf_content: str = "\n".join([self._get_star_position_name(f, self.star_position)
                                           for f in self.filtered_paths])
        self.write_meta_inf(content=meta_inf_content)

    @abstractmethod
    def do_execution(self,
                     paths_to_process: List[Path],
                     target_path: Path,
                     tmp_path: Path):
        """
        Defines the logic to be executed.
        Args:
            paths_to_process: list of paths/folders that have to be processed
            target_path: the path where the result of the previous run was written
            tmp_path: target path to where a result has to be written
        """


class AbstractProcess(ABC):
    """
    Defines the abstract processing of the tasks that belong to a certain process.
    """

    def __init__(self,
                 execute_serial: bool = False,
                 chunksize: int = 3,
                 paralleltasks: int = 3,
                 max_tasks_per_second: int = 8):
        self.execute_serial = execute_serial
        self.chunksize = chunksize
        self.paralleltasks = paralleltasks
        self.max_tasks_per_second = max_tasks_per_second

        # since failed tasks are retried, results[FAILED] can contain multiple entries for
        # a task that is retried multiple times.
        self.results: Dict[TaskResultState, List[TaskResult]] = defaultdict(list)

        self.failed_tasks: List[Task] = []

    @abstractmethod
    def calculate_tasks(self) -> List[Task]:
        """
        Calculate the tasks that have to be executed for the implemented process

        Returns:
            List[Task]: list of the tasks to be processed.
        """

    def pre_process(self):
        """ Hook method to implement logic that is executed before the whole process is finished.
        """

    def post_process(self):
        """ Hook method to implement logic that is executed after the whole process is finished. """

    @staticmethod
    def process_task(task: Task) -> TaskResult:
        """
        Executes a single task: prepare(), execute(), then commit(); if any step raises,
        the task's exception() method is called and a FAILED result is returned.
        """
        logger = logging.getLogger()
        try:
            task.prepare()
            task.execute()
            result = TaskResult(task=task,
                                result=task.commit(),
                                state=TaskResultState.SUCCESS)
            logger.info("Success: %s", task)
            return result
        except Exception as ex:  # pylint: disable=W0703
            # we want to catch everything here.
            logger.info("Failed: %s / %s ", task, ex)
            return TaskResult(task=task,
                              result=task.exception(exception=ex),
                              state=TaskResultState.FAILED)

    def process(self):
        """
        Executes the process by running all the tasks that need to be executed.
        The execution can happen in parallel or serial.

        There is a retry mechanism for failing tasks.

        """
        logger = logging.getLogger()
        logger.info("Starting process %s", self.__class__.__name__)

        self.pre_process()

        results_all, failed_tasks = self.do_execution()
        self.failed_tasks = failed_tasks

        for entry in results_all:
            self.results[entry.state].append(entry)

        for failed in self.failed_tasks:
            logger.warning("not able to process %s", failed)

        self.post_process()

    @abstractmethod
    def do_execution(self) -> Tuple[List[TaskResult], List[Task]]:
        """
        Handles the real execution.
        """


class AbstractThreadProcess(AbstractProcess):
    """
    Uses a thread-based approach for the parallel execution logic.
    """

    def do_execution(self) -> Tuple[List[TaskResult], List[Task]]:
        """
        Using a thread-based executor.
        """
        executor = ThreadExecutor[Task, TaskResult, TaskResult](
            processes=self.paralleltasks,
            max_calls_per_sec=self.max_tasks_per_second,
            chunksize=self.chunksize,
            execute_serial=self.execute_serial
        )
        executor.set_get_entries_function(self.calculate_tasks)
        executor.set_process_element_function(self.process_task)
        executor.set_post_process_chunk_function(lambda x: x)  # no process_chunk for this purpose
        return executor.execute()


class AbstractProcessPoolProcess(AbstractProcess):
    """
    Uses a process-pool-based approach for the parallel execution logic.
    """

    def do_execution(self) -> Tuple[List[TaskResult], List[Task]]:
        """
        Using a process-based executor.
        """
        executor = ParallelExecutor[Task, TaskResult, TaskResult](
            processes=self.paralleltasks,
            max_calls_per_sec=self.max_tasks_per_second,
            chunksize=self.chunksize,
            execute_serial=self.execute_serial
        )
        executor.set_get_entries_function(self.calculate_tasks)
        executor.set_process_element_function(self.process_task)
        executor.set_post_process_chunk_function(lambda x: x)  # no process_chunk for this purpose
        return executor.execute()


def execute_processes(processes: List[AbstractProcess]):
    """
        Executes the list of processes sequentially.
    Args:
        processes (List[AbstractProcess]): list of AbstractProcess instances to be executed

    """
    for process in processes:
        process.process()

Functions

def execute_processes(processes: List[AbstractProcess])

Executes the list of processes sequentially.

Args

processes (List[AbstractProcess]): list of AbstractProcess instances to be executed
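For example, a download/transform/index pipeline could be chained like this (the three process classes are hypothetical placeholders for AbstractProcess subclasses):

# the processes run one after the other, in list order
execute_processes([DownloadProcess(), TransformProcess(), IndexProcess()])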


Classes

class AbstractProcess (execute_serial: bool = False, chunksize: int = 3, paralleltasks: int = 3, max_tasks_per_second: int = 8)

Defines the abstract processing of the tasks that belong to a certain process.

Ancestors

  • abc.ABC

Subclasses

  • AbstractProcessPoolProcess
  • AbstractThreadProcess

Static methods

def process_task(task: Task) ‑> TaskResult

Executes a single task: prepare(), execute(), then commit(); if any step raises, the task's exception() method is called and a FAILED result is returned.


Methods

def calculate_tasks(self) ‑> List[Task]

Calculate the tasks that have to be executed for the implemented process

Returns

List[Task]
List of the tasks to be processed.
def do_execution(self) ‑> Tuple[List[TaskResult], List[Task]]

Handles the real execution.

def post_process(self)

Hook method to implement logic that is executed after the whole process is finished.

def pre_process(self)

Hook method to implement logic that is executed before the process is started.

def process(self)

execute the process by executing all the tasks that need to be executed. The execution can happen in parallel or serial.

There is a retry mechanism for failing tasks.
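After process() returns, the collected results can be inspected; a minimal sketch (MyProcess stands for any hypothetical AbstractProcess subclass):

from secfsdstools.c_automation.task_framework import TaskResultState

process = MyProcess()
process.process()

succeeded = process.results[TaskResultState.SUCCESS]      # list of TaskResult
failed_results = process.results[TaskResultState.FAILED]  # may contain retried attempts
for task in process.failed_tasks:                         # tasks that never succeeded
    print("failed for good:", task)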

class AbstractProcessPoolProcess (execute_serial: bool = False, chunksize: int = 3, paralleltasks: int = 3, max_tasks_per_second: int = 8)

Uses a process-pool-based approach for the parallel execution logic.


Ancestors

  • AbstractProcess
  • abc.ABC

Methods

def do_execution(self) ‑> Tuple[List[TaskResult], List[Task]]

Using a process-based executor.


class AbstractTask (root_path: pathlib.Path, pathfilter: str, target_path: pathlib.Path)

Abstract base implementation providing some commonly used basic functionality.

It is based on reading subfolders from a root_path, which are selected by a pathfilter, processing the content of these folders, and writing the result into a target_path.

The result is created in a tmp folder and is then "committed" by renaming the tmp folder to the target path, so that the rename provides an atomic action that acts as commit.

It also provides a basic implementation of a "meta.inf" file that can be stored in the target. The idea of the meta.inf file is to give a hint of what was already processed from the root_path in a previous step.

For example, the meta.inf could contain a list of subfolder names that were already processed. If a new subfolder appears in the root_path, the task then knows which subfolders still need to be processed. Another possibility is to store the timestamp of the data that was processed (in cases where the content of files within the subfolders of the root_path changes, but not the subfolders themselves). This allows checking whether the modification timestamp of files in the root_path is newer than the timestamp stored in the meta.inf file.

The constructor of the AbstractTask.

Args

root_path
root_path of the data to be processed
pathfilter
a pathfilter string (e.g. "*"; as defined for Path.glob()) to select the subfolders that have to be processed. pathfilter could be something like "*", "*/BS", or "something/*/BS".
E.g., the following root_path structure and the pathfilter "*/BS"
would select all "BS" "sub-subfolders" within root_path:
   <root_path>
        2010q1.zip/BS
        2010q1.zip/IS
        2010q1.zip/CF
        ...
        2024q4.zip/BS
        2024q4.zip/IS
        2024q4.zip/CF
target_path
the target_path to write the results to.
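A minimal sketch of the prepare/execute/commit cycle, assuming a hypothetical subclass that concatenates the text files of every selected subfolder (ConcatTask and the paths are illustrative, not part of the library):

from pathlib import Path
from secfsdstools.c_automation.task_framework import AbstractTask

class ConcatTask(AbstractTask):
    """Illustrative subclass, not part of the library."""

    def execute(self):
        if not self.has_work_todo():
            return
        merged = "\n".join(f.read_text(encoding="utf-8")
                           for p in self.paths_to_process
                           for f in sorted(p.glob("*.txt")))
        (self.tmp_path / "merged.txt").write_text(merged, encoding="utf-8")

task = ConcatTask(root_path=Path("data"), pathfilter="*/BS", target_path=Path("out/bs"))
task.prepare()   # creates the tmp folder
task.execute()   # writes the result into tmp_path
task.commit()    # atomically renames tmp_path to target_path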
Subclasses

  • CheckByNewSubfoldersMergeBaseTask
  • CheckByTimestampMergeBaseTask

Methods

def commit(self)

Basic implementation of the commit method. If nothing had to be done, it simply returns "success". If work was done, it removes an existing target_path and replaces it with the content of the tmp_path (by renaming the tmp_path to the target_path, which is an atomic action that either fails or succeeds).

def exception(self, exception) ‑> str

Basic implementation of the exception method. It deletes the temp folder and returns a "failed" message.

def has_work_todo(self) ‑> bool

Returns True if there is actual work to do, otherwise False. Can be overridden. The default implementation just checks whether the provided root_path has subfolders that match the provided pathfilter string.

def prepare(self)

Basic implementation of the prepare method. Does nothing if there is nothing to process, otherwise creates the tmp folder.

def read_metainf_content(self) ‑> List[str]

Reads the content from the meta.inf file in an existing target_path.

Returns

List[str]: the content, split by line

def write_meta_inf(self, content: str)

Writes the provided content into the meta.inf file in the tmp path.

Args

content
content to be written
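A sketch of how write_meta_inf() and read_metainf_content() work together across runs, assuming a hypothetical subclass (MetaInfDemoTask is illustrative, not part of the library):

from secfsdstools.c_automation.task_framework import AbstractTask

class MetaInfDemoTask(AbstractTask):
    """Illustrative subclass showing the meta.inf round-trip."""

    def execute(self):
        if not self.has_work_todo():
            return
        ...  # write the actual results into self.tmp_path
        # record the processed subfolder names; committed together with the results
        self.write_meta_inf("\n".join(p.name for p in self.paths_to_process))

    def already_processed(self):
        # on a later run, the committed meta.inf can be consulted
        if self.meta_inf_file.exists():
            return self.read_metainf_content()  # one entry per line
        return []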
class AbstractThreadProcess (execute_serial: bool = False, chunksize: int = 3, paralleltasks: int = 3, max_tasks_per_second: int = 8)

Uses a thread-based approach for the parallel execution logic.
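Threads are a good fit for I/O-bound tasks such as downloads. A minimal sketch, where DownloadTask is a hypothetical Task implementation (not part of the library):

from typing import List

from secfsdstools.c_automation.task_framework import AbstractThreadProcess, Task

class DownloadProcess(AbstractThreadProcess):
    """Illustrative process; DownloadTask is a hypothetical Task implementation."""

    def __init__(self, urls: List[str]):
        # throttle the execution, e.g. to stay below a server's request rate limit
        super().__init__(paralleltasks=3, max_tasks_per_second=8)
        self.urls = urls

    def calculate_tasks(self) -> List[Task]:
        return [DownloadTask(url) for url in self.urls]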


Ancestors

  • AbstractProcess
  • abc.ABC

Methods

def do_execution(self) ‑> Tuple[List[TaskResult], List[Task]]

Using a thread-based executor.


class CheckByNewSubfoldersMergeBaseTask (root_path: pathlib.Path, pathfilter: str, target_path: pathlib.Path)

Implements the basic logic to track already processed data either by the folder structure of the root_path (meaning that new data that appeared as a new subfolder in the root_path has to be integrated into the existing content of the target_path) or by checking the timestamp of the latest modification within the root_path structure (meaning that the content of the target_path has to be recreated with the current content of the root_path).

Both scenarios use a "meta.inf" file that contains either the names of the subfolders or the timestamp of the latest processed modification. This class implements the subfolder-based scenario.

Constructor of base task.

Args

root_path
root path to read the data from
pathfilter
pathfilter string that defines which subfolders in the root_path have to be selected
target_path
path to where the results have to be written
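A minimal sketch of a concrete subclass that merges newly appeared subfolders into the previous result (MergeNewQuartersTask is illustrative; plain file copies stand in for the real processing):

import shutil
from pathlib import Path
from typing import List

from secfsdstools.c_automation.task_framework import CheckByNewSubfoldersMergeBaseTask

class MergeNewQuartersTask(CheckByNewSubfoldersMergeBaseTask):
    """Illustrative subclass, assuming a pathfilter like "*/BS"."""

    def do_execution(self, paths_to_process: List[Path],
                     target_path: Path, tmp_path: Path):
        # start from the result of the previous run, if there is one
        if target_path.exists():
            shutil.copytree(target_path, tmp_path, dirs_exist_ok=True)
        # integrate only the subfolders that are new since the last run
        for path in paths_to_process:
            shutil.copytree(path, tmp_path / path.parent.name, dirs_exist_ok=True)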

Ancestors

  • AbstractTask

Methods

def do_execution(self, paths_to_process: List[pathlib.Path], target_path: pathlib.Path, tmp_path: pathlib.Path)

Defines the logic to be executed.

Args

paths_to_process
list of paths/folders that have to be processed
target_path
the path where the result of the previous run was written
tmp_path
target path to where a result has to be written
def execute(self)

Basic implementation of the execute method.

If there are paths_to_process, delegates the work to do_execution() and then writes the names of all processed subfolders into the meta.inf file.


class CheckByTimestampMergeBaseTask (root_path: pathlib.Path, pathfilter: str, target_path: pathlib.Path)

This class uses the AbstractTask to implement logic that checks if files were changed within the root_path since the last processing.

It can be used as a BaseClass to implement a Task, that checks for new data to be processed by looking at the modification timestamp of the files in the root_path.

It does this as follows:

  • if there is no target_path yet, it will process the content in the root_path and write the result into the target_path, together with a meta.inf file that contains the newest modification timestamp of all the files in the root_path.
  • if there is a target_path, it reads the timestamp that is stored within the target_path. If any of the files within the root_path has a newer modification timestamp, it will process the data and also update the timestamp in the meta.inf file.

The constructor of the CheckByTimestampMergeBaseTask. See also the documentation of the AbstractTask constructor.
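A minimal sketch of a concrete subclass that recreates the whole target whenever anything in the root_path changed (RebuildOnChangeTask and the file-listing logic are illustrative placeholders for real processing):

from pathlib import Path
from typing import List

from secfsdstools.c_automation.task_framework import CheckByTimestampMergeBaseTask

class RebuildOnChangeTask(CheckByTimestampMergeBaseTask):
    """Illustrative subclass, not part of the library."""

    def do_execution(self, paths_to_process: List[Path], tmp_path: Path):
        # the timestamp check already happened in __init__; here we simply
        # reprocess everything and write the new result into tmp_path
        for path in paths_to_process:
            summary = "\n".join(f.name for f in sorted(path.rglob("*")))
            (tmp_path / f"{path.name}.listing.txt").write_text(summary, encoding="utf-8")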


Ancestors

  • AbstractTask

Methods

def do_execution(self, paths_to_process: List[pathlib.Path], tmp_path: pathlib.Path)

Defines the logic to be executed.

Args

paths_to_process
list of paths/folders that have to be processed
tmp_path
path to where a result has to be written
def execute(self)

Basic implementation of the execute method.

If there are paths_to_process, delegates the work to do_execution() and then writes the latest modification timestamp of the root_path into the meta.inf file.


class Task (*args, **kwargs)

Task interface.
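Since Task is a typing.Protocol, any class that provides these four methods satisfies the interface structurally; no inheritance is required. A minimal sketch (NoOpTask is illustrative):

class NoOpTask:
    """Illustrative: conforms to Task without subclassing it."""

    def prepare(self): ...
    def execute(self): ...
    def commit(self): return "success"
    def exception(self, exception): return f"failed {exception}"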


Ancestors

  • typing.Protocol
  • typing.Generic

Methods

def commit(self) ‑> Any

Commit the task if the execute method is not "self-committing". E.g., if you do some file processing in the execute method but want to update a state in a table, you could update the state in the commit method.

def exception(self, exception) ‑> Any

Handle the exception.

def execute(self)

Execute the task.

def prepare(self)

Prepare everything to execute the task. E.g., creation or clearing a directory.

class TaskResult (task: Task, result: Any, state: TaskResultState)

Dataclass containing the result of a task. Contains the task, the TaskResultState and the result (either the return value from the commit() or the exception() method).


Class variables

var result : Any
var state : TaskResultState
var task : Task
class TaskResultState (value, names=None, *, module=None, qualname=None, type=None, start=1)

Enum defining possible ResultStates of one task.


Ancestors

  • enum.Enum

Class variables

var FAILED
var SUCCESS