wepy.work_mapper.mapper module
Reference implementations, abstract base classes, and a production-ready, worker-style mapper for mapping runner dynamics to walkers in wepy simulation cycles.
-
class wepy.work_mapper.mapper.ABCMapper(segment_func=None, **kwargs)
Bases: object
Abstract base class for a Mapper.
Constructor for the Mapper class. No arguments are required.
- Parameters
segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.
-
property attributes
-
init(segment_func=None, **kwargs)
Runtime initialization and setting of the function to map over walkers.
- Parameters
segment_func (callable implementing the Runner.run_segment interface) – The function to map over walkers.
-
property segment_func
The function that will be called for new data in the map method.
-
class wepy.work_mapper.mapper.Mapper(segment_func=None, **kwargs)
Bases: wepy.work_mapper.mapper.ABCMapper
Basic non-parallel reference implementation of a mapper.
Constructor for the Mapper class. No arguments are required.
- Parameters
segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.
-
map(*args, **kwargs)
Map ‘segment_func’ over the args.
- Parameters
*args (list of list) – Each element is the argument to one call of ‘segment_func’.
- Returns
results – The results of each call to ‘segment_func’ in the same order as input.
- Return type
list
Examples
>>> Mapper(segment_func=sum).map([(0,1,2), (3,4,5)])
[3, 12]
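A minimal sketch of the serial mapping and timing behavior documented above follows. It is an illustration in plain Python, not wepy's source: the class name SerialMapperSketch is hypothetical, and it assumes that map follows the built-in map convention (the i-th call receives the i-th element of every positional argument) and that all timings are stored under worker index 0.

```python
import time
from typing import Any, Callable, Dict, List, Optional


class SerialMapperSketch:
    """Hypothetical serial mapper that times each call of segment_func."""

    def __init__(self, segment_func: Optional[Callable[..., Any]] = None) -> None:
        self.segment_func = segment_func
        # A single "worker" (index 0) performs every segment.
        self.worker_segment_times: Dict[int, List[float]] = {0: []}

    def map(self, *args: Any) -> List[Any]:
        if self.segment_func is None:
            raise ValueError("segment_func must be set before calling map")
        # Assumption: like the built-in map, the i-th call receives the
        # i-th element of every positional argument.
        columns = [list(arg) for arg in args]
        results = []
        segment_times = []
        for call_args in zip(*columns):
            start = time.perf_counter()
            results.append(self.segment_func(*call_args))
            segment_times.append(time.perf_counter() - start)
        self.worker_segment_times[0] = segment_times
        return results


mapper = SerialMapperSketch(segment_func=sum)
print(mapper.map([(0, 1, 2), (3, 4, 5)]))  # prints [3, 12]
print(len(mapper.worker_segment_times[0]))  # prints 2
```

With a single list argument, as in the doctest above, each element of the list becomes the sole argument of one call.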
-
property worker_segment_times
The run timings of each walker segment, grouped by worker.
- Returns
worker_seg_times – Dictionary mapping worker indices to a list of times in seconds for each segment run.
- Return type
dict of int : list of float
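Because worker_segment_times is a plain dictionary from worker index to per-segment times in seconds, it can be summarized with ordinary Python. In the sketch below, summarize_segment_times is a hypothetical helper and the timing values are made up for illustration.

```python
from statistics import mean
from typing import Dict, List, Tuple


def summarize_segment_times(
    worker_seg_times: Dict[int, List[float]]
) -> Dict[int, Tuple[int, float, float]]:
    """Return (segment count, total seconds, mean seconds) for each worker."""
    summary = {}
    for worker_idx, times in sorted(worker_seg_times.items()):
        if times:
            summary[worker_idx] = (len(times), sum(times), mean(times))
        else:
            # A worker that ran no segments this cycle.
            summary[worker_idx] = (0, 0.0, 0.0)
    return summary


# Illustrative values only, not measured timings.
example = {0: [1.5, 2.5], 1: [3.0], 2: []}
for idx, (count, total, avg) in summarize_segment_times(example).items():
    print(f"worker {idx}: {count} segments, {total:.1f} s total, {avg:.2f} s mean")
```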
-
property attributes
-
cleanup(**kwargs)
Runtime post-simulation tasks.
This is called either at the end of a successful simulation or when an error occurs in the main process during the simulation manager’s call to run_cycle.
The Mapper class performs no actions here, and all arguments are ignored.
-
init(segment_func=None, **kwargs)
Runtime initialization and setting of the function to map over walkers.
- Parameters
segment_func (callable implementing the Runner.run_segment interface) – The function to map over walkers.
-
property segment_func
The function that will be called for new data in the map method.
-
class wepy.work_mapper.mapper.Task(func, *args, **kwargs)
Bases: object
Class that composes a function and arguments.
Constructor for Task.
- Parameters
func (callable) – Function to be called on the arguments.
*args – The arguments to pass to func.
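The idea of composing a function with its arguments for later execution can be sketched as follows. TaskSketch is a hypothetical stand-in for Task, and calling the object to apply func to the stored arguments is an assumption about how such a task would be consumed, for example by a worker that pulls it from a queue.

```python
from typing import Any, Callable


class TaskSketch:
    """Hypothetical container composing a function with its arguments."""

    def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __call__(self) -> Any:
        # Deferred evaluation: nothing runs until the task is called.
        return self.func(*self.args, **self.kwargs)


task = TaskSketch(pow, 2, 10)
print(task())  # prints 1024
```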
-
exception wepy.work_mapper.mapper.WrapperException(message, wrapped_exception=None, tb=None)
Bases: Exception
Exception used for wrapping another exception.
Since traceback objects can’t be pickled, the traceback is formatted as a string and saved instead.
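The following sketch shows why a formatted traceback is useful: a wrapper holding the original exception and a traceback string survives a pickle round trip, which is what sending it through a multiprocessing queue requires. WrapperExceptionSketch and its formatted_tb attribute are hypothetical; only the constructor signature (message, wrapped_exception=None, tb=None) mirrors the documented one, and tb is assumed to be a traceback object.

```python
import pickle
import traceback
from types import TracebackType
from typing import Optional


class WrapperExceptionSketch(Exception):
    """Hypothetical exception that wraps another one in a picklable form."""

    def __init__(
        self,
        message: str,
        wrapped_exception: Optional[BaseException] = None,
        tb: Optional[TracebackType] = None,
    ) -> None:
        super().__init__(message)
        self.wrapped_exception = wrapped_exception
        # Traceback objects cannot be pickled, so keep a formatted string.
        self.formatted_tb = "".join(traceback.format_tb(tb)) if tb is not None else None


try:
    1 / 0
except ZeroDivisionError as exc:
    wrapped = WrapperExceptionSketch("task failed", exc, exc.__traceback__)

# The wrapper survives a pickle round trip, as it would through a queue.
restored = pickle.loads(pickle.dumps(wrapped))
print(type(restored.wrapped_exception).__name__)  # prints ZeroDivisionError
print(restored.formatted_tb is not None)  # prints True
```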
-
args
-
with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
exception wepy.work_mapper.mapper.TaskException(message, wrapped_exception=None, tb=None)
Bases: wepy.work_mapper.mapper.WrapperException
-
args
-
with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
class wepy.work_mapper.mapper.ABCWorkerMapper(num_workers=None, segment_func=None, proc_start_method='fork', **kwargs)
Bases: wepy.work_mapper.mapper.ABCMapper
Constructor for ABCWorkerMapper.
- Parameters
num_workers (int) – The number of worker processes to spawn.
segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.
proc_start_method (str or None) – The process start method to use from the Python multiprocessing module, typically ‘fork’, ‘spawn’, or ‘forkserver’; None selects the platform default. See the multiprocessing documentation. On init, a context is generated with multiprocessing.get_context(proc_start_method).
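As a standalone illustration of the standard-library call mentioned above (not wepy code), multiprocessing.get_context returns a context whose Process and queue types use the requested start method, and None yields the platform default. The sketch requests only ‘spawn’, which is available on all platforms; make_context is a hypothetical helper name.

```python
import multiprocessing


def make_context(proc_start_method=None):
    """Return a multiprocessing context; None selects the platform default."""
    return multiprocessing.get_context(proc_start_method)


# Start methods supported on this platform.
print(multiprocessing.get_all_start_methods())

ctx = make_context("spawn")
print(ctx.get_start_method())  # prints spawn

default_ctx = make_context(None)
print(default_ctx.get_start_method())  # the platform default
```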
-
init(num_workers=None, segment_func=None, **kwargs)
Runtime initialization and setting of the function to map over walkers.
- Parameters
num_workers (int) – The number of worker processes to spawn.
segment_func (callable implementing the Runner.run_segment interface) – The function to map over walkers.
-
cleanup(**kwargs)
Runtime post-simulation tasks.
This is called either at the end of a successful simulation or when an error occurs in the main process during the simulation manager’s call to run_cycle.
The Mapper class performs no actions here, and all arguments are ignored.
-
property num_workers
The number of worker processes.
-
property worker_segment_times
The run timings of each walker segment, grouped by worker.
- Returns
worker_seg_times – Dictionary mapping worker indices to a list of times in seconds for each segment run.
- Return type
dict of int : list of float
-
_make_task(*args, **kwargs)
Generate a task from the ‘segment_func’ attribute.
Similar to partial evaluation (or currying).
The args will eventually be used as the arguments to the call of ‘segment_func’ by the worker processes when they receive the task from the queue.
- Returns
task
- Return type
Task object
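The comparison to partial evaluation can be made concrete with functools.partial, which likewise binds arguments now and defers the call. This is a sketch under assumptions: make_task and run_segment are hypothetical (a real segment_func implements the Runner.run_segment interface), and a partial object stands in for a Task.

```python
import functools
from typing import Any, Callable


def make_task(segment_func: Callable[..., Any], *args: Any, **kwargs: Any) -> functools.partial:
    """Bind the arguments for one call of segment_func without running it."""
    return functools.partial(segment_func, *args, **kwargs)


def run_segment(walker_state: int, segment_length: int) -> int:
    # Hypothetical toy dynamics: advance the state by the segment length.
    return walker_state + segment_length


# One task per walker; a worker would later call each task with no arguments.
tasks = [make_task(run_segment, state, segment_length=10) for state in (0, 5, 7)]
print([task() for task in tasks])  # prints [10, 15, 17]
```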
-
property attributes
-
map(*args, **kwargs)
-
property segment_func
The function that will be called for new data in the map method.
-
exception wepy.work_mapper.mapper.WorkerException(message, wrapped_exception=None, tb=None)
Bases: wepy.work_mapper.mapper.WrapperException
-
args
-
with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
exception wepy.work_mapper.mapper.WorkerKilledError
Bases: ChildProcessError
-
args
-
characters_written
-
errno
POSIX exception code
-
filename
exception filename
-
filename2
second exception filename
-
strerror
exception strerror
-
with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
class wepy.work_mapper.mapper.WorkerMapper(num_workers=None, worker_type=None, worker_attributes=None, segment_func=None, **kwargs)
Bases: wepy.work_mapper.mapper.ABCWorkerMapper
Work mapper implementation using multiple worker processes and a task queue.
Uses the Python multiprocessing module to spawn multiple worker processes, which watch a task queue of walker segments.
Constructor for WorkerMapper.
- Parameters
num_workers (int) – The number of worker processes to spawn.
worker_type (callable, optional) – Callable that generates an object implementing the Worker interface, typically a Worker class or subclass.
worker_attributes (dict) – A dictionary of values that are passed to the worker constructor as keyword arguments.
segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.
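The worker-process and task-queue design described above can be sketched with the standard library alone. This is a simplified illustration, not wepy's WorkerMapper: all names are hypothetical, each task is a plain (index, value) tuple, a None sentinel tells a worker to exit, and the ‘fork’ start method is assumed (available on Linux and macOS, not Windows) so that the worker function and segment_func are inherited by the child processes rather than pickled.

```python
import multiprocessing


def square(x):
    # Stand-in for segment_func; a real one would run a dynamics segment.
    return x * x


def worker_loop(worker_idx, segment_func, task_queue, result_queue):
    """Pull tasks until a None sentinel arrives (error handling omitted)."""
    while True:
        task = task_queue.get()
        if task is None:
            task_queue.task_done()
            break
        task_idx, value = task
        # The worker index is kept for per-worker bookkeeping such as timings.
        result_queue.put((task_idx, worker_idx, segment_func(value)))
        task_queue.task_done()


def map_with_workers(segment_func, values, num_workers=2):
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX platform
    task_queue = ctx.JoinableQueue()
    result_queue = ctx.Queue()
    workers = [
        ctx.Process(target=worker_loop,
                    args=(i, segment_func, task_queue, result_queue))
        for i in range(num_workers)
    ]
    for worker in workers:
        worker.start()
    for task in enumerate(values):
        task_queue.put(task)
    for _ in workers:
        task_queue.put(None)  # one sentinel per worker
    task_queue.join()  # wait until every task and sentinel is marked done
    # Results may arrive in any order, so restore the input order.
    results = [None] * len(values)
    for _ in values:
        task_idx, _worker_idx, result = result_queue.get()
        results[task_idx] = result
    for worker in workers:
        worker.join()
    return results


if __name__ == "__main__":
    print(map_with_workers(square, [1, 2, 3, 4]))  # prints [1, 4, 9, 16]
```

Reordering by task index mirrors the serial Mapper's guarantee that results come back in input order; whether wepy's WorkerMapper does exactly this is not stated here.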
-
property worker_type
The callable that generates a worker object.
Typically this is just the Worker class (or a subclass) itself, whose constructor is called to create each worker.
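Since worker_type is just a callable and worker_attributes are passed to it as keyword arguments, a mapper could build its workers with a call like worker_type(worker_idx=i, **worker_attributes). The sketch below isolates that pattern with a hypothetical GPUWorker class and device_ids attribute; the real Worker constructor also takes queues and a pipe connection, which are omitted here.

```python
from typing import Any, Callable, Dict, List


class GPUWorker:
    """Hypothetical worker type; real workers would subclass Worker."""

    def __init__(self, worker_idx: int, device_ids: List[int]) -> None:
        self.worker_idx = worker_idx
        # Each worker picks its own device from the shared attribute list.
        self.device_id = device_ids[worker_idx % len(device_ids)]


def make_workers(worker_type: Callable[..., Any], num_workers: int,
                 worker_attributes: Dict[str, Any]) -> List[Any]:
    # worker_attributes are forwarded unchanged as keyword arguments.
    return [worker_type(worker_idx=i, **worker_attributes)
            for i in range(num_workers)]


workers = make_workers(GPUWorker, 3, {"device_ids": [0, 1]})
print([w.device_id for w in workers])  # prints [0, 1, 0]
```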
-
init(num_workers=None, segment_func=None, **kwargs)
Runtime initialization and setting of the function to map over walkers.
- Parameters
num_workers (int) – The number of worker processes to spawn.
segment_func (callable implementing the Runner.run_segment interface) – The function to map over walkers.
-
cleanup(**kwargs)
Runtime post-simulation tasks.
This is called either at the end of a successful simulation or when an error occurs in the main process during the simulation manager’s call to run_cycle.
The Mapper class performs no actions here, and all arguments are ignored.
-
_make_task(*args, **kwargs)
Generate a task from the ‘segment_func’ attribute.
Similar to partial evaluation (or currying).
The args will eventually be used as the arguments to the call of ‘segment_func’ by the worker processes when they receive the task from the queue.
- Returns
task
- Return type
Task object
-
property attributes
-
property num_workers
The number of worker processes.
-
property segment_func
The function that will be called for new data in the map method.
-
property worker_segment_times
The run timings of each walker segment, grouped by worker.
- Returns
worker_seg_times – Dictionary mapping worker indices to a list of times in seconds for each segment run.
- Return type
dict of int : list of float
-
class wepy.work_mapper.mapper.Worker(worker_idx, task_queue, result_queue, exception_queue, interrupt_connection, mapper_attributes=None, log_level='INFO', **kwargs)
Bases: multiprocessing.context.Process
Worker process.
This is a subclass of Process with an overridden __init__ constructor that automatically generates the Process.
When this class is constructed, a new Process object is created.
Constructor for the Worker class.
- Parameters
worker_idx (int) – The index of the worker. Should be unique.
task_queue (multiprocessing.JoinableQueue) – The shared task queue the worker will watch for new tasks to complete.
result_queue (multiprocessing.Queue) – The shared queue that completed task results will be placed on.
interrupt_connection (multiprocessing.connection.Connection) – One end of a pipe used to listen for messages specific to this worker.
mapper_attributes (None or dict) – A dictionary of the attributes of the mapper for reference in workers.
**kwargs – The worker-specific attributes.
-
NAME_TEMPLATE = 'Worker-{}'
A string formatting template to identify worker processes in logs. The field will be filled with the worker index.
-
property worker_idx
The index of the worker.
-
property attributes
Dictionary of attributes of the worker.
-
property mapper_attributes
Dictionary of attributes of the mapper.
-
static _Popen(process_obj)
-
_bootstrap()
-
_check_closed()
-
_start_method = None
-
property authkey
-
close()
Close the Process object.
This method releases resources held by the Process object. It is an error to call this method if the child process is still running.
-
property daemon
Return whether process is a daemon
-
property exitcode
Return exit code of process or None if it has yet to stop
-
property ident
Return identifier (PID) of process or None if it has yet to start
-
is_alive()
Return whether process is alive
-
join(timeout=None)
Wait until child process terminates
-
kill()
Terminate process; sends SIGKILL signal or uses TerminateProcess()
-
property name
-
property pid
Return identifier (PID) of process or None if it has yet to start
-
property sentinel
Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.
-
start()
Start child process
-
terminate()
Terminate process; sends SIGTERM signal or uses TerminateProcess()
-
run_task(task)
Actually executes the task.
This default runner simply executes the task thunk.
Subclasses can customize this to inject worker-specific data.
- Parameters
task (Task object) – The partially evaluated task; function plus arguments
- Returns
Results of running the task.
- Return type
task_result
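The customization hook can be sketched as a base class whose run_task simply calls the task, and a subclass that injects worker-specific data. The class names, the device_id keyword, and the convention that a task accepts extra keyword arguments are assumptions for illustration; functools.partial stands in for a Task object.

```python
import functools
from typing import Any, Callable


class BaseWorkerSketch:
    def run_task(self, task: Callable[..., Any]) -> Any:
        # Default behavior: just execute the thunk.
        return task()


class DeviceWorkerSketch(BaseWorkerSketch):
    def __init__(self, device_id: int) -> None:
        self.device_id = device_id

    def run_task(self, task: Callable[..., Any]) -> Any:
        # Inject worker-specific data the task could not know in advance.
        return task(device_id=self.device_id)


def run_segment(walker_state: int, device_id: int = -1) -> str:
    # Hypothetical segment function that reports where it ran.
    return f"walker {walker_state} on device {device_id}"


task = functools.partial(run_segment, 3)
print(BaseWorkerSketch().run_task(task))     # prints walker 3 on device -1
print(DeviceWorkerSketch(1).run_task(task))  # prints walker 3 on device 1
```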
-
_run_task(task)
Runs the given task and returns the results.
This handles exceptions and tracebacks from the actual run_task method, which is intended to be specialized by different workers to inject worker-specific arguments into tasks, such as node and device identification.
- Parameters
task (Task object) – The partially evaluated task; function plus arguments
- Returns
Results of running the task.
- Return type
task_result
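The division of labor between run_task and _run_task can be sketched as a thin wrapper that calls the specialized method and converts any failure into an exception carrying a formatted traceback. This is an illustration under assumptions, not wepy's code: TaskFailed is a hypothetical analogue of TaskException, and re-raising is only one possible policy.

```python
import traceback
from typing import Any, Callable


class TaskFailed(Exception):
    """Hypothetical analogue of TaskException with a string traceback."""

    def __init__(self, message: str, wrapped_exception: BaseException, tb_text: str) -> None:
        super().__init__(message)
        self.wrapped_exception = wrapped_exception
        self.tb_text = tb_text


class WorkerSketch:
    def run_task(self, task: Callable[[], Any]) -> Any:
        # The specialized part that subclasses would override.
        return task()

    def _run_task(self, task: Callable[[], Any]) -> Any:
        try:
            return self.run_task(task)
        except Exception as exc:
            # Format the traceback now, while it is still available.
            tb_text = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
            raise TaskFailed(f"task failed: {exc!r}", exc, tb_text) from exc


worker = WorkerSketch()
print(worker._run_task(lambda: 6 * 7))  # prints 42

failure = None
try:
    worker._run_task(lambda: int("not a number"))
except TaskFailed as err:
    failure = err
print(type(failure.wrapped_exception).__name__)  # prints ValueError
```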