wepy.work_mapper.mapper module¶
Reference implementations, abstract base classes, and a production-ready, worker-style mapper for mapping runner dynamics onto walkers in wepy simulation cycles.
- class wepy.work_mapper.mapper.ABCMapper(segment_func=None, **kwargs)[source]¶
Bases:
object
Abstract base class for a Mapper.
Constructor for the Mapper class. No arguments are required.
- Parameters:
segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.
- property attributes¶
- init(segment_func=None, **kwargs)[source]¶
Runtime initialization and setting of function to map over walkers.
- Parameters:
segment_func (callable implementing the Runner.run_segment interface)
- property segment_func¶
The function that will be called for new data in the map method.
- class wepy.work_mapper.mapper.Mapper(segment_func=None, **kwargs)[source]¶
Bases:
ABCMapper
Basic non-parallel reference implementation of a mapper.
Constructor for the Mapper class. No arguments are required.
- Parameters:
segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.
- map(*args, **kwargs)[source]¶
Map the ‘segment_func’ to args.
- Parameters:
*args (list of list) – Each element is the argument to one call of ‘segment_func’.
- Returns:
results – The results of each call to ‘segment_func’, in the same order as the input.
- Return type:
list
Examples
>>> Mapper(segment_func=sum).map([(0,1,2), (3,4,5)])
[3, 12]
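The mapping behaviour described for map can be illustrated with a small stand-in class. This is a minimal sketch under stated assumptions, not the wepy source: the name SerialMapperSketch is hypothetical, and storing the timings under key 0 (one "worker" for a serial mapper) is an assumption made for illustration.

```python
import time


class SerialMapperSketch:
    """Hypothetical stand-in for a non-parallel mapper (not the wepy class)."""

    def __init__(self, segment_func=None):
        self._func = segment_func
        # Assumption: timings are kept per worker; a serial mapper has one, index 0.
        self.worker_segment_times = {0: []}

    def init(self, segment_func=None, **kwargs):
        # Runtime initialization: optionally replace the function to map.
        if segment_func is not None:
            self._func = segment_func

    def map(self, *args):
        # Each positional argument is a sequence holding one entry per call.
        columns = [list(arg) for arg in args]
        results, times = [], []
        for call_args in zip(*columns):
            start = time.perf_counter()
            results.append(self._func(*call_args))
            times.append(time.perf_counter() - start)
        self.worker_segment_times[0] = times
        return results


mapper = SerialMapperSketch(segment_func=sum)
totals = mapper.map([(0, 1, 2), (3, 4, 5)])
print(totals)  # [3, 12]
```

Passing several sequences, for example `map([1, 2], [10, 20])`, would call the function once per position with one argument taken from each sequence.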
- property worker_segment_times¶
The run timings for each segment for each walker.
- property attributes¶
- cleanup(**kwargs)¶
Runtime post-simulation tasks.
This is run either at the end of a successful simulation or upon an error in the main process of the simulation manager call to run_cycle.
The Mapper class performs no actions here and all arguments are ignored.
- init(segment_func=None, **kwargs)¶
Runtime initialization and setting of function to map over walkers.
- Parameters:
segment_func (callable implementing the Runner.run_segment interface)
- property segment_func¶
The function that will be called for new data in the map method.
- class wepy.work_mapper.mapper.Task(func, *args, **kwargs)[source]¶
Bases:
object
Class that composes a function and arguments.
Constructor for Task.
- Parameters:
func (callable) – Function to be called on the arguments.
*args – The arguments to pass to func
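Composing a function with its arguments so that it can be called later is close to what functools.partial does. The sketch below shows the idea with a hypothetical TaskSketch class; it is an illustration only and is not claimed to match the attributes or call signature of the real Task.

```python
import functools


class TaskSketch:
    """Hypothetical thunk: stores a function and the arguments to call it with."""

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __call__(self):
        # Evaluate the stored function on the stored arguments.
        return self.func(*self.args, **self.kwargs)


task = TaskSketch(pow, 2, 5)
print(task())  # 32

# The standard library offers the same deferred-call idea.
same_task = functools.partial(pow, 2, 5)
print(same_task())  # 32
```

A dedicated class, as opposed to a bare partial, leaves room for a worker to inspect or extend the arguments before the call is made.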
- exception wepy.work_mapper.mapper.WrapperException(message, wrapped_exception=None, tb=None)[source]¶
Bases:
Exception
Exception used for wrapping another exception.
Since tracebacks can’t be pickled, we format them and save the formatted text instead.
- args¶
- with_traceback()¶
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
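The reason for storing a formatted traceback can be shown with the standard library alone. In this minimal sketch the helper describe_failure and the dictionary layout are hypothetical; they only mirror the constructor arguments listed above (message, wrapped_exception, tb) and are not the wepy implementation.

```python
import pickle
import traceback


def describe_failure(exc):
    """Package an exception together with its traceback rendered as text."""
    tb_text = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    return {"message": str(exc), "wrapped_exception": exc, "tb": tb_text}


try:
    1 / 0
except ZeroDivisionError as exc:
    # A raw traceback object cannot be serialized by pickle.
    try:
        pickle.dumps(exc.__traceback__)
        traceback_pickled = True
    except TypeError:
        traceback_pickled = False
    payload = describe_failure(exc)

# The formatted text survives a pickle round trip, as it would when
# passing through a queue between processes.
restored = pickle.loads(pickle.dumps(payload))
print(traceback_pickled)  # False
print(restored["tb"])
```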
- exception wepy.work_mapper.mapper.TaskException(message, wrapped_exception=None, tb=None)[source]¶
Bases:
WrapperException
- args¶
- with_traceback()¶
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class wepy.work_mapper.mapper.ABCWorkerMapper(num_workers=None, segment_func=None, proc_start_method='fork', **kwargs)[source]¶
Bases:
ABCMapper
Constructor for WorkerMapper.
- Parameters:
num_workers (int) – The number of worker processes to spawn.
segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.
proc_start_method (str or None) – A string naming the process start method to use from the Python multiprocessing module, typically ‘fork’, ‘spawn’, or ‘forkserver’; None selects the platform default. See the multiprocessing documentation for details. On init, a context is generated with multiprocessing.get_context(proc_start_method).
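As a brief illustration of how a start method name becomes a multiprocessing context, the sketch below uses only standard library calls. The helper make_context is hypothetical, and ‘spawn’ is used in the demonstration because it is available on every platform, whereas ‘fork’ is not.

```python
import multiprocessing


def make_context(proc_start_method=None):
    # None falls back to the platform default start method.
    return multiprocessing.get_context(proc_start_method)


# Start methods supported on the current platform.
available = multiprocessing.get_all_start_methods()
print(available)

ctx = make_context("spawn")
print(ctx.get_start_method())  # spawn

default_ctx = make_context(None)
print(default_ctx.get_start_method())
```

Queues and processes created from a context object (for example `ctx.Queue()` or `ctx.Process(...)`) use that context's start method, so it does not have to be set globally.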
- init(num_workers=None, segment_func=None, **kwargs)[source]¶
Runtime initialization and setting of function to map over walkers.
- Parameters:
num_workers (int) – The number of worker processes to spawn
segment_func (callable implementing the Runner.run_segment interface)
- cleanup(**kwargs)[source]¶
Runtime post-simulation tasks.
This is run either at the end of a successful simulation or upon an error in the main process of the simulation manager call to run_cycle.
The Mapper class performs no actions here and all arguments are ignored.
- property num_workers¶
The number of worker processes.
- property worker_segment_times¶
The run timings for each segment for each walker.
- _make_task(*args, **kwargs)[source]¶
Generate a task from ‘segment_func’ attribute.
Similar to partial evaluation (or currying).
The args will eventually be used as the arguments to the call of ‘segment_func’ by the worker processes when they receive the task from the queue.
- Returns:
task
- Return type:
Task object
- property attributes¶
- map(*args, **kwargs)¶
- property segment_func¶
The function that will be called for new data in the map method.
- exception wepy.work_mapper.mapper.WorkerException(message, wrapped_exception=None, tb=None)[source]¶
Bases:
WrapperException
- args¶
- with_traceback()¶
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- exception wepy.work_mapper.mapper.WorkerKilledError[source]¶
Bases:
ChildProcessError
- args¶
- characters_written¶
- errno¶
POSIX exception code
- filename¶
exception filename
- filename2¶
second exception filename
- strerror¶
exception strerror
- with_traceback()¶
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class wepy.work_mapper.mapper.WorkerMapper(num_workers=None, worker_type=None, worker_attributes=None, segment_func=None, **kwargs)[source]¶
Bases:
ABCWorkerMapper
Work mapper implementation using multiple worker processes and task queue.
Uses the Python multiprocessing module to spawn multiple worker processes, which watch a task queue of walker segments.
Constructor for WorkerMapper.
- Parameters:
num_workers (int) – The number of worker processes to spawn.
worker_type (callable, optional) – Callable that generates an object implementing the Worker interface, typically a type from a Worker class.
worker_attributes (dictionary) – A dictionary of values that are passed to the worker constructor as keyword arguments.
segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.
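The task-queue pattern described above can be sketched in a few lines. To keep the example runnable anywhere, threads and queue.Queue stand in for the worker processes and multiprocessing queues; the names worker_loop and map_with_workers, the None sentinel, and the result bookkeeping are all assumptions for illustration, and error handling is omitted.

```python
import functools
import queue
import threading


def worker_loop(worker_idx, task_queue, result_queue):
    # Each worker watches the shared queue until it receives the sentinel.
    while True:
        item = task_queue.get()
        if item is None:
            task_queue.task_done()
            break
        task_idx, task = item
        result_queue.put((task_idx, worker_idx, task()))
        task_queue.task_done()


def map_with_workers(func, arg_list, num_workers=2):
    task_queue = queue.Queue()
    result_queue = queue.Queue()
    workers = [
        threading.Thread(target=worker_loop, args=(idx, task_queue, result_queue))
        for idx in range(num_workers)
    ]
    for worker in workers:
        worker.start()
    # One task per input; the index lets results be reordered afterwards.
    for task_idx, arg in enumerate(arg_list):
        task_queue.put((task_idx, functools.partial(func, arg)))
    # One sentinel per worker tells it to stop.
    for _ in workers:
        task_queue.put(None)
    task_queue.join()
    for worker in workers:
        worker.join()
    results = [None] * len(arg_list)
    while not result_queue.empty():
        task_idx, _worker_idx, value = result_queue.get()
        results[task_idx] = value
    return results


print(map_with_workers(sum, [(0, 1, 2), (3, 4, 5)]))  # [3, 12]
```

Tagging each task with its index means the results can be returned in input order even though workers finish in an arbitrary order.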
- property worker_type¶
The callable that generates a worker object.
Typically this is just the type from the class definition of the Worker where the constructor is called.
- init(num_workers=None, segment_func=None, **kwargs)[source]¶
Runtime initialization and setting of function to map over walkers.
- Parameters:
num_workers (int) – The number of worker processes to spawn
segment_func (callable implementing the Runner.run_segment interface)
- cleanup(**kwargs)[source]¶
Runtime post-simulation tasks.
This is run either at the end of a successful simulation or upon an error in the main process of the simulation manager call to run_cycle.
The Mapper class performs no actions here and all arguments are ignored.
- _make_task(*args, **kwargs)¶
Generate a task from ‘segment_func’ attribute.
Similar to partial evaluation (or currying).
The args will eventually be used as the arguments to the call of ‘segment_func’ by the worker processes when they receive the task from the queue.
- Returns:
task
- Return type:
Task object
- property attributes¶
- property num_workers¶
The number of worker processes.
- property segment_func¶
The function that will be called for new data in the map method.
- class wepy.work_mapper.mapper.Worker(worker_idx, task_queue, result_queue, exception_queue, interrupt_connection, mapper_attributes=None, log_level='INFO', **kwargs)[source]¶
Bases:
Process
Worker process.
This is a subclass of multiprocessing.Process with an overridden __init__ constructor that will automatically generate the Process.
When this class is constructed, a new process will be formed.
Constructor for the Worker class.
- Parameters:
worker_idx (int) – The index of the worker. Should be unique.
task_queue (multiprocessing.JoinableQueue) – The shared task queue the worker will watch for new tasks to complete.
result_queue (multiprocessing.Queue) – The shared queue that completed task results will be placed on.
interrupt_connection (multiprocessing.Connection) – One end of a pipe to listen for messages specific to this worker.
mapper_attributes (None or dict) – A dictionary of the attributes of the mapper for reference in workers.
kwargs – The worker specific attributes
- NAME_TEMPLATE = 'Worker-{}'¶
A string formatting template to identify worker processes in logs. The field will be filled with the worker index.
- property worker_idx¶
The index of the worker.
- property attributes¶
Dictionary of attributes of the worker.
- property mapper_attributes¶
Dictionary of attributes of the mapper, for reference in the worker.
- static _Popen(process_obj)¶
- static _after_fork()¶
- _bootstrap(parent_sentinel=None)¶
- _check_closed()¶
- _start_method = None¶
- property authkey¶
- close()¶
Close the Process object.
This method releases resources held by the Process object. It is an error to call this method if the child process is still running.
- property daemon¶
Return whether process is a daemon
- property exitcode¶
Return exit code of process or None if it has yet to stop
- property ident¶
Return identifier (PID) of process or None if it has yet to start
- is_alive()¶
Return whether process is alive
- join(timeout=None)¶
Wait until child process terminates
- kill()¶
Terminate process; sends SIGKILL signal or uses TerminateProcess()
- property name¶
- property pid¶
Return identifier (PID) of process or None if it has yet to start
- property sentinel¶
Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.
- start()¶
Start child process
- terminate()¶
Terminate process; sends SIGTERM signal or uses TerminateProcess()
- run_task(task)[source]¶
Actually executes the task.
This default runner simply executes the task thunk.
This can be customized by subclasses in order to allow for injection of worker specific data.
- Parameters:
task (Task object) – The partially evaluated task; function plus arguments
- Returns:
Results of running the task.
- Return type:
task_result
- _run_task(task)[source]¶
Runs the given task and returns the results.
This handles exceptions and tracebacks from the actual run_task function, which is intended to be specialized by different workers to inject worker-specific arguments, such as node and device identification, into tasks.
- Parameters:
task (Task object) – The partially evaluated task; function plus arguments
- Returns:
Results of running the task.
- Return type:
task_result
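The split between run_task (overridable) and _run_task (exception handling) can be sketched without any processes. Everything here is hypothetical: the class names, the device_id attribute, the segment function, and the (ok, payload) return convention are assumptions for illustration, not the behaviour of the real Worker.

```python
import functools
import traceback


class WorkerSketch:
    """Hypothetical worker showing the run_task / _run_task division."""

    def __init__(self, worker_idx, **attributes):
        self.worker_idx = worker_idx
        self.attributes = attributes

    def run_task(self, task):
        # Default behaviour: simply evaluate the task thunk.
        return task()

    def _run_task(self, task):
        # Wrap the overridable hook so failures come back as formatted text.
        try:
            return True, self.run_task(task)
        except Exception:
            return False, traceback.format_exc()


class DeviceWorkerSketch(WorkerSketch):
    def run_task(self, task):
        # Inject worker-specific data when the task is called.
        return task(device_id=self.attributes["device_id"])


def segment(walker, device_id=None):
    return "{} on device {}".format(walker, device_id)


worker = DeviceWorkerSketch(0, device_id=3)
ok, result = worker._run_task(functools.partial(segment, "walker-0"))
print(ok, result)  # True walker-0 on device 3

failed, tb_text = WorkerSketch(1)._run_task(lambda: 1 / 0)
print(failed)  # False
```

Keeping the try/except in _run_task means a subclass only needs to override run_task and still gets consistent error reporting.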