9.2. Boost package for jobs and threads
The JOBQUE_BOOST module provides high-level job queue abstractions built on
the low-level jobque primitives. It includes with_job, with_job_status,
and channel-based patterns for simplified concurrent programming.
See also Jobs and threads for the low-level job queue primitives. See Job Queue (jobque) for a hands-on tutorial.
All functions and symbols are in “jobque_boost” module, use require to get access to it.
require daslib/jobque_boost
Example:
require daslib/jobque_boost
[export]
def main() {
with_job_status(1) $(status) {
new_thread() @() {
print("from thread\n")
status |> notify_and_release()
}
status |> join()
print("thread done\n")
}
}
// output:
// from thread
// thread done
9.2.1. Function annotations
- jobque_boost::NewJobMacro
this macro handles new_job and new_thread calls. the call is replaced with new_job_invoke and new_thread_invoke accordingly. a cloning infrastructure is generated for the lambda, which is invoked in the new context.
- jobque_boost::ParallelForJobMacro
Base macro for parallel_for, parallel_for_each, and parallel_map.
Wraps the block body in new_job and redirects to the runtime implementation.
- jobque_boost::ParallelForEachJobMacro
This macro handles parallel_for_each.
Wraps block body in new_job and redirects to _parallel_for_each.
- jobque_boost::ParallelMapJobMacro
This macro handles parallel_map.
Wraps block body in new_job and redirects to _parallel_map.
9.2.2. Invocations
- jobque_boost::new_job(l: lambda<():void>)
- Create a new job.
new context is cloned from the current context.
lambda is cloned to the new context.
new job is added to the job queue.
once new job is invoked, lambda is invoked on the new context on the job thread.
- Arguments
l : lambda<void>
- jobque_boost::new_thread(l: lambda<():void>)
- Create a new thread
new context is cloned from the current context.
lambda is cloned to the new context.
new thread is created.
lambda is invoked on the new context on the new thread.
- Arguments
l : lambda<void>
9.2.3. Iteration
- jobque_boost::each(channel: Channel?; tinfo: auto(TT)) : auto()
Warning
This function is deprecated.
this iterator is used to iterate over the channel in order it was pushed. iterator stops once channel is depleted (internal entry counter is 0) iteration can happen on multiple threads or jobs at the same time.
- Arguments
channel : Channel?
tinfo : auto(TT)
- jobque_boost::each_clone(channel: Channel?; tinfo: auto(TT)) : auto()
this iterator is used to iterate over the channel in order it was pushed. iterator stops once channel is depleted (internal entry counter is 0) iteration can happen on multiple threads or jobs at the same time.
- Arguments
channel : Channel?
tinfo : auto(TT)
- jobque_boost::for_each(channel: Channel?; blk: block<(res:auto(TT)#):void>) : auto()
Warning
This function is deprecated.
reads input from the channel (in order it was pushed) and invokes the block on each input. stops once channel is depleted (internal entry counter is 0) this can happen on multiple threads or jobs at the same time.
- Arguments
channel : Channel?
blk : block<(res:auto(TT)#):void>
- jobque_boost::for_each_clone(channel: Channel?; blk: block<(res:auto(TT)#):void>) : auto()
reads input from the channel (in order it was pushed) and invokes the block on each input. stops once channel is depleted (internal entry counter is 0) this can happen on multiple threads or jobs at the same time.
- Arguments
channel : Channel?
blk : block<(res:auto(TT)#):void>
9.2.4. Passing data
- jobque_boost::push(channel: Channel?; data: auto?) : auto()
pushes value to the channel (at the end)
- Arguments
channel : Channel?
data : auto?
- jobque_boost::push_batch(channel: Channel?; data: array<auto?>) : auto()
pushes values to the channel (at the end)
- Arguments
channel : Channel?
data : array<auto?>
- jobque_boost::push_batch_clone(channel: Channel?; data: array<auto(TT)>) : auto()
clones data and pushes values to the channel (at the end)
- Arguments
channel : Channel?
data : array<auto(TT)>
- jobque_boost::push_clone(channel: Channel?; data: auto(TT)) : auto()
clones data and pushes value to the channel (at the end)
- Arguments
channel : Channel?
data : auto(TT)
9.2.5. Receiving data
gather (ch: Channel?; blk: block<(arg:auto(TT)#):void>) : auto
gather_and_forward (ch: Channel?; toCh: Channel?; blk: block<(arg:auto(TT)#):void>) : auto
peek (ch: Channel?; blk: block<(arg:auto(TT)#):void>) : auto
pop_and_clone_one (channel: Channel?; blk: block<(res:auto(TT)#):void>) : auto
pop_one (channel: Channel?; blk: block<(res:auto(TT)#):void>) : auto
pop_with_timeout (channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>) : bool
pop_with_timeout_clone (channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>) : bool
try_pop (channel: Channel?; blk: block<(res:auto(TT)#):void>) : bool
try_pop_clone (channel: Channel?; blk: block<(res:auto(TT)#):void>) : bool
- jobque_boost::gather(ch: Channel?; blk: block<(arg:auto(TT)#):void>) : auto()
reads input from the channel (in order it was pushed) and invokes the block on each input. afterwards input is consumed
- Arguments
ch : Channel?
blk : block<(arg:auto(TT)#):void>
- jobque_boost::gather_and_forward(ch: Channel?; toCh: Channel?; blk: block<(arg:auto(TT)#):void>) : auto()
reads input from the channel (in order it was pushed) and invokes the block on each input. afterwards input is consumed
- jobque_boost::gather_ex(ch: Channel?; blk: block<(arg:auto(TT)#;info:TypeInfo const?;var ctx:Context):void>) : auto()
reads input from the channel (in order it was pushed) and invokes the block on each input. afterwards input is consumed
- jobque_boost::peek(ch: Channel?; blk: block<(arg:auto(TT)#):void>) : auto()
reads input from the channel (in order it was pushed) and invokes the block on each input. afterwards input is not consumed
- Arguments
ch : Channel?
blk : block<(arg:auto(TT)#):void>
- jobque_boost::pop_and_clone_one(channel: Channel?; blk: block<(res:auto(TT)#):void>) : auto()
reads one command from channel
- Arguments
channel : Channel?
blk : block<(res:auto(TT)#):void>
- jobque_boost::pop_one(channel: Channel?; blk: block<(res:auto(TT)#):void>) : auto()
Warning
This function is deprecated.
reads one command from channel
- Arguments
channel : Channel?
blk : block<(res:auto(TT)#):void>
- jobque_boost::pop_with_timeout(channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>) : bool()
Pop from channel with timeout in milliseconds. Returns true if an item was available within the timeout, false if timed out or channel exhausted.
- Arguments
channel : Channel?
timeout_ms : int
blk : block<(res:auto(TT)#):void>
- jobque_boost::pop_with_timeout_clone(channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>) : bool()
Pop from channel with timeout and clone. Returns true if an item was available within the timeout. The popped value is cloned to the current context before invoking the block.
- Arguments
channel : Channel?
timeout_ms : int
blk : block<(res:auto(TT)#):void>
- jobque_boost::try_pop(channel: Channel?; blk: block<(res:auto(TT)#):void>) : bool()
Non-blocking pop from channel. Returns true if an item was available, false if channel was empty. Does not wait for data — returns immediately.
- Arguments
channel : Channel?
blk : block<(res:auto(TT)#):void>
- jobque_boost::try_pop_clone(channel: Channel?; blk: block<(res:auto(TT)#):void>) : bool()
Non-blocking pop with clone from channel. Returns true if an item was available, false if channel was empty. The popped value is cloned to the current context before invoking the block.
- Arguments
channel : Channel?
blk : block<(res:auto(TT)#):void>
9.2.6. Synchronization
- jobque_boost::done(status: JobStatus?&)
Mark one unit of work as done in a wait group.
Alias for notify_and_release. Decrements the notification counter and releases the reference.
Sets the pointer to null, preventing double-release.
- Arguments
status : JobStatus?&
9.2.6.1. with_wait_group
- jobque_boost::with_wait_group(blk: block<(var status:JobStatus?):void>)
Creates a wait group starting at count 0 with auto-join.
Use append to dynamically add expected notifications before dispatching work.
The block returns only after all notifications have been received.
- Arguments
blk : block<(status: JobStatus?):void>
- jobque_boost::with_wait_group(count: int; blk: block<(var status:JobStatus?):void>)
9.2.7. Parallel execution
- jobque_boost::_parallel_for(range_begin: int; range_end: int; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>)
Partitions [range_begin..range_end) into num_jobs chunks and invokes blk once per chunk
on the calling thread with (chunk_begin, chunk_end, wg). The block is expected to dispatch work
via new_job and call wg |> notify_and_release when each job finishes.
parallel_for blocks until all notifications are received (via internal with_wait_group).
Requires with_job_que context.
- Arguments
range_begin : int
range_end : int
num_jobs : int
blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>
- jobque_boost::_parallel_for_each(arr: array<auto(TT)>; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>) : auto()
Runtime implementation for parallel_for_each.
Partitions array indices [0..length(arr)) into num_jobs chunks and invokes blk
with (chunk_begin_idx, chunk_end_idx, wg) on the calling thread.
The block should dispatch new_job calls that process arr[i] for i in [chunk_begin_idx..chunk_end_idx),
then call wg |> notify_and_release.
Blocks until all jobs finish. Requires with_job_que context.
- Arguments
arr : array<auto(TT)>
num_jobs : int
blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>
- jobque_boost::_parallel_map(arr: array<auto(TT)>; num_jobs: int; results_channel: Channel?; blk: block<(job_begin:int;job_end:int;var ch:Channel?;var wg:JobStatus?):void>) : auto()
Runtime implementation for parallel_map.
Partitions array indices [0..length(arr)) into num_jobs chunks and invokes blk
on the calling thread with (chunk_begin_idx, chunk_end_idx, results_channel, wg).
Blocks until all jobs finish. Results are available in results_channel after this call returns.
Requires with_job_que context.
- Arguments
- jobque_boost::parallel_for(range_begin: int; range_end: int; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>)
this one is stub for _parallel_for
- Arguments
range_begin : int
range_end : int
num_jobs : int
blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>
- jobque_boost::parallel_for_each(arr: array<auto(TT)>; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>) : auto()
Convenience wrapper around parallel_for for arrays.
Partitions array indices [0..length(arr)) into num_jobs chunks.
The block body is automatically wrapped in new_job.
Blocks until all jobs finish. Requires with_job_que context.
- Arguments
arr : array<auto(TT)>
num_jobs : int
blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>
- jobque_boost::parallel_map(arr: array<auto(TT)>; num_jobs: int; results_channel: Channel?; blk: block<(job_begin:int;job_end:int;var ch:Channel?;var wg:JobStatus?):void>) : auto()
Partitions array indices [0..length(arr)) into num_jobs chunks.
The block body is automatically wrapped in new_job.
Blocks until all jobs finish. Results are available in results_channel after this call returns.
Requires with_job_que context.
9.2.8. LockBox operations
- jobque_boost::clear(box: LockBox?; type_: auto(TT)) : auto()
clear value from the lock box
- Arguments
box : LockBox?
type_ : auto(TT)
- jobque_boost::get(box: LockBox?; blk: block<(res:auto(TT)#):void>) : auto()
reads value from the lock box and invokes the block on it
- Arguments
box : LockBox?
blk : block<(res:auto(TT)#):void>
9.2.8.1. set
- jobque_boost::set(box: LockBox?; data: auto?) : auto()
sets value to the lock box
- Arguments
box : LockBox?
data : auto?
- jobque_boost::set(box: LockBox?; data: auto(TT)) : auto()
- jobque_boost::update(box: LockBox?; blk: block<(var res:auto(TT)#):void>) : auto()
update value in the lock box and invokes the block on it
- Arguments
box : LockBox?
blk : block<(res:auto(TT)#):void>
9.2.9. Internal capture details
- jobque_boost::capture_jobque_channel(ch: Channel?) : Channel?()
this function is used to capture a channel that is used by the jobque.
- Arguments
ch : Channel?
- jobque_boost::capture_jobque_job_status(js: JobStatus?) : JobStatus?()
this function is used to capture a job status that is used by the jobque.
- Arguments
js : JobStatus?
- jobque_boost::capture_jobque_lock_box(js: LockBox?) : LockBox?()
this function is used to capture a lock box that is used by the jobque.
- Arguments
js : LockBox?
- jobque_boost::release_capture_jobque_channel(ch: Channel?)
this function is used to release a channel that is used by the jobque.
- Arguments
ch : Channel?
- jobque_boost::release_capture_jobque_job_status(js: JobStatus?)
this function is used to release a job status that is used by the jobque.
- Arguments
js : JobStatus?
- jobque_boost::release_capture_jobque_lock_box(js: LockBox?)
this function is used to release a lock box that is used by the jobque.
- Arguments
js : LockBox?