9.1. Jobs and threads

The JOBQUE module provides low-level job queue and threading primitives. It includes thread-safe channels for inter-thread communication, lock boxes for shared data access, job status tracking, and fine-grained thread management. For higher-level job abstractions, see jobque_boost.

See Job Queue (jobque) for a hands-on tutorial.

All functions and symbols are in “jobque” module, use require to get access to it.

require jobque

Example:

require jobque

    [export]
    def main() {
        with_atomic32() $(counter) {
            counter |> set(10)
            print("value = {counter |> get}\n")
            let after_inc = counter |> inc
            print("after inc = {after_inc}\n")
            let after_dec = counter |> dec
            print("after dec = {after_dec}\n")
        }
    }
    // output:
    // value = 10
    // after inc = 11
    // after dec = 10

9.1.1. Handled structures

LockBox

Lockbox. Similar to channel, only for single object.

Stream
Stream.isEmpty(): bool

Whether the channel or pipe contains no remaining elements.

Stream.total(): int

Total number of elements that have been added to the pipe.

Properties:
  • isEmpty : bool

  • total : int

FIFO pipe of opaque byte buffers shared between contexts and threads. Built on top of JobStatus so it supports refcounting and join. Unlike Channel, a Stream stores raw bytes copied into runtime-owned memory, so the consumer can safely free a popped buffer even if the producer’s context is long gone. Typically used together with the push_archive / pop_archive / gather_archive helpers in jobque_boost to ship serialized command records across thread boundaries.

Atomic64

Atomic 64 bit integer.

Atomic32

Atomic 32 bit integer.

JobStatus
JobStatus.isReady(): bool

Whether the job has completed execution.

JobStatus.isValid(): bool

Whether the job status object refers to a valid, active job.

JobStatus.size(): int

Returns the current entry count of the JobStatus or Channel.

Properties:
  • isReady : bool

    • isValid : bool

    • size : int

Job status indicator (ready or not, as well as entry count).

Channel
Channel.isEmpty(): bool

Whether the channel or pipe contains no remaining elements.

Channel.total(): int

Total number of elements that have been added to the pipe.

Properties:
  • isEmpty : bool

    • total : int

Channel provides a way to communicate between multiple contexts, including threads and jobs. Channel has internal entry count.

9.1.2. Channel, JobStatus, Lockbox, Stream

add_ref(status: JobStatus?)

Increases the reference count of a JobStatus or Channel, preventing premature deletion.

Arguments:
append(channel: JobStatus?; size: int): int

Increases the entry count of the channel, signaling that new work has been added.

Arguments:
channel_create(): Channel?

Warning

This is unsafe operation.

Creates a new Channel for inter-thread communication and synchronization.

channel_remove(channel: Channel?&)

Warning

This is unsafe operation.

Destroys a Channel and releases its resources.

Arguments:
job_status_create(): JobStatus?

Creates a new JobStatus object for tracking the completion state of asynchronous jobs.

job_status_remove(jobStatus: JobStatus?&)

Warning

This is unsafe operation.

Destroys a JobStatus object and releases its resources.

Arguments:
join(job: JobStatus?)

Blocks the current thread until the job or channel’s entry count reaches zero, indicating all work is complete.

Arguments:
lock_box_create(): LockBox?

Creates a new LockBox for thread-safe shared access to a single value.

lock_box_remove(box: LockBox?&)

Warning

This is unsafe operation.

Destroys a LockBox and releases its resources.

Arguments:
notify(job: JobStatus?)

Decreases the channel’s entry count, signaling that one unit of work has completed.

Use notify when the caller does not own a reference to the channel — for example when a Channel? is passed as a plain function argument via invoke_in_context. In that scenario no lambda captures the channel, so no extra reference was added and there is nothing to release.

Compare with notify_and_release, which additionally releases a reference and should be used inside lambdas that captured the channel (adding a reference).

Arguments:
notify_and_release(job: JobStatus?&)

Decreases the entry count and the reference count of a Channel or JobStatus in a single operation. After the call the channel/status variable is set to null.

Use notify_and_release inside lambdas that captured the channel. Capturing adds a reference, so the lambda must release it when done. This function combines notify + release into one atomic step and nulls the variable to prevent accidental reuse.

If the caller does not own a reference (e.g. the channel was passed as a plain argument via invoke_in_context, with no lambda capture), use notify instead — calling notify_and_release in that case would release a reference the caller never added, leading to a premature free.

Arguments:
release(status: JobStatus?&)

Decreases the reference count of a JobStatus or Channel; the object is deleted when the count reaches zero.

Arguments:
stream_create(): Stream?

Warning

This is unsafe operation.

Creates a new empty Stream, a FIFO of opaque byte buffers owned by the runtime. Payload bytes are copied into the stream on push and produced as non-owning views on pop / gather. Either side can safely destroy the stream — memory belongs to the stream itself rather than to a particular context.

stream_remove(stream: Stream?&)

Warning

This is unsafe operation.

Releases the caller’s reference to a Stream and sets the local pointer to null. When the last reference is released the stream is destroyed along with any buffered byte blobs it still owns.

Arguments:

9.1.3. Queries

get_total_hw_jobs(): int

Returns the total number of hardware threads allocated to the job system.

get_total_hw_threads(): int

Returns the total number of hardware threads available on the system.

is_job_que_shutting_down(): bool

Returns true if the job queue infrastructure is shutting down or has not been initialized.

9.1.4. Internal invocations

new_debugger_thread(block: block<():void>)

Creates a new debugger tick thread for servicing debug connections.

Arguments:
  • block : block<void> implicit

new_job_invoke(lambda: lambda<():void>; function: function<():void>; lambdaSize: int)

Clones the current context, moves the attached lambda into it, and submits it to the job queue.

Arguments:
  • lambda : lambda<void>

  • function : function<void>

  • lambdaSize : int

new_thread_invoke(lambda: lambda<():void>; function: function<():void>; lambdaSize: int)

Clones the current context, moves the attached lambda into it, and runs it on a new dedicated thread.

Arguments:
  • lambda : lambda<void>

  • function : function<void>

  • lambdaSize : int

9.1.5. Construction

9.1.5.1. with_channel

with_channel(block: block<(Channel?):void>)

Creates a Channel scoped to the given block and automatically destroys it afterward.

Arguments:
  • block : block<( Channel?):void> implicit

with_channel(count: int; block: block<(Channel?):void>)

with_job_que(block: block<():void>)

Ensures job queue infrastructure is initialized for the duration of the block.

Arguments:
  • block : block<void> implicit

with_job_status(total: int; block: block<(JobStatus?):void>)

Creates a JobStatus scoped to the given block and automatically destroys it afterward.

Arguments:
  • total : int

  • block : block<( JobStatus?):void> implicit

with_lock_box(block: block<(LockBox?):void>)

Creates a LockBox scoped to the given block and automatically destroys it afterward.

Arguments:
  • block : block<( LockBox?):void> implicit

9.1.5.2. with_stream

with_stream(block: block<(Stream?):void>)

Creates an unbounded Stream, invokes the given block with a stable pointer to it, and automatically releases the reference when the block returns. The simplest way to scope a stream to a producer/consumer pair.

Arguments:
  • block : block<( Stream?):void> implicit

with_stream(count: int; block: block<(Stream?):void>)

9.1.6. Atomic

atomic32_create(): Atomic32?

Creates an Atomic32 — a thread-safe 32-bit integer for lock-free concurrent access.

atomic32_remove(atomic: Atomic32?&)

Warning

This is unsafe operation.

Destroys an Atomic32 and releases its resources.

Arguments:
atomic64_create(): Atomic64?

Creates an Atomic64 — a thread-safe 64-bit integer for lock-free concurrent access.

atomic64_remove(atomic: Atomic64?&)

Warning

This is unsafe operation.

Destroys an Atomic64 and releases its resources.

Arguments:

9.1.6.1. dec

dec(atomic: Atomic32?): int

Atomically decrements the integer value and returns the result.

Arguments:
dec(atomic: Atomic64?): int64

9.1.6.2. get

get(atomic: Atomic32?): int

Returns the current value of the atomic integer.

Arguments:
get(atomic: Atomic64?): int64

9.1.6.3. inc

inc(atomic: Atomic32?): int

Atomically increments the integer value and returns the result.

Arguments:
inc(atomic: Atomic64?): int64

9.1.6.4. set

set(atomic: Atomic32?; value: int)

Sets the atomic integer to the specified value.

Arguments:
  • atomic : Atomic32? implicit

  • value : int

set(atomic: Atomic64?; value: int64)

with_atomic32(block: block<(Atomic32?):void>)

Creates an Atomic32 scoped to the given block and automatically destroys it afterward.

Arguments:
  • block : block<( Atomic32?):void> implicit

with_atomic64(block: block<(Atomic64?):void>)

Creates an Atomic64 scoped to the given block and automatically destroys it afterward.

Arguments:
  • block : block<( Atomic64?):void> implicit