"""Package for learning complete games from data

The API of this individual module is still unstable and may change as
improvements or refinements are made.

There are two general game types in this module: learned games and deviation
games. Learned games vary by learning method, but generally expose methods for
computing payoffs and many other features. Deviation games wrap learned games
and compute deviation payoffs via various approximation methods (see the
commented usage sketch after the imports).
"""
import warnings

import numpy as np
from numpy.lib import recfunctions
import sklearn
from sklearn import gaussian_process as gp

from gameanalysis import gamereader
from gameanalysis import paygame
from gameanalysis import restrict
from gameanalysis import rsgame
from gameanalysis import utils
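

# A minimal usage sketch of the two-step API described in the module
# docstring, kept as a comment so nothing runs on import. ``data_game`` is a
# hypothetical complete RsGame with observed payoff data:
#
#     model = rbfgame_train(data_game)    # learned (regression) game
#     dev_game = neighbor(model)          # deviation game wrapping the model
#     pays = dev_game.deviation_payoffs(dev_game.uniform_mixture())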


class _DevRegressionGame(rsgame._CompleteGame):  # pylint: disable=protected-access
    """A game regression model that learns deviation payoffs

    This model functions as a game, but doesn't have a default way of computing
    deviation payoffs. It must be wrapped with another game that uses payoff
    data to compute deviation payoffs.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self, game, regressors, offset, scale, min_payoffs, max_payoffs, rest
    ):
        super().__init__(game.role_names, game.strat_names, game.num_role_players)
        self._regressors = regressors
        self._offset = offset
        self._offset.setflags(write=False)
        self._scale = scale
        self._scale.setflags(write=False)
        self._min_payoffs = min_payoffs
        self._min_payoffs.setflags(write=False)
        self._max_payoffs = max_payoffs
        self._max_payoffs.setflags(write=False)
        self._rest = rest
        self._rest.setflags(write=False)

    def deviation_payoffs(self, _, **_kw):  # pylint: disable=arguments-differ
        raise ValueError(
            "regression games don't define deviation payoffs and must be "
            "used as a model for a deviation game"
        )

    def get_payoffs(self, profiles):
        utils.check(self.is_profile(profiles).all(), "must pass valid profiles")
        payoffs = np.zeros(profiles.shape)
        for i, (off, scale, reg) in enumerate(
            zip(self._offset, self._scale, self._regressors)
        ):
            mask = profiles[..., i] > 0
            profs = profiles[mask]
            profs[:, i] -= 1
            if profs.size:
                payoffs[mask, i] = (
                    reg.predict(restrict.translate(profs, self._rest)).ravel() * scale
                    + off
                )
        return payoffs

    def get_dev_payoffs(self, dev_profs):
        """Compute the payoff for deviating

        This implementation is more efficient than the default since we don't
        need to compute the payoff for non-deviators."""
        prof_view = np.rollaxis(
            restrict.translate(
                dev_profs.reshape((-1, self.num_roles, self.num_strats)), self._rest
            ),
            1,
            0,
        )
        payoffs = np.empty(dev_profs.shape[:-2] + (self.num_strats,))
        pay_view = payoffs.reshape((-1, self.num_strats)).T
        for pays, profs, reg in zip(
            pay_view, utils.repeat(prof_view, self.num_role_strats), self._regressors
        ):
            np.copyto(pays, reg.predict(profs))
        return payoffs * self._scale + self._offset

    def max_strat_payoffs(self):
        return self._max_payoffs.view()

    def min_strat_payoffs(self):
        return self._min_payoffs.view()

    def restrict(self, restriction):
        base = rsgame.empty_copy(self).restrict(restriction)
        new_rest = self._rest.copy()
        new_rest[new_rest] = restriction
        regs = tuple(reg for reg, m in zip(self._regressors, restriction) if m)
        return _DevRegressionGame(
            base,
            regs,
            self._offset[restriction],
            self._scale[restriction],
            self._min_payoffs[restriction],
            self._max_payoffs[restriction],
            new_rest,
        )

    def _add_constant(self, constant):
        off = np.broadcast_to(constant, self.num_roles).repeat(self.num_role_strats)
        return _DevRegressionGame(
            self,
            self._regressors,
            self._offset + off,
            self._scale,
            self._min_payoffs + off,
            self._max_payoffs + off,
            self._rest,
        )

    def _multiply_constant(self, constant):
        mul = np.broadcast_to(constant, self.num_roles).repeat(self.num_role_strats)
        return _DevRegressionGame(
            self,
            self._regressors,
            self._offset * mul,
            self._scale * mul,
            self._min_payoffs * mul,
            self._max_payoffs * mul,
            self._rest,
        )

    def _add_game(self, _):
        return NotImplemented

    def __eq__(self, othr):
        # pylint: disable-msg=protected-access
        return (
            super().__eq__(othr)
            and self._regressors == othr._regressors
            and np.allclose(self._offset, othr._offset)
            and np.allclose(self._scale, othr._scale)
            and np.all(self._rest == othr._rest)
        )

    def __hash__(self):
        return hash((super().__hash__(), self._rest.tobytes()))


def _dev_profpay(game):
    """Iterate over deviation profiles and payoffs"""
    sgame = paygame.samplegame_copy(game)
    profiles = sgame.flat_profiles()
    payoffs = sgame.flat_payoffs()

    for i, pays in enumerate(payoffs.T):
        mask = (profiles[:, i] > 0) & ~np.isnan(pays)
        utils.check(mask.any(), "couldn't find deviation data for a strategy")
        profs = profiles[mask]
        profs[:, i] -= 1
        yield i, profs, pays[mask]


def nngame_train(  # pylint: disable=too-many-arguments,too-many-locals
    game,
    epochs=100,
    layer_sizes=(32, 32),
    dropout=0.2,
    verbosity=0,
    optimizer="sgd",
    loss="mean_squared_error",
):
    """Train a neural network regression model

    This mostly exists as a proof of concept; individual testing should be
    done to make sure it is working sufficiently. This API will likely change
    to support more general architectures and training.
    """
    utils.check(layer_sizes, "must have at least one layer")
    utils.check(0 <= dropout < 1, "dropout must be a valid probability")
    # This is for delayed importing of tensorflow
    from keras import models, layers

    model = models.Sequential()
    lay_iter = iter(layer_sizes)
    model.add(
        layers.Dense(next(lay_iter), input_shape=[game.num_strats], activation="relu")
    )
    for units in lay_iter:
        model.add(layers.Dense(units, activation="relu"))
        if dropout:
            model.add(layers.Dropout(dropout))
    model.add(layers.Dense(1, activation="sigmoid"))

    regs = []
    offsets = np.empty(game.num_strats)
    scales = np.empty(game.num_strats)
    for i, profs, pays in _dev_profpay(game):
        # XXX Payoff normalization specific to sigmoid. If we accept alternate
        # models, we need a way to compute how to potentially normalize
        # payoffs.
        min_pay = pays.min()
        offsets[i] = min_pay
        max_pay = pays.max()
        scale = 1 if np.isclose(max_pay, min_pay) else max_pay - min_pay
        scales[i] = scale
        reg = models.clone_model(model)
        reg.compile(optimizer=optimizer, loss=loss)
        reg.fit(profs, (pays - min_pay) / scale, epochs=epochs, verbose=verbosity)
        regs.append(reg)

    return _DevRegressionGame(
        game,
        tuple(regs),
        offsets,
        scales,
        game.min_strat_payoffs(),
        game.max_strat_payoffs(),
        np.ones(game.num_strats, bool),
    )
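

# Hypothetical invocation of the proof-of-concept neural-network learner.
# ``data_game`` is an assumed complete RsGame with payoff data, and keras /
# tensorflow must be installed; treat this as a sketch, not a recipe:
#
#     nn_model = nngame_train(data_game, epochs=50, layer_sizes=(64, 32))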


def sklgame_train(game, estimator):
    """Create a regression game from an arbitrary sklearn estimator

    Parameters
    ----------
    game : RsGame
        The game to learn, must have at least one payoff per strategy.
    estimator : sklearn estimator
        An estimator that supports clone, fit, and predict via the standard
        scikit-learn estimator API.
    """
    regs = []
    for _, profs, pays in _dev_profpay(game):
        reg = sklearn.base.clone(estimator)
        reg.fit(profs, pays)
        regs.append(reg)
    return _DevRegressionGame(
        game,
        tuple(regs),
        np.zeros(game.num_strats),
        np.ones(game.num_strats),
        game.min_strat_payoffs(),
        game.max_strat_payoffs(),
        np.ones(game.num_strats, bool),
    )
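

# Sketch of wrapping a scikit-learn regressor (the estimator choice is only
# an example; any estimator supporting clone/fit/predict should work, and
# ``data_game`` is again an assumed complete RsGame):
#
#     from sklearn import ensemble
#     skl_model = sklgame_train(data_game, ensemble.RandomForestRegressor())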


class _RbfGpGame(
    rsgame._CompleteGame
):  # pylint: disable=too-many-instance-attributes,protected-access
    """A regression game using RBF Gaussian processes

    This regression game has a built-in deviation payoff based off of a
    continuous approximation of the multinomial distribution.
    """

    def __init__(  # pylint: disable=too-many-locals,too-many-arguments
        self,
        role_names,
        strat_names,
        num_role_players,
        offset,
        coefs,
        lengths,
        sizes,
        profiles,
        alpha,
    ):
        super().__init__(role_names, strat_names, num_role_players)
        self._offset = offset
        self._offset.setflags(write=False)
        self._coefs = coefs
        self._coefs.setflags(write=False)
        self._lengths = lengths
        self._lengths.setflags(write=False)
        self._sizes = sizes
        self._sizes.setflags(write=False)
        self._size_starts = np.insert(self._sizes[:-1].cumsum(), 0, 0)
        self._size_starts.setflags(write=False)
        self._profiles = profiles
        self._profiles.setflags(write=False)
        self._alpha = alpha
        self._alpha.setflags(write=False)

        # Useful member
        self._dev_players = np.repeat(
            self.num_role_players - np.eye(self.num_roles, dtype=int),
            self.num_role_strats,
            0,
        )
        self._dev_players.setflags(write=False)

        # Compute min and max payoffs
        # TODO These are pretty conservative, and could maybe be made more
        # accurate
        sdp = self._dev_players.repeat(self.num_role_strats, 1)
        max_rbf = np.einsum("ij,ij,ij->i", sdp, sdp, 1 / self._lengths)
        minw = np.exp(-max_rbf / 2)  # pylint: disable=invalid-unary-operand-type
        mask = self._alpha > 0
        pos = np.add.reduceat(self._alpha * mask, self._size_starts)
        neg = np.add.reduceat(self._alpha * ~mask, self._size_starts)
        self._min_payoffs = self._coefs * (pos * minw + neg) + self._offset
        self._min_payoffs.setflags(write=False)
        self._max_payoffs = self._coefs * (pos + neg * minw) + self._offset
        self._max_payoffs.setflags(write=False)

    def get_payoffs(self, profiles):
        utils.check(self.is_profile(profiles).all(), "must pass valid profiles")
        dev_profiles = np.repeat(
            profiles[..., None, :] - np.eye(self.num_strats, dtype=int),
            self._sizes,
            -2,
        )
        vec = (dev_profiles - self._profiles) / self._lengths.repeat(self._sizes, 0)
        rbf = np.einsum("...ij,...ij->...i", vec, vec)
        payoffs = self._offset + self._coefs * np.add.reduceat(
            np.exp(-rbf / 2) * self._alpha, self._size_starts, -1
        )  # pylint: disable=invalid-unary-operand-type
        payoffs[profiles == 0] = 0
        return payoffs

    def get_dev_payoffs(
        self, dev_profs, *, jacobian=False
    ):  # pylint: disable=arguments-differ
        dev_profiles = dev_profs.repeat(
            np.add.reduceat(self._sizes, self.role_starts), -2
        )
        vec = (dev_profiles - self._profiles) / self._lengths.repeat(self._sizes, 0)
        rbf = np.einsum("...ij,...ij->...i", vec, vec)
        exp = (
            np.exp(-rbf / 2) * self._alpha
        )  # pylint: disable=invalid-unary-operand-type
        payoffs = self._offset + self._coefs * np.add.reduceat(
            exp, self._size_starts, -1
        )

        if not jacobian:
            return payoffs

        jac = -(
            self._coefs[:, None]
            / self._lengths
            * np.add.reduceat(exp[:, None] * vec, self._size_starts, 0)
        )
        return payoffs, jac

    def max_strat_payoffs(self):
        return self._max_payoffs.view()

    def min_strat_payoffs(self):
        return self._min_payoffs.view()

    def deviation_payoffs(
        self, mixture, *, jacobian=False, **_
    ):  # pylint: disable=too-many-locals
        players = self._dev_players.repeat(self.num_role_strats, 1)
        avg_prof = players * mixture
        diag = 1 / (self._lengths ** 2 + avg_prof)
        diag_sizes = diag.repeat(self._sizes, 0)
        diff = self._profiles - avg_prof.repeat(self._sizes, 0)
        det = 1 / (
            1
            - self._dev_players
            * np.add.reduceat(mixture ** 2 * diag, self.role_starts, 1)
        )
        det_sizes = det.repeat(self._sizes, 0)
        cov_diag = np.einsum("ij,ij,ij->i", diff, diff, diag_sizes)
        cov_outer = np.add.reduceat(mixture * diag_sizes * diff, self.role_starts, 1)
        sec_term = np.einsum(
            "ij,ij,ij,ij->i",
            self._dev_players.repeat(self._sizes, 0),
            det_sizes,
            cov_outer,
            cov_outer,
        )
        exp = np.exp(-(cov_diag + sec_term) / 2)
        coef = self._lengths.prod(1) * np.sqrt(diag.prod(1) * det.prod(1))
        avg = np.add.reduceat(self._alpha * exp, self._size_starts)
        payoffs = self._coefs * coef * avg + self._offset

        if not jacobian:
            return payoffs

        beta = 1 - players * mixture * diag
        jac_coef = (
            (beta ** 2 - 1) * det.repeat(self.num_role_strats, 1) + players * diag
        ) * avg[:, None]
        delta = np.repeat(cov_outer * det_sizes, self.num_role_strats, 1)
        jac_exp = (
            -self._alpha[:, None]
            * exp[:, None]
            * (
                (delta * beta.repeat(self._sizes, 0) - diff * diag_sizes - 1) ** 2
                - (delta - 1) ** 2
            )
        )
        jac_avg = players * np.add.reduceat(jac_exp, self._size_starts, 0)
        jac = -self._coefs[:, None] * coef[:, None] * (jac_coef + jac_avg) / 2
        return payoffs, jac

    # TODO Add function that creates sample game which draws payoffs from the
    # gp distribution

    def restrict(self, restriction):
        restriction = np.asarray(restriction, bool)
        base = rsgame.empty_copy(self).restrict(restriction)

        size_mask = restriction.repeat(self._sizes)
        sizes = self._sizes[restriction]
        profiles = self._profiles[size_mask]
        lengths = self._lengths[restriction]

        zeros = profiles[:, ~restriction] / lengths[:, ~restriction].repeat(sizes, 0)
        removed = np.exp(
            -np.einsum("ij,ij->i", zeros, zeros) / 2
        )  # pylint: disable=invalid-unary-operand-type

        uprofs, inds = np.unique(
            recfunctions.merge_arrays(
                [
                    np.arange(restriction.sum()).repeat(sizes).view([("s", int)]),
                    utils.axis_to_elem(profiles[:, restriction]),
                ],
                flatten=True,
            ),
            return_inverse=True,
        )
        new_alpha = np.bincount(inds, removed * self._alpha[size_mask])
        new_sizes = np.diff(
            np.concatenate(
                [[-1], np.flatnonzero(np.diff(uprofs["s"])), [new_alpha.size - 1]]
            )
        )
        return _RbfGpGame(
            base.role_names,
            base.strat_names,
            base.num_role_players,
            self._offset[restriction],
            self._coefs[restriction],
            lengths[:, restriction],
            new_sizes,
            uprofs["axis"],
            new_alpha,
        )

    def _add_constant(self, constant):
        off = np.broadcast_to(constant, self.num_roles).repeat(self.num_role_strats)
        return _RbfGpGame(
            self.role_names,
            self.strat_names,
            self.num_role_players,
            self._offset + off,
            self._coefs,
            self._lengths,
            self._sizes,
            self._profiles,
            self._alpha,
        )

    def _multiply_constant(self, constant):
        mul = np.broadcast_to(constant, self.num_roles).repeat(self.num_role_strats)
        return _RbfGpGame(
            self.role_names,
            self.strat_names,
            self.num_role_players,
            self._offset * mul,
            self._coefs * mul,
            self._lengths,
            self._sizes,
            self._profiles,
            self._alpha,
        )

    def _add_game(self, _):
        return NotImplemented

    def to_json(self):
        base = super().to_json()
        base["offsets"] = self.payoff_to_json(self._offset)
        base["coefs"] = self.payoff_to_json(self._coefs)

        lengths = {}
        for role, strats, lens in zip(
            self.role_names,
            self.strat_names,
            np.split(self._lengths, self.role_starts[1:]),
        ):
            lengths[role] = {s: self.payoff_to_json(l) for s, l in zip(strats, lens)}
        base["lengths"] = lengths

        profs = {}
        for role, strats, data in zip(
            self.role_names,
            self.strat_names,
            np.split(
                np.split(self._profiles, self._size_starts[1:]), self.role_starts[1:]
            ),
        ):
            profs[role] = {
                strat: [self.profile_to_json(p) for p in dat]
                for strat, dat in zip(strats, data)
            }
        base["profiles"] = profs

        alphas = {}
        for role, strats, alphs in zip(
            self.role_names,
            self.strat_names,
            np.split(
                np.split(self._alpha, self._size_starts[1:]), self.role_starts[1:]
            ),
        ):
            alphas[role] = {s: a.tolist() for s, a in zip(strats, alphs)}
        base["alphas"] = alphas

        base["type"] = "rbf.1"
        return base

    def __eq__(self, othr):
        # pylint: disable-msg=protected-access
        return (
            super().__eq__(othr)
            and np.allclose(self._offset, othr._offset)
            and np.allclose(self._coefs, othr._coefs)
            and np.allclose(self._lengths, othr._lengths)
            and np.all(self._sizes == othr._sizes)
            and utils.allclose_perm(
                np.concatenate(
                    [
                        np.arange(self.num_strats).repeat(self._sizes)[:, None],
                        self._profiles,
                        self._alpha[:, None],
                    ],
                    1,
                ),
                np.concatenate(
                    [
                        np.arange(othr.num_strats).repeat(othr._sizes)[:, None],
                        othr._profiles,
                        othr._alpha[:, None],
                    ],
                    1,
                ),
            )
        )

    @utils.memoize
    def __hash__(self):
        hprofs = np.sort(
            utils.axis_to_elem(
                np.concatenate(
                    [
                        np.arange(self.num_strats).repeat(self._sizes)[:, None],
                        self._profiles,
                    ],
                    1,
                )
            )
        ).tobytes()
        return hash((super().__hash__(), hprofs))


def rbfgame_train(game, num_restarts=3):  # pylint: disable=too-many-locals
    """Train a regression game with an RBF Gaussian process

    This model is somewhat well tested and has a few added benefits over
    standard regression models due to the nature of its functional form.

    Parameters
    ----------
    game : RsGame
        The game to learn. Must have at least one payoff per strategy.
    num_restarts : int, optional
        The number of random restarts to make with the optimizer. Higher
        numbers will give a better fit (in expectation), but will take longer.
    """
    dev_players = (
        np.maximum(game.num_role_players - np.eye(game.num_roles, dtype=int), 1)
        .repeat(game.num_role_strats, 0)
        .repeat(game.num_role_strats, 1)
    )
    bounds = np.insert(dev_players[..., None], 0, 1, 2)
    # TODO Add an alpha that is smaller for points near the edge of the
    # simplex, accounting for the importance of minimizing error at the
    # extrema.
    means = np.empty(game.num_strats)
    coefs = np.empty(game.num_strats)
    lengths = np.empty((game.num_strats, game.num_strats))

    profiles = []
    alpha = []
    sizes = []
    for (strat, profs, pays), bound in zip(_dev_profpay(game), bounds):
        pay_mean = pays.mean()
        pays -= pay_mean
        reg = gp.GaussianProcessRegressor(
            1.0 * gp.kernels.RBF(bound.mean(1), bound) + gp.kernels.WhiteKernel(1),
            n_restarts_optimizer=num_restarts,
            copy_X_train=False,
        )
        reg.fit(profs, pays)

        means[strat] = pay_mean
        coefs[strat] = reg.kernel_.k1.k1.constant_value
        lengths[strat] = reg.kernel_.k1.k2.length_scale
        uprofs, inds = np.unique(utils.axis_to_elem(profs), return_inverse=True)
        profiles.append(utils.axis_from_elem(uprofs))
        alpha.append(np.bincount(inds, reg.alpha_))
        sizes.append(uprofs.size)

    if np.any(lengths[..., None] == bounds):
        warnings.warn(
            "some lengths were at their bounds, this may indicate a poor fit"
        )

    return _RbfGpGame(
        game.role_names,
        game.strat_names,
        game.num_role_players,
        means,
        coefs,
        lengths,
        np.array(sizes),
        np.concatenate(profiles),
        np.concatenate(alpha),
    )
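

# Sketch of training and serializing the RBF Gaussian process learner, under
# the assumption that ``data_game`` is a complete RsGame with payoff data:
#
#     rbf_model = rbfgame_train(data_game, num_restarts=5)
#     json_repr = rbf_model.to_json()   # can be reloaded with rbfgame_json
#     restored = rbfgame_json(json_repr)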


def rbfgame_json(json):
    """Read an rbf game from json"""
    utils.check(json["type"].split(".", 1)[0] == "rbf", "incorrect type")
    base = rsgame.empty_json(json)

    offsets = base.payoff_from_json(json["offsets"])
    coefs = base.payoff_from_json(json["coefs"])

    lengths = np.empty((base.num_strats,) * 2)
    for role, strats in json["lengths"].items():
        for strat, pay in strats.items():
            ind = base.role_strat_index(role, strat)
            base.payoff_from_json(pay, lengths[ind])

    profiles = [None] * base.num_strats
    for role, strats in json["profiles"].items():
        for strat, profs in strats.items():
            ind = base.role_strat_index(role, strat)
            profiles[ind] = np.stack(
                [base.profile_from_json(p, verify=False) for p in profs]
            )

    alphas = [None] * base.num_strats
    for role, strats in json["alphas"].items():
        for strat, alph in strats.items():
            ind = base.role_strat_index(role, strat)
            alphas[ind] = np.array(alph)

    sizes = np.fromiter(  # pragma: no branch
        (a.size for a in alphas), int, base.num_strats
    )

    return _RbfGpGame(
        base.role_names,
        base.strat_names,
        base.num_role_players,
        offsets,
        coefs,
        lengths,
        sizes,
        np.concatenate(profiles),
        np.concatenate(alphas),
    )


class _DeviationGame(
    rsgame._CompleteGame
):  # pylint: disable=abstract-method,protected-access
    """A game that adds deviation payoffs"""

    def __init__(self, model_game):
        super().__init__(
            model_game.role_names, model_game.strat_names, model_game.num_role_players
        )
        utils.check(model_game.is_complete(), "deviation models must be complete games")
        self.model = model_game

    def get_payoffs(self, profiles):
        return self.model.get_payoffs(profiles)

    def profiles(self):
        return self.model.profiles()

    def payoffs(self):
        return self.model.payoffs()

    def max_strat_payoffs(self):
        return self.model.max_strat_payoffs()

    def min_strat_payoffs(self):
        return self.model.min_strat_payoffs()

    def to_json(self):
        base = super().to_json()
        base["model"] = self.model.to_json()
        return base

    def __eq__(self, othr):
        return super().__eq__(othr) and self.model == othr.model

    @utils.memoize
    def __hash__(self):
        return hash((super().__hash__(), self.model))


class _SampleDeviationGame(_DeviationGame):
    """Deviation payoffs by sampling from mixture

    This model produces unbiased deviation payoff estimates, but they're noisy
    and random and take a while to compute. This is accurate in the limit as
    `num_samples` goes to infinity.

    Parameters
    ----------
    model : DevRegressionGame
        A payoff model
    num_samples : int, optional
        The number of samples to use for each deviation estimate. Higher means
        lower variance but higher computation time.
    """

    def __init__(self, model, num_samples=100):
        super().__init__(model)
        utils.check(num_samples > 0, "num samples must be greater than 0")
        # TODO It might be interesting to play with a sample schedule, i.e.
        # change the number of samples based off of the query number to
        # deviation payoffs (i.e. reduce variance as we get close to
        # convergence)
        self.num_samples = num_samples

    def deviation_payoffs(self, mixture, *, jacobian=False, **_):
        """Compute the deviation payoffs

        The method computes the jacobian as if we were importance sampling the
        results, i.e. the function always samples according to mixture m', but
        then importance samples to get the actual result."""
        profs = self.random_role_deviation_profiles(self.num_samples, mixture)
        payoffs = self.model.get_dev_payoffs(profs)
        dev_pays = payoffs.mean(0)
        if not jacobian:
            return dev_pays

        supp = mixture > 0
        weights = np.zeros(profs.shape)
        weights[..., supp] = profs[..., supp] / mixture[supp]
        jac = (
            np.einsum("ij,ijk->jk", payoffs, weights.repeat(self.num_role_strats, 1))
            / self.num_samples
        )
        return dev_pays, jac

    def restrict(self, restriction):
        return _SampleDeviationGame(self.model.restrict(restriction), self.num_samples)

    def _add_constant(self, constant):
        return _SampleDeviationGame(self.model + constant, self.num_samples)

    def _multiply_constant(self, constant):
        return _SampleDeviationGame(self.model * constant, self.num_samples)

    def _add_game(self, othr):
        try:
            assert self.num_samples == othr.num_samples
            return _SampleDeviationGame(self.model + othr.model, self.num_samples)
        except (AttributeError, AssertionError):
            return NotImplemented

    def to_json(self):
        base = super().to_json()
        base["samples"] = self.num_samples
        base["type"] = "sample.1"
        return base

    def __eq__(self, othr):
        return super().__eq__(othr) and self.num_samples == othr.num_samples

    @utils.memoize
    def __hash__(self):
        return hash((super().__hash__(), self.num_samples))


def sample(game, num_samples=100):
    """Create a sample game from a model

    Parameters
    ----------
    game : RsGame
        If this is a payoff model it will be used to take samples, if this is
        an existing deviation game, then this will use its underlying model.
    num_samples : int, optional
        The number of samples to take.
    """
    try:
        return _SampleDeviationGame(game.model, num_samples=num_samples)
    except AttributeError:
        return _SampleDeviationGame(game, num_samples=num_samples)
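

# Sketch of the unbiased but noisy sampling estimator; more samples lower the
# variance at the cost of computation time (``rbf_model`` as sketched above):
#
#     dev_game = sample(rbf_model, num_samples=500)
#     pays = dev_game.deviation_payoffs(dev_game.uniform_mixture())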


def sample_json(json):
    """Read sample game from json"""
    utils.check(json["type"].split(".", 1)[0] == "sample", "incorrect type")
    return _SampleDeviationGame(
        gamereader.loadj(json["model"]), num_samples=json["samples"]
    )


class _PointDeviationGame(_DeviationGame):
    """Deviation payoffs by point approximation

    This model computes payoffs by finding the deviation payoffs from the
    point estimate of the mixture. It's fast but biased. This is accurate in
    the limit as the number of players goes to infinity.

    For this to work, the underlying implementation of get_dev_payoffs must
    support floating point profiles, which only really makes sense for
    regression games. For deviation payoffs to have a jacobian, the underlying
    model must also support a jacobian for get_dev_payoffs.

    Parameters
    ----------
    model : DevRegressionGame
        A payoff model
    """

    def __init__(self, model):
        super().__init__(model)
        self._dev_players = np.repeat(
            self.num_role_players - np.eye(self.num_roles, dtype=int),
            self.num_role_strats,
            1,
        )

    def deviation_payoffs(self, mixture, *, jacobian=False, **_):
        if not jacobian:
            return self.model.get_dev_payoffs(self._dev_players * mixture)

        dev, jac = self.model.get_dev_payoffs(
            self._dev_players * mixture, jacobian=True
        )
        jac *= self._dev_players.repeat(self.num_role_strats, 0)
        return dev, jac

    def restrict(self, restriction):
        return _PointDeviationGame(self.model.restrict(restriction))

    def _add_constant(self, constant):
        return _PointDeviationGame(self.model + constant)

    def _multiply_constant(self, constant):
        return _PointDeviationGame(self.model * constant)

    def _add_game(self, othr):
        try:
            assert isinstance(othr, _PointDeviationGame)
            return _PointDeviationGame(self.model + othr.model)
        except (AttributeError, AssertionError):
            return NotImplemented

    def to_json(self):
        base = super().to_json()
        base["type"] = "point.1"
        return base


def point(game):
    """Create a point game from a model

    Parameters
    ----------
    game : RsGame
        If this is a payoff model it will be used directly, if this is an
        existing deviation game, then this will use its underlying model.
    """
    try:
        return _PointDeviationGame(game.model)
    except AttributeError:
        return _PointDeviationGame(game)
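

# Sketch of the fast but biased point estimator; it queries the model at the
# expected (fractional) deviation profile, and a jacobian is only available
# when the underlying model provides one:
#
#     dev_game = point(rbf_model)
#     pays, jac = dev_game.deviation_payoffs(
#         dev_game.uniform_mixture(), jacobian=True
#     )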


def point_json(json):
    """Read point game from json"""
    utils.check(json["type"].split(".", 1)[0] == "point", "incorrect type")
    return _PointDeviationGame(gamereader.loadj(json["model"]))


class _NeighborDeviationGame(_DeviationGame):
    """Create a neighbor game from a model

    This takes a normalized weighted estimate of the deviation payoffs by
    finding all profiles within `num_neighbors` of the maximum probability
    profile for the mixture and weighting them accordingly. This is biased,
    but accurate in the limit as `num_neighbors` approaches `num_players`. It
    also produces discontinuities every time the maximum probability profile
    switches.

    Parameters
    ----------
    game : RsGame
        If this is a payoff model it will be used directly, if this is an
        existing deviation game, then this will use its underlying model.
    num_neighbors : int, optional
        The number of deviations to take.
    """

    def __init__(self, model, num_neighbors=2):
        super().__init__(model)
        utils.check(num_neighbors >= 0, "num devs must be nonnegative")
        self.num_neighbors = num_neighbors

    def deviation_payoffs(self, mixture, *, jacobian=False, **_):
        # TODO This is not smooth because there are discontinuities when the
        # maximum probability profile jumps at the boundary. If we wanted to
        # make it smooth, one option would be to compute the smoother
        # interpolation between this and lower probability profiles. All we
        # need to ensure smoothness is that the weight at profile
        # discontinuities is 0.
        profiles = self.nearby_profiles(
            self.max_prob_prof(mixture), self.num_neighbors
        )
        payoffs = self.get_payoffs(profiles)
        game = paygame.game_replace(self, profiles, payoffs)
        return game.deviation_payoffs(
            mixture, ignore_incomplete=True, jacobian=jacobian
        )

    def restrict(self, restriction):
        return _NeighborDeviationGame(
            self.model.restrict(restriction), self.num_neighbors
        )

    def _add_constant(self, constant):
        return _NeighborDeviationGame(self.model + constant, self.num_neighbors)

    def _multiply_constant(self, constant):
        return _NeighborDeviationGame(self.model * constant, self.num_neighbors)

    def _add_game(self, othr):
        try:
            assert self.num_neighbors == othr.num_neighbors
            return _NeighborDeviationGame(self.model + othr.model, self.num_neighbors)
        except (AttributeError, AssertionError):
            return NotImplemented

    def to_json(self):
        base = super().to_json()
        base["neighbors"] = self.num_neighbors
        base["type"] = "neighbor.2"
        return base

    def __eq__(self, othr):
        return super().__eq__(othr) and self.num_neighbors == othr.num_neighbors

    @utils.memoize
    def __hash__(self):
        return hash((super().__hash__(), self.num_neighbors))


def neighbor(game, num_neighbors=2):
    """Create a neighbor game from a model

    Parameters
    ----------
    game : RsGame
        If this is a payoff model it will be used directly, if this is an
        existing deviation game, then this will use its underlying model.
    num_neighbors : int, optional
        The number of deviations to explore out.
    """
    try:
        return _NeighborDeviationGame(game.model, num_neighbors=num_neighbors)
    except AttributeError:
        return _NeighborDeviationGame(game, num_neighbors=num_neighbors)
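

# Sketch of the neighbor estimator; larger ``num_neighbors`` trades speed for
# accuracy, approaching exact deviation payoffs as it nears the number of
# players:
#
#     dev_game = neighbor(rbf_model, num_neighbors=3)
#     pays = dev_game.deviation_payoffs(dev_game.uniform_mixture())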


def neighbor_json(json):
    """Read neighbor game from json"""
    utils.check(json["type"].split(".", 1)[0] == "neighbor", "incorrect type")
    return _NeighborDeviationGame(
        gamereader.loadj(json["model"]),
        num_neighbors=json.get("neighbors", json.get("devs", None)),
    )