Module secfsdstools.c_download.basedownloading
Contains the BaseDownloader class.
Expand source code
"""
Contains BaseDownloader class.
"""
import logging
import os
from abc import ABC, abstractmethod
from typing import Tuple, List, Dict
from secfsdstools.a_utils.downloadutils import UrlDownloader
from secfsdstools.a_utils.fileutils import get_filenames_in_directory, get_directories_in_directory
from secfsdstools.a_utils.parallelexecution import ThreadExecutor
# module-level logger, named after this module
LOGGER = logging.getLogger(__name__)
class BaseDownloader(ABC):
    """
    Base class for downloaders. Implements the basic methods to download
    files from a URL and to store them inside a local directory.

    Subclasses have to implement `_calculate_missing_zips`, which provides
    the (file name, url) pairs that are processed by `download`.
    """

    def __init__(self, zip_dir: str,
                 parquet_dir_typed: str,
                 urldownloader: UrlDownloader,
                 execute_serial: bool = False):
        """
        Args:
            zip_dir: directory in which the downloaded files are stored.
                It is created if it does not exist yet.
            parquet_dir_typed: directory whose subdirectories are listed
                by `_get_transformed_parquet`.
            urldownloader: helper instance that performs the actual download.
            execute_serial: stored on the instance. Note that `download`
                currently always runs with execute_serial=False.
        """
        self.urldownloader = urldownloader
        self.parquet_dir_typed = parquet_dir_typed
        self.execute_serial = execute_serial
        self.result = None
        self.zip_dir = zip_dir

        if not os.path.isdir(self.zip_dir):
            LOGGER.info("creating download folder: %s", self.zip_dir)
            # exist_ok prevents a FileExistsError if the folder gets created
            # between the check above and this call.
            os.makedirs(self.zip_dir, exist_ok=True)

    def _get_headers(self) -> Dict[str, str]:
        """
        Returns the headers that are passed to the urldownloader for every
        download. The base implementation returns no headers.
        """
        return {}

    def _download_zip(self, file: str, url: str) -> str:
        """
        Downloads the content of the url into the file `file` inside zip_dir.

        Args:
            file: name of the target file, relative to zip_dir.
            url: url to download the content from.

        Returns:
            'success' if the download call did not raise an exception,
            otherwise 'failed: <exception>'. Exceptions are not propagated.
        """
        file_path = os.path.join(self.zip_dir, file)
        try:
            self.urldownloader.binary_download_url_to_file(url, file_path,
                                                           headers=self._get_headers())
        except Exception as ex:  # pylint: disable=W0703
            # we want to catch everything here. The failure is reported via
            # the return value, but it is also logged so it does not go unnoticed.
            LOGGER.warning("failed to download %s from %s: %s", file, url, ex)
            return f'failed: {ex}'
        return 'success'

    def _download_file(self, data: Tuple[str, str]) -> str:
        """
        Downloads a single entry.

        Args:
            data: tuple containing the target file name at index 0 and
                the url at index 1.

        Returns:
            the status string returned by `_download_zip`.
        """
        file: str = data[0]
        url: str = data[1]

        LOGGER.info('    start to download %s ', file)
        return self._download_zip(url=url, file=file)

    def _get_downloaded_zips(self) -> List[str]:
        """
        Returns the result of get_filenames_in_directory for the
        '*.zip' pattern inside zip_dir.
        """
        return get_filenames_in_directory(os.path.join(self.zip_dir, '*.zip'))

    def _get_transformed_parquet(self) -> List[str]:
        """
        Returns the result of get_directories_in_directory for
        parquet_dir_typed.
        """
        return get_directories_in_directory(self.parquet_dir_typed)

    @abstractmethod
    def _calculate_missing_zips(self) -> List[Tuple[str, str]]:
        """
        Has to be implemented by subclasses.

        Returns:
            list of (file name, url) tuples that `download` has to process.
        """

    def download(self):
        """
        Downloads the missing quarterly zip files from the SEC.

        The entries returned by `_calculate_missing_zips` are processed with
        `_download_file` by a ThreadExecutor. The value returned by the
        executor is stored in `self.result`.
        """
        # NOTE(review): self.execute_serial is not passed on to the executor;
        # the download always runs with execute_serial=False. This looks
        # deliberate - confirm before changing it.
        executor = ThreadExecutor[Tuple[str, str], str, type(None)](
            processes=3,
            max_calls_per_sec=8,
            chunksize=3,
            execute_serial=False
        )
        executor.set_get_entries_function(self._calculate_missing_zips)
        executor.set_process_element_function(self._download_file)
        # no post processing of the chunks is needed, they are passed through
        executor.set_post_process_chunk_function(lambda x: x)

        self.result = executor.execute()
Classes
class BaseDownloader (zip_dir: str, parquet_dir_typed: str, urldownloader: UrlDownloader, execute_serial: bool = False)
-
Base class for downloaders. Implements basic methods to download files from a URL and store them.
Expand source code
class BaseDownloader(ABC):
    """
    Base class for downloaders. Implements the basic methods to download
    files from a URL and to store them inside a local directory.

    Subclasses have to implement `_calculate_missing_zips`, which provides
    the (file name, url) pairs that are processed by `download`.
    """

    def __init__(self, zip_dir: str,
                 parquet_dir_typed: str,
                 urldownloader: UrlDownloader,
                 execute_serial: bool = False):
        """
        Args:
            zip_dir: directory in which the downloaded files are stored.
                It is created if it does not exist yet.
            parquet_dir_typed: directory whose subdirectories are listed
                by `_get_transformed_parquet`.
            urldownloader: helper instance that performs the actual download.
            execute_serial: stored on the instance. Note that `download`
                currently always runs with execute_serial=False.
        """
        self.urldownloader = urldownloader
        self.parquet_dir_typed = parquet_dir_typed
        self.execute_serial = execute_serial
        self.result = None
        self.zip_dir = zip_dir

        if not os.path.isdir(self.zip_dir):
            LOGGER.info("creating download folder: %s", self.zip_dir)
            # exist_ok prevents a FileExistsError if the folder gets created
            # between the check above and this call.
            os.makedirs(self.zip_dir, exist_ok=True)

    def _get_headers(self) -> Dict[str, str]:
        """
        Returns the headers that are passed to the urldownloader for every
        download. The base implementation returns no headers.
        """
        return {}

    def _download_zip(self, file: str, url: str) -> str:
        """
        Downloads the content of the url into the file `file` inside zip_dir.

        Args:
            file: name of the target file, relative to zip_dir.
            url: url to download the content from.

        Returns:
            'success' if the download call did not raise an exception,
            otherwise 'failed: <exception>'. Exceptions are not propagated.
        """
        file_path = os.path.join(self.zip_dir, file)
        try:
            self.urldownloader.binary_download_url_to_file(url, file_path,
                                                           headers=self._get_headers())
        except Exception as ex:  # pylint: disable=W0703
            # we want to catch everything here. The failure is reported via
            # the return value, but it is also logged so it does not go unnoticed.
            LOGGER.warning("failed to download %s from %s: %s", file, url, ex)
            return f'failed: {ex}'
        return 'success'

    def _download_file(self, data: Tuple[str, str]) -> str:
        """
        Downloads a single entry.

        Args:
            data: tuple containing the target file name at index 0 and
                the url at index 1.

        Returns:
            the status string returned by `_download_zip`.
        """
        file: str = data[0]
        url: str = data[1]

        LOGGER.info('    start to download %s ', file)
        return self._download_zip(url=url, file=file)

    def _get_downloaded_zips(self) -> List[str]:
        """
        Returns the result of get_filenames_in_directory for the
        '*.zip' pattern inside zip_dir.
        """
        return get_filenames_in_directory(os.path.join(self.zip_dir, '*.zip'))

    def _get_transformed_parquet(self) -> List[str]:
        """
        Returns the result of get_directories_in_directory for
        parquet_dir_typed.
        """
        return get_directories_in_directory(self.parquet_dir_typed)

    @abstractmethod
    def _calculate_missing_zips(self) -> List[Tuple[str, str]]:
        """
        Has to be implemented by subclasses.

        Returns:
            list of (file name, url) tuples that `download` has to process.
        """

    def download(self):
        """
        Downloads the missing quarterly zip files from the SEC.

        The entries returned by `_calculate_missing_zips` are processed with
        `_download_file` by a ThreadExecutor. The value returned by the
        executor is stored in `self.result`.
        """
        # NOTE(review): self.execute_serial is not passed on to the executor;
        # the download always runs with execute_serial=False. This looks
        # deliberate - confirm before changing it.
        executor = ThreadExecutor[Tuple[str, str], str, type(None)](
            processes=3,
            max_calls_per_sec=8,
            chunksize=3,
            execute_serial=False
        )
        executor.set_get_entries_function(self._calculate_missing_zips)
        executor.set_process_element_function(self._download_file)
        # no post processing of the chunks is needed, they are passed through
        executor.set_post_process_chunk_function(lambda x: x)

        self.result = executor.execute()
Ancestors
- abc.ABC
Subclasses
Methods
def download(self)
-
Downloads the missing quarterly zip files from the SEC.
Expand source code
def download(self):
    """
    Downloads the missing quarterly zip files from the SEC.

    The entries returned by `_calculate_missing_zips` are processed with
    `_download_file` by a ThreadExecutor. The value returned by the
    executor is stored in `self.result`.
    """
    # NOTE(review): self.execute_serial is not passed on to the executor;
    # the download always runs with execute_serial=False. This looks
    # deliberate - confirm before changing it.
    download_executor = ThreadExecutor[Tuple[str, str], str, type(None)](
        processes=3, max_calls_per_sec=8, chunksize=3, execute_serial=False
    )

    download_executor.set_get_entries_function(self._calculate_missing_zips)
    download_executor.set_process_element_function(self._download_file)
    # no post processing of the chunks is needed, they are passed through
    download_executor.set_post_process_chunk_function(lambda x: x)

    self.result = download_executor.execute()