wepy.work_mapper.mapper module

Reference implementations, abstract base classes, and a production-ready, worker-style mapper for mapping runner dynamics onto walkers in wepy simulation cycles.

class wepy.work_mapper.mapper.ABCMapper(segment_func=None, **kwargs)[source]

Bases: object

Abstract base class for a Mapper.

Constructor for the Mapper class. No arguments are required.

Parameters

segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.

property attributes
init(segment_func=None, **kwargs)[source]

Runtime initialization and setting of function to map over walkers.

Parameters

segment_func (callable implementing the Runner.run_segment interface) –

property segment_func

The function that will be called for new data in the map method.

cleanup(**kwargs)[source]

Runtime post-simulation tasks.

This is run either at the end of a successful simulation or upon an error in the main process of the simulation manager call to run_cycle.

The Mapper class performs no actions here and all arguments are ignored.

map(*args, **kwargs)[source]

class wepy.work_mapper.mapper.Mapper(segment_func=None, **kwargs)[source]

Bases: wepy.work_mapper.mapper.ABCMapper

Basic non-parallel reference implementation of a mapper.

Constructor for the Mapper class. No arguments are required.

Parameters

segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.

map(*args, **kwargs)[source]

Map ‘segment_func’ over args.

Parameters

*args (list of list) – Each element is the argument to one call of ‘segment_func’.

Returns

results – The results of each call to ‘segment_func’ in the same order as input.

Return type

list

Examples

>>> Mapper(segment_func=sum).map([(0,1,2), (3,4,5)])
[3, 12]

property worker_segment_times

The run timings of each segment, grouped by worker.

Returns

worker_seg_times – Dictionary mapping worker indices to a list of times in seconds for each segment run.

Return type

dict of int : list of float
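As a rough illustration of how the returned dictionary might be consumed, the sketch below summarizes timings per worker. The helper name summarize_segment_times and the example data are hypothetical; only the dict of int : list of float shape is taken from the description above.

```python
from statistics import mean


def summarize_segment_times(worker_seg_times):
    """Return {worker_idx: (num_segments, total_seconds, mean_seconds)}.

    Hypothetical helper, not part of wepy.
    """
    summary = {}
    for worker_idx, times in worker_seg_times.items():
        if not times:
            # A worker that ran no segments has no meaningful mean.
            summary[worker_idx] = (0, 0.0, None)
        else:
            summary[worker_idx] = (len(times), sum(times), mean(times))
    return summary


# Made-up timings shaped like the worker_segment_times property.
example_times = {0: [1.5, 2.5], 1: [2.0], 2: []}
summary = summarize_segment_times(example_times)
```

A summary like this could help spot a worker that is consistently slower than the others.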

property attributes
cleanup(**kwargs)

Runtime post-simulation tasks.

This is run either at the end of a successful simulation or upon an error in the main process of the simulation manager call to run_cycle.

The Mapper class performs no actions here and all arguments are ignored.

init(segment_func=None, **kwargs)

Runtime initialization and setting of function to map over walkers.

Parameters

segment_func (callable implementing the Runner.run_segment interface) –

property segment_func

The function that will be called for new data in the map method.

class wepy.work_mapper.mapper.Task(func, *args, **kwargs)[source]

Bases: object

Class that composes a function and arguments.

Constructor for Task.

Parameters
  • func (callable) – Function to be called on the arguments.

  • *args – The arguments to pass to func
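The idea of composing a function with its arguments can be sketched as follows. SketchTask is a hypothetical stand-in written for illustration and is not the wepy Task implementation; in particular, accepting extra keyword arguments at call time is an assumption of this sketch.

```python
class SketchTask:
    """Hypothetical stand-in: bundles a function with its arguments."""

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __call__(self, **extra_kwargs):
        # Keyword arguments given at call time override the stored ones
        # (an assumption of this sketch).
        merged = {**self.kwargs, **extra_kwargs}
        return self.func(*self.args, **merged)


task = SketchTask(pow, 2, 10)
result = task()  # evaluates pow(2, 10)
```

Because the call is deferred, an object like this can be created in one place and evaluated later somewhere else.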

exception wepy.work_mapper.mapper.WrapperException(message, wrapped_exception=None, tb=None)[source]

Bases: Exception

Exception used for wrapping another exception.

Since tracebacks can’t be pickled, the traceback is formatted as a string and that string is saved instead.

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
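The traceback handling described for WrapperException can be sketched as below: the traceback is rendered to text with traceback.format_exc() so the wrapper survives a pickle round trip. SketchWrapperException and risky are hypothetical names; this is a minimal sketch under those assumptions, not the wepy implementation.

```python
import pickle
import traceback


class SketchWrapperException(Exception):
    """Hypothetical stand-in that carries a formatted traceback string."""

    def __init__(self, message, wrapped_exception=None, tb=None):
        # Passing all values to Exception keeps them in .args, which is
        # what pickle uses to rebuild the exception.
        super().__init__(message, wrapped_exception, tb)
        self.message = message
        self.wrapped_exception = wrapped_exception
        self.tb = tb


def risky():
    raise ValueError("bad input")


try:
    risky()
except ValueError as exc:
    # Format the traceback while it is still available.
    tb_text = traceback.format_exc()
    wrapped = SketchWrapperException("task failed",
                                     wrapped_exception=exc,
                                     tb=tb_text)

# The wrapper and its traceback text survive pickling.
restored = pickle.loads(pickle.dumps(wrapped))
```

The restored exception no longer has a live traceback object, but restored.tb still holds the readable text.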

exception wepy.work_mapper.mapper.TaskException(message, wrapped_exception=None, tb=None)[source]

Bases: wepy.work_mapper.mapper.WrapperException

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class wepy.work_mapper.mapper.ABCWorkerMapper(num_workers=None, segment_func=None, proc_start_method='fork', **kwargs)[source]

Bases: wepy.work_mapper.mapper.ABCMapper

Constructor for WorkerMapper.

Parameters
  • num_workers (int) – The number of worker processes to spawn.

  • segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.

  • proc_start_method (str or None) – The process start method to use from Python’s multiprocessing module, typically ‘fork’, ‘spawn’, or ‘forkserver’; None selects the platform default. See the multiprocessing documentation for details. On init, a context is generated with multiprocessing.get_context(proc_start_method).
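For reference, this is roughly how a start method maps to a context in the standard library. The helper make_context is hypothetical and only wraps multiprocessing.get_context with an availability check; how the mapper uses the resulting context is not shown here.

```python
import multiprocessing as mp


def make_context(proc_start_method=None):
    """Return a multiprocessing context for the requested start method.

    Hypothetical helper, not part of wepy.
    """
    available = mp.get_all_start_methods()
    if proc_start_method is not None and proc_start_method not in available:
        raise ValueError(
            f"start method {proc_start_method!r} is not available here")
    # None selects the platform default context.
    return mp.get_context(proc_start_method)


ctx = make_context("spawn")        # 'spawn' exists on every platform
default_ctx = make_context(None)   # platform default
```

Queues and processes created from a context (for example ctx.Queue() or ctx.Process(...)) use that context's start method.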

init(num_workers=None, segment_func=None, **kwargs)[source]

Runtime initialization and setting of function to map over walkers.

Parameters
  • num_workers (int) – The number of worker processes to spawn

  • segment_func (callable implementing the Runner.run_segment interface) –

cleanup(**kwargs)[source]

Runtime post-simulation tasks.

This is run either at the end of a successful simulation or upon an error in the main process of the simulation manager call to run_cycle.

The Mapper class performs no actions here and all arguments are ignored.

property num_workers

The number of worker processes.

property worker_segment_times

The run timings of each segment, grouped by worker.

Returns

worker_seg_times – Dictionary mapping worker indices to a list of times in seconds for each segment run.

Return type

dict of int : list of float

_make_task(*args, **kwargs)[source]

Generate a task from ‘segment_func’ attribute.

Similar to partial evaluation (or currying).

The args will eventually be used as the arguments to the call of ‘segment_func’ by the worker processes when they receive the task from the queue.

Returns

task

Return type

Task object

property attributes
map(*args, **kwargs)
property segment_func

The function that will be called for new data in the map method.

exception wepy.work_mapper.mapper.WorkerException(message, wrapped_exception=None, tb=None)[source]

Bases: wepy.work_mapper.mapper.WrapperException

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception wepy.work_mapper.mapper.WorkerKilledError[source]

Bases: ChildProcessError

args
characters_written
errno

POSIX exception code

filename

exception filename

filename2

second exception filename

strerror

exception strerror

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class wepy.work_mapper.mapper.WorkerMapper(num_workers=None, worker_type=None, worker_attributes=None, segment_func=None, **kwargs)[source]

Bases: wepy.work_mapper.mapper.ABCWorkerMapper

Work mapper implementation using multiple worker processes and task queue.

Uses the python multiprocessing module to spawn multiple worker processes which watch a task queue of walker segments.

Constructor for WorkerMapper.

Parameters
  • num_workers (int) – The number of worker processes to spawn.

  • worker_type (callable, optional) – Callable that generates an object implementing the Worker interface, typically a type from a Worker class.

  • worker_attributes (dictionary) – A dictionary of values that are passed to the worker constructor as keyword arguments.

  • segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.
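The worker-and-task-queue pattern described for WorkerMapper can be sketched in a few lines. To stay portable, this sketch uses threads and queue.Queue as stand-ins for worker processes and multiprocessing queues; all names (sketch_worker, sketch_map) are hypothetical, and it is not the wepy implementation.

```python
import queue
import threading


def sketch_worker(worker_idx, task_queue, result_queue):
    """Pull (task_idx, func, args) items until a None sentinel arrives."""
    while True:
        item = task_queue.get()
        if item is None:
            task_queue.task_done()
            break
        task_idx, func, args = item
        # No error handling here: a failing task would stall this sketch.
        result_queue.put((task_idx, worker_idx, func(*args)))
        task_queue.task_done()


def sketch_map(func, arg_list, num_workers=2):
    task_queue = queue.Queue()
    result_queue = queue.Queue()
    workers = [
        threading.Thread(target=sketch_worker,
                         args=(idx, task_queue, result_queue))
        for idx in range(num_workers)
    ]
    for worker in workers:
        worker.start()
    for task_idx, args in enumerate(arg_list):
        task_queue.put((task_idx, func, args))
    # One sentinel per worker tells it to stop.
    for _ in workers:
        task_queue.put(None)
    task_queue.join()
    for worker in workers:
        worker.join()
    # Results arrive in completion order; restore the input order.
    results = [None] * len(arg_list)
    while not result_queue.empty():
        task_idx, _worker_idx, value = result_queue.get()
        results[task_idx] = value
    return results


results = sketch_map(pow, [(2, 3), (3, 2), (5, 2)])
```

Tagging each task with its index is one simple way to return results in input order even though workers finish at different times.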

property worker_type

The callable that generates a worker object.

Typically this is just the type from the class definition of the Worker where the constructor is called.

init(num_workers=None, segment_func=None, **kwargs)[source]

Runtime initialization and setting of function to map over walkers.

Parameters
  • num_workers (int) – The number of worker processes to spawn

  • segment_func (callable implementing the Runner.run_segment interface) –

_sigterm_shutdown(signum, frame)[source]
force_shutdown(**kwargs)[source]
cleanup(**kwargs)[source]

Runtime post-simulation tasks.

This is run either at the end of a successful simulation or upon an error in the main process of the simulation manager call to run_cycle.

The Mapper class performs no actions here and all arguments are ignored.

map(*args, **kwargs)[source]
_make_task(*args, **kwargs)

Generate a task from ‘segment_func’ attribute.

Similar to partial evaluation (or currying).

The args will eventually be used as the arguments to the call of ‘segment_func’ by the worker processes when they receive the task from the queue.

Returns

task

Return type

Task object

property attributes
property num_workers

The number of worker processes.

property segment_func

The function that will be called for new data in the map method.

property worker_segment_times

The run timings of each segment, grouped by worker.

Returns

worker_seg_times – Dictionary mapping worker indices to a list of times in seconds for each segment run.

Return type

dict of int : list of float

class wepy.work_mapper.mapper.Worker(worker_idx, task_queue, result_queue, exception_queue, interrupt_connection, mapper_attributes=None, log_level='INFO', **kwargs)[source]

Bases: multiprocessing.context.Process

Worker process.

This is a subclass of Process with an overridden __init__ constructor that will automatically generate the Process.

When this class is constructed a new process will be formed.

Constructor for the Worker class.

Parameters
  • worker_idx (int) – The index of the worker. Should be unique.

  • task_queue (multiprocessing.JoinableQueue) – The shared task queue the worker will watch for new tasks to complete.

  • result_queue (multiprocessing.Queue) – The shared queue that completed task results will be placed on.

  • interrupt_connection (multiprocessing.Connection) – One end of a pipe to listen for messages specific to this worker.

  • mapper_attributes (None or dict) – A dictionary of the attributes of the mapper for reference in workers.

  • kwargs – The worker-specific attributes.

NAME_TEMPLATE = 'Worker-{}'

A string formatting template to identify worker processes in logs. The field will be filled with the worker index.

property worker_idx

The index of the worker.

property attributes

Dictionary of attributes of the worker.

property mapper_attributes

Dictionary of attributes of the mapper, for reference in workers.

run()[source]

Method to be run in sub-process; can be overridden in sub-class

static _Popen(process_obj)
_bootstrap()
_check_closed()
_sigterm_shutdown(signum, frame)[source]
_start_method = None
property authkey
close()

Close the Process object.

This method releases resources held by the Process object. It is an error to call this method if the child process is still running.

property daemon

Return whether process is a daemon

property exitcode

Return exit code of process or None if it has yet to stop

property ident

Return identifier (PID) of process or None if it has yet to start

is_alive()

Return whether process is alive

join(timeout=None)

Wait until child process terminates

kill()

Terminate process; sends SIGKILL signal or uses TerminateProcess()

property name
property pid

Return identifier (PID) of process or None if it has yet to start

property sentinel

Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.

start()

Start child process

terminate()

Terminate process; sends SIGTERM signal or uses TerminateProcess()

_shutdown()[source]
_run_worker()[source]
run_task(task)[source]

Actually executes the task.

This default runner simply executes the task thunk.

This can be customized by subclasses to allow injection of worker-specific data.

Parameters

task (Task object) – The partially evaluated task; function plus arguments

Returns

Results of running the task.

Return type

task_result
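The kind of customization that run_task allows can be sketched as follows. SketchWorker and DeviceSketchWorker are hypothetical stand-ins (they do not subclass multiprocessing.Process), functools.partial stands in for a task, and the device_id keyword is an assumption made for illustration.

```python
from functools import partial


class SketchWorker:
    """Hypothetical stand-in for a worker; holds an index and attributes."""

    def __init__(self, worker_idx, **attributes):
        self.worker_idx = worker_idx
        self.attributes = attributes

    def run_task(self, task):
        # Default behaviour: simply evaluate the task thunk.
        return task()


class DeviceSketchWorker(SketchWorker):
    """Injects a worker-specific device id when running a task."""

    def run_task(self, task):
        return task(device_id=self.attributes["device_id"])


def run_segment(walker, num_steps, device_id=None):
    # Toy segment function that just reports what it was given.
    return f"walker {walker}: {num_steps} steps on device {device_id}"


task = partial(run_segment, "w0", 100)
plain_result = SketchWorker(0).run_task(task)
device_result = DeviceSketchWorker(1, device_id=3).run_task(task)
```

Only run_task changes between the two workers, so the same task can be handed to either one.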

_run_task(task)[source]

Runs the given task and returns the results.

This handles exceptions and tracebacks from the actual run_task function, which is intended to be specialized by different workers to inject worker-specific arguments into tasks, such as node and device identification.

Parameters

task (Task object) – The partially evaluated task; function plus arguments

Returns

Results of running the task.

Return type

task_result