9.1. Jobs and threads
The JOBQUE module provides low-level job queue and threading primitives.
It includes thread-safe channels for inter-thread communication, lock boxes
for shared data access, job status tracking, and fine-grained thread
management. For higher-level job abstractions, see jobque_boost.
See Job Queue (jobque) for a hands-on tutorial.
All functions and symbols are in the “jobque” module; use require to get access to it.
require jobque
Example:
require jobque
[export]
def main() {
    with_atomic32() $(counter) {
        counter |> set(10)
        print("value = {counter |> get}\n")
        let after_inc = counter |> inc
        print("after inc = {after_inc}\n")
        let after_dec = counter |> dec
        print("after dec = {after_dec}\n")
    }
}
// output:
// value = 10
// after inc = 11
// after dec = 10
9.1.1. Handled structures
- Atomic64
Atomic 64-bit integer.
- LockBox
Lock box. Similar to a channel, but holds only a single object.
- Atomic32
Atomic 32-bit integer.
- JobStatus
- JobStatus.isReady(): bool
Whether the job has completed execution.
- JobStatus.isValid(): bool
Whether the job status object refers to a valid, active job.
- JobStatus.size(): int
Returns the current entry count of the JobStatus or Channel.
- Properties:
isReady : bool
isValid : bool
size : int
Job status indicator (ready or not, as well as entry count).
- Channel
- Channel.isEmpty(): bool
Whether the channel or pipe contains no remaining elements.
- Channel.total(): int
Total number of elements that have been added to the pipe.
- Properties:
isEmpty : bool
total : int
Channel provides a way to communicate between multiple contexts, including threads and jobs. Channel has internal entry count.
9.1.2. Channel, JobStatus, Lockbox
- add_ref(status: JobStatus?)
Increases the reference count of a JobStatus or Channel, preventing premature deletion.
- Arguments:
status : JobStatus? implicit
- append(channel: JobStatus?; size: int): int
Increases the entry count of the channel, signaling that new work has been added.
- Arguments:
channel : JobStatus? implicit
size : int
- channel_create(): Channel?
Warning
This is an unsafe operation.
Creates a new Channel for inter-thread communication and synchronization.
- channel_remove(channel: Channel?&)
Warning
This is an unsafe operation.
Destroys a Channel and releases its resources.
- Arguments:
channel : Channel?& implicit
- job_status_create(): JobStatus?
Creates a new JobStatus object for tracking the completion state of asynchronous jobs.
- job_status_remove(jobStatus: JobStatus?&)
Warning
This is an unsafe operation.
Destroys a JobStatus object and releases its resources.
- Arguments:
jobStatus : JobStatus?& implicit
- join(job: JobStatus?)
Blocks the current thread until the job or channel’s entry count reaches zero, indicating all work is complete.
- Arguments:
job : JobStatus? implicit
- lock_box_create(): LockBox?
Creates a new LockBox for thread-safe shared access to a single value.
- lock_box_remove(box: LockBox?&)
Warning
This is an unsafe operation.
Destroys a LockBox and releases its resources.
- Arguments:
box : LockBox?& implicit
- notify(job: JobStatus?)
Decreases the channel’s entry count, signaling that one unit of work has completed.
Use notify when the caller does not own a reference to the channel — for example
when a Channel? is passed as a plain function argument via invoke_in_context.
In that scenario no lambda captures the channel, so no extra reference was added and
there is nothing to release.
Compare with notify_and_release, which additionally releases a reference and should
be used inside lambdas that captured the channel (adding a reference).
- Arguments:
job : JobStatus? implicit
- notify_and_release(job: JobStatus?&)
Decreases the entry count and the reference count of a Channel or JobStatus
in a single operation. After the call the channel/status variable is set to null.
Use notify_and_release inside lambdas that captured the channel. Capturing adds a
reference, so the lambda must release it when done. This function combines
notify + release into one atomic step and nulls the variable to prevent
accidental reuse.
If the caller does not own a reference (e.g. the channel was passed as a plain
argument via invoke_in_context, with no lambda capture), use notify instead —
calling notify_and_release in that case would release a reference the caller never
added, leading to a premature free.
- Arguments:
job : JobStatus?& implicit
- release(status: JobStatus?&)
Decreases the reference count of a JobStatus or Channel; the object is deleted when the count reaches zero.
- Arguments:
status : JobStatus?& implicit
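The difference between notify and notify_and_release described above can be sketched as follows. This is a minimal sketch, not a definitive implementation: it assumes the new_job helper from daslib/jobque_boost (not part of this module, and its call form here is an assumption) and that capturing status in the lambda adds a reference, as the notify_and_release description states.

```das
require jobque
require daslib/jobque_boost // assumption: provides the new_job helper

[export]
def main() {
    with_job_que() $() {
        // expect one unit of work
        with_job_status(1) $(status) {
            new_job() @() {
                // the lambda captured status, so it owns a reference:
                // decrease the entry count and release that reference together
                status |> notify_and_release
            }
            // block until the entry count reaches zero
            status |> join
        }
    }
}
```

If status were passed to the worker as a plain argument instead of being captured, plain notify would be the matching call, since there would be no reference to release.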
9.1.3. Queries
- get_total_hw_jobs(): int
Returns the total number of hardware threads allocated to the job system.
- get_total_hw_threads(): int
Returns the total number of hardware threads available on the system.
- is_job_que_shutting_down(): bool
Returns true if the job queue infrastructure is shutting down or has not been initialized.
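As an illustration of the queries above, the following sketch prints the reported thread counts. It assumes only the functions listed in this section plus with_job_que; the printed values depend on the machine, so no output is shown.

```das
require jobque

[export]
def main() {
    // outside with_job_que the infrastructure may not be initialized yet
    print("shutting down or uninitialized, outside: {is_job_que_shutting_down()}\n")
    with_job_que() $() {
        // values are machine-dependent
        print("hardware threads: {get_total_hw_threads()}\n")
        print("job threads: {get_total_hw_jobs()}\n")
        print("shutting down or uninitialized, inside: {is_job_que_shutting_down()}\n")
    }
}
```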
9.1.4. Internal invocations
- new_debugger_thread(block: block<():void>)
Creates a new debugger tick thread for servicing debug connections.
- Arguments:
block : block<void> implicit
- new_job_invoke(lambda: lambda<():void>; function: function<():void>; lambdaSize: int)
Clones the current context, moves the attached lambda into it, and submits it to the job queue.
- Arguments:
lambda : lambda<void>
function : function<void>
lambdaSize : int
- new_thread_invoke(lambda: lambda<():void>; function: function<():void>; lambdaSize: int)
Clones the current context, moves the attached lambda into it, and runs it on a new dedicated thread.
- Arguments:
lambda : lambda<void>
function : function<void>
lambdaSize : int
9.1.5. Construction
9.1.5.1. with_channel
- with_channel(block: block<(Channel?):void>)
Creates a Channel scoped to the given block and automatically destroys it afterward.
- Arguments:
block : block<(Channel?):void> implicit
- with_channel(count: int; block: block<(Channel?):void>)
- with_job_que(block: block<():void>)
Ensures job queue infrastructure is initialized for the duration of the block.
- Arguments:
block : block<void> implicit
- with_job_status(total: int; block: block<(JobStatus?):void>)
Creates a JobStatus scoped to the given block and automatically destroys it afterward.
- Arguments:
total : int
block : block<(JobStatus?):void> implicit
- with_lock_box(block: block<(LockBox?):void>)
Creates a LockBox scoped to the given block and automatically destroys it afterward.
- Arguments:
block : block<(LockBox?):void> implicit
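A scoped JobStatus can be exercised on a single thread to see how the entry count behaves. The sketch below is a hedged illustration: it assumes the JobStatus properties (isValid, isReady, size) can be read with field syntax through the pointer, and it uses plain notify because status is only a block argument here, so no reference is owned.

```das
require jobque

[export]
def main() {
    // expect two units of work
    with_job_status(2) $(status) {
        print("valid = {status.isValid}, ready = {status.isReady}, size = {status.size}\n")
        // report both units as complete; nothing captured the status,
        // so no reference needs to be released
        status |> notify
        status |> notify
        // the entry count is already zero, so join has nothing to wait for
        status |> join
        print("ready = {status.isReady}, size = {status.size}\n")
    }
}
```

In real use the notify calls would come from jobs or threads, and join would block the caller until all of them have reported.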
9.1.6. Atomic
- atomic32_create(): Atomic32?
Creates an Atomic32 — a thread-safe 32-bit integer for lock-free concurrent access.
- atomic32_remove(atomic: Atomic32?&)
Warning
This is an unsafe operation.
Destroys an Atomic32 and releases its resources.
- Arguments:
atomic : Atomic32?& implicit
- atomic64_create(): Atomic64?
Creates an Atomic64 — a thread-safe 64-bit integer for lock-free concurrent access.
- atomic64_remove(atomic: Atomic64?&)
Warning
This is an unsafe operation.
Destroys an Atomic64 and releases its resources.
- Arguments:
atomic : Atomic64?& implicit
9.1.6.1. dec
- dec(atomic: Atomic32?): int
Atomically decrements the integer value and returns the result.
- Arguments:
atomic : Atomic32? implicit
- dec(atomic: Atomic64?): int64
9.1.6.2. get
- get(atomic: Atomic32?): int
Returns the current value of the atomic integer.
- Arguments:
atomic : Atomic32? implicit
- get(atomic: Atomic64?): int64
9.1.6.3. inc
- inc(atomic: Atomic32?): int
Atomically increments the integer value and returns the result.
- Arguments:
atomic : Atomic32? implicit
- inc(atomic: Atomic64?): int64
9.1.6.4. set
- set(atomic: Atomic32?; value: int)
Sets the atomic integer to the specified value.
- Arguments:
atomic : Atomic32? implicit
value : int
- set(atomic: Atomic64?; value: int64)
- with_atomic32(block: block<(Atomic32?):void>)
Creates an Atomic32 scoped to the given block and automatically destroys it afterward.
- Arguments:
block : block<(Atomic32?):void> implicit
- with_atomic64(block: block<(Atomic64?):void>)
Creates an Atomic64 scoped to the given block and automatically destroys it afterward.
- Arguments:
block : block<(Atomic64?):void> implicit
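The Atomic64 overloads follow the same pattern as the Atomic32 example at the top of this page. The sketch below assumes the int64 overloads behave like their 32-bit counterparts (inc and dec return the updated value) and uses a value that does not fit into 32 bits.

```das
require jobque

[export]
def main() {
    with_atomic64() $(counter) {
        // a value outside the 32-bit range, written as an int64 literal
        counter |> set(4000000000l)
        print("value = {counter |> get}\n")
        let after_inc = counter |> inc
        print("after inc = {after_inc}\n")
        let after_dec = counter |> dec
        print("after dec = {after_dec}\n")
    }
}
// output:
// value = 4000000000
// after inc = 4000000001
// after dec = 4000000000
```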