import json
import shutil
from pathlib import Path
from typing import List, Optional
import pandas as pd
from pix_framework.discovery.case_arrival import discover_case_arrival_model
from pix_framework.discovery.gateway_probabilities import compute_gateway_probabilities
from pix_framework.discovery.resource_calendar_and_performance.calendar_discovery_parameters import (
CalendarDiscoveryParameters,
)
from pix_framework.discovery.resource_model import discover_resource_model
from pix_framework.filesystem.file_manager import create_folder, get_random_folder_id, remove_asset
from pix_framework.io.bpm_graph import BPMNGraph
from pix_framework.io.bpmn import get_activities_names_from_bpmn
from simod.batching.discovery import discover_batching_rules
from simod.branch_rules.discovery import discover_branch_rules, map_branch_rules_to_flows
from simod.cli_formatter import print_section, print_subsection
from simod.control_flow.discovery import discover_process_model, add_bpmn_diagram_to_model
from simod.control_flow.optimizer import ControlFlowOptimizer
from simod.control_flow.settings import HyperoptIterationParams as ControlFlowHyperoptIterationParams
from simod.data_attributes.discovery import discover_data_attributes
from simod.event_log.event_log import EventLog
from simod.extraneous_delays.optimizer import ExtraneousDelaysOptimizer
from simod.extraneous_delays.types import ExtraneousDelay
from simod.extraneous_delays.utilities import add_timers_to_bpmn_model
from simod.prioritization.discovery import discover_prioritization_rules
from simod.resource_model.optimizer import ResourceModelOptimizer
from simod.resource_model.repair import repair_with_missing_activities
from simod.resource_model.settings import HyperoptIterationParams as ResourceModelHyperoptIterationParams
from simod.runtime_meter import RuntimeMeter
from simod.settings.control_flow_settings import ProcessModelDiscoveryAlgorithm
from simod.settings.simod_settings import SimodSettings
from simod.simulation.parameters.BPS_model import BPSModel
from simod.simulation.prosimos import simulate_and_evaluate
from simod.utilities import get_process_model_path, get_simulation_parameters_path
[docs]
class Simod:
"""
Class to run the full pipeline of SIMOD in order to discover a BPS model from an event log.
Attributes
----------
settings : :class:`~simod.settings.simod_settings.SimodSettings`
Configuration to run SIMOD and all its stages.
event_log : :class:`~simod.event_log.event_log.EventLog`
EventLog class storing the preprocessed training, validation, and (optionally) test partitions.
output_dir : :class:`~pathlib.Path`
Path to the folder where to write all the SIMOD outputs.
final_bps_model : :class:`~simod.simulation.parameters.BPS_model.BPSModel`
Instance of the best BPS model discovered by SIMOD.
"""
# Event log with the train, validation and test logs.
_event_log: EventLog
# Settings for all SIMOD optimization and discovery processes
_settings: SimodSettings
# Best BPS model obtained from the discovery processes
_best_bps_model: BPSModel
# Final BPS model discovered with the best hyperparams on the training+validation log
final_bps_model: Optional[BPSModel]
# Directory to write all the files
_output_dir: Path
# Optimizer for the Control-Flow and Gateway Probabilities
_control_flow_optimizer: Optional[ControlFlowOptimizer]
# Optimizer for the Resource Model
_resource_model_optimizer: Optional[ResourceModelOptimizer]
# Optimizer for the Extraneous Delay Timers
_extraneous_delays_optimizer: Optional[ExtraneousDelaysOptimizer]
def __init__(
self,
settings: SimodSettings,
event_log: EventLog,
output_dir: Optional[Path] = None,
):
self._settings = settings
self._event_log = event_log
self._best_bps_model = BPSModel(process_model=self._settings.common.process_model_path)
if output_dir is None:
self._output_dir = Path(__file__).parent.parent.parent / "outputs" / get_random_folder_id()
create_folder(self._output_dir)
else:
self._output_dir = output_dir
self._control_flow_dir = self._output_dir / "control-flow"
create_folder(self._control_flow_dir)
self._resource_model_dir = self._output_dir / "resource_model"
create_folder(self._resource_model_dir)
if self._settings.extraneous_activity_delays is not None:
self._extraneous_delays_dir = self._output_dir / "extraneous-delay-timers"
create_folder(self._extraneous_delays_dir)
self._best_result_dir = self._output_dir / "best_result"
create_folder(self._best_result_dir)
[docs]
def run(self, runtimes: Optional[RuntimeMeter] = None):
"""
Executes the SIMOD pipeline to discover the BPS model that better reflects the behavior recorded in the input
event log based on the specified configuration.
Parameters
----------
runtimes : :class:`~simod.runtime_meter.RuntimeMeter`, optional
Instance for tracking the runtime of the different stages in the SIMOD pipeline. When provided, SIMOD
pipeline stages will be tracked and reported along with stages previously tracked in the instance (e.g.,
preprocessing). If not provided, the runtime tracking reported will only contain SIMOD stages.
Returns
-------
None
The method performs in-place execution of the pipeline and does not return a value.
Notes
-----
- This method generates all output files under the folder ``[output_dir]/<latest_run>/best_result/``.
- This method updates internal attributes of the class, such as `final_bps_model`, with the best BPS model found
during the pipeline execution.
"""
# Runtime object
runtimes = RuntimeMeter() if runtimes is None else runtimes
runtimes.start(RuntimeMeter.TOTAL)
# Model activities might be different from event log activities if the model has been provided,
# because we split the event log into train, test, and validation partitions.
# We use model_activities to repair resource_model later after its discovery from a reduced event log.
model_activities: Optional[list[str]] = None
if self._settings.common.process_model_path is not None:
model_activities = get_activities_names_from_bpmn(self._settings.common.process_model_path)
# --- Discover Default Case Arrival and Resource Allocation models --- #
print_section("Discovering initial BPS Model")
runtimes.start(RuntimeMeter.INITIAL_MODEL)
self._best_bps_model.case_arrival_model = discover_case_arrival_model(
self._event_log.train_validation_partition, # No optimization process here, use train + validation
self._event_log.log_ids,
use_observed_arrival_distribution=self._settings.common.use_observed_arrival_distribution,
)
calendar_discovery_parameters = CalendarDiscoveryParameters()
self._best_bps_model.resource_model = discover_resource_model(
self._event_log.train_partition, # Only train to not discover tasks that won't exist for control-flow opt.
self._event_log.log_ids,
calendar_discovery_parameters,
)
self._best_bps_model.calendar_granularity = calendar_discovery_parameters.granularity
if model_activities is not None:
repair_with_missing_activities(
resource_model=self._best_bps_model.resource_model,
model_activities=model_activities,
event_log=self._event_log.train_validation_partition,
log_ids=self._event_log.log_ids,
)
runtimes.stop(RuntimeMeter.INITIAL_MODEL)
# --- Control-Flow Optimization --- #
print_section("Optimizing control-flow parameters")
runtimes.start(RuntimeMeter.CONTROL_FLOW_MODEL)
best_control_flow_params = self._optimize_control_flow()
self._best_bps_model.process_model = self._control_flow_optimizer.best_bps_model.process_model
self._best_bps_model.gateway_probabilities = self._control_flow_optimizer.best_bps_model.gateway_probabilities
self._best_bps_model.branch_rules = self._control_flow_optimizer.best_bps_model.branch_rules
runtimes.stop(RuntimeMeter.CONTROL_FLOW_MODEL)
# --- Data Attributes --- #
if (self._settings.common.discover_data_attributes or
self._settings.resource_model.discover_prioritization_rules):
print_section("Discovering data attributes")
runtimes.start(RuntimeMeter.DATA_ATTRIBUTES_MODEL)
global_attributes, case_attributes, event_attributes = discover_data_attributes(
self._event_log.train_validation_partition,
self._event_log.log_ids,
)
self._best_bps_model.global_attributes = global_attributes
self._best_bps_model.case_attributes = case_attributes
self._best_bps_model.event_attributes = event_attributes
runtimes.stop(RuntimeMeter.DATA_ATTRIBUTES_MODEL)
# --- Resource Model Discovery --- #
print_section("Optimizing resource model parameters")
runtimes.start(RuntimeMeter.RESOURCE_MODEL)
best_resource_model_params = self._optimize_resource_model(model_activities)
self._best_bps_model.resource_model = self._resource_model_optimizer.best_bps_model.resource_model
self._best_bps_model.calendar_granularity = self._resource_model_optimizer.best_bps_model.calendar_granularity
self._best_bps_model.prioritization_rules = self._resource_model_optimizer.best_bps_model.prioritization_rules
self._best_bps_model.batching_rules = self._resource_model_optimizer.best_bps_model.batching_rules
runtimes.stop(RuntimeMeter.RESOURCE_MODEL)
# --- Extraneous Delays Discovery --- #
if self._settings.extraneous_activity_delays is not None:
print_section("Discovering extraneous delays")
runtimes.start(RuntimeMeter.EXTRANEOUS_DELAYS)
timers = self._optimize_extraneous_activity_delays()
self._best_bps_model.extraneous_delays = timers
add_timers_to_bpmn_model(self._best_bps_model.process_model, timers) # Update BPMN model on disk
runtimes.stop(RuntimeMeter.EXTRANEOUS_DELAYS)
# --- Discover final BPS model --- #
print_section("Discovering final BPS model")
runtimes.start(RuntimeMeter.FINAL_MODEL)
self.final_bps_model = BPSModel( # Bypass all models already discovered with train+validation
process_model=get_process_model_path(self._best_result_dir, self._event_log.process_name),
case_arrival_model=self._best_bps_model.case_arrival_model,
case_attributes=self._best_bps_model.case_attributes,
global_attributes=self._best_bps_model.global_attributes,
event_attributes=self._best_bps_model.event_attributes,
)
# Process model
if self._settings.common.process_model_path is None:
# Discover process model with best control-flow parameters
print_subsection(
f"Discovering process model with best control-flow settings: {best_control_flow_params.to_dict()}"
)
# Instantiate event log to discover the process model with
xes_log_path = self._best_result_dir / f"{self._event_log.process_name}_train_val.xes"
if best_control_flow_params.mining_algorithm is ProcessModelDiscoveryAlgorithm.SPLIT_MINER_V1:
self._event_log.train_validation_to_xes(xes_log_path, only_complete_events=True)
else:
self._event_log.train_validation_to_xes(xes_log_path)
# Discover the process model
discover_process_model(
log_path=xes_log_path,
output_model_path=self.final_bps_model.process_model,
params=best_control_flow_params,
)
else:
# Copy provided process model to best result folder
print_subsection("Using provided process model")
shutil.copy(self._settings.common.process_model_path, self.final_bps_model.process_model)
# Gateway probabilities
print_subsection("Discovering gateway probabilities")
best_bpmn_graph = BPMNGraph.from_bpmn_path(self.final_bps_model.process_model)
self.final_bps_model.gateway_probabilities = compute_gateway_probabilities(
event_log=self._event_log.train_validation_partition,
log_ids=self._event_log.log_ids,
bpmn_graph=best_bpmn_graph,
discovery_method=best_control_flow_params.gateway_probabilities_method,
)
# Branch Rules
if self._settings.control_flow.discover_branch_rules:
print_section("Discovering branch conditions")
self.final_bps_model.branch_rules = discover_branch_rules(
best_bpmn_graph,
self._event_log.train_validation_partition,
self._event_log.log_ids,
f_score=best_control_flow_params.f_score
)
self.final_bps_model.gateway_probabilities = \
map_branch_rules_to_flows(self.final_bps_model.gateway_probabilities, self.final_bps_model.branch_rules)
# Resource model
print_subsection("Discovering best resource model")
self.final_bps_model.resource_model = discover_resource_model(
event_log=self._event_log.train_validation_partition,
log_ids=self._event_log.log_ids,
params=best_resource_model_params.calendar_discovery_params,
)
self.final_bps_model.calendar_granularity = best_resource_model_params.calendar_discovery_params.granularity
if model_activities is not None:
repair_with_missing_activities(
resource_model=self.final_bps_model.resource_model,
model_activities=model_activities,
event_log=self._event_log.train_validation_partition,
log_ids=self._event_log.log_ids,
)
# Prioritization
if best_resource_model_params.discover_prioritization_rules:
print_subsection("Discovering prioritization rules")
self.final_bps_model.prioritization_rules = discover_prioritization_rules(
self._event_log.train_validation_partition,
self._event_log.log_ids,
self._best_bps_model.case_attributes,
)
# Batching
if best_resource_model_params.discover_batching_rules:
print_subsection("Discovering batching rules")
self.final_bps_model.batching_rules = discover_batching_rules(
self._event_log.train_validation_partition, self._event_log.log_ids
)
# Extraneous delays
if self._best_bps_model.extraneous_delays is not None:
# Add discovered delays and update BPMN model on disk
self.final_bps_model.extraneous_delays = self._best_bps_model.extraneous_delays
add_timers_to_bpmn_model(self.final_bps_model.process_model, self._best_bps_model.extraneous_delays)
self.final_bps_model.replace_activity_names_with_ids()
runtimes.stop(RuntimeMeter.FINAL_MODEL)
runtimes.stop(RuntimeMeter.TOTAL)
# Write JSON parameters to file
json_parameters_path = get_simulation_parameters_path(self._best_result_dir, self._event_log.process_name)
with json_parameters_path.open("w") as f:
json.dump(self.final_bps_model.to_prosimos_format(), f)
# --- Evaluate final BPS model --- #
if self._settings.common.perform_final_evaluation:
print_subsection("Evaluate")
runtimes.start(RuntimeMeter.EVALUATION)
simulation_dir = self._best_result_dir / "evaluation"
simulation_dir.mkdir(parents=True, exist_ok=True)
self._evaluate_model(self.final_bps_model.process_model, json_parameters_path, simulation_dir)
runtimes.stop(RuntimeMeter.EVALUATION)
# --- Export settings and clean temporal files --- #
print_section(f"Exporting canonical model, runtimes, settings and cleaning up intermediate files")
canonical_model_path = self._best_result_dir / "canonical_model.json"
_export_canonical_model(canonical_model_path, best_control_flow_params, best_resource_model_params)
runtimes_model_path = self._best_result_dir / "runtimes.json"
_export_runtimes(runtimes_model_path, runtimes)
if self._settings.common.clean_intermediate_files:
self._clean_up()
self._settings.to_yaml(self._best_result_dir)
# --- Add BPMN diagram to the model --- #
add_bpmn_diagram_to_model(self.final_bps_model.process_model)
def _optimize_control_flow(self) -> ControlFlowHyperoptIterationParams:
"""
Control-flow and Gateway Probabilities discovery.
"""
self._control_flow_optimizer = ControlFlowOptimizer(
event_log=self._event_log,
bps_model=self._best_bps_model,
settings=self._settings.control_flow,
base_directory=self._control_flow_dir,
)
best_control_flow_params = self._control_flow_optimizer.run()
return best_control_flow_params
def _optimize_resource_model(
self, model_activities: Optional[list[str]] = None
) -> ResourceModelHyperoptIterationParams:
"""
Resource Model (resource profiles, calendars an activity performances) discovery.
"""
self._resource_model_optimizer = ResourceModelOptimizer(
event_log=self._event_log,
bps_model=self._best_bps_model,
settings=self._settings.resource_model,
base_directory=self._resource_model_dir,
model_activities=model_activities,
)
best_resource_model_params = self._resource_model_optimizer.run()
return best_resource_model_params
def _optimize_extraneous_activity_delays(self) -> List[ExtraneousDelay]:
settings = self._settings.extraneous_activity_delays
self._extraneous_delays_optimizer = ExtraneousDelaysOptimizer(
event_log=self._event_log,
bps_model=self._best_bps_model,
settings=settings,
base_directory=self._extraneous_delays_dir,
)
timers = self._extraneous_delays_optimizer.run()
return timers
def _evaluate_model(self, process_model: Path, json_parameters: Path, output_dir: Path):
simulation_cases = self._event_log.test_partition[self._settings.common.log_ids.case].nunique()
simulation_start_time = self._event_log.test_partition[self._settings.common.log_ids.start_time].min()
metrics = (
self._settings.common.evaluation_metrics
if isinstance(self._settings.common.evaluation_metrics, list)
else [self._settings.common.evaluation_metrics]
)
self._event_log.test_partition.to_csv(output_dir / "test_log.csv", index=False)
measurements = simulate_and_evaluate(
process_model_path=process_model,
parameters_path=json_parameters,
output_dir=output_dir,
simulation_cases=simulation_cases,
simulation_start_time=simulation_start_time,
validation_log=self._event_log.test_partition,
validation_log_ids=self._event_log.log_ids,
num_simulations=self._settings.common.num_final_evaluations,
metrics=metrics,
)
measurements_path = output_dir / "evaluation_metrics.csv"
measurements_df = pd.DataFrame.from_records(measurements)
measurements_df.to_csv(measurements_path, index=False)
def _clean_up(self):
print_section("Removing intermediate files")
self._control_flow_optimizer.cleanup()
self._resource_model_optimizer.cleanup()
if self._settings.extraneous_activity_delays is not None:
self._extraneous_delays_optimizer.cleanup()
if self._settings.common.process_model_path is None:
final_xes_log_path = self._best_result_dir / f"{self._event_log.process_name}_train_val.xes"
remove_asset(final_xes_log_path)
def _export_canonical_model(
file_path: Path,
control_flow_settings: ControlFlowHyperoptIterationParams,
calendar_settings: ResourceModelHyperoptIterationParams,
):
canon = {
"control_flow": control_flow_settings.to_dict(),
"calendars": calendar_settings.to_dict(),
}
with open(file_path, "w") as f:
json.dump(canon, f)
def _export_runtimes(
file_path: Path,
runtimes: RuntimeMeter
):
with open(file_path, "w") as file:
json.dump(
runtimes.runtimes | {'explanation': f"Add '{RuntimeMeter.PREPROCESSING}' with '{RuntimeMeter.TOTAL}' "
f"for the runtime of the entire SIMOD pipeline and preprocessing "
f"stage. '{RuntimeMeter.EVALUATION}', if reported, should be left out "
f"as it measures the quality assessment of the final BPS model (i.e., "
f"it is not part of the discovery process."},
file
)