Source code for egta.simsched
"""A scheduler that gets payoffs from a local simulation"""
import asyncio
import collections
import contextlib
import json
import logging
import subprocess
from gameanalysis import paygame
from gameanalysis import rsgame
from gameanalysis import utils
from egta import profsched
class _SimulationScheduler(
profsched._AOpenableScheduler
): # pylint: disable=too-many-instance-attributes,protected-access
"""Schedule profiles using a command line program
Parameters
----------
game : RsGame
A gameanalysis game that indicates how array profiles should be turned
into json profiles.
config : {key: value}
A dictionary mapping string keys to values that will be passed to the
simulator in the standard simulation spec format.
command : [str]
A list of strings that represents a command line program to run. This
program must accept simulation spec files as flushed lines of input to
standard in, and write the resulting output as an observation to
standard out. After all input lines have been read, this must flush the
output otherwise this could hang waiting for results that are trapped
in a buffer.
buff_size : int, optional
The maximum number of bytes to send to the command at a time. The
default should be fine for most applications, but if you know your
machine has a larger or smaller buffer size, setting this accurately
will prevent unnecessary blocking.
"""
def __init__(self, game, config, command, buff_size=65536):
super().__init__(game.role_names, game.strat_names, game.num_role_players)
self._game = paygame.game_copy(rsgame.empty_copy(game))
self._base = {"configuration": config}
self.command = command
self.buff_size = buff_size
self._is_open = False
self._proc = None
self._reader = None
self._read_queue = asyncio.Queue()
self._write_lock = asyncio.Lock()
self._buffer_empty = asyncio.Event()
self._buffer_bytes = 0
self._line_bytes = collections.deque()
self._buffer_empty.set()
async def sample_payoffs(self, profile):
utils.check(self._is_open, "not open")
self._base["assignment"] = self._game.profile_to_json(profile)
bprof = json.dumps(self._base, separators=(",", ":")).encode("utf8")
size = len(bprof) + 1
utils.check(
size < self.buff_size,
"profile could not be written to buffer without blocking",
)
async with self._write_lock:
self._buffer_bytes += size
self._line_bytes.appendleft(size)
if self._buffer_bytes >= self.buff_size:
self._buffer_empty.clear()
await self._buffer_empty.wait()
got_data = asyncio.Event()
line = [None]
self._read_queue.put_nowait((line, got_data))
self._proc.stdin.write(bprof)
self._proc.stdin.write(b"\n")
try:
await self._proc.stdin.drain()
except ConnectionError: # pragma: no cover race condition
raise RuntimeError("process died unexpectedly")
logging.debug("scheduled profile: %s", self._game.profile_to_repr(profile))
await got_data.wait()
if self._reader.done() and self._reader.exception() is not None:
raise self._reader.exception()
jpays = json.loads(line[0].decode("utf8"))
payoffs = self._game.payoff_from_json(jpays)
payoffs.setflags(write=False)
logging.debug("read payoff for profile: %s", self.profile_to_repr(profile))
return payoffs
async def _read(self):
"""Read line loop"""
while True:
line, got_data = await self._read_queue.get()
try:
line[0] = await self._proc.stdout.readline()
if not line[0]:
raise RuntimeError("process died unexpectedly")
self._buffer_bytes -= self._line_bytes.pop()
if self._buffer_bytes < self.buff_size: # pragma: no branch
self._buffer_empty.set()
finally:
got_data.set()
async def aopen(self):
"""Open the simsched"""
utils.check(not self._is_open, "can't open twice")
utils.check(self._proc is None, "proce must be None")
utils.check(self._reader is None, "stream must be None")
try:
self._proc = await asyncio.create_subprocess_exec(
*self.command, stdout=subprocess.PIPE, stdin=subprocess.PIPE
)
self._reader = asyncio.ensure_future(self._read())
self._is_open = True
except Exception as ex:
# XXX This line exists to fool duplication check
await self.aclose()
raise ex
return self
async def aclose(self):
"""Close the simsched"""
self._is_open = False
if self._reader is not None:
self._reader.cancel()
with contextlib.suppress(Exception, asyncio.CancelledError):
await self._reader
self._reader = None
if self._proc is not None:
with contextlib.suppress(ProcessLookupError):
self._proc.terminate()
with contextlib.suppress(asyncio.TimeoutError):
await asyncio.wait_for(self._proc.wait(), 0.25)
with contextlib.suppress(ProcessLookupError):
self._proc.kill()
with contextlib.suppress(asyncio.TimeoutError):
await asyncio.wait_for(self._proc.wait(), 0.25)
self._proc = None
while not self._read_queue.empty():
self._read_queue.get_nowait()
self._buffer_empty.set()
self._buffer_bytes = 0
self._line_bytes.clear()
def __str__(self):
return " ".join(self.command)
[docs]def simsched(game, config, command, buff_size=65536):
"""Create a new simsched"""
return _SimulationScheduler(game, config, command, buff_size=buff_size)