pymdp Fundamentals

Open in Colab

Author: Conor Heins

Note: When running this notebook in Google Colab, you may have to run !pip install inferactively-pymdp at the top of the notebook, before you can import pymdp. That cell is left commented out below, in case you are running this notebook from Google Colab.

# ! pip install inferactively-pymdp

Very brief pymdp and active inference background

pymdp is a Python library, written primarily in NumPy, for solving Partially-Observed Markov Decision Processes (POMDPs) using active inference, a modelling framework for perception, learning and action, originally derived from the Free Energy Principle. POMDP models have traditionally been used in disciplines ranging from control theory and robotics, to biological fields concerned with empirically modelling human and animal behavior.

For a theoretical overview of active inference and the motivations for developing this package, please see our companion paper: “pymdp: A Python library for active inference in discrete state spaces”.

More in-depth coverage of applying active inference to POMDPs can be found in the following two papers: Active Inference: A Process Theory and Active Inference in Discrete State Spaces: A Synthesis

Below are links to a few useful papers, talks, and blog posts that concern active inference and the free energy principle more broadly:

Also see Jared Tumiel’s blog post and Beren Millidge’s github repository for more literature and pedagogical materials related to active inference and the free energy principle.

Data structures in pymdp

In this tutorial notebook we’ll be walking through the core functionalities and data structures used by pymdp, most of which will be easy to grasp for anyone who’s familiar with array programming in numpy or even MATLAB, where much of the existing code related to active inference currently exists.

We’ll start by running through a full usage example of quickly building and running a random active inference agent in pymdp.

In the interest of focusing on data structures and syntax, we’ll just initialize the agent to have a random POMDP generative model – we don’t need to analogize this generative model to any hypothetical task environment that our agent will be behaving within. The agent will just have random beliefs about how the world evolves and how it gives rise to observations. The focus here is on how we store these objects (e.g. components of the generative model) in pymdp.

For more in-depth tutorials that actually work through full, tutorial-style examples of building active inference agents and running them, please see the Active inference from scratch tutorial and Using the agent class tutorial.

Other examples in the Examples section (e.g. the T-Maze demo or the Epistemic chaining demo provide walk-throughs of building active inference agents in particular task environments.

""" Quickly build a random active inference agent, give it an observation and have it do hidden state and policy inference """

import pymdp
from pymdp import utils
from pymdp.agent import Agent

num_obs = [3, 5] # observation modality dimensions
num_states = [4, 2, 3] # hidden state factor dimensions
num_controls = [4, 1, 1] # control state factor dimensions
A_array = utils.random_A_matrix(num_obs, num_states) # create sensory likelihood (A matrix)
B_array = utils.random_B_matrix(num_states, num_controls) # create transition likelihood (B matrix)

C_vector = utils.obj_array_uniform(num_obs) # uniform preferences

# instantiate a quick agent using your A, B and C arrays
my_agent = Agent( A = A_array, B = B_array, C = C_vector)

# give the agent a random observation and get the optimized posterior beliefs

observation = [1, 4] # a list specifying the indices of the observation, for each observation modality

qs = my_agent.infer_states(observation) # get posterior over hidden states (a multi-factor belief)

# Do active inference

q_pi, neg_efe = my_agent.infer_policies() # return the policy posterior and return (negative) expected free energies of each policy as well

action = my_agent.sample_action() # sample an action from the posterior over policies

numpy object arrays

In pymdp, both generative model distributions (e.g. A or B) as well as posterior distributions over hidden states (e.g. qs) and observations are represented as what we call “object arrays”. Note that other conventions may refer to these arrays as “arrays-of-arrays”, “nested arrays”, or “ragged/jagged arrays.”

Object arrays differ from more “classical” numpy.ndarrays that have numerical (e.g. np.float64) data types. The typical numpy.ndarray of float or integer data type, can only have scalar values as its entries. Object arrays (numpy.ndarray with dtype = object) however, do not have this restriction. Their array elements can be arbitrary Python data structures or objects. The entries of an object array can thus have arbitrary type and dimensionality.

Deeper dive on object arrays

Object arrays can be initialized standard numpy constructors, but by also explicitly declaring dtype = object.

my_empty_array = np.empty(5, dtype = object)

The pymdp.utils module provides a set of functions in that allow you to quickly construct, convert between, and perform mathematical operations on these object arrays. Fundamentally underpinning these utils operations is the assumption that the elements of the input object array are numpy.ndarrays with numerical data type. In other words, we simply use object arrays to store collections of multi-dimensional arrays or standard numpy.ndarrays. This construction is chosen to allow pymdp to easily perform probabilistic computations on collections of multi-dimensional arrays, where each constituent array within the collection represents a particular factor or marginal distribution within a factorized representation. In the next section we’ll get into why this constructino is useful and how it’s mathematically motivated.

Constructing factorized distributions with object arrays

For example, let’s imagine a joint categorical probability distribution over some variable $\mathbf{s}$ that factorizes into the product of a set of 3 marginal (i.e. univariate) distributions. Mathematically, we can express that factorization as:

$$ P(\mathbf{s}) = \prod_i^3 P(s_i) $$

In pymdp, instead of encoding this distribution as the fully-enumerated product of marginals $P(\mathbf{s}) = P(s_1) \times P(s_2) \times P(s_3)$, we instead exploit the factorization and only store the separate marginals $P(s_i)$ of the full distribution. Specifically, we store each marginal as a single numpy.ndarray of ndim == 1 (i.e. a row vector) in a different entry of a numpy.ndarray of dtype object. The object array representation be easily constructed and manipulated using utils functions.

num_factors = 3 # we have 3 factors total

dim_of_each_factor = [2, 2, 4] # factor 1 has 2 levels, factor 2 has 2 levels, factor 4 has 4 levels

P_s = utils.random_single_categorical(dim_of_each_factor)

print(P_s[0])
[0.73090597 0.26909403]

Here we used the utility function random_single_categorical(shape_list) to create a random collection of marginal categorical distributions, representing the different factorized components of the full distribution $P(\mathbf{s})$.

Each of these marginal categorical distributions is stored in an according entry of the object array P_s.

This representation is advantageous for several reasons, but one of the main advantages is sparsity / memory overhead. If we had to store the entire joint distribution $P(\mathbf{s})$ as a product of all the marginals, we could either store that as a single 1D categorical distribution with 2 * 2 * 4 different levels (the combinatorics of all the hidden state dimensionalities), or as a multi-dimensional array with size (2, 2, 4), where each entry of the joint Ps[i,j,k] stores the joint probability of $P(s^1 = i, s^2 = j, s^3 = k)$. Either way, we’d have to store polynomially more values $2 \times 2 \times 4$ is twice the size of $2 + 2 + 4$. But since we know that the marginals are by construction independent, we can just store them separately without losing any information.

Side note: Here’s an alternative way to generate multi-factor categorical distributions.

if you knew what your factors were to begin with, and you wanted to construct the full distribution from the factors, you could use another utility function to build the object array from a list of the factorized distributions.

import numpy as np

ps1 = np.array([0.5, 0.5])
ps2 = np.array([0.75, 0.25])
ps3 = np.array([0.5, 1./6., 1./6., 1./6.])

P_s = utils.obj_array_from_list([ps1, ps2, ps3])

print(P_s[2])
[0.5        0.16666667 0.16666667 0.16666667]

Object arrays and conditional distributions

Understanding the representation of factorized probability distributions as object arrays is critical for understanding and constructing generative models in pymdp. This object array representation applies not only to multi-factor, unconditional categorical distributions (like the example above: a joint categorical distribution being factorized into a product of univariate marginals), but also is used to represent collections of conditional categorical distributions.

In particular, we use object arrays to encode the observation and transition models (also known as the observation likelihood and transition likelihood) of the agent’s generative model. Object arrays are necessary in this case, due to the convention of factorizing the observation space into multiple observation modalities and the hidden states into multiple hidden state factors. Mathematically, this can be expressed as follows:

$$ \mathbf{o}t = \prod{i = 1}^{M} \delta_{O_i}(o^{i}t) \hspace{15 mm} \mathbf{s}t = \prod{i = 1}^{F} \delta{S_i}(s^{i}_t) \ $$

where any particular observation $\mathbf{o}_t$ is actually factorized into a product of Dirac delta functions over different modality-specific observations $o^m_t$, i.e. “one-hot” vectors or unit vectors in $\mathbb{R}^{O_m}$. The same factorization applies to multi-factorial hidden states $\mathbf{s}t$. The notation $\delta{X_i}(x^{i})$ denotes a Dirac delta function with discrete support $X_i$ evaluated at point $x^i$.

This factorization of observations across modalities and hidden states across hidden state factors, carries forward into the specification of the A and B arrays, the representation of the conditional distributions $P(\mathbf{o}_t|\mathbf{s}t)$ and $P(\mathbf{s}t|\mathbf{s}{t-1}, \mathbf{u}{t-1})$ in pymdp. These two arrays of conditional distributions can also be factorized by modality and factor, respectively.

The A array, for instance, contains the agent’s observation model, that relates hidden states $\mathbf{s}_t$ to observations $\mathbf{o}_t$:

$$ \mathbf{A} = {A^1, A^2, …, A^M }, \hspace{5mm} A^m = P(o^m_t | s^1_t, s^2_t, …, s^F_t) $$

Therefore, we represent the A array as an object array whose constituent arrays are multidimensional numpy.ndarrays that encode conjunctive relationships between combinations of hidden states and observations.

This is best explained using the example in the original example code at the top of this tutorial. It is custom to build lists of the dimensionalities of the modalities (resp. hidden state factors) of your model, typically using lists named num_obs (for the dimensionalities of the observation modalities) and num_states (dimensionalities of the hidden state factors). These lists can then be used to automatically construct the A array with the correct shape.

num_obs = [3, 5] # observation modality dimensions
num_states = [4, 2, 3] # hidden state factor dimensions

A_array = utils.random_A_matrix(num_obs, num_states) # create sensory likelihood (A array)

for m, _ in enumerate(num_obs):
    print(f'Dimensionality of A array for modality {m + 1}: {A_array[m].shape}')
Dimensionality of A array for modality 1: (3, 4, 2, 3)
Dimensionality of A array for modality 2: (5, 4, 2, 3)

For example, A_array[0] stores the conditional relationships between the hidden states $\mathbf{s}$ and observations within the first modality $o^1_t$, which has dimensionality 3. This explains why the shape of A_array[0] is (3, 4, 2, 3) – it stores the conditional relationships between each setting of the hidden state factors (which have dimensionalities [4, 2, 3]) and the observations within the first modality, which has dimensionality 3. Crucially, each sub-array A[m] stores the conditional dependencies between all the hidden state factor combinations (configurations of $s^1, s^2, …, s^F$) and the observations along modality m.

In this case, we used the pymdp.utils function random_A_matrix() to generate a random A array, but in most cases users will want to design their own bespoke observation models (or at least initialize them to some reasonable starting values). In such a scenario, the usual route is to initialize the A array to a series of identically-valued multidimensional arrays (e.g. arrays filled with 0’s or uniform values), and then fill out the conditional probability entries “by hand”, according to the task the user is interested in modelling.

For this purpose, utility functions like obj_array_zeros and obj_array_uniform come in handy. These functions takes as inputs list of shapes, where each shape contains the dimensionality (e.g. [2, 3, 4]) of one of the multi-dimensional arrays that will populate the final object array. For example, creating this shape list for the A array, given num_obs and num_states is quite straightforward:

A_shapes = [[o_dim] + num_states for o_dim in num_obs]

# initialize the A array to all 0's
empty_A = utils.obj_array_zeros(A_shapes)

# initialize the A array to uniform distributions as columns
# uniform_A = utils.obj_array_uniform(A_shapes)

# then fill out your A matrix elements as appropriate, depending on the task
# empty_A[0][:,0,1,2] = np.array([0.7, 0.15, 0.15])
# empty_A[0][:,...] = ....

Similarly, we can create B arrays that encode the temporal dependencies between the hidden state factors over time, further conditioned on control factors (or state factors that correspond to action-states). Mathematically, the B arrays are expressed as follows:

$$ \mathbf{B} = {B^1, B^2, …, B^F }, \hspace{5mm} B^f = P(s^f_t | s^f_{t-1}, u^f_{t-1}) $$

where $u^f_{t-1}$ denotes the control state for control factor $f$, taken at time $t-1$.

We can construct these matrices this by introducing an additional list num_controls that stores the dimensionalities of each control factor, which in combination with num_states can be used to specify the B array.

num_states = [4, 2, 3] # hidden state factor dimensions
num_controls = [4, 1, 1] # control state factor dimensions

B_array = utils.random_B_matrix(num_states, num_controls) # create transition likelihood (B matrix)

for f, _ in enumerate(num_states):
    print(f'Dimensionality of B array for hidden state factor {f + 1}: {B_array[f].shape}')
Dimensionality of B array for hidden state factor 1: (4, 4, 4)
Dimensionality of B array for hidden state factor 2: (2, 2, 1)
Dimensionality of B array for hidden state factor 3: (3, 3, 1)

As we can see here, each factor-specific B array (e.g. B[2]) has shape (num_states[f], num_states[f], num_controls[f]), encoding the conditional dependencies between states for a given factor f at subsequent timepoints, further conditioned on actions or control states along that factor. This construction means that hidden state factors are statistically independent of one another – i.e. hidden state factor f is only affected by its own state at the previous timestep, as well as the state of the f-th control factor.

And of course, as with the A array, we can also pre-allocate our B arrays to have the correct shape and then fill them out by hand, using the following template:

B_shapes = [[s_dim, s_dim, num_controls[f]] for f, s_dim in enumerate(num_states)]

# initialize the B array to all 0's
empty_B = utils.obj_array_zeros(B_shapes)

# initialize the B array to uniform distributions as columns
# uniform_B = utils.obj_array_uniform(B_shapes)

# then assign your B matrix elements as appropriate, depending on the task
# empty_B[0][0,:,0] = 1.
# empty_B[0][1,:,1] = 1.

Handy functions for dealing with object arrays.

The utils library is also equipped with a set of handy functions for dealing with object arrays – this includes operations like normalization and sampling.

Normalization

For instance, let’s imagine you generated an A array by hand with the following structure:

num_obs = [3]

num_states = [3, 2]

A_array = utils.obj_array(len(num_obs)) # create an empty object array with as many elements as there are observation modalities (here, only 1 modality)

A_array[0] = np.zeros(num_obs + num_states) # initialize the likelihood mapping for the single modality

A_array[0][:,:,0] = np.eye(3) # assign the mapping between hidden states of factor 0 and observations, given factor 1 == 0
A_array[0][0,:,1] = 0.5       # assign the mapping between hidden states of factor 0 and observation of level 0, given factor 1 == 1

This A array corresponds to an observation model with a single modality and two hidden state factors.

We can check if this A array is a proper conditional distribution by using the utils function is_normalized, which checks to make sure each column of each sub-array within A sums to 1

utils.is_normalized(A_array)
False

This A is not properly normalized because of this line:

A_array[0][0,:,1] = 0.5   

This is problematic, because we’ve only assigned a single outcome to have 50% probability under all the hidden state configurations of factor 0, given factor 1 == level 1. That means each column of A_array[0][:,:,1] is not a proper conditional distribution, since there’s still 50% of ‘unaccounted’ probability. In particular, the distributions encoded in the columns of this array do not each sum to 1.

We can fix this by hand (by changing our code to allocate the remaining 50% of probability for each of those columns) or, depending on the intended final distributions, by using the norm_dist_obj_arr function, which column-normalizes every numpy.ndarray contained within an object array.

A_array = utils.norm_dist_obj_arr(A_array)

print(f'P(o0 | s0, s1 == 1) after normalization: \n {A_array[0][:,:,1]}\n')

print(f'Is the full distribution is now normalized? {"Yes!" if utils.is_normalized(A_array) else "No!"}')
P(o0 | s0, s1 == 1) after normalization: 
 [[1. 1. 1.]
 [0. 0. 0.]
 [0. 0. 0.]]

Is the full distribution is now normalized? Yes!

Sampling

Sometimes we will also want to sample from categorical probability distributions, for instance when sampling observations from the generative process or sampling actions from the agent’s posterior over policies. We have two functions in utils, sample and sample_obj_arr that can deal with sampling from either single probability distributions (numpy.ndarray of ndim == 1) as well as collections of factorized distributions, stored in object arrays.

# sample from a single random categorical distribution

my_categorical = np.random.rand(3)
my_categorical = utils.norm_dist(my_categorical) # the `utils.norm_dist` function works on single vectors or matrices that represent non-factorized probability distributions
sampled_outcome = utils.sample(my_categorical)
print(sampled_outcome)
0
# sample from a multi-factor categorical distribution

my_multifactor_categorical = utils.random_single_categorical([3, 4, 10]) # creates a collection of 3 marginal categorical distributions, each of different dimension

sampled_outcome_list = utils.sample_obj_array(my_multifactor_categorical)

print(sampled_outcome_list)
[1, 0, 5]

Note that when sampling from categorical distributions, the returned values are integers that represent the index of the support (i.e. which “outcome” or “level”) that is sampled. In the case of multi-factor distributions, the returned value is a list of integers, one sampled outcome per marginal categorical distribution

Conditional expectation

We can also compute conditional expectations of single random variables that depend on a multi-factor random variables using a function from the pymdp.maths utility package called spm_dot, that is based on the original function of the same name from SPM, where active inference was first implemented.

Let’s imagine we have an observation model as above, with one observation modality with dimensionality 3 and two hidden state factors, with dimensionalities 3 and 2. Given a factorized distribution over the two hidden state factors (i.e. an object array of marginal categorical distributions, one with 3 levels and the other with 2 levels), we want to generate a conditional expectation over the values of the observation distribution with dimension 3. In other words, we want to evaluate:

$$ \mathbb{E}{P(\mathbf{s})}[P(o | \mathbf{s})] = \mathbb{E}{P(s^1)P(s^2)}[P(o^1 | s^1, s^2)] $$

given a conditional distribution $P(o^1 | s^1, s^2)$ and some “priors” $P(s^1), P(s^2)$ over hidden states.

Implementing this conditional expectation in practice can be written as a multi-dimensional dot product of the modality-specific A array (A[0]) with each of the hidden state factor distributions, $P(s^1)$ and $P(s^2)$. Under the hood, the spm_dot function of pymdp.maths achieves this using an optimized numpy function called einsum, or einstein summation. Let’s see what computing this conditional expectation with spm_dot looks like in practice:

from pymdp import maths

num_obs = [3] # dimensionalities of observation modalities (just one here, for simplicity)
num_states = [3, 2] # dimensionalities of hidden state factors 

A_shapes = [num_obs + num_states] # shape of the A arrays (only one shape needed, (3, 3, 2))
A_array = utils.obj_array_zeros(A_shapes) # create our conditional distribution using the provided shape list

A_array[0][:,:,0] = np.eye(3) # when s2 == 0, s1 fully determines the value of o1
A_array[0][1,:,1] = 1.        # when s2 == 1, the value of o1 is always 2, regardless of the setting of s1

Ps = utils.obj_array_from_list([np.array([0.5, 0.0, 0.5]), np.array([0.75, 0.25])]) # create our "prior" or beliefs about hidden states, that will be used in the expectation

E_o_ps = maths.spm_dot(A_array[0], Ps)

print(f"Expected distribution over observations, given the value of Ps:\n {E_o_ps.reshape(-1,1)}")
Expected distribution over observations, given the value of Ps:
 [[0.375]
 [0.25 ]
 [0.375]]