Parallel processing

Learning objectives:

  • Learn how to implement parallel processing.
  • Understand how to choose an appropriate number of cores.

Relevant reproducibility guidelines:

  • STARS Reproducibility Recommendations: Optimise model run time.

Pre-reading:

This page continues on from: Replications.

Entity generation → Entity processing → Initialisation bias → Performance measures → Replications → Parallel processing

Required packages:

These should be available from environment setup in the “Test yourself” section of Environments.

import time

from IPython.display import HTML
from itables import to_html_datatable
from joblib import Parallel, delayed, cpu_count
import numpy as np
import pandas as pd
import plotly.express as px
import scipy.stats as st
import simpy
from sim_tools.distributions import Exponential

library(data.table)
library(dplyr)
library(future)
library(future.apply)
library(ggplot2)
library(knitr)
library(plotly)
library(simmer)
library(tidyr)


A computer’s processor has multiple cores. Each core is like a worker that can do tasks.

Normally, when you run a program, it only uses one core, so only one worker is doing the job. This is referred to as serial or sequential processing.

Parallel processing means telling the computer to use more than one core at the same time. With more workers sharing the tasks, the work can be done faster.
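
As a quick illustration, here is a minimal sketch using joblib (the library used for parallel execution later on this page). The square() function is just a hypothetical stand-in for any independent task, such as a single simulation run:

from joblib import Parallel, delayed

def square(x):
    return x ** 2

# Serial: one core works through the tasks in turn
serial_results = [square(i) for i in range(8)]

# Parallel: two cores share the tasks between them
parallel_results = Parallel(n_jobs=2)(
    delayed(square)(i) for i in range(8)
)

assert serial_results == parallel_results  # same answers, shared workload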

How to add parallel processing

On this page, we make changes to the simulation code from the Replications page. The Parameters class has also been updated to use the warm-up length determined previously, a data collection period of two weeks (2 × 7 × 24 × 60 = 20,160 minutes), and the number of replications determined previously.

Parameter class

A new attribute cores is used to record the number of CPU cores to use for parallel execution. If set to -1, it will use all available cores. If set to 1, it will run sequentially.

class Parameters:
    """
    Parameter class.

    Attributes
    ----------
    interarrival_time : float
        Mean time between arrivals (minutes).
    consultation_time : float
        Mean length of doctor's consultation (minutes).
    number_of_doctors : int
        Number of doctors.
    warm_up_period : int
        Duration of the warm-up period (minutes).
    data_collection_period : int
        Duration of the data collection period (minutes).
    number_of_runs : int
        The number of runs (i.e., replications).
    cores : int
        Number of CPU cores to use for parallel execution. For all
        available cores, set to -1. For sequential execution, set to 1.
    verbose : bool
        Whether to print messages as simulation runs.
    """
    def __init__(
        self, interarrival_time=5, consultation_time=10, number_of_doctors=3,
        warm_up_period=30, data_collection_period=40,
        number_of_runs=5, cores=1, verbose=False
    ):
        """
        Initialise Parameters instance.

        Parameters
        ----------
        interarrival_time : float
            Time between arrivals (minutes).
        consultation_time : float
            Length of consultation (minutes).
        number_of_doctors : int
            Number of doctors.
        warm_up_period : int
            Duration of the warm-up period (minutes).
        data_collection_period : int
            Duration of the data collection period (minutes).
        number_of_runs : int
            The number of runs (i.e., replications).
        cores : int
            Number of CPU cores to use for parallel execution. For all
            available cores, set to -1. For sequential execution, set to 1.
        verbose : bool
            Whether to print messages as simulation runs.
        """
        self.interarrival_time = interarrival_time
        self.consultation_time = consultation_time
        self.number_of_doctors = number_of_doctors
        self.warm_up_period = warm_up_period
        self.data_collection_period = data_collection_period
        self.number_of_runs = number_of_runs
        self.cores = cores
        self.verbose = verbose

Patient class

No changes required.

class Patient:
    """
    Represents a patient.

    Attributes
    ----------
    patient_id : int
        Unique patient identifier.
    period : str
        Arrival period (warm up or data collection) with emoji.
    arrival_time : float
        Time patient entered the system (minutes).
    wait_time : float
        Time spent waiting for the doctor (minutes).
    time_with_doctor : float
        Time spent in consultation with a doctor (minutes).
    end_time : float
        Time that patient leaves (minutes), or NaN if they remain in the
        system.
    """
    def __init__(self, patient_id, period, arrival_time):
        """
        Initialises a new patient.

        Parameters
        ----------
        patient_id : int
            Unique patient identifier.
        period : str
            Arrival period (warm up or data collection) with emoji.
        arrival_time : float
            Time patient entered the system (minutes).
        """
        self.patient_id = patient_id
        self.period = period
        self.arrival_time = arrival_time
        self.wait_time = np.nan
        self.time_with_doctor = np.nan
        self.end_time = np.nan

Monitored resource class

No changes required.

class MonitoredResource(simpy.Resource):
    """
    SimPy Resource subclass that tracks utilisation, queue length and mean
    number of patients in the system.

    Attributes
    ----------
    time_last_event : list
        Time of the most recent resource request or release event.
    area_resource_busy : list
        Cumulative time resources were busy, for computing utilisation.
    area_n_in_queue : list
        Cumulative time patients spent queueing for a resource, for
        computing average queue length.

    Notes
    -----
    Class adapted from Monks, Harper and Heather 2025.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialise the MonitoredResource, resetting monitoring statistics.

        Parameters
        ----------
        *args :
            Positional arguments to be passed to the parent class.
        **kwargs :
            Keyword arguments to be passed to the parent class.
        """
        # Initialise a SimPy Resource
        super().__init__(*args, **kwargs)
        # Run the init_results() method
        self.init_results()

    def init_results(self):
        """
        Reset monitoring attributes.
        """
        self.time_last_event = [self._env.now]
        self.area_resource_busy = [0.0]
        self.area_n_in_queue = [0.0]

    def request(self, *args, **kwargs):
        """
        Update time-weighted statistics before requesting a resource.

        Parameters
        ----------
        *args :
            Positional arguments to be passed to the parent class.
        **kwargs :
            Keyword arguments to be passed to the parent class.

        Returns
        -------
        simpy.events.Event
            Event for the resource request.
        """
        # Update time-weighted statistics
        self.update_time_weighted_stats()
        # Request a resource
        return super().request(*args, **kwargs)

    def release(self, *args, **kwargs):
        """
        Update time-weighted statistics before releasing a resource.

        Parameters
        ----------
        *args :
            Positional arguments to be passed to the parent class.
        **kwargs :
            Keyword arguments to be passed to the parent class.

        Returns
        -------
        simpy.events.Event
            Event for the resource release.
        """
        # Update time-weighted statistics
        self.update_time_weighted_stats()
        # Release a resource
        return super().release(*args, **kwargs)

    def update_time_weighted_stats(self):
        """
        Update the time-weighted statistics for resource usage.

        Notes
        -----
        - These sums can be referred to as "the area under the curve".
        - They are called "time-weighted" statistics as they account for how
          long certain events or states persist over time.
        """
        # Calculate time since last event
        time_since_last_event = self._env.now - self.time_last_event[-1]

        # Add record of current time
        self.time_last_event.append(self._env.now)

        # Add "area under curve" of resources in use
        # self.count is the number of resources in use
        self.area_resource_busy.append(self.count * time_since_last_event)

        # Add "area under curve" of people in queue
        # len(self.queue) is the number of requests queued
        self.area_n_in_queue.append(len(self.queue) * time_since_last_event)
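
To make the "area under the curve" calculation concrete, here is a small worked example with illustrative numbers (not taken from the model):

# Events at t = 0, 10 and 16, with 1 doctor then 2 doctors busy
intervals = [(0, 10, 1), (10, 16, 2)]  # (start, end, busy_count)

# Each interval contributes busy_count * duration "busy-minutes"
area = sum(busy * (end - start) for start, end, busy in intervals)

# Mean utilisation = total busy time / (capacity * period length)
utilisation = area / (3 * 16)  # 3 doctors over a 16-minute period
print(round(utilisation, 3))  # 0.458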

Model class

No changes required.

class Model:
    """
    Simulation model.

    Attributes
    ----------
    param : Parameters
        Simulation parameters.
    run_number : int
        Run number for random seed generation.
    env : simpy.Environment
        The SimPy environment for the simulation.
    doctor : simpy.Resource
        SimPy resource representing doctors.
    patients : list
        List of Patient objects.
    results_list : list
        List of dictionaries with the attributes of each patient.
    area_n_in_system : list of float
        List containing incremental area contributions used for
        time-weighted statistics of the number of patients in the system.
    time_last_n_in_system : float
        Simulation time at last update of the number-in-system statistic.
    n_in_system : int
        Current number of patients present in the system, including
        waiting and being served.
    arrival_dist : Exponential
        Distribution used to generate random patient inter-arrival times.
    consult_dist : Exponential
        Distribution used to generate length of a doctor's consultation.
    """
    def __init__(self, param, run_number):
        """
        Create a new Model instance.

        Parameters
        ----------
        param : Parameters
            Simulation parameters.
        run_number : int
            Run number for random seed generation.
        """
        self.param = param
        self.run_number = run_number

        # Create SimPy environment
        self.env = simpy.Environment()

        # Create resource
        self.doctor = MonitoredResource(
            self.env, capacity=self.param.number_of_doctors
        )

        # Create a random seed sequence based on the run number
        ss = np.random.SeedSequence(self.run_number)
        seeds = ss.spawn(2)

        # Set up attributes to store results
        self.patients = []
        self.results_list = []
        self.area_n_in_system = [0]
        self.time_last_n_in_system = self.env.now
        self.n_in_system = 0

        # Initialise distributions
        self.arrival_dist = Exponential(mean=self.param.interarrival_time,
                                        random_seed=seeds[0])
        self.consult_dist = Exponential(mean=self.param.consultation_time,
                                        random_seed=seeds[1])

    def update_n_in_system(self, inc):
        """
        Update the time-weighted statistics for number of patients in system.

        Parameters
        ----------
        inc : int
            Change in the number of patients (+1, 0, -1).
        """
        # Compute time since last event and calculate area under curve for that
        duration = self.env.now - self.time_last_n_in_system
        self.area_n_in_system.append(self.n_in_system * duration)
        # Update time and n in system
        self.time_last_n_in_system = self.env.now
        self.n_in_system += inc

    def generate_arrivals(self):
        """
        Process that generates patient arrivals.
        """
        while True:
            # Sample and pass time to next arrival
            sampled_iat = self.arrival_dist.sample()
            yield self.env.timeout(sampled_iat)

            # Check whether arrived during warm-up or data collection
            if self.env.now < self.param.warm_up_period:
                period = "\U0001F538 WU"
            else:
                period = "\U0001F539 DC"

            # Create a new patient
            patient = Patient(patient_id=len(self.patients)+1,
                              period=period,
                              arrival_time=self.env.now)
            self.patients.append(patient)

            # Update the number in the system
            self.update_n_in_system(inc=1)

            # Print arrival time
            if self.param.verbose:
                print(f"{patient.period} Patient {patient.patient_id} " +
                      f"arrives at time: {patient.arrival_time:.3f}")

            # Start process of consultation
            self.env.process(self.consultation(patient))

    def consultation(self, patient):
        """
        Process that simulates a consultation.

        Parameters
        ----------
        patient :
            Instance of the Patient() class representing a single patient.
        """
        start_wait = self.env.now
        # Patient requests access to a doctor (resource)
        with self.doctor.request() as req:
            yield req

            # Record how long patient waited before consultation
            patient.wait_time = self.env.now - start_wait

            if self.param.verbose:
                print(f"{patient.period} Patient {patient.patient_id} " +
                      f"starts consultation at: {self.env.now:.3f}")

            # Sample consultation duration and pass time spent with doctor
            patient.time_with_doctor = self.consult_dist.sample()
            yield self.env.timeout(patient.time_with_doctor)

            # Update number in system
            self.update_n_in_system(inc=-1)

            # Record end time
            patient.end_time = self.env.now
            if self.param.verbose:
                print(f"{patient.period} Patient {patient.patient_id} " +
                      f"leaves at: {patient.end_time:.3f}")

    def reset_results(self):
        """
        Reset results.
        """
        self.patients = []
        self.doctor.init_results()
        # For number in system, we reset area and time but not the count, as
        # it should include any remaining warm-up patients in the count
        self.area_n_in_system = [0]
        self.time_last_n_in_system = self.env.now

    def warmup(self):
        """
        Reset result collection after the warm-up period.
        """
        if self.param.warm_up_period > 0:
            # Delay process until warm-up period has completed
            yield self.env.timeout(self.param.warm_up_period)
            # Reset results variables
            self.reset_results()
            if self.param.verbose:
                print(f"Warm up period ended at time: {self.env.now}")

    def run(self):
        """
        Run the simulation.
        """
        # Schedule arrival generator and warm-up
        self.env.process(self.generate_arrivals())
        self.env.process(self.warmup())

        # Run the simulation
        self.env.run(until=(self.param.warm_up_period +
                            self.param.data_collection_period))

        # At simulation end, update time-weighted statistics by accounting
        # for the time from the last event up to the simulation finish.
        self.doctor.update_time_weighted_stats()

        # Run final calculation of number in system
        self.update_n_in_system(inc=0)

        # Create list of dictionaries containing each patient's attributes
        self.results_list = [x.__dict__ for x in self.patients]

Summary statistics function

No changes required.

def summary_stats(data):
    """
    Calculate mean, standard deviation and 95% confidence interval (CI).

    Parameters
    ----------
    data : pd.Series
        Data to use in calculation.

    Returns
    -------
    tuple
        (mean, standard deviation, CI lower, CI upper).
    """
    # Remove any NaN from the series
    data = data.dropna()

    # Find number of observations
    count = len(data)

    # If there are no observations, then set all to NaN
    if count == 0:
        mean, std_dev, ci_lower, ci_upper = np.nan, np.nan, np.nan, np.nan
    # If there are only one or two observations, can do mean but not others
    elif count < 3:
        mean = data.mean()
        std_dev, ci_lower, ci_upper = np.nan, np.nan, np.nan
    # With three or more observations, can calculate all...
    else:
        mean = data.mean()
        std_dev = data.std()
        # Special case for CI if variance is 0
        if np.var(data) == 0:
            ci_lower, ci_upper = mean, mean
        else:
            # Calculation of CI uses t-distribution, which is suitable for
            # smaller sample sizes (n<30)
            ci_lower, ci_upper = st.t.interval(
                confidence=0.95,
                df=count-1,
                loc=mean,
                scale=st.sem(data))
    return mean, std_dev, ci_lower, ci_upper
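
For example, applying the function to five illustrative replication means (made-up values):

# Returns (mean, standard deviation, CI lower, CI upper)
mean, std_dev, ci_lower, ci_upper = summary_stats(
    pd.Series([4.2, 5.1, 4.8, 5.5, 4.9])
)
print(f"Mean: {mean:.2f} (95% CI: {ci_lower:.2f} to {ci_upper:.2f})")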

Runner class

The modified run_reps() method now distinguishes between sequential and parallel execution modes using the cores attribute from the parameters.

If cores is 1, simulations run sequentially.

For parallel execution, the code first checks that the requested number of cores is valid, allowing values of -1 (use all CPUs) or any integer between 1 and the total CPU count minus one. If valid, the code uses joblib’s Parallel and delayed to distribute simulation runs across the specified number of cores.
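
For instance, on a machine where joblib's cpu_count() reports four cores, the accepted values would be:

from joblib import cpu_count

# With cpu_count() == 4 this gives [-1, 1, 2, 3]: -1 requests all
# cores, while the total core count itself is excluded
valid_cores = [-1] + list(range(1, cpu_count()))
print(valid_cores)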

class Runner:
    """
    Run the simulation.

    Attributes
    ----------
    param : Parameters
        Simulation parameters.
    """
    def __init__(self, param):
        """
        Initialise a new instance of the Runner class.

        Parameters
        ----------
        param : Parameters
            Simulation parameters.
        """
        self.param = param

    def run_single(self, run):
        """
        Runs the simulation once and performs results calculations.

        Parameters
        ----------
        run : int
            Run number for the simulation.

        Returns
        -------
        dict
            Contains patient-level results and results from each run.
        """
        model = Model(param=self.param, run_number=run)
        model.run()

        # Patient results
        patient_results = pd.DataFrame(model.results_list)
        patient_results["run"] = run
        patient_results["time_in_system"] = (
            patient_results["end_time"] - patient_results["arrival_time"]
        )
        # For each patient, if they haven't seen a doctor yet, calculate
        # their wait as current time minus arrival, else set as missing
        patient_results["unseen_wait_time"] = np.where(
            patient_results["time_with_doctor"].isna(),
            model.env.now - patient_results["arrival_time"], np.nan
        )

        # Run results
        run_results = {
            "run_number": run,
            "arrivals": len(patient_results),
            "mean_wait_time": patient_results["wait_time"].mean(),
            "mean_time_with_doctor": (
                patient_results["time_with_doctor"].mean()
            ),
            "mean_utilisation_tw": (
                sum(model.doctor.area_resource_busy) / (
                    self.param.number_of_doctors *
                    self.param.data_collection_period
                )
            ),
            "mean_queue_length": (
                sum(model.doctor.area_n_in_queue) /
                self.param.data_collection_period
            ),
            "mean_time_in_system": patient_results["time_in_system"].mean(),
            "mean_patients_in_system": (
                sum(model.area_n_in_system) /
                self.param.data_collection_period
            ),
            "unseen_count": patient_results["time_with_doctor"].isna().sum(),
            "unseen_wait_time": patient_results["unseen_wait_time"].mean()
        }

        return {
            "patient": patient_results,
            "run": run_results
        }

    def run_reps(self):
        """
        Execute a single model configuration for multiple runs.

        Returns
        -------
        dict
            Contains patient-level results, results from each run and
            overall results.
        """
        # Sequential execution
        if self.param.cores == 1: 
            all_results = [self.run_single(run)
                           for run in range(self.param.number_of_runs)]
        # Parallel execution
        else: 
            # Check number of cores is valid
            valid_cores = [-1] + list(range(1, cpu_count()))
            if self.param.cores not in valid_cores: 
                raise ValueError(
                    f"Invalid cores: {self.param.cores}. Must be one of: " +
                    f"{valid_cores}."
                )
            # Execute replications
            all_results = Parallel(n_jobs=self.param.cores)(
                delayed(self.run_single)(run)
                for run in range(self.param.number_of_runs)
            )

        # Separate results from each run into appropriate lists
        patient_results_list = [result["patient"] for result in all_results]
        run_results_list = [result["run"] for result in all_results]

        # Convert lists into dataframes
        patient_results_df = pd.concat(
            patient_results_list, ignore_index=True
        )
        run_results_df = pd.DataFrame(run_results_list)

        # Calculate average results and uncertainty from across all runs
        uncertainty_metrics = {}
        run_col = run_results_df.columns

        # Loop through the run performance measure columns
        # Calculate mean, standard deviation and 95% confidence interval
        for col in run_col[~run_col.isin(["run_number", "scenario"])]:
            uncertainty_metrics[col] = dict(zip(
                ["mean", "std_dev", "lower_95_ci", "upper_95_ci"],
                summary_stats(run_results_df[col])
            ))
        # Convert to dataframe
        overall_results_df = pd.DataFrame(uncertainty_metrics)

        return {
            "patient": patient_results_df,
            "run": run_results_df,
            "overall": overall_results_df
        }
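
With these changes in place, switching to parallel execution is simply a case of setting cores when creating the parameters. For example:

# Use all available cores (-1); other parameters keep their defaults
experiment = Runner(param=Parameters(cores=-1))
results = experiment.run_reps()

# View the average results from across the runs
results["overall"]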

Parameter function

A new parameter cores records the number of CPU cores to use for parallel execution. If set to -1, it will use all but one of the available cores; if set to 1, it will run sequentially. A seed_offset parameter is also included, which the runner uses when generating run-number-based seeds (covered below).

#' Generate parameter list.
#'
#' @param interarrival_time Numeric. Time between arrivals (minutes).
#' @param consultation_time Numeric. Mean length of doctor's
#'   consultation (minutes).
#' @param number_of_doctors Numeric. Number of doctors.
#' @param warm_up_period Numeric. Duration of the warm-up period (minutes).
#' @param data_collection_period Numeric. Duration of the data collection
#'   period (minutes).
#' @param number_of_runs Numeric. The number of runs (i.e., replications).
#' @param seed_offset Numeric. Offset added to run numbers when generating
#'   seeds in `runner()` (only used when future seeding is disabled).
#' @param cores Numeric. Number of CPU cores to use for parallel execution.
#' @param verbose Boolean. Whether to print messages as simulation runs.
#'
#' @return A named list of parameters.
#' @export

create_params <- function(
  interarrival_time = 5L,
  consultation_time = 10L,
  number_of_doctors = 3L,
  warm_up_period = 30L,
  data_collection_period = 40L,
  number_of_runs = 5L,
  seed_offset = 0L,
  cores = 1L,
  verbose = FALSE
) {
  list(
    interarrival_time = interarrival_time,
    consultation_time = consultation_time,
    number_of_doctors = number_of_doctors,
    warm_up_period = warm_up_period,
    data_collection_period = data_collection_period,
    number_of_runs = number_of_runs,
    seed_offset = seed_offset,
    cores = cores,
    verbose = verbose
  )
}

Warm-up function

No changes required.

#' Filters arrivals and resources to remove warm-up results.
#'
#' @param result Named list with two tables: monitored arrivals & resources.
#' @param warm_up_period Length of warm-up period.
#'
#' @importFrom dplyr arrange filter group_by mutate n slice ungroup
#' @importFrom rlang .data
#'
#' @return The named list `result`, but with the tables (`arrivals` and
#' `resources`) filtered to remove warm-up results.
#' @export

filter_warmup <- function(result, warm_up_period) {
  # Skip filtering if the warm-up period is zero
  if (warm_up_period == 0L) return(result)

  # Arrivals: Keep only patients who came during the data collection period
  result[["arrivals"]] <- result[["arrivals"]] |>
    group_by(.data[["name"]]) |>
    filter(all(.data[["start_time"]] >= warm_up_period)) |>
    ungroup()

  # Resources: Filter to resource events in the data collection period
  dc_resources <- filter(result[["resources"]],
                         .data[["time"]] >= warm_up_period)

  if (nrow(dc_resources) > 0L) {

    # For each resource, get the last event during warm-up, and replace time
    # for that event with the start time of the data collection period
    last_usage <- result[["resources"]] |>
      filter(.data[["time"]] < warm_up_period) |>
      arrange(.data[["time"]]) |>
      group_by(.data[["resource"]]) |>
      slice(n()) |>
      mutate(time = warm_up_period) |>
      ungroup()

    # Prepend those last events to the dataframe - otherwise calculations
    # would assume all resources were idle at the start of the data
    # collection period
    result[["resources"]] <- rbind(last_usage, dc_resources)

  } else {
    # No events after warm-up; use filtered resource table as is
    result[["resources"]] <- dc_resources
  }

  result
}

Functions to calculate different metrics

No changes required.

#' Calculate the number of arrivals
#'
#' @param arrivals Dataframe with times for each patient with each resource.
#'
#' @importFrom dplyr n_distinct summarise ungroup
#' @importFrom rlang .data
#'
#' @return Tibble with column containing total number of arrivals.
#' @export

calc_arrivals <- function(arrivals) {
  arrivals |>
    summarise(arrivals = n_distinct(.data[["name"]])) |>
    ungroup()
}


#' Calculate the mean wait time for each resource
#'
#' @param arrivals Dataframe with times for each patient with each resource.
#'
#' @importFrom dplyr group_by summarise ungroup
#' @importFrom rlang .data
#' @importFrom tidyr drop_na pivot_wider
#' @importFrom tidyselect any_of
#'
#' @return Tibble with columns containing result for each resource.
#' @export

calc_mean_wait <- function(arrivals) {

  # Create subset of data that removes patients who were still waiting
  complete_arrivals <- drop_na(arrivals, any_of("wait_time"))

  # Calculate mean wait time for each resource
  complete_arrivals |>
    group_by(.data[["resource"]]) |>
    summarise(mean_wait_time = mean(.data[["wait_time"]])) |>
    pivot_wider(names_from = "resource",
                values_from = "mean_wait_time",
                names_glue = "mean_wait_time_{resource}") |>
    ungroup()
}


#' Calculate the mean length of time patients spent with each resource
#'
#' @param arrivals Dataframe with times for each patient with each resource.
#' @param resources Dataframe with times patients use or queue for resources.
#'
#' @importFrom dplyr group_by summarise ungroup
#' @importFrom rlang .data
#' @importFrom tidyr drop_na pivot_wider
#' @importFrom tidyselect any_of
#'
#' @return Tibble with columns containing result for each resource.
#' @export

calc_mean_serve_length <- function(arrivals, resources) {

  # Create subset of data that removes patients who were still waiting
  complete_arrivals <- drop_na(arrivals, any_of("serve_length"))

  # Calculate mean serve time for each resource
  complete_arrivals |>
    group_by(.data[["resource"]]) |>
    summarise(mean_serve_time = mean(.data[["serve_length"]])) |>
    pivot_wider(names_from = "resource",
                values_from = "mean_serve_time",
                names_glue = "mean_time_with_{resource}") |>
    ungroup()
}


#' Calculate the resource utilisation
#'
#' Utilisation is given by the total effective usage time (`in_use`) over
#' the total time intervals considered (`dt`).
#'
#' Credit: The utilisation calculation is adapted from the
#' `plot.resources.utilization()` function in simmer.plot 0.1.18, which is
#' shared under an MIT Licence (Ucar I, Smeets B (2023). simmer.plot: Plotting
#' Methods for 'simmer'. https://r-simmer.org
#' https://github.com/r-simmer/simmer.plot.).
#'
#' @param resources Dataframe with times patients use or queue for resources.
#' @param groups Optional list of columns to group by for the calculation.
#' @param summarise If TRUE, return overall utilisation. If FALSE, just
#' return the resource dataframe with the additional columns
#' interval_duration, effective_capacity and utilisation.
#'
#' @importFrom dplyr across group_by lead mutate summarise ungroup
#' @importFrom rlang .data
#' @importFrom tidyr pivot_wider
#' @importFrom tidyselect all_of
#'
#' @return Tibble with columns containing result for each resource.
#' @export

calc_utilisation <- function(resources, groups = NULL, summarise = TRUE) {

  # Create list of grouping variables (always "resource" but can add others)
  group_vars <- c("resource", groups)

  # Calculate utilisation
  util_df <- resources |>
    group_by(across(all_of(group_vars))) |>
    mutate(
      # Time between this row and the next
      interval_duration = lead(.data[["time"]]) - .data[["time"]],
      # Ensures effective capacity is never less than number of servers in
      # use (in case of situations where servers may exceed "capacity").
      effective_capacity = pmax(.data[["capacity"]], .data[["server"]]),
      # Divide number of servers in use by effective capacity
      # Set to NA if effective capacity is 0
      utilisation = ifelse(.data[["effective_capacity"]] > 0L,
                           .data[["server"]] / .data[["effective_capacity"]],
                           NA_real_)
    )

  # If summarise = TRUE, find total utilisation
  if (summarise) {
    util_df |>
      summarise(
        # Multiply each utilisation by its own unique duration. The total of
        # those is then divided by the total duration of all intervals.
        # Hence, we are calculating a time-weighted average utilisation.
        utilisation = (sum(.data[["utilisation"]] *
                             .data[["interval_duration"]], na.rm = TRUE) /
                         sum(.data[["interval_duration"]], na.rm = TRUE))
      ) |>
      pivot_wider(names_from = "resource",
                  values_from = "utilisation",
                  names_glue = "utilisation_{resource}") |>
      ungroup()
  } else {
    # If summarise = FALSE, just return util_df with no further processing
    ungroup(util_df)
  }
}


#' Calculate the time-weighted mean queue length.
#'
#' @param arrivals Dataframe with times for each patient with each resource.
#'
#' @importFrom dplyr arrange group_by lead mutate summarise ungroup
#' @importFrom rlang .data
#' @importFrom tidyr pivot_wider
#'
#' @return Tibble with column containing mean queue length.
#' @export

calc_mean_queue_length <- function(arrivals) {
  arrivals |>
    group_by(.data[["resource"]]) |>
    # Sort by arrival time
    arrange(.data[["start_time"]]) |>
    # Calculate time between this row and the next
    mutate(
      interval_duration = (lead(.data[["start_time"]]) - .data[["start_time"]])
    ) |>
    # Multiply each queue length by its own unique duration. The total of
    # those is then divided by the total duration of all intervals.
    # Hence, we are calculating a time-weighted average queue length.
    summarise(mean_queue_length = (
      sum(.data[["queue_on_arrival"]] *
            .data[["interval_duration"]], na.rm = TRUE) /
        sum(.data[["interval_duration"]], na.rm = TRUE)
    )
    ) |>
    # Reshape dataframe
    pivot_wider(names_from = "resource",
                values_from = "mean_queue_length",
                names_glue = "mean_queue_length_{resource}") |>
    ungroup()
}


#' Calculate the mean time in the system for finished patients.
#'
#' @param arrivals Dataframe with times for each patient with each resource.
#'
#' @importFrom dplyr summarise ungroup
#' @importFrom rlang .data
#'
#' @return Tibble with column containing the mean time in the system.
#' @export

calc_mean_time_in_system <- function(arrivals) {
  arrivals |>
    summarise(
      mean_time_in_system = mean(.data[["time_in_system"]], na.rm = TRUE)
    ) |>
    ungroup()
}


#' Calculate the time-weighted mean number of patients in the system.
#'
#' @param patient_count Dataframe with patient counts over time.
#'
#' @importFrom dplyr arrange lead mutate summarise ungroup
#' @importFrom rlang .data
#'
#' @return Tibble with column containing mean number of patients in the system.
#' @export

calc_mean_patients_in_system <- function(patient_count) {
  patient_count |>
    # Sort by time
    arrange(.data[["time"]]) |>
    # Calculate time between this row and the next
    mutate(interval_duration = (lead(.data[["time"]]) - .data[["time"]])) |>
    # Multiply each patient count by its own unique duration. The total of
    # those is then divided by the total duration of all intervals.
    # Hence, we are calculating a time-weighted average patient count.
    summarise(
      mean_patients_in_system = (
        sum(.data[["count"]] * .data[["interval_duration"]], na.rm = TRUE) /
          sum(.data[["interval_duration"]], na.rm = TRUE)
      )
    ) |>
    ungroup()
}

Run results function

No changes required.

#' Get average results for the provided single run.
#'
#' @param results Named list with `arrivals` from `get_mon_arrivals()` and
#'   `resources` from `get_mon_resources()`
#'   (`per_resource = TRUE` and `ongoing = TRUE`).
#' @param run_number Integer index of current simulation run.
#'
#' @importFrom dplyr bind_cols
#' @importFrom tibble tibble
#'
#' @return Tibble with processed results from replication.
#' @export

get_run_results <- function(results, run_number) {
  metrics <- list(
    calc_arrivals(results[["arrivals"]]),
    calc_mean_wait(results[["arrivals"]]),
    calc_mean_serve_length(results[["arrivals"]]),
    calc_utilisation(results[["resources"]]),
    calc_mean_queue_length(results[["arrivals"]]),
    calc_mean_time_in_system(results[["arrivals"]]),
    calc_mean_patients_in_system(results[["patients_in_system"]])
  )
  dplyr::bind_cols(tibble(replication = run_number), metrics)
}

Model function

A new option (set_seed) controls whether to set a random seed based on the run_number. This is necessary because, for parallel reproducibility, we will need to handle seed management in the runner rather than within model() itself.

#' Run simulation.
#'
#' @param param List. Model parameters.
#' @param run_number Numeric. Run number for random seed generation.
#' @param set_seed Boolean. Whether to set seed within the model function.
#'
#' @importFrom dplyr arrange bind_rows desc left_join mutate transmute
#' @importFrom rlang .data
#' @importFrom simmer add_generator add_resource get_attribute
#' @importFrom simmer get_mon_arrivals get_mon_attributes get_mon_resources
#' @importFrom simmer get_queue_count now release run seize set_attribute
#' @importFrom simmer simmer timeout trajectory
#' @importFrom stats rexp
#' @importFrom tidyr drop_na pivot_wider
#' @importFrom tidyselect all_of
#'
#' @return Named list with tables `arrivals`, `resources` and `run_results`.
#' @export

model <- function(param, run_number, set_seed = TRUE) {

  # Set random seed based on run number
  if (set_seed) {
    set.seed(run_number)
  }

  # Create simmer environment
  env <- simmer("simulation", verbose = param[["verbose"]])

  # Define the patient trajectory
  patient <- trajectory("consultation") |>
    # Record queue length on arrival 
    set_attribute("doctor_queue_on_arrival", 
                  function() get_queue_count(env, "doctor")) |> 
    seize("doctor", 1L) |>
    # Record time resource is obtained
    set_attribute("doctor_serve_start", function() now(env)) |>
    # Record sampled length of consultation 
    set_attribute("doctor_serve_length", function() { 
      rexp(n = 1L, rate = 1L / param[["consultation_time"]]) 
    }) |> 
    timeout(function() get_attribute(env, "doctor_serve_length")) |> 
    release("doctor", 1L)

  env <- env |>
    # Add doctor resource
    add_resource("doctor", param[["number_of_doctors"]]) |>
    # Add patient generator (mon=2 so can get manual attributes)
    add_generator("patient", patient, function() {
      rexp(n = 1L, rate = 1L / param[["interarrival_time"]])
    }, mon = 2L) |>
    # Run the simulation
    simmer::run(until = (param[["warm_up_period"]] +
                           param[["data_collection_period"]]))

  # Extract information on arrivals and resources from simmer environment
  result <- list(
    arrivals = get_mon_arrivals(env, per_resource = TRUE, ongoing = TRUE),
    resources = get_mon_resources(env)
  )

  # Get the extra arrivals attributes
  extra_attributes <- get_mon_attributes(env) |>
    dplyr::select("name", "key", "value") |>
    # Add column with resource name, and remove that from key
    mutate(resource = gsub("_.+", "", .data[["key"]]),
           key = gsub("^[^_]+_", "", .data[["key"]])) |>
    pivot_wider(names_from = "key", values_from = "value")

  # Merge extra attributes with the arrival data
  result[["arrivals"]] <- left_join(
    result[["arrivals"]], extra_attributes, by = c("name", "resource")
  )

  # Add time in system (NA for patients still in the system at the end)
  result[["arrivals"]][["time_in_system"]] <- ( 
    result[["arrivals"]][["end_time"]] - result[["arrivals"]][["start_time"]] 
  )

  # Filter to remove results from the warm-up period
  result <- filter_warmup(result, param[["warm_up_period"]])

  # Gather all start and end times, with a row for each, marked +1 or -1 
  # Drop NA for end time, as those are patients who haven't left system 
  # at the end of the simulation 
  arrivals_start <- transmute( 
    result[["arrivals"]], time = .data[["start_time"]], change = 1L 
  ) 
  arrivals_end <- result[["arrivals"]] |> 
    drop_na(all_of("end_time")) |> 
    transmute(time = .data[["end_time"]], change = -1L) 
  events <- bind_rows(arrivals_start, arrivals_end) 

  # Determine the count of patients in the service with each entry/exit 
  result[["patients_in_system"]] <- events |> 
    # Sort events by time 
    arrange(.data[["time"]], desc(.data[["change"]])) |> 
    # Use cumulative sum to find number of patients in system at each time 
    mutate(count = cumsum(.data[["change"]])) |> 
    dplyr::select(c("time", "count")) 

  # Replace "replication" value with appropriate run number
  result[["arrivals"]] <- mutate(result[["arrivals"]],
                                 replication = run_number)
  result[["resources"]] <- mutate(result[["resources"]],
                                  replication = run_number)
  result[["patients_in_system"]] <- mutate(result[["patients_in_system"]], 
                                           replication = run_number) 

  # Calculate wait time for each patient
  result[["arrivals"]] <- result[["arrivals"]] |>
    mutate(wait_time = .data[["serve_start"]] - .data[["start_time"]])

  # Calculate the average results for that run
  result[["run_results"]] <- get_run_results(result, run_number)

  result
}

Runner function

The modified runner function now distinguishes between sequential and parallel execution modes using the cores attribute from parameters.

When working with future, it is recommended to set future.seed = seed. This creates an independent random number stream for each replication and guarantees reproducible results across different platforms and cluster sizes.

We have provided the option of using the run number as a seed. This matches the logic in model(), allowing for direct comparison and debugging. However, this method does not comply with parallel RNG standards and can lead to correlated or overlapping streams, loss of reproducibility, or statistical artefacts if worker assignment or hardware changes. As stated in the future documentation: “Note that as.list(seq_along(x)) is not a valid set of such .Random.seed values.”

In most cases, use the runner (setting number_of_runs = 1 for single runs) and always use future-based seeding, regardless of whether executing sequentially or in parallel, for reliable results.
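
For comparison, the Python model achieves the same goal using NumPy's SeedSequence, which spawns statistically independent child streams from a single base seed. A minimal sketch of that approach (the base seed of 123456 is an arbitrary choice):

import numpy as np

# One base seed spawns an independent child stream per replication, so
# results are reproducible however runs are assigned to workers
base_seed = np.random.SeedSequence(123456)
rngs = [np.random.default_rng(s) for s in base_seed.spawn(5)]
samples = [rng.exponential(scale=5) for rng in rngs]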

#' Run simulation for multiple replications.
#'
#' @param param Named list of model parameters.
#' @param use_future_seeding Logical. If TRUE, uses the recommended future
#' seeding (`future.seed = seed`), though results will not match `model()`.
#' If FALSE, seeds each run with its run number (like `model()`), though this
#' is discouraged by the future_lapply documentation.
#'
#' @importFrom future plan multisession sequential
#' @importFrom future.apply future_lapply
#' @importFrom dplyr bind_rows
#'
#' @return List with three tables: arrivals, resources and run_results.
#' @export

runner <- function(param, use_future_seeding = TRUE) {
  # Determine the parallel execution plan
  if (param[["cores"]] == 1L) {
    plan(sequential)  # Sequential execution
  } else {
    if (param[["cores"]] == -1L) {
      cores <- future::availableCores() - 1L
    } else {
      cores <- param[["cores"]]
    }
    plan(multisession, workers = cores)  # Parallel execution
  }

  # Set seed for future.seed
  if (isTRUE(use_future_seeding)) {
    # Recommended option. Base seed used when making others by future.seed
    custom_seed <- 123456L
  } else {
    # Not recommended (but will allow match to model())
    # Generates list of pre-generated seeds set to the run numbers (+ offset)
    create_seeds <- function(seed) {
      set.seed(seed)
      .Random.seed
    }
    custom_seed <- lapply(
      1L:param[["number_of_runs"]] + param[["seed_offset"]], 
      create_seeds
    )
  }

  # Run simulations (sequentially or in parallel)
  # Set set_seed = FALSE as seeding is handled via the future.seed
  # argument, rather than within model(), so it is not overridden
  results <- future_lapply(
    1L:param[["number_of_runs"]], 
    function(i) {
      model(run_number = i, 
            param = param, 
            set_seed = FALSE)
    }, 
    future.seed = custom_seed
  )

  # If only one replication, return without changes
  if (param[["number_of_runs"]] == 1L) {
    return(results[[1L]])
  }

  # Combine results from all replications into unified tables
  all_arrivals <- do.call(
    rbind, lapply(results, function(x) x[["arrivals"]])
  )
  all_resources <- do.call(
    rbind, lapply(results, function(x) x[["resources"]])
  )

  # run_results may have missing data - use bind_rows to fill NAs automatically
  all_run_results <- dplyr::bind_rows(
    lapply(results, function(x) x[["run_results"]])
  )

  all_patients_in_system <- dplyr::bind_rows(
    lapply(results, function(x) x[["patients_in_system"]])
  )

  # Return a list containing the combined tables
  list(
    arrivals = all_arrivals,
    resources = all_resources,
    run_results = all_run_results,
    patients_in_system = all_patients_in_system
  )
}

Choosing an appropriate number of cores

If you choose to run your simulation in parallel, using the maximum number of cores may not be the most efficient option.

This is due to overhead in managing processes, passing data between them, and setting up the execution environment.

To determine how many cores to use, try running your model with varying numbers of cores and record how long each run takes. For example:

speed = []

# Run for 1 to 4 cores
for i in range(1, 5):
    # Start timer and run replications
    start_time = time.time()
    experiment = Runner(param=Parameters(cores=i))
    experiment.run_reps()
    # Find run time
    run_time = time.time() - start_time
    speed.append({"cores": i, "run_time": run_time})

# Convert times to dataframe
timing_results = pd.DataFrame(speed)
HTML(to_html_datatable(timing_results))
# Plot time by number of cores
fig = px.line(timing_results, x="cores", y="run_time")
fig.update_layout(
    xaxis={"dtick": 1},
    xaxis_title="Number of cores",
    yaxis_title="Run time",
    template="plotly_white"
)
speed <- list()

# Run for 1 to 4 cores
for (i in 1L:4L) {
  # Start timer and run replications
  start_time <- Sys.time()
  runner(param = create_params(cores = i))
  # Find run time
  run_time <- Sys.time() - start_time
  speed[[i]] <- list(cores = i, run_time = run_time)
}

# Convert to dataframe
speed_df <- rbindlist(speed)
kable(speed_df)
 cores   run_time
     1  0.3739345
     2  1.3090684
     3  1.5790362
     4  1.8546638
p <- ggplot(speed_df, aes(x = .data[["cores"]], y = .data[["run_time"]])) +
  geom_line() +
  labs(x = "Cores", y = "Run time") +
  theme_minimal()
ggplotly(p)


With this example, the sequential (single-core) run is actually faster than the parallel versions. This happens because the overhead of creating and coordinating multiple processes outweighs the benefits for such a small simulation.

For larger-scale simulations, parallelisation will often bring performance improvements, but it is always worth running a quick benchmark to determine the most efficient number of cores for your case.

Explore the example models

Nurse visit simulation

GitHub: pydesrap_mms repository

Key files: simulation/param.py, simulation/runner.py,
notebooks/choosing_cores.ipynb
What to look for? The repository currently uses parallel processing, with the default set to use the maximum number of available cores (-1).
Why it matters? Although this model runs quickly for small examples, performance can vary when running many replications or larger experiments. For lightweight analyses, the difference may not exceed a minute, but for heavier workloads, choosing an appropriate number of cores can make a significant impact on overall efficiency.

GitHub: rdesrap_mms repository

Key files: R/parameters.R, R/model.R, R/runner.R,
rmarkdown/choosing_cores.Rmd
What to look for? The repository is set up to allow parallel processing, but currently defaults to running sequentially (cores = 1).
Why it matters? Although this model runs quickly for small examples, performance can vary when running many replications or larger experiments. For lightweight analyses, the difference may not exceed a minute, but for heavier workloads, choosing an appropriate number of cores can make a significant impact on overall efficiency.

Stroke pathway simulation

GitHub: pydesrap_stroke repository

Key files: simulation/parameters.py, simulation/runner.py
What to look for? The repository is set up to allow parallel processing, but currently defaults to running sequentially (cores=1).
Why it matters? For this example the model is quick to run, so leaving it sequential keeps things simpler.

GitHub: rdesrap_stroke repository

Key files: R/parameters.R, R/model.R, R/runner.R
What to look for? The repository is set up to allow parallel processing, but currently defaults to running sequentially (cores = 1).
Why it matters? For this example the model is quick to run, so leaving it sequential keeps things simpler.

Test yourself

Try adapting the model to use parallel processing and experiment with different numbers of cores:

  • Edit the parameters and model setup so that parallel processing is enabled.

  • Run the model using different values for the number of cores.

  • Record and plot the run times for each configuration to see how performance changes, and note whether there were any benefits to running in parallel.

  • When you increase the number of replications or the overall run time, do you notice greater benefits from parallel execution?