7.1.35. Job Queue (jobque)

This tutorial covers the jobque_boost module — daslang’s built-in multi-threading primitives. The module provides a work-stealing thread pool, typed channels for inter-thread communication, byte streams for cross-context messaging, job statuses and wait groups for synchronization, lock boxes for shared mutable state, and high-level helpers like parallel_for.

require daslib/jobque_boost

7.1.35.1. Core concepts

  • Job queue — a work-stealing thread pool initialized by with_job_que. All multi-threading operations must be performed inside this scope.

  • Job — a unit of work dispatched to the thread pool via new_job. Each job runs in a cloned context — it does not share memory with the caller.

  • Channel — a thread-safe FIFO queue for passing typed struct data between jobs. Created with with_channel(count) where count is the expected number of notify_and_release calls before the channel closes.

  • Stream — a byte-oriented FIFO whose buffers are owned C++-side, so either end may be freed without coordinating with the other. Preferred over channels when producer and consumer live in different contexts. Carries raw bytes (push / pop) or structured data serialized via daslib/archive (push_archive / pop_archive).

  • JobStatus — a counter that tracks asynchronous completion. notify_and_release decrements it, join blocks until it reaches zero.

  • Wait groupwith_wait_group wraps a JobStatus with automatic join on scope exit. done is an alias for notify_and_release.

  • LockBox — a mutex-protected single-value container. set stores a value, get reads it inside a block.

  • Threadnew_thread spawns a dedicated OS thread, outside the thread pool.

Channel and lock-box data must be a struct (or handled type) — primitives cannot be heap-allocated. Wrap them in a struct when needed.

7.1.35.2. Starting the job queue

with_job_que initializes the thread pool and runs the block. All job-queue operations must happen inside this scope:

with_job_que() {
    print("Job queue is active\n")
}
// output: Job queue is active

7.1.35.3. Spawning jobs

new_job dispatches work to the thread pool. Each job runs in a cloned context. Use channels to communicate results back:

with_job_que() {
    with_channel(1) $(ch) {
        new_job() @() {
            ch |> push_clone(IntVal(v = 42))
            ch |> notify_and_release
        }
        ch |> for_each_clone() $(val : IntVal#) {
            print("Received from job: {val.v}\n")
        }
    }
}
// output: Received from job: 42

7.1.35.4. Channels

A Channel is a thread-safe FIFO queue. Create one with with_channel(expected_count) where the count matches the number of notify_and_release calls that will be made. push_clone sends data and for_each_clone receives it, blocking until the channel is drained:

with_job_que() {
    with_channel(2) $(ch) {
        new_job() @() {
            ch |> push_clone(StringVal(s = "hello"))
            ch |> notify_and_release
        }
        new_job() @() {
            ch |> push_clone(StringVal(s = "world"))
            ch |> notify_and_release
        }
        var messages : array<string>
        ch |> for_each_clone() $(msg : StringVal#) {
            messages |> push(clone_string(msg.s))
        }
        sort(messages)
        for (m in messages) {
            print("  {m}\n")
        }
    }
}
// output:
//   hello
//   world

Channels work with any struct type — not just simple wrappers:

struct WorkResult {
    index : int
    value : int
}

with_channel(1) $(ch) {
    new_job() @() {
        for (i in range(3)) {
            ch |> push_clone(WorkResult(index = i, value = i * i))
        }
        ch |> notify_and_release
    }
    ch |> for_each_clone() $(r : WorkResult#) {
        print("  [{r.index}] = {r.value}\n")
    }
}
// output:
//   [0] = 0
//   [1] = 1
//   [2] = 4

7.1.35.5. Multiple producers

When multiple jobs push to the same channel, set the channel count to the number of producers. for_each_clone blocks until all have called notify_and_release:

let num_producers = 3
with_channel(num_producers) $(ch) {
    for (p in range(num_producers)) {
        new_job() @() {
            ch |> push_clone(IntVal(v = p * 100))
            ch |> notify_and_release
        }
    }
    var results : array<int>
    ch |> for_each_clone() $(val : IntVal#) {
        results |> push(val.v)
    }
    sort(results)
    print("producers: {results}\n")
}
// output: producers: [0, 100, 200]

7.1.35.6. Streams

A Stream is a byte-oriented FIFO backed by storage that lives outside any daslang context, so producer and consumer can release their references independently — this is what makes streams the preferred transport between threads or contexts whose lifetimes you cannot coordinate.

with_stream() opens a stream that is ready immediately. push copies bytes onto the tail; try_pop is non-blocking and hands the consumer a borrowed view valid only for the duration of the block:

with_job_que() {
    with_stream() $(var s : Stream?) {
        var payload = [1u8, 2u8, 3u8, 4u8]
        s |> push(payload)
        print("buffered blobs: {s.total}\n")

        let ok = s |> try_pop() $(view : array<uint8># -const) {
            print("popped {length(view)} bytes, first = {view[0]}\n")
        }
        print("try_pop succeeded: {ok}\n")
    }
}
// output:
// buffered blobs: 1
// popped 4 bytes, first = 1
// try_pop succeeded: true

7.1.35.7. Streams carrying structured data

push_archive serializes any struct or variant through daslib/archive and enqueues the resulting bytes. gather_archive drains the stream, invoking the block once per message in FIFO order:

require daslib/archive

struct Command {
    id : int
    name : string
    payload : array<int>
}

with_job_que() {
    with_stream() $(var s : Stream?) {
        for (i in range(3)) {
            s |> push_archive(Command(id = i, name = "cmd", payload <- [i, i * 10]))
        }
        s |> gather_archive() $(var c : Command) {
            print("  [{c.id}] {c.name} payload = {c.payload}\n")
        }
    }
}
// output:
//   [0] cmd payload = [0, 0]
//   [1] cmd payload = [1, 10]
//   [2] cmd payload = [2, 20]

Related: peek reads without draining; pop_with_timeout blocks up to a bounded interval; push_batch enqueues multiple blobs atomically under a single lock.

7.1.35.8. Stream across a worker job

A job that captures a Stream must release its reference before exiting. Two idioms cover the common cases:

  • Use s |> release when completion is signalled separately (typically alongside a wait group, as in the example below).

  • Use s |> notify_and_release when the stream itself is the completion signal — open it with with_stream(count) and the consumer can observe readiness via s.isReady.

with_job_que() {
    with_stream() $(var s : Stream?) {
        with_wait_group(1) $(wg) {
            new_job() @() {
                for (i in range(4)) {
                    s |> push_archive(Command(id = i, name = "work", payload <- [i * i]))
                }
                s |> release
                wg |> done
            }
        }
        var got : array<int>
        s |> gather_archive() $(var c : Command) {
            got |> push(c.payload[0])
        }
        print("squares from worker: {got}\n")
    }
}
// output: squares from worker: [0, 1, 4, 9]

7.1.35.9. JobStatus

A JobStatus tracks completion of asynchronous work. with_job_status(count) creates a status expecting count notifications. Each call to notify_and_release decrements the counter. join blocks until all notifications arrive:

with_job_que() {
    with_job_status(3) $(status) {
        for (i in range(3)) {
            new_job() @() {
                status |> notify_and_release
            }
        }
        status |> join
    }
    print("All 3 jobs finished\n")
}
// output: All 3 jobs finished

7.1.35.10. Wait groups

with_wait_group is a convenience wrapper — it combines with_job_status and join. done is an alias for notify_and_release:

with_job_que() {
    with_wait_group(3) $(wg) {
        for (i in range(3)) {
            new_job() @() {
                wg |> done
            }
        }
    }
    print("all jobs complete\n")
}
// output: all jobs complete

7.1.35.11. LockBox

A LockBox holds a single struct value protected by a mutex. set stores a value and get reads it inside a block. These operations must run inside a job context (new_job or new_thread):

struct BoxCounter {
    value : int
}

with_job_que() {
    with_lock_box() $(box) {
        with_channel(1) $(ch) {
            new_job() @() {
                box |> set(BoxCounter(value = 42))
                box |> get() $(c : BoxCounter#) {
                    ch |> push_clone(IntVal(v = c.value))
                }
                box |> release
                ch |> notify_and_release
            }
            ch |> for_each_clone() $(val : IntVal#) {
                print("lockbox value: {val.v}\n")
            }
        }
    }
}
// output: lockbox value: 42

7.1.35.12. parallel_for

parallel_for splits a range [begin..end) into chunks and runs them on the job queue. A macro automatically wraps the block body in new_job — you write sequential code inside:

with_job_que() {
    let num_jobs = 3
    with_channel(num_jobs) $(ch) {
        parallel_for(0, 10, num_jobs) $(job_begin, job_end, wg) {
            for (i in range(job_begin, job_end)) {
                ch |> push_clone(IntVal(v = i * i))
            }
            ch |> notify_and_release
            wg |> done
        }
        var results : array<int>
        ch |> for_each_clone() $(val : IntVal#) {
            results |> push(val.v)
        }
        sort(results)
        print("squares: {results}\n")
    }
}
// output: squares: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

The block receives three parameters:

  • job_begin, job_end — the index sub-range for this chunk

  • wg — a JobStatus? that must be signaled via done(wg) when the chunk finishes

When using a channel inside parallel_for, set the channel count to num_jobs since each chunk calls notify_and_release once.

7.1.35.13. new_thread

new_thread creates a dedicated OS thread outside the thread pool. Use it for long-lived work that should not block the job queue:

with_job_que() {
    with_channel(1) $(ch) {
        new_thread() @() {
            ch |> push_clone(StringVal(s = "from thread"))
            ch |> notify_and_release
        }
        ch |> for_each_clone() $(msg : StringVal#) {
            print("{msg.s}\n")
        }
    }
}
// output: from thread

See also

Full source: tutorials/language/35_jobque.das

Previous tutorial: Entity Component System (DECS)

Next tutorial: Pointers

Lambda — lambda language reference.

Blocks — block language reference.