Module secfsdstools.a_utils.fileutils
helper utils handling compressed files.
Expand source code
"""
helper utils handling compressed files.
"""
import glob
import logging
import os
import zipfile
from pathlib import Path
from typing import List, Optional, Dict
import pandas as pd
import pyarrow.parquet as pq
from secfsdstools.a_utils.constants import PA_SCHEMA_MAP
LOGGER = logging.getLogger(__name__)
def concat_parquet_files(input_files: List[str], output_file: str):
    """
    Merges multiple Parquet files with identical columns into a single Parquet file.

    The input files are read and appended one after the other, so only one
    input table is handled per loop iteration. Empty or non-existing files
    are skipped. If no usable input file is found, no output file is created.

    The schema is looked up in PA_SCHEMA_MAP, using the base name of
    output_file with ".parquet" removed as the key. Every input table is
    read with the columns of that schema and cast to it before being written.

    Args:
        input_files (list of str): List of Parquet file paths to merge.
        output_file (str): Path to the output merged Parquet file.

    Raises:
        the lookup PA_SCHEMA_MAP[file_type] fails if there is no entry for the
        output file name (a KeyError if PA_SCHEMA_MAP is a plain dict).
    """
    # same logger object as the module level LOGGER (both are named after __name__)
    logger = logging.getLogger(__name__)

    file_type = os.path.basename(output_file).replace(".parquet", "")
    schema = PA_SCHEMA_MAP[file_type]

    writer = None  # Writer is created only when a non-empty file is found
    try:
        for file in input_files:
            if not os.path.exists(file) or os.path.getsize(file) == 0:
                logger.info("Skipping empty or non-existing file: %s", file)
                continue

            # Read only the columns of the schema and make sure
            # the table conforms to the fixed schema
            table = pq.read_table(file, columns=schema.names)
            table = table.cast(schema)

            if writer is None:
                # Create writer with predefined schema
                writer = pq.ParquetWriter(output_file, schema)

            writer.write_table(table)
    finally:
        # close the writer even if reading, casting or writing failed,
        # so that the output file handle is not leaked
        if writer is not None:
            writer.close()

    if writer is None:
        logger.info("only empty files were provided - skipping creation of output file")
def check_dir(target_path: str):
    """
    Makes sure that target_path is a directory without any content.

    A missing directory is created together with its missing parents.

    Args:
        target_path (str): path of the directory to check

    Raises:
        ValueError: if the directory contains at least one entry
    """
    directory = Path(target_path)
    if not directory.exists():
        directory.mkdir(parents=True, exist_ok=True)

    existing_entries = os.listdir(target_path)
    if existing_entries:
        raise ValueError(f"the target_path {target_path} is not empty")
def get_filenames_in_directory(filter_string: str) -> List[str]:
    """
    returns the base names of all paths matching the glob pattern.
    the pattern can also contain a folder structure; only the last
    path component of every match is returned.

    Args:
        filter_string (str): glob pattern, e.g. "folder/*.zip"

    Returns:
        List[str]: base names of the matching paths
    """
    return list(map(os.path.basename, glob.glob(filter_string)))
def get_directories_in_directory(directory: str) -> List[str]:
    """
    returns the names of the subdirectories that are directly inside a directory.

    Args:
        directory (str): the directory to scan

    Returns:
        List[str]: names of the subdirectories; an empty list if the
        directory does not exist
    """
    found: List[str] = []
    if os.path.exists(directory):
        for item in os.scandir(directory):
            if item.is_dir():
                found.append(item.name)
    return found
def read_df_from_file_in_zip(zip_file: str, file_to_extract: str,
                             dtype: Optional[Dict[str, object]] = None,
                             usecols: Optional[List[str]] = None, **kwargs) -> pd.DataFrame:
    """
    reads a tab separated file that is stored inside a zip file directly
    into a dataframe. The first line of the file is used as header.

    Only the last path component of file_to_extract is used as the name
    of the member inside the zip file.

    Args:
        zip_file (str): the zip file containing the data file
        file_to_extract (str): the file with the data
        dtype (Dict[str, object], optional, None): column type array or None
        usecols (List[str], optional, None): list with all the columns
            that should be read or None
        **kwargs: additional keyword arguments that are passed on to pd.read_csv

    Returns:
        pd.DataFrame: the pandas dataframe
    """
    with zipfile.ZipFile(zip_file, "r") as archive:
        member_fp = archive.open(Path(file_to_extract).name)
        return pd.read_csv(
            member_fp,
            header=0,
            delimiter="\t",
            dtype=dtype,
            usecols=usecols,
            **kwargs,
        )
def read_content_from_file_in_zip(zip_file: str, file_to_extract: str) -> str:
    """
    reads a file that is stored inside a zip file and returns its
    content decoded as utf-8 text.

    Only the last path component of file_to_extract is used as the name
    of the member inside the zip file.

    Args:
        zip_file (str): the zip file containing the data file
        file_to_extract (str): the file with the data

    Returns:
        str: the content as string
    """
    with zipfile.ZipFile(zip_file, "r") as archive:
        raw_content = archive.read(Path(file_to_extract).name)
        return raw_content.decode("utf-8")
def write_content_to_zip(content: str, filename: str) -> str:
    """
    writes the content into a new zip file named "<filename>.zip".
    The content is stored as a single member that is named like the last
    path component of filename. compression is set to zipfile.ZIP_DEFLATED

    Args:
        content (str): the content that should be written into the file
        filename (str): string name of the target zipfile, without the ending ".zip"

    Returns:
        str: path to the zipfile that was written
    """
    target_zip = f"{filename}.zip"
    with zipfile.ZipFile(target_zip, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
        archive.writestr(Path(filename).name, content)
    return target_zip
def read_content_from_zip(filename: str) -> str:
    """
    returns the content of the provided zipfile (ending ".zip").
    The zip file "<filename>.zip" is opened and the member that is named
    like the last path component of filename is read and decoded as utf-8.

    Args:
        filename (str): string name of the target zipfile, without the ending ".zip"

    Returns:
        str: the content of a zipfile
    """
    with zipfile.ZipFile(f"{filename}.zip", mode="r") as archive:
        raw_content = archive.read(Path(filename).name)
        return raw_content.decode("utf-8")
Functions
def check_dir(target_path: str)
-
checks if the path exists and if it is empty. if it doesn't exist, it is created.
Expand source code
def check_dir(target_path: str):
    """
    Makes sure that target_path is a directory without any content.

    A missing directory is created together with its missing parents.

    Args:
        target_path (str): path of the directory to check

    Raises:
        ValueError: if the directory contains at least one entry
    """
    directory = Path(target_path)
    if not directory.exists():
        directory.mkdir(parents=True, exist_ok=True)

    existing_entries = os.listdir(target_path)
    if existing_entries:
        raise ValueError(f"the target_path {target_path} is not empty")
def concat_parquet_files(input_files: List[str], output_file: str)
-
Merges multiple Parquet files with identical columns into a single Parquet file. Empty or non-existing files are ignored to optimize memory usage.
Args
input_files
:list
of str
- List of Parquet file paths to merge.
output_file
:str
- Path to the output merged Parquet file.
Expand source code
def concat_parquet_files(input_files: List[str], output_file: str):
    """
    Merges multiple Parquet files with identical columns into a single Parquet file.

    The input files are read and appended one after the other, so only one
    input table is handled per loop iteration. Empty or non-existing files
    are skipped. If no usable input file is found, no output file is created.

    The schema is looked up in PA_SCHEMA_MAP, using the base name of
    output_file with ".parquet" removed as the key. Every input table is
    read with the columns of that schema and cast to it before being written.

    Args:
        input_files (list of str): List of Parquet file paths to merge.
        output_file (str): Path to the output merged Parquet file.

    Raises:
        the lookup PA_SCHEMA_MAP[file_type] fails if there is no entry for the
        output file name (a KeyError if PA_SCHEMA_MAP is a plain dict).
    """
    # logger named after the module, instead of print / the root logger
    logger = logging.getLogger(__name__)

    file_type = os.path.basename(output_file).replace(".parquet", "")
    schema = PA_SCHEMA_MAP[file_type]

    writer = None  # Writer is created only when a non-empty file is found
    try:
        for file in input_files:
            if not os.path.exists(file) or os.path.getsize(file) == 0:
                logger.info("Skipping empty or non-existing file: %s", file)
                continue

            # Read only the columns of the schema and make sure
            # the table conforms to the fixed schema
            table = pq.read_table(file, columns=schema.names)
            table = table.cast(schema)

            if writer is None:
                # Create writer with predefined schema
                writer = pq.ParquetWriter(output_file, schema)

            writer.write_table(table)
    finally:
        # close the writer even if reading, casting or writing failed,
        # so that the output file handle is not leaked
        if writer is not None:
            writer.close()

    if writer is None:
        logger.info("only empty files were provided - skipping creation of output file")
def get_directories_in_directory(directory: str) ‑> List[str]
-
returns a list with the subdirectories in a directory.
Returns
List[str]
- list subdirectories in the directory
Expand source code
def get_directories_in_directory(directory: str) -> List[str]:
    """
    returns the names of the subdirectories that are directly inside a directory.

    Args:
        directory (str): the directory to scan

    Returns:
        List[str]: names of the subdirectories; an empty list if the
        directory does not exist
    """
    found: List[str] = []
    if os.path.exists(directory):
        for item in os.scandir(directory):
            if item.is_dir():
                found.append(item.name)
    return found
def get_filenames_in_directory(filter_string: str) ‑> List[str]
-
returns a list with files matching the pathfilter. the pathfilter can also contain a folder structure.
Returns
List[str]
- list files in the directory
Expand source code
def get_filenames_in_directory(filter_string: str) -> List[str]:
    """
    returns the base names of all paths matching the glob pattern.
    the pattern can also contain a folder structure; only the last
    path component of every match is returned.

    Args:
        filter_string (str): glob pattern, e.g. "folder/*.zip"

    Returns:
        List[str]: base names of the matching paths
    """
    return list(map(os.path.basename, glob.glob(filter_string)))
def read_content_from_file_in_zip(zip_file: str, file_to_extract: str) ‑> str
-
reads the text content of a file inside a zip file
Args
zip_file
:str
- the zip file containing the data file
file_to_extract
:str
- the file with the data
Returns
str
- the content as string
Expand source code
def read_content_from_file_in_zip(zip_file: str, file_to_extract: str) -> str:
    """
    reads a file that is stored inside a zip file and returns its
    content decoded as utf-8 text.

    Only the last path component of file_to_extract is used as the name
    of the member inside the zip file.

    Args:
        zip_file (str): the zip file containing the data file
        file_to_extract (str): the file with the data

    Returns:
        str: the content as string
    """
    with zipfile.ZipFile(zip_file, "r") as archive:
        raw_content = archive.read(Path(file_to_extract).name)
        return raw_content.decode("utf-8")
def read_content_from_zip(filename: str) ‑> str
-
returns the content of the provided zipfile (ending ".zip")
Args
filename
:str
- string name of the target zipfile, without the ending ".zip"
Returns
str
- the content of a zipfile
Expand source code
def read_content_from_zip(filename: str) -> str:
    """
    returns the content of the provided zipfile (ending ".zip").
    The zip file "<filename>.zip" is opened and the member that is named
    like the last path component of filename is read and decoded as utf-8.

    Args:
        filename (str): string name of the target zipfile, without the ending ".zip"

    Returns:
        str: the content of a zipfile
    """
    with zipfile.ZipFile(f"{filename}.zip", mode="r") as archive:
        raw_content = archive.read(Path(filename).name)
        return raw_content.decode("utf-8")
def read_df_from_file_in_zip(zip_file: str, file_to_extract: str, dtype: Optional[Dict[str, object]] = None, usecols: Optional[List[str]] = None, **kwargs) ‑> pandas.core.frame.DataFrame
-
reads the content of a file inside a zip file directly into dataframe
Args
zip_file
:str
- the zip file containing the data file
file_to_extract
:str
- the file with the data
dtype
:Dict[str, object]
, optional, None
- column type array or None
usecols
:List[str]
, optional, None
- list with all the columns
that should be read or None
Returns
pd.DataFrame
- the pandas dataframe
Expand source code
def read_df_from_file_in_zip(zip_file: str, file_to_extract: str,
                             dtype: Optional[Dict[str, object]] = None,
                             usecols: Optional[List[str]] = None, **kwargs) -> pd.DataFrame:
    """
    reads a tab separated file that is stored inside a zip file directly
    into a dataframe. The first line of the file is used as header.

    Only the last path component of file_to_extract is used as the name
    of the member inside the zip file.

    Args:
        zip_file (str): the zip file containing the data file
        file_to_extract (str): the file with the data
        dtype (Dict[str, object], optional, None): column type array or None
        usecols (List[str], optional, None): list with all the columns
            that should be read or None
        **kwargs: additional keyword arguments that are passed on to pd.read_csv

    Returns:
        pd.DataFrame: the pandas dataframe
    """
    with zipfile.ZipFile(zip_file, "r") as archive:
        member_fp = archive.open(Path(file_to_extract).name)
        return pd.read_csv(
            member_fp,
            header=0,
            delimiter="\t",
            dtype=dtype,
            usecols=usecols,
            **kwargs,
        )
def write_content_to_zip(content: str, filename: str) ‑> str
-
write the content str into the zip file. compression is set to zipfile.ZIP_DEFLATED
Args
content
:str
- the content that should be written into the file
filename
:str
- string name of the target zipfile, without the ending ".zip"
Returns
str
- path to the zipfile that was written
Expand source code
def write_content_to_zip(content: str, filename: str) -> str:
    """
    writes the content into a new zip file named "<filename>.zip".
    The content is stored as a single member that is named like the last
    path component of filename. compression is set to zipfile.ZIP_DEFLATED

    Args:
        content (str): the content that should be written into the file
        filename (str): string name of the target zipfile, without the ending ".zip"

    Returns:
        str: path to the zipfile that was written
    """
    target_zip = f"{filename}.zip"
    with zipfile.ZipFile(target_zip, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
        archive.writestr(Path(filename).name, content)
    return target_zip