9.1. Jobs and threads

The JOBQUE module provides low-level job queue and threading primitives. It includes thread-safe channels for inter-thread communication, lock boxes for shared data access, job status tracking, and fine-grained thread management. For higher-level job abstractions, see jobque_boost.

See Job Queue (jobque) for a hands-on tutorial.

All functions and symbols are in the “jobque” module; use require to get access to it.

require jobque

Example:

    require jobque

    [export]
    def main() {
        with_atomic32() $(counter) {
            counter |> set(10)
            print("value = {counter |> get}\n")
            let after_inc = counter |> inc
            print("after inc = {after_inc}\n")
            let after_dec = counter |> dec
            print("after dec = {after_dec}\n")
        }
    }
    // output:
    // value = 10
    // after inc = 11
    // after dec = 10

9.1.1. Handled structures

Atomic64

Atomic 64-bit integer.

LockBox

LockBox. Similar to a channel, but holds only a single object.

Atomic32

Atomic 32-bit integer.

JobStatus
JobStatus.isReady(): bool

Whether the job has completed execution.

JobStatus.isValid(): bool

Whether the job status object refers to a valid, active job.

JobStatus.size(): int

Returns the current entry count of the JobStatus or Channel.

Properties:
  • isReady : bool

  • isValid : bool

  • size : int

Job status indicator (ready or not, as well as entry count).

Channel
Channel.isEmpty(): bool

Whether the channel or pipe contains no remaining elements.

Channel.total(): int

Total number of elements that have been added to the pipe.

Properties:
  • isEmpty : bool

  • total : int

A Channel provides a way to communicate between multiple contexts, including threads and jobs. Each Channel has an internal entry count.

9.1.2. Channel, JobStatus, Lockbox

add_ref(status: JobStatus?)

Increases the reference count of a JobStatus or Channel, preventing premature deletion.

Arguments:
  • status : JobStatus? implicit

append(channel: JobStatus?; size: int): int

Increases the entry count of the channel, signaling that new work has been added.

Arguments:
  • channel : JobStatus? implicit

  • size : int

channel_create(): Channel?

Warning

This is an unsafe operation.

Creates a new Channel for inter-thread communication and synchronization.

channel_remove(channel: Channel?&)

Warning

This is an unsafe operation.

Destroys a Channel and releases its resources.

Arguments:
  • channel : Channel?& implicit

job_status_create(): JobStatus?

Creates a new JobStatus object for tracking the completion state of asynchronous jobs.

job_status_remove(jobStatus: JobStatus?&)

Warning

This is an unsafe operation.

Destroys a JobStatus object and releases its resources.

Arguments:
  • jobStatus : JobStatus?& implicit

join(job: JobStatus?)

Blocks the current thread until the job or channel’s entry count reaches zero, indicating all work is complete.

Arguments:
  • job : JobStatus? implicit

lock_box_create(): LockBox?

Creates a new LockBox for thread-safe shared access to a single value.

lock_box_remove(box: LockBox?&)

Warning

This is an unsafe operation.

Destroys a LockBox and releases its resources.

Arguments:
  • box : LockBox?& implicit

notify(job: JobStatus?)

Decreases the channel’s entry count, signaling that one unit of work has completed.

Use notify when the caller does not own a reference to the channel — for example when a Channel? is passed as a plain function argument via invoke_in_context. In that scenario no lambda captures the channel, so no extra reference was added and there is nothing to release.

Compare with notify_and_release, which additionally releases a reference and should be used inside lambdas that captured the channel (adding a reference).

Arguments:
  • job : JobStatus? implicit

notify_and_release(job: JobStatus?&)

Decreases the entry count and the reference count of a Channel or JobStatus in a single operation. After the call the channel/status variable is set to null.

Use notify_and_release inside lambdas that captured the channel. Capturing adds a reference, so the lambda must release it when done. This function combines notify + release into one atomic step and nulls the variable to prevent accidental reuse.

If the caller does not own a reference (e.g. the channel was passed as a plain argument via invoke_in_context, with no lambda capture), use notify instead — calling notify_and_release in that case would release a reference the caller never added, leading to a premature free.

Arguments:
  • job : JobStatus?& implicit

release(status: JobStatus?&)

Decreases the reference count of a JobStatus or Channel; the object is deleted when the count reaches zero.

Arguments:
  • status : JobStatus?& implicit
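
The ownership rule that separates notify from notify_and_release can be sketched as follows. This is an illustration under stated assumptions, not a reference implementation: new_job is assumed to come from daslib/jobque_boost (this page names that module but does not document its functions), the total passed to with_job_status is assumed to be the initial entry count, and finish_unit is a hypothetical helper introduced only for this example.

```daslang
require jobque
require daslib/jobque_boost   // assumption: provides new_job

// Hypothetical helper. The status arrives as a plain argument, so this
// function owns no reference and only lowers the entry count.
def finish_unit(status : JobStatus?) {
    status |> notify
}

[export]
def main() {
    with_job_que() $() {
        // Two units of work are pending (assumed meaning of the total).
        with_job_status(2) $(status) {
            // The lambda captures status, which adds a reference, so the
            // lambda lowers the entry count and releases that reference.
            new_job() @() {
                status |> notify_and_release
            }
            // No capture here: plain notify is the matching call.
            finish_unit(status)
            // Blocks until both units have been reported.
            status |> join
        }
    }
    print("all work reported\n")
}
```

Calling notify_and_release inside finish_unit instead would release a reference that function never added, which is the premature-free case described above.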

9.1.3. Queries

get_total_hw_jobs(): int

Returns the total number of hardware threads allocated to the job system.

get_total_hw_threads(): int

Returns the total number of hardware threads available on the system.

is_job_que_shutting_down(): bool

Returns true if the job queue infrastructure is shutting down or has not been initialized.
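
A minimal sketch of the three queries, assuming they are best called inside with_job_que so that the job queue infrastructure exists while they run. The values printed depend on the machine, so no output is shown.

```daslang
require jobque

[export]
def main() {
    // Outside with_job_que the infrastructure is not initialized, which the
    // description above says is also reported by this query.
    print("outside with_job_que: {is_job_que_shutting_down()}\n")
    with_job_que() $() {
        print("hardware threads on the system: {get_total_hw_threads()}\n")
        print("threads allocated to the job system: {get_total_hw_jobs()}\n")
        print("inside with_job_que: {is_job_que_shutting_down()}\n")
    }
}
```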

9.1.4. Internal invocations

new_debugger_thread(block: block<():void>)

Creates a new debugger tick thread for servicing debug connections.

Arguments:
  • block : block<void> implicit

new_job_invoke(lambda: lambda<():void>; function: function<():void>; lambdaSize: int)

Clones the current context, moves the attached lambda into it, and submits it to the job queue.

Arguments:
  • lambda : lambda<void>

  • function : function<void>

  • lambdaSize : int

new_thread_invoke(lambda: lambda<():void>; function: function<():void>; lambdaSize: int)

Clones the current context, moves the attached lambda into it, and runs it on a new dedicated thread.

Arguments:
  • lambda : lambda<void>

  • function : function<void>

  • lambdaSize : int

9.1.5. Construction

9.1.5.1. with_channel

with_channel(block: block<(Channel?):void>)

Creates a Channel scoped to the given block and automatically destroys it afterward.

Arguments:
  • block : block<(Channel?):void> implicit

with_channel(count: int; block: block<(Channel?):void>)

with_job_que(block: block<():void>)

Ensures job queue infrastructure is initialized for the duration of the block.

Arguments:
  • block : block<void> implicit

with_job_status(total: int; block: block<(JobStatus?):void>)

Creates a JobStatus scoped to the given block and automatically destroys it afterward.

Arguments:
  • total : int

  • block : block<(JobStatus?):void> implicit
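
The entry count that with_job_status, notify, and join operate on can be sketched on a single thread. The example assumes that total is the initial entry count of the new JobStatus, which this page does not state explicitly.

```daslang
require jobque

[export]
def main() {
    // Assumption: the status starts with an entry count of 2,
    // as if two units of work were pending.
    with_job_status(2) $(status) {
        status |> notify    // first unit reported
        status |> notify    // second unit reported
        // The entry count is now zero, so join has nothing to wait for.
        status |> join
        print("all work complete\n")
    }
}
```

In real use the notify calls would come from jobs or threads, and join would block the caller until the last of them has reported.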

with_lock_box(block: block<(LockBox?):void>)

Creates a LockBox scoped to the given block and automatically destroys it afterward.

Arguments:
  • block : block<(LockBox?):void> implicit

9.1.6. Atomic

atomic32_create(): Atomic32?

Creates an Atomic32 — a thread-safe 32-bit integer for lock-free concurrent access.

atomic32_remove(atomic: Atomic32?&)

Warning

This is an unsafe operation.

Destroys an Atomic32 and releases its resources.

Arguments:
  • atomic : Atomic32?& implicit

atomic64_create(): Atomic64?

Creates an Atomic64 — a thread-safe 64-bit integer for lock-free concurrent access.

atomic64_remove(atomic: Atomic64?&)

Warning

This is an unsafe operation.

Destroys an Atomic64 and releases its resources.

Arguments:
  • atomic : Atomic64?& implicit

9.1.6.1. dec

dec(atomic: Atomic32?): int

Atomically decrements the integer value and returns the result.

Arguments:
  • atomic : Atomic32? implicit

dec(atomic: Atomic64?): int64

9.1.6.2. get

get(atomic: Atomic32?): int

Returns the current value of the atomic integer.

Arguments:
  • atomic : Atomic32? implicit

get(atomic: Atomic64?): int64

9.1.6.3. inc

inc(atomic: Atomic32?): int

Atomically increments the integer value and returns the result.

Arguments:
  • atomic : Atomic32? implicit

inc(atomic: Atomic64?): int64

9.1.6.4. set

set(atomic: Atomic32?; value: int)

Sets the atomic integer to the specified value.

Arguments:
  • atomic : Atomic32? implicit

  • value : int

set(atomic: Atomic64?; value: int64)

with_atomic32(block: block<(Atomic32?):void>)

Creates an Atomic32 scoped to the given block and automatically destroys it afterward.

Arguments:
  • block : block<(Atomic32?):void> implicit

with_atomic64(block: block<(Atomic64?):void>)

Creates an Atomic64 scoped to the given block and automatically destroys it afterward.

Arguments:
  • block : block<(Atomic64?):void> implicit
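
The 64-bit overloads can be exercised the same way as the Atomic32 example at the top of this page. The sketch below mirrors that example and assumes the only difference is the int64 value type, written here with the l literal suffix.

```daslang
require jobque

[export]
def main() {
    with_atomic64() $(counter) {
        counter |> set(10l)                      // int64 literal
        print("value = {counter |> get}\n")
        let after_inc = counter |> inc           // int64 result
        print("after inc = {after_inc}\n")
        let after_dec = counter |> dec
        print("after dec = {after_dec}\n")
    }
}
```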