rlib
Convenience library for useful things
Loading...
Searching...
No Matches
Task queue

Deferred-work queue: callers enqueue RTask objects with optional dependencies, worker threads owned by the queue pick them up and run them to completion. More...

Macros

#define r_task_ref   r_ref_ref
 Take a reference on a task (alias for r_ref_ref).
 
#define r_task_unref   r_ref_unref
 Drop a reference on a task (alias for r_ref_unref).
 
#define r_task_queue_ref   r_ref_ref
 Take a reference on the queue (alias for r_ref_ref).
 
#define r_task_queue_unref   r_ref_unref
 Drop a reference on the queue (alias for r_ref_unref).
 
#define r_task_queue_add_task(queue, task)    r_task_queue_add_task_with_group (queue, task, RUINT_MAX)
 Dispatch a pre-allocated task to any worker group.
 
#define r_task_queue_add(queue, func, data, datanotify)    r_task_queue_add_full (queue, RUINT_MAX, func, data, datanotify, NULL)
 Allocate-and-dispatch a task in one call, no dependencies.
 

Typedefs

typedef struct RTask RTask
 Opaque, refcounted task handle.
 
typedef struct RTaskQueue RTaskQueue
 Opaque, refcounted task-queue handle.
 
typedef void(* RTaskFunc) (rpointer data, RTaskQueue *queue, RTask *task)
 Task entry-point signature.
 

Functions

rboolean r_task_add_dep (RTask *task, RTask *dep,...)
 Add one or more dependencies to a task that has not yet run.
 
rboolean r_task_add_dep_v (RTask *task, RTask *dep, va_list args)
 va_list variant of r_task_add_dep.
 
rboolean r_task_cancel (RTask *task, rboolean wait_if_running)
 Cancel a pending or running task.
 
rboolean r_task_wait (RTask *task)
 Block until task has finished (cancelled tasks count as finished).
 
RTaskQueuer_task_queue_new (ruint groups, ruint threads_per_group)
 Create a queue with groups worker groups, each containing threads_per_group worker threads.
 
RTaskQueuer_task_queue_new_pin_and_group_on_numa_node (const RBitset *nodeset, ruint threads_per_group)
 One group per NUMA node selected by nodeset, each with threads_per_group workers pinned to that node's CPUs.
 
RTaskQueuer_task_queue_new_pin_on_cpu_group_numa_node (const RBitset *nodeset)
 One worker per CPU on the selected NUMA nodes, grouped per NUMA node.
 
RTaskQueuer_task_queue_new_pin_on_each_cpu (const RBitset *cpuset, ruint groups)
 One worker per CPU in cpuset; workers are partitioned evenly across groups groups.
 
RTaskQueuer_task_queue_new_pin_on_each_cpu_group_numa_node (const RBitset *cpuset)
 One worker per CPU in cpuset, grouped by the NUMA node each CPU belongs to.
 
RTaskQueuer_task_queue_current (void)
 Return the queue owning the calling thread, or NULL if the calling thread is not a queue worker.
 
RTaskr_task_queue_allocate (RTaskQueue *queue, RTaskFunc func, rpointer data, RDestroyNotify datanotify)
 Allocate a task bound to queue without dispatching it.
 
rboolean r_task_queue_add_task_with_group (RTaskQueue *queue, RTask *task, ruint group)
 Dispatch a pre-allocated task to a specific group.
 
RTaskr_task_queue_add_full (RTaskQueue *queue, ruint group, RTaskFunc func, rpointer data, RDestroyNotify datanotify,...) R_ATTR_WARN_UNUSED_RESULT
 Allocate-and-dispatch a task with group and dependencies.
 
RTaskr_task_queue_add_full_v (RTaskQueue *queue, ruint group, RTaskFunc func, rpointer data, RDestroyNotify datanotify, va_list args) R_ATTR_WARN_UNUSED_RESULT
 va_list variant of r_task_queue_add_full.
 
rsize r_task_queue_queued_tasks (const RTaskQueue *queue)
 Number of tasks currently queued but not yet running.
 
ruint r_task_queue_group_count (const RTaskQueue *queue)
 Number of worker groups in queue.
 
ruint r_task_queue_thread_count (const RTaskQueue *queue)
 Total number of worker threads across all groups.
 

Detailed Description

Deferred-work queue: callers enqueue RTask objects with optional dependencies, worker threads owned by the queue pick them up and run them to completion.

A queue is partitioned into one or more groups, each backed by a set of worker threads. r_task_queue_add_task_with_group (or the r_task_queue_add_full convenience) pins the task to a specific group; RUINT_MAX means "any group". Workers can be pinned to individual CPUs or NUMA nodes via the _pin_on_* constructors for cache locality.

Each task is refcounted and one-shot. Tasks may declare dependencies on other tasks; the queue only dispatches a task once every dep has finished. r_task_cancel removes a task from the queue (optionally waiting if it has already started), r_task_wait blocks until completion.

Macro Definition Documentation

◆ r_task_queue_add

#define r_task_queue_add (   queue,
  func,
  data,
  datanotify 
)     r_task_queue_add_full (queue, RUINT_MAX, func, data, datanotify, NULL)

Allocate-and-dispatch a task in one call, no dependencies.

Convenience wrapper around r_task_queue_add_full with RUINT_MAX group and no deps.

◆ r_task_queue_add_task

#define r_task_queue_add_task (   queue,
  task 
)     r_task_queue_add_task_with_group (queue, task, RUINT_MAX)

Dispatch a pre-allocated task to any worker group.

Convenience wrapper that passes RUINT_MAX as the group.

Typedef Documentation

◆ RTaskFunc

typedef void(* RTaskFunc) (rpointer data, RTaskQueue *queue, RTask *task)

Task entry-point signature.

Parameters
dataOpaque payload supplied at task creation.
queueQueue dispatching this task.
taskThe task being run (useful for spawning continuations).

Function Documentation

◆ r_task_add_dep()

rboolean r_task_add_dep ( RTask task,
RTask dep,
  ... 
)

Add one or more dependencies to a task that has not yet run.

The variadic argument list is a NULL-terminated sequence of additional RTask pointers; dep is required, the rest are optional. task will not be dispatched until every listed dependency has finished (cancelled deps also satisfy the wait).

Returns
TRUE if all deps were attached, FALSE if task had already been dispatched.

◆ r_task_cancel()

rboolean r_task_cancel ( RTask task,
rboolean  wait_if_running 
)

Cancel a pending or running task.

Parameters
taskTask to cancel.
wait_if_runningIf TRUE and the task has already started, block until it returns. If FALSE, return immediately leaving the task to finish on its own.
Returns
TRUE if the task was cancelled before running, FALSE if it had already started.

◆ r_task_queue_add_full()

RTask * r_task_queue_add_full ( RTaskQueue queue,
ruint  group,
RTaskFunc  func,
rpointer  data,
RDestroyNotify  datanotify,
  ... 
)

Allocate-and-dispatch a task with group and dependencies.

Parameters
queueQueue the task will run on.
groupGroup index, or RUINT_MAX for "any group".
funcTask entry point.
dataPayload forwarded to func.
datanotifyDestructor for data; may be NULL.
...NULL-terminated list of RTask dependencies.
Returns
Refcounted handle to the newly-queued task.

◆ r_task_queue_add_task_with_group()

rboolean r_task_queue_add_task_with_group ( RTaskQueue queue,
RTask task,
ruint  group 
)

Dispatch a pre-allocated task to a specific group.

Parameters
queueQueue the task is bound to.
taskTask to dispatch.
groupGroup index, or RUINT_MAX to let the queue pick.
Returns
TRUE on success, FALSE if the group is invalid or the task has already been dispatched.

◆ r_task_queue_allocate()

RTask * r_task_queue_allocate ( RTaskQueue queue,
RTaskFunc  func,
rpointer  data,
RDestroyNotify  datanotify 
)

Allocate a task bound to queue without dispatching it.

Useful when the task needs dependencies (see r_task_add_dep) attached before it goes live. Use r_task_queue_add_task to dispatch.

Parameters
queueQueue the task will run on.
funcTask entry point.
dataPayload forwarded to func.
datanotifyDestructor called on data when the task is freed; may be NULL.

◆ r_task_queue_new()

RTaskQueue * r_task_queue_new ( ruint  groups,
ruint  threads_per_group 
)

Create a queue with groups worker groups, each containing threads_per_group worker threads.

No affinity hints are applied; threads run wherever the OS schedules them. Use one of the _pin_on_* constructors for NUMA-aware pinning.

◆ r_task_wait()

rboolean r_task_wait ( RTask task)

Block until task has finished (cancelled tasks count as finished).

Returns
TRUE on a successful wait, FALSE if task is invalid.