wepy.work_mapper.task_mapper module
-
exception wepy.work_mapper.task_mapper.TaskProcessException(message, wrapped_exception=None, tb=None)
Bases: wepy.work_mapper.mapper.WrapperException
-
args
-
with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
exception wepy.work_mapper.task_mapper.TaskProcessKilledError
Bases: ChildProcessError
-
args
-
characters_written
-
errno
POSIX exception code.
-
filename
Exception filename.
-
filename2
Second exception filename.
-
strerror
Exception strerror.
-
with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
-
class wepy.work_mapper.task_mapper.TaskMapper(walker_task_type=None, num_workers=None, segment_func=None, **kwargs)
Bases: wepy.work_mapper.mapper.ABCWorkerMapper
Process-per-task mapper.
This work mapper starts a new process for each runner segment task that needs to be run. This allows cheap copying of shared state using operating system primitives. On Linux the start method is typically either ‘fork’ (the default before Python 3.14) or ‘spawn’. Fork is cheap because the child receives a copy of the parent’s address space, but the child inherits the parent’s state as-is without reinitializing it; spawn starts a fresh interpreter, which is much more expensive but gives each child a clean state. Fork should be sufficient in most cases; however, spawn may be needed when the parent process holds special contexts. One such case is creating a CUDA context in the main parent process and then forking new processes from it. We suggest using fork and avoiding the creation of such contexts in the main process.
This approach avoids shared memory and avoids sending objects through interprocess communication (which carries a serialization and deserialization cost) by relying on the operating system’s process-copying mechanism. However, a new process is created for every walker in every cycle, so simulations with a large number of walkers may incur a large process-creation overhead. If your walker states are very small, or a very fast serializer is available, full process address-space copies may offer little benefit, and the WorkerMapper may be better suited.
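The process-per-task idea can be illustrated with the standard library alone. This is a minimal sketch, not wepy’s implementation: `run_segment`, `WALKER_STATES`, and `map_walkers` are hypothetical names, and the sketch assumes a POSIX platform where the ‘fork’ start method is available. Each child reads its walker state from the address space it inherited at fork time, so the inputs are never pickled; only the small result travels back through a queue.

```python
import multiprocessing as mp

# Hypothetical walker states held by the parent. Under 'fork', each child
# inherits a copy of this data without it being serialized.
WALKER_STATES = [list(range(i, i + 5)) for i in range(4)]


def run_segment(walker_idx, result_queue):
    # Hypothetical stand-in for a runner segment: read the inherited state
    # directly and send only a small result back to the parent.
    state = WALKER_STATES[walker_idx]
    result_queue.put((walker_idx, sum(state)))


def map_walkers():
    ctx = mp.get_context("fork")  # POSIX-only assumption for this sketch
    result_queue = ctx.Queue()

    # One new process per walker, as a process-per-task mapper would do.
    procs = [
        ctx.Process(target=run_segment, args=(idx, result_queue))
        for idx in range(len(WALKER_STATES))
    ]
    for proc in procs:
        proc.start()

    # Drain the queue before joining so no child blocks on a full pipe.
    results = dict(result_queue.get() for _ in procs)
    for proc in procs:
        proc.join()

    # Return results ordered by walker index.
    return [results[idx] for idx in range(len(WALKER_STATES))]


if __name__ == "__main__":
    print(map_walkers())  # [10, 15, 20, 25]
```

The per-walker process creation in `map_walkers` is exactly the overhead the paragraph above warns about when the number of walkers grows.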
Constructor for TaskMapper.
- Parameters
num_workers (int) – The number of worker processes to spawn.
segment_func (callable, optional) – Set a default segment_func. Typically set at runtime.
proc_start_method (str or None) – The Python multiprocessing start method to use, typically ‘fork’, ‘spawn’, or ‘forkserver’; None selects the platform default. See the Python multiprocessing documentation. On init, a context is created with multiprocessing.get_context(proc_start_method).
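The context creation described for proc_start_method can be sketched as follows. `make_context` is a hypothetical helper, not part of wepy; it only shows how multiprocessing.get_context behaves for None versus an explicit method name.

```python
import multiprocessing


def make_context(proc_start_method=None):
    # Hypothetical helper mirroring the documented behaviour: None selects
    # the platform default, a string requests that start method explicitly.
    # An unknown name raises ValueError.
    return multiprocessing.get_context(proc_start_method)


if __name__ == "__main__":
    # Start methods available on this platform.
    print(multiprocessing.get_all_start_methods())
    print(make_context().get_start_method())         # platform default
    print(make_context("spawn").get_start_method())  # spawn
```

The returned context exposes Process, Queue, and the other multiprocessing primitives bound to that start method, which is why a mapper can create it once on init and reuse it.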
-
init(**kwargs)
Runtime initialization and setting of the function to map over walkers.
- Parameters
num_workers (int) – The number of worker processes to spawn.
segment_func (callable implementing the Runner.run_segment interface) – The function to map over the walkers.
-
property walker_task_type
The callable that generates a worker object.
Typically this is simply the worker class itself, so that calling it invokes the class constructor.
-
_make_task(*args, **kwargs)
Generate a task from the ‘segment_func’ attribute.
Similar to partial evaluation (or currying).
The args will eventually be used as the arguments to the call of ‘segment_func’ by the worker processes when they receive the task from the queue.
- Returns
task
- Return type
Task object
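Since the docstring likens _make_task to partial evaluation, the idea can be illustrated with functools.partial. This sketches the concept only; wepy’s actual Task object may be structured differently, and `make_task` and `run_segment` are hypothetical names.

```python
import functools


def make_task(segment_func, *args, **kwargs):
    # Bind the arguments now; the worker later calls task() with no arguments.
    return functools.partial(segment_func, *args, **kwargs)


def run_segment(walker_state, segment_length, seed=0):
    # Hypothetical stand-in for a segment function.
    return walker_state + segment_length + seed


task = make_task(run_segment, 10, 5, seed=1)
print(task())  # 16
```

The bound arguments stay inspectable on the task (task.args and task.keywords), which is convenient when debugging what a worker was asked to run.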
-
property attributes
-
cleanup(**kwargs)
Runtime post-simulation tasks.
This is run either at the end of a successful simulation or when the simulation manager’s call to run_cycle raises an error in the main process.
The Mapper class performs no actions here and all arguments are ignored.
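The "success or error" contract for cleanup maps naturally onto try/finally. This is a hypothetical sketch of the calling pattern only: `LoggingMapper`, `run_simulation`, and the list of cycle callables are illustrative stand-ins, not wepy’s simulation manager.

```python
class LoggingMapper:
    """Hypothetical mapper that only records its lifecycle calls."""

    def __init__(self):
        self.calls = []

    def init(self, **kwargs):
        self.calls.append("init")

    def cleanup(self, **kwargs):
        # Like the base Mapper, do no real work and ignore all arguments.
        self.calls.append("cleanup")


def run_simulation(mapper, cycles):
    # `cycles` holds zero-argument callables standing in for the simulation
    # manager's run_cycle calls (hypothetical).
    mapper.init()
    try:
        for run_cycle in cycles:
            run_cycle()
    finally:
        # Reached after a successful run and also when a cycle raises.
        mapper.cleanup()


def failing_cycle():
    raise RuntimeError("cycle failed")


mapper = LoggingMapper()
try:
    run_simulation(mapper, [lambda: None, failing_cycle])
except RuntimeError:
    pass
print(mapper.calls)  # ['init', 'cleanup']
```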
-
property num_workers
The number of worker processes.
-
property segment_func
The function that will be called for new data in the map method.
-
property worker_segment_times
The run timings for each segment for each walker.
- Returns
worker_seg_times – Dictionary mapping worker indices to a list of times in seconds for each segment run.
- Return type
dict of int : list of float
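The dict of int : list of float shape can be produced with a simple timing loop. This is a sketch under assumptions: `timed_segments` is a hypothetical helper, it runs segments serially rather than in separate processes, and it keys the dictionary by walker index.

```python
import time
from collections import defaultdict


def timed_segments(segment_func, walker_states, n_cycles):
    # Map each index to a list of per-segment wall-clock times in seconds,
    # matching the documented `dict of int : list of float` shape.
    segment_times = defaultdict(list)
    for _ in range(n_cycles):
        for idx, state in enumerate(walker_states):
            start = time.perf_counter()
            segment_func(state)
            segment_times[idx].append(time.perf_counter() - start)
    return dict(segment_times)


times = timed_segments(lambda state: sum(range(state)), [1000, 2000], n_cycles=3)
print({idx: len(ts) for idx, ts in times.items()})  # {0: 3, 1: 3}
```

time.perf_counter is used because it is a monotonic, high-resolution clock suited to measuring durations.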
-
class wepy.work_mapper.task_mapper.WalkerTaskProcess(walker_idx, mapper_attributes, func, task_args, task_kwargs, worker_queue, results_list, worker_segment_times, interrupt_connection, **kwargs)
Bases: multiprocessing.context.Process
-
NAME_TEMPLATE = 'Walker-{}'
-
property attributes
-
static _Popen(process_obj)
-
_bootstrap()
-
_check_closed()
-
_start_method = None
-
property authkey
-
close()
Close the Process object.
This method releases resources held by the Process object. It is an error to call this method if the child process is still running.
-
property daemon
Return whether the process is a daemon.
-
property exitcode
Return the exit code of the process, or None if it has yet to stop.
-
property ident
Return the identifier (PID) of the process, or None if it has yet to start.
-
is_alive()
Return whether the process is alive.
-
join(timeout=None)
Wait until the child process terminates.
-
kill()
Terminate the process; sends the SIGKILL signal or uses TerminateProcess().
-
property name
-
property pid
Return the identifier (PID) of the process, or None if it has yet to start.
-
property sentinel
Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.
-
start()
Start the child process.
-
terminate()
Terminate the process; sends the SIGTERM signal or uses TerminateProcess().
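A per-walker Process subclass that uses NAME_TEMPLATE, together with a check that turns a signal-killed child into a ChildProcessError, might look roughly like the following. This is a simplified, hypothetical analogue: `WalkerProcess`, `TaskKilledError`, and `check_exit` are illustrative names rather than wepy’s WalkerTaskProcess or TaskProcessKilledError, and the sketch assumes a POSIX platform with the ‘fork’ start method. It relies on the standard multiprocessing convention that a negative exitcode -N means the child was terminated by signal N.

```python
import multiprocessing as mp
import signal
import time

NAME_TEMPLATE = "Walker-{}"  # same template as WalkerTaskProcess.NAME_TEMPLATE

# POSIX-only assumption: the 'fork' context lets children inherit the
# target function and shared array without pickling them.
CTX = mp.get_context("fork")


class TaskKilledError(ChildProcessError):
    """Hypothetical analogue of TaskProcessKilledError."""


class WalkerProcess(CTX.Process):
    """Hypothetical, simplified analogue of WalkerTaskProcess."""

    def __init__(self, walker_idx, func, task_args, results):
        super().__init__(name=NAME_TEMPLATE.format(walker_idx))
        self.walker_idx = walker_idx
        self.func = func
        self.task_args = task_args
        self.results = results  # shared array with one slot per walker

    def run(self):
        # Executed in the child process.
        self.results[self.walker_idx] = self.func(*self.task_args)


def check_exit(proc):
    # A negative exitcode -N means the child was terminated by signal N.
    if proc.exitcode is not None and proc.exitcode < 0:
        sig_name = signal.Signals(-proc.exitcode).name
        raise TaskKilledError(f"{proc.name} was killed by {sig_name}")


def square(x):
    return x * x


def sleepy():
    time.sleep(60)
    return 0.0


if __name__ == "__main__":
    results = CTX.Array("d", 3)
    procs = [WalkerProcess(i, square, (float(i),), results) for i in range(3)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
        check_exit(proc)
    print(list(results))  # [0.0, 1.0, 4.0]

    stuck = WalkerProcess(0, sleepy, (), results)
    stuck.start()
    stuck.kill()  # sends SIGKILL on POSIX
    stuck.join()
    try:
        check_exit(stuck)
    except TaskKilledError as err:
        print(err)  # Walker-0 was killed by SIGKILL
```

Overriding run() rather than passing target= keeps the per-walker state on the process object itself, loosely mirroring how WalkerTaskProcess receives its walker index and task arguments through its constructor.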