Source code for egta.trace

"""Module for tracing continuous equilibria"""
import asyncio
import functools
import itertools
import logging

import numpy as np
from scipy.sparse import csgraph
from gameanalysis import regret
from gameanalysis import rsgame
from gameanalysis import trace
from gameanalysis import utils

from egta import asyncgame
from egta import innerloop


[docs]async def trace_all_equilibria( # pylint: disable=too-many-locals agame0, agame1, *, regret_thresh=1e-3, dist_thresh=0.1, max_step=0.1, executor=None, **innerloop_args ): """Trace out all equilibria between all games Parameters ---------- agame0 : AsyncGame The game that is played when time is 0. agame1 : AsyncGame The game that is played when time is 1. regret_thresh : float, optional The threshold for epsilon regret for equilibria returned. exectutor : Executor, optional The executor to run computation intensive operations in. """ utils.check( rsgame.empty_copy(agame0) == rsgame.empty_copy(agame1), "games must have same structure", ) loop = asyncio.get_event_loop() trace_args = dict(regret_thresh=regret_thresh, max_step=max_step) innerloop_args.update( executor=executor, regret_thresh=regret_thresh, dist_thresh=dist_thresh ) async def trace_eqm(eqm, prob): """Trace and equilibrium out from prob""" game0 = agame0.get_game() game1 = agame1.get_game() (pr0, eqa0), (pr1, eqa1) = await asyncio.gather( loop.run_in_executor( executor, functools.partial( trace.trace_equilibrium, game0, game1, prob, eqm, 0, **trace_args ), ), loop.run_in_executor( executor, functools.partial( trace.trace_equilibrium, game0, game1, prob, eqm, 1, **trace_args ), ), ) return ( np.concatenate([pr0[::-1], pr1[1:]]), np.concatenate([eqa0[::-1], eqa1[1:]]), ) async def trace_between(lower, upper): """Trace between times lower and upper""" if upper <= lower: return () mid = (lower + upper) / 2 midgame = asyncgame.mix(agame0, agame1, mid) eqa = await innerloop.inner_loop(midgame, **innerloop_args) if not eqa.size: # pragma: no cover logging.warning("found no equilibria in %s", midgame) return () traces = await asyncio.gather(*[trace_eqm(eqm, mid) for eqm in eqa]) lupper = min(t[0] for t, _ in traces) ulower = max(t[-1] for t, _ in traces) logging.warning("traced %s out to %g - %g", midgame, lupper, ulower) lower_traces, upper_traces = await asyncio.gather( trace_between(lower, lupper), trace_between(ulower, upper) ) # Lazily extend them return itertools.chain(lower_traces, traces, upper_traces) traces = list(await trace_between(0.0, 1.0)) game0, game1 = agame0.get_game(), agame1.get_game() traces = _merge_traces(game0, game1, traces, dist_thresh, trace_args) for probs, eqa in traces: _smooth_trace(game0, game1, probs, eqa, trace_args) _smooth_trace(game0, game1, probs[::-1], eqa[::-1], trace_args) return sorted(traces, key=lambda tr: (tr[0][0], tr[0][-1]))
def _trace_distance(game0, game1, trace1, trace2, trace_args): """Compute the distance between traces This uses interpolation to estimate each trace at arbitrary points in time and then computes the average time-weighted norm between the traces.""" time1, eqa1 = trace1 time2, eqa2 = trace2 if time1[-1] < time2[0] or time2[-1] < time1[0]: return np.inf tmin = max(time1[0], time2[0]) tmax = min(time1[-1], time2[-1]) # XXX This sorted merge could be more efficient times = np.concatenate( [ time1[(tmin <= time1) & (time1 <= tmax)], time2[(tmin <= time2) & (time2 <= tmax)], ] ) times.sort() eqa1i = trace.trace_interpolate(game0, game1, time1, eqa1, times, **trace_args) eqa2i = trace.trace_interpolate(game0, game1, time2, eqa2, times, **trace_args) errs = np.linalg.norm(eqa1i - eqa2i, axis=1) return np.diff(times).dot(errs[:-1] + errs[1:]) / 2 / (tmax - tmin) def _merge_traces( game0, game1, traces, thresh, trace_args ): # pylint: disable=too-many-locals """Merge a list of traces Parameters ---------- traces : [(ts, eqa)] A list of traces, which are themselves tuples of times and equilibria. thresh : float How similar traces need to be in order to be merged. This is the average norm between the traces. interp : str The way to interpolate between equilibria. This is passed to interp1d. """ distances = np.zeros([len(traces)] * 2) for i, trace1 in enumerate(traces): for j, trace2 in enumerate(traces[:i]): distances[i, j] = distances[j, i] = _trace_distance( game0, game1, trace1, trace2, trace_args ) num, comps = csgraph.connected_components(distances <= thresh, False) new_traces = [] for i in range(num): times = np.concatenate([t for (t, _), m in zip(traces, comps == i) if m]) eqa = np.concatenate([eqms for (_, eqms), m in zip(traces, comps == i) if m]) inds = np.argsort(times) new_traces.append((times[inds], eqa[inds])) return new_traces def _smooth_trace(game0, game1, probs, eqa, trace_args): """Smooth the equilibria in a trace in place Smoothing attempts to trace out from one time to an adjacent time. If the new point has lower regret, it's taken instead. This onle goes one direction, so it should be repeated for reversed views. """ for (pfrom, pto), (eqmfrom, eqmto) in zip( utils.subsequences(probs), utils.subsequences(eqa) ): (*_, pres), ( *_, eqmres, ) = trace.trace_equilibrium( # pylint: disable=too-many-star-expressions game0, game1, pfrom, eqmfrom, pto, **trace_args ) if np.isclose(pres, pto): mixgame = rsgame.mix(game0, game1, pto) regto = regret.mixture_regret(mixgame, eqmto) regres = regret.mixture_regret(mixgame, eqmres) if regres < regto: np.copyto(eqmto, eqmres)