Module secfsdstools.c_automation.automation_utils
Util function that are mainly used in the context of automation.
Expand source code
"""
Util function that are mainly used in the context of automation.
"""
import os
import shutil
from pathlib import Path
from typing import Optional, List
from secfsdstools.a_utils.fileutils import get_directories_in_directory
def delete_temp_folders(root_path: Path, temp_prefix: str = "tmp"):
"""
Remove any existing folders starting with the tmp_prefix in the root_path
(these are generally folders containing data of tasks that failed)).
"""
dirs_in_filter_dir = get_directories_in_directory(str(root_path))
tmp_dirs = [d for d in dirs_in_filter_dir if d.startswith(temp_prefix)]
for tmp_dir in tmp_dirs:
file_path = root_path / tmp_dir
shutil.rmtree(file_path, ignore_errors=True)
def get_latest_mtime(root_path: Path, skip: Optional[List[str]] = None) -> float:
"""
Find the latest timestamp at which an element in the folder structure was changed
Args:
root_path: root folder
Returns:
the latest timestamp of a folder or file within the "folder" as float value.
"""
if skip is None:
skip = []
latest_mtime = 0
for dirpath, dirnames, filenames in os.walk(root_path):
# Check the modification timestamp of files
filenames = list(set(filenames) - set(skip))
for filename in filenames:
file_path = Path(dirpath) / filename
mtime = file_path.stat().st_mtime # modification timestamp of the file
latest_mtime = max(latest_mtime, mtime)
# Check the modification timestamp of subfolders
for dirname in dirnames:
dir_path = Path(dirpath) / dirname
mtime = dir_path.stat().st_mtime # modification timestamp of the folder
latest_mtime = max(latest_mtime, mtime)
return latest_mtime
Functions
def delete_temp_folders(root_path: pathlib.Path, temp_prefix: str = 'tmp')
-
Remove any existing folders starting with the tmp_prefix in the root_path (these are generally folders containing data of tasks that failed)).
Expand source code
def delete_temp_folders(root_path: Path, temp_prefix: str = "tmp"): """ Remove any existing folders starting with the tmp_prefix in the root_path (these are generally folders containing data of tasks that failed)). """ dirs_in_filter_dir = get_directories_in_directory(str(root_path)) tmp_dirs = [d for d in dirs_in_filter_dir if d.startswith(temp_prefix)] for tmp_dir in tmp_dirs: file_path = root_path / tmp_dir shutil.rmtree(file_path, ignore_errors=True)
def get_latest_mtime(root_path: pathlib.Path, skip: Optional[List[str]] = None) ‑> float
-
Find the latest timestamp at which an element in the folder structure was changed
Args
root_path
- root folder
Returns
the latest timestamp of a folder or file within the "folder" as float value.
Expand source code
def get_latest_mtime(root_path: Path, skip: Optional[List[str]] = None) -> float: """ Find the latest timestamp at which an element in the folder structure was changed Args: root_path: root folder Returns: the latest timestamp of a folder or file within the "folder" as float value. """ if skip is None: skip = [] latest_mtime = 0 for dirpath, dirnames, filenames in os.walk(root_path): # Check the modification timestamp of files filenames = list(set(filenames) - set(skip)) for filename in filenames: file_path = Path(dirpath) / filename mtime = file_path.stat().st_mtime # modification timestamp of the file latest_mtime = max(latest_mtime, mtime) # Check the modification timestamp of subfolders for dirname in dirnames: dir_path = Path(dirpath) / dirname mtime = dir_path.stat().st_mtime # modification timestamp of the folder latest_mtime = max(latest_mtime, mtime) return latest_mtime