"""Python package to handle python interface to egta online api"""
# pylint: disable=too-many-lines
import asyncio
import base64
import collections
import copy
import functools
import hashlib
import itertools
import json
import logging
from os import path
import inflection
import jsonschema
import requests
from lxml import etree
_AUTH_FILE = '.egta_auth_token'
_SEARCH_PATH = [_AUTH_FILE, path.expanduser(path.join('~', _AUTH_FILE))]
# TODO Add simulation object
# TODO Add a json schema for every object, and have every method validate
# FIXME Change asserts to ValueErrors
def _load_auth_token(auth_token):
"""Load an authorization token"""
if auth_token is not None: # pragma: no cover
return auth_token
for file_name in _SEARCH_PATH: # pragma: no branch
if path.isfile(file_name):
with open(file_name) as fil:
return fil.read().strip()
return '<no auth_token supplied or found in any of: {}>'.format( # pragma: no cover pylint: disable=line-too-long
', '.join(_SEARCH_PATH))
def _encode_data(data):
"""Takes data in nested dictionary form, and converts it for egta
All dictionary keys must be strings. This call is non destructive.
"""
encoded = {}
for k, val in data.items():
if isinstance(val, dict):
for inner_key, inner_val in _encode_data(val).items():
encoded['{0}[{1}]'.format(k, inner_key)] = inner_val
else:
encoded[k] = val
return encoded
class _Base(dict):
"""A base api object"""
def __init__(self, session, *args, **kwargs):
super().__init__(*args, **kwargs)
self._sess = session
assert 'id' in self
class _EgtaOnlineSession(object): # pylint: disable=too-many-instance-attributes
"""Object that holds the egta online session
This object is private to hide private request methods."""
def __init__( # pylint: disable=too-many-arguments
self, auth_token, domain, retry_on, num_tries, retry_delay,
retry_backoff, executor):
self.domain = domain
self.auth_token = _load_auth_token(auth_token)
self._retry_on = frozenset(retry_on)
self._num_tries = num_tries
self._retry_delay = retry_delay
self._retry_backoff = retry_backoff
self._executor = executor
self._loop = asyncio.get_event_loop()
self._session = None
async def aopen(self):
"""Open the requester"""
assert self._session is None
self._session = requests.Session()
# This authenticates us for the duration of the session
resp = self._session.get(
'https://{domain}'.format(domain=self.domain),
data={'auth_token': self.auth_token})
resp.raise_for_status()
assert '<a href="/users/sign_in">Sign in</a>' not in resp.text, \
"Couldn't authenticate with auth_token: '{}'".format(
self.auth_token)
async def aclose(self):
"""Close the requester"""
if self._session is not None: # pragma: no branch
self._session.close()
self._session = None
async def retry_request(self, verb, url, data):
"""Make a request, retying if it fails"""
data = _encode_data(data)
response = None
timeout = self._retry_delay
for _ in range(self._num_tries):
logging.debug('%s request to %s with data %s', verb, url, data)
try:
response = await self._loop.run_in_executor(
self._executor, functools.partial(
self._session.request, verb, url, data=data))
if response.status_code not in self._retry_on:
response.raise_for_status()
logging.debug('response "%s"', response.text)
return response
logging.debug(
'%s request to %s with data %s failed with status'
'%d, retrying in %.0f seconds', verb, url, data,
response.status_code, timeout) # pragma: no cover
except ConnectionError as ex: # pragma: no cover
logging.debug(
'%s request to %s with data %s failed with '
'exception %s %s, retrying in %.0f seconds', verb,
url, data, ex.__class__.__name__, ex, timeout)
logging.debug( # pragma: no cover
'sleeping %d due to connection error', timeout)
await asyncio.sleep(timeout) # pragma: no cover
timeout *= self._retry_backoff # pragma: no cover
# TODO catch session level errors and reinitialize it
raise ConnectionError() # pragma: no cover
async def request(self, verb, endpoint, data=None):
"""Convenience method for making requests"""
url = 'https://{domain}/api/v3/{endpoint}'.format(
domain=self.domain, endpoint=endpoint)
return await self.retry_request(verb, url, data or {})
async def json_validate_request(self, schema, verb, endpoint, data=None):
"""Convenience method for making validated json requests"""
sleep = self._retry_delay
exception = ValueError("shouldn't ever call this")
for _ in range(self._num_tries): # pragma: no branch
resp = await self.request(verb, endpoint, data)
try:
jresp = resp.json()
jsonschema.validate(jresp, schema)
return jresp
except (json.decoder.JSONDecodeError,
jsonschema.ValidationError) as ex:
exception = ex
logging.debug(
'sleeping %d due to invalid json', sleep)
await asyncio.sleep(sleep)
sleep *= self._retry_backoff
raise exception
async def non_api_request(self, verb, endpoint, data=None):
"""Make a standard request instead of hitting the api"""
url = 'https://{domain}/{endpoint}'.format(
domain=self.domain, endpoint=endpoint)
return await self.retry_request(verb, url, data or {})
async def json_non_api_request(
self, schema, verb, endpoint, data=None):
"""non api request for json"""
sleep = self._retry_delay
exception = ValueError("shouldn't ever call this")
for _ in range(self._num_tries): # pragma: no branch
resp = await self.non_api_request(verb, endpoint, data)
try:
jresp = resp.json()
jsonschema.validate(jresp, schema)
return jresp
except (json.decoder.JSONDecodeError,
jsonschema.ValidationError) as ex:
exception = ex
logging.debug(
'sleeping %d due to invalid json', sleep)
await asyncio.sleep(sleep)
sleep *= self._retry_backoff
raise exception
async def html_non_api_request(self, verb, endpoint, data=None):
"""non api request for xml"""
resp = await self.non_api_request(verb, endpoint, data)
return etree.HTML(resp.text)
# The following methods are used by several "objects" and so they are in
# session object for easy access
async def get_simulators(self):
"""Get a generator of all simulators"""
resp = await self.request('get', 'simulators')
return [_Simulator(self, s) for s in resp.json()['simulators']]
async def get_simulator_fullname(self, fullname):
"""Get a simulator with its full name"""
for sim in await self.get_simulators():
if '{}-{}'.format(sim['name'], sim['version']) == fullname:
return sim
assert False, 'No simulator found for full name {}'.format(
fullname)
async def create_generic_scheduler( # pylint: disable=too-many-arguments
self, sim_id, name, active, process_memory, size,
time_per_observation, observations_per_simulation, nodes,
configuration):
"""Creates a generic scheduler and returns it"""
resp = await self.request(
'post',
'generic_schedulers',
data={'scheduler': {
'simulator_id': sim_id,
'name': name,
'active': int(active),
'process_memory': process_memory,
'size': size,
'time_per_observation': time_per_observation,
'observations_per_simulation': observations_per_simulation,
'nodes': nodes,
'default_observation_requirement': 0,
'configuration': configuration,
}})
return _Scheduler(self, resp.json())
async def get_games(self):
"""Get a generator of all games"""
resp = await self.request('get', 'games')
return [_Game(self, g) for g in resp.json()['games']]
async def get_game(self, game_id):
"""Get a game from an id"""
return await _Game(self, id=game_id).get_structure()
async def create_game(self, sim_id, name, size, configuration):
"""Creates a game and returns it"""
resp = await self.html_non_api_request(
'post',
'games',
data={
'auth_token': self.auth_token, # Necessary for some reason
'game': {
'name': name,
'size': size,
},
'selector': {
'simulator_id': sim_id,
'configuration': configuration,
},
})
game_id = int(resp.xpath('//div[starts-with(@id, "game_")]')[0]
.attrib['id'][5:])
return await self.get_game(game_id)
async def get_canon_game(
self, sim_id, symgrps, configuration):
"""Get the canonicalized game"""
digest = hashlib.sha512()
digest.update(str(sim_id).encode('utf8'))
for role, count, strats in sorted(symgrps):
digest.update(b'\0\0')
digest.update(role.encode('utf8'))
digest.update(b'\0')
digest.update(str(count).encode('utf8'))
for strat in sorted(strats):
digest.update(b'\0')
digest.update(strat.encode('utf8'))
digest.update(b'\0')
for key, value in sorted(configuration.items()):
digest.update(b'\0\0')
digest.update(key.encode('utf8'))
digest.update(b'\0')
digest.update(str(value).encode('utf8'))
name = base64.b64encode(digest.digest()).decode('utf8')
size = sum(p for _, p, _ in symgrps)
for game in await self.get_games():
if game['name'] != name:
continue
assert game['size'] == size, \
'A hash collision happened'
return game
game = await self.create_game(sim_id, name, size, configuration)
await game.add_symgroups(symgrps)
return game
class _EgtaOnlineApi(object):
"""Class that allows access to an Egta Online server
This can be used as context manager to automatically close the active
session."""
def __init__( # pylint: disable=too-many-arguments
self, auth_token=None, domain='egtaonline.eecs.umich.edu',
retry_on=(504,), num_tries=20, retry_delay=20, retry_backoff=1.2,
executor=None):
self.domain = domain
self._sess = _EgtaOnlineSession(
auth_token, domain, retry_on, num_tries, retry_delay,
retry_backoff, executor)
async def aopen(self):
"""Open the api"""
try:
await self._sess.aopen()
except Exception as ex:
await self.aclose()
raise ex
async def aclose(self):
"""Close the api"""
await self._sess.aclose()
async def __aenter__(self):
await self.aopen()
return self
async def __aexit__(self, *args):
await self.aclose()
async def get_simulators(self):
"""Get a generator of all simulators"""
return await self._sess.get_simulators()
async def get_simulator(self, sim_id):
"""Get a simulator with an id"""
return await _Simulator(self._sess, id=sim_id).get_info()
async def get_simulator_fullname(self, fullname):
"""Get a simulator with its full name
A full name is <name>-<version>."""
return await self._sess.get_simulator_fullname(fullname)
async def get_generic_schedulers(self):
"""Get a generator of all generic schedulers"""
resp = await self._sess.request('get', 'generic_schedulers')
return [_Scheduler(self._sess, s) for s in
resp.json()['generic_schedulers']]
async def get_scheduler(self, sched_id):
"""Get a scheduler with an id"""
return await _Scheduler(self._sess, id=sched_id).get_info()
async def get_scheduler_name(self, name):
"""Get a scheduler from its names"""
for sched in await self.get_generic_schedulers():
if sched['name'] == name:
return sched
assert False, 'No scheduler found for name {}'.format(
name)
async def create_generic_scheduler( # pylint: disable=too-many-arguments
self, sim_id, name, active, process_memory, size,
time_per_observation, observations_per_simulation, nodes=1,
configuration=None):
"""Creates a generic scheduler and returns it
Parameters
----------
sim_id : int
The simulator id for this scheduler.
name : str
The name for the scheduler.
active : boolean
True or false, specifying whether the scheduler is initially
active.
process_memory : int
The amount of memory in MB that your simulations need.
size : int
The number of players for the scheduler.
time_per_observation : int
The time you require to take a single observation in seconds.
observations_per_simulation : int
The maximum number of observations to take per simulation run. If a
profile is added with fewer observations than this, they will all
be scheduled at once, if more, then this number will be scheduler,
and only after they complete successfully will more be added.
nodes : int, optional
The number of nodes required to run one of your simulations. If
unsure, this should be 1.
configuration : {str: str}, optional
A dictionary representation that sets all the run-time parameters
for this scheduler. This bypasses the simulator default to just set
the configuration specified. To use the simulator default, you must
first get the configuration from the simulator object."""
return await self._sess.create_generic_scheduler(
sim_id, name, active, process_memory, size, time_per_observation,
observations_per_simulation, nodes, configuration or {})
async def get_games(self):
"""Get a generator of all games"""
return await self._sess.get_games()
async def get_game(self, game_id):
"""Get a game from an id"""
return await self._sess.get_game(game_id)
async def get_game_name(self, name):
"""Get a game from its names"""
for game in await self.get_games():
if game['name'] == name:
return game
assert False, 'No game found for name {}'.format(name)
async def create_game(self, sim_id, name, size, configuration=None):
"""Creates a game and returns it
Parameters
----------
sim_id : int
The simulator id for this game.
name : str
The name for the game.
size : int
The number of players in this game.
configuration : {str: str}, optional
A dictionary representation that sets all the run-time parameters
for this scheduler. This ignores simulator defaults, and will only
set the configuration parameters specified. To include simulator
defaults you must manually get the configuration parameters from
the simulator."""
return await self._sess.create_game(
sim_id, name, size, configuration or {})
async def get_canon_game(self, sim_id, symgrps, configuration=None):
"""Get the canonicalized game
This is a default version of the game with symgrps and configuration.
This way games can be reused without worrying about making sure they
exist or creating duplicate games.
Parameters
----------
sim_id : int
The id of the simulator to make the game for.
symgrps : [(role, players, [strategy])]
The symmetry groups for the game. The game is created or fetched
with these in mind, and should not be modified afterwards.
"""
return await self._sess.get_canon_game(
sim_id, symgrps, configuration or {})
async def get_profile(self, prof_id):
"""Get a profile from its id
`id`s can be found with a scheduler's `get_requirements`, when adding a
profile to a scheduler, or from a game with sufficient granularity."""
return await _Profile(self._sess, id=prof_id).get_structure()
def get_simulations(self, page_start=1, asc=False, column=None, search=''):
"""Get information about current simulations
Parameters
----------
page_start : int, optional
The page of results to start at beginning at 1. Traditionally there
are 25 results per page, but this is defined by the server.
asc : bool, optional
If results should be sorted ascending. By default, they are
descending, showing the most recent jobs or solders.
column : str, optional
The column to sort on `page_start` must be at least 1. `column`
should be one of 'job', 'folder', 'profile', 'simulator', or
'state'.
search : string, optional
A string to optionally filter results by. See the page on
egtaonline for more information about what this can be. By default
no filtering is done.
"""
column = _SIMS_MAPPING.get(column, column)
data = {
'direction': 'ASC' if asc else 'DESC',
'search': search,
}
if column is not None:
data['sort'] = column
return _SimulationIterator(self._sess, page_start, data)
async def get_simulation(self, folder):
"""Get a simulation from its folder number"""
resp = await self._sess.html_non_api_request(
'get',
'simulations/{:d}'.format(folder))
info = resp.xpath('//div[@class="show_for simulation"]/p')
parsed = (''.join(e.itertext()).split(':', 1) for e in info)
return {key.lower().replace(' ', '_'): _sims_parse(val.strip())
for key, val in parsed}
class _SimulationIterator(object): # pylint: disable=too-few-public-methods
"""AsyncIterator for simulations"""
def __init__(self, session, page_start, data):
self._sess = session
self._data = data
self._page = itertools.count(page_start)
self._rows = iter(())
def __aiter__(self):
return self
async def __anext__(self):
try:
row = next(self._rows)
except StopIteration:
self._data['page'] = next(self._page)
resp = await self._sess.html_non_api_request(
'get', 'simulations', data=self._data)
self._rows = iter(resp.xpath('//tbody/tr'))
try:
row = next(self._rows)
except StopIteration:
raise StopAsyncIteration
res = (_sims_parse(''.join(e.itertext())) # pragma: no branch
for e in row.getchildren())
return dict(zip(_SIMS_MAPPING, res))
class _Simulator(_Base):
"""Get information about and modify EGTA Online Simulators"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self['url'] = '/'.join([
'https:/', self._sess.domain, 'simulators', str(self['id'])])
async def get_info(self):
"""Return information about this simulator
If the id is unknown this will search all simulators for one with the
same name and optionally version. If version is unspecified, but only
one simulator with that name exists, this lookup should still succeed.
This returns a new simulator object, but will update the id of the
current simulator if it was undefined."""
resp = await self._sess.request(
'get', 'simulators/{sim:d}.json'.format(sim=self['id']))
result = resp.json()
return _Simulator(self._sess, result)
async def add_role(self, role):
"""Adds a role to the simulator"""
sim_info = await self.get_info()
while role not in sim_info['role_configuration']:
await self._sess.request(
'post',
'simulators/{sim:d}/add_role.json'.format(sim=self['id']),
data={'role': role})
sim_info = await self.get_info()
async def remove_role(self, role):
"""Removes a role from the simulator"""
sim_info = await self.get_info()
while role in sim_info['role_configuration']:
await self._sess.request(
'post',
'simulators/{sim:d}/remove_role.json'.format(sim=self['id']),
data={'role': role})
sim_info = await self.get_info()
async def _add_strategy(self, role, strategy):
"""Like `add_strategy` but without the duplication check"""
await self._sess.request(
'post',
'simulators/{sim:d}/add_strategy.json'.format(sim=self['id']),
data={'role': role, 'strategy': strategy})
async def add_strategy(self, role, strategy):
"""Adds a strategy to the simulator
Note: This performs an extra check to prevent adding an existing
strategy to the simulator."""
# We call get_info to make sure we're up to date, but there are still
# race condition issues with this.
sim_info = await self.get_info()
while strategy not in sim_info['role_configuration'][role]:
await self._add_strategy(role, strategy)
sim_info = await self.get_info()
async def add_strategies(self, role_strat_dict):
"""Adds all of the roles and strategies in a dictionary
The dictionary should be of the form {role: [strategies]}."""
# We call get_info again to make sure we're up to date. There are
# obviously race condition issues with this.
sim_info = await self.get_info()
async def add_role(role, strats):
"""Asynchronous add role"""
if role not in sim_info['role_configuration']:
await self.add_role(role)
strats = set(strats)
strats.difference_update(sim_info['role_configuration'].get(
role, ()))
while strats:
await asyncio.gather(*[
self._add_strategy(role, strat) for strat in strats])
s_info = await self.get_info()
strats.difference_update(s_info['role_configuration'].get(
role, ()))
await asyncio.gather(*[
add_role(role, strats) for role, strats
in role_strat_dict.items()])
async def _remove_strategy(self, role, strategy):
"""Removes a strategy from the simulator"""
await self._sess.request(
'post',
'simulators/{sim:d}/remove_strategy.json'.format(sim=self['id']),
data={'role': role, 'strategy': strategy})
async def remove_strategy(self, role, strategy):
"""Removes a strategy from the simulator"""
sim_info = await self.get_info()
while strategy in sim_info['role_configuration'].get(role, ()):
await self._remove_strategy(role, strategy)
sim_info = await self.get_info()
async def remove_strategies(self, role_strat_dict):
"""Removes all of the strategies in a dictionary
The dictionary should be of the form {role: [strategies]}. Empty roles
are not removed."""
remaining = set(itertools.chain.from_iterable(
((role, strat) for strat in set(strats))
for role, strats in role_strat_dict.items()))
sim_info = await self.get_info()
remaining.intersection_update(
set(itertools.chain.from_iterable(
((role, strat) for strat in set(strats))
for role, strats in sim_info['role_configuration'].items())))
await asyncio.gather(*[
self._remove_strategy(role, strat) for role, strat
in itertools.chain.from_iterable(
((role, strat) for strat in set(strats))
for role, strats in role_strat_dict.items())])
async def create_generic_scheduler( # pylint: disable=too-many-arguments
self, name, active, process_memory, size, time_per_observation,
observations_per_simulation, nodes=1, configuration=None):
"""Creates a generic scheduler for this simulator and returns it"""
return await self._sess.create_generic_scheduler(
self['id'], name, active, process_memory, size,
time_per_observation, observations_per_simulation, nodes,
configuration or {})
async def create_game(self, name, size, configuration=None):
"""Creates a game for this simulator and returns it"""
return await self._sess.create_game(
self['id'], name, size, configuration or {})
async def get_canon_game(self, symgrps, configuration=None):
"""Get the canon game for this simulator"""
return await self._sess.get_canon_game(
self['id'], symgrps, configuration or {})
class _Scheduler(_Base):
"""Get information and modify EGTA Online Scheduler"""
async def get_info(self):
"""Get a scheduler information"""
resp = await self._sess.request(
'get',
'schedulers/{sched_id}.json'.format(sched_id=self['id']))
return _Scheduler(self._sess, resp.json())
async def get_requirements(self):
"""Get the schedulign requirements of a scheduler"""
resp = await self._sess.request(
'get',
'schedulers/{sched_id}.json'.format(sched_id=self['id']),
{'granularity': 'with_requirements'})
result = resp.json()
# The or is necessary since egta returns null instead of an empty list
# when a scheduler has not requirements
reqs = result.get('scheduling_requirements', None) or ()
result['scheduling_requirements'] = [
_Profile(self._sess, prof, id=prof.pop('profile_id'))
for prof in reqs]
result['url'] = 'https://{}/{}s/{:d}'.format(
self._sess.domain, inflection.underscore(result['type']),
result['id'])
return _Scheduler(self._sess, result)
async def update(self, **kwargs):
"""Update the parameters of a given scheduler
kwargs are any of the mandatory arguments for create_generic_scheduler,
except for configuration, that cannont be updated for whatever
reason."""
if 'active' in kwargs:
kwargs['active'] = int(kwargs['active'])
await self._sess.request(
'put',
'generic_schedulers/{sid:d}.json'.format(sid=self['id']),
data={'scheduler': kwargs})
async def activate(self):
"""Activate the scheduler"""
await self.update(active=True)
async def deactivate(self):
"""Deactivate the scheduler"""
await self.update(active=False)
async def add_role(self, role, count):
"""Add a role with specific count to the scheduler"""
await self._sess.request(
'post',
'generic_schedulers/{sid:d}/add_role.json'.format(sid=self['id']),
data={'role': role, 'count': count})
async def add_roles(self, role_counts):
"""Add roles
Parameters
----------
role_counts : {role: count}
A dictionary of the roles and counts.
"""
await asyncio.gather(*[
self.add_role(role, count) for role, count
in role_counts.items()])
async def remove_role(self, role):
"""Remove a role from the scheduler"""
await self._sess.request(
'post',
'generic_schedulers/{sid:d}/remove_role.json'.format(
sid=self['id']),
data={'role': role})
async def remove_roles(self, roles):
"""Remove roles
Parameters
----------
roles : [role]
An iterable of the roles to remove.
"""
await asyncio.gather(*[
self.remove_role(role) for role in roles])
async def destroy_scheduler(self):
"""Delete a generic scheduler"""
await self._sess.request(
'delete',
'generic_schedulers/{sid:d}.json'.format(sid=self['id']))
async def add_profile(self, assignment, count):
"""Add a profile to the scheduler
Parameters
----------
assignment : str or list
This must be an assignment string (e.g. "role: count strategy, ...;
...") or a symmetry group list (e.g. `[{"role": role, "strategy":
strategy, "count": count}, ...]`).
count : int
The number of observations of that profile to schedule.
Notes
-----
If the profile already exists, this won't change the requested count.
"""
if not isinstance(assignment, str):
assignment = symgrps_to_assignment(assignment)
resp = await self._sess.request(
'post',
'generic_schedulers/{sid:d}/add_profile.json'.format(
sid=self['id']),
data={
'assignment': assignment,
'count': count
})
return _Profile(self._sess, resp.json(), assignment=assignment)
async def remove_profile(self, prof_id):
"""Removes a profile from a scheduler
Parameters
----------
prof_id : int
The profile id to remove
"""
await self._sess.request(
'post',
'generic_schedulers/{sid:d}/remove_profile.json'.format(
sid=self['id']),
data={'profile_id': prof_id})
async def remove_all_profiles(self):
"""Removes all profiles from a scheduler"""
# We fetch scheduling requirements in case the data in self if out of
# date.
reqs = await self.get_requirements()
await asyncio.gather(*[
self.remove_profile(prof['id']) for prof
in reqs['scheduling_requirements']])
async def create_game(self, name=None):
"""Creates a game with the same parameters of the scheduler
If name is unspecified, it will copy the name from the scheduler. This
will fail if there's already a game with that name."""
if {'configuration', 'name', 'simulator_id', 'size'}.difference(self):
reqs = await self.get_requirements()
return await reqs.create_game(name)
return await self._sess.create_game(
self['simulator_id'], self['name'] if name is None else name,
self['size'], dict(self['configuration']))
class _Profile(_Base):
"""Class for manipulating profiles"""
async def _get_info(self, granularity, validate):
"""Gets information about the profile
Parameters
----------
granularity : str
String representing the granularity of data to fetch. This is
identical to game level granularity. It can be one of
'structure', 'summary', 'observations', 'full'. See the
corresponding get_`granularity` methods.
validate : bool
Whether to validate the returned json to make sure it's
valid.
"""
jresp = await self._sess.json_validate_request(
_PROF_SCHEMATA[granularity] if validate else _NO_SCHEMA,
'get',
'profiles/{pid:d}.json'.format(pid=self['id']),
{'granularity': granularity})
return _Profile(self._sess, jresp)
async def get_structure(self, validate=True):
"""Get profile information but no payoff data"""
return await self._get_info('structure', validate)
async def get_summary(self, validate=True):
"""Return payoff data for each symmetry group"""
return await self._get_info('summary', validate)
async def get_observations(self, validate=True):
"""Return payoff data for each observation symmetry group"""
return await self._get_info('observations', validate)
async def get_full_data(self, validate=True):
"""Return payoff data for each player observation"""
return await self._get_info('full', validate)
class _Game(_Base):
"""Get information and manipulate EGTA Online Games"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self['url'] = '/'.join([
'https:/', self._sess.domain, 'games', str(self['id'])])
async def _get_info(self, granularity, validate):
"""Gets game information and data
Parameters
----------
granularity : str
Get data at one of the following granularities: structure, summary,
observations, full. See the corresponding get_`granularity` methods
for detailed descriptions of each granularity.
validate : bool
Whether to cvalidate the returned json. Since we make a non-api
request, the result is often not valid, so this is usually
preferred despite the icnrease in time.
"""
try:
# This call breaks convention because the api is broken, so we use
# a different api.
result = await self._sess.json_non_api_request(
_GAME_SCHEMATA[granularity] if validate else _NO_SCHEMA,
'get',
'games/{gid:d}.json'.format(gid=self['id']),
data={'granularity': granularity})
if granularity == 'structure':
# TODO Is there a good way to validate this? Given how
# small it is its unlikely to be wrong, but this is still
# a missed edge case
result = json.loads(result)
else:
result['profiles'] = [
_Profile(self._sess, p) for p
in result['profiles'] or ()]
return _Game(self._sess, result)
except requests.exceptions.HTTPError as ex:
if not (str(ex).startswith('500 Server Error:') and
granularity in {'observations', 'full'}):
raise ex
result = await self.get_summary()
if granularity == 'observations':
profs = await asyncio.gather(*[
prof.get_observations(validate) for prof
in result['profiles']])
for gran in profs:
gran.pop('simulator_instance_id')
for obs in gran['observations']:
obs['extended_features'] = {}
obs['features'] = {}
else:
profs = await asyncio.gather(*[
prof.get_full_data(validate) for prof
in result['profiles']])
for gran in profs:
gran.pop('simulator_instance_id')
for obs in gran['observations']:
obs['extended_features'] = {}
obs['features'] = {}
for prf in obs['players']:
prf['e'] = {}
prf['f'] = {}
result['profiles'] = profs
return result
async def get_structure(self, validate=True):
"""Get game information without payoff data"""
return await self._get_info('structure', validate)
async def get_summary(self, validate=True):
"""Get payoff data for each profile by symmetry group"""
return await self._get_info('summary', validate)
async def get_observations(self, validate=True):
"""Get payoff data for each symmetry groups observation"""
return await self._get_info('observations', validate)
async def get_full_data(self, validate=True):
"""Get payoff data for each players observation"""
return await self._get_info('full', validate)
async def add_role(self, role, count):
"""Adds a role to the game"""
await self._sess.request(
'post',
'games/{game:d}/add_role.json'.format(game=self['id']),
data={'role': role, 'count': count})
async def add_roles(self, role_count_dict):
"""Add roles to the game
Parameters
----------
role_count_dict : {role: count}
A dictionary of the counts for each role to add.
"""
# XXX Egtaonline sometimes just doesn't add roles if we hit it
# too fast
# await asyncio.gather(*[
# self.add_role(role, count) for role, count
# in role_count_dict.items()])
for role, count in role_count_dict.items():
await self.add_role(role, count)
async def remove_role(self, role):
"""Removes a role from the game"""
await self._sess.request(
'post',
'games/{game:d}/remove_role.json'.format(game=self['id']),
data={'role': role})
async def remove_roles(self, roles):
"""Remove roles from the game
Parameters
----------
roles : [role]
An iterable of the roles to remove
"""
# XXX Egtaonline sometimes just doesn't remove roles if we hit it
# too fast
# await asyncio.gather(*[
# self.remove_role(role) for role in roles])
for role in roles:
await self.remove_role(role)
async def add_strategy(self, role, strategy):
"""Adds a strategy to the game"""
await self._sess.request(
'post',
'games/{game:d}/add_strategy.json'.format(game=self['id']),
data={'role': role, 'strategy': strategy})
async def add_strategies(self, role_strat_dict):
"""Attempts to add all of the strategies in a dictionary
The dictionary should be of the form {role: [strategies]}."""
# XXX Egta sometimes doesn't remove strategies
# await asyncio.gather(*[
# self.add_strategy(role, strat) for role, strat
# in itertools.chain.from_iterable(
# ((role, strat) for strat in set(strats))
# for role, strats in role_strat_dict.items())])
for role, strats in role_strat_dict.items():
for strat in strats:
await self.add_strategy(role, strat)
async def remove_strategy(self, role, strategy):
"""Removes a strategy from the game"""
await self._sess.request(
'post',
'games/{game:d}/remove_strategy.json'.format(game=self['id']),
data={'role': role, 'strategy': strategy})
async def remove_strategies(self, role_strat_dict):
"""Removes all of the strategies in a dictionary
The dictionary should be of the form {role: [strategies]}. Empty roles
are not removed."""
# XXX Egta sometime doesn't remove strategies
# await asyncio.gather(*[
# self.remove_strategy(role, strat) for role, strat
# in itertools.chain.from_iterable(
# ((role, strat) for strat in set(strats))
# for role, strats in role_strat_dict.items())])
for role, strats in role_strat_dict.items():
for strat in strats:
await self.remove_strategy(role, strat)
async def add_symgroup(self, role, count, strategies):
"""Add a symmetry group to the game
Parameters
----------
role : str
count : int
strategies : [str]
"""
await self.add_role(role, count)
await self.add_strategies({role: strategies})
async def add_symgroups(self, symgrps):
"""Add all symgrps to the game
Parameters
----------
symgrps : [(role, count, [strat])]
The symgroups to add to the game.
"""
# XXX Egta sometimes doesn't add strategies
# await asyncio.gather(*[
# self.add_symgroup(role, count, strats) for role, count, strats
# in symgrps])
for role, count, strats in symgrps:
await self.add_symgroup(role, count, strats)
async def destroy_game(self):
"""Delete a game"""
await self._sess.non_api_request(
'post',
'games/{game:d}'.format(game=self['id']),
data={
'auth_token': self._sess.auth_token, # Necessary
'_method': 'delete',
})
async def create_generic_scheduler( # pylint: disable=too-many-arguments
self, name, active, process_memory, time_per_observation,
observations_per_simulation, nodes=1, configuration=None):
"""Create a generic scheduler with the configuration of the game"""
if not {'simulator_fullname', 'roles'} <= self.keys():
summ = await self.get_summary()
return await summ.create_generic_scheduler(
name, active, process_memory, time_per_observation,
observations_per_simulation, nodes, configuration)
size = sum(symgrp['count'] for symgrp in self['roles'])
sim = await self._sess.get_simulator_fullname(
self['simulator_fullname'])
sched = await self._sess.create_generic_scheduler(
sim['id'], name, active, process_memory, size,
time_per_observation, observations_per_simulation, nodes,
configuration or {})
await sched.add_roles({
symgrp['name']: symgrp['count'] for symgrp in self['roles']})
return sched
[docs]def api( # pylint: disable=too-many-arguments
auth_token=None, domain='egtaonline.eecs.umich.edu', retry_on=(504,),
num_tries=20, retry_delay=20, retry_backoff=1.2, executor=None):
"""Create an api object"""
return _EgtaOnlineApi(
auth_token, domain, retry_on, num_tries, retry_delay, retry_backoff,
executor)
[docs]def symgrps_to_assignment(symmetry_groups):
"""Converts a symmetry groups structure to an assignemnt string"""
roles = {}
for symgrp in symmetry_groups:
role, strat, count = symgrp['role'], symgrp[
'strategy'], symgrp['count']
roles.setdefault(role, []).append((strat, count))
return '; '.join(
'{}: {}'.format(role, ', '.join('{:d} {}'.format(count, strat)
for strat, count in sorted(strats)
if count > 0))
for role, strats in sorted(roles.items()))
_SIMS_MAPPING = collections.OrderedDict([
('state', 'state'),
('profile', 'profiles.assignment'),
('simulator', 'simulator_fullname'),
('folder', 'id'),
('job', 'job_id'),
])
# Schemata
_PROF_STRUCT_SCHEMA = {
'type': 'object',
'properties': {
'assignment': {'type': 'string'},
'created_at': {'type': 'string'},
'id': {'type': 'integer'},
'observations_count': {'type': 'integer'},
'role_configuration': {'type': 'object'},
'simulator_instance_id': {'type': 'integer'},
'size': {'type': 'integer'},
'updated_at': {'type': 'string'},
},
'required': ['assignment', 'created_at', 'id', 'observations_count',
'role_configuration', 'simulator_instance_id', 'size',
'updated_at'],
}
_GAME_STRUCT_SCHEMA = {
'type': 'object',
'properties': {
'created_at': {'type': 'string'},
'id': {'type': 'integer'},
'name': {'type': 'string'},
'simulator_instance_id': {'type': 'integer'},
'size': {'type': 'integer'},
'updated_at': {'type': 'string'},
},
'required': ['created_at', 'id', 'name', 'simulator_instance_id',
'size', 'updated_at'],
}
_PROF_SUMM_SCHEMA = {
'type': 'object',
'properties': {
'id': {'type': 'integer'},
'observations_count': {'type': 'integer'},
'symmetry_groups': {
'type': 'array',
'items': {
'type': 'object',
'properties': {
'id': {'type': 'integer'},
'payoff': {'type': ['number', 'null']},
'payoff_sd': {'type': ['number', 'null']},
'role': {'type': 'string'},
'strategy': {'type': 'string'},
},
'required': ['id', 'payoff', 'payoff_sd', 'role',
'strategy'],
},
},
},
'required': ['id', 'observations_count', 'symmetry_groups'],
}
_GAME_SUMM_SCHEMA = {
'type': 'object',
'properties': {
'id': {'type': 'integer'},
'simulator_fullname': {'type': 'string'},
'profiles': {'oneOf': [
{'type': 'null'},
{
'type': 'array',
'items': _PROF_SUMM_SCHEMA,
},
]},
'name': {'type': 'string'},
'configuration': {
'type': 'array',
'items': {
'type': 'array',
'items': {'type': 'string'},
'maxItems': 2,
'minItems': 2,
}
},
'roles': {'oneOf': [
{'type': 'null'},
{
'type': 'array',
'items': {
'type': 'object',
'properties': {
'name': {'type': 'string'},
'count': {'type': 'integer'},
'strategies': {
'type': 'array',
'items': {'type': 'string'},
}
},
'required': ['count', 'name', 'strategies']
},
},
]},
},
'required': ['id', 'simulator_fullname', 'profiles', 'name',
'configuration', 'roles'],
}
_OBS_OBS_SCHEMA = {
'type': 'object',
'properties': {
'extended_features': {'type': ['object', 'null']},
'features': {'type': ['object', 'null']},
'symmetry_groups': {
'type': 'array',
'items': {
'type': 'object',
'properties': {
'id': {'type': 'integer'},
'payoff': {'type': 'number'},
'payoff_sd': {'type': ['number', 'null']},
},
'required': ['id', 'payoff', 'payoff_sd'],
},
},
},
'required': ['extended_features', 'features', 'symmetry_groups'],
}
_PROF_OBS_SCHEMA = {
'type': 'object',
'properties': {
'id': {'type': 'integer'},
'observations': {
'type': 'array',
'items': _OBS_OBS_SCHEMA,
},
'symmetry_groups': {
'type': 'array',
'items': {
'type': 'object',
'properties': {
'id': {'type': 'integer'},
'count': {'type': 'integer'},
'role': {'type': 'string'},
'strategy': {'type': 'string'},
},
'required': ['id', 'count', 'role', 'strategy'],
},
},
},
'required': ['id', 'observations', 'symmetry_groups'],
}
_GAME_OBS_SCHEMA = copy.deepcopy(_GAME_SUMM_SCHEMA)
_GAME_OBS_SCHEMA['properties']['profiles']['oneOf'][1]['items'] = \
_PROF_OBS_SCHEMA
_PROF_FULL_SCHEMA = copy.deepcopy(_PROF_OBS_SCHEMA)
_PROF_FULL_SCHEMA['properties']['observations']['items'] = {
'type': 'object',
'properties': {
'extended_features': {'type': ['object', 'null']},
'features': {'type': ['object', 'null']},
'players': {
'type': 'array',
'items': {
'type': 'object',
'properties': {
'e': {'type': ['object', 'null']},
'f': {'type': ['object', 'null']},
'sid': {'type': 'integer'},
'p': {'type': 'number'},
},
'required': ['e', 'f', 'p', 'sid'],
},
},
},
'required': ['extended_features', 'features', 'players'],
}
_GAME_FULL_SCHEMA = copy.deepcopy(_GAME_SUMM_SCHEMA)
_GAME_FULL_SCHEMA['properties']['profiles']['oneOf'][1]['items'] = \
_PROF_FULL_SCHEMA
# TODO These don't check for sim_instance_id
_PROF_SCHEMATA = {
'structure': _PROF_STRUCT_SCHEMA,
'summary': _PROF_SUMM_SCHEMA,
'observations': _PROF_OBS_SCHEMA,
'full': _PROF_FULL_SCHEMA,
}
_GAME_SCHEMATA = {
'structure': {'type': 'string'}, # bug in the way structure is returned
'summary': _GAME_SUMM_SCHEMA,
'observations': _GAME_OBS_SCHEMA,
'full': _GAME_FULL_SCHEMA,
}
_NO_SCHEMA = {'type': ['string', 'object']}
def _sims_parse(res):
"""Converts N/A to `nan` and otherwise tries to parse integers"""
try:
return int(res)
except ValueError:
if res.lower() == 'n/a': # pylint: disable=no-else-return
return float('nan') # pragma: no cover
else:
return res