9.2. Boost package for jobs and threads

The JOBQUE_BOOST module provides high-level job queue abstractions built on the low-level jobque primitives. It includes with_job, with_job_status, and channel-based patterns for simplified concurrent programming.

See also Jobs and threads for the low-level job queue primitives. See Job Queue (jobque) for a hands-on tutorial.

All functions and symbols are in the “jobque_boost” module; use require to get access to it.

require daslib/jobque_boost

Example:

    require daslib/jobque_boost

    [export]
    def main() {
        with_job_status(1) $(status) {
            new_thread() @() {
                print("from thread\n")
                status |> notify_and_release()
            }
            status |> join()
            print("thread done\n")
        }
    }
    // output:
    // from thread
    // thread done

9.2.1. Function annotations

ParallelForJobMacro

Base macro for parallel_for, parallel_for_each, and parallel_map. Wraps the block body in new_job and redirects to the runtime implementation.

ParallelMapJobMacro

This macro handles parallel_map. Wraps block body in new_job and redirects to _parallel_map.

NewJobMacro

This macro handles new_job and new_thread calls. The call is replaced with new_job_invoke or new_thread_invoke, respectively. Cloning infrastructure is generated for the lambda, which is then invoked in the new context.

ParallelForEachJobMacro

This macro handles parallel_for_each. Wraps block body in new_job and redirects to _parallel_for_each.

9.2.2. Invocations

new_job(l: lambda<():void>)
Create a new job.
  • new context is cloned from the current context.

  • lambda is cloned to the new context.

  • new job is added to the job queue.

  • once new job is invoked, lambda is invoked on the new context on the job thread.

Arguments:
  • l : lambda<void>

new_thread(l: lambda<():void>)
Create a new thread.
  • new context is cloned from the current context.

  • lambda is cloned to the new context.

  • new thread is created.

  • lambda is invoked on the new context on the new thread.

Arguments:
  • l : lambda<void>
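
The new_job steps above can be sketched as follows. This is a minimal sketch rather than code taken from the module: it assumes that with_job_que (from the low-level jobque module) sets up the job queue that new_job needs, and it reuses with_job_status, join, and notify_and_release in the same way as the example at the top of this page. The job count of 3 is arbitrary.

```das
require daslib/jobque_boost

[export]
def main() {
    // Assumption: with_job_que keeps a job queue alive for the duration of the block.
    with_job_que() {
        // Expect 3 notifications, one per job.
        with_job_status(3) $(status) {
            for (i in range(3)) {
                // The lambda is cloned into a new context and runs on a job thread.
                new_job() @() {
                    print("from job\n")
                    status |> notify_and_release()
                }
            }
            // Block until all 3 jobs have notified.
            status |> join()
            print("all jobs done\n")
        }
    }
}
```

Because the jobs run concurrently, the relative order of their output is not fixed; only "all jobs done" is expected to come last, since it is printed after join returns.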

9.2.3. Iteration

each(channel: Channel?; tinfo: auto(TT)): auto

Warning

This function is deprecated.

Iterates over the channel in the order items were pushed. The iterator stops once the channel is depleted (internal entry counter is 0). Iteration can happen on multiple threads or jobs at the same time.

Arguments:
  • channel : Channel?

  • tinfo : auto(TT)

each_clone(channel: Channel?; tinfo: auto(TT)): auto

Iterates over the channel in the order items were pushed. The iterator stops once the channel is depleted (internal entry counter is 0). Iteration can happen on multiple threads or jobs at the same time.

Arguments:
  • channel : Channel?

  • tinfo : auto(TT)

for_each(channel: Channel?; blk: block<(res:auto(TT)#):void>): auto

Warning

This function is deprecated.

Reads input from the channel (in the order it was pushed) and invokes the block on each input. Stops once the channel is depleted (internal entry counter is 0). This can happen on multiple threads or jobs at the same time.

Arguments:
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

for_each_clone(channel: Channel?; blk: block<(res:auto(TT)#):void>): auto

Reads input from the channel (in the order it was pushed) and invokes the block on each input. Stops once the channel is depleted (internal entry counter is 0). This can happen on multiple threads or jobs at the same time.

Arguments:
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

9.2.4. Passing data

9.2.4.1. push

push(stream: Stream?; data: array<uint8>#)

Overload for temporary/borrowed views (e.g. bytes popped from another stream).

Arguments:
  • stream : Stream?

  • data : array<uint8>#

push(stream: Stream?; data: array<uint8>)
push(channel: Channel?; data: auto?): auto

push_archive(stream: Stream?; value: auto(TT)): auto

Serialize value via daslib/archive and push the resulting bytes onto the stream. Use pop_archive or gather_archive on the consumer side to deserialize.

Arguments:
  • stream : Stream?

  • value : auto(TT)

9.2.4.2. push_batch

push_batch(channel: Channel?; data: array<auto?>): auto

Pushes values to the end of the channel.

Arguments:
  • channel : Channel?

  • data : array<auto?>

push_batch(stream: Stream?; data: array<array<uint8>>)

push_batch_clone(channel: Channel?; data: array<auto(TT)>): auto

Clones the data and pushes the values to the end of the channel.

Arguments:
  • channel : Channel?

  • data : array<auto(TT)>

push_clone(channel: Channel?; data: auto(TT)): auto

Clones the data and pushes the value to the end of the channel.

Arguments:
  • channel : Channel?

  • data : auto(TT)
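
A minimal producer/consumer sketch that ties push_clone to the iteration functions described earlier. It assumes with_job_que and with_channel from the low-level jobque module (neither is documented on this page), and that the producer signals completion by calling notify_and_release on the channel, mirroring how the job status is released in the example at the top of this page.

```das
require daslib/jobque_boost

[export]
def main() {
    with_job_que() {
        // Assumption: with_channel(1) creates a channel that expects one notification.
        with_channel(1) $(ch) {
            new_job() @() {
                for (i in range(3)) {
                    // Clone each value and append it to the end of the channel.
                    ch |> push_clone(i)
                }
                // Tell the consumer that this producer is finished.
                ch |> notify_and_release()
            }
            // Consume values in push order until the channel is depleted.
            for_each_clone(ch) $(val : int#) {
                print("{val}\n")
            }
        }
    }
}
```

With a single producer and a single consumer, the values should be received in the order they were pushed.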

9.2.5. Receiving data

9.2.5.1. gather

gather(ch: Channel?; blk: block<(arg:auto(TT)#):void>): auto

Reads input from the channel (in the order it was pushed) and invokes the block on each input. Afterwards the input is consumed.

Arguments:
  • ch : Channel?

  • blk : block<(arg:auto(TT)#):void>

gather(stream: Stream?; blk: block<(var bytes:array<uint8>#):void>)

gather_and_forward(ch: Channel?; toCh: Channel?; blk: block<(arg:auto(TT)#):void>): auto

Reads input from the channel (in the order it was pushed) and invokes the block on each input. Afterwards the input is consumed.

Arguments:
  • ch : Channel?

  • toCh : Channel?

  • blk : block<(arg:auto(TT)#):void>

gather_archive(stream: Stream?; blk: block<(var value:auto(TT)&):void>): auto

Drain the stream, deserializing each blob via daslib/archive and invoking the block on each.

Arguments:
  • stream : Stream?

  • blk : block<(value:auto(TT)&):void>

gather_ex(ch: Channel?; blk: block<(arg:auto(TT)#;info:TypeInfo const?;var ctx:Context):void>): auto

Reads input from the channel (in the order it was pushed) and invokes the block on each input. Afterwards the input is consumed.

Arguments:
  • ch : Channel?

  • blk : block<(arg:auto(TT)#;info:TypeInfo const?;var ctx:Context):void>

9.2.5.2. peek

peek(stream: Stream?; blk: block<(var bytes:array<uint8>#):void>)

Iterate buffered blobs in push order WITHOUT consuming them.

Arguments:
  • stream : Stream?

  • blk : block<(bytes:array<uint8>#):void>

peek(ch: Channel?; blk: block<(arg:auto(TT)#):void>): auto

pop(stream: Stream?; blk: block<(var bytes:array<uint8>#):void>)

Blocking pop from the stream. Invokes the block with a borrowed view over the popped bytes. The view is only valid for the duration of the block.

Arguments:
  • stream : Stream?

  • blk : block<(bytes:array<uint8>#):void>

pop_and_clone_one(channel: Channel?; blk: block<(res:auto(TT)#):void>): auto

Reads one command from the channel.

Arguments:
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

pop_archive(stream: Stream?; blk: block<(var value:auto(TT)&):void>): bool

Blocking pop + deserialize via daslib/archive. Returns true if a value was consumed. The block receives an owned reference — safe to pass fields to functions expecting const.

Arguments:
  • stream : Stream?

  • blk : block<(value:auto(TT)&):void>

pop_one(channel: Channel?; blk: block<(res:auto(TT)#):void>): auto

Warning

This function is deprecated.

Reads one command from the channel.

Arguments:
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

9.2.5.3. pop_with_timeout

pop_with_timeout(channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>): bool

Pop from channel with timeout in milliseconds. Returns true if an item was available within the timeout, false if timed out or channel exhausted.

Arguments:
  • channel : Channel?

  • timeout_ms : int

  • blk : block<(res:auto(TT)#):void>

pop_with_timeout(stream: Stream?; timeout_ms: int; blk: block<(var bytes:array<uint8>#):void>): bool

pop_with_timeout_clone(channel: Channel?; timeout_ms: int; blk: block<(res:auto(TT)#):void>): bool

Pop from channel with timeout and clone. Returns true if an item was available within the timeout. The popped value is cloned to the current context before invoking the block.

Arguments:
  • channel : Channel?

  • timeout_ms : int

  • blk : block<(res:auto(TT)#):void>

9.2.5.4. try_pop

try_pop(channel: Channel?; blk: block<(res:auto(TT)#):void>): bool

Non-blocking pop from channel. Returns true if an item was available, false if channel was empty. Does not wait for data — returns immediately.

Arguments:
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

try_pop(stream: Stream?; blk: block<(var bytes:array<uint8>#):void>): bool

try_pop_archive(stream: Stream?; blk: block<(var value:auto(TT)&):void>): bool

Non-blocking pop + deserialize. Returns true if a value was consumed.

Arguments:
  • stream : Stream?

  • blk : block<(value:auto(TT)&):void>

try_pop_clone(channel: Channel?; blk: block<(res:auto(TT)#):void>): bool

Non-blocking pop with clone from channel. Returns true if an item was available, false if channel was empty. The popped value is cloned to the current context before invoking the block.

Arguments:
  • channel : Channel?

  • blk : block<(res:auto(TT)#):void>

9.2.6. Synchronization

done(status: JobStatus?&)

Mark one unit of work as done in a wait group. Alias for notify_and_release. Decrements the notification counter and releases the reference. Sets the pointer to null, preventing double-release.

Arguments:
  • status : JobStatus?&

9.2.6.1. with_wait_group

with_wait_group(blk: block<(var status:JobStatus?):void>)

Creates a wait group starting at count 0 with auto-join. Use append to dynamically add expected notifications before dispatching work. The block returns only after all notifications have been received.

Arguments:
  • blk : block<(var status:JobStatus?):void>

with_wait_group(count: int; blk: block<(var status:JobStatus?):void>)
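
A minimal sketch of the wait-group pattern. It assumes that the count overload starts the group at the given number of expected notifications and auto-joins in the same way as the count-free overload (the overload is not described on this page), and that with_job_que from the low-level jobque module provides the job queue. The count of 2 is arbitrary.

```das
require daslib/jobque_boost

[export]
def main() {
    with_job_que() {
        // Assumption: expects 2 notifications; the block returns only after both arrive.
        with_wait_group(2) $(wg) {
            for (i in range(2)) {
                new_job() @() {
                    print("working\n")
                    // Decrement the counter and release this job's reference.
                    wg |> done()
                }
            }
        }
        print("all work done\n")
    }
}
```

Unlike the with_job_status example at the top of this page, no explicit join call is needed here, because the wait group joins automatically before the block returns.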

9.2.7. Parallel execution

_parallel_for(range_begin: int; range_end: int; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>)

Partitions [range_begin..range_end) into num_jobs chunks and invokes blk once per chunk on the calling thread with (chunk_begin, chunk_end, wg). The block is expected to dispatch work via new_job and call wg |> notify_and_release when each job finishes. parallel_for blocks until all notifications are received (via internal with_wait_group). Requires with_job_que context.

Arguments:
  • range_begin : int

  • range_end : int

  • num_jobs : int

  • blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>

_parallel_for_each(arr: array<auto(TT)>; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>): auto

Runtime implementation for parallel_for_each. Partitions array indices [0..length(arr)) into num_jobs chunks and invokes blk with (chunk_begin_idx, chunk_end_idx, wg) on the calling thread. The block should dispatch new_job calls that process arr[i] for i in [chunk_begin_idx..chunk_end_idx), then call wg |> notify_and_release. Blocks until all jobs finish. Requires with_job_que context.

Arguments:
  • arr : array<auto(TT)>

  • num_jobs : int

  • blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>

_parallel_map(arr: array<auto(TT)>; num_jobs: int; results_channel: Channel?; blk: block<(job_begin:int;job_end:int;var ch:Channel?;var wg:JobStatus?):void>): auto

Runtime implementation for parallel_map. Partitions array indices [0..length(arr)) into num_jobs chunks and invokes blk on the calling thread with (chunk_begin_idx, chunk_end_idx, results_channel, wg). Blocks until all jobs finish. Results are available in results_channel after this call returns. Requires with_job_que context.

Arguments:
  • arr : array<auto(TT)>

  • num_jobs : int

  • results_channel : Channel?

  • blk : block<(job_begin:int;job_end:int;ch: Channel?;wg: JobStatus?):void>

parallel_for(range_begin: int; range_end: int; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>)

This is a stub for _parallel_for. ParallelForJobMacro wraps the block body in new_job and redirects the call to _parallel_for.

Arguments:
  • range_begin : int

  • range_end : int

  • num_jobs : int

  • blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>

parallel_for_each(arr: array<auto(TT)>; num_jobs: int; blk: block<(job_begin:int;job_end:int;var wg:JobStatus?):void>): auto

Convenience wrapper around parallel_for for arrays. Partitions array indices [0..length(arr)) into num_jobs chunks. The block body is automatically wrapped in new_job. Blocks until all jobs finish. Requires with_job_que context.

Arguments:
  • arr : array<auto(TT)>

  • num_jobs : int

  • blk : block<(job_begin:int;job_end:int;wg: JobStatus?):void>

parallel_map(arr: array<auto(TT)>; num_jobs: int; results_channel: Channel?; blk: block<(job_begin:int;job_end:int;var ch:Channel?;var wg:JobStatus?):void>): auto

Partitions array indices [0..length(arr)) into num_jobs chunks. The block body is automatically wrapped in new_job. Blocks until all jobs finish. Results are available in results_channel after this call returns. Requires with_job_que context.

Arguments:
  • arr : array<auto(TT)>

  • num_jobs : int

  • results_channel : Channel?

  • blk : block<(job_begin:int;job_end:int;ch: Channel?;wg: JobStatus?):void>

9.2.8. LockBox operations

clear(box: LockBox?; type_: auto(TT)): auto

Clears the value from the lock box.

Arguments:
  • box : LockBox?

  • type_ : auto(TT)

fill(box: LockBox?; data: auto(TT)): auto

Stores a pointer to data in the lock box and marks it as full (isReady becomes false). The data must outlive the grab; the caller owns the lifetime.

Arguments:
  • box : LockBox?

  • data : auto(TT)

get(box: LockBox?; blk: block<(res:auto(TT)#):void>): auto

Reads the value from the lock box and invokes the block on it.

Arguments:
  • box : LockBox?

  • blk : block<(res:auto(TT)#):void>

grab(box: LockBox?; blk: block<(var res:auto(TT)):void>): bool

Grabs data from the lock box, marks it as empty (isReady becomes true), and invokes the block. Does not free the data; the caller owns the lifetime.

Arguments:
  • box : LockBox?

  • blk : block<(var res:auto(TT)):void>

9.2.8.1. set

set(box: LockBox?; data: auto?): auto

Sets the value in the lock box.

Arguments:
  • box : LockBox?

  • data : auto?

set(box: LockBox?; data: auto(TT)): auto

update(box: LockBox?; blk: block<(var res:auto(TT)#):void>): auto

Updates the value in the lock box and invokes the block on it.

Arguments:
  • box : LockBox?

  • blk : block<(var res:auto(TT)#):void>

9.2.9. Internal capture details

capture_jobque_channel(ch: Channel?): Channel?

Captures a channel that is used by the jobque.

Arguments:
  • ch : Channel?

capture_jobque_job_status(js: JobStatus?): JobStatus?

Captures a job status that is used by the jobque.

Arguments:
  • js : JobStatus?

capture_jobque_lock_box(js: LockBox?): LockBox?

Captures a lock box that is used by the jobque.

Arguments:
  • js : LockBox?

capture_jobque_stream(s: Stream?): Stream?

Captures a stream that is used by the jobque.

Arguments:
  • s : Stream?

release_capture_jobque_channel(ch: Channel?)

Releases a channel that is used by the jobque.

Arguments:
  • ch : Channel?

release_capture_jobque_job_status(js: JobStatus?)

Releases a job status that is used by the jobque.

Arguments:
  • js : JobStatus?

release_capture_jobque_lock_box(js: LockBox?)

Releases a lock box that is used by the jobque.

Arguments:
  • js : LockBox?

release_capture_jobque_stream(s: Stream?)

Releases a stream that is used by the jobque.

Arguments:
  • s : Stream?