9.2. Boost package for jobs and threads

The JOBQUE_BOOST module provides high-level job queue abstractions built on the low-level jobque primitives. It includes with_job, with_job_status, and channel-based patterns for simplified concurrent programming.

See also Jobs and threads for the low-level job queue primitives. See Job Queue (jobque) for a hands-on tutorial.

All functions and symbols are in the “jobque_boost” module; use require to get access to it.

require daslib/jobque_boost

Example:

    require daslib/jobque_boost

    [export]
    def main() {
        with_job_status(1) $(status) {
            new_thread() @() {
                print("from thread\n")
                status |> notify_and_release()
            }
            status |> join()
            print("thread done\n")
        }
    }
    // output:
    // from thread
    // thread done

9.2.1. Function annotations

jobque_boost::NewJobMacro

This macro handles new_job and new_thread calls. The call is replaced with new_job_invoke or new_thread_invoke, respectively. Cloning infrastructure is generated for the lambda, which is then invoked in the new context.

jobque_boost::ParallelForJobMacro

Base macro for parallel_for, parallel_for_each, and parallel_map. Wraps the block body in new_job and redirects to the runtime implementation.

jobque_boost::ParallelForEachJobMacro

This macro handles parallel_for_each. Wraps block body in new_job and redirects to _parallel_for_each.

jobque_boost::ParallelMapJobMacro

This macro handles parallel_map. Wraps block body in new_job and redirects to _parallel_map.

9.2.2. Invocations

jobque_boost::new_job(l: lambda<():void>)
Create a new job.
  • new context is cloned from the current context.

  • lambda is cloned to the new context.

  • new job is added to the job queue.

  • once new job is invoked, lambda is invoked on the new context on the job thread.

Arguments
  • l : lambda<void>
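The steps above can be sketched as follows. This is a minimal sketch that mirrors the new_thread example at the top of this page. It assumes that the job queue must be active (with_job_que, mentioned under Parallel execution) and that with_job_que accepts a block with no arguments; neither detail is confirmed on this page.

```daslang
require daslib/jobque_boost

[export]
def main() {
    // Assumption: jobs need an active job queue, provided by with_job_que.
    with_job_que() $() {
        // One notification is expected before join() returns.
        with_job_status(1) $(status) {
            new_job() @() {
                // Runs on a job thread, in a context cloned from this one.
                print("from job\n")
                status |> notify_and_release()
            }
            // Block the calling thread until the job has notified.
            status |> join()
            print("job done\n")
        }
    }
}
```

Because the lambda is cloned into the new context, the job works on its own copy of whatever it captures; results should come back through a channel, lock box, or job status rather than through captured variables.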

jobque_boost::new_thread(l: lambda<():void>)
Create a new thread.
  • new context is cloned from the current context.

  • lambda is cloned to the new context.

  • new thread is created.

  • lambda is invoked on the new context on the new thread.

Arguments
  • l : lambda<void>

9.2.3. Iteration

jobque_boost::each(channel: Channel?; tinfo: auto(TT)) : auto()

Warning

This function is deprecated.

This iterator iterates over the channel in the order the items were pushed. It stops once the channel is depleted (internal entry counter is 0). Iteration can happen on multiple threads or jobs at the same time.

Arguments
  • channel : Channel?

  • tinfo : auto(TT)

jobque_boost::each_clone(channel: Channel?; tinfo: auto(TT)) : auto()

This iterator iterates over the channel in the order the items were pushed. It stops once the channel is depleted (internal entry counter is 0). Iteration can happen on multiple threads or jobs at the same time.

Arguments
  • channel : Channel?

  • tinfo : auto(TT)

jobque_boost::for_each(channel: Channel?; blk: block<(res:auto(TT)#):void>) : auto()

Warning

This function is deprecated.

Reads input from the channel (in the order it was pushed) and invokes the block on each input. Stops once the channel is depleted (internal entry counter is 0). This can happen on multiple threads or jobs at the same time.

Arguments
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

jobque_boost::for_each_clone(channel: Channel?; blk: block<(res:auto(TT)#):void>) : auto()

Reads input from the channel (in the order it was pushed) and invokes the block on each input. Stops once the channel is depleted (internal entry counter is 0). This can happen on multiple threads or jobs at the same time.

Arguments
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

9.2.4. Passing data

jobque_boost::push(channel: Channel?; data: auto?) : auto()

Pushes a value to the end of the channel.

Arguments
  • channel : Channel?

  • data : auto?

jobque_boost::push_batch(channel: Channel?; data: array<auto?>) : auto()

Pushes values to the end of the channel.

Arguments
  • channel : Channel?

  • data : array<auto?>

jobque_boost::push_batch_clone(channel: Channel?; data: array<auto(TT)>) : auto()

Clones the data and pushes the values to the end of the channel.

Arguments
  • channel : Channel?

  • data : array<auto(TT)>

jobque_boost::push_clone(channel: Channel?; data: auto(TT)) : auto()

Clones the data and pushes the value to the end of the channel.

Arguments
  • channel : Channel?

  • data : auto(TT)
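A producer/consumer pairing of push_clone with for_each_clone might look like the sketch below. It assumes a with_channel(count) helper from the low-level jobque module, where count is the number of producers expected to notify, and a with_job_que that accepts a block with no arguments; both are assumptions and are not documented on this page.

```daslang
require daslib/jobque_boost

[export]
def main() {
    with_job_que() $() {
        // Assumption: with_channel(1) creates a channel expecting one producer.
        with_channel(1) $(ch) {
            new_job() @() {
                for (i in range(3)) {
                    // Each value is cloned before it is pushed.
                    ch |> push_clone(i)
                }
                // Signal that this producer is finished.
                ch |> notify_and_release()
            }
            // Consume values in push order until the channel is depleted.
            for_each_clone(ch) $(value : int#) {
                print("received {value}\n")
            }
        }
    }
}
```

The block argument is typed explicitly (int#) so that the generic TT in the for_each_clone signature can be resolved.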

9.2.5. Receiving data

jobque_boost::gather(ch: Channel?; blk: block<(arg:auto(TT)#):void>) : auto()

Reads input from the channel (in the order it was pushed) and invokes the block on each input. Afterwards the input is consumed.

Arguments
  • ch : Channel?

  • blk : block<(arg:auto(TT)#):void>

jobque_boost::gather_and_forward(ch: Channel?; toCh: Channel?; blk: block<(arg:auto(TT)#):void>) : auto()

Reads input from the channel (in the order it was pushed) and invokes the block on each input. Afterwards the input is consumed.

Arguments
  • ch : Channel?

  • toCh : Channel?

  • blk : block<(arg:auto(TT)#):void>

jobque_boost::gather_ex(ch: Channel?; blk: block<(arg:auto(TT)#;info:TypeInfo const?;var ctx:Context):void>) : auto()

Reads input from the channel (in the order it was pushed) and invokes the block on each input. Afterwards the input is consumed.

Arguments
  • ch : Channel?

  • blk : block<(arg:auto(TT)#;info:TypeInfo const?;var ctx:Context):void>

jobque_boost::peek(ch: Channel?; blk: block<(arg:auto(TT)#):void>) : auto()

Reads input from the channel (in the order it was pushed) and invokes the block on each input. Afterwards the input is not consumed.

Arguments
  • ch : Channel?

  • blk : block<(arg:auto(TT)#):void>

jobque_boost::pop_and_clone_one(channel: Channel?; blk: block<(res:auto(TT)#):void>) : auto()

Reads one command from the channel.

Arguments
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

jobque_boost::pop_one(channel: Channel?; blk: block<(res:auto(TT)#):void>) : auto()

Warning

This function is deprecated.

Reads one command from the channel.

Arguments
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

jobque_boost::pop_with_timeout(channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>) : bool()

Pop from channel with timeout in milliseconds. Returns true if an item was available within the timeout, false if timed out or channel exhausted.

Arguments
  • channel : Channel?

  • timeout_ms : int

  • blk : block<(res:auto(TT)#):void>

jobque_boost::pop_with_timeout_clone(channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>) : bool()

Pop from channel with timeout and clone. Returns true if an item was available within the timeout. The popped value is cloned to the current context before invoking the block.

Arguments
  • channel : Channel?

  • timeout_ms : int

  • blk : block<(res:auto(TT)#):void>

jobque_boost::try_pop(channel: Channel?; blk: block<(res:auto(TT)#):void>) : bool()

Non-blocking pop from channel. Returns true if an item was available, false if the channel was empty. Does not wait for data; returns immediately.

Arguments
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

jobque_boost::try_pop_clone(channel: Channel?; blk: block<(res:auto(TT)#):void>) : bool()

Non-blocking pop with clone from channel. Returns true if an item was available, false if channel was empty. The popped value is cloned to the current context before invoking the block.

Arguments
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

9.2.6. Synchronization

jobque_boost::done(status: JobStatus?&)

Mark one unit of work as done in a wait group. Alias for notify_and_release. Decrements the notification counter and releases the reference. Sets the pointer to null, preventing double-release.

Arguments
  • status : JobStatus?&

9.2.6.1. with_wait_group

jobque_boost::with_wait_group(blk: block<(var status:JobStatus?):void>)

Creates a wait group starting at count 0 with auto-join. Use append to dynamically add expected notifications before dispatching work. The block returns only after all notifications have been received.

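Arguments
  • blk : block<(var status:JobStatus?):void>

jobque_boost::with_wait_group(count: int; blk: block<(var status:JobStatus?):void>)

Arguments
  • count : int

  • blk : block<(var status:JobStatus?):void>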

9.2.7. Parallel execution

jobque_boost::_parallel_for(range_begin: int; range_end: int; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>)

Partitions [range_begin..range_end) into num_jobs chunks and invokes blk once per chunk on the calling thread with (chunk_begin, chunk_end, wg). The block is expected to dispatch work via new_job and call wg |> notify_and_release when each job finishes. parallel_for blocks until all notifications are received (via internal with_wait_group). Requires with_job_que context.

Arguments
  • range_begin : int

  • range_end : int

  • num_jobs : int

  • blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>
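To make the partitioning concrete, the sketch below splits a half-open range into num_jobs contiguous chunks using plain integer arithmetic. The splitting formula is an assumption chosen for illustration; this page does not specify how _parallel_for distributes the remainder, so the actual chunk boundaries may differ.

```daslang
[export]
def main() {
    let range_begin = 0
    let range_end = 10
    let num_jobs = 3
    let total = range_end - range_begin
    for (j in range(num_jobs)) {
        // Hypothetical split: chunk j covers [chunk_begin..chunk_end).
        // Integer division spreads the remainder across the chunks.
        let chunk_begin = range_begin + total * j / num_jobs
        let chunk_end = range_begin + total * (j + 1) / num_jobs
        print("chunk {j}: {chunk_begin}..{chunk_end}\n")
    }
}
```

With these values the formula yields the chunks [0..3), [3..6), and [6..10): every index is covered exactly once and chunk sizes differ by at most one.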

jobque_boost::_parallel_for_each(arr: array<auto(TT)>; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>) : auto()

Runtime implementation for parallel_for_each. Partitions array indices [0..length(arr)) into num_jobs chunks and invokes blk with (chunk_begin_idx, chunk_end_idx, wg) on the calling thread. The block should dispatch new_job calls that process arr[i] for i in [chunk_begin_idx..chunk_end_idx), then call wg |> notify_and_release. Blocks until all jobs finish. Requires with_job_que context.

Arguments
  • arr : array<auto(TT)>

  • num_jobs : int

  • blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>

jobque_boost::_parallel_map(arr: array<auto(TT)>; num_jobs: int; results_channel: Channel?; blk: block<(job_begin:int;job_end:int;var ch:Channel?;var wg:JobStatus?):void>) : auto()

Runtime implementation for parallel_map. Partitions array indices [0..length(arr)) into num_jobs chunks and invokes blk on the calling thread with (chunk_begin_idx, chunk_end_idx, results_channel, wg). Blocks until all jobs finish. Results are available in results_channel after this call returns. Requires with_job_que context.

Arguments
  • arr : array<auto(TT)>

  • num_jobs : int

  • results_channel : Channel?

  • blk : block<(job_begin:int;job_end:int;ch: Channel?;wg: JobStatus?):void>

jobque_boost::parallel_for(range_begin: int; range_end: int; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>)

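This is a stub for _parallel_for. ParallelForJobMacro wraps the block body in new_job and redirects the call to the runtime implementation.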

Arguments
  • range_begin : int

  • range_end : int

  • num_jobs : int

  • blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>

jobque_boost::parallel_for_each(arr: array<auto(TT)>; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>) : auto()

Convenience wrapper around parallel_for for arrays. Partitions array indices [0..length(arr)) into num_jobs chunks. The block body is automatically wrapped in new_job. Blocks until all jobs finish. Requires with_job_que context.

Arguments
  • arr : array<auto(TT)>

  • num_jobs : int

  • blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>

jobque_boost::parallel_map(arr: array<auto(TT)>; num_jobs: int; results_channel: Channel?; blk: block<(job_begin:int;job_end:int;var ch:Channel?;var wg:JobStatus?):void>) : auto()

Partitions array indices [0..length(arr)) into num_jobs chunks. The block body is automatically wrapped in new_job. Blocks until all jobs finish. Results are available in results_channel after this call returns. Requires with_job_que context.

Arguments
  • arr : array<auto(TT)>

  • num_jobs : int

  • results_channel : Channel?

  • blk : block<(job_begin:int;job_end:int;ch: Channel?;wg: JobStatus?):void>

9.2.8. LockBox operations

jobque_boost::clear(box: LockBox?; type_: auto(TT)) : auto()

Clears the value from the lock box.

Arguments
  • box : LockBox?

  • type_ : auto(TT)

jobque_boost::get(box: LockBox?; blk: block<(res:auto(TT)#):void>) : auto()

Reads the value from the lock box and invokes the block on it.

Arguments
  • box : LockBox?

  • blk : block<(res:auto(TT)#):void>

9.2.8.1. set

jobque_boost::set(box: LockBox?; data: auto?) : auto()

Sets the value in the lock box.

Arguments
  • box : LockBox?

  • data : auto?

jobque_boost::set(box: LockBox?; data: auto(TT)) : auto()

Arguments
  • box : LockBox?

  • data : auto(TT)

jobque_boost::update(box: LockBox?; blk: block<(var res:auto(TT)#):void>) : auto()

Updates the value in the lock box, invoking the block on it.

Arguments
  • box : LockBox?

  • blk : block<(var res:auto(TT)#):void>

9.2.9. Internal capture details

jobque_boost::capture_jobque_channel(ch: Channel?) : Channel?()

Captures a channel that is used by the jobque.

Arguments
  • ch : Channel?

jobque_boost::capture_jobque_job_status(js: JobStatus?) : JobStatus?()

Captures a job status that is used by the jobque.

Arguments
  • js : JobStatus?

jobque_boost::capture_jobque_lock_box(js: LockBox?) : LockBox?()

Captures a lock box that is used by the jobque.

Arguments
  • js : LockBox?

jobque_boost::release_capture_jobque_channel(ch: Channel?)

Releases a channel that is used by the jobque.

Arguments
  • ch : Channel?

jobque_boost::release_capture_jobque_job_status(js: JobStatus?)

Releases a job status that is used by the jobque.

Arguments
  • js : JobStatus?

jobque_boost::release_capture_jobque_lock_box(js: LockBox?)

Releases a lock box that is used by the jobque.

Arguments
  • js : LockBox?