Module secfsdstools.c_automation.automation_utils

Utility functions that are mainly used in the context of automation.

Expand source code
"""
Utility functions that are mainly used in the context of automation.
"""
import os
import shutil
from pathlib import Path
from typing import Optional, List

from secfsdstools.a_utils.fileutils import get_directories_in_directory


def delete_temp_folders(root_path: Path, temp_prefix: str = "tmp"):
    """
    Delete the folders reported for root_path whose names begin with temp_prefix.

    Such folders are generally leftovers containing data of tasks that failed.

    Args:
        root_path: folder whose entries are checked for leftover temp folders
        temp_prefix: name prefix that marks a folder as temporary

    """
    # Resolve every matching folder first, so nothing is deleted while the
    # listing is still being consumed.
    # NOTE(review): presumably the helper returns plain folder names (not
    # full paths) - confirm against secfsdstools.a_utils.fileutils.
    leftovers = tuple(
        root_path / folder_name
        for folder_name in get_directories_in_directory(str(root_path))
        if folder_name.startswith(temp_prefix)
    )

    for leftover in leftovers:
        # best-effort cleanup: errors raised during removal are ignored
        shutil.rmtree(leftover, ignore_errors=True)


def get_latest_mtime(root_path: Path, skip: Optional[List[str]] = None) -> float:
    """
    Find the latest timestamp at which an element in the folder structure was changed.

    Every file and every subfolder below root_path is inspected; root_path
    itself is not.

    Args:
        root_path: root folder
        skip: file names to be ignored, in any folder of the tree.
              Only files are filtered; a subfolder with the same name is
              still taken into account.

    Returns:
        the latest timestamp of a folder or file within the "folder" as float value,
        or 0.0 if the folder contains no (non-skipped) entries.

    """
    # Build the lookup set once instead of once per visited directory.
    skipped_names = frozenset(skip or ())

    # Start with a float so that the declared return type also holds for an
    # empty folder structure.
    latest_mtime = 0.0

    for dirpath, dirnames, filenames in os.walk(root_path):
        current_dir = Path(dirpath)

        # files (minus the skipped ones) and subfolders are treated alike
        entries = [name for name in filenames if name not in skipped_names]
        entries.extend(dirnames)

        for entry in entries:
            # modification timestamp of the file or folder
            latest_mtime = max(latest_mtime, (current_dir / entry).stat().st_mtime)

    return latest_mtime

Functions

def delete_temp_folders(root_path: pathlib.Path, temp_prefix: str = 'tmp')

Remove any existing folders starting with the temp_prefix in the root_path (these are generally folders containing data of tasks that failed).

Expand source code
def delete_temp_folders(root_path: Path, temp_prefix: str = "tmp"):
    """
    Delete the folders reported for root_path whose names begin with temp_prefix.

    Such folders are generally leftovers containing data of tasks that failed.

    Args:
        root_path: folder whose entries are checked for leftover temp folders
        temp_prefix: name prefix that marks a folder as temporary

    """
    # Resolve every matching folder first, so nothing is deleted while the
    # listing is still being consumed.
    # NOTE(review): presumably the helper returns plain folder names (not
    # full paths) - confirm against secfsdstools.a_utils.fileutils.
    leftovers = tuple(
        root_path / folder_name
        for folder_name in get_directories_in_directory(str(root_path))
        if folder_name.startswith(temp_prefix)
    )

    for leftover in leftovers:
        # best-effort cleanup: errors raised during removal are ignored
        shutil.rmtree(leftover, ignore_errors=True)
def get_latest_mtime(root_path: pathlib.Path, skip: Optional[List[str]] = None) ‑> float

Find the latest timestamp at which an element in the folder structure was changed

Args

root_path
root folder

Returns

the latest timestamp of a folder or file within the "folder" as float value.

Expand source code
def get_latest_mtime(root_path: Path, skip: Optional[List[str]] = None) -> float:
    """
    Find the latest timestamp at which an element in the folder structure was changed.

    Every file and every subfolder below root_path is inspected; root_path
    itself is not.

    Args:
        root_path: root folder
        skip: file names to be ignored, in any folder of the tree.
              Only files are filtered; a subfolder with the same name is
              still taken into account.

    Returns:
        the latest timestamp of a folder or file within the "folder" as float value,
        or 0.0 if the folder contains no (non-skipped) entries.

    """
    # Build the lookup set once instead of once per visited directory.
    skipped_names = frozenset(skip or ())

    # Start with a float so that the declared return type also holds for an
    # empty folder structure.
    latest_mtime = 0.0

    for dirpath, dirnames, filenames in os.walk(root_path):
        current_dir = Path(dirpath)

        # files (minus the skipped ones) and subfolders are treated alike
        entries = [name for name in filenames if name not in skipped_names]
        entries.extend(dirnames)

        for entry in entries:
            # modification timestamp of the file or folder
            latest_mtime = max(latest_mtime, (current_dir / entry).stat().st_mtime)

    return latest_mtime