9.1. Jobs and threads

The JOBQUE module provides low-level job queue and threading primitives. It includes thread-safe channels for inter-thread communication, lock boxes for shared data access, job status tracking, and fine-grained thread management. For higher-level job abstractions, see jobque_boost.

See Job Queue (jobque) for a hands-on tutorial.

All functions and symbols are in the “jobque” module; use require to get access to it.

require jobque

Example:

require jobque

[export]
def main() {
    with_atomic32() $(counter) {
        counter |> set(10)
        print("value = {counter |> get}\n")
        let after_inc = counter |> inc
        print("after inc = {after_inc}\n")
        let after_dec = counter |> dec
        print("after dec = {after_dec}\n")
    }
}
// output:
// value = 10
// after inc = 11
// after dec = 10

9.1.1. Handled structures

jobque::JobStatus
JobStatus.isReady() : bool()

Whether the job has completed execution.

JobStatus.isValid() : bool()

Whether the job status object refers to a valid, active job.

JobStatus.size() : int()

Returns the current entry count of the JobStatus or Channel.

Properties
  • isReady : bool

  • isValid : bool

  • size : int

Job status indicator (ready or not, as well as entry count).

jobque::Channel
Channel.isEmpty() : bool()

Whether the channel or pipe contains no remaining elements.

Channel.total() : int()

Total number of elements that have been added to the pipe.

Properties
  • isEmpty : bool

  • total : int

Channel provides a way to communicate between multiple contexts, including threads and jobs. Each Channel maintains an internal entry count.

jobque::LockBox

LockBox is similar to Channel, but holds only a single object.

jobque::Atomic32

Atomic 32-bit integer.

jobque::Atomic64

Atomic 64-bit integer.

9.1.2. Channel, JobStatus, Lockbox

jobque::add_ref(status: JobStatus?)

Increases the reference count of a JobStatus or Channel, preventing premature deletion.

Arguments
  • status : JobStatus?

jobque::append(channel: JobStatus?; size: int) : int()

Increases the entry count of the channel, signaling that new work has been added.

Arguments
  • channel : JobStatus?

  • size : int

jobque::channel_create() : Channel?()

Warning

This is an unsafe operation.

Creates a new Channel for inter-thread communication and synchronization.

jobque::channel_remove(channel: Channel?&)

Warning

This is an unsafe operation.

Destroys a Channel and releases its resources.

Arguments
  • channel : Channel?&

jobque::job_status_create() : JobStatus?()

Creates a new JobStatus object for tracking the completion state of asynchronous jobs.

jobque::job_status_remove(jobStatus: JobStatus?&)

Warning

This is an unsafe operation.

Destroys a JobStatus object and releases its resources.

Arguments
  • jobStatus : JobStatus?&

jobque::join(job: JobStatus?)

Blocks the current thread until the job or channel’s entry count reaches zero, indicating all work is complete.

Arguments
  • job : JobStatus?

jobque::lock_box_create() : LockBox?()

Creates a new LockBox for thread-safe shared access to a single value.

jobque::lock_box_remove(box: LockBox?&)

Warning

This is an unsafe operation.

Destroys a LockBox and releases its resources.

Arguments
  • box : LockBox?&

jobque::notify(job: JobStatus?)

Decreases the channel’s entry count, signaling that one unit of work has completed.

Use notify when the caller does not own a reference to the channel — for example when a Channel? is passed as a plain function argument via invoke_in_context. In that scenario no lambda captures the channel, so no extra reference was added and there is nothing to release.

Compare with notify_and_release, which additionally releases a reference and should be used inside lambdas that captured the channel (adding a reference).

Arguments
  • job : JobStatus?

jobque::notify_and_release(job: JobStatus?&)

Decreases the entry count and the reference count of a Channel or JobStatus in a single operation. After the call the channel/status variable is set to null.

Use notify_and_release inside lambdas that captured the channel. Capturing adds a reference, so the lambda must release it when done. This function combines notify + release into one atomic step and nulls the variable to prevent accidental reuse.

If the caller does not own a reference (e.g. the channel was passed as a plain argument via invoke_in_context, with no lambda capture), use notify instead — calling notify_and_release in that case would release a reference the caller never added, leading to a premature free.

Arguments
  • job : JobStatus?&

jobque::release(status: JobStatus?&)

Decreases the reference count of a JobStatus or Channel; the object is deleted when the count reaches zero.

Arguments
  • status : JobStatus?&
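
The reference-counting contract described for add_ref, notify_and_release, and release can be sketched on a single thread. This is a minimal illustration, not the module's canonical usage: the manual add_ref stands in for the reference that a lambda capture would add, and the helper name release_demo is hypothetical. It assumes that a block argument of type JobStatus? may be copied into a local variable.

```daslang
require jobque

// Hypothetical helper: returns true if notify_and_release nulled its argument.
def release_demo() : bool {
    var was_nulled = false
    // One unit of pending work; the scoped status is destroyed when the block exits.
    with_job_status(1) $(status) {
        // Stand-in for a lambda capture: take an extra reference by hand.
        var owned = status
        owned |> add_ref
        // Mark the unit as done and drop the extra reference in one call.
        owned |> notify_and_release
        // Per the description above, the variable is set to null afterward.
        was_nulled = owned == null
        // The entry count is already zero, so join does not block.
        status |> join
    }
    return was_nulled
}

[export]
def main() {
    print("owned reference cleared: {release_demo()}\n")
}
```

Code that never took a reference of its own would call notify instead, as the notify entry explains.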

9.1.3. Queries

jobque::get_total_hw_jobs() : int()

Returns the total number of hardware threads allocated to the job system.

jobque::get_total_hw_threads() : int()

Returns the total number of hardware threads available on the system.

jobque::is_job_que_shutting_down() : bool()

Returns true if the job queue infrastructure is shutting down or has not been initialized.
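
The three queries above can be combined into a short diagnostic. This is a sketch: it wraps the job-system queries in with_job_que on the assumption that they are only meaningful while the job queue is initialized, and the printed values depend on the machine.

```daslang
require jobque

[export]
def main() {
    // Hardware threads reported for the whole system.
    print("hardware threads: {get_total_hw_threads()}\n")
    // Keep the job queue initialized while querying it.
    with_job_que() $() {
        print("job threads: {get_total_hw_jobs()}\n")
        print("shutting down: {is_job_que_shutting_down()}\n")
    }
}
```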

9.1.4. Internal invocations

jobque::new_debugger_thread(block: block<():void>)

Creates a new debugger tick thread for servicing debug connections.

Arguments
  • block : block<void> implicit

jobque::new_job_invoke(lambda: lambda<():void>; function: function<():void>; lambdaSize: int)

Clones the current context, moves the attached lambda into it, and submits it to the job queue.

Arguments
  • lambda : lambda<void>

  • function : function<void>

  • lambdaSize : int

jobque::new_thread_invoke(lambda: lambda<():void>; function: function<():void>; lambdaSize: int)

Clones the current context, moves the attached lambda into it, and runs it on a new dedicated thread.

Arguments
  • lambda : lambda<void>

  • function : function<void>

  • lambdaSize : int

9.1.5. Construction

9.1.5.1. with_channel

jobque::with_channel(block: block<(Channel?):void>)

Creates a Channel scoped to the given block and automatically destroys it afterward.

Arguments
  • block : block<( Channel?):void> implicit

jobque::with_channel(count: int; block: block<(Channel?):void>)

jobque::with_job_que(block: block<():void>)

Ensures job queue infrastructure is initialized for the duration of the block.

Arguments
  • block : block<void> implicit

jobque::with_job_status(total: int; block: block<(JobStatus?):void>)

Creates a JobStatus scoped to the given block and automatically destroys it afterward.

Arguments
  • total : int

  • block : block<( JobStatus?):void> implicit
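
A minimal single-threaded sketch of how the entry count interacts with notify and join. The helper name drain is hypothetical, and in real code the notify calls would come from jobs or threads rather than from the caller itself.

```daslang
require jobque

// Hypothetical helper: reports `total` units of work as done on the calling
// thread, then waits for the entry count to reach zero.
// Returns the number of notify calls made.
def drain(total : int) : int {
    var done = 0
    with_job_status(total) $(status) {
        for (i in range(total)) {
            // Each notify decreases the entry count by one.
            status |> notify
            done += 1
        }
        // The count is zero by now, so join does not block.
        status |> join
    }
    return done
}

[export]
def main() {
    print("units completed = {drain(3)}\n")
}
```

If fewer notify calls were made than the initial total, join would block indefinitely, so the total passed to with_job_status should match the number of units of work that will actually be reported.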

jobque::with_lock_box(block: block<(LockBox?):void>)

Creates a LockBox scoped to the given block and automatically destroys it afterward.

Arguments
  • block : block<( LockBox?):void> implicit

9.1.6. Atomic

jobque::atomic32_create() : Atomic32?()

Creates an Atomic32 — a thread-safe 32-bit integer for lock-free concurrent access.

jobque::atomic32_remove(atomic: Atomic32?&)

Warning

This is an unsafe operation.

Destroys an Atomic32 and releases its resources.

Arguments
  • atomic : Atomic32?&

jobque::atomic64_create() : Atomic64?()

Creates an Atomic64 — a thread-safe 64-bit integer for lock-free concurrent access.

jobque::atomic64_remove(atomic: Atomic64?&)

Warning

This is an unsafe operation.

Destroys an Atomic64 and releases its resources.

Arguments
  • atomic : Atomic64?&

9.1.6.1. dec

jobque::dec(atomic: Atomic32?) : int()

Atomically decrements the integer value and returns the result.

Arguments
  • atomic : Atomic32?

jobque::dec(atomic: Atomic64?) : int64()

9.1.6.2. get

jobque::get(atomic: Atomic32?) : int()

Returns the current value of the atomic integer.

Arguments
  • atomic : Atomic32?

jobque::get(atomic: Atomic64?) : int64()

9.1.6.3. inc

jobque::inc(atomic: Atomic32?) : int()

Atomically increments the integer value and returns the result.

Arguments
  • atomic : Atomic32?

jobque::inc(atomic: Atomic64?) : int64()

9.1.6.4. set

jobque::set(atomic: Atomic32?; value: int)

Sets the atomic integer to the specified value.

Arguments
  • atomic : Atomic32? implicit

  • value : int

jobque::set(atomic: Atomic64?; value: int64)

jobque::with_atomic32(block: block<(Atomic32?):void>)

Creates an Atomic32 scoped to the given block and automatically destroys it afterward.

Arguments
  • block : block<( Atomic32?):void> implicit

jobque::with_atomic64(block: block<(Atomic64?):void>)

Creates an Atomic64 scoped to the given block and automatically destroys it afterward.

Arguments
  • block : block<( Atomic64?):void> implicit
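
The 64-bit overloads follow the same pattern as the Atomic32 example at the top of this section. The sketch below assumes only the signatures listed here; the helper name count_to is hypothetical, and int64 values are written with an explicit int64(...) conversion.

```daslang
require jobque

// Hypothetical helper: increments an Atomic64 `n` times starting from zero
// and returns the value reported by the last increment.
def count_to(n : int) : int64 {
    var result = int64(0)
    with_atomic64() $(counter) {
        counter |> set(int64(0))
        for (i in range(n)) {
            // inc returns the value after the increment.
            result = counter |> inc
        }
    }
    return result
}

[export]
def main() {
    print("final value = {count_to(5)}\n")
}
```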