7.1.35. Job Queue (jobque)
This tutorial covers the jobque_boost module — daslang’s built-in
multi-threading primitives. The module provides a work-stealing thread pool,
typed channels for inter-thread communication, byte streams for
cross-context messaging, job statuses and wait groups for synchronization,
lock boxes for shared mutable state, and high-level helpers like
parallel_for.
require daslib/jobque_boost
7.1.35.1. Core concepts
Job queue — a work-stealing thread pool initialized by with_job_que. All multi-threading operations must be performed inside this scope.
Job — a unit of work dispatched to the thread pool via new_job. Each job runs in a cloned context — it does not share memory with the caller.
Channel — a thread-safe FIFO queue for passing typed struct data between jobs. Created with with_channel(count), where count is the expected number of notify_and_release calls before the channel closes.
Stream — a byte-oriented FIFO whose buffers are owned C++-side, so either end may be freed without coordinating with the other. Preferred over channels when producer and consumer live in different contexts. Carries raw bytes (push/pop) or structured data serialized via daslib/archive (push_archive/pop_archive).
JobStatus — a counter that tracks asynchronous completion. notify_and_release decrements it; join blocks until it reaches zero.
Wait group — with_wait_group wraps a JobStatus with an automatic join on scope exit. done is an alias for notify_and_release.
LockBox — a mutex-protected single-value container. set stores a value; get reads it inside a block.
Thread — new_thread spawns a dedicated OS thread, outside the thread pool.
Channel and lock-box data must be a struct (or handled type) — primitives cannot be heap-allocated. Wrap them in a struct when needed.
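The examples in this tutorial pass IntVal and StringVal through channels without showing their definitions. Minimal wrapper structs matching that usage (field names taken from the examples) look like this:

```daslang
// wrapper structs for primitive values, as required by channels and lock boxes
struct IntVal {
    v : int
}

struct StringVal {
    s : string
}
```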
7.1.35.2. Starting the job queue
with_job_que initializes the thread pool and runs the block. All
job-queue operations must happen inside this scope:
with_job_que() {
    print("Job queue is active\n")
}
// output: Job queue is active
7.1.35.3. Spawning jobs
new_job dispatches work to the thread pool. Each job runs in a cloned
context. Use channels to communicate results back:
with_job_que() {
    with_channel(1) $(ch) {
        new_job() @() {
            ch |> push_clone(IntVal(v = 42))
            ch |> notify_and_release
        }
        ch |> for_each_clone() $(val : IntVal#) {
            print("Received from job: {val.v}\n")
        }
    }
}
// output: Received from job: 42
7.1.35.4. Channels
A Channel is a thread-safe FIFO queue. Create one with
with_channel(expected_count) where the count matches the number of
notify_and_release calls that will be made. push_clone sends data
and for_each_clone receives it, blocking until the channel is drained:
with_job_que() {
    with_channel(2) $(ch) {
        new_job() @() {
            ch |> push_clone(StringVal(s = "hello"))
            ch |> notify_and_release
        }
        new_job() @() {
            ch |> push_clone(StringVal(s = "world"))
            ch |> notify_and_release
        }
        var messages : array<string>
        ch |> for_each_clone() $(msg : StringVal#) {
            messages |> push(clone_string(msg.s))
        }
        sort(messages)
        for (m in messages) {
            print(" {m}\n")
        }
    }
}
// output:
// hello
// world
Channels work with any struct type — not just simple wrappers:
struct WorkResult {
    index : int
    value : int
}

with_channel(1) $(ch) {
    new_job() @() {
        for (i in range(3)) {
            ch |> push_clone(WorkResult(index = i, value = i * i))
        }
        ch |> notify_and_release
    }
    ch |> for_each_clone() $(r : WorkResult#) {
        print(" [{r.index}] = {r.value}\n")
    }
}
// output:
// [0] = 0
// [1] = 1
// [2] = 4
7.1.35.5. Multiple producers
When multiple jobs push to the same channel, set the channel count to the
number of producers. for_each_clone blocks until all have called
notify_and_release:
let num_producers = 3
with_channel(num_producers) $(ch) {
    for (p in range(num_producers)) {
        new_job() @() {
            ch |> push_clone(IntVal(v = p * 100))
            ch |> notify_and_release
        }
    }
    var results : array<int>
    ch |> for_each_clone() $(val : IntVal#) {
        results |> push(val.v)
    }
    sort(results)
    print("producers: {results}\n")
}
// output: producers: [0, 100, 200]
7.1.35.6. Streams
A Stream is a byte-oriented FIFO backed by storage that lives outside any
daslang context, so producer and consumer can release their references
independently — this is what makes streams the preferred transport between
threads or contexts whose lifetimes you cannot coordinate.
with_stream() opens a stream that is ready immediately. push copies
bytes onto the tail; try_pop is non-blocking and hands the consumer a
borrowed view valid only for the duration of the block:
with_job_que() {
    with_stream() $(var s : Stream?) {
        var payload = [1u8, 2u8, 3u8, 4u8]
        s |> push(payload)
        print("buffered blobs: {s.total}\n")
        let ok = s |> try_pop() $(view : array<uint8># -const) {
            print("popped {length(view)} bytes, first = {view[0]}\n")
        }
        print("try_pop succeeded: {ok}\n")
    }
}
}
// output:
// buffered blobs: 1
// popped 4 bytes, first = 1
// try_pop succeeded: true
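Because try_pop never blocks, it simply reports failure on an empty stream: the block is not invoked and the call returns false. A minimal sketch:

```daslang
with_job_que() {
    with_stream() $(var s : Stream?) {
        // nothing has been pushed, so there is nothing to pop;
        // the block below is never entered
        let ok = s |> try_pop() $(view : array<uint8># -const) {
            print("never reached\n")
        }
        print("empty stream try_pop: {ok}\n")
    }
}
// output: empty stream try_pop: false
```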
7.1.35.7. Streams carrying structured data
push_archive serializes any struct or variant through daslib/archive
and enqueues the resulting bytes. gather_archive drains the stream,
invoking the block once per message in FIFO order:
require daslib/archive

struct Command {
    id : int
    name : string
    payload : array<int>
}

with_job_que() {
    with_stream() $(var s : Stream?) {
        for (i in range(3)) {
            s |> push_archive(Command(id = i, name = "cmd", payload <- [i, i * 10]))
        }
        s |> gather_archive() $(var c : Command) {
            print(" [{c.id}] {c.name} payload = {c.payload}\n")
        }
    }
}
// output:
// [0] cmd payload = [0, 0]
// [1] cmd payload = [1, 10]
// [2] cmd payload = [2, 20]
Related: peek reads without draining; pop_with_timeout blocks up to
a bounded interval; push_batch enqueues multiple blobs atomically under
a single lock.
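A sketch of the non-draining and bounded-wait variants, assuming block-based signatures analogous to try_pop (the exact parameter shapes, and the timeout units, are assumptions — consult daslib/jobque_boost for the real declarations):

```daslang
with_job_que() {
    with_stream() $(var s : Stream?) {
        var payload = [7u8, 8u8]
        s |> push(payload)
        // peek: inspect the head blob without removing it (signature assumed)
        s |> peek() $(view : array<uint8># -const) {
            print("head holds {length(view)} bytes\n")
        }
        // pop_with_timeout: wait up to a bounded interval for data
        // (timeout argument and its units are assumed for illustration)
        s |> pop_with_timeout(100) $(view : array<uint8># -const) {
            print("popped {length(view)} bytes\n")
        }
    }
}
```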
7.1.35.8. Stream across a worker job
A job that captures a Stream must release its reference before exiting.
Two idioms cover the common cases:
Use s |> release when completion is signalled separately (typically alongside a wait group, as in the example below).
Use s |> notify_and_release when the stream itself is the completion signal — open it with with_stream(count) and the consumer can observe readiness via s.isReady.
with_job_que() {
    with_stream() $(var s : Stream?) {
        with_wait_group(1) $(wg) {
            new_job() @() {
                for (i in range(4)) {
                    s |> push_archive(Command(id = i, name = "work", payload <- [i * i]))
                }
                s |> release
                wg |> done
            }
        }
        var got : array<int>
        s |> gather_archive() $(var c : Command) {
            got |> push(c.payload[0])
        }
        print("squares from worker: {got}\n")
    }
}
// output: squares from worker: [0, 1, 4, 9]
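The second idiom — letting the stream itself signal completion — can be sketched as follows, assuming with_stream(count) and s.isReady behave as described above (the busy-wait is only for illustration; real code would do useful work between checks):

```daslang
with_job_que() {
    with_stream(1) $(var s : Stream?) {
        new_job() @() {
            var payload = [9u8]
            s |> push(payload)
            s |> notify_and_release      // completion signal + reference release in one call
        }
        while (!s.isReady) {}            // poll readiness; illustration only
        s |> try_pop() $(view : array<uint8># -const) {
            print("worker delivered {length(view)} byte(s)\n")
        }
    }
}
```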
7.1.35.9. JobStatus
A JobStatus tracks completion of asynchronous work.
with_job_status(count) creates a status expecting count notifications.
Each call to notify_and_release decrements the counter. join blocks
until all notifications arrive:
with_job_que() {
    with_job_status(3) $(status) {
        for (i in range(3)) {
            new_job() @() {
                status |> notify_and_release
            }
        }
        status |> join
    }
    print("All 3 jobs finished\n")
}
// output: All 3 jobs finished
7.1.35.10. Wait groups
with_wait_group is a convenience wrapper — it combines
with_job_status and join. done is an alias for
notify_and_release:
with_job_que() {
    with_wait_group(3) $(wg) {
        for (i in range(3)) {
            new_job() @() {
                wg |> done
            }
        }
    }
    print("all jobs complete\n")
}
// output: all jobs complete
7.1.35.11. LockBox
A LockBox holds a single struct value protected by a mutex. set
stores a value and get reads it inside a block. These operations must
run inside a job context (new_job or new_thread):
struct BoxCounter {
    value : int
}

with_job_que() {
    with_lock_box() $(box) {
        with_channel(1) $(ch) {
            new_job() @() {
                box |> set(BoxCounter(value = 42))
                box |> get() $(c : BoxCounter#) {
                    ch |> push_clone(IntVal(v = c.value))
                }
                box |> release
                ch |> notify_and_release
            }
            ch |> for_each_clone() $(val : IntVal#) {
                print("lockbox value: {val.v}\n")
            }
        }
    }
}
// output: lockbox value: 42
7.1.35.12. parallel_for
parallel_for splits a range [begin..end) into chunks and runs them on
the job queue. A macro automatically wraps the block body in new_job — you
write sequential code inside:
with_job_que() {
    let num_jobs = 3
    with_channel(num_jobs) $(ch) {
        parallel_for(0, 10, num_jobs) $(job_begin, job_end, wg) {
            for (i in range(job_begin, job_end)) {
                ch |> push_clone(IntVal(v = i * i))
            }
            ch |> notify_and_release
            wg |> done
        }
        var results : array<int>
        ch |> for_each_clone() $(val : IntVal#) {
            results |> push(val.v)
        }
        sort(results)
        print("squares: {results}\n")
    }
}
// output: squares: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
The block receives three parameters:
job_begin, job_end — the index sub-range for this chunk
wg — a JobStatus? that must be signaled via done(wg) when the chunk finishes
When using a channel inside parallel_for, set the channel count to
num_jobs since each chunk calls notify_and_release once.
7.1.35.13. new_thread
new_thread creates a dedicated OS thread outside the thread pool. Use it
for long-lived work that should not block the job queue:
with_job_que() {
    with_channel(1) $(ch) {
        new_thread() @() {
            ch |> push_clone(StringVal(s = "from thread"))
            ch |> notify_and_release
        }
        ch |> for_each_clone() $(msg : StringVal#) {
            print("{msg.s}\n")
        }
    }
}
// output: from thread
See also
Full source: tutorials/language/35_jobque.das
Previous tutorial: Entity Component System (DECS)
Next tutorial: Pointers
Lambda — lambda language reference.
Blocks — block language reference.