Module secfsdstools.c_transform.toparquettransforming_process
Transform zip files to parquet format
Expand source code
"""Transform zip files to parquet format"""
import contextlib
import glob
import logging
import os
import shutil
from pathlib import Path
from typing import List
from secfsdstools.a_utils.constants import SUB_TXT, PRE_TXT, NUM_TXT, NUM_DTYPE, PRE_DTYPE, \
SUB_DTYPE
from secfsdstools.a_utils.fileutils import get_directories_in_directory, \
read_df_from_file_in_zip
from secfsdstools.c_automation.task_framework import AbstractProcessPoolProcess
LOGGER = logging.getLogger(__name__)
class ToParquetTransformTask:
"""
Transforms a zip file containing csv files to a folder with parquet files.
"""
def __init__(self,
zip_file_path: str,
parquet_dir: str,
file_type: str,
keep_zip_files: bool):
"""
Constructor.
Args:
zip_file_path: path to the zipfile that must be transformed
parquet_dir: target base directory for the parguet files
file_type: file_type, either 'quarter' or 'daily' used to define the
subfolder in the parquet dir
keep_zip_files: flag that indicates whether the zipfiles should be deleted after
successful transformation
"""
self.zip_file_name = os.path.basename(zip_file_path)
self.zip_file_path = zip_file_path
self.parquet_dir = parquet_dir
self.file_type = file_type
self.keep_zip_files = keep_zip_files
self.file_path = Path(self.parquet_dir) / self.file_type / self.zip_file_name
def prepare(self):
""" create the necessary parent directories. """
self.file_path.mkdir(parents=True, exist_ok=True)
def execute(self):
"""
transform the zip file.
"""
self._inner_transform_zip_file(self.file_path, self.zip_file_path)
# remove the file if keep_zip_files is False
if not self.keep_zip_files:
with contextlib.suppress(OSError):
os.remove(self.zip_file_path)
def commit(self) -> str:
""" nothing special to do. """
return "success"
def exception(self, exception) -> str:
""" log the problem and clean the target directory. """
logger = logging.getLogger()
logger.error('failed to process %s', self.zip_file_name)
# the created dir has to be removed with all its content
shutil.rmtree(self.file_path, ignore_errors=True)
return f"failed {exception}"
def __str__(self) -> str:
return f"ToParquetTransformTask(zip_file_name: {self.zip_file_name})"
def _inner_transform_zip_file(self, target_path: Path, zip_file_path):
sub_df = read_df_from_file_in_zip(zip_file=zip_file_path, file_to_extract=SUB_TXT,
dtype=SUB_DTYPE)
pre_df = read_df_from_file_in_zip(zip_file=zip_file_path, file_to_extract=PRE_TXT,
dtype=PRE_DTYPE)
num_df = read_df_from_file_in_zip(zip_file=zip_file_path, file_to_extract=NUM_TXT,
dtype=NUM_DTYPE)
# ensure period columns are valid ints
# some report types don't have a value set for period
sub_df['period'] = sub_df['period'].fillna(-1).astype(int)
# same for line
pre_df['line'] = pre_df['line'].fillna(-1).astype(int)
# special handling for field value in num, since the daily files can also contain strings
if self.file_type == 'daily':
num_df = num_df[~num_df.tag.isin(['SecurityExchangeName', 'TradingSymbol'])]
num_df['value'] = num_df['value'].astype(float)
sub_df.to_parquet(target_path / f'{SUB_TXT}.parquet')
pre_df.to_parquet(target_path / f'{PRE_TXT}.parquet')
num_df.to_parquet(target_path / f'{NUM_TXT}.parquet')
class ToParquetTransformerProcess(AbstractProcessPoolProcess):
"""
Transforming zip files containing the sub.txt, num.txt, and pre.txt as CSV into
parquet format.
"""
def __init__(self,
zip_dir: str,
parquet_dir: str,
file_type: str,
keep_zip_files: bool,
execute_serial: bool = False):
"""
Constructor.
Args:
zip_dir: directory which contains the zipfiles that have to be transformed to parquet
parquet_dir: target base directory for the parguet files
file_type: file_type, either 'quarter' or 'daily' used to define the
subfolder in the parquet dir
"""
super().__init__(execute_serial=execute_serial,
chunksize=0)
self.zip_dir = zip_dir
self.parquet_dir = parquet_dir
self.file_type = file_type
self.keep_zip_files = keep_zip_files
def _calculate_not_transformed(self) -> List[str]:
"""
calculates the untransformed zip files in the zip_dir.
simply reads all the existing file names in the zip_dir and checks if there is a
subfolder with the same name in the parguet-dir
Returns:
List[str]: List with tuple of zipfile path.
"""
downloaded_zipfiles = glob.glob(os.path.join(self.zip_dir, "*.zip"))
present_parquet_dirs = get_directories_in_directory(
os.path.join(self.parquet_dir, self.file_type))
zip_file_names = {os.path.basename(p): p for p in downloaded_zipfiles}
parquet_dirs = [os.path.basename(p) for p in present_parquet_dirs]
not_transformed_names = set(zip_file_names.keys()) - set(parquet_dirs)
# key is the zipfile name, value is the whole path of the file
# the returned dict only contains elements for which not parquet directory does exist yet
return [v for k, v in zip_file_names.items() if k in not_transformed_names]
def calculate_tasks(self) -> List[ToParquetTransformTask]:
"""
Returns:
List[ToParquetTransformTask]: calculates the necessary tasks that have to be executed.
"""
not_transformed_paths: List[str] = self._calculate_not_transformed()
return [ToParquetTransformTask(zip_file_path=zip_file_path,
parquet_dir=self.parquet_dir,
file_type=self.file_type,
keep_zip_files=self.keep_zip_files
)
for zip_file_path in not_transformed_paths]
Classes
class ToParquetTransformTask (zip_file_path: str, parquet_dir: str, file_type: str, keep_zip_files: bool)
-
Transforms a zip file containing csv files to a folder with parquet files.
Constructor.
Args
zip_file_path
- path to the zipfile that must be transformed
parquet_dir
- target base directory for the parguet files
file_type
- file_type, either 'quarter' or 'daily' used to define the subfolder in the parquet dir
keep_zip_files
- flag that indicates whether the zipfiles should be deleted after successful transformation
Expand source code
class ToParquetTransformTask: """ Transforms a zip file containing csv files to a folder with parquet files. """ def __init__(self, zip_file_path: str, parquet_dir: str, file_type: str, keep_zip_files: bool): """ Constructor. Args: zip_file_path: path to the zipfile that must be transformed parquet_dir: target base directory for the parguet files file_type: file_type, either 'quarter' or 'daily' used to define the subfolder in the parquet dir keep_zip_files: flag that indicates whether the zipfiles should be deleted after successful transformation """ self.zip_file_name = os.path.basename(zip_file_path) self.zip_file_path = zip_file_path self.parquet_dir = parquet_dir self.file_type = file_type self.keep_zip_files = keep_zip_files self.file_path = Path(self.parquet_dir) / self.file_type / self.zip_file_name def prepare(self): """ create the necessary parent directories. """ self.file_path.mkdir(parents=True, exist_ok=True) def execute(self): """ transform the zip file. """ self._inner_transform_zip_file(self.file_path, self.zip_file_path) # remove the file if keep_zip_files is False if not self.keep_zip_files: with contextlib.suppress(OSError): os.remove(self.zip_file_path) def commit(self) -> str: """ nothing special to do. """ return "success" def exception(self, exception) -> str: """ log the problem and clean the target directory. """ logger = logging.getLogger() logger.error('failed to process %s', self.zip_file_name) # the created dir has to be removed with all its content shutil.rmtree(self.file_path, ignore_errors=True) return f"failed {exception}" def __str__(self) -> str: return f"ToParquetTransformTask(zip_file_name: {self.zip_file_name})" def _inner_transform_zip_file(self, target_path: Path, zip_file_path): sub_df = read_df_from_file_in_zip(zip_file=zip_file_path, file_to_extract=SUB_TXT, dtype=SUB_DTYPE) pre_df = read_df_from_file_in_zip(zip_file=zip_file_path, file_to_extract=PRE_TXT, dtype=PRE_DTYPE) num_df = read_df_from_file_in_zip(zip_file=zip_file_path, file_to_extract=NUM_TXT, dtype=NUM_DTYPE) # ensure period columns are valid ints # some report types don't have a value set for period sub_df['period'] = sub_df['period'].fillna(-1).astype(int) # same for line pre_df['line'] = pre_df['line'].fillna(-1).astype(int) # special handling for field value in num, since the daily files can also contain strings if self.file_type == 'daily': num_df = num_df[~num_df.tag.isin(['SecurityExchangeName', 'TradingSymbol'])] num_df['value'] = num_df['value'].astype(float) sub_df.to_parquet(target_path / f'{SUB_TXT}.parquet') pre_df.to_parquet(target_path / f'{PRE_TXT}.parquet') num_df.to_parquet(target_path / f'{NUM_TXT}.parquet')
Methods
def commit(self) ‑> str
-
nothing special to do.
Expand source code
def commit(self) -> str: """ nothing special to do. """ return "success"
def exception(self, exception) ‑> str
-
log the problem and clean the target directory.
Expand source code
def exception(self, exception) -> str: """ log the problem and clean the target directory. """ logger = logging.getLogger() logger.error('failed to process %s', self.zip_file_name) # the created dir has to be removed with all its content shutil.rmtree(self.file_path, ignore_errors=True) return f"failed {exception}"
def execute(self)
-
transform the zip file.
Expand source code
def execute(self): """ transform the zip file. """ self._inner_transform_zip_file(self.file_path, self.zip_file_path) # remove the file if keep_zip_files is False if not self.keep_zip_files: with contextlib.suppress(OSError): os.remove(self.zip_file_path)
def prepare(self)
-
create the necessary parent directories.
Expand source code
def prepare(self): """ create the necessary parent directories. """ self.file_path.mkdir(parents=True, exist_ok=True)
class ToParquetTransformerProcess (zip_dir: str, parquet_dir: str, file_type: str, keep_zip_files: bool, execute_serial: bool = False)
-
Transforming zip files containing the sub.txt, num.txt, and pre.txt as CSV into parquet format.
Constructor.
Args
zip_dir
- directory which contains the zipfiles that have to be transformed to parquet
parquet_dir
- target base directory for the parguet files
file_type
- file_type, either 'quarter' or 'daily' used to define the subfolder in the parquet dir
Expand source code
class ToParquetTransformerProcess(AbstractProcessPoolProcess): """ Transforming zip files containing the sub.txt, num.txt, and pre.txt as CSV into parquet format. """ def __init__(self, zip_dir: str, parquet_dir: str, file_type: str, keep_zip_files: bool, execute_serial: bool = False): """ Constructor. Args: zip_dir: directory which contains the zipfiles that have to be transformed to parquet parquet_dir: target base directory for the parguet files file_type: file_type, either 'quarter' or 'daily' used to define the subfolder in the parquet dir """ super().__init__(execute_serial=execute_serial, chunksize=0) self.zip_dir = zip_dir self.parquet_dir = parquet_dir self.file_type = file_type self.keep_zip_files = keep_zip_files def _calculate_not_transformed(self) -> List[str]: """ calculates the untransformed zip files in the zip_dir. simply reads all the existing file names in the zip_dir and checks if there is a subfolder with the same name in the parguet-dir Returns: List[str]: List with tuple of zipfile path. """ downloaded_zipfiles = glob.glob(os.path.join(self.zip_dir, "*.zip")) present_parquet_dirs = get_directories_in_directory( os.path.join(self.parquet_dir, self.file_type)) zip_file_names = {os.path.basename(p): p for p in downloaded_zipfiles} parquet_dirs = [os.path.basename(p) for p in present_parquet_dirs] not_transformed_names = set(zip_file_names.keys()) - set(parquet_dirs) # key is the zipfile name, value is the whole path of the file # the returned dict only contains elements for which not parquet directory does exist yet return [v for k, v in zip_file_names.items() if k in not_transformed_names] def calculate_tasks(self) -> List[ToParquetTransformTask]: """ Returns: List[ToParquetTransformTask]: calculates the necessary tasks that have to be executed. """ not_transformed_paths: List[str] = self._calculate_not_transformed() return [ToParquetTransformTask(zip_file_path=zip_file_path, parquet_dir=self.parquet_dir, file_type=self.file_type, keep_zip_files=self.keep_zip_files ) for zip_file_path in not_transformed_paths]
Ancestors
Methods
def calculate_tasks(self) ‑> List[ToParquetTransformTask]
-
Returns
List[ToParquetTransformTask]
- calculates the necessary tasks that have to be executed.
Expand source code
def calculate_tasks(self) -> List[ToParquetTransformTask]: """ Returns: List[ToParquetTransformTask]: calculates the necessary tasks that have to be executed. """ not_transformed_paths: List[str] = self._calculate_not_transformed() return [ToParquetTransformTask(zip_file_path=zip_file_path, parquet_dir=self.parquet_dir, file_type=self.file_type, keep_zip_files=self.keep_zip_files ) for zip_file_path in not_transformed_paths]
Inherited members