Source code for gameanalysis.paygame

"""Module for games with potentially sparse payoff data"""
# pylint: disable=too-many-lines
import contextlib
import itertools
import logging
from collections import abc

import numpy as np
import numpy.random as rand
import scipy.special as sps

from gameanalysis import rsgame
from gameanalysis import utils


# TODO For real games, there does seem to be a memory benefit (3-10x) for using
# sparse matrices. This is likely due to the fact that for real games we
# prioritize low support sampling. scipy sparse isn't a great api for this
# usage, but maybe there are things we can do to make this more feasible. Any
# implementation should probably still be based around scipy sparse, so we
# should check speed too before doing anything drastic.
# However, it is worth noting that the density of a complete profile or payoff
# matrix is \frac{\sum_r \frac{s_r n_r}{s_r + n_r - 1}}{\sum_r s_r}. This means
# that the density goes asymptotically to 1 as the number of players increases,
# but to 0 as the strategies goes to infinity, however, strategies are
# generally more fixed, and players are more limiting. Also, for a single role
# game, the number of strategies would have to be more than 3x the number of
# players to get a benefit, which is infeasible in most circumstances. What
# this ultimately implies is that there's not an asymptotic argument to support
# sparsity, so it should probably be done on a case by case basis.


class _Game(rsgame._RsGame):  # pylint: disable=protected-access
    """Role-symmetric data game representation

    This representation uses a sparse mapping from profiles to payoffs for role
    symmetric games. This allows it to capture arbitrary games, as well as
    games that are generated from data.  Payoffs for specific players in a
    profile can be nan to indicate they are missing. The profiles will not be
    listed in `num_complete_profiles` or counted as `in` the game, but their
    data can be accessed via `get_payoffs`, and they will be used for
    calculating deviation payoffs if possible.

    Parameters
    ----------
    role_names : (str,)
        The name of each role.
    strat_names : ((str,),)
        The name of each strategy for each role.
    num_role_players : ndarray
        The number of players per role.
    profiles : ndarray, (num_payoffs, num_strats)
        The profiles for the game. These must be unique, and all valid for the
        game.
    payoffs : ndarray, (num_payoffs, num_strats)
        The payoffs for the game. This must contain zeros for profile, strategy
        pairs that are not played (i.e. zero). All valid payoffs for a profile
        can't be nan, the profile should be omitted instead.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self, role_names, strat_names, num_role_players, profiles, payoffs
    ):
        super().__init__(role_names, strat_names, num_role_players)
        self._profiles = profiles
        self._profiles.setflags(write=False)
        self._payoffs = payoffs
        self._payoffs.setflags(write=False)
        self._num_profiles = profiles.shape[0]

        # compute log dev reps
        player_factorial = np.sum(sps.gammaln(profiles + 1), 1)
        totals = np.sum(sps.gammaln(self.num_role_players + 1)) - player_factorial
        with np.errstate(divide="ignore"):
            self._dev_reps = (
                totals[:, None]
                + np.log(profiles)
                - np.log(self.num_role_players).repeat(self.num_role_strats)
            )
        self._dev_reps.setflags(write=False)

        # Add profile lookup
        self._profile_map = dict(zip(map(utils.hash_array, profiles), payoffs))
        if np.isnan(payoffs).any():
            self._complete_profiles = frozenset(
                prof
                for prof, pay in self._profile_map.items()
                if not np.isnan(pay).any()
            )
        else:  # Don't need to store duplicate lookup object
            self._complete_profiles = self._profile_map
        self._num_complete_profiles = len(self._complete_profiles)

    @property
    def num_profiles(self):
        return self._num_profiles

    @property
    def num_complete_profiles(self):
        return self._num_complete_profiles

    def profiles(self):
        return self._profiles.view()

    def payoffs(self):
        return self._payoffs.view()

    @utils.memoize
    def min_strat_payoffs(self):
        """Returns the minimum payoff for each role"""
        if not self.num_profiles:
            pays = np.full(self.num_strats, np.nan)
        else:
            pays = np.fmin.reduce(
                np.where(self._profiles > 0, self._payoffs, np.nan), 0
            )
        pays.setflags(write=False)
        return pays

    @utils.memoize
    def max_strat_payoffs(self):
        """Returns the maximum payoff for each role"""
        if not self.num_profiles:
            pays = np.full(self.num_strats, np.nan)
        else:
            pays = np.fmax.reduce(
                np.where(self._profiles > 0, self._payoffs, np.nan), 0
            )
        pays.setflags(write=False)
        return pays

    def get_payoffs(self, profiles):
        """Returns an array of profile payoffs

        If profile is not in game, an array of nans is returned where profile
        has support."""
        profiles = np.asarray(profiles, int)
        utils.check(self.is_profile(profiles).all(), "profiles must be valid")
        prof_view = profiles.reshape((-1, self.num_strats))
        payoffs = np.empty(prof_view.shape, float)
        for prof, pay in zip(prof_view, payoffs):
            hashed = utils.hash_array(prof)
            if hashed not in self._profile_map:
                pay[prof == 0] = 0
                pay[prof > 0] = np.nan
            else:
                np.copyto(pay, self._profile_map[hashed])
        return payoffs.reshape(profiles.shape)

    def deviation_payoffs(  # pylint: disable=too-many-statements,too-many-branches,too-many-locals,arguments-differ
        self, mixture, *, jacobian=False, ignore_incomplete=False, **_
    ):
        """Computes the expected value of deviating

        More specifically, this is the expected payoff of playing each pure
        strategy played against all opponents playing mix.

        Parameters
        ----------
        mixture : ndarray
            The mix all other players are using
        jacobian : bool
            If true, the second returned argument will be the jacobian of the
            deviation payoffs with respect to the mixture. The first axis is
            the deviating strategy, the second axis is the strategy in the mix
            the derivative is taken with respect to. For this to be calculated
            correctly, the game must be complete. Thus if the game is not
            complete, this will be all nan.
        ignore_incomplete : bool, optional
            If True, a "best estimate" will be returned for incomplete data.
            This means that instead of marking a payoff where all deviations
            aren't known as nan, the probability will be renormalized by the
            mass that is known, creating a biased estimate based of the data
            that is present.
        """
        mixture = np.asarray(mixture, float)
        supp = mixture > 0
        nan_mask = np.empty_like(mixture, dtype=bool)

        # Fill out mask where we don't have data
        if ignore_incomplete or self.is_complete():
            nan_mask.fill(False)
        elif self.is_empty():
            nan_mask.fill(True)
        else:
            # These calculations are approximate, but for games we can do
            # anything with, the size is bounded, and so numeric methods are
            # actually exact.
            strats = np.add.reduceat(supp, self.role_starts)
            devs = self._profiles[:, ~supp]
            num_supp = utils.game_size(self.num_role_players, strats).prod()
            dev_players = self.num_role_players - np.eye(self.num_roles, dtype=int)
            role_num_dev = utils.game_size(dev_players, strats).prod(1)
            num_dev = role_num_dev.repeat(self.num_role_strats)[~supp]

            nan_mask[supp] = np.all(devs == 0, 1).sum() < num_supp
            nan_mask[~supp] = devs[devs.sum(1) == 1].sum(0) < num_dev

        # Compute values
        if not nan_mask.all():
            # zero_prob effectively makes 0^0=1 and 0/0=0.
            zmix = mixture + self.zero_prob.repeat(self.num_role_strats)
            log_mix = np.log(zmix)
            prof_prob = self._profiles.dot(log_mix)[:, None]
            with np.errstate(under="ignore"):
                # Ignore underflow caused when profile probability is not
                # representable in floating point.
                probs = np.exp(prof_prob + self._dev_reps - log_mix)

            if ignore_incomplete:
                # mask out nans
                mask = np.isnan(self._payoffs)
                payoffs = np.where(mask, 0, self._payoffs)
                probs[mask] = 0
            else:
                payoffs = self._payoffs

            # Mask out nans
            zprob = self.zero_prob.dot(self.num_role_players)
            # TODO This threshold causes large errors in the jacobian when we
            # look at sparse mixtures. This should probably be addressed, but
            # it's unclear how without making this significantly slower.
            nan_pays = np.where(probs > zprob, payoffs, 0)
            devs = np.einsum("ij,ij->j", probs, nan_pays)
            devs[nan_mask] = np.nan

        else:
            devs = np.full(self.num_strats, np.nan)

        if ignore_incomplete:
            tprobs = probs.sum(0)
            tsupp = tprobs > 0
            devs[tsupp] /= tprobs[tsupp]
            devs[~tsupp] = np.nan

        if not jacobian:
            return devs

        if ignore_incomplete or not nan_mask.all():
            dev_profs = self._profiles[:, None] - np.eye(self.num_strats, dtype=int)
            dev_jac = np.einsum("ij,ij,ijk->jk", probs, nan_pays, dev_profs) / zmix
            if ignore_incomplete:
                dev_jac -= (
                    np.einsum("ij,ijk->jk", probs, dev_profs) * devs[:, None] / zmix
                )
                dev_jac[tsupp] /= tprobs[tsupp, None]
                dev_jac[~tsupp] = np.nan
            # TODO This is a little conservative and could be relaxed but would
            # require extra computation
            if not self.is_complete():
                dev_jac[nan_mask | ~supp] = np.nan
        else:
            dev_jac = np.full((self.num_strats,) * 2, np.nan)

        return devs, dev_jac

    def restrict(self, restriction):
        """Remove possible strategies from consideration"""
        restriction = np.asarray(restriction, bool)
        base = rsgame.empty_copy(self).restrict(restriction)
        prof_mask = ~np.any(self._profiles * ~restriction, 1)
        profiles = self._profiles[prof_mask][:, restriction]
        payoffs = self._payoffs[prof_mask][:, restriction]
        return _Game(
            base.role_names, base.strat_names, base.num_role_players, profiles, payoffs
        )

    def _add_constant(self, constant):
        with np.errstate(invalid="ignore"):
            new_pays = self._payoffs + np.broadcast_to(constant, self.num_roles).repeat(
                self.num_role_strats
            )
        new_pays[self._profiles == 0] = 0
        return _Game(
            self.role_names,
            self.strat_names,
            self.num_role_players,
            self._profiles,
            new_pays,
        )

    def _multiply_constant(self, constant):
        with np.errstate(invalid="ignore"):
            new_pays = self._payoffs * np.broadcast_to(constant, self.num_roles).repeat(
                self.num_role_strats
            )
        return _Game(
            self.role_names,
            self.strat_names,
            self.num_role_players,
            self._profiles,
            new_pays,
        )

    def _add_game(self, othr):
        with np.errstate(invalid="ignore"):
            new_pays = self._payoffs + othr.get_payoffs(self._profiles)
        mask = np.any((~np.isnan(new_pays)) & (self._profiles > 0), 1)
        return _Game(
            self.role_names,
            self.strat_names,
            self.num_role_players,
            self._profiles[mask],
            new_pays[mask],
        )

    def __contains__(self, profile):
        """Returns true if all data for that profile exists"""
        return utils.hash_array(np.asarray(profile, int)) in self._complete_profiles

    def profile_from_json(self, prof, dest=None, *, verify=True):
        """Read a profile from json

        A profile is an assignment from role-strategy pairs to counts. This
        method reads from several formats as specified in parameters.

        Parameters
        ----------
        prof : json
            A description of a profile in a number of formats. The correct
            format will be auto detected and used. The most common are {role:
            {strat: count}}, {role: [(strat, count, payoff)]},
            {symmetry_groups: [{role: role, strategy: strategy, count:
            count}]}.
        dest : ndarray, optional
            If supplied, ``dest`` will be written to instead of allocating a
            new array.
        """
        if dest is None:
            dest = np.empty(self.num_strats, int)
        else:
            utils.check(dest.dtype.kind == "i", "dest dtype must be integral")
            utils.check(
                dest.shape == (self.num_strats,), "dest shape must be num_strats"
            )
        dest.fill(0)

        try:
            # To parse as format that contains both data types
            self.profpay_from_json(prof, dest_prof=dest, verify=False)
        except ValueError:
            # Only remaining format is straight dictionary
            super().profile_from_json(prof, dest=dest, verify=False)

        utils.check(
            not verify or self.is_profile(dest), '"{}" is not a valid profile', prof
        )
        return dest

    def profile_to_assignment(self, prof):
        """Convert a profile to an assignment string"""
        return {
            role: list(
                itertools.chain.from_iterable(
                    itertools.repeat(strat, val.item())
                    for strat, val in zip(strats, counts)
                )
            )
            for counts, role, strats in zip(
                np.split(prof, self.role_starts[1:]), self.role_names, self.strat_names
            )
            if np.any(counts > 0)
        }

    def payoff_from_json(
        self, pays, dest=None, *, verify=True
    ):  # pylint: disable=arguments-differ
        """Read a set of payoffs from json

        Parameters
        ----------
        pays : json
            A description of a set of payoffs in a number of formats
        dest : ndarray, optional
            If supplied, ``dest`` will be written to instead of allocating a
            new array.
        """
        if dest is None:
            dest = np.empty(self.num_strats, float)
        else:
            utils.check(dest.dtype.kind == "f", "dest dtype must be floating")
            utils.check(
                dest.shape == (self.num_strats,), "dest shape must be num strats"
            )
        dest.fill(0)

        try:
            # To parse as format that contains both data types
            self.profpay_from_json(pays, dest_pays=dest, verify=verify)
        except ValueError:
            # Only remaining format is straight dictionary
            super().payoff_from_json(pays, dest=dest)

        return dest

    def profpay_from_json(self, prof, dest_prof=None, dest_pays=None, *, verify=True):
        """Read json as a profile and a payoff"""
        if dest_prof is None:
            dest_prof = np.empty(self.num_strats, int)
        if dest_pays is None:
            dest_pays = np.empty(self.num_strats, float)
        dest_prof.fill(0)
        dest_pays.fill(0)

        # observations but no data
        if not prof.get("observations", True):
            self._profpay_from_json_empty_obs(prof, dest_prof, dest_pays)
        # summary format
        elif "observations" not in prof and "symmetry_groups" in prof:
            self._profpay_from_json_summ(prof, dest_prof, dest_pays)
        # observations format
        elif "observations" in prof and "symmetry_groups" in prof["observations"][0]:
            self._profpay_from_json_obs(prof, dest_prof, dest_pays)
        # full format
        elif "observations" in prof:
            self._profpay_from_json_full(prof, dest_prof, dest_pays)
        # observation from simulation
        elif "players" in prof:
            self._profpay_from_json_observation(prof, dest_prof, dest_pays)
        # dict payoff
        elif all(not isinstance(v, abc.Mapping) for v in prof.values()):
            self._profpay_from_json_dict(prof, dest_prof, dest_pays)
        # error
        else:
            raise ValueError("unknown format")

        utils.check(
            not verify or self.is_profile(dest_prof),
            '"{}" does not define a valid profile',
            prof,
        )
        return dest_prof, dest_pays

    def _profpay_from_json_empty_obs(self, prof, dest_prof, dest_pays):
        """Get profile and payoff from empty observations format"""
        for symgrp in prof["symmetry_groups"]:
            _, role, strat, count, _ = _unpack_symgrp(**symgrp)
            index = self.role_strat_index(role, strat)
            dest_prof[index] = count
            dest_pays[index] = np.nan

    def _profpay_from_json_summ(self, prof, dest_prof, dest_pays):
        """Get profile and payoff from summary format"""
        for symgrp in prof["symmetry_groups"]:
            _, role, strat, count, pay = _unpack_symgrp(**symgrp)
            index = self.role_strat_index(role, strat)
            dest_prof[index] = count
            dest_pays[index] = pay

    def _profpay_from_json_obs(
        self, prof, dest_prof, dest_pays
    ):  # pylint: disable=too-many-locals
        """Get profile and payoff from observations format"""
        ids = {}
        for symgrp in prof["symmetry_groups"]:
            i, role, strat, count, _ = _unpack_symgrp(**symgrp)
            index = self.role_strat_index(role, strat)
            ids[i] = index
            dest_prof[index] = count

        for j, obs in enumerate(prof["observations"], 1):
            for symgrp in obs["symmetry_groups"]:
                i, pay = _unpack_obs(**symgrp)
                k = ids[i]
                dest_pays[k] += (pay - dest_pays[k]) / j

    def _profpay_from_json_full(
        self, prof, dest_prof, dest_pays
    ):  # pylint: disable=too-many-locals
        """Get profile and payoff from full format"""
        ids = {}
        for symgrp in prof["symmetry_groups"]:
            i, role, strat, count, _ = _unpack_symgrp(**symgrp)
            index = self.role_strat_index(role, strat)
            ids[i] = index
            dest_prof[index] = count

        counts = np.zeros(self.num_strats, int)
        for obs in prof["observations"]:
            for player in obs["players"]:
                i, pay = _unpack_player(**player)
                k = ids[i]
                counts[k] += 1
                dest_pays[k] += (pay - dest_pays[k]) / counts[k]

    def _profpay_from_json_observation(self, prof, dest_prof, dest_pays):
        """Get profile and payoff from observation format"""
        for player in prof["players"]:
            role, strat, pay = _unpack_obs_player(**player)
            ind = self.role_strat_index(role, strat)
            dest_prof[ind] += 1
            dest_pays[ind] += (pay - dest_pays[ind]) / dest_prof[ind]

    def _profpay_from_json_dict(self, prof, dest_prof, dest_pays):
        """Get profile and payoff from dict format"""
        for role, strats in prof.items():
            for strat, count, pays in strats:
                index = self.role_strat_index(role, strat)
                dest_prof[index] = count
                dest_pays[index] = _mean(pays)

    def profpay_to_json(self, payoffs, prof):
        """Format a profile and payoffs as json"""
        return {
            role: [
                (strat, int(count), float(pay))
                for strat, count, pay in zip(strats, counts, pays)
                if count > 0
            ]
            for role, strats, counts, pays in zip(
                self.role_names,
                self.strat_names,
                np.split(prof, self.role_starts[1:]),
                np.split(payoffs, self.role_starts[1:]),
            )
        }

    @utils.memoize
    def __hash__(self):
        return hash(
            (
                super().__hash__(),
                self.num_complete_profiles,
                np.sort(utils.axis_to_elem(self._profiles)).tobytes(),
            )
        )

    def __eq__(self, othr):
        return (
            super().__eq__(othr)
            and
            # Identical profiles
            self.num_profiles == othr.num_profiles
            and self.num_complete_profiles == othr.num_complete_profiles
            and self._eq_payoffs(othr)
        )

    def _eq_payoffs(self, othr):
        """Identical profiles and payoffs conditioned on all else equal"""
        # pylint: disable-msg=protected-access
        sord = np.argsort(utils.axis_to_elem(self._profiles))
        oord = np.argsort(utils.axis_to_elem(othr._profiles))
        return np.all(self._profiles[sord] == othr._profiles[oord]) and np.allclose(
            self._payoffs[sord], othr._payoffs[oord], equal_nan=True
        )

    def to_json(self):
        """Fromat a Game as json"""
        res = super().to_json()
        res["profiles"] = [
            self.profpay_to_json(pay, prof)
            for prof, pay in zip(self._profiles, self._payoffs)
        ]
        res["type"] = "game.1"
        return res

    def __repr__(self):
        return "{old}, {data:d} / {total:d})".format(
            old=super().__repr__()[:-1],
            data=self.num_profiles,
            total=self.num_all_profiles,
        )

    def __str__(self):
        """Fromat basegame as a printable string"""
        return "{}\npayoff data for {:d} out of {:d} profiles".format(
            super().__str__(), self.num_profiles, self.num_all_profiles
        )


[docs]def game(num_role_players, num_role_strats, profiles, payoffs): """Create a game with default names Parameters ---------- num_role_players : ndarray-like, int, The number of players per role. num_role_strats : ndarray-like, int, The number of strategies per role. profiles : ndarray-like, int The profiles for the game, with shape (num_profiles, num_strats). payoffs : ndarray-like, float The payoffs for the game, with shape (num_profiles, num_strats). """ return game_replace( rsgame.empty(num_role_players, num_role_strats), profiles, payoffs )
[docs]def game_names(role_names, num_role_players, strat_names, profiles, payoffs): """Create a game with specified names Parameters ---------- role_names : [str] The name for each role. num_role_players : ndarray-like, int, The number of players per role. strat_names : [[str]] The name for each strategy per role. profiles : ndarray-like, int The profiles for the game, with shape (num_profiles, num_strats). payoffs : ndarray-like, float The payoffs for the game, with shape (num_profiles, num_strats). """ return game_replace( rsgame.empty_names(role_names, num_role_players, strat_names), profiles, payoffs )
[docs]def game_json(json): """Read a Game from json This takes a game in any valid payoff format (i.e. output by this or by EGTA Online), and converts it into a Game. If several payoff exist, the mean is taken. This means that loading a game using this method, and loading it as a sample game produce different results, as the sample game will truncate extra payoffs for an individual profile, while this will take the minimum. Note, that there is no legitimate way to get a game with that structure, but it is possible to write the json. """ base = game_copy(rsgame.empty_json(json)) profiles = json.get("profiles", ()) if not profiles: return base num_profs = len(profiles) profs = np.empty((num_profs, base.num_strats), int) pays = np.empty((num_profs, base.num_strats), float) for profj, prof, pay in zip(profiles, profs, pays): base.profpay_from_json(profj, prof, pay) return game_replace(base, profs, pays)
[docs]def game_copy(copy_game): """Copy structure and payoffs from an existing game Parameters ---------- copy_game : RsGame Game to copy data from. This will create a copy with the games profiles and payoffs. """ return _Game( copy_game.role_names, copy_game.strat_names, copy_game.num_role_players, copy_game.profiles(), copy_game.payoffs(), )
[docs]def game_replace(copy_game, profiles, payoffs): """Copy structure from an existing game with new data Parameters ---------- copy_game : Game Game to copy structure out of. Structure includes role names, strategy names, and the number of players. profiles : ndarray-like, int The profiles for the game, with shape (num_profiles, num_strats). payoffs : ndarray-like, float The payoffs for the game, with shape (num_profiles, num_strats). """ profiles = np.asarray(profiles, int) payoffs = np.asarray(payoffs, float) utils.check( profiles.shape == payoffs.shape, "profiles and payoffs must be the same shape {} {}", profiles.shape, payoffs.shape, ) utils.check( profiles.shape[1:] == (copy_game.num_strats,), "profiles must have proper end shape : expected {} but was {}", (copy_game.num_strats,), profiles.shape[1:], ) utils.check(np.all(profiles >= 0), "profiles was negative") utils.check( np.all( np.add.reduceat(profiles, copy_game.role_starts, 1) == copy_game.num_role_players ), "not all profiles equaled player total", ) utils.check( not np.any((payoffs != 0) & (profiles == 0)), "there were nonzero payoffs for strategies without players", ) utils.check( not np.all(np.isnan(payoffs) | (profiles == 0), 1).any(), "a profile can't have entirely nan payoffs", ) utils.check( profiles.shape[0] == np.unique(utils.axis_to_elem(profiles)).size, "there can't be any duplicate profiles", ) return _Game( copy_game.role_names, copy_game.strat_names, copy_game.num_role_players, profiles, payoffs, )
class _SampleGame(_Game): """A Role Symmetric Game that has multiple samples per profile This behaves the same as a normal Game object, except that it has methods for accessing several payoffs per profile. It also has a `resample` method which returns a Game with bootstrapped payoffs instead of mean payoffs, allowing for easy bootstrapping. Parameters ---------- role_names : (str,) The name of each role. strat_names : ((str,),) The name of each strategy for each role. num_role_players : ndarray, int The number of players per role. profiles : ndarray The profiles for the game. sample_payoffs : (ndarray,) The sample payoffs for the game. Each element of the tuple is a set of payoff samples grouped by number of samples and parallel with profiles. The dimension of each element should be (num_payoffs, num_samples, num_strats), where num_payoffs is the number of samples for that number of observations. The number of samples for each element of the tuple must be distinct, and an element with zero samples is disallowed, it should be omitted instead. All requirements for valid payoffs also apply. """ def __init__( # pylint: disable=too-many-arguments self, role_names, strat_names, num_role_players, profiles, sample_payoffs ): super().__init__( role_names, strat_names, num_role_players, profiles, np.concatenate([s.mean(1) for s in sample_payoffs]) if sample_payoffs else np.empty((0, profiles.shape[1])), ) self._sample_payoffs = sample_payoffs for spay in self._sample_payoffs: spay.setflags(write=False) self.num_sample_profs = np.fromiter( # pragma: no branch (x.shape[0] for x in sample_payoffs), int, len(sample_payoffs) ) self.num_sample_profs.setflags(write=False) self.sample_starts = np.insert(self.num_sample_profs[:-1].cumsum(), 0, 0) self.sample_starts.setflags(write=False) self.num_samples = np.fromiter( # pragma: no branch (v.shape[1] for v in sample_payoffs), int, len(sample_payoffs) ) self.num_samples.setflags(write=False) self._sample_profile_map = None @utils.memoize def min_strat_payoffs(self): """Returns the minimum payoff for each role""" mins = np.full(self.num_strats, np.nan) for profs, spays in zip( np.split(self._profiles, self.sample_starts[1:]), self._sample_payoffs ): sample_mins = np.fmin.reduce( np.where(profs[:, None] > 0, spays, np.nan), (0, 1) ) np.fmin(mins, sample_mins, mins) mins.setflags(write=False) return mins @utils.memoize def max_strat_payoffs(self): """Returns the maximum payoff for each role""" maxs = np.full(self.num_strats, np.nan) for profs, spays in zip( np.split(self._profiles, self.sample_starts[1:]), self._sample_payoffs ): sample_maxs = np.fmax.reduce( np.where(profs[:, None] > 0, spays, np.nan), (0, 1) ) np.fmax(maxs, sample_maxs, maxs) maxs.setflags(write=False) return maxs def sample_payoffs(self): """Get the underlying sample payoffs""" return self._sample_payoffs def resample( self, num_resamples=None, *, independent_profile=False, independent_role=False, independent_strategy=False ): """Fetch a game with bootstrap sampled payoffs Arguments --------- num_resamples : int The number of resamples to take for each realized payoff. By default this is equal to the number of observations for that profile, yielding proper bootstrap sampling. independent_profile : bool If true, sample each profile independently. In general, only profiles with a different number of observations will be resampled independently. independent_role : bool If true, sample each role independently. Within a profile, the payoffs for each role will be drawn independently. independent_strategy : bool IF true, sample each strategy independently. Within a profile, the payoffs for each strategy will be drawn independently. This supersceeds `independent_role`. Notes ----- Each of the `independent_` arguments will increase the time to do a resample, but may produce better results as it will remove correlations between payoffs. """ dim2 = ( self.num_strats if independent_strategy else self.num_roles if independent_role else 1 ) payoffs = np.empty_like(self._payoffs) for obs, pays in zip( self._sample_payoffs, np.split(payoffs, self.sample_starts[1:]) ): obs = np.rollaxis(obs, 1, 3) num_samples = obs.shape[2] num_obs_resamples = num_samples if num_resamples is None else num_resamples dim1 = obs.shape[0] if independent_profile else 1 sample = rand.multinomial( num_obs_resamples, np.ones(num_samples) / num_samples, (dim1, dim2) ) if independent_role and not independent_strategy: sample = sample.repeat(self.num_role_strats, 1) np.copyto( pays, np.mean(obs * sample, 2) * (num_samples / num_obs_resamples) ) return _Game( self.role_names, self.strat_names, self.num_role_players, self._profiles, payoffs, ) def get_sample_payoffs(self, profile): """Get sample payoffs associated with a profile This returns an array of shape (num_observations, num_role_strats). If profile has no data, num_observations will be 0.""" if self._sample_profile_map is None: self._sample_profile_map = dict( zip( map(utils.hash_array, self._profiles), itertools.chain.from_iterable(self._sample_payoffs), ) ) profile = np.asarray(profile, int) utils.check(self.is_profile(profile), "must pass a valid profile") hashed = utils.hash_array(profile) if hashed not in self._sample_profile_map: # pylint: disable=no-else-return return np.empty((0, self.num_strats), float) else: return self._sample_profile_map[hashed] def flat_profiles(self): """Profiles in parallel with flat_payoffs""" if self.is_empty(): # pylint: disable=no-else-return return np.empty((0, self.num_strats), int) else: return self._profiles.repeat( self.num_samples.repeat(self.num_sample_profs), 0 ) def flat_payoffs(self): """All sample payoffs linearly concatenated together""" if self.is_empty(): # pylint: disable=no-else-return return np.empty((0, self.num_strats)) else: return np.concatenate( [pay.reshape((-1, self.num_strats)) for pay in self._sample_payoffs] ) def _add_constant(self, constant): off = np.broadcast_to(constant, self.num_roles).repeat(self.num_role_strats) with np.errstate(invalid="ignore"): new_pays = tuple( (profs > 0)[:, None] * (pays + off) for profs, pays in zip( np.split(self._profiles, self.sample_starts[1:]), self._sample_payoffs, ) ) return _SampleGame( self.role_names, self.strat_names, self.num_role_players, self._profiles, new_pays, ) def _multiply_constant(self, constant): mult = np.broadcast_to(constant, self.num_roles).repeat(self.num_role_strats) with np.errstate(invalid="ignore"): new_pays = tuple(pays * mult for pays in self._sample_payoffs) return _SampleGame( self.role_names, self.strat_names, self.num_role_players, self._profiles, new_pays, ) def restrict(self, restriction): """Remove possible strategies from consideration""" restriction = np.asarray(restriction, bool) base = rsgame.empty_copy(self).restrict(restriction) prof_mask = ~np.any(self._profiles * ~restriction, 1) profiles = self._profiles[prof_mask][:, restriction] sample_payoffs = tuple( pays[pmask][..., restriction] for pays, pmask in zip( self._sample_payoffs, np.split(prof_mask, self.sample_starts[1:]) ) if pmask.any() ) return _SampleGame( base.role_names, base.strat_names, base.num_role_players, profiles, sample_payoffs, ) def samplepay_from_json(self, prof, dest=None): """Read a set of payoff samples Parameters ---------- prof : json A description of a set of profiles and their payoffs. There are several formats that are acceptable, they're all output by egta. dest : ndarray, options If supplied, ``dest`` will be written to instead of allocting a new array. This may be hard to use as you need to know how many observations are in the json. """ with contextlib.suppress(ValueError): # samplepay format with profile too _, dest = self.profsamplepay_from_json(prof, dest_samplepay=dest) return dest with contextlib.suppress(ValueError, AttributeError): # Must be {role: {strat: [pay]}} num = max( max(len(p) if isinstance(p, abc.Iterable) else 1 for p in pays.values()) for pays in prof.values() ) if dest is None: dest = np.empty((num, self.num_strats), float) else: utils.check(dest.dtype.kind == "f", "dest dtype must be floating") utils.check( dest.shape == (num, self.num_strats), "dest_samplepay not large enough for observations", ) dest.fill(0) for role, strats in prof.items(): for strat, pay in strats.items(): dest[:, self.role_strat_index(role, strat)] = pay return dest raise ValueError("unknown format") def samplepay_to_json(self, samplepay): """Format sample payoffs as json""" # In a really weird degenerate case, if all payoffs are 0, we'll write # out an empty dictionary, which loses information about the number of # samples. In that case we arbitrarily write out the first strategy # with zero payoffs. samplepay = np.asarray(samplepay, float) if np.all(samplepay == 0): return { self.role_names[0]: {self.strat_names[0][0]: [0] * samplepay.shape[0]} } return { role: { strat: pay.tolist() for strat, pay in zip(strats, pays) if np.any(pay != 0) } for role, strats, pays in zip( self.role_names, self.strat_names, np.split(samplepay.T, self.role_starts[1:]), ) if np.any(pays != 0) } def profsamplepay_from_json(self, prof, dest_prof=None, dest_samplepay=None): """Convert json into a profile and an observation""" if dest_prof is None: dest_prof = np.empty(self.num_strats, int) dest_prof.fill(0) # summary format if "observations" not in prof and "symmetry_groups" in prof: return self._profsamplepay_from_json_summ(prof, dest_prof, dest_samplepay) # observations format elif "observations" in prof and "symmetry_groups" in prof["observations"][0]: return self._profsamplepay_from_json_obs(prof, dest_prof, dest_samplepay) # full format elif "observations" in prof: return self._profsamplepay_from_json_full(prof, dest_prof, dest_samplepay) # profile payoff elif all(not isinstance(v, abc.Mapping) for v in prof.values()): return self._profsamplepay_from_json_prof(prof, dest_prof, dest_samplepay) # unrecognized else: raise ValueError("unrecognized format") def _get_spay_dest(self, dest, num): """Get payoff dest for number of samples""" if dest is None: return np.zeros((num, self.num_strats), float) utils.check( dest.shape == (num, self.num_strats), "dest_samplepay not large enough for observations", ) dest.fill(0) return dest def _profsamplepay_from_json_summ(self, prof, dest_prof, dest): """Get profile and sample payoff for summary format""" dest = self._get_spay_dest(dest, 1) for symgrp in prof["symmetry_groups"]: _, role, strat, count, pay = _unpack_symgrp(**symgrp) index = self.role_strat_index(role, strat) dest_prof[index] = count dest[0, index] = pay return dest_prof, dest def _profsamplepay_from_json_obs( self, prof, dest_prof, dest ): # pylint: disable=too-many-locals """Get profile and sample payoff for observation format""" dest = self._get_spay_dest(dest, len(prof["observations"])) ids = {} for symgrp in prof["symmetry_groups"]: i, role, strat, count, _ = _unpack_symgrp(**symgrp) index = self.role_strat_index(role, strat) ids[i] = index dest_prof[index] = count for j, obs in enumerate(prof["observations"]): for symgrp in obs["symmetry_groups"]: i, pay = _unpack_obs(**symgrp) dest[j, ids[i]] = pay return dest_prof, dest def _profsamplepay_from_json_full( self, prof, dest_prof, dest ): # pylint: disable=too-many-locals """Get profile and sample payoff for full format""" dest = self._get_spay_dest(dest, len(prof["observations"])) ids = {} for symgrp in prof["symmetry_groups"]: i, role, strat, count, _ = _unpack_symgrp(**symgrp) index = self.role_strat_index(role, strat) ids[i] = index dest_prof[index] = count counts = np.empty(self.num_strats, int) for j, obs in enumerate(prof["observations"]): counts.fill(0) for player in obs["players"]: i, pay = _unpack_player(**player) k = ids[i] counts[k] += 1 dest[j, k] += (pay - dest[j, k]) / counts[k] utils.check( np.all(counts == dest_prof), "full format didn't have payoffs for the correct number " "of players", ) return dest_prof, dest def _profsamplepay_from_json_prof(self, prof, dest_prof, dest): """Get profile and sample payoff for profile format""" num = max( max(len(p) if isinstance(p, abc.Iterable) else 1 for _, __, p in sg) for sg in prof.values() ) dest = self._get_spay_dest(dest, num) for role, strats in prof.items(): for strat, count, pays in strats: index = self.role_strat_index(role, strat) dest_prof[index] = count dest[:, index] = pays return dest_prof, dest def profsamplepay_to_json(self, samplepay, prof): """Convery profile and observations to prof obs output""" return { role: [ (strat, int(count), list(map(float, pay))) for strat, count, pay in zip(strats, counts, pays.T) if count > 0 ] for role, strats, counts, pays in zip( self.role_names, self.strat_names, np.split(prof, self.role_starts[1:]), np.split(samplepay, self.role_starts[1:], 1), ) } @utils.memoize def __hash__(self): return hash((super().__hash__(), tuple(sorted(self.num_samples)))) def __eq__(self, other): return ( super().__eq__(other) and # Identical sample payoffs all( _sample_payoffs_equal(pay, other.get_sample_payoffs(prof)) for prof, pay in zip( self._profiles, itertools.chain.from_iterable(self._sample_payoffs) ) ) ) def to_json(self): """Fromat a SampleGame as json""" res = super().to_json() res["profiles"] = [ self.profsamplepay_to_json(pay, prof) for prof, pay in zip( self._profiles, itertools.chain.from_iterable(self._sample_payoffs) ) ] res["type"] = "samplegame.1" return res def __repr__(self): samples = self.num_samples if samples.size == 0: sample_str = "0" elif samples.size == 1: sample_str = str(samples[0]) else: sample_str = "{:d} - {:d}".format(samples.min(), samples.max()) return "{}, {})".format(super().__repr__()[:-1], sample_str) def __str__(self): samples = self.num_sample_profs.dot(self.num_samples) if self.num_samples.size == 0: sampstr = "no observations" elif self.num_samples.size == 1: samps = self.num_samples[0] sampstr = "{:d} observation{} per profile".format( samps, "" if samps == 1 else "s" ) else: sampstr = "{:d} to {:d} observations per profile".format( self.num_samples.min(), self.num_samples.max() ) return "{}\n{} payoff sample{}\n{}".format( super().__str__(), "no" if samples == 0 else samples, "" if samples == 1 else "s", sampstr, ) def _sample_payoffs_equal(pay1, pay2): """Returns true if two sample payoffs are almost equal""" return pay1.shape[0] == pay2.shape[0] and utils.allclose_perm( pay1, pay2, equal_nan=True )
[docs]def samplegame(num_role_players, num_role_strats, profiles, sample_payoffs): """Create a SampleGame with default names Parameters ---------- num_role_players : ndarray-like, int The number of players per role. num_role_strats : ndarray-like, int The number of strategies per role. profiles : ndarray-like, int The profiles for the game, with shape (num_profiles, num_strats). sample_payoffs : [ndarray-like, float] The sample payoffs for the game. """ return samplegame_replace( rsgame.empty(num_role_players, num_role_strats), profiles, sample_payoffs )
[docs]def samplegame_flat(num_role_players, num_role_strats, profiles, payoffs): """Create a SampleGame with default names and flat profiles Parameters ---------- num_role_players : ndarray-like, int The number of players per role. num_role_strats : ndarray-like, int The number of strategies per role. profiles : ndarray-like, int The profiles for the game, potentially with duplicates, with shape (num_sample_profiles, num_strats). payoffs : ndarray-like, float The sample payoffs for the game, in parallel with the profiles they're samples from, with shape (num_sample_profiles, num_strats). """ return samplegame_replace_flat( rsgame.empty(num_role_players, num_role_strats), profiles, payoffs )
[docs]def samplegame_names( role_names, num_role_players, strat_names, profiles, sample_payoffs ): """Create a SampleGame with specified names Parameters ---------- role_names : [str] The name of each role. num_role_players : ndarray The number of players for each role. strat_names : [[str]] The name of each strategy. profiles : ndarray The profiles for the game. sample_payoffs : [ndarray] The sample payoffs for the game.""" return samplegame_replace( rsgame.empty_names(role_names, num_role_players, strat_names), profiles, sample_payoffs, )
[docs]def samplegame_names_flat(role_names, num_role_players, strat_names, profiles, payoffs): """Create a SampleGame with specified names and flat payoffs Parameters ---------- role_names : [str] The name of each role. num_role_players : ndarray The number of players for each role. strat_names : [[str]] The name of each strategy. profiles : ndarray-like, int The profiles for the game, potentially with duplicates, (num_sample_profiles, num_strats). payoffs : ndarray-like, float The sample payoffs for the game, in parallel with the profiles they're samples from, (num_sample_profiles, num_strats). """ return samplegame_replace_flat( rsgame.empty_names(role_names, num_role_players, strat_names), profiles, payoffs )
[docs]def samplegame_json(json): """Read a SampleGame from json This will read any valid payoff game as a sample game. Invalid games will produce an empty sample game.""" base = samplegame_copy(rsgame.empty_json(json)) profiles = json.get("profiles", ()) if not profiles: logging.debug("no profiles found in sample game") return base sample_map = {} for profile in profiles: prof, spay = base.profsamplepay_from_json(profile) num_samps = spay.shape[0] profls, payls = sample_map.setdefault(num_samps, ([], [])) profls.append(prof[None]) payls.append(spay[None]) values = [v for _, v in sorted(sample_map.items())] profiles = np.concatenate( list(itertools.chain.from_iterable(prof for prof, _ in values)) ) sample_payoffs = tuple(np.concatenate(spay) for _, spay in values) return samplegame_replace(base, profiles, sample_payoffs)
[docs]def samplegame_copy(copy_game): """Copy a SampleGame from another game If game defined sample_payoffs, this will be created with those, otherwise it will create a game with one sample per payoff. Parameters ---------- copy_game : RsGame Game to copy data from. """ if hasattr(copy_game, "sample_payoffs"): sample_payoffs = copy_game.sample_payoffs() elif not copy_game.is_empty(): sample_payoffs = (copy_game.payoffs()[:, None],) else: sample_payoffs = () return _SampleGame( copy_game.role_names, copy_game.strat_names, copy_game.num_role_players, copy_game.profiles(), sample_payoffs, )
[docs]def samplegame_replace_flat( copy_game, profiles, payoffs ): # pylint: disable=too-many-locals """Replace sample payoff data for an existing game Parameters ---------- copy_game : BaseGame, optional Game to copy information out of. profiles : ndarray-like, int The profiles for the game, potentially with duplicates, with shape (num_sample_profiles, num_strats). payoffs : ndarray-like, float The sample payoffs for the game, in parallel with the profiles they're samples from, with shape (num_sample_profiles, num_strats). """ profiles = np.asarray(profiles, int) payoffs = np.asarray(payoffs, float) _, ind, inv, counts = np.unique( utils.axis_to_elem(profiles), return_index=True, return_inverse=True, return_counts=True, ) countso = counts.argsort() countsoi = np.empty(counts.size, int) countsoi[countso] = np.arange(counts.size) cinv = countsoi[inv] cinvo = cinv.argsort() cinvs = cinv[cinvo] payo = (np.insert(np.cumsum(1 - np.diff(cinvs)), 0, 0) + cinvs)[cinvo] num_samps, ccounts = np.unique(counts[countso], return_counts=True) splits = (num_samps * ccounts)[:-1].cumsum() profs = profiles[ind[countso]] pays = [ pay.reshape((n, c, -1)) for pay, n, c in zip(np.split(payoffs[payo], splits), ccounts, num_samps) ] return samplegame_replace(copy_game, profs, pays)
[docs]def samplegame_replace(copy_game, profiles, sample_payoffs): """Replace sample payoff data for an existing game Parameters ---------- copy_game : BaseGame, optional Game to copy information out of. profiles : ndarray-like, int The profiles for the game, with shape (num_profiles, num_strats). sample_payoffs : [ndarray-like, float] The sample payoffs for the game. """ profiles = np.asarray(profiles, int) sample_payoffs = tuple(np.asarray(sp) for sp in sample_payoffs) utils.check( profiles.shape[1:] == (copy_game.num_strats,), "profiles must have proper end shape : expected {} but was {}", (copy_game.num_strats,), profiles.shape[1:], ) utils.check(np.all(profiles >= 0), "profiles were negative") utils.check( np.all( np.add.reduceat(profiles, copy_game.role_starts, 1) == copy_game.num_role_players ), "not all profiles equaled player total", ) utils.check( profiles.shape[0] == np.unique(utils.axis_to_elem(profiles)).size, "there can't be any duplicate profiles", ) utils.check( profiles.shape[0] == sum(sp.shape[0] for sp in sample_payoffs), 'profiles and sample_payoffs must have the same number of "profiles"', ) utils.check( all(sp.shape[2] == copy_game.num_strats for sp in sample_payoffs), "all sample payoffs must have the appropriate number of strategies", ) utils.check( not any(pays.size == 0 for pays in sample_payoffs), "sample_payoffs can't be empty", ) utils.check( len({s.shape[1] for s in sample_payoffs}) == len(sample_payoffs), "each set of observations must have a unique number or be merged", ) for profs, spays in zip( np.split( profiles, list(itertools.accumulate(sp.shape[0] for sp in sample_payoffs[:-1])), ), sample_payoffs, ): utils.check( not np.any((spays != 0) & (profs == 0)[:, None]), "some sample payoffs were nonzero for invalid payoffs", ) utils.check( not np.all(np.isnan(spays) | (profs == 0)[:, None], 2).any(), "an observation can't have entirely nan payoffs", ) utils.check( np.all(np.isnan(spays).all(1) | ~np.isnan(spays).any()), "for a given strategy, all payoffs must be nan or non", ) return _SampleGame( copy_game.role_names, copy_game.strat_names, copy_game.num_role_players, profiles, sample_payoffs, )
# --------- # Utilities # --------- def _mean(vals): """Streaming mean of some values""" if not isinstance(vals, abc.Iterable): return vals count = 0 mean = 0 for val in vals: count += 1 mean += (val - mean) / count return mean if count > 0 else float("nan") def _unpack_symgrp( role, strategy, count, payoff=None, id=None, **_ ): # pylint: disable=invalid-name,redefined-builtin """Unpack a symmetry group""" return id, role, strategy, count, payoff def _unpack_obs(id, payoff, **_): # pylint: disable=invalid-name,redefined-builtin """Unpack an observation""" return id, payoff def _unpack_player(sid, p, **_): # pylint: disable=invalid-name """Unpack a player""" return sid, p def _unpack_obs_player(role, strategy, payoff, **_): """Unpack an observation player""" return role, strategy, payoff