Synchronization primitives

Gruvi contains a set of primitives that can be used for synchronization between multiple fibers and threads. All primitives documented below are thread-safe. They are modeled after the primitives in the Python threading and queue modules.

class Lock

A lock.

The lock can be locked and unlocked explicitly using acquire() and release(), and it can also be used as a context manager.

switchpoint acquire(blocking=True, timeout=None)

Acquire the lock.

If blocking is true (the default), then this will block until the lock can be acquired. The timeout parameter specifies an optional timeout in seconds.

The return value is a boolean indicating whether the lock was acquired.

locked()

Whether the lock is currently locked.

release()

Release the lock.
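The two usage styles described above can be sketched as follows. Because the primitives are modeled after the Python threading module, this sketch uses the standard library threading.Lock as a stand-in; it is an assumption that a Gruvi Lock can be dropped in with the same calls, and the names state and increment are hypothetical.

```python
import threading

# Stand-in for a Gruvi Lock (assumption: same acquire/release/locked interface).
lock = threading.Lock()
state = {"count": 0}

def increment(n):
    for _ in range(n):
        # Context-manager form: acquire on entry, release on exit.
        with lock:
            state["count"] += 1

threads = [threading.Thread(target=increment, args=(1000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Explicit form: a non-blocking acquire returns a boolean instead of waiting.
acquired = lock.acquire(blocking=False)
was_locked = lock.locked()
second_try = lock.acquire(blocking=False)  # already held, so not acquired
lock.release()
```

The context-manager form is usually preferable because the lock is released even if the protected block raises.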

class RLock

A reentrant lock.

A reentrant lock has the notion of a “lock owner” and a “lock count”. If a reentrant lock is acquired, and it was already acquired by the current fiber, then the lock count is increased and the acquire call will be successful. Unlocking a reentrant lock may only be done by the lock owner. The lock becomes unlocked only after it is released as many times as it was acquired.

switchpoint acquire(blocking=True, timeout=None)

Acquire the lock.

If blocking is true (the default), then this will block until the lock can be acquired. The timeout parameter specifies an optional timeout in seconds.

The return value is a boolean indicating whether the lock was acquired.

locked()

Whether the lock is currently locked.

release()

Release the lock.
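The lock count behavior can be illustrated with a short sketch. It uses the standard library threading.RLock as a stand-in, where the owner is a thread rather than a fiber; that a Gruvi RLock behaves the same way with fibers as owners is an assumption based on the description above. The helper try_from_other_thread is hypothetical.

```python
import threading

# Stand-in for a Gruvi RLock (assumption: same counting semantics).
rlock = threading.RLock()

def try_from_other_thread():
    """Return True if a different thread can acquire the lock right now."""
    outcome = []

    def attempt():
        got = rlock.acquire(blocking=False)
        outcome.append(got)
        if got:
            rlock.release()

    t = threading.Thread(target=attempt)
    t.start()
    t.join()
    return outcome[0]

rlock.acquire()           # lock count is 1
rlock.acquire()           # same owner again: lock count is 2
rlock.release()           # lock count is 1, still owned
held_after_one_release = not try_from_other_thread()
rlock.release()           # lock count is 0, lock is free
free_after_two_releases = try_from_other_thread()
```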

class Event

An event.

An event contains an internal flag that is initially False. The flag can be set using the set() method and cleared using the clear() method. Fibers can wait for the flag to become set using wait().

Events are level triggered, meaning that the condition set by set() is “sticky”. Setting the event will unblock all current waiters and will cause future calls to wait() not to block, until clear() is called.

set()

Set the internal flag, and wake up any fibers blocked on wait().

clear()

Clear the internal flag.

switchpoint wait(timeout=None)

If the internal flag is set, return immediately. Otherwise block until the flag gets set by another fiber calling set().
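The "sticky" behavior can be sketched with the standard library threading.Event, used here as a stand-in for the Gruvi Event (an assumption). The boolean returned by wait() in this sketch is the documented behavior of threading.Event; the return value of the Gruvi method is not stated above.

```python
import threading

# Stand-in for a Gruvi Event (assumption: same set/clear/wait interface).
event = threading.Event()
woken = []

def waiter(name):
    event.wait()          # blocks until the flag is set
    woken.append(name)

threads = [threading.Thread(target=waiter, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

event.set()               # wakes every current waiter
for t in threads:
    t.join()

# The flag stays set, so a later wait() does not block.
late_wait = event.wait(timeout=0)

# After clear(), wait() blocks again (here it times out).
event.clear()
wait_after_clear = event.wait(timeout=0.01)
```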

class Condition(lock=None)

A condition.

A condition is always associated with a lock. The state of the condition may only change when the caller has acquired the lock. While the lock is held, a condition can be waited for using wait(). The wait method will release the lock just before blocking itself, so that another fiber can call notify() to notify the condition.

The difference between a condition and an Event is that a condition is edge-triggered. This means that when a condition is notified, only fibers that are waiting at the time of notification are unblocked. Any fiber that calls wait() after the notification will block until the next notification. This also explains why a lock is needed. Without the lock there would be a race condition between notification and waiting.

The lock argument can be used to share a lock between multiple conditions. It must be a Lock or RLock instance. If no lock is provided, an RLock is allocated.

notify(n=1)

Raise the condition and wake up fibers waiting on it.

The optional n parameter specifies how many fibers will be notified. By default, one fiber is notified.

notify_all()

Raise the condition and wake up all fibers waiting on it.

switchpoint wait(timeout=None)

Wait for the condition to be notified.

The return value is True, unless a timeout occurred in which case the return value is False.

The lock must be held before calling this method. This method will release the lock just before blocking itself, and it will re-acquire it before returning.
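To make the edge-triggered behavior concrete, the sketch below notifies a condition while nobody is waiting and then waits on it. It uses the standard library threading.Condition as a stand-in (an assumption); the short timeout is only there so that the example terminates.

```python
import threading

# Stand-in for a Gruvi Condition (assumption: same notify/wait interface).
cond = threading.Condition()

with cond:
    cond.notify()                   # no waiters, so this notification is lost
    woke = cond.wait(timeout=0.05)  # blocks until the timeout; returns False
```

An Event in the same situation would have returned immediately, because its flag remains set until it is cleared.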

switchpoint wait_for(predicate, timeout=None)

Like wait(), but additionally wait for predicate to be true.

The predicate argument must be a callable that takes no arguments. Its result is interpreted as a boolean value.
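A typical use of wait_for() is a consumer that waits until shared state satisfies a predicate. The sketch below uses the standard library threading.Condition as a stand-in (an assumption); the names items, received and consumer are hypothetical.

```python
import threading

# Stand-in for a Gruvi Condition (assumption: same wait_for/notify interface).
cond = threading.Condition()
items = []
received = []

def consumer():
    with cond:
        # The predicate is checked before blocking and after every wakeup,
        # so it does not matter whether the producer runs first.
        ok = cond.wait_for(lambda: len(items) > 0, timeout=5)
        if ok:
            received.append(items.pop(0))

t = threading.Thread(target=consumer)
t.start()

with cond:
    items.append("job")   # change the shared state while holding the lock
    cond.notify()         # wake one waiter so it re-checks the predicate

t.join()
```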

exception QueueEmpty

Queue is empty.

exception QueueFull

Queue is full.

class Queue(maxsize=0)

A synchronized FIFO queue.

The maxsize argument specifies the maximum queue size. If it is less than or equal to zero, the queue size is infinite.

qsize()

Return the size of the queue, which is the sum of the size of all its elements.

switchpoint put(item, block=True, timeout=None, size=None)

Put item into the queue.

If the queue is currently full and block is True (the default), then wait up to timeout seconds for space to become available. If no timeout is specified, then wait indefinitely.

If the queue is full and block is False or a timeout occurs, then raise a QueueFull exception.

The optional size argument may be used to specify a custom size for the item. The total qsize() of the queue is the sum of the sizes of all the items. The default size for an item is 1.
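The size accounting described above amounts to simple arithmetic. The following is a toy, single-threaded model of that arithmetic only; it is not Gruvi code, and the names items, put and qsize are hypothetical helpers that mirror the documented behavior.

```python
# Toy model: each entry is an (item, size) pair.
items = []

def put(item, size=None):
    # The default size for an item is 1.
    items.append((item, 1 if size is None else size))

def qsize():
    # The total size is the sum of the sizes of all items.
    return sum(size for _, size in items)

put("ping")                  # size 1
put(b"x" * 100, size=100)    # custom size, e.g. the payload length
put("pong")                  # size 1
total = qsize()              # 1 + 100 + 1
```

A custom size is useful when maxsize should bound something other than the number of items, such as the number of buffered bytes.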

put_nowait(item, size=None)

Equivalent of put(item, False).

switchpoint get(block=True, timeout=None)

Pop an item from the queue.

If the queue is not empty, an item is returned immediately. Otherwise, if block is True (the default), wait up to timeout seconds for an item to become available. If no timeout is provided, then wait indefinitely.

If the queue is empty and block is False or a timeout occurs, then raise a QueueEmpty exception.

get_nowait()

Equivalent of get(False).

task_done()

Mark a task as done.

join()

Wait until all tasks are done.
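The interplay of put(), get(), task_done() and join() can be sketched with a producer and a worker. This sketch uses the standard library queue.Queue as a stand-in (an assumption). Note two differences: the standard library raises queue.Empty and queue.Full where this module documents QueueEmpty and QueueFull, and its put() has no size argument.

```python
import queue
import threading

# Stand-in for a Gruvi Queue (assumption: same put/get/task_done/join calls).
q = queue.Queue(maxsize=2)
results = []

def worker():
    while True:
        item = q.get()            # blocks until an item is available
        if item is None:          # hypothetical sentinel meaning "stop"
            q.task_done()
            break
        results.append(item * 2)
        q.task_done()             # one task_done() per item taken

t = threading.Thread(target=worker)
t.start()

for i in range(5):
    q.put(i)                      # blocks while the queue is full
q.put(None)

q.join()                          # returns once every item is marked done
t.join()

# A non-blocking get on an empty queue raises instead of waiting.
try:
    q.get_nowait()
    empty_raised = False
except queue.Empty:
    empty_raised = True
```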

class LifoQueue(maxsize=0)

A queue with LIFO behavior.

See Queue for a description of the API.

The maxsize argument specifies the maximum queue size. If it is less than or equal to zero, the queue size is infinite.
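The LIFO ordering can be illustrated with the standard library queue.LifoQueue, used as a stand-in for the class described here (an assumption).

```python
import queue

# Stand-in for a Gruvi LifoQueue (assumption: same put/get interface).
stack = queue.LifoQueue()
for item in ("first", "second", "third"):
    stack.put(item)

# Items come back in reverse order of insertion.
order = [stack.get() for _ in range(3)]
```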

class PriorityQueue(maxsize=0)

A priority queue.

Items that are added via put() are typically (priority, item) tuples. Lower values for priority indicate a higher priority.

See Queue for a description of the API.

The maxsize argument specifies the maximum queue size. If it is less than or equal to zero, the queue size is infinite.
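The (priority, item) convention can be illustrated with the standard library queue.PriorityQueue, used as a stand-in for the class described here (an assumption). The task names are made up for the example.

```python
import queue

# Stand-in for a Gruvi PriorityQueue (assumption: same put/get interface).
pq = queue.PriorityQueue()
pq.put((2, "write report"))
pq.put((1, "fix outage"))
pq.put((3, "tidy backlog"))

# Lower priority values are returned first, regardless of insertion order.
order = [pq.get()[1] for _ in range(3)]
```

When two entries share the same priority value, tuple comparison falls through to the items themselves, so the items then need to be comparable.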