TaskSystem

This module collects utilities related to subprocesses, threading, and similar concerns. Most of them are fairly low-level.

TaskSystem.execInMainProc(func)[source]
TaskSystem.ExecInMainProcDecorator(func)[source]
exception TaskSystem.AsyncInterrupt[source]
exception TaskSystem.ForwardedKeyboardInterrupt[source]
TaskSystem.asyncCall(func, name=None, mustExec=False)[source]

This executes func() in another process and waits/blocks until it is finished. The returned value is passed back to this process and returned. Exceptions are passed back as well and will be reraised here.

If mustExec is set, the other process must exec() after the fork(). If it is not set, it might omit the exec(), depending on the platform.
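The blocking behaviour described above can be sketched with a plain os.fork() and a pipe. This is only an illustration for POSIX systems, not the implementation of asyncCall. The helper name call_in_child is hypothetical, and the sketch never calls exec(), so it corresponds to the case where mustExec is not set.

```python
import os
import pickle


def call_in_child(func):
    """Run func() in a forked child, block until it finishes,
    then return its result or re-raise its exception in the parent.
    Hypothetical helper, POSIX only."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: compute, send (ok, value) through the pipe, exit hard.
        status = 1
        try:
            os.close(read_fd)
            try:
                payload = (True, func())
            except Exception as exc:
                payload = (False, exc)
            with os.fdopen(write_fd, "wb") as writer:
                pickle.dump(payload, writer)
            status = 0
        finally:
            # os._exit skips cleanup handlers inherited from the parent
            # and guarantees the child never runs the parent's code path.
            os._exit(status)
    # Parent: block until the child has written its answer.
    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as reader:
        ok, value = pickle.load(reader)
    os.waitpid(pid, 0)
    if not ok:
        raise value
    return value
```

Both the return value and the exception must be picklable in this sketch, since they cross the process boundary as bytes.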

class TaskSystem.SharedMem(size, shmid=None)[source]
exception ShmException[source]
exception SharedMem.CCallException[source]
SharedMem.libc_so = 'libc.so.6'[source]
SharedMem.libc = <CDLL 'libc.so.6', handle 7ffbb93de9b0 at 7ffb995c6710>[source]
SharedMem.shm_key_t[source]

alias of c_int

SharedMem.IPC_PRIVATE = 0[source]
SharedMem.IPC_RMID = 0[source]
SharedMem.shmget = <_FuncPtr object>[source]
SharedMem.shmat = <_FuncPtr object>[source]
SharedMem.shmdt = <_FuncPtr object>[source]
SharedMem.shmctl = <_FuncPtr object>[source]
SharedMem.memcpy = <_FuncPtr object>[source]
classmethod SharedMem.check_ccall_error(check, f)[source]
classmethod SharedMem.is_shmget_functioning()[source]
SharedMem.remove()[source]
SharedMem.ctypes = <module 'ctypes' from '/usr/lib/python2.7/ctypes/__init__.pyc'>[source]
TaskSystem.next_power_of_two(n)[source]
class TaskSystem.SharedNumpyArray(shape, strides, typestr, mem=None, array_id=None)[source]

This class provides a way to create Numpy arrays in shared memory. It also tracks whether a shared memory segment can be reused, which is the case once the client has marked it as unused.

Note that there are a few similar Python modules:
  • https://pypi.python.org/pypi/SharedArray
  • http://parad0x.org/git/python/shared-array/about
  • https://bitbucket.org/cleemesser/numpy-sharedmem/src
  • http://stackoverflow.com/questions/5033799/how-do-i-pass-large-numpy-arrays
  • http://stackoverflow.com/questions/7894791/use-numpy-array-in-shared-memory
ServerLock = <thread.lock object>[source]
ServerInstances = set([])[source]
ServerArrayId = 0[source]
exception TooMuchInstances[source]
SharedNumpyArray.ExtraSpaceBytes = 4096[source]
static SharedNumpyArray.numpy_strides_for_fortran(shape, typestr)[source]
static SharedNumpyArray.numpy_strides_for_c_contiguous(shape, typestr)[source]
classmethod SharedNumpyArray.needed_mem_size(shape, typestr)[source]
classmethod SharedNumpyArray.as_shared(array)[source]
classmethod SharedNumpyArray.create_copy(array)[source]
classmethod SharedNumpyArray.create_new(shape, strides, typestr)[source]
SharedNumpyArray.is_server = False[source]
SharedNumpyArray.mem = None[source]
SharedNumpyArray.get_numpy_array_data_ptr()[source]
SharedNumpyArray.create_numpy_array()[source]
SharedNumpyArray.is_in_use()[source]
SharedNumpyArray.set_unused()[source]
SharedNumpyArray.shape = None[source]
SharedNumpyArray.strides = None[source]
SharedNumpyArray.typestr = None[source]
TaskSystem.attrChain(base, *attribs, **kwargs)[source]
TaskSystem.funcCall(attrChainArgs, args=())[source]
TaskSystem.get_func_closure(f)[source]
TaskSystem.get_func_tuple(f)[source]
TaskSystem.bytes(x, *args)[source]
TaskSystem.makeFuncCell(value)[source]
TaskSystem.getModuleDict(modname, path=None)[source]
Parameters:
  • modname (str) – such that “import <modname>” would work
  • path (list[str]) – sys.path
Returns: the dict of the module
Return type: dict[str]

TaskSystem.getModNameForModDict(obj)[source]
Returns: the module name, or None. It never returns ‘__main__’, because that module will likely not be the same in the unpickling environment.
Return type: str | None

TaskSystem.getNormalDict(d)[source]
Return type: dict[str]

It also removes getset_descriptor entries, which new-style classes have.
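As a rough illustration of the filtering step, the sketch below copies a class dict into a plain dict and drops every getset_descriptor value. The name to_normal_dict is hypothetical, and the sketch is not the code of getNormalDict.

```python
import types


def to_normal_dict(d):
    """Copy a mapping into a plain dict, skipping getset_descriptor values.
    Hypothetical helper for illustration."""
    return {
        key: value
        for key, value in dict(d).items()
        if not isinstance(value, types.GetSetDescriptorType)
    }


class Example(object):
    x = 1


plain = to_normal_dict(Example.__dict__)
```

For a class like Example, the entries __dict__ and __weakref__ are getset_descriptor objects, so they do not appear in plain, while x does.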

TaskSystem.make_numpy_ndarray_fromstring(s, dtype, shape)[source]
TaskSystem.use_shared_mem_for_numpy_array(obj)[source]
TaskSystem.numpy_set_unused(v)[source]
Parameters: v (numpy.ndarray) – array which will be marked as no longer in use

This will tell mechanisms like SharedNumpyArray that it can reuse the memory. On the client side, this will even unmap the memory, so any further access to it will cause a SEGFAULT.

TaskSystem.numpy_copy_and_set_unused(v)[source]
Parameters: v (dict[str,numpy.ndarray|object] | numpy.ndarray | object) – object to be handled

If v is a dict, we will return a new copied dict where every value is mapped through numpy_copy_and_set_unused. If v is a numpy.ndarray and its base is a SharedNumpyArray, we will copy it and call numpy_set_unused on the old value.

If v is a numpy.ndarray and its base is not a SharedNumpyArray, we will just return it as it is and do nothing. In all other cases, we will also just return the object as it is and do nothing.

TaskSystem.numpy_alloc(shape, dtype, fortran_for_shared=False)[source]

If EnableAutoNumpySharedMemPickling is True, this will allocate a Numpy array in shared memory so we avoid a copy later on when this Numpy array would be transferred to another process via pickling.

class TaskSystem.Pickler(*args, **kwargs)[source]

We extend the standard Pickler to be able to pickle some more types, such as lambdas and functions, code, func cells, buffer and more.
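The standard pickle module refuses lambdas because it stores functions by name only. One way to get around that, sketched below, is to serialize the code object with marshal and rebuild the function with types.FunctionType. This is an illustration of the general idea under simplifying assumptions (no closure cells, only builtins as globals), not the code of this Pickler. The names dump_func and load_func are hypothetical.

```python
import builtins
import marshal
import pickle
import types


def dump_func(func):
    """Serialize a closure-free function to bytes (hypothetical helper)."""
    state = (marshal.dumps(func.__code__), func.__name__, func.__defaults__)
    return pickle.dumps(state)


def load_func(data):
    """Rebuild a function from the bytes produced by dump_func."""
    code_bytes, name, defaults = pickle.loads(data)
    code = marshal.loads(code_bytes)
    # Assumption: the function needs no globals other than the builtins.
    func_globals = {"__builtins__": builtins}
    return types.FunctionType(code, func_globals, name, defaults)


restored = load_func(dump_func(lambda x, y=1: x * 2 + y))
```

Note that marshal data is specific to the Python version, so both sides must run the same interpreter version.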

dispatch = {<type '_io.BufferedWriter'>: <function save_iobuffer_dummy>, <type 'float'>: <function save_float>, <type 'str'>: <function save_string>, <type 'module'>: <function save_module>, <type 'instance'>: <function save_inst>, <type 'classobj'>: <function save_class>, <type 'code'>: <function save_code>, <type '_io.BufferedReader'>: <function save_iobuffer_dummy>, <type 'NoneType'>: <function save_none>, <type 'numpy.ndarray'>: <function save_ndarray>, <type 'cell'>: <function save_cell>, <type 'function'>: <function save_func>, <type 'int'>: <function save_int>, <type 'type'>: <function save_type>, <type 'list'>: <function save_list>, <type 'instancemethod'>: <function save_method>, <type 'long'>: <function save_long>, <type 'bool'>: <function save_bool>, <type 'tuple'>: <function save_tuple>, <type 'builtin_function_or_method'>: <function save_global>, <type 'buffer'>: <function save_buffer>, <type 'unicode'>: <function save_unicode>, <type 'dict'>: <function intellisave_dict>}[source]
save_func(obj)[source]
save_method(obj)[source]
save_code(obj)[source]
save_cell(obj)[source]
intellisave_dict(obj)[source]
save_module(obj)[source]
save_buffer(obj)[source]
save_string(obj, pack=<built-in function pack>)[source]
save_ndarray(obj)[source]
save_iobuffer_dummy(obj)[source]
save_global(obj, name=None)[source]
save_type(obj)[source]
save_class(cls)[source]
class TaskSystem.ExecingProcess(target, args, name, env_update)[source]

This is a replacement for multiprocessing.Process which always uses fork+exec, not just fork. This ensures that you have a separate, independent process, which can avoid many types of bugs.

start()[source]
is_alive()[source]
join(timeout=None)[source]
Verbose = False[source]
static checkExec()[source]
exception TaskSystem.ProcConnectionDied[source]
class TaskSystem.ExecingProcess_ConnectionWrapper(fd=None, conn=None)[source]

Wrapper around multiprocessing.connection.Connection. This is needed to use our own Pickler.

poll(*args, **kwargs)[source]
send_bytes(value)[source]
send(value)[source]
recv_bytes()[source]
recv()[source]
TaskSystem.ExecingProcess_Pipe()[source]

This is like multiprocessing.Pipe(duplex=True). It uses our own ExecingProcess_ConnectionWrapper.
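For reference, the duplex semantics of the standard multiprocessing.Pipe can be shown within a single process. The sketch below uses only the standard library and does not touch ExecingProcess_ConnectionWrapper. The variable names are illustrative.

```python
import multiprocessing

# Both ends of a duplex pipe can send and receive.
left, right = multiprocessing.Pipe(duplex=True)

left.send({"cmd": "ping"})            # one direction
request = right.recv()

right.send(("pong", request["cmd"]))  # and back again
reply = left.recv()

left.close()
right.close()
```

In real use, one end would be handed to the child process. Values passed to send() are pickled, which is why a wrapper is needed to substitute a custom Pickler.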

TaskSystem.Pipe_ConnectionWrapper(*args, **kwargs)[source]
class TaskSystem.AsyncTask(func, name=None, mustExec=False, env_update=None)[source]

This uses multiprocessing.Process or ExecingProcess to execute some function. In addition, it provides a duplex pipe for communication. This is either multiprocessing.Pipe or ExecingProcess_Pipe.

Parameters:
  • func – a function which gets a single parameter, which will be a reference to our instance in the fork, so that it can use our communication methods put/get.
  • mustExec (bool) – if True, we do fork+exec, not just fork
  • env_update (dict[str,str]) – for mustExec, also update these env vars
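To illustrate what mustExec and env_update imply, the sketch below starts a fresh Python interpreter with an updated environment through the standard subprocess module. It is not how AsyncTask is implemented. The helper name run_in_fresh_interpreter and the variable DEMO_FLAG are made up for this example.

```python
import os
import subprocess
import sys


def run_in_fresh_interpreter(source, env_update=None):
    """Execute Python source in a newly started interpreter and return
    its standard output. Hypothetical helper for illustration."""
    env = dict(os.environ)
    env.update(env_update or {})  # extra variables visible to the child only
    result = subprocess.run(
        [sys.executable, "-c", source],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout


output = run_in_fresh_interpreter(
    "import os; print(os.environ['DEMO_FLAG'])",
    env_update={"DEMO_FLAG": "on"},
)
```

Because the child is a new program image, it inherits none of the parent's in-memory state such as loaded modules, threads, or locks. Only the environment and the explicitly passed data reach it.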
put(value)[source]
get()[source]
isParent[source]
isChild[source]
setCancel()[source]
terminate()[source]
join(timeout=None)[source]
is_alive()[source]
TaskSystem.WarnMustNotBeInForkDecorator(func)[source]
class TaskSystem.ReadWriteLock[source]

Classic implementation of ReadWriteLock. Note that this partly supports recursive lock usage:
  • Inside a readlock, a writelock will always block!
  • Inside a readlock, another readlock is fine.
  • Inside a writelock, any other writelock or readlock is fine.

readlock[source]
writelock[source]
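The recursion rules of ReadWriteLock can be reproduced with a small sketch built on threading.Condition. This is one possible design, not the code of ReadWriteLock. The class name SimpleReadWriteLock and its private attributes are hypothetical, and exposing readlock and writelock as context-manager properties is an assumption based on the attribute names listed above.

```python
import threading
from contextlib import contextmanager


class SimpleReadWriteLock:
    """Many readers or one writer; the writer may re-enter (sketch)."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0
        self._writer = None       # ident of the thread holding the write lock
        self._writer_depth = 0    # nesting depth of that thread's write locks

    @property
    @contextmanager
    def readlock(self):
        me = threading.get_ident()
        with self._cond:
            # A read inside our own write lock needs no extra bookkeeping.
            nested_in_write = self._writer == me
            if not nested_in_write:
                while self._writer is not None:
                    self._cond.wait()
                self._readers += 1
        try:
            yield
        finally:
            if not nested_in_write:
                with self._cond:
                    self._readers -= 1
                    self._cond.notify_all()

    @property
    @contextmanager
    def writelock(self):
        me = threading.get_ident()
        with self._cond:
            if self._writer != me:
                # Waits for all readers, including a read lock held by this
                # same thread, so write-inside-read never proceeds.
                while self._writer is not None or self._readers > 0:
                    self._cond.wait()
                self._writer = me
            self._writer_depth += 1
        try:
            yield
        finally:
            with self._cond:
                self._writer_depth -= 1
                if self._writer_depth == 0:
                    self._writer = None
                    self._cond.notify_all()
```

Usage follows the usual pattern, for example `with lock.readlock: ...` around read-only access and `with lock.writelock: ...` around modifications. The sketch does not give waiting writers priority, so a steady stream of readers can starve a writer.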