Source code for egta.innerloop

"""module for doing innerloop procedures"""
import asyncio
import functools
import heapq
import logging

import numpy as np
from gameanalysis import collect
from gameanalysis import nash
from gameanalysis import regret
from gameanalysis import restrict


# TODO There's something to be said about individual rationality constraints
# relative to best deviation, i.e. if all gains are positive but payoffs are
# negative, that might mean we should warn, or at least explore something else.
# TODO Restricted game size should maybe be a number of strategies per role,
# set such that any restriction with fewer than that number of profiles is
# valid. For backwards compatibility, it'd be nice to have a good way to
# interpret a single number for games with multiple roles, potentially the
# division that produces the maximum number of profiles?
# TODO Schedule restriction should only schedule if equilibria < num or haven't
# started queuing backups
# TODO It'd be cool to have an option to explore better responses proportional
# to gain instead of best response. This would require a deterministic random
# seed in order to get consistent output.
# TODO In failure conditions some tasks are left hanging; instead we should
# cancel and await them so they're not left running.
async def inner_loop(  # pylint: disable=too-many-locals
    agame,
    *,
    initial_restrictions=None,
    regret_thresh=1e-3,
    dist_thresh=0.1,
    support_thresh=1e-4,
    restricted_game_size=3,
    num_equilibria=1,
    num_backups=1,
    devs_by_role=False,
    style="best",
    executor=None
):
    """Inner loop a game using a scheduler

    Parameters
    ----------
    agame : RsGame
        The game to find equilibria in. This function is most useful when
        agame is a SchedulerGame, but any complete RsGame will work.
    initial_restrictions : [[bool]], optional
        Initial restrictions to start the inner loop from. If unspecified,
        every pure restriction is used.
    regret_thresh : float > 0, optional
        The maximum regret to consider an equilibrium an equilibrium.
    dist_thresh : float > 0, optional
        The minimum norm between two mixed profiles for them to be
        considered distinct.
    support_thresh : float > 0, optional
        Candidate equilibrium strategies with probability lower than this
        will be truncated. This is useful because Nash finding often
        returns strategies with very low support, which means extra
        deviating strategies need to be sampled. Trimming saves these
        samples, but may increase regret above the threshold.
    restricted_game_size : int > 0, optional
        The maximum restricted game support size at which beneficial
        deviations must be explored. Restricted games with support larger
        than this are queued and only explored in the event that no
        equilibrium can be found in beneficial deviations smaller than
        this.
    num_equilibria : int > 0, optional
        The number of equilibria to attempt to find. Only one is
        guaranteed, but this might be beneficial if the game has a known
        degenerate equilibrium that is still helpful as a deviating
        strategy.
    num_backups : int > 0, optional
        In the event that no equilibrium can be found in beneficial
        deviations to small restricted games, other restrictions will be
        explored. This parameter indicates how many restricted games for
        each role should be explored.
    devs_by_role : boolean, optional
        If specified, deviations will only be explored for each role in
        order, proceeding to the next role only when no beneficial
        deviations are found. This can reduce the number of profiles
        sampled, but may also fail to find certain equilibria due to the
        different path through restricted games.
    style : string, optional
        A string describing the thoroughness of equilibrium finding. See
        `nash.mixed_equilibria` for options and a description.
    executor : Executor, optional
        The executor to be used for Nash finding. The default setting
        will allow async networking calls to continue during long Nash
        finding, but by using a process pool this can take advantage of
        parallel computation.
    """
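    # Overview: explored restrictions are tracked in a bitset; each one is
    # solved for candidate equilibria, and beneficial deviations from a
    # candidate either grow the restriction (while its support is below
    # ``restricted_game_size``) or are pushed onto per-role backup heaps,
    # which the driver loop at the bottom drains whenever too few
    # equilibria have been confirmed.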
""" init_role_dev = 0 if devs_by_role else None exp_restrictions = collect.bitset(agame.num_strats) backups = [[] for _ in range(agame.num_roles)] equilibria = collect.mcces(dist_thresh) loop = asyncio.get_event_loop() async def add_restriction(rest): """Adds a restriction to be evaluated""" if not exp_restrictions.add(rest): return # already explored if agame.is_pure_restriction(rest): # Short circuit for pure restriction return await add_deviations(rest, rest.astype(float), init_role_dev) data = await agame.get_restricted_game(rest) reqa = await loop.run_in_executor( executor, functools.partial( nash.mixed_equilibria, data, regret_thresh=regret_thresh, dist_thresh=dist_thresh, style=style, processes=1, ), ) if reqa.size: eqa = restrict.translate( data.trim_mixture_support(reqa, thresh=support_thresh), rest ) await asyncio.gather( *[add_deviations(rest, eqm, init_role_dev) for eqm in eqa] ) else: logging.warning( "couldn't find equilibria in %s with restriction %s. This is " "likely due to high variance in payoffs which means " "quiesce should be re-run with more samples per profile. " "This could also be fixed by performing a more expensive " "equilibria search to always return one.", agame, agame.restriction_to_repr(rest), ) async def add_deviations(rest, mix, role_index): """Add deviations to be evaluated""" # We need the restriction here, since trimming support may increase # regret of strategies in the initial restriction data = await agame.get_deviation_game(mix > 0, role_index) devs = data.deviation_payoffs(mix) exp = np.add.reduceat(devs * mix, agame.role_starts) gains = devs - exp.repeat(agame.num_role_strats) if role_index is None: if np.all((gains <= regret_thresh) | rest): # Found equilibrium reg = gains.max() if equilibria.add(mix, reg): logging.warning( "found equilibrium %s in game %s with regret %f", agame.mixture_to_repr(mix), agame, reg, ) else: await asyncio.gather( *[ queue_restrictions(rgains, ri, rest) for ri, rgains in enumerate( np.split(gains, agame.role_starts[1:]) ) ] ) else: # Set role index rgains = np.split(gains, agame.role_starts[1:])[role_index] rrest = np.split(rest, agame.role_starts[1:])[role_index] if np.all((rgains <= regret_thresh) | rrest): # No deviations role_index += 1 if role_index < agame.num_roles: # Explore next deviation await add_deviations(rest, mix, role_index) else: # found equilibrium # This should not require scheduling as to get here all # deviations have to be scheduled data = await agame.get_deviation_game(mix > 0) reg = regret.mixture_regret(data, mix) if equilibria.add(mix, reg): logging.warning( "found equilibrium %s in game %s with regret %f", agame.mixture_to_repr(mix), agame, reg, ) else: await queue_restrictions(rgains, role_index, rest) async def queue_restrictions(role_gains, role_index, rest): """Queue new restrictions appropriately""" role_rest = np.split(rest, agame.role_starts[1:])[role_index] if role_rest.all(): return # Can't deviate rest_size = rest.sum() role_start = agame.role_starts[role_index] best_resp = np.nanargmax(np.where(role_rest, np.nan, role_gains)) if role_gains[best_resp] > regret_thresh and rest_size < restricted_game_size: br_sub = rest.copy() br_sub[role_start + best_resp] = True await add_restriction(br_sub) else: best_resp = None # Add best response to backup back = backups[role_index] for strat_ind, (gain, role) in enumerate(zip(role_gains, role_rest)): if strat_ind == best_resp or role or gain <= 0: continue sub = rest.copy() sub[role_start + strat_ind] = True heapq.heappush(back, (-gain, 
    restrictions = (
        agame.pure_restrictions()
        if initial_restrictions is None
        else np.asarray(initial_restrictions, bool)
    )
    iteration = 0
    while len(equilibria) < num_equilibria and (
        any(q for q in backups) or not next(iter(exp_restrictions)).all()
    ):
        if iteration == 1:
            logging.warning(
                "scheduling backup restrictions in game %s. This only happens "
                "when quiesce criteria could not be met with current "
                "maximum restriction size (%d). This probably means that "
                "the maximum restriction size should be increased. If "
                "this is happening frequently, increasing the number of "
                "backups taken at a time might be desired (currently %s).",
                agame,
                restricted_game_size,
                num_backups,
            )
        elif iteration > 1:
            logging.info("scheduling backup restrictions in game %s", agame)

        await asyncio.gather(*[add_restriction(r) for r in restrictions])

        restrictions = collect.bitset(agame.num_strats, exp_restrictions)
        for role, back in enumerate(backups):
            unscheduled = num_backups
            while unscheduled > 0 and back:
                rest = heapq.heappop(back)[-1]
                unscheduled -= restrictions.add(rest)
            for _ in range(unscheduled):
                added = False
                for mask in restrictions:
                    rmask = np.split(mask, agame.role_starts[1:])[role]
                    if rmask.all():
                        continue
                    rest = mask.copy()
                    # TODO We could randomize instead of taking the first
                    # strategy, but this would remove reproducibility unless
                    # it was somehow predicated on all of the explored
                    # restrictions or something...
                    strat = np.split(rest, agame.role_starts[1:])[role].argmin()
                    rest[agame.role_starts[role] + strat] = True
                    restrictions.add(rest)
                    added = True
                    break
                if not added:
                    break
        iteration += 1

    # Return equilibria
    if equilibria:  # pylint: disable=no-else-return
        return np.stack([eqm for eqm, _ in equilibria])
    else:
        return np.empty((0, agame.num_strats))  # pragma: no cover
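

# A minimal usage sketch (not part of the module). ``load_scheduler_game``
# is a hypothetical stand-in for however an application constructs an async
# scheduler game; any object exposing the ``get_restricted_game`` /
# ``get_deviation_game`` interface used above would work:
#
#     async def _example():
#         agame = load_scheduler_game()  # hypothetical factory
#         eqa = await inner_loop(agame, num_equilibria=2)
#         for eqm in eqa:
#             print(agame.mixture_to_repr(eqm))
#
#     asyncio.get_event_loop().run_until_complete(_example())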