Asynchronous function calls

Gruvi provides functionality to execute functions asynchronously, outside the current flow of control. The API is a futures-based interface, modeled after the concurrent.futures and asyncio packages in the Python standard library.

class Future

The state of an asynchronous function call.

A future captures the state and the future result of an asynchronous function call. Futures are not instantiated directly but are created by a PoolBase implementation. The pool accepts work in the form of a Python function via its submit() method. This method returns a future representing the state and the result of the function that will be executed asynchronously in the pool.

A future progresses through different states during its lifecycle:

  • Initially the future is in the pending state.
  • Once the pool has capacity to run the function, it moves to the running state. In this state, running() will return True.
  • Once the function completes or raises an exception, the future moves to the done state. In this state, done() will return True.
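
The lifecycle above can be observed with the standard library's concurrent.futures, which this API is modeled after. The sketch below is an analogy that uses ThreadPoolExecutor as a stand-in for a Gruvi pool; the job function and the two events are illustrative names, and Gruvi's own pools may differ in details.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()   # set by the job once it is running
release = threading.Event()   # set by the caller to let the job finish

def job():
    started.set()
    release.wait(timeout=5)
    return "finished"

# A single worker means the second submission has to wait its turn.
with ThreadPoolExecutor(max_workers=1) as pool:
    first = pool.submit(job)
    second = pool.submit(lambda: "queued job")

    started.wait(timeout=5)
    states_while_busy = {
        "first_running": first.running(),    # the worker is executing it
        "second_running": second.running(),  # still pending in the queue
        "second_done": second.done(),
    }

    release.set()
    first_result = first.result(timeout=5)
    second_result = second.result(timeout=5)

# Both futures have reached the done state.
states_at_end = (first.done(), second.done())
```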

The future and the asynchronous function it represents are two distinct entities but for brevity the terms are often used interchangeably, including in this manual. For example, if the asynchronous function completes it is said that the future has completed, and cancelling the future really means cancelling the asynchronous function it is capturing the state of.

running()

Return whether this future is running.

cancelled()

Return whether this future was successfully cancelled.

done()

Return whether this future is done.

cancel()

Cancel the execution of the async function, if possible.

This method marks the future as done and sets the Cancelled exception.

A future that is not running can always be cancelled. However, when a future is running, the ability to cancel it depends on the pool implementation. For example, a fiber pool can cancel running fibers but a thread pool cannot.

Return True if the future could be cancelled, False otherwise.
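
The thread pool case can be sketched with the standard library's ThreadPoolExecutor, used here as a stand-in because its futures follow the same rule: a pending future can be cancelled, a running one cannot. The function and variable names are illustrative, and cancelling a running fiber is not covered by this sketch.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()

def hold_worker():
    started.set()
    release.wait(timeout=5)
    return "ran to completion"

with ThreadPoolExecutor(max_workers=1) as pool:
    running_job = pool.submit(hold_worker)
    pending_job = pool.submit(lambda: "never runs")
    started.wait(timeout=5)

    cancelled_pending = pending_job.cancel()   # not started yet, so this succeeds
    cancelled_running = running_job.cancel()   # a running thread cannot be interrupted

    release.set()
    running_result = running_job.result(timeout=5)

# A cancelled future also counts as done.
pending_state = (pending_job.cancelled(), pending_job.done())
```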

switchpoint result(timeout=None)

Wait for the future to complete and return its result.

If the function returned normally, its return value is returned here. If the function raised an exception, the exception is re-raised here.

switchpoint exception(timeout=None)

Wait for the async function to complete and return its exception.

If the function did not raise an exception this returns None.
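
The difference between result() and exception() is shown below with standard library futures, which behave the same way in this respect. This is an analogy and not Gruvi code; divide is a hypothetical example function.

```python
from concurrent.futures import ThreadPoolExecutor

def divide(a, b):
    return a / b

with ThreadPoolExecutor(max_workers=2) as pool:
    ok = pool.submit(divide, 6, 3)
    failed = pool.submit(divide, 1, 0)

    ok_result = ok.result(timeout=5)         # the function's return value
    ok_exception = ok.exception(timeout=5)   # no exception was raised

    # exception() hands the exception back instead of raising it ...
    failed_exception = failed.exception(timeout=5)

    # ... while result() re-raises it in the caller.
    try:
        failed.result(timeout=5)
        reraised = False
    except ZeroDivisionError:
        reraised = True
```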

add_done_callback(callback, *args)

Add a callback that gets called when the future completes.

The callback will be called in the context of the fiber that sets the future’s result. The callback is called with the positional arguments args provided to this method.

The return value is an opaque handle that can be used with remove_done_callback() to remove the callback.

If the future has already completed, then the callback is called immediately from this method and the return value will be None.

remove_done_callback(handle)

Remove a callback that was added by add_done_callback().

It is not an error to remove a callback that was already removed.
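
The callback rules described above can be modeled in a few lines. The class below is a hypothetical toy, not Gruvi's implementation: CallbackRegistry, complete() and the integer handles are assumptions made for illustration, and it ignores fibers and results entirely.

```python
import itertools

class CallbackRegistry:
    """Hypothetical stand-in for the callback half of a future (not Gruvi code)."""

    def __init__(self):
        self._done = False
        self._callbacks = {}
        self._handles = itertools.count(1)

    def add_done_callback(self, callback, *args):
        if self._done:
            callback(*args)   # already complete: call immediately
            return None
        handle = next(self._handles)
        self._callbacks[handle] = (callback, args)
        return handle

    def remove_done_callback(self, handle):
        # Removing an unknown or already removed handle is not an error.
        self._callbacks.pop(handle, None)

    def complete(self):
        self._done = True
        callbacks, self._callbacks = self._callbacks, {}
        for callback, args in callbacks.values():
            callback(*args)

log = []
future = CallbackRegistry()
kept = future.add_done_callback(log.append, "kept")
dropped = future.add_done_callback(log.append, "dropped")
future.remove_done_callback(dropped)
future.remove_done_callback(dropped)   # second removal is a no-op
future.complete()
late = future.add_done_callback(log.append, "late")   # runs right away
```

Only the callback that was still registered at completion runs, followed by the late one that is invoked immediately.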

class PoolBase(maxsize=None, minsize=0, name=None)

Base class for pools.

A pool contains a set of workers that can execute function calls asynchronously.

The maxsize argument specifies the maximum number of workers that will be created in the pool. If maxsize is None then the pool can grow without bound.

Normally the pool starts with zero workers and grows up to maxsize on demand. The minsize parameter can be used to change this behavior and make sure that there will always be at least this many workers in the pool.

The name parameter gives a name for this pool. The pool name will show up in log messages related to the pool.

maxsize

The maximum size of this pool.

minsize

The minimum size of this pool.

name

The pool name.

submit(func, *args)

Run func asynchronously.

The function is run asynchronously by a worker in the pool. It is called with the positional arguments args.

The return value is a Future that captures the state and the future result of the asynchronous function call.
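
The same submit-then-collect pattern exists in the standard library, which makes for a runnable illustration. The sketch uses ThreadPoolExecutor as a stand-in for a Gruvi pool, and its max_workers argument plays roughly the role that maxsize plays above; power is a hypothetical example function.

```python
from concurrent.futures import ThreadPoolExecutor

def power(base, exponent):
    return base ** exponent

with ThreadPoolExecutor(max_workers=4) as pool:
    # Positional arguments after the function are passed through to it.
    future = pool.submit(power, 2, 10)
    value = future.result(timeout=5)
```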

switchpoint map(func, *iterables, **kwargs)

Apply func to the elements of the sequences in iterables.

All invocations of func are run in the pool. If multiple iterables are provided, then func must take this many arguments, and is applied with one element from each iterable. All iterables must yield the same number of elements.

An optional timeout keyword argument may be provided to specify a timeout.

This returns a generator yielding the results.
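
A call with two iterables looks as follows in the standard library's concurrent.futures, which offers a map() of the same shape. This is an analogy using ThreadPoolExecutor, not Gruvi code, and scale is a hypothetical example function. Both iterables have the same length, as required above.

```python
from concurrent.futures import ThreadPoolExecutor

def scale(value, factor):
    return value * factor

values = [1, 2, 3]
factors = [10, 20, 30]

with ThreadPoolExecutor(max_workers=2) as pool:
    # One element is taken from each iterable per call: scale(1, 10), scale(2, 20), ...
    results = pool.map(scale, values, factors, timeout=5)
    scaled = list(results)   # consume the generator while the pool is open
```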

switchpoint join()

Wait until all jobs in the pool have completed.

New submissions are not blocked. This means that if you continue adding work via submit() or map() then this method might never finish.

switchpoint close()

Close the pool and wait for all workers to exit.

New submissions will be blocked. Workers will exit once their current job is finished. This method will return after all workers have exited.

class FiberPool(*args, **kwargs)

A pool that uses fiber workers.

class ThreadPool(*args, **kwargs)

A pool that uses thread workers.

get_io_pool()

Return the thread pool for IO tasks.

By default there is one IO thread pool per application, which is shared by all threads.

get_cpu_pool()

Return the thread pool for CPU intensive tasks.

By default there is one CPU thread pool per application, which is shared by all threads.

blocking(func, *args, **kwargs)

Run a function that uses blocking IO.

The function is run in the IO thread pool.

switchpoint wait(objects, count=None, timeout=None)

Wait for one or more waitable objects.

This method waits until count elements from the sequence of waitable objects objects have become ready. If count is None (the default), then wait for all objects to become ready.

What “ready” means depends on the object type. A waitable object is an object that implements the add_done_callback() and remove_done_callback() methods. This currently includes:

  • Event - an event is ready when its internal flag is set.
  • Future - a future is ready when its result is set.
  • Fiber - a fiber is ready when it has terminated.
  • Process - a process is ready when the child has exited.
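
The count semantics can be sketched on top of standard library futures. The standard library's own wait() has a different signature, so wait_for_count below is a hypothetical helper written for this illustration; it only handles concurrent.futures futures and does not reproduce Gruvi's return value or its support for events, fibers and processes.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

def wait_for_count(futures, count=None, timeout=None):
    """Hypothetical helper: return the first `count` futures that finish."""
    futures = list(futures)
    if count is None:
        count = len(futures)   # default: wait for all of them
    ready = []
    if count <= 0:
        return ready
    for future in as_completed(futures, timeout=timeout):
        ready.append(future)
        if len(ready) >= count:
            break
    return ready

gate = threading.Event()

def fast():
    return "fast"

def slow():
    gate.wait(timeout=5)   # held back until the gate opens
    return "slow"

with ThreadPoolExecutor(max_workers=2) as pool:
    slow_future = pool.submit(slow)
    fast_future = pool.submit(fast)

    # count=1: return as soon as any one future is ready.
    first_ready = wait_for_count([slow_future, fast_future], count=1, timeout=5)

    gate.set()
    # count=None: wait for every future.
    all_ready = wait_for_count([slow_future, fast_future], timeout=5)
```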

switchpoint as_completed(objects, count=None, timeout=None)

Wait for one or more waitable objects, yielding them as they become ready.

This is the iterator/generator version of wait().
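
The standard library has an as_completed() with the same idea, which allows a runnable illustration of completion-order iteration. The sketch is an analogy for futures only; the gates dictionary and task function are hypothetical and exist just to force a known finishing order.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

gates = {name: threading.Event() for name in ("a", "b", "c")}

def task(name):
    gates[name].wait(timeout=5)   # each task finishes only when its gate opens
    return name

finish_order = ["c", "a", "b"]

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(task, name) for name in ("a", "b", "c")]
    gates[finish_order[0]].set()

    seen = []
    for future in as_completed(futures, timeout=5):
        seen.append(future.result())
        # Let the next task finish only after this one has been observed.
        if len(seen) < len(finish_order):
            gates[finish_order[len(seen)]].set()
```

The futures are yielded in the order they finish, not in the order they were submitted.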