"""A scheduler that gets payoffs from a local simulation"""
import asyncio
import itertools
import json
import logging
import os
import shutil
import tempfile
import zipfile
from gameanalysis import paygame
from gameanalysis import rsgame
from gameanalysis import utils
from egta import profsched
class _ZipScheduler(
profsched._OpenableScheduler
): # pylint: disable=too-many-instance-attributes,protected-access
"""Schedule profiles using am EGTA Online zip file
Parameters
----------
game : RsGame
A gameanalysis game that indicates how array profiles should be turned
into json profiles.
config : {key: value}
A dictionary mapping string keys to values that will be passed to the
simulator in the standard simulation spec format.
zipf : string, file-like
A zip file that follows the same semantics that EGTA Online expects.
max_procs : int, optional
The maximum number of processes to spawn for simulations.
"""
def __init__(self, game, conf, zipf, *, max_procs=4, simultaneous_obs=1):
super().__init__(game.role_names, game.strat_names, game.num_role_players)
self._game = paygame.game_copy(rsgame.empty_copy(game))
self.conf = conf
self.zipf = zipf
self._extra_profs = {}
self._base = {}
self._count = simultaneous_obs
self._is_open = False
self._sim_dir = None
self._prof_dir = None
self._sim_root = None
self._num = 0
self._procs = asyncio.Semaphore(max_procs)
async def sample_payoffs(self, profile):
utils.check(self._is_open, "must enter scheduler")
hprof = utils.hash_array(profile)
counter, queue = self._extra_profs.get(hprof, (None, None))
if counter is not None:
# Already scheduling some profiles
if next(counter) >= self._count:
self._extra_profs.pop(hprof)
pay = await queue.get()
logging.debug("read payoff for profile: %s", self.profile_to_repr(profile))
return pay
else:
# Need to schedule new profiles
direc = os.path.join(self._prof_dir.name, str(self._num))
self._num += 1
queue = asyncio.Queue()
if self._count > 1:
self._extra_profs[hprof] = (itertools.count(2), queue)
os.makedirs(direc)
self._base["assignment"] = self._game.profile_to_assignment(profile)
with open(os.path.join(direc, "simulation_spec.json"), "w") as fil:
json.dump(self._base, fil)
logging.debug(
"scheduled %d profile%s: %s",
self._count,
"" if self._count == 1 else "s",
self.profile_to_repr(profile),
)
# Limit simultaneous processes
async with self._procs:
proc = await asyncio.create_subprocess_exec(
os.path.join("script", "batch"),
direc,
str(self._count),
cwd=self._sim_root,
stderr=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.DEVNULL,
)
_, err = await proc.communicate()
utils.check(
proc.returncode == 0,
"process failed with returncode {:d} and stderr {}",
proc.returncode,
err,
)
obs_files = (
f
for f in os.listdir(direc)
if "observation" in f and f.endswith(".json")
)
for _ in range(self._count):
obs_file = next(obs_files, None)
utils.check(
obs_file is not None,
"simulation didn't write enough observation files",
)
with open(os.path.join(direc, obs_file)) as fil:
pay = self._game.payoff_from_json(json.load(fil))
pay.setflags(write=False)
queue.put_nowait(pay)
obs_file = next(obs_files, None)
utils.check(obs_file is None, "simulation wrote too many observation files")
shutil.rmtree(direc)
pay = queue.get_nowait()
logging.debug("read payoff for profile: %s", self.profile_to_repr(profile))
return pay
def open(self):
"""Open the zip scheduler"""
utils.check(not self._is_open, "can't be open")
try:
self._num = 0
self._sim_dir = tempfile.TemporaryDirectory()
self._prof_dir = tempfile.TemporaryDirectory()
with zipfile.ZipFile(self.zipf) as zfil:
zfil.extractall(self._sim_dir.name)
sim_files = [
d for d in os.listdir(self._sim_dir.name) if d not in {"__MACOSX"}
]
utils.check(
len(sim_files) == 1,
"improper zip format, only one file should exist in root",
)
self._sim_root = os.path.join(self._sim_dir.name, sim_files[0])
os.chmod(os.path.join(self._sim_root, "script", "batch"), 0o700)
with open(os.path.join(self._sim_root, "defaults.json")) as fil:
self._base["configuration"] = json.load(fil).get("configuration", {})
self._base["configuration"].update(self.conf)
self._is_open = True
except Exception as ex:
self.close()
raise ex
def close(self):
"""Close the zip scheduler"""
self._is_open = False
self._sim_dir.cleanup()
self._prof_dir.cleanup()
def __str__(self):
return self.zipf
[docs]def zipsched(game, conf, zipf, *, max_procs=4, simultaneous_obs=1):
"""Create a zip scheduler"""
return _ZipScheduler(
game, conf, zipf, max_procs=max_procs, simultaneous_obs=simultaneous_obs
)