Page last modified

January 5, 2026

Learning objectives:

  • Understand how to write and run tests.
  • Explore examples of the types of test you can run for your simulation model.

Relevant reproducibility guidelines:

  • NHS Levels of RAP (πŸ₯ˆ): Pipeline includes a testing framework (unit tests, back tests).

Pre-reading:

This page will run tests on the model from the parallel processing page (likewise, also used on the scenario and sensitivity analysis page).

Entity generation β†’ Entity processing β†’ Initialisation bias β†’ Performance measures β†’ Replications β†’ Parallel processing β†’ Tests


Testing is the process of evaluating a model to ensure it works as expected, gives reliable results, and can handle different conditions. By systematically checking for errors, inconsistencies, or unexpected behaviors, testing helps improve the quality of a model, catch errors and prevent future issues. They can also be used to run test cases that check your results are consistent (i.e., that it is reproducible).

When you create a model, you will naturally carry out tests, with simple manual checks where you observe outputs and ensure they look right. These checks can be formalised and automated so that you can run them after any changes, and catch any issues that arise.

Writing tests is fundamental for model verification. These tests should be run regularly and after code changes, to check that these changes haven’t introduced new issues (β€œregression testing”).

Introduction to writing and running tests

Writing a basic test

A popular framework for testing in python is pytest.

Each test in pytest is a function that contains an assertion statement to check a condition (e.g., number > 0). If the condition fails, pytest will return an error message (e.g. β€œThe number should be positive”).

Test are typically stored in a folder called tests/, and filenames start with the prefix test_. This naming convention allows pytest to automatically discover and run all the tests in the folder.

Here’s an example of a simple test using pytest:

import pytest


def test_positive():
    """
    Confirm that the number is positive.
    """
    number = 5
    assert number > 0, "The number should be positive"

A popular framework for testing in R is testthat.

Each test uses the test_that() functions and is structured around expectations that check specific conditions (e.g., expect_true(), expect_false(), expect_equal(), expect_error() - see package index for more).

Tests are typically stored in a folder called tests/testthat, and filenames start with the prefix test_. This naming convention allows testthat to automatically discover and run all tests in the folder.

Here’s an example of a simple test using testthat:

library(testthat)
test_that("Confirm that the number is positive", {
  number <- 5L
  expect_gt(number, 0L)
})
Test passed πŸ₯‡

Running tests

Tests are typically run from the terminal. Commands include:

  • pytest - runs all tests.
  • pytest tests/filename.py - runs tests from a specific file.

When you run a test, you’ll see an output like this in the terminal:

NoteTest output:
============================= test session starts ==============================
platform linux -- Python 3.11.9, pytest-8.4.1, pluggy-1.6.0
rootdir: /__w/des_rap_book/des_rap_book/pages/guide/verification_validation
plugins: anyio-4.11.0, timeout-2.4.0
collected 1 item

tests_resources/test_example_simple.py .                                 [100%]

============================== 1 passed in 0.01s ===============================
<ExitCode.OK: 0>

Tests are typically run from the R console. Commands include:

  • testthat::test_dir() - runs all tests in current directory (and sub-directories).
  • testthat::test_file("tests/testthat/test-filename.R") - run tests from a specific file.
  • devtools::test() - runs all tests in a package.

When you run a test, you’ll see output in the console indicating whether tests passed or failed.

Parametrised tests

We can execute the same test with different parameters using pytest.mark.parametrize.

Here’s an example:

import pytest


@pytest.mark.parametrize("number", [1, 2, 3, -1])
def test_positive_param(number):
    """
    Confirm that the number is positive.

    Arguments:
        number (float):
            Number to check.
    """
    assert number > 0, f"The number {number} is not positive."

In this example, we’re testing the same logic with four different values: 1, 2, 3, and -1. The last value, -1, will cause the test to fail. The error message includes the failed value for easy debugging.

NoteTest output:
============================= test session starts ==============================
platform linux -- Python 3.11.9, pytest-8.4.1, pluggy-1.6.0
rootdir: /__w/des_rap_book/des_rap_book/pages/guide/verification_validation
plugins: anyio-4.11.0, timeout-2.4.0
collected 4 items

tests_resources/test_example_param.py ...F                               [100%]

=================================== FAILURES ===================================
___________________________ test_positive_param[-1] ____________________________

number = -1

    @pytest.mark.parametrize("number", [1, 2, 3, -1])
    def test_positive_param(number):
        """
        Confirm that the number is positive.
    
        Arguments:
            number (float):
                Number to check.
        """
>       assert number > 0, f"The number {number} is not positive."
E       AssertionError: The number -1 is not positive.
E       assert -1 > 0

tests_resources/test_example_param.py:13: AssertionError
=========================== short test summary info ============================
FAILED tests_resources/test_example_param.py::test_positive_param[-1] - AssertionError: The number -1 is not positive.
assert -1 > 0
========================= 1 failed, 3 passed in 0.04s ==========================
<ExitCode.TESTS_FAILED: 1>

We can execute the same test with different parameters using the patrick package.

Here’s an example:

library(patrick)
library(testthat)

with_parameters_test_that("Confirm that the number is positive", {
  expect_gt(number, 0L)
}, number = c(1L, 2L, 3L, -1L))

In this example, we’re testing the same logic with four different values: 1, 2, 3, and -1. The last value, -1, will cause the test to fail.


══ Testing test_example_param.R ════════════════════════════════════════════════

[ FAIL 0 | WARN 0 | SKIP 0 | PASS 0 ]
[ FAIL 0 | WARN 0 | SKIP 0 | PASS 1 ]
[ FAIL 0 | WARN 0 | SKIP 0 | PASS 2 ]
[ FAIL 0 | WARN 0 | SKIP 0 | PASS 3 ]
[ FAIL 1 | WARN 0 | SKIP 0 | PASS 3 ]

── Failure ('test_example_param.R:5:3'): Confirm that the number is positive number=-1 ──
`number` is not strictly more than 0L. Difference: -1
Backtrace:
    β–†
 1. β”œβ”€rlang::eval_tidy(code, args)
 2. └─testthat::expect_gt(number, 0L) at test_example_param.R:5:3

[ FAIL 1 | WARN 0 | SKIP 0 | PASS 3 ]

Testing the model

There are many different ways of categorising tests. We will focus on three types:

  • Functional testing
  • Unit testing
  • Back testing

As mentioned above, we are running these tests on the model from the parallel processing page (which is also the one used on the scenario and sensitivity analysis page).

from joblib import Parallel, delayed, cpu_count
import numpy as np
import pandas as pd
import scipy.stats as st
import simpy
from sim_tools.distributions import Exponential


class Parameters:
    """
    Parameter class.

    Attributes
    ----------
    interarrival_time : float
        Mean time between arrivals (minutes).
    consultation_time : float
        Mean length of doctor's consultation (minutes).
    number_of_doctors : int
        Number of doctors.
    warm_up_period : int
        Duration of the warm-up period (minutes).
    data_collection_period : int
        Duration of the data collection period (minutes).
    number_of_runs : int
        The number of runs (i.e., replications).
    cores : int
        Number of CPU cores to use for parallel execution. For all
        available cores, set to -1. For sequential execution, set to 1.
    verbose : bool
        Whether to print messages as simulation runs.
    """
    def __init__(
        self, interarrival_time=5, consultation_time=10, number_of_doctors=3,
        warm_up_period=3000, data_collection_period=20160,
        number_of_runs=15, cores=1, verbose=False
    ):
        """
        Initialise Parameters instance.

        Parameters
        ----------
        interarrival_time : float
            Time between arrivals (minutes).
        consultation_time : float
            Length of consultation (minutes).
        number_of_doctors : int
            Number of doctors.
        warm_up_period : int
            Duration of the warm-up period (minutes).
        data_collection_period : int
            Duration of the data collection period (minutes).
        number_of_runs : int
            The number of runs (i.e., replications).
        cores : int
            Number of CPU cores to use for parallel execution. For all
            available cores, set to -1. For sequential execution, set to 1.
        verbose : bool
            Whether to print messages as simulation runs.
        """
        self.interarrival_time = interarrival_time
        self.consultation_time = consultation_time
        self.number_of_doctors = number_of_doctors
        self.warm_up_period = warm_up_period
        self.data_collection_period = data_collection_period
        self.number_of_runs = number_of_runs
        self.cores = cores
        self.verbose = verbose


class Patient:
    """
    Represents a patient.

    Attributes
    ----------
    patient_id : int
        Unique patient identifier.
    period : str
        Arrival period (warm up or data collection) with emoji.
    arrival_time : float
        Time patient entered the system (minutes).
    wait_time : float
        Time spent waiting for the doctor (minutes).
    time_with_doctor : float
        Time spent in consultation with a doctor (minutes).
    end_time : float
        Time that patient leaves (minutes), or NaN if remain in system.
    """
    def __init__(self, patient_id, period, arrival_time):
        """
        Initialises a new patient.

        Parameters
        ----------
        patient_id : int
            Unique patient identifier.
        period : str
            Arrival period (warm up or data collection) with emoji.
        arrival_time : float
            Time patient entered the system (minutes).
        """
        self.patient_id = patient_id
        self.period = period
        self.arrival_time = arrival_time
        self.wait_time = np.nan
        self.time_with_doctor = np.nan
        self.end_time = np.nan


class MonitoredResource(simpy.Resource):
    """
    SimPy Resource subclass that tracks utilisation, queue length and mean
    number of patients in the system.

    Attributes
    ----------
    time_last_event : list
        Time of the most recent resource request or release event.
    area_resource_busy : list
        Cumulative time resources were busy, for computing utilisation.
    area_n_in_queue : list
        Cumulative time patients spent queueing for a resource, for
        computing average queue length.

    Notes
    -----
    Class adapted from Monks, Harper and Heather 2025.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialise the MonitoredResource, resetting monitoring statistics.

        Parameters
        ----------
        *args :
            Positional arguments to be passed to the parent class.
        **kwargs :
            Keyword arguments to be passed to the parent class.
        """
        # Initialise a SimPy Resource
        super().__init__(*args, **kwargs)
        # Run the init_results() method
        self.init_results()

    def init_results(self):
        """
        Reset monitoring attributes.
        """
        self.time_last_event = [self._env.now]
        self.area_resource_busy = [0.0]
        self.area_n_in_queue = [0.0]

    def request(self, *args, **kwargs):
        """
        Update time-weighted statistics before requesting a resource.

        Parameters
        ----------
        *args :
            Positional arguments to be passed to the parent class.
        **kwargs :
            Keyword arguments to be passed to the parent class.

        Returns
        -------
        simpy.events.Event
            Event for the resource request.
        """
        # Update time-weighted statistics
        self.update_time_weighted_stats()
        # Request a resource
        return super().request(*args, **kwargs)

    def release(self, *args, **kwargs):
        """
        Update time-weighted statistics before releasing a resource.

        Parameters
        ----------
        *args :
            Positional arguments to be passed to the parent class.
        **kwargs :
            Keyword arguments to be passed to the parent class.

        Returns
        -------
        simpy.events.Event
            Event for the resource release.
        """
        # Update time-weighted statistics
        self.update_time_weighted_stats()
        # Release a resource
        return super().release(*args, **kwargs)

    def update_time_weighted_stats(self):
        """
        Update the time-weighted statistics for resource usage.

        Notes
        -----
        - These sums can be referred to as "the area under the curve".
        - They are called "time-weighted" statistics as they account for how
          long certain events or states persist over time.
        """
        # Calculate time since last event
        time_since_last_event = self._env.now - self.time_last_event[-1]

        # Add record of current time
        self.time_last_event.append(self._env.now)

        # Add "area under curve" of resources in use
        # self.count is the number of resources in use
        self.area_resource_busy.append(self.count * time_since_last_event)

        # Add "area under curve" of people in queue
        # len(self.queue) is the number of requests queued
        self.area_n_in_queue.append(len(self.queue) * time_since_last_event)


class Model:
    """
    Simulation model.

    Attributes
    ----------
    param : Parameters
        Simulation parameters.
    run_number : int
        Run number for random seed generation.
    env : simpy.Environment
        The SimPy environment for the simulation.
    doctor : simpy.Resource
        SimPy resource representing doctors.
    patients : list
        List of Patient objects.
    results_list : list
        List of dictionaries with the attributes of each patient.
    area_n_in_system : list of float
        List containing incremental area contributions used for
        time-weighted statistics of the number of patients in the system.
    time_last_n_in_system : float
        Simulation time at last update of the number-in-system statistic.
    n_in_system: int
        Current number of patients present in the system, including
        waiting and being served.
    arrival_dist : Exponential
        Distribution used to generate random patient inter-arrival times.
    consult_dist : Exponential
        Distribution used to generate length of a doctor's consultation.
    """
    def __init__(self, param, run_number):
        """
        Create a new Model instance.

        Parameters
        ----------
        param : Parameters
            Simulation parameters.
        run_number : int
            Run number for random seed generation.
        """
        self.param = param
        self.run_number = run_number

        # Create SimPy environment
        self.env = simpy.Environment()

        # Create resource
        self.doctor = MonitoredResource(
            self.env, capacity=self.param.number_of_doctors
        )

        # Create a random seed sequence based on the run number
        ss = np.random.SeedSequence(self.run_number)
        seeds = ss.spawn(2)

        # Set up attributes to store results
        self.patients = []
        self.results_list = []
        self.area_n_in_system = [0]
        self.time_last_n_in_system = self.env.now
        self.n_in_system = 0

        # Initialise distributions
        self.arrival_dist = Exponential(mean=self.param.interarrival_time,
                                        random_seed=seeds[0])
        self.consult_dist = Exponential(mean=self.param.consultation_time,
                                        random_seed=seeds[1])

    def update_n_in_system(self, inc):
        """
        Update the time-weighted statistics for number of patients in system.

        Parameters
        ----------
        inc : int
            Change in the number of patients (+1, 0, -1).
        """
        # Compute time since last event and calculate area under curve for that
        duration = self.env.now - self.time_last_n_in_system
        self.area_n_in_system.append(self.n_in_system * duration)
        # Update time and n in system
        self.time_last_n_in_system = self.env.now
        self.n_in_system += inc

    def generate_arrivals(self):
        """
        Process that generates patient arrivals.
        """
        while True:
            # Sample and pass time to next arrival
            sampled_iat = self.arrival_dist.sample()
            yield self.env.timeout(sampled_iat)

            # Check whether arrived during warm-up or data collection
            if self.env.now < self.param.warm_up_period:
                period = "\U0001F538 WU"
            else:
                period = "\U0001F539 DC"

            # Create a new patient
            patient = Patient(patient_id=len(self.patients)+1,
                              period=period,
                              arrival_time=self.env.now)
            self.patients.append(patient)

            # Update the number in the system
            self.update_n_in_system(inc=1)

            # Print arrival time
            if self.param.verbose:
                print(f"{patient.period} Patient {patient.patient_id} " +
                      f"arrives at time: {patient.arrival_time:.3f}")

            # Start process of consultation
            self.env.process(self.consultation(patient))

    def consultation(self, patient):
        """
        Process that simulates a consultation.

        Parameters
        ----------
        patient :
            Instance of the Patient() class representing a single patient.
        """
        start_wait = self.env.now
        # Patient requests access to a doctor (resource)
        with self.doctor.request() as req:
            yield req

            # Record how long patient waited before consultation
            patient.wait_time = self.env.now - start_wait

            if self.param.verbose:
                print(f"{patient.period} Patient {patient.patient_id} " +
                      f"starts consultation at: {self.env.now:.3f}")

            # Sample consultation duration and pass time spent with doctor
            patient.time_with_doctor = self.consult_dist.sample()
            yield self.env.timeout(patient.time_with_doctor)

            # Update number in system
            self.update_n_in_system(inc=-1)

            # Record end time
            patient.end_time = self.env.now
            if self.param.verbose:
                print(f"{patient.period} Patient {patient.patient_id} " +
                      f"leaves at: {patient.end_time:.3f}")

    def reset_results(self):
        """
        Reset results.
        """
        self.patients = []
        self.doctor.init_results()
        # For number in system, we reset area and time but not the count, as
        # it should include any remaining warm-up patients in the count
        self.area_n_in_system = [0]
        self.time_last_n_in_system = self.env.now

    def warmup(self):
        """
        Reset result collection after the warm-up period.
        """
        if self.param.warm_up_period > 0:
            # Delay process until warm-up period has completed
            yield self.env.timeout(self.param.warm_up_period)
            # Reset results variables
            self.reset_results()
            if self.param.verbose:
                print(f"Warm up period ended at time: {self.env.now}")

    def run(self):
        """
        Run the simulation.
        """
        # Schedule arrival generator and warm-up
        self.env.process(self.generate_arrivals())
        self.env.process(self.warmup())

        # Run the simulation
        self.env.run(until=(self.param.warm_up_period +
                            self.param.data_collection_period))

        # At simulation end, update time-weighted statistics by accounting
        # for the time from the last event up to the simulation finish.
        self.doctor.update_time_weighted_stats()

        # Run final calculation of number in system
        self.update_n_in_system(inc=0)

        # Create list of dictionaries containing each patient's attributes
        self.results_list = [x.__dict__ for x in self.patients]


def summary_stats(data):
    """
    Calculate mean, standard deviation and 95% confidence interval (CI).

    Parameters
    ----------
    data : pd.Series
        Data to use in calculation.

    Returns
    -------
    tuple
        (mean, standard deviation, CI lower, CI upper).
    """
    # Remove any NaN from the series
    data = data.dropna()

    # Find number of observations
    count = len(data)

    # If there are no observations, then set all to NaN
    if count == 0:
        mean, std_dev, ci_lower, ci_upper = np.nan, np.nan, np.nan, np.nan
    # If there is only one or two observations, can do mean but not others
    elif count < 3:
        mean = data.mean()
        std_dev, ci_lower, ci_upper = np.nan, np.nan, np.nan
    # With more than one observation, can calculate all...
    else:
        mean = data.mean()
        std_dev = data.std()
        # Special case for CI if variance is 0
        if np.var(data) == 0:
            ci_lower, ci_upper = mean, mean
        else:
            # Calculation of CI uses t-distribution, which is suitable for
            # smaller sample sizes (n<30)
            ci_lower, ci_upper = st.t.interval(
                confidence=0.95,
                df=count-1,
                loc=mean,
                scale=st.sem(data))
    return mean, std_dev, ci_lower, ci_upper


class Runner:
    """
    Run the simulation.

    Attributes
    ----------
    param : Parameters
        Simulation parameters.
    """
    def __init__(self, param):
        """
        Initialise a new instance of the Runner class.

        Parameters
        ----------
        param : Parameters
            Simulation parameters.
        """
        self.param = param

    def run_single(self, run):
        """
        Runs the simulation once and performs results calculations.

        Parameters
        ----------
        run : int
            Run number for the simulation.

        Returns
        -------
        dict
            Contains patient-level results and results from each run.
        """
        model = Model(param=self.param, run_number=run)
        model.run()

        # Patient results
        patient_results = pd.DataFrame(model.results_list)
        patient_results["run"] = run
        patient_results["time_in_system"] = (
            patient_results["end_time"] - patient_results["arrival_time"]
        )
        # For each patient, if they haven't seen a doctor yet, calculate
        # their wait as current time minus arrival, else set as missing
        patient_results["unseen_wait_time"] = np.where(
            patient_results["time_with_doctor"].isna(),
            model.env.now - patient_results["arrival_time"], np.nan
        )

        # Run results
        run_results = {
            "run_number": run,
            "arrivals": len(patient_results),
            "mean_wait_time": patient_results["wait_time"].mean(),
            "mean_time_with_doctor": (
                patient_results["time_with_doctor"].mean()
            ),
            "mean_utilisation_tw": (
                sum(model.doctor.area_resource_busy) / (
                    self.param.number_of_doctors *
                    self.param.data_collection_period
                )
            ),
            "mean_queue_length": (
                sum(model.doctor.area_n_in_queue) /
                self.param.data_collection_period
            ),
            "mean_time_in_system": patient_results["time_in_system"].mean(),
            "mean_patients_in_system": (
                sum(model.area_n_in_system) /
                self.param.data_collection_period
            ),
            "unseen_count": patient_results["time_with_doctor"].isna().sum(),
            "unseen_wait_time": patient_results["unseen_wait_time"].mean()
        }

        return {
            "patient": patient_results,
            "run": run_results
        }

    def run_reps(self):
        """
        Execute a single model configuration for multiple runs.

        Returns
        -------
        dict
            Contains patient-level results, results from each run and
            overall results.
        """
        # Sequential execution
        if self.param.cores == 1:
            all_results = [self.run_single(run)
                           for run in range(self.param.number_of_runs)]
        # Parallel execution
        else:
            # Check number of cores is valid
            valid_cores = [-1] + list(range(1, cpu_count()))
            if self.param.cores not in valid_cores:
                raise ValueError(
                    f"Invalid cores: {self.param.cores}. Must be one of: " +
                    f"{valid_cores}."
                )
            # Execute replications
            all_results = Parallel(n_jobs=self.param.cores)(
                delayed(self.run_single)(run)
                for run in range(self.param.number_of_runs)
            )

        # Separate results from each run into appropriate lists
        patient_results_list = [result["patient"] for result in all_results]
        run_results_list = [result["run"] for result in all_results]

        # Convert lists into dataframes
        patient_results_df = pd.concat(
            patient_results_list, ignore_index=True
        )
        run_results_df = pd.DataFrame(run_results_list)

        # Calculate average results and uncertainty from across all runs
        uncertainty_metrics = {}
        run_col = run_results_df.columns

        # Loop through the run performance measure columns
        # Calculate mean, standard deviation and 95% confidence interval
        for col in run_col[~run_col.isin(["run_number", "scenario"])]:
            uncertainty_metrics[col] = dict(zip(
                ["mean", "std_dev", "lower_95_ci", "upper_95_ci"],
                summary_stats(run_results_df[col])
            ))
        # Convert to dataframe
        overall_results_df = pd.DataFrame(uncertainty_metrics)

        return {
            "patient": patient_results_df,
            "run": run_results_df,
            "overall": overall_results_df
        }
library(dplyr)
library(future)
library(future.apply)
library(simmer)
library(tidyr)

#' Generate parameter list.
#'
#' @param interarrival_time Numeric. Time between arrivals (minutes).
#' @param consultation_time Numeric. Mean length of doctor's
#'   consultation (minutes).
#' @param number_of_doctors Numeric. Number of doctors.
#' @param warm_up_period Numeric. Duration of the warm-up period (minutes).
#' @param data_collection_period Numeric. Duration of the data collection
#'   period (minutes).
#' @param number_of_runs Numeric. The number of runs (i.e., replications).
#' @param cores Numeric. Number of CPU cores to use for parallel execution.
#' @param verbose Boolean. Whether to print messages as simulation runs.
#'
#' @return A named list of parameters.
#' @export

create_params <- function(
  interarrival_time = 5L,
  consultation_time = 10L,
  number_of_doctors = 3L,
  warm_up_period = 30L,
  data_collection_period = 40L,
  number_of_runs = 5L,
  cores = 1L,
  verbose = FALSE
) {
  list(
    interarrival_time = interarrival_time,
    consultation_time = consultation_time,
    number_of_doctors = number_of_doctors,
    warm_up_period = warm_up_period,
    data_collection_period = data_collection_period,
    number_of_runs = number_of_runs,
    cores = cores,
    verbose = verbose
  )
}


#' Filters arrivals and resources to remove warm-up results.
#'
#' @param result Named list with two tables: monitored arrivals & resources.
#' @param warm_up_period Length of warm-up period.
#'
#' @importFrom dplyr arrange filter group_by mutate n slice ungroup
#' @importFrom rlang .data
#'
#' @return The named list `result`, but with the tables (`arrivals` and
#' `resources`) filtered to remove warm-up results.
#' @export

filter_warmup <- function(result, warm_up_period) {
  # Skip filtering if the warm-up period is zero
  if (warm_up_period == 0L) return(result)

  # Arrivals: Keep only patients who came during the data collection period
  result[["arrivals"]] <- result[["arrivals"]] |>
    group_by(.data[["name"]]) |>
    filter(all(.data[["start_time"]] >= warm_up_period)) |>
    ungroup()

  # Resources: Filter to resource events in the data collection period
  dc_resources <- filter(result[["resources"]],
                         .data[["time"]] >= warm_up_period)

  if (nrow(dc_resources) > 0L) {

    # For each resource, get the last even during warm-up, and replace time
    # for that event with the start time of the data collection period
    last_usage <- result[["resources"]] |>
      filter(.data[["time"]] < warm_up_period) |>
      arrange(.data[["time"]]) |>
      group_by(.data[["resource"]]) |>
      slice(n()) |>
      mutate(time = warm_up_period) |>
      ungroup()

    # Set that last event of the resource/s as the first row of the dataframe
    # else calculations would assume all resources were idle at the start of
    # the data collection period
    result[["resources"]] <- rbind(last_usage, dc_resources)

  } else {
    # No events after warm-up; use filtered resource table as is
    result[["resources"]] <- dc_resources
  }

  result
}


#' Calculate the number of arrivals
#'
#' @param arrivals Dataframe with times for each patient with each resource.
#'
#' @importFrom dplyr n_distinct summarise ungroup
#' @importFrom rlang .data
#'
#' @return Tibble with column containing total number of arrivals.
#' @export

calc_arrivals <- function(arrivals) {
  arrivals |>
    summarise(arrivals = n_distinct(.data[["name"]])) |>
    ungroup()
}


#' Calculate the mean wait time for each resource
#'
#' @param arrivals Dataframe with times for each patient with each resource.
#'
#' @importFrom dplyr group_by summarise ungroup
#' @importFrom rlang .data
#' @importFrom tidyr drop_na pivot_wider
#' @importFrom tidyselect any_of
#'
#' @return Tibble with columns containing result for each resource.
#' @export

calc_mean_wait <- function(arrivals) {

  # Create subset of data that removes patients who were still waiting
  complete_arrivals <- drop_na(arrivals, any_of("wait_time"))

  # Calculate mean wait time for each resource
  complete_arrivals |>
    group_by(.data[["resource"]]) |>
    summarise(mean_wait_time = mean(.data[["wait_time"]])) |>
    pivot_wider(names_from = "resource",
                values_from = "mean_wait_time",
                names_glue = "mean_wait_time_{resource}") |>
    ungroup()
}


#' Calculate the mean length of time patients spent with each resource
#'
#' @param arrivals Dataframe with times for each patient with each resource.
#' @param resources Dataframe with times patients use or queue for resources.
#'
#' @importFrom dplyr group_by summarise ungroup
#' @importFrom rlang .data
#' @importFrom tidyr drop_na pivot_wider
#' @importFrom tidyselect any_of
#'
#' @return Tibble with columns containing result for each resource.
#' @export

calc_mean_serve_length <- function(arrivals, resources) {

  # Create subset of data that removes patients who were still waiting
  complete_arrivals <- drop_na(arrivals, any_of("serve_length"))

  # Calculate mean serve time for each resource
  complete_arrivals |>
    group_by(.data[["resource"]]) |>
    summarise(mean_serve_time = mean(.data[["serve_length"]])) |>
    pivot_wider(names_from = "resource",
                values_from = "mean_serve_time",
                names_glue = "mean_time_with_{resource}") |>
    ungroup()
}


#' Calculate the resource utilisation
#'
#' Utilisation is given by the total effective usage time (`in_use`) over
#' the total time intervals considered (`dt`).
#'
#' Credit: The utilisation calculation is adapted from the
#' `plot.resources.utilization()` function in simmer.plot 0.1.18, which is
#' shared under an MIT Licence (Ucar I, Smeets B (2023). simmer.plot: Plotting
#' Methods for 'simmer'. https://r-simmer.org
#' https://github.com/r-simmer/simmer.plot.).
#'
#' @param resources Dataframe with times patients use or queue for resources.
#' @param simulation_end_time Time at end of simulation run.
#' @param groups Optional list of columns to group by for the calculation.
#' @param summarise If TRUE, return overall utilisation. If FALSE, just return
#' the resource dataframe with the additional columns interval_duration,
#' effective_capacity and utilisation.
#'
#' @importFrom dplyr across group_by mutate n row_number summarise ungroup
#' @importFrom rlang .data
#' @importFrom tidyr pivot_wider
#' @importFrom tidyselect all_of
#'
#' @return Tibble with columns containing result for each resource.
#' @export

calc_utilisation <- function(
  resources, simulation_end_time, groups = NULL, summarise = TRUE
) {

  # Create list of grouping variables (always "resource" but can add others)
  group_vars <- c("resource", groups)

  # Calculate utilisation
  util_df <- resources |>
    group_by(across(all_of(group_vars))) |>
    mutate(
      # Calculate time between this row and the next. For final row,
      # lead(time) returns NA, so use simulation_end_time - time
      interval_duration = ifelse(
        row_number() == n(),
        simulation_end_time - .data[["time"]],
        lead(.data[["time"]]) - .data[["time"]]
      ),
      # Ensures effective capacity is never less than number of servers in
      # use (in case of situations where servers may exceed "capacity").
      effective_capacity = pmax(.data[["capacity"]], .data[["server"]]),
      # Divide number of servers in use by effective capacity
      # Set to NA if effective capacity is 0
      utilisation = ifelse(.data[["effective_capacity"]] > 0L,
                           .data[["server"]] / .data[["effective_capacity"]],
                           NA_real_)
    )

  # If summarise = TRUE, find total utilisation
  if (summarise) {
    util_df |>
      summarise(
        # Multiply each utilisation by its own unique duration. The total of
        # those is then divided by the total duration of all intervals.
        # Hence, we are calculated a time-weighted average utilisation.
        utilisation = (sum(.data[["utilisation"]] *
                             .data[["interval_duration"]], na.rm = TRUE) /
                         sum(.data[["interval_duration"]], na.rm = TRUE))
      ) |>
      pivot_wider(names_from = "resource",
                  values_from = "utilisation",
                  names_glue = "utilisation_{resource}") |>
      ungroup()
  } else {
    # If summarise = FALSE, just return the util_df with no further processing
    ungroup(util_df)
  }
}


#' Calculate the time-weighted mean queue length.
#'
#' @param arrivals Dataframe with times for each patient with each resource.
#' @param simulation_end_time Time at end of simulation run.
#'
#' @importFrom dplyr arrange group_by lead mutate n row_number summarise
#' @importFrom dplyr ungroup
#' @importFrom tidyr pivot_wider
#'
#' @return Tibble with column containing mean queue length.
#' @export

calc_mean_queue_length <- function(arrivals, simulation_end_time) {
  arrivals |>
    group_by(.data[["resource"]]) |>
    # Sort by arrival time
    arrange(.data[["start_time"]]) |>
    # Calculate time between this row and the next. For final row,
    # lead(start_time) returns NA, so use simulation_end_time - start_time
    mutate(
      interval_duration = ifelse(
        row_number() == n(),
        simulation_end_time - .data[["start_time"]],
        lead(.data[["start_time"]]) - .data[["start_time"]]
      )
    ) |>
    # Multiply each queue length by its own unique duration. The total of
    # those is then divided by the total duration of all intervals.
    # Hence, we are calculated a time-weighted average queue length.
    summarise(mean_queue_length = (
      sum(.data[["queue_on_arrival"]] *
            .data[["interval_duration"]], na.rm = TRUE) /
        sum(.data[["interval_duration"]], na.rm = TRUE)
    )
    ) |>
    # Reshape dataframe
    pivot_wider(names_from = "resource",
                values_from = "mean_queue_length",
                names_glue = "mean_queue_length_{resource}") |>
    ungroup()
}


#' Calculate the mean time in the system for finished patients.
#'
#' @param arrivals Dataframe with times for each patient with each resource.
#'
#' @importFrom dplyr summarise ungroup
#'
#' @return Tibble with column containing the mean time in the system.
#' @export

calc_mean_time_in_system <- function(arrivals) {
  arrivals |>
    summarise(
      mean_time_in_system = mean(.data[["time_in_system"]], na.rm = TRUE)
    ) |>
    ungroup()
}


#' Calculate the time-weighted mean number of patients in the system.
#'
#' @param patient_count Dataframe with patient counts over time.
#' @param simulation_end_time Time at end of simulation run.
#'
#' @importFrom dplyr arrange lead mutate n row_number summarise ungroup
#'
#' @return Tibble with column containing mean number of patients in the system.
#' @export

calc_mean_patients_in_system <- function(patient_count, simulation_end_time) {
  patient_count |>
    # Sort by time
    arrange(.data[["time"]]) |>
    # Calculate time between this row and the next. For final row,
    # lead(time) returns NA, so use simulation_end_time - time
    mutate(
      interval_duration = ifelse(
        row_number() == n(),
        simulation_end_time - .data[["time"]],
        lead(.data[["time"]]) - .data[["time"]]
      )
    ) |>
    # Multiply each patient count by its own unique duration. The total of
    # those is then divided by the total duration of all intervals.
    # Hence, we are calculated a time-weighted average patient count.
    summarise(
      mean_patients_in_system = (
        sum(.data[["count"]] * .data[["interval_duration"]], na.rm = TRUE) /
          sum(.data[["interval_duration"]], na.rm = TRUE)
      )
    ) |>
    ungroup()
}


#' Get average results for the provided single run.
#'
#' @param results Named list with `arrivals` from `get_mon_arrivals()` and
#'   `resources` from `get_mon_resources()`
#'   (`per_resource = TRUE` and `ongoing = TRUE`).
#' @param run_number Integer index of current simulation run.
#' @param simulation_end_time Time at end of simulation run.
#'
#' @importFrom dplyr bind_cols
#' @importFrom tibble tibble
#'
#' @return Tibble with processed results from replication.
#' @export

get_run_results <- function(results, run_number, simulation_end_time) {
  metrics <- list(
    calc_arrivals(results[["arrivals"]]),
    calc_mean_wait(results[["arrivals"]]),
    calc_mean_serve_length(results[["arrivals"]]),
    calc_utilisation(results[["resources"]], simulation_end_time),
    calc_mean_queue_length(results[["arrivals"]], simulation_end_time),
    calc_mean_time_in_system(results[["arrivals"]]),
    calc_mean_patients_in_system(results[["patients_in_system"]],
                                 simulation_end_time)
  )
  dplyr::bind_cols(tibble(replication = run_number), metrics)
}


#' Run simulation.
#'
#' @param param List. Model parameters.
#' @param run_number Numeric. Run number for random seed generation.
#' @param set_seed Boolean. Whether to set seed within the model function.
#'
#' @importFrom dplyr arrange bind_rows desc left_join mutate transmute
#' @importFrom rlang .data
#' @importFrom simmer add_generator add_resource get_mon_arrivals
#' @importFrom simmer get_attribute get_mon_resources now release run seize
#' @importFrom simmer set_attribute simmer timeout trajectory
#' @importFrom tidyr drop_na pivot_wider
#' @importFrom tidyselect all_of
#'
#' @return Named list with tables `arrivals`, `resources` and `run_results`.
#' @export

model <- function(param, run_number, set_seed = TRUE) {

  # Set random seed based on run number
  if (set_seed) {
    set.seed(run_number)
  }

  # Create simmer environment
  env <- simmer("simulation", verbose = param[["verbose"]])

  # Calculate full run length
  full_run_length <- (param[["warm_up_period"]] +
                        param[["data_collection_period"]])

  # Define the patient trajectory
  patient <- trajectory("consultation") |>
    # Record queue length on arrival
    set_attribute("doctor_queue_on_arrival",
                  function() get_queue_count(env, "doctor")) |>
    seize("doctor", 1L) |>
    # Record time resource is obtained
    set_attribute("doctor_serve_start", function() now(env)) |>
    # Record sampled length of consultation
    set_attribute("doctor_serve_length", function() {
      rexp(n = 1L, rate = 1L / param[["consultation_time"]])
    }) |>
    timeout(function() get_attribute(env, "doctor_serve_length")) |>
    release("doctor", 1L)

  env <- env |>
    # Add doctor resource
    add_resource("doctor", param[["number_of_doctors"]]) |>
    # Add patient generator (mon=2 so can get manual attributes)
    add_generator("patient", patient, function() {
      rexp(n = 1L, rate = 1L / param[["interarrival_time"]])
    }, mon = 2L) |>
    # Run the simulation
    simmer::run(until = full_run_length)

  # Extract information on arrivals and resources from simmer environment
  result <- list(
    arrivals = get_mon_arrivals(env, per_resource = TRUE, ongoing = TRUE),
    resources = get_mon_resources(env)
  )

  # Get the extra arrivals attributes
  extra_attributes <- get_mon_attributes(env) |>
    dplyr::select("name", "key", "value") |>
    # Add column with resource name, and remove that from key
    mutate(resource = gsub("_.+", "", .data[["key"]]),
           key = gsub("^[^_]+_", "", .data[["key"]])) |>
    pivot_wider(names_from = "key", values_from = "value")

  # Merge extra attributes with the arrival data
  result[["arrivals"]] <- left_join(
    result[["arrivals"]], extra_attributes, by = c("name", "resource")
  )

  # Add time in system (unfinished patients will set to NaN)
  result[["arrivals"]][["time_in_system"]] <- (
    result[["arrivals"]][["end_time"]] - result[["arrivals"]][["start_time"]]
  )

  # Filter to remove results from the warm-up period
  result <- filter_warmup(result, param[["warm_up_period"]])

  # Gather all start and end times, with a row for each, marked +1 or -1
  # Drop NA for end time, as those are patients who haven't left system
  # at the end of the simulation
  arrivals_start <- transmute(
    result[["arrivals"]], time = .data[["start_time"]], change = 1L
  )
  arrivals_end <- result[["arrivals"]] |>
    drop_na(all_of("end_time")) |>
    transmute(time = .data[["end_time"]], change = -1L)
  events <- bind_rows(arrivals_start, arrivals_end)

  # Determine the count of patients in the service with each entry/exit
  result[["patients_in_system"]] <- events |>
    # Sort events by time
    arrange(.data[["time"]], desc(.data[["change"]])) |>
    # Use cumulative sum to find number of patients in system at each time
    mutate(count = cumsum(.data[["change"]])) |>
    dplyr::select(c("time", "count"))

  # Replace "replication" value with appropriate run number
  result[["arrivals"]] <- mutate(result[["arrivals"]],
                                 replication = run_number)
  result[["resources"]] <- mutate(result[["resources"]],
                                  replication = run_number)
  result[["patients_in_system"]] <- mutate(result[["patients_in_system"]],
                                           replication = run_number)

  # Calculate wait time for each patient
  result[["arrivals"]] <- mutate(
    result[["arrivals"]],
    wait_time = .data[["serve_start"]] - .data[["start_time"]]
  )

  # Calculate the average results for that run
  result[["run_results"]] <- get_run_results(
    result, run_number, simulation_end_time = full_run_length
  )

  result
}


#' Run simulation for multiple replications.
#'
#' @param param Named list of model parameters.
#' @param use_future_seeding Logical. If TRUE, uses the recommended future
#' seeding (`future.seed = seed`), though results will not match `model()`.
#' If FALSE, seeds each run with its run number (like `model()`), though this
#' is discourage by future_lapply documentation.
#'
#' @importFrom future plan multisession sequential
#' @importFrom future.apply future_lapply
#' @importFrom dplyr bind_rows
#'
#' @return List with three tables: arrivals, resources and run_results.
#' @export

runner <- function(param, use_future_seeding = TRUE) {
  # Determine the parallel execution plan
  if (param[["cores"]] == 1L) {
    plan(sequential)  # Sequential execution
  } else {
    if (param[["cores"]] == -1L) {
      cores <- future::availableCores() - 1L
    } else {
      cores <- param[["cores"]]
    }
    plan(multisession, workers = cores)  # Parallel execution
  }

  # Set seed for future.seed
  if (isTRUE(use_future_seeding)) {
    # Recommended option. Base seed used when making others by future.seed
    custom_seed <- 123456L
  } else {
    # Not recommended (but will allow match to model())
    # Generates list of pre-generated seeds set to the run numbers (+ offset)
    create_seeds <- function(seed) {
      set.seed(seed)
      .Random.seed
    }
    custom_seed <- lapply(
      1L:param[["number_of_runs"]] + param[["seed_offset"]],
      create_seeds
    )
  }

  # Run simulations (sequentially or in parallel)
  # Mark set_seed as FALSE as we handle this using future.seed(), rather than
  # within the function, and we don't want to override future.seed
  results <- future_lapply(
    1L:param[["number_of_runs"]],
    function(i) {
      model(run_number = i,
            param = param,
            set_seed = FALSE)
    },
    future.seed = custom_seed
  )

  # If only one replication, return without changes
  if (param[["number_of_runs"]] == 1L) {
    return(results[[1L]])
  }

  # Combine results from all replications into unified tables
  all_arrivals <- do.call(
    rbind, lapply(results, function(x) x[["arrivals"]])
  )
  all_resources <- do.call(
    rbind, lapply(results, function(x) x[["resources"]])
  )

  # run_results may have missing data - use bind_rows to fill NAs automatically
  all_run_results <- dplyr::bind_rows(
    lapply(results, function(x) x[["run_results"]])
  )

  all_patients_in_system <- dplyr::bind_rows(
    lapply(results, function(x) x[["patients_in_system"]])
  )

  # Return a list containing the combined tables
  list(
    arrivals = all_arrivals,
    resources = all_resources,
    run_results = all_run_results,
    patients_in_system = all_patients_in_system
  )
}

Functional tests

Functional tests verify that the system or components perform their intended functionality.

For example, we expect that the number of arrivals should decrease if:

  • The patient inter-arrival time increases.
  • The length of the data collection period decreases.
import pytest
from simulation import Parameters, Runner


@pytest.mark.parametrize("param_name, initial_value, adjusted_value", [
    ("interarrival_time", 2, 15),
    ("data_collection_period", 2000, 500)
])
def test_arrivals_decrease(param_name, initial_value, adjusted_value):
    """
    Test that adjusting parameters reduces the number of arrivals as expected.
    """
    # Run model with initial value
    param = Parameters(**{param_name: initial_value})
    experiment = Runner(param)
    initial_arrivals = experiment.run_single(run=0)["run"]["arrivals"]

    # Run model with adjusted value
    param = Parameters(**{param_name: adjusted_value})
    experiment = Runner(param)
    adjusted_arrivals = experiment.run_single(run=0)["run"]["arrivals"]

    # Check that arrivals from adjusted model are less
    assert initial_arrivals > adjusted_arrivals, (
        f'Changing "{param_name}" from {initial_value} to {adjusted_value} ' +
        "did not decrease the number of arrivals as expected: observed " +
        f"{initial_arrivals} and {adjusted_arrivals} arrivals, respectively."
    )
NoteTest output
============================= test session starts ==============================
platform linux -- Python 3.11.9, pytest-8.4.1, pluggy-1.6.0
rootdir: /__w/des_rap_book/des_rap_book/pages/guide/verification_validation
plugins: anyio-4.11.0, timeout-2.4.0
collected 2 items

tests_resources/test_functional.py ..                                    [100%]

============================== 2 passed in 0.26s ===============================
<ExitCode.OK: 0>
library(testthat)
library(patrick)

with_parameters_test_that("Test that adjusting parameters reduces arrivals", {
  # Run model with initial value
  init_param <- create_params(number_of_runs = 1L)
  init_param[[param_name]] <- init_value
  init_results <- runner(init_param)[["run_results"]]

  # Run model with adjusted value
  adj_param <- create_params(number_of_runs = 1L)
  adj_param[[param_name]] <- adj_value
  adj_results <- runner(adj_param)[["run_results"]]

  # Check that arrivals from adjusted model are less
  expect_lt(adj_results[["arrivals"]], init_results[["arrivals"]])
},
cases(
  list(param_name = "interarrival_time",
       init_value = 2L, adj_value = 15L),
  list(param_name = "data_collection_period",
       init_value = 2000L, adj_value = 500L)
))
Test passed 😸
Test passed 😸

As another example, we expect that our model should fail if the number of doctors or the patient inter-arrival time were set to 0. This is tested using test_zero_inputs:

import pytest
from simulation import Parameters, Model


@pytest.mark.parametrize("param_name, message", [
    ("number_of_doctors", '"capacity" must be > 0.'),
    ("interarrival_time", "mean must be positive, got 0")
])
def test_zero_inputs(param_name, message):
    """
    Check that the model fails when inputs that are zero are used.

    Parameters
    ----------
    param_name : str
        Name of parameter to change in the parameter class.
    message : str
        Error message that expect to see.
    """
    # Create parameter class with value set to zero
    param = Parameters(**{param_name: 0})

    # Verify that initialising the model raises an error
    with pytest.raises(ValueError, match=message):
        Model(param=param, run_number=0)
NoteTest output
============================= test session starts ==============================
platform linux -- Python 3.11.9, pytest-8.4.1, pluggy-1.6.0
rootdir: /__w/des_rap_book/des_rap_book/pages/guide/verification_validation
plugins: anyio-4.11.0, timeout-2.4.0
collected 2 items

tests_resources/test_unit.py ..                                          [100%]

============================== 2 passed in 0.01s ===============================
<ExitCode.OK: 0>
library(patrick)
library(R.utils)
library(testthat)

with_parameters_test_that("Check that model fails with zero inputs", {
  # Create parameter object with value set to zero
  param <- create_params()
  param[[param_name]] <- 0L

  # Verify that initialising the model raises an error
  expect_error(
    withTimeout(
      model(param = param, run_number = 0L),
      timeout = 3L,
      onTimeout = "error"
    ),
    "must be greater than 0"
  )
}, param_name = c("number_of_doctors", "interarrival_time"))

These tests fail as we do not have error handling for these values. In fact, with inter-arrival time set to 0, the model will run infinitely and crash your computer! Hence the use of withTimeout in the test.

To address this, we could add error handling which raises an error for users if they try to input a value of 0 (see the parameter validation page).


══ Testing test_unit.R ═════════════════════════════════════════════════════════

[ FAIL 0 | WARN 0 | SKIP 0 | PASS 0 ]
[ FAIL 1 | WARN 0 | SKIP 0 | PASS 0 ]
[ FAIL 2 | WARN 0 | SKIP 0 | PASS 0 ]

── Failure ('test_unit.R:11:3'): Check that model fails with zero inputs param_name=number_of_doctors ──
`withTimeout(...)` threw an error with unexpected message.
Expected match: "must be greater than 0"
Actual message: "β„Ή In argument: `wait_time = .data[[\"serve_start\"]] -\n  .data[[\"start_time\"]]`.\nCaused by error in `.data[[\"serve_start\"]]`:\n! Column `serve_start` not found in `.data`."
Backtrace:
    β–†
 1. β”œβ”€rlang::eval_tidy(code, args)
 2. └─testthat::expect_error(...) at test_unit.R:11:3

── Error ('test_unit.R:11:3'): Check that model fails with zero inputs param_name=interarrival_time ──
<subscriptOutOfBoundsError/error/condition>
Error in `.subset2(cnd, "rlang")`: subscript out of bounds
Backtrace:
     β–†
  1. β”œβ”€rlang::eval_tidy(code, args)
  2. β”œβ”€testthat::expect_error(...) at test_unit.R:11:3
  3. β”‚ └─testthat:::quasi_capture(...) at testthat/R/expect-condition.R:126:5
  4. β”‚   β”œβ”€testthat (local) .capture(...) at testthat/R/quasi-label.R:54:3
  5. β”‚   β”‚ └─base::withCallingHandlers(...) at testthat/R/deprec-condition.R:23:5
  6. β”‚   └─rlang::eval_bare(quo_get_expr(.quo), quo_get_env(.quo)) at testthat/R/quasi-label.R:54:3
  7. β”œβ”€R.utils::withTimeout(...) at rlang/R/eval.R:96:3
  8. β”‚ └─base::tryCatch(...) at R.utils/R/withTimeout.R:149:3
  9. β”‚   └─base (local) tryCatchList(expr, classes, parentenv, handlers)
 10. β”‚     └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
 11. β”‚       └─value[[3L]](cond)
 12. β”‚         β”œβ”€R.oo::throw(ex) at R.utils/R/withTimeout.R:159:9
 13. β”‚         └─R.oo::throw.Exception(ex) at staging/1/R.oo:1:14
 14. β”‚           └─base::signalCondition(this) at R.oo/R/Exception.R:313:3
 15. └─testthat (local) `<fn>`(`<TmtExcpt>`)
 16.   └─rlang::cnd_entrace(cnd) at testthat/R/deprec-condition.R:7:7
 17.     └─rlang:::cnd_some(cnd, function(x) !is_null(x[["trace"]])) at rlang/R/cnd-entrace.R:242:3

[ FAIL 2 | WARN 0 | SKIP 0 | PASS 0 ]


We can identify other examples of functional tests from our model verification page. These include:

  • Assertion checking - tests check that assertions hold true.
  • Stress testing - tests simulate worst-case load and ensure model is robust under heavy demand.
  • Boundary testing - tests check that the behaviour at, just inside, and just outside each boundary (if you have input variables with explicit limits).
  • Idle system - tests run model with little or no activity/waits/service

Unit tests

Unit tests are a type of functional testing that focuses on individual components (e.g. methods, classes) and tests them in isolation to ensure they work as intended. They can form part of β€œbottom-up” testing for model verification (see verification and validation page).

However, since we’re using a simulation package (SimPy), much of the core functionality is already well-tested. As a result, it’s often more relevant to write functional tests focused on your simulation logic, involving interactions between multiple classes or functions (e.g. parameters and model).

However, since we’re using a simulation package (simmer), much of the core functionality is already well-tested. As a result, it’s often more relevant to write functional tests focused on your simulation logic, involving interactions between multiple classes or functions (e.g. parameters and model).

You may still find some uses for unit tests though, such as for custom validation methods or small utility functions, before running bigger integration checks.

Back tests

Back tests check that the model code produces results consistent with those generated historically/from prior code.

First, we’ll generate a set of expected results, with a specific set of parameters. Although this may seem unnecessary in this case, as they match our default parameters, these are still specified to ensure that we are testing on the same parameters, even if defaults change.

param = Parameters(
    interarrival_time=5,
    consultation_time=10,
    number_of_doctors=3,
    warm_up_period=30,
    data_collection_period=40,
    number_of_runs=5,
    verbose=False
)
param <- create_params(
  interarrival_time = 5L,
  consultation_time = 10L,
  number_of_doctors = 3L,
  warm_up_period = 30L,
  data_collection_period = 40L,
  number_of_runs = 5L, verbose = FALSE
)

We’ll then run the model and save the results to .csv files.

runner = Runner(param=param)
results = runner.run_reps()
results["patient"].to_csv("tests_resources/python_patient.csv", index=False)
results["run"].to_csv("tests_resources/python_run.csv", index=False)
results["overall"].to_csv("tests_resources/python_overall.csv", index=False)
results <- runner(param)
write.csv(arrange(results[["arrivals"]], replication, start_time),
          file.path("tests_resources", "r_arrivals.csv"),
          row.names = FALSE)
write.csv(results[["resources"]],
          file.path("tests_resources", "r_resources.csv"),
          row.names = FALSE)
write.csv(results[["run_results"]],
          file.path("tests_resources", "r_run_results.csv"),
          row.names = FALSE)

In the test, we’ll run the same model parameters, then import and compare against the saved .csv file to check for any differences.

from pathlib import Path

import pandas as pd
from simulation import Parameters, Runner


def test_reproduction():
    """
    Check that results from particular run of the model match those
    previously generated using the code.
    """
    # Define model parameters
    param = Parameters(
        interarrival_time=5,
        consultation_time=10,
        number_of_doctors=3,
        warm_up_period=30,
        data_collection_period=40,
        number_of_runs=5,
        verbose=False
    )

    # Run simulation
    runner = Runner(param=param)
    results = runner.run_reps()

    # Import expected results
    exp_patient = pd.read_csv(
        Path(__file__).parent.joinpath("python_patient.csv")
    )
    exp_run = pd.read_csv(
        Path(__file__).parent.joinpath("python_run.csv")
    )
    exp_overall = pd.read_csv(
        Path(__file__).parent.joinpath("python_overall.csv")
    )

    # Compare results
    pd.testing.assert_frame_equal(results["patient"], exp_patient)
    pd.testing.assert_frame_equal(results["run"], exp_run)
    pd.testing.assert_frame_equal(
        results["overall"].reset_index(drop=True), exp_overall
    )
NoteTest output
============================= test session starts ==============================
platform linux -- Python 3.11.9, pytest-8.4.1, pluggy-1.6.0
rootdir: /__w/des_rap_book/des_rap_book/pages/guide/verification_validation
plugins: anyio-4.11.0, timeout-2.4.0
collected 1 item

tests_resources/test_back.py .                                           [100%]

============================== 1 passed in 0.03s ===============================
<ExitCode.OK: 0>
testthat::test_file(file.path("tests_resources", "test_back.R"))

══ Testing test_back.R ═════════════════════════════════════════════════════════

[ FAIL 0 | WARN 0 | SKIP 0 | PASS 0 ]
[ FAIL 0 | WARN 0 | SKIP 0 | PASS 1 ]
[ FAIL 0 | WARN 0 | SKIP 0 | PASS 2 ]
[ FAIL 0 | WARN 0 | SKIP 0 | PASS 3 ] Done!

We generate the expected results for our backtest in a seperate Python file or Jupyter notebook, rather than within the test itself.

We generate the expected results for our backtest in a seperate R file or R markdown file, rather than within the test itself.

We then would generally run tests using the same pre-generated .csv files, without regenerating them. However, the test will fail if the model logic is intentionally changed, leading to different results from the same parameters.

In that case, if we are certain that these changes are the reason for differing results, we should re-run the Python file or notebook to regenerate the .csv.

In that case, if we are certain that these changes are the reason for differing results, we should re-run the R file or R markdown file to regenerate the .csv.

It is crucial to exercise caution when doing this, to avoid unintentionally overwriting correct expected results.

Test coverage

Coverage refers to the percentage of your code that is executed when you run your tests. It can help you spot parts of your code that are not included in any tests.

The pytest-cov package can be used to run coverage calculations alongside pytest. After installing it, simply run tests with the --cov flag:

pytest --cov

See the GitHub actions page for guidance on generating a coverage badge for your README within CI/CD.


If your research is structured as a package, then you can use devtools to calculate coverage:

devtools::test_coverage()


If not structured as a package, you can use covr’s file_coverage() function - for example:

file_coverage(source_files = c("param.R", "model.R"),
              test_files = c("test-unit.R", "test-back.R"))

Explore the example models

GitHub Click to visit pydesrap_mms repository

GitHub Click to visit pydesrap_stroke repository

A wide variety of tests are available in the tests/ folder.

Data for back tests are generated within notebooks/generate_exp_results.ipynb.

Tests are run and coverage are calculated on pushes to main via GitHub actions (.github/workflows/tests.yaml).

GitHub Click to visit rdesrap_mms repository

GitHub Click to visit rdesrap_stroke repository

A wide variety of tests are available in the tests/ folder.

Data for back tests are generated within notebooks/generate_exp_results.Rmd.

Tests are run via GitHub actions (.github/workflows/R-CMD-check.yaml) and a coverage badge is updated (.github/workflows/test-coverage.yaml).

Test yourself

Write tests! Look at example models for inspiration on what and how to test.

Start writing tests early, and run them often to catch issues as you develop.

Use coverage calculates to help you spot parts of you code not run in any tests.