"""Utilities helpful for game analysis"""
from collections import abc
import contextlib
import functools
import inspect
import itertools
import json
import math
import operator
import random
import signal
import string
import sys
import threading
import warnings
import numpy as np
from numpy.lib import stride_tricks
import scipy.special as sps
from scipy import optimize
_TINY = np.finfo(float).tiny
# A conservative bound on integers exactly representable as a float (this is
# 2 ** 51; doubles actually represent every integer up to 2 ** 53 exactly)
_MAX_INT_FLOAT = 2 ** (np.finfo(float).nmant - 1)
_SIMPLEX_BIG = 1 / np.finfo(float).resolution
# XXX A lot of these are candidates for Cython


def prod(collection):
"""Product of all elements in the collection"""
return functools.reduce(operator.mul, collection)


def comb(n, k):
    """Return n choose k

This function works on arrays, and will properly return a python integer
object if the number is too large to be stored in a 64 bit integer.
"""
# pylint: disable-msg=invalid-name
res = np.rint(sps.comb(n, k, False))
if np.all(res < _MAX_INT_FLOAT): # pylint: disable=no-else-return
return res.astype(int)
elif isinstance(n, abc.Iterable) or isinstance(k, abc.Iterable):
broad = np.broadcast(np.asarray(n), np.asarray(k))
res = np.empty(broad.shape, dtype=object)
res.flat = [sps.comb(n_, k_, True) for n_, k_ in broad]
return res
else:
return sps.comb(n, k, True)
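

# A small illustrative sketch (not part of the module): `comb` matches
# `scipy.special.comb` for small inputs but falls back to exact python ints
# when the result exceeds what a float can represent exactly.
#
#   >>> comb(4, 2)
#   6
#   >>> comb(np.arange(5), 2)
#   array([0, 0, 1, 3, 6])
#   >>> comb(100, 50)  # too large for int64, returned as a python int
#   100891344545564193334812497256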


def comb_inv(cmb, k):
    """Return the inverse of `comb`

Given a number of combinations, and the size of subset we're choosing,
compute the integer lower bound, i.e. return n* such that `comb(n*, k) <=
cmb < comb(n* + 1, k)`.
"""
# pylint: disable-msg=invalid-name
n = np.empty(np.broadcast(cmb, k).shape, int)
na = n.view()
na.shape = (n.size,)
cmba = np.broadcast_to(cmb, n.size)
ka = np.broadcast_to(k, n.size)
step = ka.copy()
mask = step > 0
na[~mask] = 0
na[mask] = np.ceil((ka[mask] / np.e * cmba[mask] ** (1 / ka[mask])).astype(float))
    # Since we start from an approximate lower bound, there are at most k
    # values left to check. This does a poor man's binary search with some
    # wasted effort; however, for small k it's negligible, and we see
    # performance improvements over linear search in general.
while np.any(mask):
valid = comb(na[mask] + step[mask], ka[mask]) <= cmba[mask]
inc = mask.copy()
inc[mask] = valid
na[inc] += step[inc]
red = mask.copy()
red[mask] = ~valid
step[red] //= 2
mask = step > 0
if n.ndim == 0: # pylint: disable=no-else-return
return n.item()
else:
return n
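

# Illustrative round-trip (a sketch, not from the original source):
#
#   >>> comb_inv(comb(10, 3), 3)
#   10
#   >>> comb_inv(121, 3)  # comb(10, 3) == 120 <= 121 < comb(11, 3) == 165
#   10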


def game_size(players, strategies):
"""Number of profiles in a symmetric game with players and strategies"""
return comb(np.asarray(players) + strategies - 1, players)


def game_size_inv(size, players):
    """Inverse of `game_size`

    Given a game size and a number of players, return a lower bound on the
    number of strategies s* such that `game_size(players, s*) <= size <
    game_size(players, s* + 1)`.
"""
return comb_inv(size, players) - players + 1
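

# A hedged sketch of the relationship: a symmetric game with 3 players and 4
# strategies has comb(3 + 4 - 1, 3) == 20 profiles, and the inverse recovers
# the strategy count.
#
#   >>> game_size(3, 4)
#   20
#   >>> game_size_inv(20, 3)
#   4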


def repeat(iterable, reps):
"""Repeat each element of iterable reps times"""
return itertools.chain.from_iterable(
itertools.repeat(e, r) for e, r in zip(iterable, reps)
)


def acomb(n, k, repetition=False):
    """Compute an array of all n choose k options

    The result will be an array with shape (m, n) where m is n choose k,
    optionally with repetitions."""
# pylint: disable-msg=invalid-name
if repetition: # pylint: disable=no-else-return
return _acombr(n, k)
else:
return _acomb(n, k)
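

# Illustrative sketch: with repetition, the rows enumerate every way to place
# k identical items into n bins, so each row sums to k.
#
#   >>> acomb(3, 2, repetition=True)
#   array([[2, 0, 0],
#          [1, 1, 0],
#          [1, 0, 1],
#          [0, 2, 0],
#          [0, 1, 1],
#          [0, 0, 2]])

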
def _acombr(n, k):
"""Combinations with repetitions"""
# pylint: disable-msg=invalid-name
# This uses dynamic programming to compute everything
num = sps.comb(n, k, repetition=True, exact=True)
grid = np.zeros((num, n), dtype=int)
memoized = {}
    # This recursion breaks when asking for numbers that are too large (stack
    # overflow), but since the order in which n and k are filled is
    # predictable, it may be better to use a for loop.
def fill_region(n, k, region):
"""Recursively fill a region"""
if n == 1:
region[0, 0] = k
return
elif k == 0:
region.fill(0)
return
if (n, k) in memoized:
np.copyto(region, memoized[n, k])
return
memoized[n, k] = region
o = 0
for ki in range(k, -1, -1):
n_ = n - 1
k_ = k - ki
m = sps.comb(n_, k_, repetition=True, exact=True)
region[o : o + m, 0] = ki
fill_region(n_, k_, region[o : o + m, 1:])
o += m
fill_region(n, k, grid)
return grid
def _acomb(n, k):
"""Combinations"""
# pylint: disable-msg=invalid-name
if k == 0:
return np.zeros((1, n), bool)
# This uses dynamic programming to compute everything
num = sps.comb(n, k, exact=True)
grid = np.empty((num, n), dtype=bool)
memoized = {}
    # This recursion breaks when asking for numbers that are too large (stack
    # overflow), but since the order in which n and k are filled is
    # predictable, it may be better to use a for loop.
def fill_region(n, k, region):
"""Recursively fill a region"""
if n <= k:
region.fill(True)
return
elif k == 1:
region.fill(False)
np.fill_diagonal(region, True)
return
if (n, k) in memoized:
np.copyto(region, memoized[n, k])
return
memoized[n, k] = region
trues = sps.comb(n - 1, k - 1, exact=True)
region[:trues, 0] = True
fill_region(n - 1, k - 1, region[:trues, 1:])
region[trues:, 0] = False
fill_region(n - 1, k, region[trues:, 1:])
fill_region(n, k, grid)
return grid


def acartesian2(*arrays):
    """Array cartesian product in 2d

    Produces a new ndarray that has the cartesian product of every row in the
    input arrays. The number of columns is the sum of the number of columns
    in each input. The number of rows is the product of the number of rows in
    each input.

    Parameters
    ----------
    *arrays : [ndarray (xi, s)]
        The arrays to take the cartesian product of.
"""
rows = prod(a.shape[0] for a in arrays)
columns = sum(a.shape[1] for a in arrays)
dtype = arrays[0].dtype # should always have at least one role
check(all(a.dtype == dtype for a in arrays), "all arrays must have the same dtype")
result = np.zeros((rows, columns), dtype)
pre_row = 1
post_row = rows
pre_column = 0
for array in arrays:
length, width = array.shape
post_row //= length
post_column = pre_column + width
view = result[:, pre_column:post_column]
view.shape = (pre_row, -1, post_row, width)
np.copyto(view, array[:, None])
pre_row *= length
pre_column = post_column
return result
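

# A minimal sketch of the row ordering (earlier arrays vary slowest):
#
#   >>> acartesian2(np.array([[0], [1]]), np.array([[10, 20], [30, 40]]))
#   array([[ 0, 10, 20],
#          [ 0, 30, 40],
#          [ 1, 10, 20],
#          [ 1, 30, 40]])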


def simplex_project(array):
    """Return the euclidean projection of the last axis onto the simplex"""
    array = np.asarray(array, float)
    check(not np.isnan(array).any(), "can't project nan onto simplex: {}", array)
    # This fails for really large values, so we clip the array so that every
    # element has absolute value at most _SIMPLEX_BIG
array = np.clip(array, -_SIMPLEX_BIG, _SIMPLEX_BIG)
size = array.shape[-1]
sort = -np.sort(-array, -1)
rho = (1 - sort.cumsum(-1)) / np.arange(1, size + 1)
inds = size - 1 - np.argmax((rho + sort > 0)[..., ::-1], -1)
rho.shape = (-1, size)
lam = rho[np.arange(rho.shape[0]), inds.flat]
lam.shape = array.shape[:-1] + (1,)
return np.maximum(array + lam, 0)
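

# Worked sketch: projecting [0.9, 0.8] subtracts the common offset
# lambda = -0.35 so the result sums to one; entries pushed negative clip to
# zero.
#
#   >>> simplex_project([0.9, 0.8])
#   array([0.55, 0.45])
#   >>> simplex_project([1.2, -0.3])
#   array([1., 0.])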


def multinomial_mode(p, n):
    """Compute the mode of n samples from multinomial distribution p.

    Notes
    -----
    Algorithm from [3]_, notation follows [4]_.

    .. [3] Finucan 1964. The mode of a multinomial distribution.
    .. [4] Gall 2003. Determination of the modes of a Multinomial distribution.
"""
# pylint: disable-msg=invalid-name
p = np.asarray(p, float)
mask = p > 0
result = np.zeros(p.size, int)
p = p[mask]
f = p * (n + p.size / 2)
k = f.astype(int)
f -= k
n0 = k.sum()
if n0 < n:
q = (1 - f) / (k + 1)
for _ in range(n0, n):
a = q.argmin()
k[a] += 1
f[a] -= 1
q[a] = (1 - f[a]) / (k[a] + 1)
elif n0 > n:
with np.errstate(divide="ignore"):
q = f / k
for _ in range(n, n0):
a = q.argmin()
k[a] -= 1
f[a] += 1
q[a] = f[a] / k[a]
result[mask] = k
return result
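

# Illustrative sketch: for p = [0.5, 0.3, 0.2] and n = 10 the truncated
# values already sum to n, so no adjustment loop runs.
#
#   >>> multinomial_mode([0.5, 0.3, 0.2], 10)
#   array([5, 3, 2])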


def geometric_histogram(num, prob):
    """Return the histogram of num draws from a geometric distribution

This function computes values from the same distribution as
`np.bincount(np.random.geometric(prob, num) - 1)` but does so more
efficiently.
"""
check(num > 0, "must take at least one sample")
check(0 < prob <= 1, "must use a valid probability in (0, 1]")
results = []
# This is a rough upper bound on the expectation of the extreme value of
# num geometrics with probability prob
inc = math.ceil((np.log(num) + 1) * (1 / prob - 0.5)) + 1
while num > 0:
res = np.random.multinomial(num, prob * (1 - prob) ** np.arange(inc))
results.append(res[:-1])
num = res[-1]
# Remove trailing zeros
last = results.pop()
results.append(last[: np.flatnonzero(last)[-1] + 1])
return np.concatenate(results)


def axis_to_elem(array, axis=-1):
    """Converts an axis of an array into a unique element

In general, this returns a copy of the array, unless the data is
contiguous. This usually requires that the last axis is the one being
merged.
Parameters
----------
array : ndarray
The array to convert an axis to a view.
axis : int, optional
The axis to convert into a single element. Defaults to the last axis.
"""
array = np.asarray(array)
# ascontiguousarray will make a copy if necessary
axis_at_end = np.ascontiguousarray(np.rollaxis(array, axis, array.ndim))
new_shape = axis_at_end.shape
elems = axis_at_end.view([("axis", array.dtype, new_shape[-1:])])
elems.shape = new_shape[:-1]
return elems


def axis_from_elem(array, axis=-1):
    """Converts an array of axis elements back to an axis"""
return np.rollaxis(array["axis"], -1, axis)
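

# A sketch of the intended use: collapsing rows to single elements makes
# row-wise `np.unique` possible, and `axis_from_elem` inverts the mapping.
#
#   >>> arr = np.array([[1, 2], [1, 2], [3, 4]])
#   >>> axis_from_elem(np.unique(axis_to_elem(arr)))
#   array([[1, 2],
#          [3, 4]])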


def hash_array(array):
"""Hash an array"""
return _HashArray(array)
class _HashArray(object): # pylint: disable=too-few-public-methods
"""A hashed array object"""
def __init__(self, array):
self.array = np.asarray(array)
self.array.setflags(write=False)
self._hash = hash(self.array.tobytes())
def __hash__(self):
return self._hash
def __eq__(self, othr):
# pylint: disable-msg=protected-access
return self._hash == othr._hash and np.all(self.array == othr.array)
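

# Usage sketch: the hashable wrapper lets arrays act as set members or dict
# keys, with equality falling back to element-wise comparison.
#
#   >>> len({hash_array([1, 2]), hash_array([1, 2]), hash_array([3, 4])})
#   2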


def iunique(iterable):
"""Return an iterable of unique items ordered by first occurrence"""
seen = set()
for item in iterable:
if item not in seen:
seen.add(item)
yield item
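

# A short sketch: order of first occurrence is preserved.
#
#   >>> list(iunique([1, 2, 1, 3, 2]))
#   [1, 2, 3]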


def random_strings(min_length, max_length=None, digits=string.ascii_lowercase):
    """Generate random strings indefinitely

    Parameters
    ----------
    min_length : int
        The minimum length string to return.
    max_length : int, optional
        The maximum length string to return. If None or unspecified, this is
        the same as min_length.
    digits : str, optional
        The digits to select from.
"""
if max_length is None:
max_length = min_length
check(min_length <= max_length, "max_length can't be less than min_length")
while True:
length = random.randint(min_length, max_length)
yield "".join(random.choice(digits) for _ in range(length))


def prefix_strings(prefix, num):
    """Return a generator of zero-padded, prefixed integer strings"""
padding = int(math.log10(max(num - 1, 1))) + 1
return ("{}{:0{:d}d}".format(prefix, i, padding) for i in range(num))
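

# A small sketch: padding width comes from the largest index, so the names
# sort lexicographically in numeric order.
#
#   >>> list(prefix_strings("s", 3))
#   ['s0', 's1', 's2']
#   >>> list(prefix_strings("s", 11))[:3]
#   ['s00', 's01', 's02']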


def is_sorted(iterable, *, key=None, reverse=False, strict=False):
    """Returns true if iterable is sorted

    Parameters
    ----------
    iterable : iterable
        The iterable to check for sorted-ness.
    key : x -> y, optional
        Apply mapping function key to iterable prior to checking. This can be
        done before calling, but this ensures identical calls as sorted.
    reverse : bool, optional
        `key` and `reverse` function as they do for `sorted`.
    strict : bool, optional
        If true, require that elements be strictly increasing, or strictly
        decreasing when `reverse` is set."""
if key is not None:
iterable = map(key, iterable)
ait, bit = itertools.tee(iterable)
next(bit, None) # Don't throw error if empty
if strict and reverse: # pylint: disable=no-else-return
return all(a > b for a, b in zip(ait, bit))
elif reverse:
return all(a >= b for a, b in zip(ait, bit))
elif strict:
return all(a < b for a, b in zip(ait, bit))
else:
return all(a <= b for a, b in zip(ait, bit))
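

# Usage sketch covering the strict and reverse flags:
#
#   >>> is_sorted([1, 2, 2, 3])
#   True
#   >>> is_sorted([1, 2, 2, 3], strict=True)
#   False
#   >>> is_sorted([3, 2, 1], reverse=True)
#   True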


def allequal_perm(aarr, barr):
    """Test if two arrays are equal for any permutation of their first axis

Parameters
----------
aarr, barr : array_like
Identically sized arrays. This will determine if any permutation of
their first axis will make them equal.
"""
aarr, barr = map(np.asarray, [aarr, barr])
check(aarr.shape == barr.shape, "can only compare identically sized arrays")
auniq, acounts = np.unique(
axis_to_elem(aarr.reshape(aarr.shape[0], -1)), return_counts=True
)
buniq, bcounts = np.unique(
axis_to_elem(barr.reshape(barr.shape[0], -1)), return_counts=True
)
return np.all(acounts == bcounts) and np.all(auniq == buniq)


def allclose_perm(aarr, barr, **kwargs):
    """allclose but for any permutation of aarr and barr

Parameters
----------
aarr, barr : array_like
Identically sized arrays of floats. This will determine if any
permutation of their first axis will make `allclose` true.
kwargs
Additional arguments to pass to `isclose`.
"""
aarr, barr = map(np.asarray, [aarr, barr])
check(aarr.shape == barr.shape, "can only compare identically sized arrays")
isclose = np.isclose(aarr[:, None], barr, **kwargs)
isclose.shape = isclose.shape[:2] + (-1,)
return isclose[optimize.linear_sum_assignment(~isclose.all(2))].all()
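

# Illustrative sketch: rows may be matched in any order, up to tolerance.
#
#   >>> allclose_perm([[1.0], [2.0]], [[2.0], [1.0]])
#   True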


def check(condition, message, *args, **kwargs):
"""Check state and raise exception if not valid"""
if not condition:
raise ValueError(message.format(*args, **kwargs))


def memoize(member_function):
"""Memoize computation of single object functions"""
check(
len(inspect.signature(member_function).parameters) == 1,
"can only memoize single object functions",
)
@functools.wraps(member_function)
def new_member_function(obj):
"""Memoized member function"""
name = "__{}_{}".format(member_function.__name__, obj.__class__.__name__)
if not hasattr(obj, name):
setattr(obj, name, member_function(obj))
return getattr(obj, name)
return new_member_function
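

# A hedged usage sketch (the class here is hypothetical, not library code):
# the first call computes and caches the value on the instance; later calls
# return the stored result.
#
#   >>> class Heavy:
#   ...     @memoize
#   ...     def value(self):
#   ...         print("computing")
#   ...         return 42
#   >>> obj = Heavy()
#   >>> obj.value()
#   computing
#   42
#   >>> obj.value()
#   42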


def deprecated(func):
"""Mark a function as deprecated"""
@functools.wraps(func)
def wrapped(*args, **kwargs):
"""Deprecated function"""
warnings.warn(
"Call to deprecated function {}.".format(func.__name__),
category=DeprecationWarning,
stacklevel=2,
)
return func(*args, **kwargs)
return wrapped


def subsequences(iterable, seq=2):
    """Return subsequences of an iterable

    Each element in the generator will be a tuple of `seq` consecutive
    elements from the iterable.
"""
iters = itertools.tee(iterable, seq)
for i, itr in enumerate(iters):
for _ in range(i):
next(itr, None)
return zip(*iters)
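

# A quick sketch of the sliding-window behavior:
#
#   >>> list(subsequences([1, 2, 3, 4]))
#   [(1, 2), (2, 3), (3, 4)]
#   >>> list(subsequences([1, 2, 3, 4], 3))
#   [(1, 2, 3), (2, 3, 4)]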


def asubsequences(array, seq=2, axis=0):
    """Return sub-sequences of an array

This returns a new array with leading dimension `seq` representing a length
`seq` sub-sequence of the input array. The following dimensions are
preserved except for `axis`, which is `seq - 1` shorter due to edge
effects. This method returns a view, so no data copying happens.
Parameters
----------
array : ndarray
Array to take subsequences over.
seq : int, optional
Length of subsequences to take.
axis : int, optional
The axis to treat as the sequence to take sub-sequences of.
"""
new_shape = list(array.shape)
new_shape[axis] -= seq - 1
return stride_tricks.as_strided(
array, shape=[seq] + new_shape, strides=(array.strides[axis],) + array.strides
)
def _raise_handler(exception):
"""Create handler for exception"""
def handler(_signum, _frame):
"""Raise timeout error"""
raise exception
return handler
def _timeout_context(seconds, exception):
"""Timeout context manager that works in parent or child thread"""
if seconds is None or seconds <= 0: # pylint: disable=no-else-return
return _noop()
elif threading.current_thread() == threading.main_thread():
return _timeout_context_main(seconds, exception)
else:
return _timeout_context_child(seconds, exception)
@contextlib.contextmanager
def _noop():
"""Noop for irrelevant time"""
yield
@contextlib.contextmanager
def _timeout_context_main(seconds, exception):
"""Timeout context manager for main thread"""
    old = signal.signal(signal.SIGALRM, _raise_handler(exception))
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Always clear the timer and restore the old handler, even when the
        # timeout exception propagates through the yield
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, old)
@contextlib.contextmanager
def _timeout_context_child(seconds, exception):
"""Timeout context manager for child thread"""
timedout = threading.Event()
complete = threading.Event()
def globaltrace(_frame, why, _arg): # pragma: no cover
"""Global trace to set local trace"""
if why == "call": # pylint: disable=no-else-return
return localtrace
else:
return None
def localtrace(_frame, why, _arg): # pragma: no cover
"""Local trace to check for timeout"""
if complete.is_set(): # pylint: disable=no-else-return
return None
elif timedout.is_set() and why == "line":
complete.set()
raise exception
else:
return localtrace
    timer = threading.Timer(seconds, timedout.set)
    timer.start()
    sys.settrace(globaltrace)
    try:
        yield
    finally:
        # These are executed, but coverage misses them, potentially due to
        # messing with the trace
        complete.set()  # pragma: no cover
        timer.cancel()  # pragma: no cover
class _Timeout(object): # pylint: disable=too-few-public-methods
"""Timeout class"""
def __init__(self, seconds, exception):
self._create = functools.partial(_timeout_context, seconds, exception)
self._seconds = seconds
self._exception = exception
self._context = None
def __enter__(self):
self._context = self._create()
self._context.__enter__() # pylint: disable=no-member
def __exit__(self, typ, value, traceback):
self._context.__exit__(typ, value, traceback) # pylint: disable=no-member
self._context = None
def __call__(self, func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
"""Wrapped function"""
with self._create():
return func(*args, **kwargs)
return wrapped


def timeout(seconds=None, exception=TimeoutError):
    """Timeout code

    A TimeoutError is raised if code doesn't finish within seconds. The
    result can be used as a decorator or as a context manager. This will work
    in child threads, but at a significant performance hit.

    Parameters
    ----------
    seconds : int, optional
        The number of seconds until timeout. If omitted, no timeout is used,
        and this is essentially a no-op.
    exception : Exception, optional
        The exception to raise. If omitted, a generic TimeoutError is used.
"""
return _Timeout(seconds, exception)
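

# A hedged usage sketch. In the main thread this relies on SIGALRM, so it is
# unix-only; the busy loop below is illustrative, not library code.
#
#   >>> @timeout(1)
#   ... def spin():
#   ...     while True:
#   ...         pass
#   >>> spin()  # raises TimeoutError after roughly one second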


def iloads(jstring, **kwargs):
"""Load a string as a generator of json objects"""
decoder = json.JSONDecoder(**kwargs)
idx = json.decoder.WHITESPACE.match(jstring, 0).end()
while idx < len(jstring):
obj, end = decoder.raw_decode(jstring, idx)
yield obj
idx = json.decoder.WHITESPACE.match(jstring, end).end()


def iload(jfp, **kwargs):
"""Load a file as a generator of json objects"""
return iloads(jfp.read(), **kwargs)
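

# A final sketch: whitespace-separated json values stream out one at a time.
#
#   >>> list(iloads('{"a": 1} [2, 3] "x"'))
#   [{'a': 1}, [2, 3], 'x']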