Source code for simod.event_log.event_log

from pathlib import Path
from typing import Optional

import pandas as pd
import pendulum
from openxes_cli.lib import csv_to_xes
from pix_framework.io.event_log import DEFAULT_XES_IDS, EventLogIDs, read_csv_log
from pix_framework.io.event_log import split_log_training_validation_trace_wise as split_log

from .preprocessor import Preprocessor
from ..settings.preprocessing_settings import PreprocessingSettings
from ..utilities import get_process_name_from_log_path


class EventLog:
    """
    Represents an event log containing process execution data and its partitioned subsets.

    This class provides functionality for storing and managing an event log, including training, validation,
    and test partitions. It also supports exporting logs to XES format and loading event logs from files.

    Attributes
    ----------
    train_partition : :class:`pandas.DataFrame`
        DataFrame containing the training partition of the event log.
    validation_partition : :class:`pandas.DataFrame`
        DataFrame containing the validation partition of the event log.
    train_validation_partition : :class:`pandas.DataFrame`
        DataFrame containing both training and validation data.
    test_partition : :class:`pandas.DataFrame`, optional
        DataFrame containing the test partition of the event log, or ``None`` if no test partition is available.
    log_ids : :class:`EventLogIDs`
        Identifiers for mapping column names in the event log.
    process_name : str
        The name of the business process associated with the event log, primarily used for file naming.
    """

    train_partition: pd.DataFrame
    validation_partition: pd.DataFrame
    train_validation_partition: pd.DataFrame
    test_partition: Optional[pd.DataFrame]
    log_ids: EventLogIDs
    process_name: str  # a name of the process that is used mainly for file names

    def __init__(
        self,
        log_train: pd.DataFrame,
        log_validation: pd.DataFrame,
        log_train_validation: pd.DataFrame,
        log_test: Optional[pd.DataFrame],
        log_ids: EventLogIDs,
        process_name: Optional[str] = None,
    ):
        """
        Stores the given partitions and metadata.

        Parameters
        ----------
        log_train : :class:`pandas.DataFrame`
            Training partition.
        log_validation : :class:`pandas.DataFrame`
            Validation partition.
        log_train_validation : :class:`pandas.DataFrame`
            Combined training and validation partition.
        log_test : :class:`pandas.DataFrame`, optional
            Test partition, or ``None`` when there is none.
        log_ids : :class:`EventLogIDs`
            Identifiers for mapping column names in the event log.
        process_name : str, optional
            Name of the business process. Defaults to ``"business_process"`` when not provided.
        """
        self.train_partition = log_train
        self.validation_partition = log_validation
        self.train_validation_partition = log_train_validation
        self.test_partition = log_test
        self.log_ids = log_ids
        self.process_name = process_name if process_name is not None else "business_process"

    @staticmethod
    def _check_log_extension(log_path: Path, log_kind: str) -> None:
        """
        Raises ``ValueError`` if the file name of ``log_path`` does not end with '.csv' or '.csv.gz'.

        ``log_kind`` ("training" or "test") is only used to build the error message.
        """
        if not log_path.name.endswith((".csv", ".csv.gz")):
            raise ValueError(
                f"The specified {log_kind} log has an unsupported extension ({log_path.name}). "
                f"Only 'csv' and 'csv.gz' supported."
            )

    @staticmethod
    def from_path(
        train_log_path: Path,
        log_ids: EventLogIDs,
        preprocessing_settings: Optional[PreprocessingSettings] = None,
        need_test_partition: Optional[bool] = False,
        process_name: Optional[str] = None,
        test_log_path: Optional[Path] = None,
        split_ratio: float = 0.8,
    ) -> "EventLog":
        """
        Loads an event log from a file and performs partitioning into training, validation, and test subsets.

        Parameters
        ----------
        train_log_path : :class:`pathlib.Path`
            Path to the training event log file (CSV or CSV.GZ).
        log_ids : :class:`EventLogIDs`
            Identifiers for mapping column names in the event log.
        preprocessing_settings : :class:`PreprocessingSettings`, optional
            Settings for preprocessing the event log. If not provided, a new default-constructed
            :class:`PreprocessingSettings` is used.
        need_test_partition : bool, optional
            Whether to create a test partition if a separate test log is not provided.
        process_name : str, optional
            Name of the business process. If not provided, it is inferred from the file name.
        test_log_path : :class:`pathlib.Path`, optional
            Path to the test event log file (CSV or CSV.GZ). If provided, the test log is loaded separately.
        split_ratio : float, default=0.8
            Training percentage passed to every split: the train/validation split and, when a test
            partition has to be created from the input log, the (train+validation)/test split.

        Returns
        -------
        :class:`EventLog`
            An instance of :class:`EventLog` with training, validation, and test partitions.

        Raises
        ------
        ValueError
            If the specified training or test log has an unsupported file extension.
        """
        # Check event log prerequisites
        EventLog._check_log_extension(train_log_path, "training")
        if test_log_path is not None:
            EventLog._check_log_extension(test_log_path, "test")

        # Build the default settings per call rather than in the signature, where a single
        # instance would be created at definition time and shared by every call
        if preprocessing_settings is None:
            preprocessing_settings = PreprocessingSettings()

        # Read training event log
        event_log = read_csv_log(train_log_path, log_ids)

        # Preprocess training event log
        preprocessor = Preprocessor(event_log, log_ids)
        processed_event_log = preprocessor.run(
            multitasking=preprocessing_settings.multitasking,
            enable_time_concurrency_threshold=preprocessing_settings.enable_time_concurrency_threshold,
            concurrency_thresholds=preprocessing_settings.concurrency_thresholds,
        )

        # Get test if needed, and split train+validation
        if test_log_path is not None:
            # Test log provided, the input log is train+validation.
            # The test log is used as read: the preprocessor is only run on the training log.
            train_validation_df = processed_event_log
            test_df = read_csv_log(test_log_path, log_ids)
        elif need_test_partition:
            # Test log not provided but needed, split input into test and train+validation
            train_validation_df, test_df = split_log(processed_event_log, log_ids, training_percentage=split_ratio)
        else:
            # Test log not provided and not needed, the input log is train+validation
            train_validation_df = processed_event_log
            test_df = None
        train_df, validation_df = split_log(train_validation_df, log_ids, training_percentage=split_ratio)

        # Return EventLog instance with different partitions
        return EventLog(
            log_train=train_df,
            log_validation=validation_df,
            log_train_validation=train_validation_df,
            log_test=test_df,
            log_ids=log_ids,
            process_name=get_process_name_from_log_path(train_log_path) if process_name is None else process_name,
        )

    def train_to_xes(self, path: Path, only_complete_events: bool = False):
        """
        Saves the training log to an XES file.

        Parameters
        ----------
        path : :class:`pathlib.Path`
            Destination path for the XES file.
        only_complete_events : bool
            If true, generate XES file containing only events corresponding to
            the end of each activity instance.
        """
        write_xes(self.train_partition, self.log_ids, path, only_complete_events=only_complete_events)

    def validation_to_xes(self, path: Path, only_complete_events: bool = False):
        """
        Saves the validation log to an XES file.

        Parameters
        ----------
        path : :class:`pathlib.Path`
            Destination path for the XES file.
        only_complete_events : bool
            If true, generate XES file containing only events corresponding to
            the end of each activity instance.
        """
        write_xes(self.validation_partition, self.log_ids, path, only_complete_events=only_complete_events)

    def train_validation_to_xes(self, path: Path, only_complete_events: bool = False):
        """
        Saves the combined training and validation log to an XES file.

        Parameters
        ----------
        path : :class:`pathlib.Path`
            Destination path for the XES file.
        only_complete_events : bool
            If true, generate XES file containing only events corresponding to
            the end of each activity instance.
        """
        write_xes(self.train_validation_partition, self.log_ids, path, only_complete_events=only_complete_events)

    def test_to_xes(self, path: Path, only_complete_events: bool = False):
        """
        Saves the test log to an XES file.

        Parameters
        ----------
        path : :class:`pathlib.Path`
            Destination path for the XES file.
        only_complete_events : bool
            If true, generate XES file containing only events corresponding to
            the end of each activity instance.

        Raises
        ------
        ValueError
            If this event log has no test partition.
        """
        # Fail with a clear message instead of an AttributeError raised while copying ``None``
        if self.test_partition is None:
            raise ValueError("The event log has no test partition to export.")
        write_xes(self.test_partition, self.log_ids, path, only_complete_events=only_complete_events)
def write_xes(
    event_log: pd.DataFrame,
    log_ids: EventLogIDs,
    output_path: Path,
    only_complete_events: bool = False,
):
    """
    Writes the log to a file in XES format.

    The input DataFrame is not modified: a copy is formatted, reduced to the case, activity, resource,
    start and end columns (renamed to their XES attribute names), written as CSV to ``output_path``,
    and then passed to ``csv_to_xes`` with ``output_path`` as both arguments.

    Parameters
    ----------
    event_log : :class:`pandas.DataFrame`
        Event log to export.
    log_ids : :class:`EventLogIDs`
        Identifiers for mapping column names in the event log.
    output_path : :class:`pathlib.Path`
        Destination path; first receives the intermediate CSV and is then handed to the converter.
    only_complete_events : bool
        If true, the start timestamp column is blanked out instead of formatted.
    """
    xes_datetime_format = "YYYY-MM-DDTHH:mm:ss.SSSZ"

    def _as_xes_timestamp(value):
        # Assumes each value exposes ``isoformat()`` (datetime-like) -- TODO confirm against callers
        return pendulum.parse(value.isoformat()).format(xes_datetime_format)

    # Work on a copy so the caller's DataFrame is left untouched
    xes_log = event_log.copy()

    # Start time: emptied when only complete events are requested, formatted otherwise
    if only_complete_events:
        xes_log[log_ids.start_time] = ""
    else:
        xes_log[log_ids.start_time] = xes_log[log_ids.start_time].apply(_as_xes_timestamp)

    # End time is always formatted
    xes_log[log_ids.end_time] = xes_log[log_ids.end_time].apply(_as_xes_timestamp)

    # Rename to the XES attribute names and keep only those columns, in this order
    xes_column_names = {
        log_ids.activity: "concept:name",
        log_ids.case: "case:concept:name",
        log_ids.resource: "org:resource",
        log_ids.start_time: "start_timestamp",
        log_ids.end_time: "time:timestamp",
    }
    xes_log = xes_log.rename(columns=xes_column_names)
    xes_log = xes_log[
        [
            "case:concept:name",
            "concept:name",
            "org:resource",
            "start_timestamp",
            "time:timestamp",
        ]
    ]

    # Fill null values
    xes_log.fillna("UNDEFINED", inplace=True)

    # Write the CSV, then convert it (the converter reads from and writes to the same path)
    xes_log.to_csv(output_path, index=False)
    csv_to_xes(output_path, output_path)