TaskSystem

This module collects the subprocess-, threading- and inter-process-communication-related utilities, most of them quite low level.

TaskSystem.execInMainProc(func)[source]
TaskSystem.ExecInMainProcDecorator(func)[source]
exception TaskSystem.AsyncInterrupt[source]
exception TaskSystem.ForwardedKeyboardInterrupt[source]
TaskSystem.asyncCall(func, name=None, mustExec=False)[source]

This executes func() in another process and blocks until it has finished. The return value is passed back to this process and returned. Exceptions are passed back as well and are reraised here.

If mustExec is set, the other process must exec() after the fork(). If it is not set, it might omit the exec(), depending on the platform.
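
For illustration, a minimal usage sketch (the callable and its return value must be picklable):

    import TaskSystem

    def compute():
        # Runs in the child process; the return value is pickled back to the parent.
        return sum(i * i for i in range(1000))

    result = TaskSystem.asyncCall(compute, name="square-sum", mustExec=True)
    assert result == sum(i * i for i in range(1000))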

class TaskSystem.SharedMem(size, shmid=None)[source]
exception ShmException[source]
exception CCallException[source]
libc_so = 'libc.so.6'[source]
libc = <CDLL 'libc.so.6'>[source]
shm_key_t[source]

alias of c_int

IPC_PRIVATE = 0[source]
IPC_RMID = 0[source]
shmget = <_FuncPtr object>[source]
shmat = <_FuncPtr object>[source]
shmdt = <_FuncPtr object>[source]
shmctl = <_FuncPtr object>[source]
memcpy = <_FuncPtr object>[source]
classmethod check_ccall_error(check, f)[source]
classmethod is_shmget_functioning()[source]
remove()[source]
ctypes = <module 'ctypes' from '/usr/lib/python3.5/ctypes/__init__.py'>[source]
TaskSystem.next_power_of_two(n)[source]
class TaskSystem.SharedNumpyArray(shape, strides, typestr, mem=None, array_id=None)[source]

This class provides a way to create Numpy arrays in shared memory. It adds some logic to mark whether a shared memory segment can be reused, i.e. when the client marks it as unused.

Note that there are a few similar Python modules:
  • https://pypi.python.org/pypi/SharedArray
  • http://parad0x.org/git/python/shared-array/about
  • https://bitbucket.org/cleemesser/numpy-sharedmem/src
  • http://stackoverflow.com/questions/5033799/how-do-i-pass-large-numpy-arrays
  • http://stackoverflow.com/questions/7894791/use-numpy-array-in-shared-memory
ServerLock = <unlocked _thread.lock object>[source]
ServerInstances = set()[source]
ServerArrayId = 0[source]
exception TooMuchInstances[source]
ExtraSpaceBytes = 4096[source]
static numpy_strides_for_fortran(shape, typestr)[source]
static numpy_strides_for_c_contiguous(shape, typestr)[source]
classmethod needed_mem_size(shape, typestr)[source]
classmethod as_shared(array)[source]
classmethod create_copy(array)[source]
classmethod create_new(shape, strides, typestr)[source]
is_server = False[source]
mem = None[source]
get_numpy_array_data_ptr()[source]
create_numpy_array()[source]
is_in_use()[source]
set_unused()[source]
shape = None[source]
strides = None[source]
typestr = None[source]
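
For illustration, a sketch of how the members above might be combined, assuming a platform where shmget is functioning:

    import numpy
    import TaskSystem

    data = numpy.arange(12, dtype="float32").reshape(3, 4)

    shared = TaskSystem.SharedNumpyArray.create_copy(data)  # copy into shared memory
    view = shared.create_numpy_array()                      # numpy view onto the segment
    assert view.shape == (3, 4)

    # ... pass the array to another process via the extended Pickler ...

    shared.set_unused()  # allow the segment to be reused
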
TaskSystem.attrChain(base, *attribs, **kwargs)[source]
TaskSystem.funcCall(attrChainArgs, args=())[source]
TaskSystem.get_func_closure(f)[source]
TaskSystem.get_func_tuple(f)[source]
TaskSystem.make_buffer(*args)[source]
class TaskSystem.BufferType[source]

Dummy

class TaskSystem.OldStyleClass[source]

Dummy

TaskSystem.makeFuncCell(value)[source]
TaskSystem.getModuleDict(modname, path=None)[source]
Parameters:
  • modname (str) – such that “import <modname>” would work
  • path (list[str]) – sys.path
Returns: the dict of the mod
Return type: dict[str]
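
A small illustrative sketch (the returned dict is assumed to be the module’s attribute dict):

    import TaskSystem

    mod_dict = TaskSystem.getModuleDict("os.path")
    join = mod_dict["join"]  # e.g. the same function as os.path.join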

TaskSystem.getModNameForModDict(obj)[source]
Return type: str | None

Returns: The module name or None. It will not return ‘__main__’ in any case, because that will likely not be the same in the unpickling environment.

TaskSystem.getNormalDict(d)[source]
Return type: dict[str]

It also removes getset_descriptor. New-style classes have those.

TaskSystem.make_numpy_ndarray_fromstring(s, dtype, shape)[source]
TaskSystem.use_shared_mem_for_numpy_array(obj)[source]
TaskSystem.numpy_set_unused(v)[source]
Parameters: v (numpy.ndarray) – array which will be marked as not used anymore

This will tell mechanisms like SharedNumpyArray that it can reuse the memory. On the client side, this will even unmap the memory, so any further access to it will cause a SEGFAULT.
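
A short sketch of the intended call pattern; copy out anything you still need before marking the array unused:

    import numpy
    import TaskSystem

    shared = TaskSystem.SharedNumpyArray.create_copy(numpy.zeros(8, dtype="float32"))
    arr = shared.create_numpy_array()

    keep = arr.copy()                 # keep a private copy of the data
    TaskSystem.numpy_set_unused(arr)  # mark the shared segment reusable
    del arr                           # further access could SEGFAULT on the client side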

TaskSystem.numpy_copy_and_set_unused(v)[source]
Parameters: v (dict[str,numpy.ndarray|object] | numpy.ndarray | object) – object to be handled

If v is a dict, we will return a new copied dict where every value is mapped through numpy_copy_and_set_unused. If v is a numpy.ndarray and its base is a SharedNumpyArray, we will copy it and call numpy_set_unused on the old value.

If v is a numpy.ndarray and its base is not a SharedNumpyArray, we will just return it as it is and do nothing. In all other cases, we will also just return the object as it is and do nothing.
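
A self-contained sketch of the dict case:

    import numpy
    import TaskSystem

    # Build a dict whose values may or may not be backed by shared memory.
    batch = {
        "features": TaskSystem.SharedNumpyArray.create_copy(
            numpy.zeros((2, 3), dtype="float32")).create_numpy_array(),
        "label": 7,
    }
    safe_batch = TaskSystem.numpy_copy_and_set_unused(batch)
    # safe_batch["features"] is a private copy; the shared segment behind the
    # original array is now marked unused. safe_batch["label"] is returned as is.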

TaskSystem.numpy_alloc(shape, dtype, fortran_for_shared=False)[source]

If EnableAutoNumpySharedMemPickling is True, this will allocate a Numpy array in shared memory so we avoid a copy later on when this Numpy array would be transferred to another process via pickling.
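
A minimal sketch, assuming EnableAutoNumpySharedMemPickling is the module-level switch referred to above:

    import TaskSystem

    # With the shared-memory pickling switch enabled, the buffer can live in
    # shared memory, so pickling the array to another process avoids a copy.
    arr = TaskSystem.numpy_alloc((4, 256), dtype="float32", fortran_for_shared=True)
    arr[:] = 0.0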

class TaskSystem.Pickler(*args, **kwargs)[source]

We extend the standard Pickler to be able to pickle some more types, such as lambdas, functions, code objects, function cells, buffers and more.

dispatch = {
  <class 'TaskSystem.BufferType'>: <function Pickler.save_buffer>,
  <class 'str'>: <function Pickler.save_string>,
  <class 'TaskSystem.OldStyleClass'>: <function Pickler.save_class>,
  <class 'int'>: <function _Pickler.save_long>,
  <class 'frozenset'>: <function _Pickler.save_frozenset>,
  <class 'set'>: <function _Pickler.save_set>,
  <class 'bytes'>: <function _Pickler.save_bytes>,
  <class 'bool'>: <function _Pickler.save_bool>,
  <class '_io.BufferedWriter'>: <function Pickler.save_iobuffer_dummy>,
  <class 'tuple'>: <function _Pickler.save_tuple>,
  <class 'method'>: <function Pickler.save_method>,
  <class 'dict'>: <function Pickler.intellisave_dict>,
  <class 'type'>: <function Pickler.save_type>,
  <class 'code'>: <function Pickler.save_code>,
  <class 'float'>: <function _Pickler.save_float>,
  <class '_io.BufferedReader'>: <function Pickler.save_iobuffer_dummy>,
  <class 'function'>: <function Pickler.save_func>,
  <class 'NoneType'>: <function _Pickler.save_none>,
  <class 'cell'>: <function Pickler.save_cell>,
  <class 'module'>: <function Pickler.save_module>,
  <class 'numpy.ndarray'>: <function Pickler.save_ndarray>,
  <class 'list'>: <function _Pickler.save_list>}[source]
save_func(obj)[source]
save_method(obj)[source]
save_code(obj)[source]
save_cell(obj)[source]
intellisave_dict(obj)[source]
save_module(obj)[source]
save_buffer(obj)[source]
save_string(obj, pack=<built-in function pack>)[source]
save_ndarray(obj)[source]
save_iobuffer_dummy(obj)[source]
save_global(obj, name=None)[source]
save_type(obj)[source]
save_class(cls)[source]
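
A hedged usage sketch, assuming the constructor follows the standard pickle.Pickler convention of taking a writable file-like object, and that the standard unpickler can reconstruct the result as long as TaskSystem is importable:

    import io
    import pickle
    import TaskSystem

    buf = io.BytesIO()
    TaskSystem.Pickler(buf).dump(lambda x: x + 1)  # lambdas are normally not picklable
    func = pickle.loads(buf.getvalue())
    assert func(41) == 42
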
class TaskSystem.ExecingProcess(target, args, name, env_update)[source]

This is a replacement for multiprocessing.Process which always uses fork+exec, not just fork. This ensures that you have a separate independent process, which can avoid many types of bugs.

start()[source]
is_alive()[source]
join(timeout=None)[source]
Verbose = False[source]
static checkExec()[source]
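
A sketch of the intended drop-in usage, mirroring multiprocessing.Process; target and args must be picklable because the child is a freshly exec'ed interpreter (env_update={} is assumed here to mean no extra environment changes):

    import TaskSystem

    def worker(msg):
        print("child got:", msg)

    proc = TaskSystem.ExecingProcess(target=worker, args=("hello",),
                                     name="demo-worker", env_update={})
    proc.start()
    proc.join()
    assert not proc.is_alive()
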
exception TaskSystem.ProcConnectionDied[source]
class TaskSystem.ExecingProcess_ConnectionWrapper(fd=None, conn=None)[source]

Wrapper around multiprocessing.connection.Connection. This is needed to use our own Pickler.

poll(*args, **kwargs)[source]
send_bytes(value)[source]
send(value)[source]
recv_bytes()[source]
recv()[source]
TaskSystem.ExecingProcess_Pipe()[source]

This is like multiprocessing.Pipe(duplex=True). It uses our own ExecingProcess_ConnectionWrapper.
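
A short sketch, assuming the same two-connection return value as multiprocessing.Pipe:

    import TaskSystem

    conn1, conn2 = TaskSystem.ExecingProcess_Pipe()
    conn1.send({"msg": "hello"})
    print(conn2.recv())  # expected to print {'msg': 'hello'}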

TaskSystem.Pipe_ConnectionWrapper(*args, **kwargs)[source]
class TaskSystem.AsyncTask(func, name=None, mustExec=False, env_update=None)[source]

This uses multiprocessing.Process or ExecingProcess to execute some function. In addition, it provides a duplex pipe for communication. This is either multiprocessing.Pipe or ExecingProcess_Pipe.

Parameters:
  • func – a function which gets a single parameter: a reference to this AsyncTask instance in the forked process, so that it can use the communication methods put/get.
  • mustExec (bool) – if True, we do fork+exec, not just fork
  • env_update (dict[str,str]) – for mustExec, also update these env vars
put(value)[source]
get()[source]
isParent[source]
isChild[source]
setCancel()[source]
terminate()[source]
join(timeout=None)[source]
is_alive()[source]
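
A hedged sketch of the put/get round trip, assuming the child process is started on construction; the child function receives the AsyncTask instance itself:

    import TaskSystem

    def child(task):
        # Runs in the child process.
        value = task.get()   # receive from the parent
        task.put(value * 2)  # send a result back

    task = TaskSystem.AsyncTask(func=child, name="doubler", mustExec=False)
    task.put(21)
    print(task.get())  # expected to print 42 in the parent
    task.join()
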
TaskSystem.WarnMustNotBeInForkDecorator(func)[source]
class TaskSystem.ReadWriteLock[source]

Classic implementation of ReadWriteLock. Note that this partly supports recursive lock usage:
  • Inside a readlock, a writelock will always block!
  • Inside a readlock, another readlock is fine.
  • Inside a writelock, any other writelock or readlock is fine.

readlock[source]
writelock[source]
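
A short sketch, assuming readlock and writelock are used as context managers:

    import TaskSystem

    lock = TaskSystem.ReadWriteLock()
    state = {"value": 0}

    def read_value():
        with lock.readlock:
            return state["value"]

    def write_value(x):
        with lock.writelock:
            state["value"] = x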