Module secfsdstools.c_transform.toparquettransforming
Transform zip files to parquet format
"""Transform zip files to parquet format"""
import contextlib
import glob
import logging
import os
import shutil
from typing import List, Tuple
from secfsdstools.a_utils.constants import SUB_TXT, PRE_TXT, NUM_TXT, NUM_DTYPE, PRE_DTYPE, \
SUB_DTYPE
from secfsdstools.a_utils.fileutils import get_directories_in_directory, \
read_df_from_file_in_zip
from secfsdstools.a_utils.parallelexecution import ParallelExecutor
LOGGER = logging.getLogger(__name__)
class ToParquetTransformer:
"""
Transforming zip files containing the sub.txt, num.txt, and pre.txt as CSV into
parquet format.
"""
def __init__(self, zip_dir: str, parquet_dir: str, file_type: str, keep_zip_files: bool):
"""
Constructor.
Args:
zip_dir: directory which contains the zipfiles that have to be transformed to parquet
parquet_dir: target base directory for the parquet files
file_type: file_type, either 'quarter' or 'daily' used to define the
subfolder in the parquet dir
keep_zip_files: whether to keep the zip files after the transformation;
if False, each zip file is removed after a successful transform
"""
self.zip_dir = zip_dir
self.parquet_dir = parquet_dir
self.file_type = file_type
self.keep_zip_files = keep_zip_files
def _calculate_not_transformed(self) -> List[Tuple[str, str]]:
"""
Calculates which zip files in the zip_dir have not been transformed yet.
Simply reads all the existing file names in the zip_dir and checks whether a
subfolder with the same name exists in the parquet-dir.
Returns:
List[Tuple[str, str]]: List of tuples of zipfile-name and zipfile-path.
"""
downloaded_zipfiles = glob.glob(os.path.join(self.zip_dir, "*.zip"))
present_parquet_dirs = get_directories_in_directory(
os.path.join(self.parquet_dir, self.file_type))
zip_file_names = {os.path.basename(p): p for p in downloaded_zipfiles}
parquet_dirs = [os.path.basename(p) for p in present_parquet_dirs]
not_transformed_names = set(zip_file_names.keys()) - set(parquet_dirs)
# key is the zipfile name, value is the whole path of the file
# the returned list only contains entries for which no parquet directory exists yet
return [(k, v) for k, v in zip_file_names.items() if k in not_transformed_names]
def _transform_zip_file(self, zip_file_name: str, zip_file_path: str):
target_path = os.path.join(self.parquet_dir, self.file_type, zip_file_name)
try:
os.makedirs(target_path, exist_ok=True)
self._inner_transform_zip_file(target_path, zip_file_path)
# remove the file if keep_zip_files is False
if not self.keep_zip_files:
with contextlib.suppress(OSError):
os.remove(zip_file_path)
except Exception as ex: # pylint: disable=W0703 # we need to catch all exceptions
LOGGER.error('failed to process %s', zip_file_path)
LOGGER.error(ex)
# the created dir has to be removed with all its content
shutil.rmtree(target_path, ignore_errors=True)
def _inner_transform_zip_file(self, target_path, zip_file_path):
sub_df = read_df_from_file_in_zip(zip_file=zip_file_path, file_to_extract=SUB_TXT,
dtype=SUB_DTYPE)
pre_df = read_df_from_file_in_zip(zip_file=zip_file_path, file_to_extract=PRE_TXT,
dtype=PRE_DTYPE)
num_df = read_df_from_file_in_zip(zip_file=zip_file_path, file_to_extract=NUM_TXT,
dtype=NUM_DTYPE)
# ensure period columns are valid ints
# some report types don't have a value set for period
sub_df['period'] = sub_df['period'].fillna(-1).astype(int)
# same for line
pre_df['line'] = pre_df['line'].fillna(-1).astype(int)
# special handling for field value in num, since the daily files can also contain strings
if self.file_type == 'daily':
num_df = num_df[~num_df.tag.isin(['SecurityExchangeName', 'TradingSymbol'])]
num_df['value'] = num_df['value'].astype(float)
sub_df.to_parquet(os.path.join(target_path, f'{SUB_TXT}.parquet'))
pre_df.to_parquet(os.path.join(target_path, f'{PRE_TXT}.parquet'))
num_df.to_parquet(os.path.join(target_path, f'{NUM_TXT}.parquet'))
def process(self) -> List[Tuple[str, str]]:
"""
Transforms all the zip files in the zip-dir into parquet format in the parquet-dir,
skipping zip files that have already been transformed.
Processing is done in parallel.
Returns:
List[Tuple[str, str]]: List of tuples of zipfile-name and zipfile-path that were transformed.
"""
def get_entries() -> List[Tuple[str, str]]:
return self._calculate_not_transformed()
def process_element(element: Tuple[str, str]) -> Tuple[str, str]:
LOGGER.info('processing %s', element[0])
self._transform_zip_file(element[0], element[1])
return element
def post_process(parts: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
return parts
executor = ParallelExecutor(chunksize=0)
executor.set_get_entries_function(get_entries)
executor.set_process_element_function(process_element)
executor.set_post_process_chunk_function(post_process)
result, failed = executor.execute()
if len(failed) > 0:
LOGGER.error("The following files could not be transformed: %s", failed)
return result
Classes
class ToParquetTransformer (zip_dir: str, parquet_dir: str, file_type: str, keep_zip_files: bool)
-
Transforming zip files containing the sub.txt, num.txt, and pre.txt as CSV into parquet format.
Constructor.
Args
zip_dir
- directory which contains the zipfiles that have to be transformed to parquet
parquet_dir
- target base directory for the parquet files
file_type
- file_type, either 'quarter' or 'daily' used to define the subfolder in the parquet dir
keep_zip_files
- whether to keep the zip files after the transformation; if False, each zip file is removed after a successful transform
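A minimal construction sketch; the directory paths below are illustrative assumptions, not values taken from the source:

from secfsdstools.c_transform.toparquettransforming import ToParquetTransformer

transformer = ToParquetTransformer(
    zip_dir='/data/secfsds/zips',         # assumed path: directory holding the downloaded *.zip files
    parquet_dir='/data/secfsds/parquet',  # assumed path: base directory for the parquet output
    file_type='quarter',                  # 'quarter' or 'daily'; selects the subfolder under parquet_dir
    keep_zip_files=False                  # remove each zip file after a successful transformation
)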
Methods
def process(self) -> List[Tuple[str, str]]
-
Transforms all the zip files in the zip-dir into parquet format in the parquet-dir, skipping zip files that have already been transformed. Processing is done in parallel.
Returns
List[Tuple[str, str]]: List of tuples of zipfile-name and zipfile-path that were transformed.
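A short usage sketch, continuing the construction example above; the loop body is illustrative:

# transforms all not-yet-transformed zip files in parallel and
# returns the (zipfile-name, zipfile-path) tuples that were processed
transformed = transformer.process()
for zip_file_name, zip_file_path in transformed:
    print(f'transformed {zip_file_name} (from {zip_file_path})')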