9.2. Boost package for jobs and threads

The JOBQUE_BOOST module provides high-level job queue abstractions built on the low-level jobque primitives. It includes with_job, with_job_status, and channel-based patterns for simplified concurrent programming.

See also Jobs and threads for the low-level job queue primitives. See Job Queue (jobque) for a hands-on tutorial.

All functions and symbols are in the “jobque_boost” module; use require to get access to it.

require daslib/jobque_boost

Example:

    require daslib/jobque_boost

    [export]
    def main() {
        with_job_status(1) $(status) {
            new_thread() @() {
                print("from thread\n")
                status |> notify_and_release()
            }
            status |> join()
            print("thread done\n")
        }
    }
    // output:
    // from thread
    // thread done

9.2.1. Function annotations

ParallelForJobMacro

Base macro for parallel_for, parallel_for_each, and parallel_map. Wraps the block body in new_job and redirects to the runtime implementation.

NewJobMacro

This macro handles new_job and new_thread calls. The call is replaced with new_job_invoke or new_thread_invoke, respectively. Cloning infrastructure is generated for the lambda, which is then invoked in the new context.

ParallelForEachJobMacro

This macro handles parallel_for_each. Wraps block body in new_job and redirects to _parallel_for_each.

ParallelMapJobMacro

This macro handles parallel_map. Wraps block body in new_job and redirects to _parallel_map.

9.2.2. Invocations

new_job(l: lambda<():void>)
Create a new job.
  • new context is cloned from the current context.

  • lambda is cloned to the new context.

  • new job is added to the job queue.

  • once new job is invoked, lambda is invoked on the new context on the job thread.

Arguments:
  • l : lambda<void>
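
The steps above can be sketched by adapting this module's own new_thread example to new_job. This is a minimal sketch rather than a definitive usage: it assumes gen2 syntax, that a job queue must first be created with with_job_que (from the low-level jobque module), and that with_job_status, join, and notify_and_release behave as in the example at the top of this page. The helper name run_job_and_wait is hypothetical.

```daslang
options gen2
require daslib/jobque_boost

// Hypothetical helper: runs one job and returns true once it has been joined.
def run_job_and_wait() : bool {
    var joined = false
    // Assumption: new_job needs an active job queue, created by with_job_que.
    with_job_que() $() {
        with_job_status(1) $(status) {
            // The lambda is cloned into a new context and runs on a job thread.
            new_job() @() {
                print("from job\n")
                status |> notify_and_release()
            }
            // Block until the job has sent its single notification.
            status |> join()
            joined = true
        }
    }
    return joined
}

[export]
def main() {
    let ok = run_job_and_wait()
    print("job joined: {ok}\n")
}
```

The status object is the only thing shared between the caller and the job; everything else the lambda touches lives in the cloned context.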

new_thread(l: lambda<():void>)
Create a new thread.
  • new context is cloned from the current context.

  • lambda is cloned to the new context.

  • new thread is created.

  • lambda is invoked on the new context on the new thread.

Arguments:
  • l : lambda<void>

9.2.3. Iteration

each(channel: Channel?; tinfo: auto(TT)): auto

Warning

This function is deprecated.

This iterator iterates over the channel in the order entries were pushed. It stops once the channel is depleted (the internal entry counter is 0). Iteration can happen on multiple threads or jobs at the same time.

Arguments:
  • channel : Channel?

  • tinfo : auto(TT)

each_clone(channel: Channel?; tinfo: auto(TT)): auto

This iterator iterates over the channel in the order entries were pushed. It stops once the channel is depleted (the internal entry counter is 0). Iteration can happen on multiple threads or jobs at the same time.

Arguments:
  • channel : Channel?

  • tinfo : auto(TT)

for_each(channel: Channel?; blk: block<(res:auto(TT)#):void>): auto

Warning

This function is deprecated.

Reads input from the channel (in the order it was pushed) and invokes the block on each input. Stops once the channel is depleted (the internal entry counter is 0). This can happen on multiple threads or jobs at the same time.

Arguments:
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

for_each_clone(channel: Channel?; blk: block<(res:auto(TT)#):void>): auto

Reads input from the channel (in the order it was pushed) and invokes the block on each input. Stops once the channel is depleted (the internal entry counter is 0). This can happen on multiple threads or jobs at the same time.

Arguments:
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

9.2.4. Passing data

push(channel: Channel?; data: auto?): auto

Pushes a value to the end of the channel.

Arguments:
  • channel : Channel?

  • data : auto?

push_batch(channel: Channel?; data: array<auto?>): auto

Pushes values to the end of the channel.

Arguments:
  • channel : Channel?

  • data : array<auto?>

push_batch_clone(channel: Channel?; data: array<auto(TT)>): auto

Clones the data and pushes the values to the end of the channel.

Arguments:
  • channel : Channel?

  • data : array<auto(TT)>

push_clone(channel: Channel?; data: auto(TT)): auto

Clones the data and pushes the value to the end of the channel.

Arguments:
  • channel : Channel?

  • data : auto(TT)
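
A producer and consumer pair built from push_clone and for_each_clone might look like the sketch below. It is an illustration under assumptions, not the canonical pattern: with_channel and with_job_que come from the low-level jobque module and are assumed to create a channel expecting one notification and an active job queue, respectively. The helper name sum_via_channel is hypothetical.

```daslang
options gen2
require daslib/jobque_boost

// Hypothetical helper: a job pushes 0..n-1, the caller sums what it receives.
def sum_via_channel(n : int) : int {
    var total = 0
    with_job_que() $() {
        // Assumption: with_channel(1) creates a channel that expects one notification.
        with_channel(1) $(ch) {
            new_job() @() {
                for (i in range(n)) {
                    // Each value is cloned before being appended to the channel.
                    ch |> push_clone(i)
                }
                // Signal that this producer has finished.
                ch |> notify_and_release()
            }
            // Consume entries in push order until the channel is depleted.
            for_each_clone(ch) $(value : int#) {
                total += value
            }
        }
    }
    return total
}

[export]
def main() {
    let total = sum_via_channel(4)
    print("total = {total}\n")
}
```

The clone variants are used on both sides because the producer and the consumer run in different contexts.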

9.2.5. Receiving data

gather(ch: Channel?; blk: block<(arg:auto(TT)#):void>): auto

Reads input from the channel (in the order it was pushed) and invokes the block on each input. The input is consumed afterwards.

Arguments:
  • ch : Channel?

  • blk : block<(arg:auto(TT)#):void>

gather_and_forward(ch: Channel?; toCh: Channel?; blk: block<(arg:auto(TT)#):void>): auto

Reads input from the channel (in the order it was pushed) and invokes the block on each input. The input is consumed afterwards.

Arguments:
  • ch : Channel?

  • toCh : Channel?

  • blk : block<(arg:auto(TT)#):void>

gather_ex(ch: Channel?; blk: block<(arg:auto(TT)#;info:TypeInfo const?;var ctx:Context):void>): auto

Reads input from the channel (in the order it was pushed) and invokes the block on each input. The input is consumed afterwards.

Arguments:
  • ch : Channel?

  • blk : block<(arg:auto(TT)#;info:TypeInfo const?;var ctx:Context):void>

peek(ch: Channel?; blk: block<(arg:auto(TT)#):void>): auto

Reads input from the channel (in the order it was pushed) and invokes the block on each input. The input is not consumed afterwards.

Arguments:
  • ch : Channel?

  • blk : block<(arg:auto(TT)#):void>

pop_and_clone_one(channel: Channel?; blk: block<(res:auto(TT)#):void>): auto

Reads one command from the channel.

Arguments:
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

pop_one(channel: Channel?; blk: block<(res:auto(TT)#):void>): auto

Warning

This function is deprecated.

Reads one command from the channel.

Arguments:
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

pop_with_timeout(channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>): bool

Pop from channel with timeout in milliseconds. Returns true if an item was available within the timeout, false if timed out or channel exhausted.

Arguments:
  • channel : Channel?

  • timeout_ms : int

  • blk : block<(res:auto(TT)#):void>

pop_with_timeout_clone(channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>): bool

Pop from channel with timeout and clone. Returns true if an item was available within the timeout. The popped value is cloned to the current context before invoking the block.

Arguments:
  • channel : Channel?

  • timeout_ms : int

  • blk : block<(res:auto(TT)#):void>

try_pop(channel: Channel?; blk: block<(res:auto(TT)#):void>): bool

Non-blocking pop from channel. Returns true if an item was available, false if channel was empty. Does not wait for data — returns immediately.

Arguments:
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

try_pop_clone(channel: Channel?; blk: block<(res:auto(TT)#):void>): bool

Non-blocking pop with clone from channel. Returns true if an item was available, false if channel was empty. The popped value is cloned to the current context before invoking the block.

Arguments:
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

9.2.6. Synchronization

done(status: JobStatus?&)

Mark one unit of work as done in a wait group. Alias for notify_and_release. Decrements the notification counter and releases the reference. Sets the pointer to null, preventing double-release.

Arguments:
  • status : JobStatus?&

9.2.6.1. with_wait_group

with_wait_group(blk: block<(var status:JobStatus?):void>)

Creates a wait group starting at count 0 with auto-join. Use append to dynamically add expected notifications before dispatching work. The block returns only after all notifications have been received.

Arguments:
  • blk : block<(var status:JobStatus?):void>

with_wait_group(count: int; blk: block<(var status:JobStatus?):void>)
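
The zero-count form can be combined with append and done roughly as follows. This is a sketch under stated assumptions: append is taken to be the low-level jobque function that adds expected notifications to a JobStatus, with_job_que is assumed to provide the job queue that new_job needs, and the helper name dispatch_and_wait is hypothetical.

```daslang
options gen2
require daslib/jobque_boost

// Hypothetical helper: dispatches num_jobs jobs and returns after all have called done.
def dispatch_and_wait(num_jobs : int) : int {
    var dispatched = 0
    with_job_que() $() {
        with_wait_group() $(wg) {
            for (i in range(num_jobs)) {
                // Register one expected notification before dispatching the job.
                // Assumption: append(status, count) is provided by the jobque module.
                wg |> append(1)
                new_job() @() {
                    print("job running\n")
                    // Mark this unit of work as finished.
                    wg |> done()
                }
                dispatched = i + 1
            }
        }
        // Reached only after every job has called done (auto-join).
    }
    return dispatched
}

[export]
def main() {
    let n = dispatch_and_wait(3)
    print("jobs completed: {n}\n")
}
```

Calling append before new_job keeps the expected count ahead of any notification, so the auto-join cannot return early.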

9.2.7. Parallel execution

_parallel_for(range_begin: int; range_end: int; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>)

Partitions [range_begin..range_end) into num_jobs chunks and invokes blk once per chunk on the calling thread with (chunk_begin, chunk_end, wg). The block is expected to dispatch work via new_job and call wg |> notify_and_release when each job finishes. parallel_for blocks until all notifications are received (via internal with_wait_group). Requires with_job_que context.

Arguments:
  • range_begin : int

  • range_end : int

  • num_jobs : int

  • blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>

_parallel_for_each(arr: array<auto(TT)>; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>): auto

Runtime implementation for parallel_for_each. Partitions array indices [0..length(arr)) into num_jobs chunks and invokes blk with (chunk_begin_idx, chunk_end_idx, wg) on the calling thread. The block should dispatch new_job calls that process arr[i] for i in [chunk_begin_idx..chunk_end_idx), then call wg |> notify_and_release. Blocks until all jobs finish. Requires with_job_que context.

Arguments:
  • arr : array<auto(TT)>

  • num_jobs : int

  • blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>

_parallel_map(arr: array<auto(TT)>; num_jobs: int; results_channel: Channel?; blk: block<(job_begin:int;job_end:int;var ch:Channel?;var wg:JobStatus?):void>): auto

Runtime implementation for parallel_map. Partitions array indices [0..length(arr)) into num_jobs chunks and invokes blk on the calling thread with (chunk_begin_idx, chunk_end_idx, results_channel, wg). Blocks until all jobs finish. Results are available in results_channel after this call returns. Requires with_job_que context.

Arguments:
  • arr : array<auto(TT)>

  • num_jobs : int

  • results_channel : Channel?

  • blk : block<(job_begin:int;job_end:int;ch: Channel?;wg: JobStatus?):void>

parallel_for(range_begin: int; range_end: int; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>)

Stub for _parallel_for. ParallelForJobMacro wraps the block body in new_job and redirects the call to the runtime implementation.

Arguments:
  • range_begin : int

  • range_end : int

  • num_jobs : int

  • blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>

parallel_for_each(arr: array<auto(TT)>; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>): auto

Convenience wrapper around parallel_for for arrays. Partitions array indices [0..length(arr)) into num_jobs chunks. The block body is automatically wrapped in new_job. Blocks until all jobs finish. Requires with_job_que context.

Arguments:
  • arr : array<auto(TT)>

  • num_jobs : int

  • blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>

parallel_map(arr: array<auto(TT)>; num_jobs: int; results_channel: Channel?; blk: block<(job_begin:int;job_end:int;var ch:Channel?;var wg:JobStatus?):void>): auto

Partitions array indices [0..length(arr)) into num_jobs chunks. The block body is automatically wrapped in new_job. Blocks until all jobs finish. Results are available in results_channel after this call returns. Requires with_job_que context.

Arguments:
  • arr : array<auto(TT)>

  • num_jobs : int

  • results_channel : Channel?

  • blk : block<(job_begin:int;job_end:int;ch: Channel?;wg: JobStatus?):void>

9.2.8. LockBox operations

clear(box: LockBox?; type_: auto(TT)): auto

Clears the value from the lock box.

Arguments:
  • box : LockBox?

  • type_ : auto(TT)

fill(box: LockBox?; data: auto(TT)): auto

Stores a pointer to data in the lock box and marks it as full (isReady becomes false). The data must outlive the grab; the caller owns the lifetime.

Arguments:
  • box : LockBox?

  • data : auto(TT)

get(box: LockBox?; blk: block<(res:auto(TT)#):void>): auto

Reads the value from the lock box and invokes the block on it.

Arguments:
  • box : LockBox?

  • blk : block<(res:auto(TT)#):void>

grab(box: LockBox?; blk: block<(var res:auto(TT)):void>): bool

Grabs data from the lock box, marks it as empty (isReady becomes true), and invokes the block. Does not free the data; the caller owns the lifetime.

Arguments:
  • box : LockBox?

  • blk : block<(var res:auto(TT)):void>

9.2.8.1. set

set(box: LockBox?; data: auto?): auto

Sets the value of the lock box.

Arguments:
  • box : LockBox?

  • data : auto?

set(box: LockBox?; data: auto(TT)): auto

update(box: LockBox?; blk: block<(var res:auto(TT)#):void>): auto

Updates the value in the lock box and invokes the block on it.

Arguments:
  • box : LockBox?

  • blk : block<(var res:auto(TT)#):void>

9.2.9. Internal capture details

capture_jobque_channel(ch: Channel?): Channel?

Captures a channel that is used by the jobque.

Arguments:
  • ch : Channel?

capture_jobque_job_status(js: JobStatus?): JobStatus?

Captures a job status that is used by the jobque.

Arguments:
  • js : JobStatus?

capture_jobque_lock_box(js: LockBox?): LockBox?

Captures a lock box that is used by the jobque.

Arguments:
  • js : LockBox?

release_capture_jobque_channel(ch: Channel?)

Releases a channel that is used by the jobque.

Arguments:
  • ch : Channel?

release_capture_jobque_job_status(js: JobStatus?)

Releases a job status that is used by the jobque.

Arguments:
  • js : JobStatus?

release_capture_jobque_lock_box(js: LockBox?)

Releases a lock box that is used by the jobque.

Arguments:
  • js : LockBox?