9.2. Boost package for jobs and threads
The JOBQUE_BOOST module provides high-level job queue abstractions built on
the low-level jobque primitives. It includes with_job, with_job_status,
and channel-based patterns for simplified concurrent programming.
See also Jobs and threads for the low-level job queue primitives. See Job Queue (jobque) for a hands-on tutorial.
All functions and symbols are in “jobque_boost” module, use require to get access to it.
require daslib/jobque_boost
Example:
require daslib/jobque_boost
[export]
def main() {
with_job_status(1) $(status) {
new_thread() @() {
print("from thread\n")
status |> notify_and_release()
}
status |> join()
print("thread done\n")
}
}
// output:
// from thread
// thread done
9.2.1. Function annotations
- ParallelForJobMacro
Base macro for parallel_for, parallel_for_each, and parallel_map.
Wraps the block body in new_job and redirects to the runtime implementation.
- NewJobMacro
this macro handles new_job and new_thread calls. the call is replaced with new_job_invoke and new_thread_invoke accordingly. a cloning infrastructure is generated for the lambda, which is invoked in the new context.
- ParallelForEachJobMacro
This macro handles parallel_for_each.
Wraps block body in new_job and redirects to _parallel_for_each.
- ParallelMapJobMacro
This macro handles parallel_map.
Wraps block body in new_job and redirects to _parallel_map.
9.2.2. Invocations
- new_job(l: lambda<():void>)
- Create a new job.
new context is cloned from the current context.
lambda is cloned to the new context.
new job is added to the job queue.
once new job is invoked, lambda is invoked on the new context on the job thread.
- Arguments:
l : lambda<void>
- new_thread(l: lambda<():void>)
- Create a new thread
new context is cloned from the current context.
lambda is cloned to the new context.
new thread is created.
lambda is invoked on the new context on the new thread.
- Arguments:
l : lambda<void>
9.2.3. Iteration
- each(channel: Channel?; tinfo: auto(TT)): auto
Warning
This function is deprecated.
this iterator is used to iterate over the channel in order it was pushed. iterator stops once channel is depleted (internal entry counter is 0) iteration can happen on multiple threads or jobs at the same time.
- Arguments:
channel : Channel?
tinfo : auto(TT)
- each_clone(channel: Channel?; tinfo: auto(TT)): auto
this iterator is used to iterate over the channel in order it was pushed. iterator stops once channel is depleted (internal entry counter is 0) iteration can happen on multiple threads or jobs at the same time.
- Arguments:
channel : Channel?
tinfo : auto(TT)
- for_each(channel: Channel?; blk: block<(res:auto(TT)#):void>): auto
Warning
This function is deprecated.
reads input from the channel (in order it was pushed) and invokes the block on each input. stops once channel is depleted (internal entry counter is 0) this can happen on multiple threads or jobs at the same time.
- Arguments:
channel : Channel?
blk : block<(res:auto(TT)#):void>
- for_each_clone(channel: Channel?; blk: block<(res:auto(TT)#):void>): auto
reads input from the channel (in order it was pushed) and invokes the block on each input. stops once channel is depleted (internal entry counter is 0) this can happen on multiple threads or jobs at the same time.
- Arguments:
channel : Channel?
blk : block<(res:auto(TT)#):void>
9.2.4. Passing data
- push(channel: Channel?; data: auto?): auto
pushes value to the channel (at the end)
- Arguments:
channel : Channel?
data : auto?
- push_batch(channel: Channel?; data: array<auto?>): auto
pushes values to the channel (at the end)
- Arguments:
channel : Channel?
data : array<auto?>
- push_batch_clone(channel: Channel?; data: array<auto(TT)>): auto
clones data and pushes values to the channel (at the end)
- Arguments:
channel : Channel?
data : array<auto(TT)>
- push_clone(channel: Channel?; data: auto(TT)): auto
clones data and pushes value to the channel (at the end)
- Arguments:
channel : Channel?
data : auto(TT)
9.2.5. Receiving data
gather (ch: Channel?; blk: block<(arg:auto(TT)#):void>) : auto
gather_and_forward (ch: Channel?; toCh: Channel?; blk: block<(arg:auto(TT)#):void>) : auto
peek (ch: Channel?; blk: block<(arg:auto(TT)#):void>) : auto
pop_and_clone_one (channel: Channel?; blk: block<(res:auto(TT)#):void>) : auto
pop_one (channel: Channel?; blk: block<(res:auto(TT)#):void>) : auto
pop_with_timeout (channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>) : bool
pop_with_timeout_clone (channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>) : bool
try_pop (channel: Channel?; blk: block<(res:auto(TT)#):void>) : bool
try_pop_clone (channel: Channel?; blk: block<(res:auto(TT)#):void>) : bool
- gather(ch: Channel?; blk: block<(arg:auto(TT)#):void>): auto
reads input from the channel (in order it was pushed) and invokes the block on each input. afterwards input is consumed
- Arguments:
ch : Channel?
blk : block<(arg:auto(TT)#):void>
- gather_and_forward(ch: Channel?; toCh: Channel?; blk: block<(arg:auto(TT)#):void>): auto
reads input from the channel (in order it was pushed) and invokes the block on each input. afterwards input is consumed
- gather_ex(ch: Channel?; blk: block<(arg:auto(TT)#;info:TypeInfo const?;var ctx:Context):void>): auto
reads input from the channel (in order it was pushed) and invokes the block on each input. afterwards input is consumed
- peek(ch: Channel?; blk: block<(arg:auto(TT)#):void>): auto
reads input from the channel (in order it was pushed) and invokes the block on each input. afterwards input is not consumed
- Arguments:
ch : Channel?
blk : block<(arg:auto(TT)#):void>
- pop_and_clone_one(channel: Channel?; blk: block<(res:auto(TT)#):void>): auto
reads one command from channel
- Arguments:
channel : Channel?
blk : block<(res:auto(TT)#):void>
- pop_one(channel: Channel?; blk: block<(res:auto(TT)#):void>): auto
Warning
This function is deprecated.
reads one command from channel
- Arguments:
channel : Channel?
blk : block<(res:auto(TT)#):void>
- pop_with_timeout(channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>): bool
Pop from channel with timeout in milliseconds. Returns true if an item was available within the timeout, false if timed out or channel exhausted.
- Arguments:
channel : Channel?
timeout_ms : int
blk : block<(res:auto(TT)#):void>
- pop_with_timeout_clone(channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>): bool
Pop from channel with timeout and clone. Returns true if an item was available within the timeout. The popped value is cloned to the current context before invoking the block.
- Arguments:
channel : Channel?
timeout_ms : int
blk : block<(res:auto(TT)#):void>
- try_pop(channel: Channel?; blk: block<(res:auto(TT)#):void>): bool
Non-blocking pop from channel. Returns true if an item was available, false if channel was empty. Does not wait for data — returns immediately.
- Arguments:
channel : Channel?
blk : block<(res:auto(TT)#):void>
- try_pop_clone(channel: Channel?; blk: block<(res:auto(TT)#):void>): bool
Non-blocking pop with clone from channel. Returns true if an item was available, false if channel was empty. The popped value is cloned to the current context before invoking the block.
- Arguments:
channel : Channel?
blk : block<(res:auto(TT)#):void>
9.2.6. Synchronization
- done(status: JobStatus?&)
Mark one unit of work as done in a wait group.
Alias for notify_and_release. Decrements the notification counter and releases the reference.
Sets the pointer to null, preventing double-release.
- Arguments:
status : JobStatus?&
9.2.6.1. with_wait_group
- with_wait_group(blk: block<(var status:JobStatus?):void>)
Creates a wait group starting at count 0 with auto-join.
Use append to dynamically add expected notifications before dispatching work.
The block returns only after all notifications have been received.
- Arguments:
blk : block<(status: JobStatus?):void>
- with_wait_group(count: int; blk: block<(var status:JobStatus?):void>)
9.2.7. Parallel execution
- _parallel_for(range_begin: int; range_end: int; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>)
Partitions [range_begin..range_end) into num_jobs chunks and invokes blk once per chunk
on the calling thread with (chunk_begin, chunk_end, wg). The block is expected to dispatch work
via new_job and call wg |> notify_and_release when each job finishes.
parallel_for blocks until all notifications are received (via internal with_wait_group).
Requires with_job_que context.
- Arguments:
range_begin : int
range_end : int
num_jobs : int
blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>
- _parallel_for_each(arr: array<auto(TT)>; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>): auto
Runtime implementation for parallel_for_each.
Partitions array indices [0..length(arr)) into num_jobs chunks and invokes blk
with (chunk_begin_idx, chunk_end_idx, wg) on the calling thread.
The block should dispatch new_job calls that process arr[i] for i in [chunk_begin_idx..chunk_end_idx),
then call wg |> notify_and_release.
Blocks until all jobs finish. Requires with_job_que context.
- Arguments:
arr : array<auto(TT)>
num_jobs : int
blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>
- _parallel_map(arr: array<auto(TT)>; num_jobs: int; results_channel: Channel?; blk: block<(job_begin:int;job_end:int;var ch:Channel?;var wg:JobStatus?):void>): auto
Runtime implementation for parallel_map.
Partitions array indices [0..length(arr)) into num_jobs chunks and invokes blk
on the calling thread with (chunk_begin_idx, chunk_end_idx, results_channel, wg).
Blocks until all jobs finish. Results are available in results_channel after this call returns.
Requires with_job_que context.
- Arguments:
- parallel_for(range_begin: int; range_end: int; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>)
this one is stub for _parallel_for
- Arguments:
range_begin : int
range_end : int
num_jobs : int
blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>
- parallel_for_each(arr: array<auto(TT)>; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>): auto
Convenience wrapper around parallel_for for arrays.
Partitions array indices [0..length(arr)) into num_jobs chunks.
The block body is automatically wrapped in new_job.
Blocks until all jobs finish. Requires with_job_que context.
- Arguments:
arr : array<auto(TT)>
num_jobs : int
blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>
- parallel_map(arr: array<auto(TT)>; num_jobs: int; results_channel: Channel?; blk: block<(job_begin:int;job_end:int;var ch:Channel?;var wg:JobStatus?):void>): auto
Partitions array indices [0..length(arr)) into num_jobs chunks.
The block body is automatically wrapped in new_job.
Blocks until all jobs finish. Results are available in results_channel after this call returns.
Requires with_job_que context.
9.2.8. LockBox operations
- clear(box: LockBox?; type_: auto(TT)): auto
clear value from the lock box
- Arguments:
box : LockBox?
type_ : auto(TT)
- fill(box: LockBox?; data: auto(TT)): auto
stores pointer to data in the lock box and marks it as full (isReady becomes false). data must outlive the grab — caller owns the lifetime.
- Arguments:
box : LockBox?
data : auto(TT)
- get(box: LockBox?; blk: block<(res:auto(TT)#):void>): auto
reads value from the lock box and invokes the block on it
- Arguments:
box : LockBox?
blk : block<(res:auto(TT)#):void>
- grab(box: LockBox?; blk: block<(var res:auto(TT)):void>): bool
grabs data from lock box, marks it as empty (isReady becomes true), invokes block. does not free the data — caller owns the lifetime.
- Arguments:
box : LockBox?
blk : block<(res:auto(TT)):void>
9.2.8.1. set
- set(box: LockBox?; data: auto?): auto
sets value to the lock box
- Arguments:
box : LockBox?
data : auto?
- set(box: LockBox?; data: auto(TT)): auto
- update(box: LockBox?; blk: block<(var res:auto(TT)#):void>): auto
update value in the lock box and invokes the block on it
- Arguments:
box : LockBox?
blk : block<(res:auto(TT)#):void>
9.2.9. Internal capture details
- capture_jobque_channel(ch: Channel?): Channel?
this function is used to capture a channel that is used by the jobque.
- Arguments:
ch : Channel?
- capture_jobque_job_status(js: JobStatus?): JobStatus?
this function is used to capture a job status that is used by the jobque.
- Arguments:
js : JobStatus?
- capture_jobque_lock_box(js: LockBox?): LockBox?
this function is used to capture a lock box that is used by the jobque.
- Arguments:
js : LockBox?
- release_capture_jobque_channel(ch: Channel?)
this function is used to release a channel that is used by the jobque.
- Arguments:
ch : Channel?
- release_capture_jobque_job_status(js: JobStatus?)
this function is used to release a job status that is used by the jobque.
- Arguments:
js : JobStatus?
- release_capture_jobque_lock_box(js: LockBox?)
this function is used to release a lock box that is used by the jobque.
- Arguments:
js : LockBox?