Module secfsdstools.c_index.indexing_process
Indexing the downloaded data
"""Indexing the downloaded to data"""
import logging
import os
from datetime import datetime, timezone
from typing import List
import pandas as pd
from secfsdstools.a_utils.constants import SUB_TXT
from secfsdstools.a_utils.fileutils import get_directories_in_directory
from secfsdstools.c_automation.task_framework import AbstractThreadProcess
from secfsdstools.c_index.indexdataaccess import IndexFileProcessingState, ParquetDBIndexingAccessor
LOGGER = logging.getLogger(__name__)
class IndexingTask:
""" Indexes the content of a folder by writing the content of the sub_df file into the
index tables.
"""
PROCESSED_STR: str = 'processed'
URL_PREFIX: str = 'https://www.sec.gov/Archives/edgar/data/'
def __init__(self,
dbaccessor: ParquetDBIndexingAccessor,
file_path: str,
file_type: str,
process_time: str,
):
"""
Constructor.
Args:
dbaccessor: dbaccessor helper class
file_path: path to the directory with the sub_df file that has to be indexed
file_type: file_type, normally, this is "quarter"
process_time: process time that is used a timestamp in the created table entry
"""
self.dbaccessor = dbaccessor
self.file_path = file_path
self.file_type = file_type
self.file_name = os.path.basename(file_path)
self.process_time = process_time
def _get_sub_df(self) -> pd.DataFrame:
"""
reads the content of the sub_df file into dataframe.
Returns:
pd.DataFrame
"""
sub_file = os.path.join(self.file_path, f"{SUB_TXT}.parquet")
usecols = ['adsh',
'cik',
'name',
'form',
'filed',
'period']
return pd.read_parquet(sub_file, columns=usecols)
def prepare(self):
""" prepare Task. Nothing to do."""
def execute(self):
"""
Reads the sub_df content and writes the entries to the index.
"""
logger = logging.getLogger()
logger.debug("indexing file %s", self.file_name)
# todo: check if table already contains entries
# will fail at the moment, since the the primary key is defined
sub_df = self._get_sub_df()
sub_df['fullPath'] = self.file_path
sub_df['originFile'] = self.file_name
sub_df['originFileType'] = self.file_type
sub_df['url'] = self.URL_PREFIX
sub_df['url'] = sub_df['url'] + sub_df['cik'].astype(str) + '/' + \
sub_df['adsh'].str.replace('-', '') + '/' + sub_df['adsh'] + '-index.htm'
self.dbaccessor.add_index_report(sub_df,
IndexFileProcessingState(
fileName=self.file_name,
fullPath=self.file_path,
status=self.PROCESSED_STR,
entries=len(sub_df),
processTime=self.process_time
))
def commit(self):
""" no special commit handling. """
# no special commit necessary
return "success"
def exception(self, exception) -> str:
""" no special exception handling. """
return f"failed {exception}"
def __str__(self) -> str:
return f"IndexingTask(file_path: {self.file_path})"
class ReportParquetIndexerProcess(AbstractThreadProcess):
"""
Index the reports in parquet files.
"""
def __init__(self,
db_dir: str,
file_type: str,
parquet_dir: str,
execute_serial: bool=True):
"""
Constructor.
Args:
db_dir: location of the dbfile.
file_type: type of the data, usually this is "quarter".
parquet_dir: parent directory in which the transformed parquet files are.
"""
# only use serial execution, since indexing is rather quick
super().__init__(execute_serial=execute_serial,
chunksize=0)
self.dbaccessor = ParquetDBIndexingAccessor(db_dir=db_dir)
self.file_type = file_type
self.parquet_dir = parquet_dir
# get current datetime in UTC
utc_dt = datetime.now(timezone.utc)
# convert UTC time to ISO 8601 format string
iso_date = utc_dt.astimezone().isoformat()
self.process_time = iso_date
def _get_present_files(self) -> List[str]:
"""
returns the available folders within the parquet directory.
Returns:
List[str] list with foldernames.
"""
return get_directories_in_directory(
os.path.join(self.parquet_dir, self.file_type))
def _calculate_not_indexed_file_paths(self) -> List[str]:
"""
calculates which parquet files were not indexed yet.
Returns:
List[str]: list with directories which need to be indexed.
"""
present_files = self._get_present_files()
processed_indexfiles_df = self.dbaccessor.read_all_indexfileprocessing_df()
indexed_df = processed_indexfiles_df[
processed_indexfiles_df.status == IndexingTask.PROCESSED_STR]
indexed_files = indexed_df.fileName.to_list()
not_indexed_file_names = list(set(present_files) - set(indexed_files))
return [os.path.realpath(os.path.join(self.parquet_dir, self.file_type, file_name))
for file_name in not_indexed_file_names]
def calculate_tasks(self) -> List[IndexingTask]:
"""
Calculates the tasks, which have to be executed.
Returns:
List[IndexingTasks]
"""
not_indexed_paths: List[str] = self._calculate_not_indexed_file_paths()
return [IndexingTask(dbaccessor=self.dbaccessor,
file_path=file_path,
file_type=self.file_type,
process_time=self.process_time)
for file_path in not_indexed_paths]
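A minimal usage sketch, driving the tasks by hand with only the API shown on this page; the db_dir and parquet_dir values are hypothetical, and in normal operation the task framework behind AbstractThreadProcess runs prepare/execute/commit itself:

from secfsdstools.c_index.indexing_process import ReportParquetIndexerProcess

# hypothetical locations of the index db and the transformed parquet files
process = ReportParquetIndexerProcess(db_dir="/data/secfsdstools/db",
                                      file_type="quarter",
                                      parquet_dir="/data/secfsdstools/parquet")

for task in process.calculate_tasks():  # one task per not-yet-indexed folder
    task.prepare()           # no-op for this task type
    task.execute()           # reads the folder's sub_df file and writes the index entries
    print(task.commit())     # "success"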
Classes
class IndexingTask (dbaccessor: ParquetDBIndexingAccessor, file_path: str, file_type: str, process_time: str)
-
Indexes the content of a folder by writing the content of the sub_df file into the index tables.
Constructor.
Args
dbaccessor
- dbaccessor helper class
file_path
- path to the directory with the sub_df file that has to be indexed
file_type
- file type; normally, this is "quarter"
process_time
- process time that is used as the timestamp in the created table entry
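A hedged construction sketch: the paths are hypothetical, the accessor must point at an existing index db, and process_time is produced the same way ReportParquetIndexerProcess produces it:

from datetime import datetime, timezone
from secfsdstools.c_index.indexdataaccess import ParquetDBIndexingAccessor
from secfsdstools.c_index.indexing_process import IndexingTask

accessor = ParquetDBIndexingAccessor(db_dir="/data/secfsdstools/db")   # hypothetical path
task = IndexingTask(dbaccessor=accessor,
                    # hypothetical quarter folder containing the sub_df parquet file
                    file_path="/data/secfsdstools/parquet/quarter/2023q1.zip",
                    file_type="quarter",
                    process_time=datetime.now(timezone.utc).astimezone().isoformat())
task.execute()   # writes the folder's submission entries into the index tables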
Class variables
var PROCESSED_STR : str
var URL_PREFIX : str
Methods
def commit(self)
-
No special commit handling.
def exception(self, exception) -> str
-
No special exception handling.
def execute(self)
-
Reads the sub_df content and writes the entries to the index.
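execute() derives the EDGAR landing-page URL of each report from its cik and adsh columns. The same string logic as a standalone sketch (CIK and accession number are illustrative values):

URL_PREFIX = 'https://www.sec.gov/Archives/edgar/data/'

cik = 320193                     # illustrative CIK
adsh = '0000320193-23-000106'    # illustrative accession number ("adsh")

url = URL_PREFIX + str(cik) + '/' + adsh.replace('-', '') + '/' + adsh + '-index.htm'
# https://www.sec.gov/Archives/edgar/data/320193/000032019323000106/0000320193-23-000106-index.htm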
def prepare(self)
-
Prepare the task. Nothing to do.
class ReportParquetIndexerProcess (db_dir: str, file_type: str, parquet_dir: str, execute_serial: bool = True)
-
Index the reports in parquet files.
Constructor.
Args
db_dir
- location of the dbfile.
file_type
- type of the data; usually, this is "quarter".
parquet_dir
- parent directory in which the transformed parquet files are.
execute_serial
- whether the tasks are executed serially; defaults to True, since indexing is rather quick.
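The process scans os.path.join(parquet_dir, file_type) for subfolders, so the expected layout looks roughly like this (folder names are hypothetical; <SUB_TXT> stands for the constant from secfsdstools.a_utils.constants):

parquet_dir/
    quarter/
        2022q4.zip/          # one folder per transformed quarterly file
            <SUB_TXT>.parquet
            ...
        2023q1.zip/
            <SUB_TXT>.parquet
            ...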
Ancestors
- AbstractThreadProcess
- AbstractProcess
- abc.ABC
Methods
def calculate_tasks(self) -> List[IndexingTask]
-
Calculates the tasks that have to be executed.
Returns
List[IndexingTask]
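Only folders that are present on disk but not yet marked 'processed' in the index db become tasks, which makes re-running the process idempotent. A toy illustration of that selection step, with hypothetical folder names:

present_files = ['2022q4.zip', '2023q1.zip', '2023q2.zip']   # folders on disk
indexed_files = ['2022q4.zip', '2023q1.zip']                 # status == 'processed' in the db

not_indexed = sorted(set(present_files) - set(indexed_files))
print(not_indexed)   # ['2023q2.zip'] -> exactly one IndexingTask is created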
Inherited members