import json
import shutil
from pathlib import Path
from typing import List, Optional, Tuple
import hyperopt
import numpy as np
import pandas as pd
from hyperopt import STATUS_FAIL, STATUS_OK, Trials, fmin, hp, tpe
from pix_framework.discovery.gateway_probabilities import (
GatewayProbabilities,
GatewayProbabilitiesDiscoveryMethod,
compute_gateway_probabilities,
)
from simod.branch_rules.discovery import discover_branch_rules, map_branch_rules_to_flows
from simod.branch_rules.types import BranchRules
from pix_framework.filesystem.file_manager import create_folder, get_random_folder_id, remove_asset
from pix_framework.io.bpm_graph import BPMNGraph
from .discovery import discover_process_model
from .settings import HyperoptIterationParams
from ..cli_formatter import print_message, print_step, print_subsection
from ..event_log.event_log import EventLog
from ..settings.control_flow_settings import ControlFlowSettings, ProcessModelDiscoveryAlgorithm
from ..simulation.parameters.BPS_model import BPSModel
from ..simulation.prosimos import simulate_and_evaluate
from ..utilities import get_process_model_path, get_simulation_parameters_path, hyperopt_step
# Control-flow optimizer: hyperparameter search over process-model discovery settings.
class ControlFlowOptimizer:
    """
    Optimizes the control-flow of a business process model using hyperparameter optimization.

    This class performs iterative optimization to refine the structure of a process model
    and discover optimal gateway probabilities. It evaluates different configurations to
    improve the process model based on a given metric.

    The search space is built based on the parameters ranges in [settings].

    Attributes
    ----------
    event_log : :class:`EventLog`
        Event log containing train and validation partitions.
    initial_bps_model : :class:`BPSModel`
        Business process simulation (BPS) model to use as a base, by replacing its control-flow model
        with the discovered one in each iteration.
    settings : :class:`ControlFlowSettings`
        Configuration settings to build the search space for the optimization process.
    base_directory : :class:`pathlib.Path`
        Root directory where output files will be stored.
    best_bps_model : :class:`BPSModel`, optional
        Best discovered BPS model after the optimization process (``None`` until :meth:`run` finishes).
    evaluation_measurements : :class:`pandas.DataFrame`
        Quality measures recorded for each hyperopt iteration.
    iteration_index : int
        Number of hyperopt iterations evaluated so far in the current run.

    Notes
    -----
    - If no process model is provided, a discovery method will be used.
    - Optimization is performed using TPE-hyperparameter optimization.
    """

    # Event log with train/validation partitions
    event_log: EventLog
    # BPS model taken as starting point
    initial_bps_model: BPSModel
    # Configuration settings
    settings: ControlFlowSettings
    # Root directory for the output files
    base_directory: Path
    # Best BPS model found by the optimization (None until run() completes)
    best_bps_model: Optional[BPSModel]
    # Quality measure of each hyperopt iteration
    evaluation_measurements: pd.DataFrame
    # Index of the hyperopt iteration currently being evaluated
    iteration_index: int
    # Flag indicating if the model is provided or it needs to be discovered
    _need_to_discover_model: bool
    # Legacy attribute: never assigned by this class, kept only for backward compatibility
    _xes_train_log_path: Optional[Path] = None
    # Training log in XES format with both start and end events (only set when discovering the model)
    _xes_train_both_timestamps_log_path: Optional[Path] = None
    # Training log in XES format with only complete events (only set when discovering the model)
    _xes_train_only_end_log_path: Optional[Path] = None
    # Set of trials for the hyperparameter optimization process
    _bayes_trials: Trials

    def __init__(self, event_log: EventLog, bps_model: BPSModel, settings: ControlFlowSettings, base_directory: Path):
        """
        Store the inputs and prepare the optimizer state.

        When the given BPS model has no process model, the training partition is exported to two
        XES files inside ``base_directory`` (one with all events, one with only complete events)
        so that a process model can be discovered in each iteration.
        """
        # Save event log, optimization settings, and output directory
        self.event_log = event_log
        # Work on a copy so the caller's BPS model is never modified
        self.initial_bps_model = bps_model.deep_copy()
        self.settings = settings
        self.base_directory = base_directory
        self.best_bps_model = None
        # Check if it is needed to discover the process model
        self._need_to_discover_model = self.initial_bps_model.process_model is None
        if self._need_to_discover_model:
            # Export training log (XES format) for the discovery algorithms
            process_name = self.event_log.process_name
            self._xes_train_both_timestamps_log_path = self.base_directory / (process_name + ".xes")
            self.event_log.train_to_xes(self._xes_train_both_timestamps_log_path)
            self._xes_train_only_end_log_path = self.base_directory / (process_name + "_only_end.xes")
            self.event_log.train_to_xes(self._xes_train_only_end_log_path, only_complete_events=True)
        # Initialize table to store quality measures of each iteration
        self.evaluation_measurements = pd.DataFrame(
            columns=[
                "distance",
                "metric",
                "status",
                "gateway_probabilities",
                "epsilon",
                "eta",
                "prioritize_parallelism",
                "replace_or_joins",
                "output_dir",
                "f_score",
            ]
        )
        # Instantiate trials for hyper-optimization process
        self._bayes_trials = Trials()
        self.iteration_index = 0

    def _build_iteration_params(self, hyperopt_dict: dict, output_dir: Path) -> HyperoptIterationParams:
        """Build the iteration parameters from a hyperopt sample, the settings, and an output directory."""
        return HyperoptIterationParams.from_hyperopt_dict(
            hyperopt_dict=hyperopt_dict,
            optimization_metric=self.settings.optimization_metric,
            mining_algorithm=self.settings.mining_algorithm,
            provided_model_path=None if self._need_to_discover_model else self.initial_bps_model.process_model,
            output_dir=output_dir,
            project_name=self.event_log.process_name,
        )

    def _hyperopt_iteration(self, hyperopt_iteration_dict: dict) -> dict:
        """
        Evaluate one point of the search space (objective function passed to ``fmin``).

        Creates a dedicated output folder, builds a candidate BPS model (discovering the process
        model if none was provided, then gateway probabilities and, optionally, branch rules),
        simulates it, records the measurements, and returns the response dict built by
        :meth:`_define_response`.
        """
        # Report new iteration
        print_subsection(f"Control-flow optimization iteration {self.iteration_index}")
        # Initialize status
        status = STATUS_OK
        # Create folder for this iteration
        output_dir = self.base_directory / get_random_folder_id(prefix="iteration_")
        create_folder(output_dir)
        # Initialize BPS model for this iteration
        current_bps_model = self.initial_bps_model.deep_copy()
        # Parameters of this iteration
        hyperopt_iteration_params = self._build_iteration_params(hyperopt_iteration_dict, output_dir)
        print_message(f"Parameters: {hyperopt_iteration_params}")
        # Discover process model (if needed)
        if self._need_to_discover_model:
            try:
                status, current_bps_model.process_model = hyperopt_step(
                    status, self._discover_process_model, hyperopt_iteration_params
                )
            except Exception as e:
                # A failed discovery must not abort the whole optimization: flag the iteration instead
                print_message(f"Process Discovery failed: {e}")
                status = STATUS_FAIL
        else:
            current_bps_model.process_model = hyperopt_iteration_params.provided_model_path
        # Discover gateway probabilities
        status, current_bps_model.gateway_probabilities = hyperopt_step(
            status,
            self._discover_gateway_probabilities,
            current_bps_model.process_model,
            hyperopt_iteration_params.gateway_probabilities_method,
        )
        # Discover branch rules
        if self.settings.discover_branch_rules:
            status, current_bps_model.branch_rules = hyperopt_step(
                status,
                self._discover_branch_rules,
                current_bps_model.process_model,
                hyperopt_iteration_params,
            )
            # Only combine the results when every previous step succeeded; after a failure the
            # probabilities/rules are presumably missing (hyperopt_step is defined elsewhere -- confirm).
            if status == STATUS_OK:
                current_bps_model.gateway_probabilities = map_branch_rules_to_flows(
                    current_bps_model.gateway_probabilities,
                    current_bps_model.branch_rules,
                )
        # Simulate candidate and evaluate its quality
        status, evaluation_measurements = hyperopt_step(
            status, self._simulate_bps_model, current_bps_model, hyperopt_iteration_params.output_dir
        )
        # Define the response of this iteration
        status, response = self._define_response(
            status, evaluation_measurements, hyperopt_iteration_params.output_dir, current_bps_model.process_model
        )
        print(f"Control-flow optimization iteration response: {response}")
        # Save the quality of this evaluation and increase iteration index
        self._process_measurements(hyperopt_iteration_params, status, evaluation_measurements)
        self.iteration_index += 1
        return response

    def run(self) -> HyperoptIterationParams:
        """
        Runs the control-flow optimization process.

        This method defines the hyperparameter search space and executes a
        TPE-hyperparameter optimization process to discover the best control-flow model.
        It evaluates multiple iterations and selects the best-performing set of parameters
        for its discovery. The best process model and its simulation parameters are copied
        into the base directory, and the evaluation measurements are written to
        ``evaluation_measures.csv`` in the same directory.

        Returns
        -------
        :class:`~simod.control_flow.settings.HyperoptIterationParams`
            The parameters of the best iteration of the optimization process.

        Raises
        ------
        AssertionError
            If the best discovered process model path does not exist after optimization.
        """
        # Define search space
        self.iteration_index = 0
        search_space = self._define_search_space(settings=self.settings)
        # Launch optimization process
        best_hyperopt_params = fmin(
            fn=self._hyperopt_iteration,
            space=search_space,
            algo=tpe.suggest,
            max_evals=self.settings.num_iterations,
            trials=self._bayes_trials,
            show_progressbar=False,
        )
        # Translate the returned sample into actual parameter values of the search space
        best_hyperopt_params = hyperopt.space_eval(search_space, best_hyperopt_params)
        # Process best results: successful iteration with the lowest loss
        results = pd.DataFrame(self._bayes_trials.results).sort_values("loss")
        best_result = results[results.status == STATUS_OK].iloc[0]
        # Explicit raise (instead of `assert`) so the check is not stripped when running with -O
        if not best_result["process_model_path"].exists():
            raise AssertionError(f"Best model path {best_result['process_model_path']} does not exist")
        # Re-build parameters of the best hyperopt iteration
        best_hyperopt_parameters = self._build_iteration_params(best_hyperopt_params, best_result["output_dir"])
        # Instantiate best BPS model
        self.best_bps_model = self.initial_bps_model.deep_copy()
        # Update best process model (save it in base directory)
        self.best_bps_model.process_model = get_process_model_path(self.base_directory, self.event_log.process_name)
        best_model_path = (
            best_result["process_model_path"] if self._need_to_discover_model else self.initial_bps_model.process_model
        )
        shutil.copyfile(best_model_path, self.best_bps_model.process_model)
        # Update simulation parameters (save them in base directory)
        best_parameters_path = get_simulation_parameters_path(self.base_directory, self.event_log.process_name)
        shutil.copyfile(
            get_simulation_parameters_path(best_result["output_dir"], self.event_log.process_name), best_parameters_path
        )
        # Read the gateway probabilities back from the copied parameters (file is closed right after)
        with open(best_parameters_path, "r") as parameters_file:
            simulation_parameters = json.load(parameters_file)
        self.best_bps_model.gateway_probabilities = [
            GatewayProbabilities.from_dict(gateway_probabilities)
            for gateway_probabilities in simulation_parameters["gateway_branching_probabilities"]
        ]
        # Save evaluation measurements
        self.evaluation_measurements.sort_values("distance", ascending=True, inplace=True)
        self.evaluation_measurements.to_csv(self.base_directory / "evaluation_measures.csv", index=False)
        # Return settings of the best iteration
        return best_hyperopt_parameters

    @staticmethod
    def _uniform_or_constant(label: str, value):
        """Return a uniform hyperopt range when ``value`` is a (low, high) tuple, else the value itself."""
        if isinstance(value, tuple):
            return hp.uniform(label, value[0], value[1])
        return value

    @staticmethod
    def _str_choice_or_constant(label: str, value):
        """Return a hyperopt choice over stringified options when ``value`` is a list, else ``str(value)``."""
        if isinstance(value, list):
            return hp.choice(label, [str(option) for option in value])
        return str(value)

    def _define_search_space(self, settings: ControlFlowSettings) -> dict:
        """
        Build the hyperopt search space from the given settings.

        Tuples are turned into uniform ranges and lists into categorical choices; any other
        value is used as a constant. Discovery parameters are only included when the process
        model has to be discovered, and depend on the configured mining algorithm.
        """
        space = {}
        if isinstance(settings.gateway_probabilities, list):
            space["gateway_probabilities_method"] = hp.choice(
                "gateway_probabilities_method", settings.gateway_probabilities
            )
        else:
            space["gateway_probabilities_method"] = settings.gateway_probabilities
        # Process model discovery parameters if we need to discover it
        if self._need_to_discover_model:
            if settings.mining_algorithm == ProcessModelDiscoveryAlgorithm.SPLIT_MINER_V1:
                space["epsilon"] = self._uniform_or_constant("epsilon", settings.epsilon)
                space["eta"] = self._uniform_or_constant("eta", settings.eta)
                space["prioritize_parallelism"] = self._str_choice_or_constant(
                    "prioritize_parallelism", settings.prioritize_parallelism
                )
                space["replace_or_joins"] = self._str_choice_or_constant(
                    "replace_or_joins", settings.replace_or_joins
                )
            elif settings.mining_algorithm == ProcessModelDiscoveryAlgorithm.SPLIT_MINER_V2:
                space["epsilon"] = self._uniform_or_constant("epsilon", settings.epsilon)
        # Branch rules parameter (skipped when f_score is unset/falsy)
        if settings.discover_branch_rules and settings.f_score:
            space["f_score"] = self._uniform_or_constant("f_score", settings.f_score)
        return space

    def cleanup(self) -> None:
        """Remove the base directory of this optimizer via ``remove_asset``."""
        remove_asset(self.base_directory)

    @staticmethod
    def _define_response(
        status: str, evaluation_measurements: list, output_dir: Path, process_model_path: Path
    ) -> Tuple[str, dict]:
        """
        Build the response dict of one hyperopt iteration.

        The loss is the mean ``distance`` of the measurements when the iteration succeeded, and
        ``1.0`` otherwise. A successful iteration whose mean distance is negative or NaN (e.g. no
        measurements) is downgraded to ``STATUS_FAIL``.

        Returns
        -------
        Tuple[str, dict]
            The (possibly updated) status and the response with keys ``loss``, ``status``,
            ``output_dir`` and ``process_model_path``.
        """
        # Compare by value: identity (`is`) between strings is not guaranteed to hold
        if status == STATUS_OK:
            distance = np.mean([measurement["distance"] for measurement in evaluation_measurements])
            # Change status if distance value is not a valid (non-negative) number
            if np.isnan(distance) or distance < 0.0:
                status = STATUS_FAIL
        else:
            distance = 1.0
        # Define response dict
        response = {
            "loss": distance,  # Loss value for the fmin function
            "status": status,  # Status of the optimization iteration
            "output_dir": output_dir,
            "process_model_path": process_model_path,
        }
        # Return updated status and processed response
        return status, response

    def _process_measurements(self, params: HyperoptIterationParams, status, evaluation_measurements):
        """
        Append the outcome of an iteration to ``self.evaluation_measurements``.

        A successful iteration adds one row per measurement; any other status adds a single
        row with distance 0 and the optimization metric of ``params``. Every row also carries
        the iteration parameters and the status.
        """
        optimization_parameters = params.to_dict()
        optimization_parameters["status"] = status
        if status == STATUS_OK:
            rows = [
                {"distance": measurement["distance"], "metric": measurement["metric"]}
                for measurement in evaluation_measurements
            ]
        else:
            rows = [{"distance": 0, "metric": params.optimization_metric}]
        for row in rows:
            # Iteration parameters take precedence over the measurement keys on collision
            values = row | optimization_parameters
            self.evaluation_measurements = pd.concat([self.evaluation_measurements, pd.DataFrame([values])])

    def _discover_process_model(self, params: HyperoptIterationParams) -> Path:
        """
        Discover a process model into the iteration's output directory and return its path.

        Split Miner v1 is fed the XES log with only complete events; any other algorithm is fed
        the XES log with both timestamps.
        """
        print_step(f"Discovering Process Model with {params.mining_algorithm.value}")
        output_model_path = get_process_model_path(params.output_dir, self.event_log.process_name)
        if params.mining_algorithm is ProcessModelDiscoveryAlgorithm.SPLIT_MINER_V1:
            xes_log_path = self._xes_train_only_end_log_path
        else:
            xes_log_path = self._xes_train_both_timestamps_log_path
        discover_process_model(xes_log_path, output_model_path, params)
        return output_model_path

    def _discover_branch_rules(self, process_model: Path, params: HyperoptIterationParams) -> List[BranchRules]:
        """Discover branch rules for the given BPMN model from the training partition."""
        print_step(f"Discovering branch rules with f_score {params.f_score}")
        bpmn_graph = BPMNGraph.from_bpmn_path(process_model)
        return discover_branch_rules(
            bpmn_graph,
            self.event_log.train_partition,
            self.event_log.log_ids,
            f_score=params.f_score,
        )

    def _discover_gateway_probabilities(
        self, process_model: Path, gateway_probabilities_method: GatewayProbabilitiesDiscoveryMethod
    ) -> List[GatewayProbabilities]:
        """Compute the gateway probabilities of the given BPMN model from the training partition."""
        print_step(f"Computing gateway probabilities with {gateway_probabilities_method}")
        bpmn_graph = BPMNGraph.from_bpmn_path(process_model)
        return compute_gateway_probabilities(
            event_log=self.event_log.train_partition,
            log_ids=self.event_log.log_ids,
            bpmn_graph=bpmn_graph,
            discovery_method=gateway_probabilities_method,
        )

    def _simulate_bps_model(self, bps_model: BPSModel, output_dir: Path) -> List[dict]:
        """
        Simulate the given BPS model and evaluate it against the validation partition.

        The model is modified in place (activity names replaced with IDs) and serialized to JSON
        in ``output_dir`` before simulating. The simulation uses as many cases as the validation
        partition has, and starts at the earliest start time found in it.
        """
        bps_model.replace_activity_names_with_ids()
        json_parameters_path = bps_model.to_json(output_dir, self.event_log.process_name)
        validation_log = self.event_log.validation_partition
        log_ids = self.event_log.log_ids
        evaluation_measures = simulate_and_evaluate(
            process_model_path=bps_model.process_model,
            parameters_path=json_parameters_path,
            output_dir=output_dir,
            simulation_cases=validation_log[log_ids.case].nunique(),
            simulation_start_time=validation_log[log_ids.start_time].min(),
            validation_log=validation_log,
            validation_log_ids=log_ids,
            metrics=[self.settings.optimization_metric],
            num_simulations=self.settings.num_evaluations_per_iteration,
        )
        return evaluation_measures