Module secfsdstools.c_download.basedownloading_process

Base classes for Downloading zip files of the financial statement data sets.

Expand source code
"""
Base classes for Downloading zip files of the financial statement data sets.
"""
import logging
import os
from abc import abstractmethod
from pathlib import Path
from typing import List, Tuple, Dict

from secfsdstools.a_utils.downloadutils import UrlDownloader
from secfsdstools.a_utils.fileutils import get_filenames_in_directory, get_directories_in_directory
from secfsdstools.c_automation.task_framework import AbstractThreadProcess

LOGGER = logging.getLogger(__name__)


class DownloadTask:
    """
    This Tasks takes care of downloading a file from a certain url.
    """

    def __init__(self,
                 target_path: Path,
                 url: str,
                 urldownloader: UrlDownloader,
                 headers=None
                 ):
        """
        Constructor.
        Args:
            target_path: target path to which the resource has to be stored
            url: url to download
            urldownloader: instance of the UrlDownloader
            headers: special headers to be added to the http call as dict.Default is an empty dict
        """
        if headers is None:
            headers = {}
        self.url = url
        self.urldownloader = urldownloader
        self.headers = headers

        self.target_path = target_path

    def prepare(self):
        """
        create the target directory (incl. parents) if necessary
        """
        self.target_path.parent.mkdir(parents=True, exist_ok=True)

    def execute(self):
        """ download the url using the urldownloader instance. """
        logger = logging.getLogger()
        logger.info("download %s", self.url)
        self.urldownloader.binary_download_url_to_file(self.url,
                                                       str(self.target_path),
                                                       headers=self.headers)

    def commit(self) -> str:
        """ no real commit logic necessary, just return 'success' """
        return "success"

    def exception(self, exception) -> str:
        """ no special failure handling necessary"""
        return f"failed {exception}"

    def __str__(self) -> str:
        return f"DownloadTask(target_path: {self.target_path})"


class BaseDownloadingProcess(AbstractThreadProcess):
    """
    Base implementation for downloading zip files.
    """

    def __init__(self,
                 zip_dir: str,
                 parquet_dir: str,
                 urldownloader: UrlDownloader,
                 execute_serial: bool = False):
        """
        Constructor.
        Args:
            zip_dir: target folder to store downloaded files to.
            parquet_dir: directory where the transformed content of zip files are
            urldownloader: UrlDownloader instance
            execute_serial: whether to execute it in parallel or serial
        """
        super().__init__(execute_serial=execute_serial,
                         chunksize=3)

        self.zip_dir = zip_dir
        self.parquet_dir = parquet_dir
        self.urldownloader = urldownloader
        self.execute_serial = execute_serial

        if not os.path.isdir(self.zip_dir):
            LOGGER.info("creating download folder: %s", self.zip_dir)
            os.makedirs(self.zip_dir)

    def get_headers(self) -> Dict[str, str]:
        """
        return the needed headers. No additional header needed in this case.
        """
        return {}

    def _get_downloaded_zips(self) -> List[str]:
        """
        return the list of zipfiles that are already downloaded.
        """
        return get_filenames_in_directory(os.path.join(self.zip_dir, '*.zip'))

    def _get_transformed_parquet(self) -> List[str]:
        """ return the list of zipfiles that were already transformed to parquet."""
        return get_directories_in_directory(self.parquet_dir)

    @abstractmethod
    def _calculate_missing_zips(self) -> List[Tuple[str, str]]:
        """ calculate the zip files, which still need to be downloaded"""

    def calculate_tasks(self) -> List[DownloadTask]:
        """
        create the download tasks: a task for every missing zip file
        Returns:
            List[DownloadTask]: list of download tasks

        """
        missing_zips: List[Tuple[str, str]] = self._calculate_missing_zips()

        return [DownloadTask(target_path=Path(self.zip_dir) / name,
                             url=href,
                             urldownloader=self.urldownloader,
                             headers=self.get_headers())
                for name, href in missing_zips]

Classes

class BaseDownloadingProcess (zip_dir: str, parquet_dir: str, urldownloader: UrlDownloader, execute_serial: bool = False)

Base implementation for downloading zip files.

Constructor.

Args

zip_dir
target folder to store downloaded files to.
parquet_dir
directory where the transformed content of zip files are
urldownloader
UrlDownloader instance
execute_serial
whether to execute it in parallel or serial
Expand source code
class BaseDownloadingProcess(AbstractThreadProcess):
    """
    Base implementation for downloading zip files.
    """

    def __init__(self,
                 zip_dir: str,
                 parquet_dir: str,
                 urldownloader: UrlDownloader,
                 execute_serial: bool = False):
        """
        Constructor.
        Args:
            zip_dir: target folder to store downloaded files to.
            parquet_dir: directory where the transformed content of zip files are
            urldownloader: UrlDownloader instance
            execute_serial: whether to execute it in parallel or serial
        """
        super().__init__(execute_serial=execute_serial,
                         chunksize=3)

        self.zip_dir = zip_dir
        self.parquet_dir = parquet_dir
        self.urldownloader = urldownloader
        self.execute_serial = execute_serial

        if not os.path.isdir(self.zip_dir):
            LOGGER.info("creating download folder: %s", self.zip_dir)
            os.makedirs(self.zip_dir)

    def get_headers(self) -> Dict[str, str]:
        """
        return the needed headers. No additional header needed in this case.
        """
        return {}

    def _get_downloaded_zips(self) -> List[str]:
        """
        return the list of zipfiles that are already downloaded.
        """
        return get_filenames_in_directory(os.path.join(self.zip_dir, '*.zip'))

    def _get_transformed_parquet(self) -> List[str]:
        """ return the list of zipfiles that were already transformed to parquet."""
        return get_directories_in_directory(self.parquet_dir)

    @abstractmethod
    def _calculate_missing_zips(self) -> List[Tuple[str, str]]:
        """ calculate the zip files, which still need to be downloaded"""

    def calculate_tasks(self) -> List[DownloadTask]:
        """
        create the download tasks: a task for every missing zip file
        Returns:
            List[DownloadTask]: list of download tasks

        """
        missing_zips: List[Tuple[str, str]] = self._calculate_missing_zips()

        return [DownloadTask(target_path=Path(self.zip_dir) / name,
                             url=href,
                             urldownloader=self.urldownloader,
                             headers=self.get_headers())
                for name, href in missing_zips]

Ancestors

Subclasses

Methods

def calculate_tasks(self) ‑> List[DownloadTask]

create the download tasks: a task for every missing zip file

Returns

List[DownloadTask]
list of download tasks
Expand source code
def calculate_tasks(self) -> List[DownloadTask]:
    """
    create the download tasks: a task for every missing zip file
    Returns:
        List[DownloadTask]: list of download tasks

    """
    missing_zips: List[Tuple[str, str]] = self._calculate_missing_zips()

    return [DownloadTask(target_path=Path(self.zip_dir) / name,
                         url=href,
                         urldownloader=self.urldownloader,
                         headers=self.get_headers())
            for name, href in missing_zips]
def get_headers(self) ‑> Dict[str, str]

return the needed headers. No additional header needed in this case.

Expand source code
def get_headers(self) -> Dict[str, str]:
    """
    return the needed headers. No additional header needed in this case.
    """
    return {}

Inherited members

class DownloadTask (target_path: pathlib.Path, url: str, urldownloader: UrlDownloader, headers=None)

This Tasks takes care of downloading a file from a certain url.

Constructor.

Args

target_path
target path to which the resource has to be stored
url
url to download
urldownloader
instance of the UrlDownloader
headers
special headers to be added to the http call as dict.Default is an empty dict
Expand source code
class DownloadTask:
    """
    This Tasks takes care of downloading a file from a certain url.
    """

    def __init__(self,
                 target_path: Path,
                 url: str,
                 urldownloader: UrlDownloader,
                 headers=None
                 ):
        """
        Constructor.
        Args:
            target_path: target path to which the resource has to be stored
            url: url to download
            urldownloader: instance of the UrlDownloader
            headers: special headers to be added to the http call as dict.Default is an empty dict
        """
        if headers is None:
            headers = {}
        self.url = url
        self.urldownloader = urldownloader
        self.headers = headers

        self.target_path = target_path

    def prepare(self):
        """
        create the target directory (incl. parents) if necessary
        """
        self.target_path.parent.mkdir(parents=True, exist_ok=True)

    def execute(self):
        """ download the url using the urldownloader instance. """
        logger = logging.getLogger()
        logger.info("download %s", self.url)
        self.urldownloader.binary_download_url_to_file(self.url,
                                                       str(self.target_path),
                                                       headers=self.headers)

    def commit(self) -> str:
        """ no real commit logic necessary, just return 'success' """
        return "success"

    def exception(self, exception) -> str:
        """ no special failure handling necessary"""
        return f"failed {exception}"

    def __str__(self) -> str:
        return f"DownloadTask(target_path: {self.target_path})"

Methods

def commit(self) ‑> str

no real commit logic necessary, just return 'success'

Expand source code
def commit(self) -> str:
    """ no real commit logic necessary, just return 'success' """
    return "success"
def exception(self, exception) ‑> str

no special failure handling necessary

Expand source code
def exception(self, exception) -> str:
    """ no special failure handling necessary"""
    return f"failed {exception}"
def execute(self)

download the url using the urldownloader instance.

Expand source code
def execute(self):
    """ download the url using the urldownloader instance. """
    logger = logging.getLogger()
    logger.info("download %s", self.url)
    self.urldownloader.binary_download_url_to_file(self.url,
                                                   str(self.target_path),
                                                   headers=self.headers)
def prepare(self)

create the target directory (incl. parents) if necessary

Expand source code
def prepare(self):
    """
    create the target directory (incl. parents) if necessary
    """
    self.target_path.parent.mkdir(parents=True, exist_ok=True)