9.1. Jobs and threads¶
The JOBQUE module provides low-level job queue and threading primitives.
It includes thread-safe channels for inter-thread communication, lock boxes
for shared data access, job status tracking, and fine-grained thread
management. For higher-level job abstractions, see jobque_boost.
See tutorial_jobque for a hands-on tutorial.
All functions and symbols are in “jobque” module, use require to get access to it.
require jobque
Example:
require jobque
[export]
def main() {
with_atomic32() $(counter) {
counter |> set(10)
print("value = {counter |> get}\n")
let after_inc = counter |> inc
print("after inc = {after_inc}\n")
let after_dec = counter |> dec
print("after dec = {after_dec}\n")
}
}
// output:
// value = 10
// after inc = 11
// after dec = 10
9.1.1. Handled structures¶
- LockBox¶
Lockbox. Similar to channel, only for single object.
- Stream¶
- Stream.isEmpty(): bool¶
Whether the channel or pipe contains no remaining elements.
- Stream.total(): int¶
Total number of elements that have been added to the pipe.
- Properties:
isEmpty : bool
total : int
FIFO pipe of opaque byte buffers shared between contexts and threads. Built on top of JobStatus so it supports refcounting and join. Unlike Channel, a Stream stores raw bytes copied into runtime-owned memory, so the consumer can safely free a popped buffer even if the producer’s context is long gone. Typically used together with the push_archive / pop_archive / gather_archive helpers in jobque_boost to ship serialized command records across thread boundaries.
- Atomic64¶
Atomic 64 bit integer.
- Atomic32¶
Atomic 32 bit integer.
- JobStatus¶
- JobStatus.isReady(): bool¶
Whether the job has completed execution.
- JobStatus.isValid(): bool¶
Whether the job status object refers to a valid, active job.
- JobStatus.size(): int¶
Returns the current entry count of the JobStatus or Channel.
- Properties:
isReady : bool
isValid : bool
size : int
Job status indicator (ready or not, as well as entry count).
- Channel¶
- Channel.isEmpty(): bool¶
Whether the channel or pipe contains no remaining elements.
- Channel.total(): int¶
Total number of elements that have been added to the pipe.
- Properties:
isEmpty : bool
total : int
Channel provides a way to communicate between multiple contexts, including threads and jobs. Channel has internal entry count.
9.1.2. Channel, JobStatus, Lockbox, Stream¶
- add_ref(status: JobStatus?)¶
Increases the reference count of a JobStatus or Channel, preventing premature deletion.
- Arguments:
status : JobStatus? implicit
- append(channel: JobStatus?; size: int): int¶
Increases the entry count of the channel, signaling that new work has been added.
- Arguments:
channel : JobStatus? implicit
size : int
- channel_create(): Channel?¶
Warning
This is unsafe operation.
Creates a new Channel for inter-thread communication and synchronization.
- channel_remove(channel: Channel?&)¶
Warning
This is unsafe operation.
Destroys a Channel and releases its resources.
- Arguments:
channel : Channel?& implicit
- job_status_create(): JobStatus?¶
Creates a new JobStatus object for tracking the completion state of asynchronous jobs.
- job_status_remove(jobStatus: JobStatus?&)¶
Warning
This is unsafe operation.
Destroys a JobStatus object and releases its resources.
- Arguments:
jobStatus : JobStatus?& implicit
- join(job: JobStatus?)¶
Blocks the current thread until the job or channel’s entry count reaches zero, indicating all work is complete.
- Arguments:
job : JobStatus? implicit
- lock_box_create(): LockBox?¶
Creates a new LockBox for thread-safe shared access to a single value.
- lock_box_remove(box: LockBox?&)¶
Warning
This is unsafe operation.
Destroys a LockBox and releases its resources.
- Arguments:
box : LockBox?& implicit
- notify(job: JobStatus?)¶
Decreases the channel’s entry count, signaling that one unit of work has completed.
Use notify when the caller does not own a reference to the channel — for example
when a Channel? is passed as a plain function argument via invoke_in_context.
In that scenario no lambda captures the channel, so no extra reference was added and
there is nothing to release.
Compare with notify_and_release, which additionally releases a reference and should
be used inside lambdas that captured the channel (adding a reference).
- Arguments:
job : JobStatus? implicit
- notify_and_release(job: JobStatus?&)¶
Decreases the entry count and the reference count of a Channel or JobStatus
in a single operation. After the call the channel/status variable is set to null.
Use notify_and_release inside lambdas that captured the channel. Capturing adds a
reference, so the lambda must release it when done. This function combines
notify + release into one atomic step and nulls the variable to prevent
accidental reuse.
If the caller does not own a reference (e.g. the channel was passed as a plain
argument via invoke_in_context, with no lambda capture), use notify instead —
calling notify_and_release in that case would release a reference the caller never
added, leading to a premature free.
- Arguments:
job : JobStatus?& implicit
- release(status: JobStatus?&)¶
Decreases the reference count of a JobStatus or Channel; the object is deleted when the count reaches zero.
- Arguments:
status : JobStatus?& implicit
- stream_create(): Stream?¶
Warning
This is unsafe operation.
Creates a new empty Stream, a FIFO of opaque byte buffers owned by the runtime. Payload bytes are copied into the stream on push and produced as non-owning views on pop / gather. Either side can safely destroy the stream — memory belongs to the stream itself rather than to a particular context.
- stream_remove(stream: Stream?&)¶
Warning
This is unsafe operation.
Releases the caller’s reference to a Stream and sets the local pointer to null. When the last reference is released the stream is destroyed along with any buffered byte blobs it still owns.
- Arguments:
stream : Stream?& implicit
9.1.3. Queries¶
- get_total_hw_jobs(): int¶
Returns the total number of hardware threads allocated to the job system.
- get_total_hw_threads(): int¶
Returns the total number of hardware threads available on the system.
- is_job_que_shutting_down(): bool¶
Returns true if the job queue infrastructure is shutting down or has not been initialized.
9.1.4. Internal invocations¶
- new_debugger_thread(block: block<():void>)¶
Creates a new debugger tick thread for servicing debug connections.
- Arguments:
block : block<void> implicit
- new_job_invoke(lambda: lambda<():void>; function: function<():void>; lambdaSize: int)¶
Clones the current context, moves the attached lambda into it, and submits it to the job queue.
- Arguments:
lambda : lambda<void>
function : function<void>
lambdaSize : int
- new_thread_invoke(lambda: lambda<():void>; function: function<():void>; lambdaSize: int)¶
Clones the current context, moves the attached lambda into it, and runs it on a new dedicated thread.
- Arguments:
lambda : lambda<void>
function : function<void>
lambdaSize : int
9.1.5. Construction¶
9.1.5.1. with_channel¶
- with_channel(block: block<(Channel?):void>)¶
Creates a Channel scoped to the given block and automatically destroys it afterward.
- Arguments:
block : block<( Channel?):void> implicit
- with_channel(count: int; block: block<(Channel?):void>)
- with_job_que(block: block<():void>)¶
Ensures job queue infrastructure is initialized for the duration of the block.
- Arguments:
block : block<void> implicit
- with_job_status(total: int; block: block<(JobStatus?):void>)¶
Creates a JobStatus scoped to the given block and automatically destroys it afterward.
- Arguments:
total : int
block : block<( JobStatus?):void> implicit
- with_lock_box(block: block<(LockBox?):void>)¶
Creates a LockBox scoped to the given block and automatically destroys it afterward.
- Arguments:
block : block<( LockBox?):void> implicit
9.1.5.2. with_stream¶
- with_stream(block: block<(Stream?):void>)¶
Creates an unbounded Stream, invokes the given block with a stable pointer to it, and automatically releases the reference when the block returns. The simplest way to scope a stream to a producer/consumer pair.
- Arguments:
block : block<( Stream?):void> implicit
- with_stream(count: int; block: block<(Stream?):void>)
9.1.6. Atomic¶
- atomic32_create(): Atomic32?¶
Creates an Atomic32 — a thread-safe 32-bit integer for lock-free concurrent access.
- atomic32_remove(atomic: Atomic32?&)¶
Warning
This is unsafe operation.
Destroys an Atomic32 and releases its resources.
- Arguments:
atomic : Atomic32?& implicit
- atomic64_create(): Atomic64?¶
Creates an Atomic64 — a thread-safe 64-bit integer for lock-free concurrent access.
- atomic64_remove(atomic: Atomic64?&)¶
Warning
This is unsafe operation.
Destroys an Atomic64 and releases its resources.
- Arguments:
atomic : Atomic64?& implicit
9.1.6.1. dec¶
- dec(atomic: Atomic32?): int¶
Atomically decrements the integer value and returns the result.
- Arguments:
atomic : Atomic32? implicit
- dec(atomic: Atomic64?): int64
9.1.6.2. get¶
- get(atomic: Atomic32?): int¶
Returns the current value of the atomic integer.
- Arguments:
atomic : Atomic32? implicit
- get(atomic: Atomic64?): int64
9.1.6.3. inc¶
- inc(atomic: Atomic32?): int¶
Atomically increments the integer value and returns the result.
- Arguments:
atomic : Atomic32? implicit
- inc(atomic: Atomic64?): int64
9.1.6.4. set¶
- set(atomic: Atomic32?; value: int)¶
Sets the atomic integer to the specified value.
- Arguments:
atomic : Atomic32? implicit
value : int
- set(atomic: Atomic64?; value: int64)
- with_atomic32(block: block<(Atomic32?):void>)¶
Creates an Atomic32 scoped to the given block and automatically destroys it afterward.
- Arguments:
block : block<( Atomic32?):void> implicit
- with_atomic64(block: block<(Atomic64?):void>)¶
Creates an Atomic64 scoped to the given block and automatically destroys it afterward.
- Arguments:
block : block<( Atomic64?):void> implicit