import copy
import json
import shutil
from pathlib import Path
from typing import List, Optional, Tuple
import hyperopt
import numpy as np
import pandas as pd
from hyperopt import STATUS_FAIL, STATUS_OK, Trials, fmin, hp, tpe
from pix_framework.discovery.resource_calendar_and_performance.calendar_discovery_parameters import (
CalendarDiscoveryParameters,
)
from pix_framework.discovery.resource_model import ResourceModel, discover_resource_model
from pix_framework.discovery.resource_profiles import discover_pool_resource_profiles
from pix_framework.filesystem.file_manager import create_folder, get_random_folder_id, remove_asset
from .repair import repair_with_missing_activities
from .settings import HyperoptIterationParams
from ..batching.discovery import discover_batching_rules
from ..cli_formatter import print_message, print_step, print_subsection
from ..event_log.event_log import EventLog
from ..prioritization.discovery import discover_prioritization_rules
from ..settings.resource_model_settings import CalendarType, ResourceModelSettings
from ..simulation.parameters.BPS_model import BPSModel
from ..simulation.prosimos import simulate_and_evaluate
from ..utilities import get_process_model_path, get_simulation_parameters_path, hyperopt_step
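# (Stray "[docs]" link marker from the rendered documentation removed; it is not part of the source code.)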
class ResourceModelOptimizer:
    """
    Optimizes the resource model of a business process model using hyperparameter optimization.

    This class performs iterative optimization to refine the resource model
    and discover optimal resource profiles and availability calendars. It
    evaluates different configurations to improve the process model based
    on a given metric.

    The search space is built based on the parameter ranges in ``settings``.

    Attributes
    ----------
    event_log : :class:`~simod.event_log.event_log.EventLog`
        Event log containing train and validation partitions.
    initial_bps_model : :class:`~simod.simulation.parameters.BPS_model.BPSModel`
        Business process simulation (BPS) model to use as a base, by replacing its resource model
        with the discovered one in each iteration.
    settings : :class:`~simod.settings.resource_model_settings.ResourceModelSettings`
        Configuration settings to build the search space for the optimization process.
    base_directory : :class:`pathlib.Path`
        Root directory where output files will be stored.
    model_activities : list of str, optional
        If provided, each discovered resource model is passed through
        ``repair_with_missing_activities`` together with these activities.
    best_bps_model : :class:`~simod.simulation.parameters.BPS_model.BPSModel`, optional
        Best discovered BPS model after the optimization process (``None`` until :meth:`run` finishes).
    evaluation_measurements : :class:`pandas.DataFrame`
        Quality measures recorded for each hyperopt iteration.
    iteration_index : int
        Number of hyperopt iterations executed so far (reset at the start of :meth:`run`).

    Notes
    -----
    - Optimization is performed using TPE-hyperparameter optimization.
    """

    # Event log with train/validation partitions
    event_log: EventLog
    # BPS model taken as starting point
    initial_bps_model: BPSModel
    # Configuration settings
    settings: ResourceModelSettings
    # Root directory for the output files
    base_directory: Path
    # Activities used to repair each discovered resource model (None to skip the repair)
    model_activities: Optional[list[str]]
    # Best BPS model found by the optimization process (None until run() finishes)
    best_bps_model: Optional[BPSModel]
    # Quality measure of each hyperopt iteration
    evaluation_measurements: pd.DataFrame
    # Counter of executed hyperopt iterations
    iteration_index: int
    # Set of trials for the hyperparameter optimization process
    _bayes_trials: Trials

    def __init__(
        self,
        event_log: EventLog,
        bps_model: BPSModel,
        settings: ResourceModelSettings,
        base_directory: Path,
        model_activities: Optional[list[str]] = None,
    ):
        """
        Store the inputs and pre-compute everything that is shared by all iterations.

        Parameters
        ----------
        event_log : EventLog
            Event log with the partitions used for discovery and evaluation.
        bps_model : BPSModel
            Base BPS model; a deep copy is stored, so the instance passed by the caller is not kept.
        settings : ResourceModelSettings
            Settings from which the search space and the optional discoveries are configured.
        base_directory : Path
            Directory under which one folder per iteration and the final results are written.
        model_activities : list of str, optional
            Activities forwarded to ``repair_with_missing_activities`` in each iteration.
        """
        # Save event log, optimization settings, and output directory
        self.event_log = event_log
        self.initial_bps_model = bps_model.deep_copy()
        self.settings = settings
        self.base_directory = base_directory
        self.model_activities = model_activities
        # No best model is available until run() completes
        self.best_bps_model = None
        # Initialize table to store quality measures of each iteration
        self.evaluation_measurements = pd.DataFrame(
            columns=[
                "distance",
                "metric",
                "status",
                "discovery_type",
                "granularity",
                "confidence",
                "support",
                "participation",
                "output_dir",
            ]
        )
        # Instantiate trials for hyper-optimization process
        self._bayes_trials = Trials()
        self.iteration_index = 0
        # Discover resource pools once (performance purposes) if the discovery type needs them
        if self.settings.discovery_type is CalendarType.DIFFERENTIATED_BY_POOL:
            self._resource_pools = discover_pool_resource_profiles(
                self.event_log.train_partition, self.event_log.log_ids
            )
        else:
            self._resource_pools = None
        # Prioritization: discovered once here and reused by every iteration that enables it
        if self.settings.discover_prioritization_rules and len(self.initial_bps_model.case_attributes) > 0:
            print_subsection("Discovering prioritization rules")
            self._prioritization_rules = discover_prioritization_rules(
                self.event_log.train_partition,
                self.event_log.log_ids,
                self.initial_bps_model.case_attributes,
            )
        elif self.settings.discover_prioritization_rules:
            # Requested, but there are no case attributes to build rules from
            print_subsection("0 case attributes discovered, turning off prioritization discovery.")
            self._prioritization_rules = []
        else:
            self._prioritization_rules = None
        # Batching: discovered once here and reused by every iteration that enables it
        if self.settings.discover_batching_rules:
            print_subsection("Discovering batching rules")
            self._batching_rules = discover_batching_rules(self.event_log.train_partition, self.event_log.log_ids)
        else:
            self._batching_rules = None

    def _hyperopt_iteration(self, hyperopt_iteration_dict: dict) -> dict:
        """
        Objective function evaluated by hyperopt for one point of the search space.

        Discovers a resource model with the sampled parameters, plugs it into a copy of the
        initial BPS model, simulates it, and records the resulting quality measures.

        Parameters
        ----------
        hyperopt_iteration_dict : dict
            Parameter values sampled by hyperopt for this iteration.

        Returns
        -------
        dict
            Response built by :meth:`_define_response` (``loss``, ``status``, ``output_dir``
            and ``process_model_path``).
        """
        # Report new iteration
        print_subsection(f"Resource Model optimization iteration {self.iteration_index}")
        # Initialize status
        status = STATUS_OK
        # Create folder for this iteration
        output_dir = self.base_directory / get_random_folder_id(prefix="iteration_")
        create_folder(output_dir)
        # Initialize BPS model for this iteration
        current_bps_model = self.initial_bps_model.deep_copy()
        # Parameters of this iteration
        hyperopt_iteration_params = HyperoptIterationParams.from_hyperopt_dict(
            hyperopt_dict=hyperopt_iteration_dict,
            optimization_metric=self.settings.optimization_metric,
            discovery_type=self.settings.discovery_type,
            output_dir=output_dir,
            process_model_path=current_bps_model.process_model,
            project_name=self.event_log.process_name,
        )
        print_message(f"Parameters: {hyperopt_iteration_params}")
        # Discover resource model
        status, current_bps_model.resource_model = hyperopt_step(
            status, self._discover_resource_model, hyperopt_iteration_params.calendar_discovery_params
        )
        current_bps_model.calendar_granularity = hyperopt_iteration_params.calendar_discovery_params.granularity
        # Repair the discovered model only if the discovery step succeeded.
        # NOTE(review): hyperopt_step is assumed to report a non-OK status (and no usable result)
        # when the wrapped call fails -- confirm. In that case there is no model to repair, and
        # repairing anyway could raise from inside the hyperopt objective.
        if status == STATUS_OK and self.model_activities is not None:
            repair_with_missing_activities(
                resource_model=current_bps_model.resource_model,
                model_activities=self.model_activities,
                event_log=self.event_log.train_validation_partition,
                log_ids=self.event_log.log_ids,
            )
        # Add prioritization if needed
        if hyperopt_iteration_params.discover_prioritization_rules:
            current_bps_model.prioritization_rules = self._prioritization_rules
        # Add batching rules if needed
        if hyperopt_iteration_params.discover_batching_rules:
            current_bps_model.batching_rules = self._batching_rules
        # Simulate candidate and evaluate its quality
        status, evaluation_measurements = hyperopt_step(
            status,
            self._simulate_bps_model,
            current_bps_model,
            hyperopt_iteration_params.output_dir,
        )
        # Define the response of this iteration
        status, response = self._define_response(
            status, evaluation_measurements, hyperopt_iteration_params.output_dir, current_bps_model.process_model
        )
        print(f"Resource Model optimization iteration response: {response}")
        # Save the quality of this evaluation and increase iteration index
        self._process_measurements(hyperopt_iteration_params, status, evaluation_measurements)
        self.iteration_index += 1
        return response

    def run(self) -> HyperoptIterationParams:
        """
        Runs the resource model optimization process.

        This method defines the hyperparameter search space and executes a
        TPE-hyperparameter optimization process to discover the best resource model.
        It evaluates multiple iterations and selects the best-performing set of parameters
        for its discovery.

        The process model and the simulation parameters of the best iteration are copied
        into ``base_directory``, ``best_bps_model`` is updated accordingly, and the
        evaluation measurements are written to ``evaluation_measures.csv``.

        Returns
        -------
        :class:`~simod.resource_model.settings.HyperoptIterationParams`
            The parameters of the best iteration of the optimization process.
        """
        # Define search space
        self.iteration_index = 0
        search_space = self._define_search_space(settings=self.settings)
        # Launch optimization process
        params_best_iteration = fmin(
            fn=self._hyperopt_iteration,
            space=search_space,
            algo=tpe.suggest,
            max_evals=self.settings.num_iterations,
            trials=self._bayes_trials,
            show_progressbar=False,
        )
        # Translate the raw result of fmin into actual values of the search space
        params_best_iteration = hyperopt.space_eval(search_space, params_best_iteration)
        # Process best results: successful iteration with the lowest loss
        results = pd.DataFrame(self._bayes_trials.results).sort_values("loss")
        best_result = results[results.status == STATUS_OK].iloc[0]
        # Re-build parameters of the best hyperopt iteration
        best_hyperopt_parameters = HyperoptIterationParams.from_hyperopt_dict(
            hyperopt_dict=params_best_iteration,
            optimization_metric=self.settings.optimization_metric,
            discovery_type=self.settings.discovery_type,
            output_dir=best_result["output_dir"],
            project_name=self.event_log.process_name,
            process_model_path=self.initial_bps_model.process_model,
        )
        # Instantiate best BPS model
        self.best_bps_model = self.initial_bps_model.deep_copy()
        # Update best process model (save it in base directory)
        self.best_bps_model.process_model = get_process_model_path(self.base_directory, self.event_log.process_name)
        shutil.copyfile(best_result["process_model_path"], self.best_bps_model.process_model)
        # Update simulation parameters (save them in base directory)
        best_parameters_path = get_simulation_parameters_path(self.base_directory, self.event_log.process_name)
        shutil.copyfile(
            get_simulation_parameters_path(best_result["output_dir"], self.event_log.process_name), best_parameters_path
        )
        # Update resource model (context manager so the parameters file is always closed)
        with open(best_parameters_path, "r") as parameters_file:
            self.best_bps_model.resource_model = ResourceModel.from_dict(json.load(parameters_file))
        self.best_bps_model.calendar_granularity = best_hyperopt_parameters.calendar_discovery_params.granularity
        # Save evaluation measurements
        self.evaluation_measurements.sort_values("distance", ascending=True, inplace=True)
        self.evaluation_measurements.to_csv(self.base_directory / "evaluation_measures.csv", index=False)
        # Return settings of the best iteration
        return best_hyperopt_parameters

    def _discover_resource_model(self, params: CalendarDiscoveryParameters) -> ResourceModel:
        """
        Discover a resource model from the train partition with the given calendar parameters.

        Parameters
        ----------
        params : CalendarDiscoveryParameters
            Calendar discovery parameters of the current iteration.

        Returns
        -------
        ResourceModel
            The result of ``discover_resource_model``.
        """
        print_step(f"Discovering resource model with {params}")
        return discover_resource_model(
            event_log=self.event_log.train_partition,
            log_ids=self.event_log.log_ids,
            params=params,
            # Hand over a copy so the pre-discovered pools stored in this instance stay untouched
            provided_profiles=copy.deepcopy(self._resource_pools),
        )

    def cleanup(self):
        """Remove the base directory of this optimizer (via ``remove_asset``)."""
        print_step(f"Removing {self.base_directory}")
        remove_asset(self.base_directory)

    @staticmethod
    def _range_or_constant(label: str, value):
        """Return an ``hp.uniform`` node over ``value`` if it is a tuple, otherwise ``value`` unchanged."""
        if isinstance(value, tuple):
            return hp.uniform(label, value[0], value[1])
        return value

    def _define_search_space(self, settings: ResourceModelSettings) -> dict:
        """
        Build the hyperopt search space from the given settings.

        A setting given as a tuple becomes a uniform range between its two elements;
        any other value is used as a fixed constant.

        Parameters
        ----------
        settings : ResourceModelSettings
            Settings holding the discovery type and the value (or range) of each parameter.

        Returns
        -------
        dict
            Search space keyed by parameter name; empty if the discovery type is not handled.
        """
        space = {}
        # If discovery type requires discovery, create search space for parameters
        if settings.discovery_type in [
            CalendarType.UNDIFFERENTIATED,
            CalendarType.DIFFERENTIATED_BY_RESOURCE,
            CalendarType.DIFFERENTIATED_BY_POOL,
        ]:
            space["granularity"] = self._range_or_constant("granularity", settings.granularity)
            space["confidence"] = self._range_or_constant("confidence", settings.confidence)
            space["support"] = self._range_or_constant("support", settings.support)
            space["participation"] = self._range_or_constant("participation", settings.participation)
            # Only let hyperopt toggle prioritization/batching when rules were actually discovered
            if settings.discover_prioritization_rules and len(self._prioritization_rules) > 0:
                space["discover_prioritization_rules"] = hp.choice("discover_prioritization_rules", [True, False])
            else:
                space["discover_prioritization_rules"] = False
            if settings.discover_batching_rules and len(self._batching_rules) > 0:
                space["discover_batching_rules"] = hp.choice("discover_batching_rules", [True, False])
            else:
                space["discover_batching_rules"] = False
        elif settings.discovery_type == CalendarType.DIFFERENTIATED_BY_RESOURCE_FUZZY:
            space["granularity"] = self._range_or_constant("granularity", settings.granularity)
            space["fuzzy_angle"] = self._range_or_constant("fuzzy_angle", settings.fuzzy_angle)
        return space

    def _process_measurements(self, params: HyperoptIterationParams, status: str, evaluation_measurements: list):
        """
        Append the outcome of one iteration to ``evaluation_measurements``.

        For a successful iteration one row is added per measurement; otherwise a single
        row with distance 0 is added.

        Parameters
        ----------
        params : HyperoptIterationParams
            Parameters of the iteration being recorded.
        status : str
            Final status of the iteration.
        evaluation_measurements : list
            Measurements of the iteration (only read when ``status`` is ``STATUS_OK``);
            each one is indexed by ``"distance"`` and ``"metric"``.
        """
        data = {
            "output_dir": params.output_dir,
            "metric": params.optimization_metric,
            "discovery_type": params.calendar_discovery_params.discovery_type,
            "granularity": params.calendar_discovery_params.granularity,
            "confidence": params.calendar_discovery_params.confidence,
            "support": params.calendar_discovery_params.support,
            "participation": params.calendar_discovery_params.participation,
            "discover_prioritization_rules": params.discover_prioritization_rules,
            "discover_batching_rules": params.discover_batching_rules,
            "status": status,
        }
        # `data` is the right-hand operand of the union, so its "metric" (the optimization
        # metric) takes precedence over the one reported in each measurement.
        if status == STATUS_OK:
            rows = [
                {"distance": measurement["distance"], "metric": measurement["metric"]} | data
                for measurement in evaluation_measurements
            ]
        else:
            rows = [{"distance": 0, "metric": params.optimization_metric} | data]
        for row in rows:
            self.evaluation_measurements = pd.concat([self.evaluation_measurements, pd.DataFrame([row])])

    @staticmethod
    def _define_response(
        status: str, evaluation_measurements: list, output_dir: Path, process_model_path: Path
    ) -> Tuple[str, dict]:
        """
        Build the dictionary returned to hyperopt for one iteration.

        Parameters
        ----------
        status : str
            Status of the iteration so far.
        evaluation_measurements : list
            Measurements of the iteration (only read when ``status`` is ``STATUS_OK``).
        output_dir : Path
            Output directory of the iteration, stored in the response.
        process_model_path : Path
            Path to the process model of the iteration, stored in the response.

        Returns
        -------
        Tuple[str, dict]
            The (possibly updated) status and the response with keys ``loss``, ``status``,
            ``output_dir`` and ``process_model_path``.
        """
        # Compute mean distance if status is OK. Statuses are plain strings, so they are
        # compared by value (as everywhere else in this class), not by identity.
        if status == STATUS_OK:
            distance = np.mean([x["distance"] for x in evaluation_measurements])
            # Change status if distance value is negative
            if distance < 0.0:
                status = STATUS_FAIL
        else:
            distance = 1.0
        # Define response dict
        response = {
            "loss": distance,  # Loss value for the fmin function
            "status": status,  # Status of the optimization iteration
            "output_dir": output_dir,
            "process_model_path": process_model_path,
        }
        # Return updated status and processed response
        return status, response

    def _simulate_bps_model(self, bps_model: BPSModel, output_dir: Path) -> List[dict]:
        """
        Simulate the given BPS model and evaluate it against the validation partition.

        Parameters
        ----------
        bps_model : BPSModel
            Model to simulate; ``replace_activity_names_with_ids()`` is called on it before
            it is serialized to JSON in ``output_dir``.
        output_dir : Path
            Directory where the simulation parameters and outputs are written.

        Returns
        -------
        List[dict]
            The evaluation measures returned by ``simulate_and_evaluate``.
        """
        bps_model.replace_activity_names_with_ids()
        json_parameters_path = bps_model.to_json(output_dir, self.event_log.process_name)
        validation_log = self.event_log.validation_partition
        log_ids = self.event_log.log_ids
        evaluation_measures = simulate_and_evaluate(
            process_model_path=bps_model.process_model,
            parameters_path=json_parameters_path,
            output_dir=output_dir,
            # Simulate as many cases as the validation partition has, starting at its earliest start time
            simulation_cases=validation_log[log_ids.case].nunique(),
            simulation_start_time=validation_log[log_ids.start_time].min(),
            validation_log=validation_log,
            validation_log_ids=log_ids,
            metrics=[self.settings.optimization_metric],
            num_simulations=self.settings.num_evaluations_per_iteration,
        )
        return evaluation_measures