# Source code for paradance.pipeline.base

import logging
from abc import ABCMeta, abstractmethod
from itertools import zip_longest
from typing import Dict, Optional, Union

import pandas as pd

from ..dataloader import CSVLoader, ExcelLoader, load_config
from ..evaluation import Calculator, LogarithmPCACalculator
from ..optimization import MultipleObjective, optimize_run

# Root-logger settings applied when this module is imported: INFO level,
# "time - level - message" records, and a clock-only timestamp.
_LOGGING_OPTIONS = {
    "level": logging.INFO,
    "format": "%(asctime)s - %(levelname)s - %(message)s",
    "datefmt": "%H:%M:%S",
}
logging.basicConfig(**_LOGGING_OPTIONS)

# Module-scoped logger used by the pipeline classes defined in this file.
logger = logging.getLogger(__name__)


class BasePipeline(metaclass=ABCMeta):
    """Abstract base class for implementing processing pipelines.

    This class provides a structured way to define a sequence of operations
    for data processing and optimization tasks. It is designed to be
    subclassed: concrete pipelines must implement ``_load_calculator`` and
    ``show_results``.

    Attributes:
        dataframe (Optional[pd.DataFrame]): The dataset to work on. Either
            supplied by the caller or loaded by ``_load_dataset``.
        config (Dict): Configuration settings loaded from a configuration
            file.
        n_trials (int): The number of optimization trials to perform.
    """

    def __init__(
        self,
        dataframe: Optional[pd.DataFrame] = None,
        config_path: Optional[str] = None,
        n_trials: int = 200,
    ) -> None:
        """Store the optional dataset and load the configuration.

        Args:
            dataframe: Pre-loaded dataset. When ``None`` the dataset is
                loaded later by ``_load_dataset``.
            config_path: Path handed to ``load_config``.
            n_trials: Number of optimization trials passed to
                ``optimize_run``.
        """
        self.dataframe = dataframe
        self.config: Dict = load_config(config_path)
        self.n_trials = n_trials

    def _load_dataset(self) -> None:
        """Load the dataset based on the file type given in the configuration.

        Does nothing when a dataframe was already supplied. Otherwise the
        ``DataLoader`` section of the configuration selects the loader:
        ``"csv"`` (the default) or ``"xlsx"``.

        Raises:
            ValueError: If ``file_type`` is neither ``"csv"`` nor ``"xlsx"``.
        """
        if self.dataframe is not None:
            return

        loader_config = self.config["DataLoader"]
        self.file_type = loader_config.get("file_type", "csv")
        # The loader receives the calculator's selected columns under the
        # "clean_zero_columns" key of its own configuration section.
        loader_config["clean_zero_columns"] = self.config["Calculator"].get(
            "selected_columns", None
        )

        if self.file_type == "csv":
            self.dataframe = CSVLoader(config=loader_config).df
        elif self.file_type == "xlsx":
            self.dataframe = ExcelLoader(config=loader_config).df
        else:
            # Fail here with a clear message instead of leaving
            # ``self.dataframe`` as None for later steps to trip over.
            raise ValueError(
                f"Unsupported file_type {self.file_type!r}; "
                "expected 'csv' or 'xlsx'."
            )

    @abstractmethod
    def _load_calculator(self) -> Union[Calculator, LogarithmPCACalculator]:
        """Load or define the calculator for the pipeline operations.

        This method should be implemented to load or define the calculator,
        which might be a model or any computational tool needed for the
        pipeline.

        Returns:
            The calculator that ``_add_objective`` wraps in the objective.
        """
        pass

    def _add_objective(
        self, calculator: Union[Calculator, LogarithmPCACalculator]
    ) -> None:
        """Create the optimization objective for the given calculator.

        Args:
            calculator: Calculator passed to ``MultipleObjective`` together
                with the ``Objective`` section of the configuration.
        """
        self.objective = MultipleObjective(
            calculator=calculator,
            config=self.config["Objective"],
        )

    def _add_evaluators(self) -> None:
        """Add evaluators to the objective based on configuration settings.

        Each list in the ``Evaluator`` section contributes one argument per
        evaluator. The lists are walked in lockstep and shorter ones are
        padded with ``None``. A missing or null list is treated as empty.
        """
        settings = self.config["Evaluator"]

        def _values(key: str) -> list:
            # ``or []`` covers both an absent key and an explicit null value,
            # so ``zip_longest`` always receives an iterable.
            return settings.get(key) or []

        for (
            flag,
            target_column,
            mask_column,
            hyperparameter,
            evaluator_property,
            groupby,
        ) in zip_longest(
            _values("flags"),
            _values("target_columns"),
            _values("mask_columns"),
            _values("hyperparameters"),
            _values("evaluator_propertys"),
            _values("groupbys"),
        ):
            self.objective.add_evaluator(
                flag=flag,
                target_column=target_column,
                mask_column=mask_column,
                hyperparameter=hyperparameter,
                evaluator_property=evaluator_property,
                groupby=groupby,
            )

    def _optimize(self) -> None:
        """Run the optimization for the defined objective and evaluators."""
        optimize_run(
            multiple_objective=self.objective,
            n_trials=self.n_trials,
        )

    @abstractmethod
    def show_results(self) -> None:
        """Display the results of the optimization process."""
        pass

    def _pre_run(self) -> None:
        """Execute the preliminary setup tasks for the pipeline.

        Loads the dataset, obtains the calculator, builds the objective
        around it and registers the configured evaluators, in that order.
        """
        logger.info("Running pipeline...")
        self._load_dataset()
        calculator = self._load_calculator()
        self._add_objective(calculator)
        self._add_evaluators()

    def run(self) -> None:
        """Run the main execution flow of the pipeline.

        Performs all pre-run setup tasks, runs the optimization and
        concludes by displaying the results.
        """
        self._pre_run()
        self._optimize()
        self.show_results()