SimPy: Treatment Centre#

Running the model

This version of the simulation model is in this notebook and can be executed online. You do not need to install anything on your computer. Move you mourse cursor over the rocket ship icon above and choose one of the two options: Binder (launches an external remote instance of Jupyter-Lab) or Thebe (code cells are excutable within the book).

See detailed instructions for more help.


1. Imports#

The model is provided with a conda virtual environment stars_treat_sim. Details are available in the Github repo.

import simpy
simpy.__version__
'4.1.1'
import numpy as np
import pandas as pd
import itertools
import math
import matplotlib.pyplot as plt
import fileinput

2. Constants and defaults for modelling as-is#

2.1 Distribution parameters#

# sign-in/triage parameters
DEFAULT_TRIAGE_MEAN = 3.0

# registration parameters
DEFAULT_REG_MEAN = 5.0
DEFAULT_REG_VAR= 2.0

# examination parameters
DEFAULT_EXAM_MEAN = 16.0
DEFAULT_EXAM_VAR = 3.0
DEFAULT_EXAM_MIN = 0.5

# trauma/stabilisation
DEFAULT_TRAUMA_MEAN = 90.0

# Trauma treatment
DEFAULT_TRAUMA_TREAT_MEAN = 30.0
DEFAULT_TRAUMA_TREAT_VAR = 4.0

# Non trauma treatment
DEFAULT_NON_TRAUMA_TREAT_MEAN = 13.3
DEFAULT_NON_TRAUMA_TREAT_VAR = 2.0

# prob patient requires treatment given trauma
DEFAULT_NON_TRAUMA_TREAT_P = 0.60

# proportion of patients triaged as trauma
DEFAULT_PROB_TRAUMA = 0.12

2.2 Time dependent arrival rates data#

The data for arrival rates varies between clinic opening at 6am and closure at 12am.

# data are held in the Github repo and loaded from there.
NSPP_PATH = 'https://raw.githubusercontent.com/TomMonks/' \
 + 'open-science-for-sim/main/src/notebooks/01_foss_sim/data/ed_arrivals.csv'

# visualise
ax = pd.read_csv(NSPP_PATH).plot(y='arrival_rate', x='period', rot=45,
                                 kind='bar',figsize=(12,5), legend=False)
ax.set_xlabel('hour of day')
ax.set_ylabel('mean arrivals');
../../_images/99a26021d5b436b5848674d9b70910261312e4223fab53ae3f0793d3e87daf49.png

2.3 Resource counts#

Inter count variables representing the number of resources at each activity in the processes.

DEFAULT_N_TRIAGE = 1
DEFAULT_N_REG = 1
DEFAULT_N_EXAM = 3
DEFAULT_N_TRAUMA = 2

# Non-trauma cubicles
DEFAULT_N_CUBICLES_1 = 1

# trauma pathway cubicles
DEFAULT_N_CUBICLES_2 = 1

2.4 Simulation model run settings#

# default random number SET
DEFAULT_RNG_SET = None
N_STREAMS = 20

# default results collection period
DEFAULT_RESULTS_COLLECTION_PERIOD = 60 * 19

# number of replications.
DEFAULT_N_REPS = 5

# Show the a trace of simulated events
# not recommended when running multiple replications
DEFAULT_TRACE_LEVEL = 0

## deprecated...
TRACE = True

3. Trace functionality#

Rather than using the print built in function to output a simulated trace it is desirable to control the if trace messages are shown or not shown. The SimulatedTrace class is partly based on R’s simmer functionality for output. A user sets a trace_level threshold when creating an instance of the class. The class is callable like a function and replaces the print built in e.g

trace = SimulatedTrace()
trace("hello world")

It is then trivial to suppress messages to a user

trace = SimulatedTrace(trace_level=2)

# this will not be printed as log_level is 0 by default
trace("hello world")

# this will NOT be printed in this experiment as log_level is less than 2
trace("level 1 message", trace_level=1)

# this will be printed!
trace("level 2 message", trace_level=2)
class SimulatedTrace:
    '''
    Utility class for printing a trace as the
    simulation model executes.
    '''
    def __init__(self, trace_level=0):
        '''Simulated Trace
        Log events as they happen.

        Params:
        -------
        log_level: int, optional (default=0)
            Minimum log level of a print statement
            in order for it to be logged.
        '''
        self.trace_level = trace_level
        
    def __call__(self, msg, trace_level=0):
        '''Override callable.
        This makes objects behave like functions.
        decorates the print function. conditional
        logic to print output or not.
        
        Params:
        ------
        msg: str
            string to print to screen.
        
        trace_level: int, optional (default=0)
            minimum trace level in order for the message
            to display
        
        '''
        if (trace_level >= self.trace_level):
            print(msg)

4. Distribution classes#

To help with controlling sampling numpy distributions are packaged up into classes that allow easy control of random numbers.

Distributions included:

  • Exponential

  • Log Normal

  • Bernoulli

  • Normal

  • Uniform

class Exponential:
    '''
    Convenience class for the exponential distribution.
    packages up distribution parameters, seed and random generator.
    '''
    def __init__(self, mean, random_seed=None):
        '''
        Constructor
        
        Params:
        ------
        mean: float
            The mean of the exponential distribution
        
        random_seed: int, optional (default=None)
            A random seed to reproduce samples.  If set to none then a unique
            sample is created.
        '''
        self.rng = np.random.default_rng(seed=random_seed)
        self.mean = mean
        
    def sample(self, size=None):
        '''
        Generate a sample from the exponential distribution
        
        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return.  If size=None then a single
            sample is returned.
        '''
        return self.rng.exponential(self.mean, size=size)

    
class Bernoulli:
    '''
    Convenience class for the Bernoulli distribution.
    packages up distribution parameters, seed and random generator.
    '''
    def __init__(self, p, random_seed=None):
        '''
        Constructor
        
        Params:
        ------
        p: float
            probability of drawing a 1
        
        random_seed: int, optional (default=None)
            A random seed to reproduce samples.  If set to none then a unique
            sample is created.
        '''
        self.rng = np.random.default_rng(seed=random_seed)
        self.p = p
        
    def sample(self, size=None):
        '''
        Generate a sample from the exponential distribution
        
        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return.  If size=None then a single
            sample is returned.
        '''
        return self.rng.binomial(n=1, p=self.p, size=size)

class Lognormal:
    """
    Encapsulates a lognormal distirbution
    """
    def __init__(self, mean, stdev, random_seed=None):
        """
        Params:
        -------
        mean: float
            mean of the lognormal distribution
            
        stdev: float
            standard dev of the lognormal distribution
            
        random_seed: int, optional (default=None)
            Random seed to control sampling
        """
        self.rng = np.random.default_rng(seed=random_seed)
        mu, sigma = self.normal_moments_from_lognormal(mean, stdev**2)
        self.mu = mu
        self.sigma = sigma
        
    def normal_moments_from_lognormal(self, m, v):
        '''
        Returns mu and sigma of normal distribution
        underlying a lognormal with mean m and variance v
        source: https://blogs.sas.com/content/iml/2014/06/04/simulate-lognormal
        -data-with-specified-mean-and-variance.html

        Params:
        -------
        m: float
            mean of lognormal distribution
        v: float
            variance of lognormal distribution
                
        Returns:
        -------
        (float, float)
        '''
        phi = math.sqrt(v + m**2)
        mu = math.log(m**2/phi)
        sigma = math.sqrt(math.log(phi**2/m**2))
        return mu, sigma
        
    def sample(self):
        """
        Sample from the normal distribution
        """
        return self.rng.lognormal(self.mu, self.sigma)
class Normal:
    '''
    Convenience class for the normal distribution.
    packages up distribution parameters, seed and random generator.

    Use the minimum parameter to truncate the distribution
    '''
    def __init__(self, mean, sigma, minimum=None, random_seed=None):
        '''
        Constructor
        
        Params:
        ------
        mean: float
            The mean of the normal distribution
            
        sigma: float
            The stdev of the normal distribution
            
        minimum: float, optional (default=None)
            Used to truncate the distribution (e.g. to 0.0 or 0.5)
        
        random_seed: int, optional (default=None)
            A random seed to reproduce samples.  If set to none then a unique
            sample is created.
        '''
        self.rng = np.random.default_rng(seed=random_seed)
        self.mean = mean
        self.sigma = sigma
        self.minimum = minimum
        
    def sample(self, size=None):
        '''
        Generate a sample from the normal distribution
        
        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return.  If size=None then a single
            sample is returned.
        '''
        samples = self.rng.normal(self.mean, self.sigma, size=size)

        if self.minimum is None:
            return samples
        elif size is None:
            return max(self.minimum, samples)
        else:
            # index of samples with negative value
            neg_idx = np.where(samples < 0)[0]
            samples[neg_idx] = self.minimum
            return samples

    
class Uniform():
    '''
    Convenience class for the Uniform distribution.
    packages up distribution parameters, seed and random generator.
    '''
    def __init__(self, low, high, random_seed=None):
        '''
        Constructor
        
        Params:
        ------
        low: float
            lower range of the uniform
            
        high: float
            upper range of the uniform
        
        random_seed: int, optional (default=None)
            A random seed to reproduce samples.  If set to none then a unique
            sample is created.
        '''
        self.rand = np.random.default_rng(seed=random_seed)
        self.low = low
        self.high = high
        
    def sample(self, size=None):
        '''
        Generate a sample from the uniform distribution
        
        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return.  If size=None then a single
            sample is returned.
        '''
        return self.rand.uniform(low=self.low, high=self.high, size=size)

5. Model parameterisation#

For convienience a container class is used to hold the large number of model parameters. The Scenario class includes defaults these can easily be changed and at runtime to experiments with different designs.

class Scenario:
    '''
    Container class for scenario parameters/arguments
    
    Passed to a model and its process classes
    '''
    def __init__(self, random_number_set=DEFAULT_RNG_SET,
                 n_triage=DEFAULT_N_TRIAGE,
                 n_reg=DEFAULT_N_REG,
                 n_exam=DEFAULT_N_EXAM,
                 n_trauma=DEFAULT_N_TRAUMA,
                 n_cubicles_1=DEFAULT_N_CUBICLES_1,
                 n_cubicles_2=DEFAULT_N_CUBICLES_2,
                 triage_mean=DEFAULT_TRIAGE_MEAN,
                 reg_mean=DEFAULT_REG_MEAN,
                 reg_var=DEFAULT_REG_VAR,
                 exam_mean=DEFAULT_EXAM_MEAN,
                 exam_var=DEFAULT_EXAM_VAR,
                 exam_min=DEFAULT_EXAM_MIN,
                 trauma_mean=DEFAULT_TRAUMA_MEAN,
                 trauma_treat_mean=DEFAULT_TRAUMA_TREAT_MEAN,
                 trauma_treat_var=DEFAULT_TRAUMA_TREAT_VAR,
                 non_trauma_treat_mean=DEFAULT_NON_TRAUMA_TREAT_MEAN,
                 non_trauma_treat_var=DEFAULT_NON_TRAUMA_TREAT_VAR,
                 non_trauma_treat_p=DEFAULT_NON_TRAUMA_TREAT_P,
                 prob_trauma=DEFAULT_PROB_TRAUMA,
                 trace_level=DEFAULT_TRACE_LEVEL):
        '''
        Create a scenario to parameterise the simulation model
        
        Parameters:
        -----------
        random_number_set: int, optional (default=DEFAULT_RNG_SET)
            Set to control the initial seeds of each stream of pseudo
            random numbers used in the model.
        
        n_triage: int
            The number of triage cubicles
        
        n_reg: int
            The number of registration clerks
            
        n_exam: int
            The number of examination rooms
            
        n_trauma: int
            The number of trauma bays for stabilisation
            
        n_cubicles_1: int
            The number of non-trauma treatment cubicles
            
        n_cubicles_2: int
            The number of trauma treatment cubicles
            
        triage_mean: float
            Mean duration of the triage distribution (Exponential)
            
        reg_mean: float
            Mean duration of the registration distribution (Lognormal)
            
        reg_var: float
            Variance of the registration distribution (Lognormal)
            
        exam_mean: float
            Mean of the examination distribution (Normal)
            
        exam_var: float
            Variance of the examination distribution (Normal)

        exam_min: float
            The minimum value that an examination can take (Truncated Normal)
            
        trauma_mean: float
            Mean of the trauma stabilisation distribution (Exponential)
            
        trauma_treat_mean: float
            Mean of the trauma cubicle treatment distribution (Lognormal)
            
        trauma_treat_var: float
            Variance of the trauma cubicle treatment distribution (Lognormal)
        
        non_trauma_treat_mean: float
            Mean of the non trauma treatment distribution
            
        non_trauma_treat_var: float
            Variance of the non trauma treatment distribution
        
        non_trauma_treat_p: float
            Probability non trauma patient requires treatment
            
        prob_trauma: float
            probability that a new arrival is a trauma patient.
            
        trace_level: int
            minimum log level of the experiment. Controls
            display of simulated trace.
        '''
        # sampling
        self.random_number_set = random_number_set
        
        # store parameters for sampling
        self.triage_mean = triage_mean
        self.reg_mean = reg_mean
        self.reg_var = reg_var
        self.exam_mean= exam_mean
        self.exam_var = exam_var
        self.exam_min = exam_min
        self.trauma_mean = trauma_mean
        self.trauma_treat_mean = trauma_treat_mean
        self.trauma_treat_var = trauma_treat_var
        self.non_trauma_treat_mean = non_trauma_treat_mean
        self.non_trauma_treat_var = non_trauma_treat_var
        self.non_trauma_treat_p = non_trauma_treat_p
        self.prob_trauma = prob_trauma
        
        self.trace = SimulatedTrace(trace_level)
        
        self.init_sampling()
        
        # count of each type of resource
        self.init_resourse_counts(n_triage, n_reg, n_exam, n_trauma,
                                  n_cubicles_1, n_cubicles_2)
    
    def set_random_no_set(self, random_number_set):
        '''
        Controls the random sampling 
        Parameters:
        ----------
        random_number_set: int
            Used to control the set of pseudo random numbers
            used by the distributions in the simulation.
        '''
        self.random_number_set = random_number_set
        self.init_sampling()

    def init_resourse_counts(self, n_triage, n_reg, n_exam, n_trauma,
                             n_cubicles_1, n_cubicles_2):
        '''
        Init the counts of resources to default values...
        '''
        self.n_triage = n_triage
        self.n_reg = n_reg
        self.n_exam = n_exam
        self.n_trauma = n_trauma
        
        # non-trauma (1), trauma (2) treatment cubicles
        self.n_cubicles_1 = n_cubicles_1
        self.n_cubicles_2 = n_cubicles_2

    def init_sampling(self):
        '''
        Create the distributions used by the model and initialise 
        the random seeds of each.
        '''       
        # MODIFICATION. Better method for producing n non-overlapping streams
        seed_sequence = np.random.SeedSequence(self.random_number_set)
    
        # Generate n high quality child seeds
        self.seeds = seed_sequence.spawn(N_STREAMS)
        
        # create distributions
        
        # Triage duration
        self.triage_dist = Exponential(self.triage_mean, 
                                       random_seed=self.seeds[0])
        
        # Registration duration (non-trauma only)
        self.reg_dist = Lognormal(self.reg_mean, 
                                  np.sqrt(self.reg_var),
                                  random_seed=self.seeds[1])
        
        # Evaluation (non-trauma only)
        self.exam_dist = Normal(self.exam_mean,
                                np.sqrt(self.exam_var),
                                minimum=self.exam_min,
                                random_seed=self.seeds[2])
        
        # Trauma/stablisation duration (trauma only)
        self.trauma_dist = Exponential(self.trauma_mean, 
                                       random_seed=self.seeds[3])
        
        # Non-trauma treatment
        self.nt_treat_dist = Lognormal(self.non_trauma_treat_mean, 
                                       np.sqrt(self.non_trauma_treat_var),
                                       random_seed=self.seeds[4])
        
        # treatment of trauma patients
        self.treat_dist = Lognormal(self.trauma_treat_mean, 
                                    np.sqrt(self.trauma_treat_var),
                                    random_seed=self.seeds[5])
        
        # probability of non-trauma patient requiring treatment
        self.nt_p_treat_dist = Bernoulli(self.non_trauma_treat_p, 
                                         random_seed=self.seeds[6])
        
        
        # probability of non-trauma versus trauma patient
        self.p_trauma_dist = Bernoulli(self.prob_trauma, 
                                       random_seed=self.seeds[7])
        
        # init sampling for non-stationary poisson process
        self.init_nspp()
        
    def init_nspp(self):
        
        # read arrival profile
        self.arrivals = pd.read_csv(NSPP_PATH)
        self.arrivals['mean_iat'] = 60 / self.arrivals['arrival_rate']
       
        # maximum arrival rate (smallest time between arrivals)
        self.lambda_max = self.arrivals['arrival_rate'].max()
        
        # thinning exponential
        self.arrival_dist = Exponential(60.0 / self.lambda_max,
                                        random_seed=self.seeds[8])
        
        # thinning uniform rng
        self.thinning_rng = Uniform(low=0.0, high=1.0, 
                                    random_seed=self.seeds[9])

6. Patient Pathways Process Logic#

simpy uses a process based worldview. We can easily create whatever logic - simple or complex for the model. Here the process logic for trauma and non-trauma patients is seperated into two classes TraumaPathway and NonTraumaPathway.

class TraumaPathway:
    '''
    Encapsulates the process a patient with severe injuries or illness.
    
    These patients are signed into the ED and triaged as having severe injuries
    or illness.
    
    Patients are stabilised in resus (trauma) and then sent to Treatment.  
    Following treatment they are discharged.
    '''
    def __init__(self, identifier, env, args):
        '''
        Constructor method
        
        Params:
        -----
        identifier: int
            a numeric identifier for the patient.
            
        env: simpy.Environment
            the simulation environment
            
        args: Scenario
            Container class for the simulation parameters
            
        '''
        self.identifier = identifier
        self.env = env
        self.args = args
        self.trace = args.trace
        
        # metrics
        self.arrival = -np.inf
        self.wait_triage = -np.inf
        self.wait_trauma = -np.inf
        self.wait_treat = -np.inf
        self.total_time = -np.inf
        
        self.triage_duration = -np.inf
        self.trauma_duration = -np.inf
        self.treat_duration = -np.inf
        
    def execute(self):
        '''
        simulates the major treatment process for a patient
        
        1. request and wait for sign-in/triage
        2. trauma
        3. treatment
        '''
        # record the time of arrival and entered the triage queue
        self.arrival = self.env.now

        # request sign-in/triage 
        with self.args.triage.request() as req:
            yield req
            # record the waiting time for triage
            self.wait_triage = self.env.now - self.arrival     
            
            self.trace(f'patient {self.identifier} triaged to trauma '
                       f'{self.env.now:.3f}')
        
            # sample triage duration.
            self.triage_duration = self.args.triage_dist.sample()
            yield self.env.timeout(self.triage_duration)
            self.triage_complete()
            
        # record the time that entered the trauma queue
        start_wait = self.env.now
    
        # request trauma room 
        with self.args.trauma.request() as req:
            yield req
            
            # record the waiting time for trauma
            self.wait_trauma = self.env.now - start_wait
            
            # sample stablisation duration.
            self.trauma_duration = self.args.trauma_dist.sample()
            yield self.env.timeout(self.trauma_duration)
            
            self.trauma_complete()
            
        # record the time that entered the treatment queue
        start_wait = self.env.now
    
        # request treatment cubicle 
        with self.args.cubicle_2.request() as req:
            yield req
            
            # record the waiting time for trauma
            self.wait_treat = self.env.now - start_wait
            self.trace(f'treatment of patient {self.identifier} at '
                       f'{self.env.now:.3f}')
            
            # sample treatment duration.
            self.treat_duration = self.args.treat_dist.sample()
            yield self.env.timeout(self.treat_duration)
            
            self.treatment_complete()
        
        # total time in system
        self.total_time = self.env.now - self.arrival     
            
    def triage_complete(self):
        '''
        Triage complete event
        '''
        self.trace(f'triage {self.identifier} complete {self.env.now:.3f}; '
                   f'waiting time was {self.wait_triage:.3f}')
          
    def trauma_complete(self):
        '''
        Patient stay in trauma is complete.
        '''
        self.trace(f'stabilisation of patient {self.identifier} at '
                   f'{self.env.now:.3f}')
    
    def treatment_complete(self):
        '''
        Treatment complete event
        '''
        self.trace(f'patient {self.identifier} treatment complete {self.env.now:.3f}; '
                   f'waiting time was {self.wait_treat:.3f}')
class NonTraumaPathway:
    '''
    Encapsulates the process a patient with minor injuries and illness.
    
    These patients are signed into the ED and triaged as having minor 
    complaints and streamed to registration and then examination. 
    
    Post examination 40% are discharged while 60% proceed to treatment.  
    Following treatment they are discharged.
    '''
    def __init__(self, identifier, env, args):
        '''
        Constructor method
        
        Params:
        -----
        identifier: int
            a numeric identifier for the patient.
            
        env: simpy.Environment
            the simulation environment
            
        args: Scenario
            Container class for the simulation parameters
            
        '''
        self.identifier = identifier
        self.env = env
        self.args = args
        self.trace = args.trace
        
        # triage resource
        self.triage = args.triage
        
        # metrics
        self.arrival = -np.inf
        self.wait_triage = -np.inf
        self.wait_reg = -np.inf
        self.wait_exam = -np.inf
        self.wait_treat = -np.inf
        self.total_time = -np.inf
        
        self.triage_duration = -np.inf
        self.reg_duration = -np.inf
        self.exam_duration = -np.inf
        self.treat_duration = -np.inf
        
        
    def execute(self):
        '''
        simulates the non-trauma/minor treatment process for a patient
        
        1. request and wait for sign-in/triage
        2. patient registration
        3. examination
        4.1 40% discharged
        4.2 60% treatment then discharge
        '''
        # record the time of arrival and entered the triage queue
        self.arrival = self.env.now

        # request sign-in/triage 
        with self.triage.request() as req:
            yield req
            
            # record the waiting time for triage
            self.wait_triage = self.env.now - self.arrival
            self.trace(f'patient {self.identifier} triaged to minors '
                       f'{self.env.now:.3f}')
            
            # sample triage duration.
            self.triage_duration = self.args.triage_dist.sample()
            yield self.env.timeout(self.triage_duration)
            
            self.trace(f'triage {self.identifier} complete {self.env.now:.3f}; '
                       f'waiting time was {self.wait_triage:.3f}')
            
        # record the time that entered the registration queue
        start_wait = self.env.now
    
        # request registration clert 
        with self.args.registration.request() as req:
            yield req
            
            # record the waiting time for registration
            self.wait_reg = self.env.now - start_wait
            self.trace(f'registration of patient {self.identifier} at '
                       f'{self.env.now:.3f}')
            
            # sample registration duration.
            self.reg_duration = self.args.reg_dist.sample()
            yield self.env.timeout(self.reg_duration)
            
            self.trace(f'patient {self.identifier} registered at'
                       f'{self.env.now:.3f}; '
                       f'waiting time was {self.wait_reg:.3f}')
            
        # record the time that entered the evaluation queue
        start_wait = self.env.now
    
        # request examination resource
        with self.args.exam.request() as req:
            yield req
            
            # record the waiting time for registration
            self.wait_exam = self.env.now - start_wait
            self.trace(f'examination of patient {self.identifier} begins '
                       f'{self.env.now:.3f}')
            
            # sample examination duration.
            self.exam_duration = self.args.exam_dist.sample()
            yield self.env.timeout(self.exam_duration)
            
            self.trace(f'patient {self.identifier} examination complete ' 
                       f'at {self.env.now:.3f};'
                       f'waiting time was {self.wait_exam:.3f}')
            
        # sample if patient requires treatment?
        self.require_treat = self.args.nt_p_treat_dist.sample()
        
        if self.require_treat:
        
            # record the time that entered the treatment queue
            start_wait = self.env.now
    
            # request treatment cubicle
            with self.args.cubicle_1.request() as req:
                yield req

                # record the waiting time for treatment
                self.wait_treat = self.env.now - start_wait
                self.trace(f'treatment of patient {self.identifier} begins '
                           f'{self.env.now:.3f}')

                # sample treatment duration.
                self.treat_duration = self.args.nt_treat_dist.sample()
                yield self.env.timeout(self.treat_duration)

                self.trace(f'patient {self.identifier} treatment complete '
                           f'at {self.env.now:.3f};'
                           f'waiting time was {self.wait_treat:.3f}')
                
        # total time in system
        self.total_time = self.env.now - self.arrival      

7. Main model class#

The main class that a user interacts with to run the model is TreatmentCentreModel. This implements a .run() method, contains a simple algorithm for the non-stationary poission process for patients arrivals and inits instances of TraumaPathway or NonTraumaPathway depending on the arrival type.

class TreatmentCentreModel:
    '''
    The treatment centre model
    
    Patients arrive at random to a treatment centre, are triaged
    and then processed in either a trauma or non-trauma pathway.
    '''
    def __init__(self, args):
        self.env = simpy.Environment()
        self.args = args
        self.trace = args.trace
        self.init_resources()
        
        self.patients = []
        self.trauma_patients = []
        self.non_trauma_patients = []

        self.rc_period = None
        self.results = None
        
    def init_resources(self):
        '''
        Init the number of resources
        and store in the arguments container object
        
        Resource list:
        1. Sign-in/triage bays
        2. registration clerks
        3. examination bays
        4. trauma bays
        5. non-trauma cubicles (1)
        6. trauma cubicles (2)
         
        '''
        # sign/in triage
        self.args.triage = simpy.Resource(self.env, 
                                          capacity=self.args.n_triage)
        
        # registration
        self.args.registration = simpy.Resource(self.env, 
                                                capacity=self.args.n_reg)
        
        # examination
        self.args.exam = simpy.Resource(self.env, 
                                        capacity=self.args.n_exam)
        
        # trauma
        self.args.trauma = simpy.Resource(self.env, 
                                          capacity=self.args.n_trauma)
        
        # non-trauma treatment
        self.args.cubicle_1 = simpy.Resource(self.env, 
                                              capacity=self.args.n_cubicles_1)
            
        # trauma treatment
        self.args.cubicle_2 = simpy.Resource(self.env, 
                                              capacity=self.args.n_cubicles_2)
        
        
        
    def run(self, results_collection_period=DEFAULT_RESULTS_COLLECTION_PERIOD):
        '''
        Conduct a single run of the model in its current 
        configuration
        
        
        Parameters:
        ----------
        results_collection_period, float, optional
            default = DEFAULT_RESULTS_COLLECTION_PERIOD
            
        warm_up, float, optional (default=0)
        
            length of initial transient period to truncate
            from results.
            
        Returns:
        --------
            None
        '''
        # setup the arrival generator process
        self.env.process(self.arrivals_generator())
        
        # store rc perio
        self.rc_period = results_collection_period
        
        # run
        self.env.run(until=results_collection_period)
        
    
    def arrivals_generator(self):  
        ''' 
        Simulate the arrival of patients to the model
        
        Patients either follow a TraumaPathway or
        NonTraumaPathway simpy process.
        
        Non stationary arrivals implemented via Thinning acceptance-rejection 
        algorithm.
        '''
        for patient_count in itertools.count():

            # this give us the index of dataframe to use
            t = int(self.env.now // 60) % self.args.arrivals.shape[0]
            lambda_t = self.args.arrivals['arrival_rate'].iloc[t]

            #set to a large number so that at least 1 sample taken!
            u = np.Inf
            
            interarrival_time = 0.0

            # reject samples if u >= lambda_t / lambda_max
            while u >= (lambda_t / self.args.lambda_max):
                interarrival_time += self.args.arrival_dist.sample()
                u = self.args.thinning_rng.sample()

            # iat
            yield self.env.timeout(interarrival_time)
            
            self.trace(f'patient {patient_count} arrives at: {self.env.now:.3f}')
            
            # sample if the patient is trauma or non-trauma
            trauma = self.args.p_trauma_dist.sample()
            if trauma:
                # create and store a trauma patient to update KPIs.
                new_patient = TraumaPathway(patient_count, self.env, self.args)
                self.trauma_patients.append(new_patient)
            else:
                # create and store a non-trauma patient to update KPIs.
                new_patient = NonTraumaPathway(patient_count, self.env, 
                                               self.args)
                self.non_trauma_patients.append(new_patient)
                
            # start the pathway process for the patient
            self.env.process(new_patient.execute())

8. Logic to process end of run results.#

the class SimulationSummary accepts a TraumaCentreModel. At the end of a run it can be used calculate mean queuing times and the percentage of the total run that a resource was in use.

class SimulationSummary:
    '''
    End of run result processing logic of the simulation model
    '''
    def __init__(self, model):
        '''
        Constructor
        
        Params:
        ------
        model: TraumaCentreModel
            The model.
        '''
        self.model = model
        self.args = model.args
        self.results = None
    
    def process_run_results(self):
        '''
        Calculates statistics at end of run.
        '''
        self.results = {}
        # list of all patients 
        patients = self.model.non_trauma_patients + self.model.trauma_patients
        
        # mean triage times (both types of patient)
        mean_triage_wait = self.get_mean_metric('wait_triage', patients)
        
        # triage utilisation (both types of patient)
        triage_util = self.get_resource_util('triage_duration', 
                                             self.args.n_triage, 
                                             patients)
        
        # mean waiting time for registration (non_trauma)
        mean_reg_wait = self.get_mean_metric('wait_reg', 
                                             self.model.non_trauma_patients)
        
        # registration utilisation (trauma)
        reg_util = self.get_resource_util('reg_duration', 
                                          self.args.n_reg, 
                                          self.model.non_trauma_patients)
        
        # mean waiting time for examination (non_trauma)
        mean_wait_exam = self.get_mean_metric('wait_exam', 
                                              self.model.non_trauma_patients)
        
        # examination utilisation (non-trauma)
        exam_util = self.get_resource_util('exam_duration', 
                                            self.args.n_exam, 
                                            self.model.non_trauma_patients)
        
        
        # mean waiting time for treatment (non-trauma) 
        mean_treat_wait = self.get_mean_metric('wait_treat', 
                                               self.model.non_trauma_patients)
        
        # treatment utilisation (non_trauma)
        treat_util1 = self.get_resource_util('treat_duration', 
                                             self.args.n_cubicles_1, 
                                             self.model.non_trauma_patients)
        
        # mean total time (non_trauma)
        mean_total = self.get_mean_metric('total_time', 
                                          self.model.non_trauma_patients)
                                    
        # mean waiting time for trauma 
        mean_trauma_wait = self.get_mean_metric('wait_trauma', 
                                                self.model.trauma_patients)
        
        # trauma utilisation (trauma)
        trauma_util = self.get_resource_util('trauma_duration', 
                                             self.args.n_trauma, 
                                             self.model.trauma_patients)
        
        # mean waiting time for treatment (rauma) 
        mean_treat_wait2 = self.get_mean_metric('wait_treat', 
                                                self.model.trauma_patients)
        
        # treatment utilisation (trauma)
        treat_util2 = self.get_resource_util('treat_duration', 
                                             self.args.n_cubicles_2, 
                                             self.model.trauma_patients)

        # mean total time (trauma)
        mean_total2 = self.get_mean_metric('total_time', 
                                           self.model.trauma_patients)
        
        
        self.results = {'00_arrivals':len(patients),
                        '01a_triage_wait': mean_triage_wait,
                        '01b_triage_util': triage_util,
                        '02a_registration_wait':mean_reg_wait,
                        '02b_registration_util': reg_util,
                        '03a_examination_wait':mean_wait_exam,
                        '03b_examination_util': exam_util,
                        '04a_treatment_wait(non_trauma)':mean_treat_wait,
                        '04b_treatment_util(non_trauma)':treat_util1,
                        '05_total_time(non-trauma)':mean_total,
                        '06a_trauma_wait':mean_trauma_wait,
                        '06b_trauma_util':trauma_util,
                        '07a_treatment_wait(trauma)':mean_treat_wait2,
                        '07b_treatment_util(trauma)':treat_util2,
                        '08_total_time(trauma)':mean_total2,
                        '09_throughput': self.get_throughput(patients)}
        
    def get_mean_metric(self, metric, patients):
        '''
        Calculate mean of the performance measure for the
        select cohort of patients,
        
        Only calculates metrics for patients where it has been 
        measured.
        
        Params:
        -------
        metric: str
            The name of the metric e.g. 'wait_treat'
            
        patients: list
            A list of patients
        '''
        mean = np.array([getattr(p, metric) for p in patients 
                         if getattr(p, metric) > -np.inf]).mean()
        return mean
    
    
    def get_resource_util(self, metric, n_resources, patients):
        '''
        Calculate proportion of the results collection period
        where a resource was in use.
        
        Done by tracking the duration by patient.
        
        Only calculates metrics for patients where it has been 
        measured.
        
        Params:
        -------
        metric: str
            The name of the metric e.g. 'treatment_duration'
            
        patients: list
            A list of patients
        '''
        total = np.array([getattr(p, metric) for p in patients 
                         if getattr(p, metric) > -np.inf]).sum() 
        
        return total / (self.model.rc_period * n_resources)
    
    def get_throughput(self, patients):
        '''
        Returns the total number of patients that have successfully
        been processed and discharged in the treatment centre
        (they have a total time record)
        
        Params:
        -------
        patients: list
            list of all patient objects simulated.
        
        Returns:
        ------
        float
        '''
        return len([p for p in patients if p.total_time > -np.inf])
    
    def summary_frame(self):
        '''
        Returns run results as a pandas.DataFrame
        
        Returns:
        -------
        pd.DataFrame
        '''
        #append to results df
        if self.results is None:
            self.process_run_results()

        df = pd.DataFrame({'1':self.results})
        df = df.T
        df.index.name = 'rep'
        return df
    

9. Model execution#

We note that there are many ways to setup a simpy model and execute it (that is part of its fantastic flexibility). The organisation of code we show below is based on our experience of using the package in practice. The approach also allows for easy parallisation over multiple CPU cores using joblib.

We include two functions. single_run() and multiple_replications. The latter is used to repeatedly call and process the results from single_run.

def single_run(scenario, rc_period=DEFAULT_RESULTS_COLLECTION_PERIOD, 
               random_no_set=DEFAULT_RNG_SET):
    '''
    Perform a single run of the model and return the results
    
    Parameters:
    -----------
    
    scenario: Scenario object
        The scenario/paramaters to run
        
    rc_period: int
        The length of the simulation run that collects results
        
    random_no_set: int or None, optional (default=DEFAULT_RNG_SET)
        Controls the set of random seeds used by the stochastic parts of the 
        model.  Set to different ints to get different results.  Set to None
        for a random set of seeds.
        
    Returns:
    --------
        pandas.DataFrame:
        results from single run.
    '''  
        
    # set random number set - this controls sampling for the run.
    scenario.set_random_no_set(random_no_set)

    # create an instance of the model
    model = TreatmentCentreModel(scenario)

    # run the model
    model.run(results_collection_period=rc_period)
        
    # run results
    summary = SimulationSummary(model)
    summary_df = summary.summary_frame()
    
    return summary_df
def multiple_replications(scenario, rc_period=DEFAULT_RESULTS_COLLECTION_PERIOD, 
                          n_reps=5):
    '''
    Perform multiple replications of the model.
    
    Params:
    ------
    scenario: Scenario
        Parameters/arguments to configurethe model
    
    rc_period: float, optional (default=DEFAULT_RESULTS_COLLECTION_PERIOD)
        results collection period.  
        the number of minutes to run the model to collect results

    n_reps: int, optional (default=DEFAULT_N_REPS)
        Number of independent replications to run.
        
    Returns:
    --------
    pandas.DataFrame
    '''

    results = [single_run(scenario, rc_period, random_no_set=rep) 
               for rep in range(n_reps)]
    
    #format and return results in a dataframe
    df_results = pd.concat(results)
    df_results.index = np.arange(1, len(df_results)+1)
    df_results.index.name = 'rep'
    return df_results

9.1 Single run of the model#

The script below performs a single replication of the simulation model.

Try:

  • Changing the random_no_set of the single_run call.

  • Assigning the value True to TRACE

# Change this to True to see a trace...
#TRACE = False

# create the default scenario. 
args = Scenario(trace_level=2)

# use the single_run() func
# try changing `random_no_set` to see different run results
print('Running simulation ...', end=' => ')
results = single_run(args, random_no_set=42)
print('simulation complete.')

# show results (transpose replication for easier view)
results.T
Running simulation ... => simulation complete.
rep 1
00_arrivals 209.000000
01a_triage_wait 16.622674
01b_triage_util 0.527512
02a_registration_wait 111.161345
02b_registration_util 0.801061
03a_examination_wait 24.927965
03b_examination_util 0.851285
04a_treatment_wait(non_trauma) 172.435861
04b_treatment_util(non_trauma) 0.845652
05_total_time(non-trauma) 248.848441
06a_trauma_wait 201.144403
06b_trauma_util 0.919830
07a_treatment_wait(trauma) 22.620880
07b_treatment_util(trauma) 0.493576
08_total_time(trauma) 310.384648
09_throughput 155.000000

9.2 Multiple independent replications#

Given the set up it is now easy to perform multiple replications of the model.

%%time
args = Scenario(trace_level=2)

#run multiple replications.
#by default it runs 5 replications.
print('Running multiple replications', end=' => ')
results  = multiple_replications(args, n_reps=50)
print('done.\n')
results.head(3)
Running multiple replications => 
done.

CPU times: user 1.32 s, sys: 60.9 ms, total: 1.38 s
Wall time: 4.83 s
00_arrivals 01a_triage_wait 01b_triage_util 02a_registration_wait 02b_registration_util 03a_examination_wait 03b_examination_util 04a_treatment_wait(non_trauma) 04b_treatment_util(non_trauma) 05_total_time(non-trauma) 06a_trauma_wait 06b_trauma_util 07a_treatment_wait(trauma) 07b_treatment_util(trauma) 08_total_time(trauma) 09_throughput
rep
1 230.0 24.280943 0.613250 103.242292 0.854504 31.089680 0.861719 152.483394 0.890904 234.759918 236.508444 1.028887 13.701783 0.607996 346.698079 171.0
2 227.0 57.120114 0.621348 90.002385 0.836685 14.688492 0.847295 120.245474 0.912127 233.882040 133.813901 0.834124 3.715446 0.367507 301.521195 161.0
3 229.0 28.659383 0.573698 112.242503 0.848514 21.374092 0.856306 94.019885 0.868888 208.361290 276.422566 0.874245 12.252175 0.464740 440.515502 167.0
# summarise the results (2.dp)
results.mean().round(2)
00_arrivals                       227.72
01a_triage_wait                    35.24
01b_triage_util                     0.61
02a_registration_wait             105.57
02b_registration_util               0.84
03a_examination_wait               25.55
03b_examination_util                0.85
04a_treatment_wait(non_trauma)    136.66
04b_treatment_util(non_trauma)      0.87
05_total_time(non-trauma)         234.34
06a_trauma_wait                   151.68
06b_trauma_util                     0.83
07a_treatment_wait(trauma)         14.31
07b_treatment_util(trauma)          0.50
08_total_time(trauma)             292.28
09_throughput                     162.16
dtype: float64

9.3 Visualise replications#

fig, ax = plt.subplots(2, 1, figsize=(12,4))
ax[0].hist(results['01a_triage_wait']);
ax[0].set_ylabel('wait for triage')
ax[1].hist(results['02a_registration_wait']);
ax[1].set_ylabel('wait for registration');
../../_images/f5fe031021211fd6158e17f67c6445a4b0ae50d0be356499ee3c32321f31b7f6.png

## 10. Scenario Analysis

The structured approach we took to organising our simpy model allows us to easily experiment with alternative scenarios. We could employ a formal experimental design if needed. For simplicity here we will limit ourselves by running user chosen competing scenarios and compare their mean performance to the base case.

Note that we have our simpy model includes an implementation of Common Random Numbers across scenarios.

def get_scenarios(trace_level=2):
    '''
    Creates a dictionary object containing
    objects of type `Scenario` to run.
    
    Returns:
    --------
    dict
        Contains the scenarios for the model
    '''
    scenarios = {}
    scenarios['base'] = Scenario(trace_level=trace_level)
    
    # extra triage capacity
    scenarios['triage+1'] = Scenario(n_triage=DEFAULT_N_TRIAGE+1,
                                     trace_level=trace_level)
        
    # extra examination capacity
    scenarios['exam+1'] = Scenario(n_exam=DEFAULT_N_EXAM+1,
                                   trace_level=trace_level)
    
    # extra non-trauma treatment capacity
    scenarios['treat+1'] = Scenario(n_cubicles_1=DEFAULT_N_CUBICLES_1+1,
                                    trace_level=trace_level)
    
    scenarios['triage+exam'] = Scenario(n_triage=DEFAULT_N_TRIAGE+1,
                                        n_exam=DEFAULT_N_EXAM+1,
                                        trace_level=trace_level)
    
    return scenarios
def run_scenario_analysis(scenarios, rc_period, n_reps):
    '''
    Run each of the scenarios for a specified results
    collection period and replications.
    
    Params:
    ------
    scenarios: dict
        dictionary of Scenario objects
        
    rc_period: float
        model run length
        
    n_rep: int
        Number of replications
    
    '''
    print('Scenario Analysis')
    print(f'No. Scenario: {len(scenarios)}')
    print(f'Replications: {n_reps}')

    scenario_results = {}
    for sc_name, scenario in scenarios.items():
        
        print(f'Running {sc_name}', end=' => ')
        replications = multiple_replications(scenario, rc_period=rc_period,
                                              n_reps=n_reps)
        print('done.\n')
        
        #save the results
        scenario_results[sc_name] = replications
    
    print('Scenario analysis complete.')
    return scenario_results

10.1 Script to run scenario analysis#

#number of replications
N_REPS = 20

#get the scenarios
scenarios = get_scenarios()

#run the scenario analysis
scenario_results = run_scenario_analysis(scenarios, 
                                         DEFAULT_RESULTS_COLLECTION_PERIOD,
                                         N_REPS)
Scenario Analysis
No. Scenario: 5
Replications: 20
Running base => 
done.

Running triage+1 => 
done.

Running exam+1 => 
done.

Running treat+1 => 
done.

Running triage+exam => 
done.

Scenario analysis complete.
def scenario_summary_frame(scenario_results):
    '''
    Mean results for each performance measure by scenario
    
    Parameters:
    ----------
    scenario_results: dict
        dictionary of replications.  
        Key identifies the performance measure
        
    Returns:
    -------
    pd.DataFrame
    '''
    columns = []
    summary = pd.DataFrame()
    for sc_name, replications in scenario_results.items():
        summary = pd.concat([summary, replications.mean()], axis=1)
        columns.append(sc_name)

    summary.columns = columns
    return summary
# as well as rounding you may want to rename the cols/rows to 
# more readable alternatives.
summary_frame = scenario_summary_frame(scenario_results)
summary_frame.round(2)
base triage+1 exam+1 treat+1 triage+exam
00_arrivals 227.25 227.25 227.25 227.25 227.25
01a_triage_wait 32.56 1.26 32.56 32.56 1.26
01b_triage_util 0.61 0.31 0.61 0.61 0.31
02a_registration_wait 104.69 131.82 104.69 104.69 131.82
02b_registration_util 0.85 0.85 0.85 0.85 0.85
03a_examination_wait 23.36 24.44 0.14 23.36 0.14
03b_examination_util 0.86 0.86 0.67 0.86 0.67
04a_treatment_wait(non_trauma) 130.73 133.09 144.50 2.15 147.81
04b_treatment_util(non_trauma) 0.88 0.88 0.88 0.62 0.88
05_total_time(non-trauma) 229.04 226.38 218.29 187.98 215.06
06a_trauma_wait 166.98 189.67 166.98 166.98 189.67
06b_trauma_util 0.84 0.86 0.84 0.84 0.86
07a_treatment_wait(trauma) 14.39 14.77 14.39 14.39 14.77
07b_treatment_util(trauma) 0.52 0.52 0.52 0.52 0.52
08_total_time(trauma) 306.46 298.22 306.46 306.46 298.22
09_throughput 165.85 166.65 169.10 196.85 169.85

11. Script to produce formatted LaTeX table for paper#

HEADER_URL = 'data/tbl_row_headers.csv'
WAIT = 'wait'

# filter for waiting times only and round to 2dp
waiting_times = summary_frame[summary_frame.index.str.contains(WAIT)].round(2)

# load formatted table headers
row_headers = pd.read_csv(HEADER_URL)

# merge and format headers
waiting_times = waiting_times.reset_index()
waiting_times = pd.concat([row_headers, waiting_times], axis=1)
waiting_times = waiting_times.set_index(row_headers.columns[0])
waiting_times = waiting_times.drop(['index'], axis=1)
waiting_times = waiting_times.reset_index()
waiting_times
Mean waiting time (mins) base triage+1 exam+1 treat+1 triage+exam
0 Triage 32.56 1.26 32.56 32.56 1.26
1 Registation 104.69 131.82 104.69 104.69 131.82
2 Examination 23.36 24.44 0.14 23.36 0.14
3 Non-trauma treatment 130.73 133.09 144.50 2.15 147.81
4 Trauma stabilisation 166.98 189.67 166.98 166.98 189.67
5 Trauma treatment 14.39 14.77 14.39 14.39 14.77
# output to file as LaTeX
OUTPUT_FILE = 'output/table_3.txt'
LABEL = 'tab:table3'
CAPTION = 'Simulation results that can be verified by our example reproducible pipeline.'

waiting_times.to_latex(OUTPUT_FILE, index=False, label=LABEL, caption=CAPTION)

# modify LaTeX file -> convert \caption to \tbl
with fileinput.FileInput(OUTPUT_FILE, inplace=1) as latex_file:
    for line in latex_file:
        line = line.replace('caption', 'tbl')
        print(line, end='')

# view LaTeX to verify
with open(OUTPUT_FILE, "r+") as file1:
    print(file1.read())
\begin{table}
\tbl{Simulation results that can be verified by our example reproducible pipeline.}
\label{tab:table3}
\begin{tabular}{lrrrrr}
\toprule
Mean waiting time (mins) & base & triage+1 & exam+1 & treat+1 & triage+exam \\
\midrule
Triage & 32.560000 & 1.260000 & 32.560000 & 32.560000 & 1.260000 \\
Registation & 104.690000 & 131.820000 & 104.690000 & 104.690000 & 131.820000 \\
Examination & 23.360000 & 24.440000 & 0.140000 & 23.360000 & 0.140000 \\
Non-trauma treatment & 130.730000 & 133.090000 & 144.500000 & 2.150000 & 147.810000 \\
Trauma stabilisation & 166.980000 & 189.670000 & 166.980000 & 166.980000 & 189.670000 \\
Trauma treatment & 14.390000 & 14.770000 & 14.390000 & 14.390000 & 14.770000 \\
\bottomrule
\end{tabular}
\end{table}

End#