Source code for gameanalysis.aggfn

"""An action graph game with additive function nodes"""
import itertools

import numpy as np
import scipy.stats as spt

from gameanalysis import rsgame
from gameanalysis import utils


class _AgfnGame(
    rsgame._CompleteGame
):  # pylint: disable=too-many-instance-attributes,protected-access
    """Action graph with function nodes game

    Action node utilities have additive structure. Function nodes are
    contribution-independent. Graph is bipartite so that function nodes have
    in-edges only from action nodes and vise versa.

    Parameters
    ----------
    role_names : (str,)
        The name of each role.
    strat_names : ((str,),)
        The name of each strategy per role.
    num_role_players : ndarray
        The number of players for each role.
    action_weights : ndarray
        Each entry specifies the incoming weight in the action graph for the
        action node (column).  Must have shape (num_functions, num_strats). The
        action weights for a particular function can't be all zero, otherwise
        that function should not exist.
    function_inputs : ndarray
        Each entry specifies whether the action node (row) is an input to the
        function node (col). Must have shape (num_strats, num_functions).
    function_table : ndarray
        Value of arbitrary functions for a number of players activating the
        function. This can either have shape (num_functions, num_players + 1)
        or (num_functions,) + tuple(num_role_players + 1). The former treats
        different roles ass simply different strategy sets, the later treats
        each nodes inputs as distinct, and so each function maps from the
        number of inputs from each role.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        role_names,
        strat_names,
        num_role_players,
        action_weights,
        function_inputs,
        function_table,
        offsets,
    ):
        super().__init__(role_names, strat_names, num_role_players)
        self.num_functions, *_ = function_table.shape
        self.action_weights = action_weights
        self.action_weights.setflags(write=False)
        self.function_inputs = function_inputs
        self.function_inputs.setflags(write=False)
        self.function_table = function_table
        self.function_table.setflags(write=False)
        self.offsets = offsets
        self.offsets.setflags(write=False)

        # Pre-compute derivative info
        self._dinputs = np.zeros(
            (self.num_strats, self.num_functions, self.num_roles), bool
        )
        self._dinputs[
            np.arange(self.num_strats), :, self.role_indices
        ] = self.function_inputs
        self._dinputs.setflags(write=False)

        # Compute other bookmarking stuff
        self._basis = np.insert(
            np.cumprod(self.num_role_players[:0:-1] + 1)[::-1], self.num_roles - 1, 1
        )
        self._func_offset = np.arange(self.num_functions) * np.prod(
            self.num_role_players + 1
        )

    @utils.memoize
    def min_strat_payoffs(self):
        """Returns a lower bound on the payoffs."""
        node_table = self.function_table.reshape((self.num_functions, -1))
        minima = node_table.min(1, keepdims=True)
        maxima = node_table.max(1, keepdims=True)
        eff_min = np.where(self.action_weights > 0, minima, maxima)
        mins = np.einsum("ij,ij->j", eff_min, self.action_weights) + self.offsets
        mins.setflags(write=False)
        return mins.view()

    @utils.memoize
    def max_strat_payoffs(self):
        """Returns an upper bound on the payoffs."""
        node_table = self.function_table.reshape((self.num_functions, -1))
        minima = node_table.min(1, keepdims=True)
        maxima = node_table.max(1, keepdims=True)
        eff_max = np.where(self.action_weights > 0, maxima, minima)
        maxs = np.einsum("ij,ij->j", eff_max, self.action_weights) + self.offsets
        maxs.setflags(write=False)
        return maxs.view()

    def get_payoffs(self, profiles):
        """Returns an array of profile payoffs."""
        profiles = np.asarray(profiles, int)
        function_inputs = np.add.reduceat(
            profiles[..., None, :] * self.function_inputs.T, self.role_starts, -1
        )
        inds = function_inputs.dot(self._basis) + self._func_offset
        function_outputs = self.function_table.ravel()[inds]
        payoffs = function_outputs.dot(self.action_weights) + self.offsets
        payoffs[profiles == 0] = 0
        return payoffs

    # TODO override get_dev_payoffs to be more efficient, i.e. only compute the
    # dev payoff.

    def deviation_payoffs(
        self, mixture, *, jacobian=False, **_
    ):  # pylint: disable=too-many-locals
        """Get the deviation payoffs"""
        mixture = np.asarray(mixture, float)
        role_node_probs = np.minimum(
            np.add.reduceat(mixture[:, None] * self.function_inputs, self.role_starts),
            1,
        )[..., None]
        table_probs = np.ones(
            (self.num_roles, self.num_functions) + tuple(self.num_role_players + 1),
            float,
        )
        for i, (num_play, probs) in enumerate(
            zip(self.num_role_players, role_node_probs)
        ):
            role_probs = spt.binom.pmf(np.arange(num_play + 1), num_play, probs)
            dev_role_probs = spt.binom.pmf(np.arange(num_play + 1), num_play - 1, probs)

            new_shape = [self.num_functions] + [1] * self.num_roles
            new_shape[i + 1] = num_play + 1
            role_probs.shape = new_shape
            dev_role_probs.shape = new_shape

            table_probs[:i] *= role_probs
            table_probs[i] *= dev_role_probs
            table_probs[i + 1 :] *= role_probs

        dev_probs = table_probs.repeat(self.num_role_strats, 0)
        for role, (rinps, rdev_probs) in enumerate(
            zip(
                np.split(self.function_inputs, self.role_starts[1:], 0),
                np.split(dev_probs, self.role_starts[1:], 0),
            )
        ):
            rdev_probs[rinps] = np.roll(rdev_probs[rinps], 1, role + 1)
        dev_vals = np.reshape(
            dev_probs * self.function_table, (self.num_strats, self.num_functions, -1)
        )
        devs = np.einsum("ijk,ji->i", dev_vals, self.action_weights) + self.offsets

        if not jacobian:
            return devs

        deriv = np.empty(
            (self.num_roles, self.num_roles, self.num_functions)
            + tuple(self.num_role_players + 1),
            float,
        )
        for i, (num_play, probs, zprob) in enumerate(
            zip(self.num_role_players, role_node_probs, self.zero_prob)
        ):
            # TODO This zprob threshold causes large errors in the jacobian
            # when we look at sparse mixtures. This should probably be
            # addressed, but it's unclear how without making this significantly
            # slower.
            configs = np.arange(num_play + 1)
            der = configs / (probs + zprob) - configs[::-1] / (1 - probs + zprob)
            dev_der = np.insert(
                configs[:-1] / (probs + zprob) - configs[-2::-1] / (1 - probs + zprob),
                num_play,
                0,
                1,
            )

            new_shape = [self.num_functions] + [1] * self.num_roles
            new_shape[i + 1] = num_play + 1
            der.shape = new_shape
            dev_der.shape = new_shape

            deriv[:i, i] = der
            deriv[i, i] = dev_der
            deriv[i + 1 :, i] = der

        dev_deriv = np.rollaxis(deriv, 2, 1).repeat(self.num_role_strats, 0)
        for role, (rinps, rdev_deriv) in enumerate(
            zip(
                np.split(self.function_inputs, self.role_starts[1:], 0),
                np.split(dev_deriv, self.role_starts[1:], 0),
            )
        ):
            rdev_deriv[rinps] = np.roll(rdev_deriv[rinps], 1, role + 2)

        dev_values = dev_probs[:, :, None] * dev_deriv * self.function_table[:, None]
        dev_values.shape = (self.num_strats, self.num_functions, self.num_roles, -1)
        jac = np.einsum(
            "iklm,jkl,ki->ij", dev_values, self._dinputs, self.action_weights
        )
        return devs, jac

    def _add_constant(self, constant):
        off = np.broadcast_to(constant, self.num_roles).repeat(self.num_role_strats)
        return _AgfnGame(
            self.role_names,
            self.strat_names,
            self.num_role_players,
            self.action_weights,
            self.function_inputs,
            self.function_table,
            self.offsets + off,
        )

    def _multiply_constant(self, constant):
        mul = np.broadcast_to(constant, self.num_roles).repeat(self.num_role_strats)
        return _AgfnGame(
            self.role_names,
            self.strat_names,
            self.num_role_players,
            self.action_weights * mul,
            self.function_inputs,
            self.function_table,
            self.offsets * mul,
        )

    def _add_game(self, othr):
        try:
            return _AgfnGame(
                self.role_names,
                self.strat_names,
                self.num_role_players,
                np.concatenate([self.action_weights, othr.action_weights]),
                np.concatenate([self.function_inputs, othr.function_inputs], 1),
                np.concatenate([self.function_table, othr.function_table]),
                self.offsets + othr.offsets,
            )
        except AttributeError:
            return NotImplemented

    def restrict(self, restriction):
        restriction = np.asarray(restriction, bool)
        base = rsgame.empty_copy(self).restrict(restriction)
        action_weights = self.action_weights[:, restriction]
        func_mask = np.any(~np.isclose(action_weights, 0), 1)
        return _AgfnGame(
            base.role_names,
            base.strat_names,
            base.num_role_players,
            action_weights[func_mask],
            self.function_inputs[:, func_mask][restriction],
            self.function_table[func_mask],
            self.offsets[restriction],
        )

    def to_json(self):
        res = super().to_json()

        res["function_inputs"] = [
            self.restriction_to_json(finp) for finp in self.function_inputs.T
        ]

        res["action_weights"] = [self.payoff_to_json(ws) for ws in self.action_weights]

        # XXX This will fail if a role has the name 'value', do we care?
        res["function_tables"] = [
            [
                dict(zip(self.role_names, (c.item() for c in counts)), value=val)
                for val, *counts in zip(
                    tab.ravel(), *np.indices(tab.shape).reshape(self.num_roles, -1)
                )
                if val != 0
            ]
            for tab in self.function_table
        ]

        if not np.allclose(self.offsets, 0):
            res["offsets"] = self.payoff_to_json(self.offsets)

        res["type"] = "aggfn.3"
        return res

    def __repr__(self):
        return "{old}, {nfuncs:d})".format(
            old=super().__repr__()[:-1], nfuncs=self.num_functions
        )

    def __eq__(self, othr):
        return (
            super().__eq__(othr)
            and self.num_functions == othr.num_functions
            and np.allclose(self.offsets, othr.offsets)
            and utils.allclose_perm(
                np.concatenate(
                    [
                        self.action_weights,
                        self.function_inputs.T,
                        self.function_table.reshape(self.num_functions, -1),
                    ],
                    1,
                ),
                np.concatenate(
                    [
                        othr.action_weights,
                        othr.function_inputs.T,
                        othr.function_table.reshape(othr.num_functions, -1),
                    ],
                    1,
                ),
            )
        )

    @utils.memoize
    def __hash__(self):
        return hash(
            (
                super().__hash__(),
                np.sort(utils.axis_to_elem(self.function_inputs.T)).tobytes(),
            )
        )


[docs]def aggfn( # pylint: disable=too-many-arguments num_role_players, num_role_strats, action_weights, function_inputs, function_table, offsets=None, ): """Create an Aggfn with default names Parameters ---------- num_role_players : ndarray The number of players per role. num_role_strats : ndarray The number of strategies per role. action_weights : ndarray, float The action weights. function_inputs : ndarray, bool The input mask for each function. function_table : ndarray, float The function value relative to number of incoming edges. offsets : ndarray, float, optional A constant offset for each strategies payoff. Constant functions are not allowed in the function table as they are clutter, instead, constant functions can be specified here. """ return aggfn_replace( rsgame.empty(num_role_players, num_role_strats), action_weights, function_inputs, function_table, offsets, )
[docs]def aggfn_names( # pylint: disable=too-many-arguments role_names, num_role_players, strat_names, action_weights, function_inputs, function_table, offsets=None, ): """Create an Aggfn with specified names Parameters ---------- role_names : [str] The name of each role. num_role_players : ndarray The number of players for each role. strat_names : [[str]] The name of each strategy for each role. action_weights : ndarray The mapping of each function to the strategy weight for a player. function_inpits : ndarray The mask indicating which strategies are inputs to which function. offsets : ndarray, float, optional A constant offset for each strategies payoff. Constant functions are not allowed in the function table as they are clutter, instead, constant functions can be specified here. """ return aggfn_replace( rsgame.empty_names(role_names, num_role_players, strat_names), action_weights, function_inputs, function_table, offsets, )
# TODO Make aggfn_copy method that will clone the aggfn game if it is one, # else, it will regress on profiles to compute one.
[docs]def aggfn_replace( copy_game, action_weights, function_inputs, function_table, offsets=None ): """Replace an existing game with an Aggfn Parameters ---------- copy_game : RsGame The game to take game structure from. action_weights : ndarray-like The weights of each function to player payoffs. function_inputs : ndarray-like The mask of each strategy to function. function_table : ndarray-like The lookup table of number of incoming edges to function value. offsets : ndarray, float, optional A constant offset for each strategies payoff. Constant functions are not allowed in the function table as they are clutter, instead, constant functions can be specified here. """ if offsets is None: offsets = np.zeros(copy_game.num_strats) action_weights = np.asarray(action_weights, float) function_inputs = np.asarray(function_inputs, bool) function_table = np.asarray(function_table, float) offsets = np.asarray(offsets, float) num_funcs, *one_plays = function_table.shape utils.check(num_funcs > 0, "must have at least one function") utils.check( action_weights.shape == (num_funcs, copy_game.num_strats), "action_weights must have shape (num_functions, num_strats) but got " "{}", action_weights.shape, ) utils.check( function_inputs.shape == (copy_game.num_strats, num_funcs), "function_inputs must have shape (num_strats, num_functions) but got " "{}", function_inputs.shape, ) utils.check( not function_inputs.all(0).any(), "can't have a function with input from every strategy", ) utils.check( function_inputs.any(0).all(), "every function must take input from at least one strategy", ) utils.check( one_plays == list(copy_game.num_role_players + 1), "function_table must have shape " "(num_functions, ... num_role_players + 1) but got {}", function_table.shape, ) utils.check( not np.isclose( function_table.reshape((num_funcs, -1))[:, 0, None], function_table.reshape((num_funcs, -1)), ) .all(1) .any(), "a function can't be constant (all identical values)", ) utils.check( not np.isclose(action_weights, 0).all(1).any(), "a function can't have actions weights of all zero", ) utils.check( offsets.shape == (copy_game.num_strats,), "offsets must have shape (num_strats,) but got {}", offsets.shape, ) return _AgfnGame( copy_game.role_names, copy_game.strat_names, copy_game.num_role_players, action_weights, function_inputs, function_table, offsets, )
[docs]def aggfn_funcs( # pylint: disable=too-many-arguments num_role_players, num_role_strats, action_weights, function_inputs, functions, offsets=None, ): """Construct and Aggfn with functions This is generally less efficient than just constructing the function table using vectorized operations or an existing function table. Parameters ---------- num_role_players : ndarray The number of players per role. num_role_strats : ndarray The number of strategies per role. action_weights : ndarray, float The action weights. function_inputs : ndarray, bool The input mask for each function. functions : [(nr1, nr2, ...) -> float] List of functions that maps the player per role activations to a single value. The number of ordered arguments will be inferred from each function. """ utils.check(functions, "must have at least one function") num_functions = len(functions) base = rsgame.empty(num_role_players, num_role_strats) function_table = np.empty( (num_functions,) + tuple(base.num_role_players + 1), float ) for func, tab in zip(functions, function_table): for play in itertools.product(*map(range, base.num_role_players + 1)): tab[play] = func(*play) return aggfn_replace(base, action_weights, function_inputs, function_table, offsets)
[docs]def aggfn_json(json): # pylint: disable=too-many-locals """Read an Aggfn from json Json versions of the game will generally have 'type': 'aggfn...' in them, but as long as the proper fields exist, this will succeed.""" base = rsgame.empty_json(json) _, version = json.get("type", ".3").split(".", 1) utils.check(version == "3", "parsing versions below 3 is currently unsupported") num_functions = len(json["function_tables"]) function_inputs = np.empty((base.num_strats, num_functions), bool) action_weights = np.empty((num_functions, base.num_strats)) function_table = np.empty((num_functions,) + tuple(base.num_role_players + 1)) offsets = np.empty(base.num_strats) base.payoff_from_json(json.get("offsets", {}), offsets) for inps, jinps in zip(function_inputs.T, json["function_inputs"]): base.restriction_from_json(jinps, inps, verify=False) for weights, jweights in zip(action_weights, json["action_weights"]): base.payoff_from_json(jweights, weights) function_table.fill(0) for table, jtable in zip(function_table, json["function_tables"]): for elem in jtable: copy = elem.copy() value = copy.pop("value") table[tuple(int(i) for i in base.role_from_json(copy))] = value return aggfn_replace(base, action_weights, function_inputs, function_table, offsets)