Module secfsdstools.c_daily.dailypreparation_process

Module for preparing daily SEC financial statement data. Handles downloading, transforming, and indexing daily SEC filings. Provides functionality to process daily files starting from a specified quarter.

Expand source code
"""
Module for preparing daily SEC financial statement data. Handles downloading,
transforming, and indexing daily SEC filings.
Provides functionality to process daily files starting from a specified quarter.
"""

import logging
import shutil
from pathlib import Path
from typing import Dict

from secdaily._00_common.BaseDefinitions import QuarterInfo
from secdaily._00_common.db.StateAccess import StateAccess
from secdaily._00_common.MigrationProcessing import MigrationProcessor
from secdaily.SecDaily import Configuration, SecDailyOrchestrator

from secfsdstools.c_automation.task_framework import AbstractProcess
from secfsdstools.c_index.indexdataaccess import ParquetDBIndexingAccessor

LOGGER = logging.getLogger(__name__)


class DailyPreparationProcess(AbstractProcess):
    """
    Process for preparing daily SEC financial statement data.

    Based on the latest processed quarterly file, this process removes daily
    index entries and daily parquet directories that lie before the quarter in
    which the daily processing starts, and then uses the secdaily package to
    download and package the daily filings from that quarter on.
    """

    def __init__(self, db_dir: str, parquet_dir: str, daily_dir: str):
        """
        Set up the index accessor and the secdaily configuration.

        Args:
            db_dir: directory passed to the ParquetDBIndexingAccessor.
            parquet_dir: root directory of the parquet data; the daily parquet
                directories are looked up in its "daily" subdirectory.
            daily_dir: working directory of secdaily.
        """
        super().__init__()
        self.daily_dir = daily_dir
        self.parquet_dir = parquet_dir
        self.index_accessor = ParquetDBIndexingAccessor(db_dir=db_dir)

        # secdaily configuration with all clean-up options switched on
        self.config = Configuration(
            workdir=self.daily_dir,
            clean_db_entries=True,
            clean_daily_zip_files=True,
            clean_intermediate_files=True,
            clean_quarter_zip_files=True,
        )

    @staticmethod
    def calculate_daily_start_quarter(quarter_before: str) -> QuarterInfo:
        """
        Calculate the quarter that follows the provided quarter.

        Args:
            quarter_before: quarter in the form "<year>q<quarter>", e.g. "2024q4".

        Returns:
            QuarterInfo of the next quarter; the fourth quarter rolls over to
            the first quarter of the following year.
        """

        year_str, quarter_str = quarter_before.split("q")
        year = int(year_str)
        quarter = int(quarter_str)

        if quarter == 4:
            year += 1
            quarter = 1
        else:
            quarter += 1

        return QuarterInfo(year, quarter)

    @staticmethod
    def cut_off_day(quarter: QuarterInfo) -> int:
        """
        Calculate the cut-off "day" that marks the start of the quarter.

        The result is an integer in yyyymmdd layout with the day part set to 00:
        quarter one results in yyyy0000, quarter two in yyyy0400,
        quarter three in yyyy0700, and quarter four in yyyy1000.

        This way, we can select for < cut_off_day to get all filings before the start of the quarter.

        Args:
            quarter: quarter whose start is wanted; its year and qrtr attributes are read.

        Returns:
            The cut-off value as an integer.

        Raises:
            KeyError: if quarter.qrtr is not one of 1, 2, 3, 4.
        """
        cut_off_month: Dict[int, int] = {
            1: 0,  # yyyy0000 is smaller than every date of that year
            2: 4,  # April
            3: 7,  # July
            4: 10,  # October
        }

        return quarter.year * 10_000 + cut_off_month[quarter.qrtr] * 100

    def clear_index_tables(self, cut_off_day: int):
        """
        Clear index tables for the daily processing.

        The call is delegated to the index accessor; the deletion logic itself
        is not part of this class. As documented for that call:

        index_parquet_reports: Removes entries that were created based on daily files that
        are now covered by quarterly files. Based on fields origin_file < cut_off_day and originFileType = daily.

        index_parquet_processing_state: remove entries based on fileName length 8 + 3 and < cut_off_day

        Args:
            cut_off_day: cut-off value as produced by cut_off_day().
        """
        self.index_accessor.clear_index_tables(cut_off_day=cut_off_day)

    def clear_daily_parquet_files(self, cut_off_day: int):
        """
        Clear daily parquet files.

        Removes every directory below "<parquet_dir>/daily" whose name sorts
        before "<cut_off_day>.zip", i.e. parquet data created from daily files
        that are now covered by quarterly files. Nothing happens if the
        "daily" directory does not exist.

        Args:
            cut_off_day: cut-off value as produced by cut_off_day().
        """

        # directory names are compared as strings against this name
        cut_off_file_name = f"{cut_off_day}.zip"
        daily_parquet_dir = Path(self.parquet_dir) / "daily"

        if daily_parquet_dir.exists():
            for dir_path in daily_parquet_dir.iterdir():
                if dir_path.is_dir() and dir_path.name < cut_off_file_name:
                    shutil.rmtree(dir_path)

    def check_for_daily_cleanup(self) -> bool:
        """
        Check if the daily processing needs to be cleaned up.

        If the secdaily MigrationProcessor reports that a migration is
        required, the migration is executed directly and the last run version
        is updated afterwards.

        Returns:
            True if a migration was required and executed, False otherwise.
        """

        state_access = StateAccess(work_dir=self.daily_dir)
        migration_processor = MigrationProcessor(dbmanager=state_access)

        if migration_processor.is_migration_required():
            migration_processor.execute_migration(self.config)

            # Update the last run version after successful completion
            migration_processor.update_last_run_version()
            return True

        return False

    def download_daily_files(self, daily_start_quarter: QuarterInfo):
        """
        Download daily SEC filing files.

        Creates a SecDailyOrchestrator with the configuration of this process
        and runs its steps in a fixed order: process_index_data,
        process_xml_data, create_sec_style, create_daily_zip and housekeeping.

        Args:
            daily_start_quarter: start quarter, passed to the index data
                processing and to the housekeeping step.
        """

        sec_daily = SecDailyOrchestrator(configuration=self.config)
        sec_daily.process_index_data(start_qrtr_info=daily_start_quarter)
        sec_daily.process_xml_data()
        sec_daily.create_sec_style()
        sec_daily.create_daily_zip()
        sec_daily.housekeeping(start_qrtr_info=daily_start_quarter)

    def process(self):
        """
        Prepare the daily data that follows the latest processed quarterly file.

        This method runs these steps in sequence:
        1. Look up the latest processed quarterly file
        2. Run a pending secdaily migration, if required
        3. Calculate the cut-off day and store it in the context as "cut_off_day"
        4. Clear index tables
        5. Clear daily parquet files
        6. Download daily files

        Transforming and indexing the downloaded daily files is not invoked
        from this method itself; presumably this is done by subsequent
        processes (TODO confirm).

        Raises:
            ValueError: if no quarterly file has been processed yet.
        """

        last_processed_quarter_file_name = self.index_accessor.find_latest_quarter_file_name()
        if last_processed_quarter_file_name is None:
            raise ValueError(
                "No quarterly files were processed before. "
                "Please process quarterly files first before running the daily process."
            )

        # the part before the first "." is the quarter string ("<year>q<quarter>")
        last_processed_quarter: str = last_processed_quarter_file_name.split(".")[0]

        # check if the daily data has to be cleaned up because of a breaking change in secdaily
        if self.check_for_daily_cleanup():
            # if so, we just use a "quarter" that is far in the future to
            # ensure everything is cleared in secfsdstools as well
            # NOTE(review): in this case the download below also starts from the
            # quarter after "3000q1", while the log message reports the real last
            # processed quarter - confirm that this is intended.
            daily_last_processed_quarter = "3000q1"
        else:
            daily_last_processed_quarter = last_processed_quarter

        daily_start_quarter = self.calculate_daily_start_quarter(daily_last_processed_quarter)
        cut_off_day = self.cut_off_day(daily_start_quarter)
        self.context["cut_off_day"] = cut_off_day

        LOGGER.info("clearing daily index tables and daily parquet files before cut off: %s", cut_off_day)
        self.clear_index_tables(cut_off_day=cut_off_day)
        self.clear_daily_parquet_files(cut_off_day=cut_off_day)

        LOGGER.info("starting daily processing after last processed quarter: %s", last_processed_quarter)
        self.download_daily_files(daily_start_quarter=daily_start_quarter)

Classes

class DailyPreparationProcess (db_dir: str, parquet_dir: str, daily_dir: str)

Process for preparing daily SEC financial statement data.

This class handles the entire process of downloading daily SEC filings, transforming them into the appropriate format, and indexing them for efficient access. It works with the secdaily package to download and process the daily files.

Expand source code
class DailyPreparationProcess(AbstractProcess):
    """
    Process for preparing daily SEC financial statement data.

    Based on the latest processed quarterly file, this process removes daily
    index entries and daily parquet directories that lie before the quarter in
    which the daily processing starts, and then uses the secdaily package to
    download and package the daily filings from that quarter on.
    """

    def __init__(self, db_dir: str, parquet_dir: str, daily_dir: str):
        """
        Set up the index accessor and the secdaily configuration.

        Args:
            db_dir: directory passed to the ParquetDBIndexingAccessor.
            parquet_dir: root directory of the parquet data; the daily parquet
                directories are looked up in its "daily" subdirectory.
            daily_dir: working directory of secdaily.
        """
        super().__init__()
        self.daily_dir = daily_dir
        self.parquet_dir = parquet_dir
        self.index_accessor = ParquetDBIndexingAccessor(db_dir=db_dir)

        # secdaily configuration with all clean-up options switched on
        self.config = Configuration(
            workdir=self.daily_dir,
            clean_db_entries=True,
            clean_daily_zip_files=True,
            clean_intermediate_files=True,
            clean_quarter_zip_files=True,
        )

    @staticmethod
    def calculate_daily_start_quarter(quarter_before: str) -> QuarterInfo:
        """
        Calculate the quarter that follows the provided quarter.

        Args:
            quarter_before: quarter in the form "<year>q<quarter>", e.g. "2024q4".

        Returns:
            QuarterInfo of the next quarter; the fourth quarter rolls over to
            the first quarter of the following year.
        """

        year_str, quarter_str = quarter_before.split("q")
        year = int(year_str)
        quarter = int(quarter_str)

        if quarter == 4:
            year += 1
            quarter = 1
        else:
            quarter += 1

        return QuarterInfo(year, quarter)

    @staticmethod
    def cut_off_day(quarter: QuarterInfo) -> int:
        """
        Calculate the cut-off "day" that marks the start of the quarter.

        The result is an integer in yyyymmdd layout with the day part set to 00:
        quarter one results in yyyy0000, quarter two in yyyy0400,
        quarter three in yyyy0700, and quarter four in yyyy1000.

        This way, we can select for < cut_off_day to get all filings before the start of the quarter.

        Args:
            quarter: quarter whose start is wanted; its year and qrtr attributes are read.

        Returns:
            The cut-off value as an integer.

        Raises:
            KeyError: if quarter.qrtr is not one of 1, 2, 3, 4.
        """
        cut_off_month: Dict[int, int] = {
            1: 0,  # yyyy0000 is smaller than every date of that year
            2: 4,  # April
            3: 7,  # July
            4: 10,  # October
        }

        return quarter.year * 10_000 + cut_off_month[quarter.qrtr] * 100

    def clear_index_tables(self, cut_off_day: int):
        """
        Clear index tables for the daily processing.

        The call is delegated to the index accessor; the deletion logic itself
        is not part of this class. As documented for that call:

        index_parquet_reports: Removes entries that were created based on daily files that
        are now covered by quarterly files. Based on fields origin_file < cut_off_day and originFileType = daily.

        index_parquet_processing_state: remove entries based on fileName length 8 + 3 and < cut_off_day

        Args:
            cut_off_day: cut-off value as produced by cut_off_day().
        """
        self.index_accessor.clear_index_tables(cut_off_day=cut_off_day)

    def clear_daily_parquet_files(self, cut_off_day: int):
        """
        Clear daily parquet files.

        Removes every directory below "<parquet_dir>/daily" whose name sorts
        before "<cut_off_day>.zip", i.e. parquet data created from daily files
        that are now covered by quarterly files. Nothing happens if the
        "daily" directory does not exist.

        Args:
            cut_off_day: cut-off value as produced by cut_off_day().
        """

        # directory names are compared as strings against this name
        cut_off_file_name = f"{cut_off_day}.zip"
        daily_parquet_dir = Path(self.parquet_dir) / "daily"

        if daily_parquet_dir.exists():
            for dir_path in daily_parquet_dir.iterdir():
                if dir_path.is_dir() and dir_path.name < cut_off_file_name:
                    shutil.rmtree(dir_path)

    def check_for_daily_cleanup(self) -> bool:
        """
        Check if the daily processing needs to be cleaned up.

        If the secdaily MigrationProcessor reports that a migration is
        required, the migration is executed directly and the last run version
        is updated afterwards.

        Returns:
            True if a migration was required and executed, False otherwise.
        """

        state_access = StateAccess(work_dir=self.daily_dir)
        migration_processor = MigrationProcessor(dbmanager=state_access)

        if migration_processor.is_migration_required():
            migration_processor.execute_migration(self.config)

            # Update the last run version after successful completion
            migration_processor.update_last_run_version()
            return True

        return False

    def download_daily_files(self, daily_start_quarter: QuarterInfo):
        """
        Download daily SEC filing files.

        Creates a SecDailyOrchestrator with the configuration of this process
        and runs its steps in a fixed order: process_index_data,
        process_xml_data, create_sec_style, create_daily_zip and housekeeping.

        Args:
            daily_start_quarter: start quarter, passed to the index data
                processing and to the housekeeping step.
        """

        sec_daily = SecDailyOrchestrator(configuration=self.config)
        sec_daily.process_index_data(start_qrtr_info=daily_start_quarter)
        sec_daily.process_xml_data()
        sec_daily.create_sec_style()
        sec_daily.create_daily_zip()
        sec_daily.housekeeping(start_qrtr_info=daily_start_quarter)

    def process(self):
        """
        Prepare the daily data that follows the latest processed quarterly file.

        This method runs these steps in sequence:
        1. Look up the latest processed quarterly file
        2. Run a pending secdaily migration, if required
        3. Calculate the cut-off day and store it in the context as "cut_off_day"
        4. Clear index tables
        5. Clear daily parquet files
        6. Download daily files

        Transforming and indexing the downloaded daily files is not invoked
        from this method itself; presumably this is done by subsequent
        processes (TODO confirm).

        Raises:
            ValueError: if no quarterly file has been processed yet.
        """

        last_processed_quarter_file_name = self.index_accessor.find_latest_quarter_file_name()
        if last_processed_quarter_file_name is None:
            raise ValueError(
                "No quarterly files were processed before. "
                "Please process quarterly files first before running the daily process."
            )

        # the part before the first "." is the quarter string ("<year>q<quarter>")
        last_processed_quarter: str = last_processed_quarter_file_name.split(".")[0]

        # check if the daily data has to be cleaned up because of a breaking change in secdaily
        if self.check_for_daily_cleanup():
            # if so, we just use a "quarter" that is far in the future to
            # ensure everything is cleared in secfsdstools as well
            # NOTE(review): in this case the download below also starts from the
            # quarter after "3000q1", while the log message reports the real last
            # processed quarter - confirm that this is intended.
            daily_last_processed_quarter = "3000q1"
        else:
            daily_last_processed_quarter = last_processed_quarter

        daily_start_quarter = self.calculate_daily_start_quarter(daily_last_processed_quarter)
        cut_off_day = self.cut_off_day(daily_start_quarter)
        self.context["cut_off_day"] = cut_off_day

        LOGGER.info("clearing daily index tables and daily parquet files before cut off: %s", cut_off_day)
        self.clear_index_tables(cut_off_day=cut_off_day)
        self.clear_daily_parquet_files(cut_off_day=cut_off_day)

        LOGGER.info("starting daily processing after last processed quarter: %s", last_processed_quarter)
        self.download_daily_files(daily_start_quarter=daily_start_quarter)

Ancestors

Static methods

def calculate_daily_start_quarter(quarter_before: str) ‑> secdaily._00_common.BaseDefinitions.QuarterInfo

calculates the next quarter based on the provided quarter.

Expand source code
@staticmethod
def calculate_daily_start_quarter(quarter_before: str) -> QuarterInfo:
    """
    Determine the quarter that follows ``quarter_before``.

    Args:
        quarter_before: quarter in the form "<year>q<quarter>", e.g. "2024q3".

    Returns:
        QuarterInfo of the next quarter; the fourth quarter rolls over to
        the first quarter of the following year.
    """
    year_part, quarter_part = quarter_before.split("q")
    prev_year, prev_quarter = int(year_part), int(quarter_part)

    # only the fourth quarter crosses a year boundary
    if prev_quarter != 4:
        return QuarterInfo(prev_year, prev_quarter + 1)

    return QuarterInfo(prev_year + 1, 1)
def cut_off_day(quarter: secdaily._00_common.BaseDefinitions.QuarterInfo) ‑> int

calculates the "first" day of the quarter. quarter one will result in yyyy0000, quarter two in yyyy0400, quarter three in yyyy0700, and quarter four in yyyy1000.

This way, we can select for < cut_off_day to get all filings before the start of the quarter.

Expand source code
@staticmethod
def cut_off_day(quarter: QuarterInfo) -> int:
    """
    Calculate the cut-off "day" that marks the start of the quarter.

    The result is an integer in yyyymmdd layout with the day part set to 00:
    quarter one results in yyyy0000, quarter two in yyyy0400,
    quarter three in yyyy0700, and quarter four in yyyy1000.

    This way, we can select for < cut_off_day to get all filings before the start of the quarter.

    Args:
        quarter: quarter whose start is wanted; its year and qrtr attributes are read.

    Returns:
        The cut-off value as an integer.

    Raises:
        KeyError: if quarter.qrtr is not one of 1, 2, 3, 4.
    """
    cut_off_month: Dict[int, int] = {
        1: 0,  # yyyy0000 is smaller than every date of that year
        2: 4,  # April
        3: 7,  # July
        4: 10,  # October
    }

    return quarter.year * 10_000 + cut_off_month[quarter.qrtr] * 100

Methods

def check_for_daily_cleanup(self) ‑> bool

check if the daily processing needs to be cleaned up. directly executes the cleanup if necessary for all the data that is managed by secdaily.

Expand source code
def check_for_daily_cleanup(self) -> bool:
    """
    Run a pending secdaily migration, if there is one.

    A MigrationProcessor is created on top of the secdaily state stored in
    the daily working directory. If it reports that a migration is required,
    the migration is executed with the configuration of this process and the
    last run version is updated afterwards.

    Returns:
        True if a migration was required and executed, False otherwise.
    """
    processor = MigrationProcessor(dbmanager=StateAccess(work_dir=self.daily_dir))

    if not processor.is_migration_required():
        return False

    processor.execute_migration(self.config)
    # the last run version is only updated once the migration call has returned
    processor.update_last_run_version()
    return True
def clear_daily_parquet_files(self, cut_off_day: int)

Clear daily parquet files.

Clear parquet files that were created from daily files that are now covered by quarterly files.

Expand source code
def clear_daily_parquet_files(self, cut_off_day: int):
    """
    Remove outdated daily parquet directories.

    Every directory below "<parquet_dir>/daily" whose name sorts before
    "<cut_off_day>.zip" is deleted together with its content. Plain files are
    left untouched, and nothing happens if the "daily" directory is missing.

    Args:
        cut_off_day: cut-off value; directories with a smaller name are removed.
    """
    daily_root = Path(self.parquet_dir) / "daily"
    if not daily_root.exists():
        return

    # names are compared as strings against the name of the cut-off "file"
    threshold_name = f"{cut_off_day}.zip"
    outdated = [entry for entry in daily_root.iterdir() if entry.is_dir() and entry.name < threshold_name]

    for entry in outdated:
        shutil.rmtree(entry)
def clear_index_tables(self, cut_off_day: int)

Clear index tables for the daily processing.

index_parquet_reports: Removes entries that were created based on daily files that are now covered by quarterly files. Based on fields origin_file < cut_off_day and originFileType = daily.

index_parquet_processing_state: remove entries based on fileName length 8 + 3 and < cut_off_day

Expand source code
def clear_index_tables(self, cut_off_day: int):
    """
    Remove outdated daily entries from the index tables.

    The work is delegated to the index accessor of this process; the
    deletion logic itself is not part of this method. As documented for it:

    index_parquet_reports: entries that were created based on daily files that
    are now covered by quarterly files are removed, based on the fields
    origin_file < cut_off_day and originFileType = daily.

    index_parquet_processing_state: entries are removed based on
    fileName length 8 + 3 and < cut_off_day.

    Args:
        cut_off_day: cut-off value handed to the accessor as keyword argument.
    """
    accessor = self.index_accessor
    accessor.clear_index_tables(cut_off_day=cut_off_day)
def download_daily_files(self, daily_start_quarter: secdaily._00_common.BaseDefinitions.QuarterInfo)

Download daily SEC filing files.

This method configures and uses the SecDailyOrchestrator to download and process daily SEC filing data, starting from the calculated daily_start_quarter.

Expand source code
def download_daily_files(self, daily_start_quarter: QuarterInfo):
    """
    Run the secdaily pipeline for the daily SEC filings.

    A SecDailyOrchestrator is created with the configuration of this process
    and its steps are called in a fixed order: process_index_data,
    process_xml_data, create_sec_style, create_daily_zip and housekeeping.

    Args:
        daily_start_quarter: start quarter, handed to the index data
            processing and to the housekeeping step.
    """
    orchestrator = SecDailyOrchestrator(configuration=self.config)

    # keep this order (assumed to be required by secdaily - TODO confirm)
    orchestrator.process_index_data(start_qrtr_info=daily_start_quarter)
    orchestrator.process_xml_data()
    orchestrator.create_sec_style()
    orchestrator.create_daily_zip()

    orchestrator.housekeeping(start_qrtr_info=daily_start_quarter)
def process(self)

Execute the complete daily preparation process.

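This method runs the preparation in sequence: 1. Look up the latest processed quarterly file 2. Run a pending secdaily migration, if required 3. Clear index tables 4. Clear daily parquet files 5. Download daily files. Transforming and indexing the daily files are not invoked by this method itself.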

Expand source code
def process(self):
    """
    Prepare the daily data that follows the latest processed quarterly file.

    This method runs these steps in sequence:
    1. Look up the latest processed quarterly file
    2. Run a pending secdaily migration, if required
    3. Calculate the cut-off day and store it in the context as "cut_off_day"
    4. Clear index tables
    5. Clear daily parquet files
    6. Download daily files

    Transforming and indexing the downloaded daily files is not invoked
    from this method itself; presumably this is done by subsequent
    processes (TODO confirm).

    Raises:
        ValueError: if no quarterly file has been processed yet.
    """

    last_processed_quarter_file_name = self.index_accessor.find_latest_quarter_file_name()
    if last_processed_quarter_file_name is None:
        raise ValueError(
            "No quarterly files were processed before. "
            "Please process quarterly files first before running the daily process."
        )

    # the part before the first "." is the quarter string ("<year>q<quarter>")
    last_processed_quarter: str = last_processed_quarter_file_name.split(".")[0]

    # check if the daily data has to be cleaned up because of a breaking change in secdaily
    if self.check_for_daily_cleanup():
        # if so, we just use a "quarter" that is far in the future to
        # ensure everything is cleared in secfsdstools as well
        # NOTE(review): in this case the download below also starts from the
        # quarter after "3000q1", while the log message reports the real last
        # processed quarter - confirm that this is intended.
        daily_last_processed_quarter = "3000q1"
    else:
        daily_last_processed_quarter = last_processed_quarter

    daily_start_quarter = self.calculate_daily_start_quarter(daily_last_processed_quarter)
    cut_off_day = self.cut_off_day(daily_start_quarter)
    self.context["cut_off_day"] = cut_off_day

    LOGGER.info("clearing daily index tables and daily parquet files before cut off: %s", cut_off_day)
    self.clear_index_tables(cut_off_day=cut_off_day)
    self.clear_daily_parquet_files(cut_off_day=cut_off_day)

    LOGGER.info("starting daily processing after last processed quarter: %s", last_processed_quarter)
    self.download_daily_files(daily_start_quarter=daily_start_quarter)

Inherited members