5.1.35. Job Queue (jobque)
This tutorial covers the jobque_boost module — daslang’s built-in
multi-threading primitives. The module provides a work-stealing thread pool,
typed channels for inter-thread communication, job statuses and wait groups
for synchronization, lock boxes for shared mutable state, and high-level
helpers like parallel_for.
require daslib/jobque_boost
5.1.35.1. Core concepts
Job queue: a work-stealing thread pool initialized by with_job_que. All multi-threading operations must be performed inside this scope.
Job: a unit of work dispatched to the thread pool via new_job. Each job runs in a cloned context; it does not share memory with the caller.
Channel: a thread-safe FIFO queue for passing typed struct data between jobs. Created with with_channel(count), where count is the expected number of notify_and_release calls before the channel closes.
JobStatus: a counter that tracks asynchronous completion. notify_and_release decrements it, and join blocks until it reaches zero.
Wait group: with_wait_group wraps a JobStatus with an automatic join on scope exit. done is an alias for notify_and_release.
LockBox: a mutex-protected single-value container. set stores a value, and get reads it inside a block.
Thread: new_thread spawns a dedicated OS thread, outside the thread pool.
Channel and lock-box data must be a struct (or handled type) — primitives cannot be heap-allocated. Wrap them in a struct when needed.
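The examples in this tutorial pass values through IntVal and StringVal wrappers without showing their definitions. A minimal sketch, assuming each is a single-field struct whose field name matches the IntVal(v = 42) and StringVal(s = "hello") constructor calls used in the examples (the full tutorial source may define them differently):

```daslang
// Assumed definitions, inferred from how the examples construct and read these values.
struct IntVal {
    v : int      // wraps an int so it can travel through a channel
}

struct StringVal {
    s : string   // wraps a string so it can travel through a channel
}
```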
5.1.35.2. Starting the job queue
with_job_que initializes the thread pool and runs the block. All
job-queue operations must happen inside this scope:
with_job_que() {
    print("Job queue is active\n")
}
// output: Job queue is active
5.1.35.3. Spawning jobs
new_job dispatches work to the thread pool. Each job runs in a cloned
context. Use channels to communicate results back:
with_job_que() {
    with_channel(1) $(ch) {
        new_job() @() {
            ch |> push_clone(IntVal(v = 42))
            ch |> notify_and_release
        }
        ch |> for_each_clone() $(val : IntVal#) {
            print("Received from job: {val.v}\n")
        }
    }
}
// output: Received from job: 42
5.1.35.4. Channels
A Channel is a thread-safe FIFO queue. Create one with
with_channel(expected_count) where the count matches the number of
notify_and_release calls that will be made. push_clone sends data, and
for_each_clone receives it, blocking until all expected notify_and_release
calls have arrived:
with_job_que() {
    with_channel(2) $(ch) {
        new_job() @() {
            ch |> push_clone(StringVal(s = "hello"))
            ch |> notify_and_release
        }
        new_job() @() {
            ch |> push_clone(StringVal(s = "world"))
            ch |> notify_and_release
        }
        var messages : array<string>
        ch |> for_each_clone() $(msg : StringVal#) {
            messages |> push(clone_string(msg.s))
        }
        sort(messages)
        for (m in messages) {
            print(" {m}\n")
        }
    }
}
// output:
// hello
// world
Channels work with any struct type — not just simple wrappers:
struct WorkResult {
    index : int
    value : int
}
// Runs inside with_job_que(), like the other examples.
with_channel(1) $(ch) {
    new_job() @() {
        for (i in range(3)) {
            ch |> push_clone(WorkResult(index = i, value = i * i))
        }
        ch |> notify_and_release
    }
    ch |> for_each_clone() $(r : WorkResult#) {
        print(" [{r.index}] = {r.value}\n")
    }
}
// output:
// [0] = 0
// [1] = 1
// [2] = 4
5.1.35.5. Multiple producers
When multiple jobs push to the same channel, set the channel count to the
number of producers. for_each_clone blocks until all have called
notify_and_release:
// Runs inside with_job_que(), like the other examples.
let num_producers = 3
with_channel(num_producers) $(ch) {
    for (p in range(num_producers)) {
        new_job() @() {
            ch |> push_clone(IntVal(v = p * 100))
            ch |> notify_and_release
        }
    }
    var results : array<int>
    ch |> for_each_clone() $(val : IntVal#) {
        results |> push(val.v)
    }
    sort(results)  // arrival order is not guaranteed, so sort before printing
    print("producers: {results}\n")
}
// output: producers: [0, 100, 200]
5.1.35.6. JobStatus
A JobStatus tracks completion of asynchronous work.
with_job_status(count) creates a status expecting count notifications.
Each call to notify_and_release decrements the counter. join blocks
until all notifications arrive:
with_job_que() {
    with_job_status(3) $(status) {
        for (i in range(3)) {
            new_job() @() {
                status |> notify_and_release
            }
        }
        status |> join
    }
    print("All 3 jobs finished\n")
}
// output: All 3 jobs finished
5.1.35.7. Wait groups
with_wait_group is a convenience wrapper — it combines
with_job_status and join. done is an alias for
notify_and_release:
with_job_que() {
    with_wait_group(3) $(wg) {
        for (i in range(3)) {
            new_job() @() {
                wg |> done
            }
        }
    }
    print("all jobs complete\n")
}
// output: all jobs complete
5.1.35.8. LockBox
A LockBox holds a single struct value protected by a mutex. set
stores a value and get reads it inside a block. These operations must
run inside a job context (new_job or new_thread):
struct BoxCounter {
    value : int
}
with_job_que() {
    with_lock_box() $(box) {
        with_channel(1) $(ch) {
            new_job() @() {
                box |> set(BoxCounter(value = 42))
                box |> get() $(c : BoxCounter#) {
                    ch |> push_clone(IntVal(v = c.value))
                }
                box |> release
                ch |> notify_and_release
            }
            ch |> for_each_clone() $(val : IntVal#) {
                print("lockbox value: {val.v}\n")
            }
        }
    }
}
// output: lockbox value: 42
5.1.35.9. parallel_for
parallel_for splits a range [begin..end) into chunks and runs them on
the job queue. A macro automatically wraps the block body in new_job — you
write sequential code inside:
with_job_que() {
    let num_jobs = 3
    with_channel(num_jobs) $(ch) {
        parallel_for(0, 10, num_jobs) $(job_begin, job_end, wg) {
            for (i in range(job_begin, job_end)) {
                ch |> push_clone(IntVal(v = i * i))
            }
            ch |> notify_and_release
            wg |> done
        }
        var results : array<int>
        ch |> for_each_clone() $(val : IntVal#) {
            results |> push(val.v)
        }
        sort(results)
        print("squares: {results}\n")
    }
}
// output: squares: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
The block receives three parameters:
job_begin, job_end: the index sub-range for this chunk
wg: a JobStatus? that must be signaled via done(wg) when the chunk finishes
When using a channel inside parallel_for, set the channel count to
num_jobs since each chunk calls notify_and_release once.
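The channel-count rule also holds when each chunk sends a single aggregated message instead of one message per index. The sketch below sums the range [0..100) by having every chunk push one partial sum. It uses only calls shown in this tutorial; the IntVal definition and the options gen2 line are assumptions added to make the file self-contained.

```daslang
options gen2                    // assumption: selects the brace syntax used in this tutorial
require daslib/jobque_boost

// Assumed wrapper struct; channel data must be a struct.
struct IntVal {
    v : int
}

[export]
def main() {
    with_job_que() {
        let num_jobs = 4
        // One notify_and_release per chunk, so the channel count is num_jobs.
        with_channel(num_jobs) $(ch) {
            parallel_for(0, 100, num_jobs) $(job_begin, job_end, wg) {
                var partial = 0
                for (i in range(job_begin, job_end)) {
                    partial += i
                }
                ch |> push_clone(IntVal(v = partial))  // one message per chunk
                ch |> notify_and_release
                wg |> done
            }
            // Combine the partial sums on the calling side.
            var total = 0
            ch |> for_each_clone() $(val : IntVal#) {
                total += val.v
            }
            print("total: {total}\n")
        }
    }
}
```

Provided the chunks cover the range exactly once, the partial sums add up to 4950 regardless of how the range is split or the order in which chunks finish.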
5.1.35.10. new_thread
new_thread creates a dedicated OS thread outside the thread pool. Use it
for long-lived work that should not block the job queue:
with_job_que() {
    with_channel(1) $(ch) {
        new_thread() @() {
            ch |> push_clone(StringVal(s = "from thread"))
            ch |> notify_and_release
        }
        ch |> for_each_clone() $(msg : StringVal#) {
            print("{msg.s}\n")
        }
    }
}
// output: from thread
See also
Full source: tutorials/language/35_jobque.das
Previous tutorial: Entity Component System (DECS)
Next tutorial: Pointers
Lambda — lambda language reference.
Blocks — block language reference.