Module secfsdstools.c_index.indexing
Indexing the downloaded data
Expand source code
"""Indexing the downloaded to data"""
import logging
import os
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import List, Tuple
import pandas as pd
from secfsdstools.a_utils.constants import SUB_TXT
from secfsdstools.a_utils.fileutils import get_directories_in_directory
from secfsdstools.c_index.indexdataaccess import IndexFileProcessingState, ParquetDBIndexingAccessor
LOGGER = logging.getLogger(__name__)
class BaseReportIndexer(ABC):
    """
    Abstract base class that drives the indexing of report files.

    Concrete subclasses tell it which files are present on disk and how to
    read the submission data ("sub") of a single file; this class keeps the
    bookkeeping of which files were already indexed and writes new index
    entries through the provided db accessor.
    """

    # status value written to the processing table once a file is indexed
    PROCESSED_STR: str = 'processed'
    # base url of EDGAR filing landing pages
    URL_PREFIX: str = 'https://www.sec.gov/Archives/edgar/data/'

    def __init__(self, accessor: ParquetDBIndexingAccessor, file_type: str):
        self.dbaccessor = accessor
        self.file_type = file_type
        # Timestamp of this indexing run as an ISO-8601 string, taken once at
        # construction so every file indexed in this run shares the same value.
        # NOTE(review): astimezone() with no argument converts the UTC instant
        # to the *local* timezone before formatting — confirm that is intended.
        self.process_time = datetime.now(timezone.utc).astimezone().isoformat()

    @abstractmethod
    def get_present_files(self) -> List[str]:
        """
        Returns the list with the filenames that are already present.

        Returns:
            List[str]: list with the zip filenames that are already present
        """

    @abstractmethod
    def get_sub_df(self, file_name: str) -> Tuple[pd.DataFrame, str]:
        """
        Loads the content of sub_txt into a dataframe and returns the
        dataframe and the fullpath to the data as a tuple.

        Args:
            file_name: name of the original zip file

        Returns:
            Tuple[pd.DataFrame, str]: DataFrame with the content in the
                sub_txt file, file path
        """

    def _calculate_not_indexed(self) -> List[str]:
        # Files present on disk minus files already marked 'processed' in the
        # processing table; order of the result is unspecified (set difference).
        processing_df = self.dbaccessor.read_all_indexfileprocessing_df()
        done_df = processing_df[processing_df.status == self.PROCESSED_STR]
        done_files = set(done_df.fileName.to_list())
        return list(set(self.get_present_files()) - done_files)

    def _index_file(self, file_name: str):
        """Read one file's sub data, enrich it, and store it in the index."""
        LOGGER.info("indexing file %s", file_name)
        # todo: check if table already contains entries
        #       will fail at the moment, since the primary key is defined
        sub_df, full_path = self.get_sub_df(file_name)
        sub_df['fullPath'] = full_path
        sub_df['originFile'] = file_name
        sub_df['originFileType'] = self.file_type
        # landing-page url: <prefix><cik>/<adsh without dashes>/<adsh>-index.htm
        sub_df['url'] = (BaseReportIndexer.URL_PREFIX
                         + sub_df['cik'].astype(str) + '/'
                         + sub_df['adsh'].str.replace('-', '') + '/'
                         + sub_df['adsh'] + '-index.htm')
        processing_state = IndexFileProcessingState(
            fileName=file_name,
            fullPath=full_path,
            status=self.PROCESSED_STR,
            entries=len(sub_df),
            processTime=self.process_time
        )
        self.dbaccessor.add_index_report(sub_df, processing_state)

    def process(self):
        """
        Index all zip files that were not indexed yet.
        """
        for file_name in self._calculate_not_indexed():
            self._index_file(file_name=file_name)
class ReportParquetIndexer(BaseReportIndexer):
    """
    Indexes reports that were transformed into parquet files, laid out as
    <parquet_dir>/<file_type>/<original-zip-name>/.
    """

    def __init__(self, db_dir: str, parquet_dir: str, file_type: str):
        super().__init__(ParquetDBIndexingAccessor(db_dir=db_dir), file_type)
        self.parquet_dir = parquet_dir

    def get_present_files(self) -> List[str]:
        # each transformed zip file is a directory of parquet files, so the
        # directory names under <parquet_dir>/<file_type> are the present files
        type_dir = os.path.join(self.parquet_dir, self.file_type)
        return get_directories_in_directory(type_dir)

    def get_sub_df(self, file_name: str) -> Tuple[pd.DataFrame, str]:
        full_path = os.path.realpath(
            os.path.join(self.parquet_dir, self.file_type, file_name))
        sub_file = os.path.join(full_path, f"{SUB_TXT}.parquet")
        # restrict the read to the columns the index actually needs
        wanted_columns = ['adsh', 'cik', 'name', 'form', 'filed', 'period']
        return pd.read_parquet(sub_file, columns=wanted_columns), full_path
Classes
class BaseReportIndexer (accessor: ParquetDBIndexingAccessor, file_type: str)
-
Base class to index the reports.
Expand source code
class BaseReportIndexer(ABC): """ Base class to index the reports. """ PROCESSED_STR: str = 'processed' URL_PREFIX: str = 'https://www.sec.gov/Archives/edgar/data/' def __init__(self, accessor: ParquetDBIndexingAccessor, file_type: str): self.dbaccessor = accessor self.file_type = file_type # get current datetime in UTC utc_dt = datetime.now(timezone.utc) # convert UTC time to ISO 8601 format iso_date = utc_dt.astimezone().isoformat() self.process_time = iso_date @abstractmethod def get_present_files(self) -> List[str]: """ returns the list with the filenames that are already present Returns: List[str]: list with the zip filenames that are already present """ @abstractmethod def get_sub_df(self, file_name: str) -> Tuple[pd.DataFrame, str]: """ loads the content of sub_txt into a dataframe and returns the dataframe and the fullpath to the data as a tuple. Args: file_name: name of the original zip file Returns: Tuple[pd.Dataframe, str]: DataFrame with the content in the sub_txt file, file path """ def _calculate_not_indexed(self) -> List[str]: present_files = self.get_present_files() processed_indexfiles_df = self.dbaccessor.read_all_indexfileprocessing_df() indexed_df = processed_indexfiles_df[processed_indexfiles_df.status == self.PROCESSED_STR] indexed_files = indexed_df.fileName.to_list() not_indexed = set(present_files) - set(indexed_files) return list(not_indexed) def _index_file(self, file_name: str): LOGGER.info("indexing file %s", file_name) # todo: check if table already contains entries # will fail at the moment, since the the primary key is defined sub_df, full_path = self.get_sub_df(file_name) sub_df['fullPath'] = full_path sub_df['originFile'] = file_name sub_df['originFileType'] = self.file_type sub_df['url'] = BaseReportIndexer.URL_PREFIX sub_df['url'] = sub_df['url'] + sub_df['cik'].astype(str) + '/' + \ sub_df['adsh'].str.replace('-', '') + '/' + sub_df['adsh'] + '-index.htm' self.dbaccessor.add_index_report(sub_df, IndexFileProcessingState( 
fileName=file_name, fullPath=full_path, status=self.PROCESSED_STR, entries=len(sub_df), processTime=self.process_time )) def process(self): """ index all not zip-files that were not indexed yet. """ not_indexed_files = self._calculate_not_indexed() for not_indexed_file in not_indexed_files: self._index_file(file_name=not_indexed_file)
Ancestors
- abc.ABC
Subclasses
Class variables
var PROCESSED_STR : str
var URL_PREFIX : str
Methods
def get_present_files(self) ‑> List[str]
-
returns the list with the filenames that are already present
Returns
List[str]
- list with the zip filenames that are already present
Expand source code
@abstractmethod def get_present_files(self) -> List[str]: """ returns the list with the filenames that are already present Returns: List[str]: list with the zip filenames that are already present """
def get_sub_df(self, file_name: str) ‑> Tuple[pandas.core.frame.DataFrame, str]
-
loads the content of sub_txt into a dataframe and returns the dataframe and the fullpath to the data as a tuple.
Args
file_name
- name of the original zip file
Returns
Tuple[pd.DataFrame, str]
- DataFrame with the content in the sub_txt file, file path
Expand source code
@abstractmethod def get_sub_df(self, file_name: str) -> Tuple[pd.DataFrame, str]: """ loads the content of sub_txt into a dataframe and returns the dataframe and the fullpath to the data as a tuple. Args: file_name: name of the original zip file Returns: Tuple[pd.Dataframe, str]: DataFrame with the content in the sub_txt file, file path """
def process(self)
-
Index all zip files that were not indexed yet.
Expand source code
def process(self): """ index all not zip-files that were not indexed yet. """ not_indexed_files = self._calculate_not_indexed() for not_indexed_file in not_indexed_files: self._index_file(file_name=not_indexed_file)
class ReportParquetIndexer (db_dir: str, parquet_dir: str, file_type: str)
-
Index the reports in parquet files.
Expand source code
class ReportParquetIndexer(BaseReportIndexer): """ Index the reports in parquet files. """ def __init__(self, db_dir: str, parquet_dir: str, file_type: str): super().__init__(ParquetDBIndexingAccessor(db_dir=db_dir), file_type) self.parquet_dir = parquet_dir def get_present_files(self) -> List[str]: return get_directories_in_directory( os.path.join(self.parquet_dir, self.file_type)) def get_sub_df(self, file_name: str) -> Tuple[pd.DataFrame, str]: path = os.path.join(self.parquet_dir, self.file_type, file_name) full_path = os.path.realpath(path) sub_file = os.path.join(full_path, f"{SUB_TXT}.parquet") usecols = ['adsh', 'cik', 'name', 'form', 'filed', 'period'] return pd.read_parquet(sub_file, columns=usecols), full_path
Ancestors
- BaseReportIndexer
- abc.ABC
Class variables
var PROCESSED_STR : str
var URL_PREFIX : str
Inherited members