wepy.work_mapper.task_mapper module

exception wepy.work_mapper.task_mapper.TaskProcessException(message, wrapped_exception=None, tb=None)

Bases: wepy.work_mapper.mapper.WrapperException

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
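The message, wrapped_exception and tb parameters fit the common pattern for reporting a failure from a child process back to its parent. The sketch below is not wepy's implementation. WrappedTaskError is a hypothetical stand-in with the same constructor signature. It assumes that tb holds a formatted traceback string, because traceback objects cannot be pickled. It also assumes the 'fork' start method is available.

```python
import multiprocessing as mp
import traceback

# Assumes a POSIX platform where the 'fork' start method is available.
ctx = mp.get_context("fork")


class WrappedTaskError(Exception):
    """Hypothetical stand-in mirroring (message, wrapped_exception=None, tb=None)."""

    def __init__(self, message, wrapped_exception=None, tb=None):
        # Pass every argument to Exception so self.args holds all three;
        # unpickling rebuilds the exception as cls(*self.args).
        super().__init__(message, wrapped_exception, tb)
        self.message = message
        self.wrapped_exception = wrapped_exception
        self.tb = tb


def failing_segment(results):
    try:
        1 / 0  # stand-in for a segment that crashes
    except ZeroDivisionError as exc:
        # Ship the traceback as text; the exception itself pickles without it.
        results.put(WrappedTaskError("segment failed", exc, traceback.format_exc()))
    else:
        results.put(None)


def run_and_collect():
    results = ctx.Queue()
    proc = ctx.Process(target=failing_segment, args=(results,))
    proc.start()
    outcome = results.get()  # read before join so the child can flush the queue
    proc.join()
    return outcome
```

The parent can then re-raise the wrapper or log outcome.tb, so the child's original stack trace is not lost.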

exception wepy.work_mapper.task_mapper.TaskProcessKilledError

Bases: ChildProcessError

args
characters_written
errno

POSIX exception code

filename

exception filename

filename2

second exception filename

strerror

exception strerror

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class wepy.work_mapper.task_mapper.TaskMapper(walker_task_type=None, num_workers=None, segment_func=None, **kwargs)

Bases: wepy.work_mapper.mapper.ABCWorkerMapper

Process-per-task mapper.

This work mapper starts a new process for each runner segment task that needs to be run. This allows shared state to be copied cheaply using operating system primitives. On Linux the start method would typically be either ‘fork’ (the default) or ‘spawn’. Fork is cheap but does not reinitialize the child: the child inherits a copy of the parent's memory and state, including anything the parent has already set up. Spawn is much more expensive because it starts a fresh interpreter, but the child begins from a clean state. Fork should be sufficient in most cases. However, spawn may be needed when the parent process holds special contexts that cannot safely be inherited. One example is creating a CUDA context in the main parent process and then forking new processes from it. We suggest using fork and avoiding creating these kinds of contexts in the main process.
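The following is a minimal sketch of the process-per-task idea using only the standard library, not TaskMapper itself. A large object built in the parent before forking is read by every child without being pickled or sent. The names SHARED_STATE, run_task and map_one_process_per_task are hypothetical. The sketch assumes the 'fork' start method is available (Linux and other POSIX platforms).

```python
import multiprocessing as mp

# Hypothetical shared state: a large object built in the parent before any
# task processes are started.
SHARED_STATE = list(range(100_000))


def run_task(task_idx, n_tasks, queue):
    # Under 'fork' the child starts with a copy-on-write image of the parent's
    # memory, so SHARED_STATE is readable here without pickling or sending it.
    partial_sum = sum(SHARED_STATE[task_idx::n_tasks])
    # Only the small result travels back through interprocess communication.
    queue.put((task_idx, partial_sum))


def map_one_process_per_task(n_tasks):
    ctx = mp.get_context("fork")  # assumes 'fork' is available
    queue = ctx.Queue()
    procs = [
        ctx.Process(target=run_task, args=(i, n_tasks, queue), name=f"Walker-{i}")
        for i in range(n_tasks)
    ]
    for proc in procs:
        proc.start()
    # Drain the queue before joining so no child blocks on a full pipe.
    results = dict(queue.get() for _ in procs)
    for proc in procs:
        proc.join()
    return [results[i] for i in range(n_tasks)]


if __name__ == "__main__":
    print(map_one_process_per_task(4))
```

With 'spawn' instead of 'fork', each child would start from a fresh interpreter and SHARED_STATE would have to be rebuilt or transferred rather than inherited.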

This approach avoids shared memory and interprocess communication (which carries serialization and deserialization costs) by relying on the operating system's process copying mechanism. However, a new process is created for every walker in every cycle of the simulation, so simulations with a large number of walkers may incur a large overhead. Likewise, if your walker states are very small or a very fast serializer is available, copying the full process address space may offer no benefit. In those cases the WorkerMapper may be better suited.
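The overhead argument above comes down to how many processes are created. The sketch compares starting one process per walker per cycle with reusing a fixed multiprocessing.Pool. The pool is only an analogy for a persistent-worker design and is not the WorkerMapper. All function names are hypothetical, and 'fork' is assumed to be available.

```python
import multiprocessing as mp
import os

# Assumes a POSIX platform where the 'fork' start method is available.
ctx = mp.get_context("fork")


def report_pid(walker_idx):
    # Stand-in for a runner segment: report which process executed it.
    return os.getpid()


def process_per_task(n_walkers, n_cycles):
    """Start one fresh process per walker per cycle; return their PIDs."""
    pids = []
    for _ in range(n_cycles):
        procs = [ctx.Process(target=report_pid, args=(w,)) for w in range(n_walkers)]
        for proc in procs:
            proc.start()
        for proc in procs:
            proc.join()
            # Process discards the target's return value, so record proc.pid.
            pids.append(proc.pid)
    return pids


def persistent_pool(n_walkers, n_cycles, num_workers):
    """Reuse a fixed pool of workers across cycles; return the PIDs that ran tasks."""
    pids = []
    with ctx.Pool(num_workers) as pool:
        for _ in range(n_cycles):
            pids.extend(pool.map(report_pid, range(n_walkers)))
    return pids
```

In the first function the number of processes started grows as n_walkers * n_cycles. The pool starts num_workers processes once and must serialize every task and result.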

Constructor for TaskMapper.

Parameters
  • num_workers (int) – The number of worker processes to spawn.

  • segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.

  • proc_start_method (str or None) – The process start method to use from Python's multiprocessing module, typically ‘fork’, ‘spawn’, or ‘forkserver’; None selects the platform default. See the multiprocessing documentation. On init, a context is generated with multiprocessing.get_context(proc_start_method).
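This sketch shows how a proc_start_method value could be turned into a context, with an early and readable error for unsupported methods. make_context is a hypothetical helper and not part of wepy. Only multiprocessing.get_context and multiprocessing.get_all_start_methods come from the standard library.

```python
import multiprocessing as mp


def make_context(proc_start_method=None):
    """Return a multiprocessing context for the requested start method.

    None selects the platform default, as multiprocessing.get_context(None) does.
    """
    if proc_start_method is not None:
        available = mp.get_all_start_methods()
        if proc_start_method not in available:
            raise ValueError(
                f"start method {proc_start_method!r} is not available here; "
                f"choose one of {available}"
            )
    return mp.get_context(proc_start_method)
```

A context object exposes Process, Queue, Event and the other primitives bound to its start method, so a mapper can create everything from one place.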

init(**kwargs)

Runtime initialization and setting of function to map over walkers.

Parameters
  • num_workers (int) – The number of worker processes to spawn.

  • segment_func (callable implementing the Runner.run_segment interface) – The function to map over walkers.
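The constructor records defaults, and init supplies runtime values. This sketch illustrates that split with a hypothetical ToyMapper, assuming that runtime arguments override construction-time defaults. Its serial map is only a stand-in for the real process-based mapping.

```python
class ToyMapper:
    """Hypothetical mapper illustrating construct-then-init configuration."""

    def __init__(self, num_workers=None, segment_func=None):
        # Defaults recorded at construction; either may be left unset.
        self._num_workers = num_workers
        self._segment_func = segment_func

    @property
    def num_workers(self):
        return self._num_workers

    def init(self, num_workers=None, segment_func=None, **kwargs):
        # Runtime values override construction-time defaults.
        if num_workers is not None:
            self._num_workers = num_workers
        if segment_func is not None:
            self._segment_func = segment_func
        if self._segment_func is None:
            raise ValueError("segment_func must be set at construction or in init()")

    def map(self, *args):
        # Serial stand-in: call segment_func once per walker, pairing the
        # i-th element of every argument sequence.
        return [self._segment_func(*walker_args) for walker_args in zip(*args)]
```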

_sigterm_shutdown(signum, frame)
property walker_task_type

The callable that generates a worker object.

Typically this is simply the Worker class itself, whose constructor is called to create each worker.

force_shutdown()
map(*args, **kwargs)
_make_task(*args, **kwargs)

Generate a task from the ‘segment_func’ attribute.

Similar to partial evaluation (or currying).

The arguments are eventually passed to the call of ‘segment_func’ by the worker processes when they receive the task from the queue.

Returns

task

Return type

Task object
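The partial-evaluation idea can be sketched with functools.partial standing in for the Task object. The arguments are bound now, and the call happens later when a worker pulls the task from a queue. run_segment and make_task are hypothetical. A partial of a module-level function is also picklable, so the same object could travel through a multiprocessing queue.

```python
import functools
import queue


def run_segment(walker, n_steps, temperature=300.0):
    # Hypothetical segment function standing in for Runner.run_segment.
    return walker + n_steps * temperature


def make_task(segment_func, *args, **kwargs):
    # Bind the arguments now; the actual call happens later in a worker.
    return functools.partial(segment_func, *args, **kwargs)


task_queue = queue.Queue()
task_queue.put(make_task(run_segment, 1.0, 10, temperature=2.0))

# A worker later pulls the task and calls it with no further arguments.
task = task_queue.get()
result = task()  # 1.0 + 10 * 2.0
```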

property attributes
cleanup(**kwargs)

Runtime post-simulation tasks.

This is run either at the end of a successful simulation or when an error occurs in the main process during the simulation manager's call to run_cycle.

The Mapper class performs no actions here and all arguments are ignored.
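The "on success or on error" guarantee is what a try/finally block provides. The sketch below uses a hypothetical RecordingMapper and run_simulation loop. It is not wepy's simulation manager. It only shows where a cleanup call naturally sits.

```python
class RecordingMapper:
    """Hypothetical mapper that records whether cleanup ran."""

    def __init__(self):
        self.cleaned_up = False

    def map(self, func, items):
        return [func(x) for x in items]

    def cleanup(self, **kwargs):
        # A process-based mapper could release resources here; this one
        # only records that it was called.
        self.cleaned_up = True


def run_simulation(mapper, func, n_cycles, items):
    results = []
    try:
        for _ in range(n_cycles):
            results.append(mapper.map(func, items))
    finally:
        # Runs after a successful simulation and also when a cycle raises.
        mapper.cleanup()
    return results
```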

property num_workers

The number of worker processes.

property segment_func

The function that will be called for new data in the map method.

property worker_segment_times

The run timings for each segment for each walker.

Returns

worker_seg_times – Dictionary mapping worker indices to a list of times in seconds for each segment run.

Return type

dict of int : list of float
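A structure of the documented shape (dict of int : list of float) can be filled by timing each segment call with time.perf_counter. timed_segment is a hypothetical helper, and sum(range(10_000)) stands in for a real segment. In the process-per-task mapper the timings would be measured in the child processes and sent back, which this serial sketch does not show.

```python
import time
from collections import defaultdict


def timed_segment(segment_times, worker_idx, func, *args):
    """Run func(*args) and append its wall-clock time to segment_times[worker_idx]."""
    start = time.perf_counter()
    result = func(*args)
    segment_times[worker_idx].append(time.perf_counter() - start)
    return result


segment_times = defaultdict(list)  # worker index -> list of seconds
for cycle in range(3):
    for worker_idx in range(2):
        timed_segment(segment_times, worker_idx, sum, range(10_000))

# Example summary: mean segment time per worker.
mean_times = {idx: sum(ts) / len(ts) for idx, ts in segment_times.items()}
```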

class wepy.work_mapper.task_mapper.WalkerTaskProcess(walker_idx, mapper_attributes, func, task_args, task_kwargs, worker_queue, results_list, worker_segment_times, interrupt_connection, **kwargs)

Bases: multiprocessing.context.Process

NAME_TEMPLATE = 'Walker-{}'
_external_sigterm_shutdown(signum, frame)
_shutdown()

The normal shutdown which can be ordered by the work mapper.
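A Process subclass can name itself from a template like NAME_TEMPLATE and install its own SIGTERM handler inside run(), so that an external SIGTERM becomes an orderly exit. The sketch below is a hypothetical WalkerProcess and not wepy's WalkerTaskProcess. The handler name _on_sigterm and the exit code 3 are illustrative assumptions, and the sketch assumes a POSIX platform with 'fork'. A ready event prevents the parent from sending SIGTERM before the handler is installed.

```python
import multiprocessing as mp
import signal
import sys
import time

# Assumes a POSIX platform where the 'fork' start method is available.
ctx = mp.get_context("fork")

SIGTERM_EXIT_CODE = 3  # hypothetical exit code chosen for this sketch


class WalkerProcess(ctx.Process):
    NAME_TEMPLATE = "Walker-{}"

    def __init__(self, walker_idx, ready_event):
        super().__init__(name=self.NAME_TEMPLATE.format(walker_idx))
        self.walker_idx = walker_idx
        self.ready_event = ready_event

    def _on_sigterm(self, signum, frame):
        # Turn SIGTERM into a normal exit with a known code instead of
        # dying from the signal (which would give exitcode -15).
        sys.exit(SIGTERM_EXIT_CODE)

    def run(self):
        # Executed in the child: install the handler, then signal readiness.
        signal.signal(signal.SIGTERM, self._on_sigterm)
        self.ready_event.set()
        while True:  # stand-in for a long-running segment
            time.sleep(0.05)


if __name__ == "__main__":
    ready = ctx.Event()
    proc = WalkerProcess(0, ready)
    proc.start()
    ready.wait(timeout=10)
    proc.terminate()  # sends SIGTERM
    proc.join()
    print(proc.name, proc.exitcode)  # Walker-0 3
```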

property attributes
_run_task(task)
run_task(task)
static _Popen(process_obj)
_bootstrap()
_check_closed()
_start_method = None
property authkey
close()

Close the Process object.

This method releases resources held by the Process object. It is an error to call this method if the child process is still running.
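The rule that close() is an error while the child is running can be checked directly. The standard library raises ValueError in that case, and it also raises ValueError when a closed Process object is used afterwards. demonstrate_close is a hypothetical helper, and 'fork' is assumed.

```python
import multiprocessing as mp
import time

# Assumes a POSIX platform where the 'fork' start method is available.
ctx = mp.get_context("fork")


def slow_segment():
    time.sleep(1.0)


def demonstrate_close():
    """Return (raised_while_running, raised_after_close)."""
    proc = ctx.Process(target=slow_segment)
    proc.start()
    try:
        proc.close()  # the child is still running, so this is an error
        raised_while_running = False
    except ValueError:
        raised_while_running = True
    proc.join()   # wait for the child to exit first
    proc.close()  # now releases the process's resources
    try:
        proc.is_alive()  # any further use of a closed Process object fails
        raised_after_close = False
    except ValueError:
        raised_after_close = True
    return raised_while_running, raised_after_close
```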

property daemon

Return whether process is a daemon

property exitcode

Return exit code of process or None if it has yet to stop

property ident

Return identifier (PID) of process or None if it has yet to start

is_alive()

Return whether process is alive

join(timeout=None)

Wait until child process terminates

kill()

Terminate process; sends SIGKILL signal or uses TerminateProcess()

property name
property pid

Return identifier (PID) of process or None if it has yet to start

run()

Method to be run in sub-process; can be overridden in sub-class

property sentinel

Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.
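Sentinels let a parent block on many task processes at once and react to whichever one finishes first. The sketch uses multiprocessing.connection.wait on the sentinels. finish_order and the sleep durations are hypothetical, and 'fork' is assumed.

```python
import multiprocessing as mp
import time
from multiprocessing.connection import wait

# Assumes a POSIX platform where the 'fork' start method is available.
ctx = mp.get_context("fork")


def segment(duration):
    time.sleep(duration)


def finish_order(durations):
    """Return process names in the order their sentinels become ready."""
    procs = [
        ctx.Process(target=segment, args=(d,), name=f"Walker-{i}")
        for i, d in enumerate(durations)
    ]
    for proc in procs:
        proc.start()
    by_sentinel = {proc.sentinel: proc for proc in procs}
    order = []
    while by_sentinel:
        # Block until at least one child has exited.
        for sentinel in wait(list(by_sentinel)):
            proc = by_sentinel.pop(sentinel)
            proc.join()  # returns immediately; the child has already exited
            order.append(proc.name)
    return order
```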

start()

Start child process

terminate()

Terminate process; sends SIGTERM signal or uses TerminateProcess()
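The difference between kill() and terminate() shows up in exitcode: a child ended by signal N reports -N. The sketch turns a negative exit code into an exception. TaskProcessKilledError here is a local stand-in named after the documented exception, and check_exitcode is a hypothetical helper. The sketch does not describe when wepy actually raises its exception. 'fork' is assumed.

```python
import multiprocessing as mp
import signal
import time

# Assumes a POSIX platform where the 'fork' start method is available.
ctx = mp.get_context("fork")


class TaskProcessKilledError(ChildProcessError):
    """Local stand-in named after the documented exception; not imported from wepy."""


def long_segment():
    time.sleep(30)


def check_exitcode(proc):
    # A negative exitcode -N means the child was ended by signal N.
    if proc.exitcode is not None and proc.exitcode < 0:
        raise TaskProcessKilledError(
            f"{proc.name} ended by signal {signal.Signals(-proc.exitcode).name}"
        )


def stop_both():
    killed = ctx.Process(target=long_segment, name="Walker-0")
    terminated = ctx.Process(target=long_segment, name="Walker-1")
    for proc in (killed, terminated):
        proc.start()
    killed.kill()           # SIGKILL: cannot be caught or ignored
    terminated.terminate()  # SIGTERM: the child may install a handler for it
    for proc in (killed, terminated):
        proc.join()
    return killed.exitcode, terminated.exitcode
```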

_run_walker()