Source code for simod.simulation.prosimos

import itertools
import multiprocessing
from concurrent.futures import ProcessPoolExecutor as Pool
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple

import pandas as pd
from pix_framework.io.event_log import PROSIMOS_LOG_IDS, EventLogIDs, read_csv_log
from prosimos.simulation_engine import run_simulation

from simod.cli_formatter import print_message, print_notice, print_warning
from simod.metrics import compute_metric
from ..settings.common_settings import Metric

cpu_count = multiprocessing.cpu_count()


@dataclass
class ProsimosSettings:
    """
    Bundle of inputs required for a single Prosimos simulation run.

    Attributes
    ----------
    bpmn_path : :class:`pathlib.Path`
        Location of the BPMN process model to simulate.
    parameters_path : :class:`pathlib.Path`
        Location of the JSON file holding the Prosimos simulation parameters.
    output_log_path : :class:`pathlib.Path`
        Destination file for the simulated event log.
    num_simulation_cases : int
        How many process cases the run should generate.
    simulation_start : :class:`pandas.Timestamp`
        Timestamp at which the simulated timeline begins.
    """

    # Field order is part of the positional constructor signature; keep it stable.
    bpmn_path: Path
    parameters_path: Path
    output_log_path: Path
    num_simulation_cases: int
    simulation_start: pd.Timestamp


def simulate(settings: ProsimosSettings):
    """
    Execute one Prosimos simulation described by ``settings``.

    Parameters
    ----------
    settings : :class:`ProsimosSettings`
        Paths and parameters of the simulation to run.

    Notes
    -----
    - The settings are printed before `run_simulation()` is invoked.
    - Start events, end events and event timers are **not** written to the
      output log (``is_event_added_to_log=False``).
    - No statistics file is requested; only the event log is written, to
      `settings.output_log_path`.
    """
    print_message(f"Simulation settings: {settings}")

    # Prosimos is handed plain strings for paths and an ISO-8601 start time.
    model_file = str(settings.bpmn_path)
    parameters_file = str(settings.parameters_path)
    log_file = str(settings.output_log_path)
    start_timestamp = settings.simulation_start.isoformat()

    run_simulation(
        bpmn_path=model_file,
        json_path=parameters_file,
        total_cases=settings.num_simulation_cases,
        stat_out_path=None,  # No statistics
        log_out_path=log_file,
        starting_at=start_timestamp,
        is_event_added_to_log=False,  # Don't add Events (start/end/timers) to output log
    )


def simulate_and_evaluate(
    process_model_path: Path,
    parameters_path: Path,
    output_dir: Path,
    simulation_cases: int,
    simulation_start_time: pd.Timestamp,
    validation_log: pd.DataFrame,
    validation_log_ids: EventLogIDs,
    metrics: List[Metric],
    num_simulations: int = 1,
) -> List[dict]:
    """
    Run Prosimos `num_simulations` times and score every simulated log.

    The simulations are delegated to `simulate_in_parallel()`; the logs they
    produce are then compared against `validation_log` by `evaluate_logs()`
    using the requested metrics.

    Parameters
    ----------
    process_model_path : :class:`pathlib.Path`
        Path to the BPMN process model.
    parameters_path : :class:`pathlib.Path`
        Path to the Prosimos simulation parameters JSON file.
    output_dir : :class:`pathlib.Path`
        Directory in which the simulated logs are written.
    simulation_cases : int
        Number of cases to simulate in each run.
    simulation_start_time : :class:`pandas.Timestamp`
        Start timestamp of each simulation.
    validation_log : :class:`pandas.DataFrame`
        Event log the simulated logs are compared against.
    validation_log_ids : :class:`EventLogIDs`
        Column mapping of the validation log.
    metrics : List[:class:`~simod.settings.common_settings.Metric`]
        Metrics to compute for every simulated log.
    num_simulations : int, optional
        Number of simulation runs (default is 1).

    Returns
    -------
    List[dict]
        The evaluation measurements returned by `evaluate_logs()`.
    """
    log_paths = simulate_in_parallel(
        process_model_path=process_model_path,
        num_simulations=num_simulations,
        output_dir=output_dir,
        parameters_path=parameters_path,
        simulation_cases=simulation_cases,
        simulation_start_time=simulation_start_time,
    )

    return evaluate_logs(
        metrics=metrics,
        simulation_log_paths=log_paths,
        validation_log=validation_log,
        validation_log_ids=validation_log_ids,
    )


def simulate_in_parallel(
    process_model_path: Path,
    num_simulations: int,
    output_dir: Path,
    parameters_path: Path,
    simulation_cases: int,
    simulation_start_time: pd.Timestamp,
) -> List[Path]:
    """
    Simulate a process model with Prosimos `num_simulations` times in parallel.

    Parameters
    ----------
    process_model_path : :class:`pathlib.Path`
        Path to the BPMN model.
    num_simulations : int
        Number of simulations to run. Each simulation produces one log.
    output_dir : :class:`pathlib.Path`
        Directory for the simulated logs; run ``i`` is written to
        ``simulated_log_{i}.csv`` inside it.
    parameters_path : :class:`pathlib.Path`
        Path to the Prosimos parameters.
    simulation_cases : int
        Number of cases to simulate per run.
    simulation_start_time : :class:`pandas.Timestamp`
        Start time of each simulation.

    Returns
    -------
    List[Path]
        Paths of the simulated logs, in run order. Empty when
        `num_simulations` is not positive.

    Raises
    ------
    Exception
        Any exception raised by `simulate()` inside a worker process is
        re-raised here instead of being silently dropped.
    """
    if num_simulations <= 0:
        # Nothing to run, and ProcessPoolExecutor rejects max_workers <= 0.
        return []

    simulation_arguments = [
        ProsimosSettings(
            bpmn_path=process_model_path,
            parameters_path=parameters_path,
            output_log_path=output_dir / f"simulated_log_{rep}.csv",
            num_simulation_cases=simulation_cases,
            simulation_start=simulation_start_time,
        )
        for rep in range(num_simulations)
    ]

    w_count = min(num_simulations, cpu_count)
    print_notice(f"Simulating {len(simulation_arguments)} times with {w_count} workers")
    with Pool(w_count) as pool:
        # Executor.map() only raises a worker's exception when its result is
        # retrieved, so the iterator must be drained for failures to surface.
        for _ in pool.map(simulate, simulation_arguments):
            pass

    return [settings.output_log_path for settings in simulation_arguments]


def evaluate_logs(
    metrics: List[Metric],
    simulation_log_paths: List[Path],
    validation_log: pd.DataFrame,
    validation_log_ids: EventLogIDs,
) -> List[dict]:
    """
    Compute the evaluation metrics of each simulated log against the validation log.

    The simulated logs are first read in parallel, then every log is scored
    in parallel with each of the requested metrics.

    Parameters
    ----------
    metrics : List[Metric]
        Metrics to compute for every simulated log.
    simulation_log_paths : List[Path]
        Paths of the simulated logs; the position of a path in this list is
        stored as the ``run_num`` of that log.
    validation_log : :class:`pandas.DataFrame`
        Event log the simulated logs are compared against.
    validation_log_ids : :class:`EventLogIDs`
        Column mapping of the validation log.

    Returns
    -------
    List[dict]
        One ``{"run_num", "metric", "distance"}`` entry per (log, metric)
        pair, flattened into a single list. Empty when no log paths are given.
    """
    if not simulation_log_paths:
        # No logs to evaluate, and ProcessPoolExecutor rejects max_workers=0.
        return []

    w_count = min(len(simulation_log_paths), cpu_count)

    # Read simulated logs
    read_arguments = [
        (log_path, PROSIMOS_LOG_IDS, index) for index, log_path in enumerate(simulation_log_paths)
    ]
    print_notice(f"Reading {len(read_arguments)} simulated logs with {w_count} workers")
    with Pool(w_count) as pool:
        simulated_logs = list(pool.map(_read_simulated_log, read_arguments))

    # Evaluate
    evaluation_arguments = [
        (validation_log, validation_log_ids, log, PROSIMOS_LOG_IDS, metrics) for log in simulated_logs
    ]
    print_notice(f"Evaluating {len(evaluation_arguments)} simulated logs with {w_count} workers")
    with Pool(w_count) as pool:
        measurements_per_log = list(pool.map(_evaluate_logs_using_metrics, evaluation_arguments))

    # Each worker returns one list per log; flatten them into a single list.
    return list(itertools.chain.from_iterable(measurements_per_log))


def _read_simulated_log(arguments: Tuple) -> pd.DataFrame:
    """
    Read one simulated log and tag it with its origin.

    `arguments` is a ``(log_path, log_ids, simulation_repetition_index)``
    tuple (a single argument so the function can be used with ``pool.map``).
    The returned frame gets a ``role`` column copied from ``resource``, a
    ``source`` column set to ``"simulation"``, and a ``run_num`` column set to
    the repetition index.
    """
    log_path, log_ids, simulation_repetition_index = arguments

    df = read_csv_log(log_path, log_ids=log_ids)
    df["role"] = df["resource"]
    df["source"] = "simulation"
    df["run_num"] = simulation_repetition_index
    return df


def _evaluate_logs_using_metrics(arguments: Tuple) -> List[dict]:
    """
    Score one simulated log against the validation log with every metric.

    `arguments` is a ``(validation_log, validation_log_ids, simulated_log,
    simulated_log_ids, metrics)`` tuple (a single argument so the function can
    be used with ``pool.map``). Returns one ``{"run_num", "metric",
    "distance"}`` dict per metric; ``run_num`` is taken from the first row of
    the simulated log, or is ``-1`` (after printing a warning) when that log
    is empty.
    """
    validation_log, validation_log_ids, simulated_log, simulated_log_ids, metrics = arguments

    if len(simulated_log) > 0:
        rep = simulated_log.iloc[0].run_num
    else:
        print_warning("Error with the simulation! Trying to evaluate an empty simulated log.")
        rep = -1

    return [
        {
            "run_num": rep,
            "metric": metric,
            "distance": compute_metric(
                metric, validation_log, validation_log_ids, simulated_log, simulated_log_ids
            ),
        }
        for metric in metrics
    ]