| Title: | Run Interruptible Code Asynchronously |
|---|---|
| Description: | Takes an R expression and returns a job object with a $stop() method which can be called to terminate the background job. Also provides timeouts and other mechanisms for automatically terminating a background job. The result of the expression is available synchronously via $result or asynchronously with callbacks or through the 'promises' package framework. |
| Authors: | Daniel P. Smith [aut, cre] (ORCID: <https://orcid.org/0000-0002-2479-2044>), Alkek Center for Metagenomics and Microbiome Research [cph, fnd] |
| Maintainer: | Daniel P. Smith <[email protected]> |
| License: | MIT + file LICENSE |
| Version: | 1.7.0.9000 |
| Built: | 2026-06-03 09:42:28 UTC |
| Source: | https://github.com/cmmr/jobqueue |
The job object encapsulates an expression and its evaluation
parameters. It also provides a way to check for and retrieve the result.
expr: R expression that will be run by this job.
vars: Get or set - List of variables that will be placed into the expression's environment before evaluation.
reformat: Get or set - function (job) for defining <job>$result.
signal: Get or set - Conditions to signal.
cpus: Get or set - Number of CPUs to reserve for evaluating expr.
timeout: Get or set - Time limits to apply to this job.
proxy: Get or set - job to proxy in place of running expr.
state: Get or set - The job's state: 'created', 'submitted',
'queued', 'dispatched', 'starting', 'running', or 'done'.
Assigning to <job>$state will trigger callback hooks.
output: Get or set - job's raw output.
Assigning to <job>$output will change the job's state
to 'done'.
jobqueue: The jobqueue this job has been submitted to.
worker: The worker this job has been dispatched to.
result: Result of expr. Will block until job is finished.
hooks: Currently registered callback hooks as a named list of functions.
Set new hooks with <job>$on().
is_done: TRUE or FALSE depending on if the job's result is
ready.
uid: A short string, e.g. 'J16', that uniquely identifies this
job.
new()
Creates a job object defining how to run an expression on
a background worker process.
Typically you won't need to call job_class$new(). Instead, create a
jobqueue and use <jobqueue>$run() to generate
job objects.
job_class$new( expr, vars = NULL, timeout = NULL, hooks = NULL, reformat = NULL, signal = FALSE, cpus = 1L, ... )
expr: A call or R expression wrapped in curly braces to evaluate on a
worker. Will have access to any variables defined
by vars, as well as the worker's globals,
packages, and init configuration. See vignette('eval').
vars: A named list of variables to make available to expr during
evaluation. Alternatively, an object that can be coerced to a named
list with as.list(), e.g. named vector, data.frame, or environment.
Or a function (job) that returns such an object.
timeout: A named numeric vector indicating the maximum number of
seconds allowed for each state the job passes through,
or 'total' to apply a single timeout from 'submitted' to 'done'. Or a
function (job) that returns the same. Example:
timeout = c(total = 2.5, running = 1). See vignette('stops').
hooks: A named list of functions to run when the job
state changes, of the form
hooks = list(created = function (job) {...}). Or a
function (job) that returns the same. Names of
job hooks are typically 'created',
'submitted', 'queued', 'dispatched', 'starting', 'running',
'done', or '*' (duplicates okay). See vignette('hooks').
reformat: Set reformat = function (job) to define what
<job>$result should return. The default, reformat = NULL passes
<job>$output to <job>$result unchanged.
See vignette('results').
signal: Should calling <job>$result signal on condition objects?
When FALSE, <job>$result will return the object without
taking additional action. Setting to TRUE or a character vector of
condition classes, e.g. c('interrupt', 'error', 'warning'), will
cause the equivalent of stop(<condition>) to be called when those
conditions are produced. Alternatively, a function (job) that
returns TRUE or FALSE. See vignette('results').
cpus: How many CPU cores to reserve for this job. Or a
function (job) that returns the same. Used to limit the number of
jobs running simultaneously to respect
<jobqueue>$max_cpus. Does not prevent a job from
using more CPUs than reserved.
...: Arbitrary named values to add to the returned job
object.
A job object.
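The effect of the signal argument described above can be sketched as follows. This is a minimal illustration rather than an excerpt from the package: it assumes the jobqueue package is installed, creates the jobs through <jobqueue>$run() (the usual route mentioned above) instead of job_class$new(), and uses illustrative variable names such as quiet and loud.

```r
library(jobqueue)

jq <- jobqueue(workers = 1)

# signal = FALSE (the default): the error comes back as a condition object.
quiet     <- jq$run({ stop('boom') })
quiet_res <- quiet$result
is_error  <- inherits(quiet_res, 'error')

# signal = TRUE: reading $result re-signals the condition, so guard the access.
loud    <- jq$run({ stop('boom') }, signal = TRUE)
outcome <- tryCatch(loud$result, error = function (e) 'caught')

jq$stop()
```

With signal = FALSE the caller decides what to do with the returned condition; with signal = TRUE the usual tryCatch() handling applies at the point where $result is read.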
print()
Print method for a job.
job_class$print(...)
...: Arguments are not used currently.
This job, invisibly.
on()
Attach a callback function to execute when the job enters
state.
job_class$on(state, func)
state: The name of a job state. Typically one of:
'*' - Every time the state changes.
'.next' - Only one time, the next time the state changes.
'created' - After job_class$new() initialization.
'submitted' - After <job>$jobqueue is assigned.
'queued' - After stop_id and copy_id are resolved.
'dispatched' - After <job>$worker is assigned.
'starting' - Before evaluation begins.
'running' - After evaluation begins.
'done' - After <job>$output is assigned.
Custom states can also be specified.
func: A function that accepts a job object as input.
You can call <job>$stop() or edit <job>$ values and the changes
will be persisted (since jobs are reference class
objects). You can also edit/stop other queued jobs by
modifying the jobs in <job>$jobqueue$jobs. Return
value is ignored.
A function that when called removes this callback from the
job.
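A rough sketch of on() in use: the callback below records every state the job enters, and the function returned by on() detaches it afterwards. It assumes the jobqueue package is installed; the names seen and remove_cb are illustrative only, and workers = 1 is chosen just to keep the example light.

```r
library(jobqueue)

jq  <- jobqueue(workers = 1)
job <- job_class$new({ 6 * 7 })

# Record each state the job enters. '*' fires on every state change.
seen      <- character(0)
remove_cb <- job$on('*', function (job) seen <<- c(seen, job$state))

jq$submit(job)
answer <- job$result   # blocks until the job is finished

remove_cb()            # detach the callback again
print(seen)

jq$stop()
```

Attaching the callback before submitting the job means no state change can be missed.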
wait()
Blocks until the job enters the given state.
job_class$wait(state = "done", timeout = NULL)
state: The name of a job state. Typically one of:
'*' - Every time the state changes.
'.next' - Only one time, the next time the state changes.
'created' - After job_class$new() initialization.
'submitted' - After <job>$jobqueue is assigned.
'queued' - After stop_id and copy_id are resolved.
'dispatched' - After <job>$worker is assigned.
'starting' - Before evaluation begins.
'running' - After evaluation begins.
'done' - After <job>$output is assigned.
Custom states can also be specified.
timeout: Stop the job if it takes longer than this
number of seconds, or NULL.
This job, invisibly.
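As a minimal sketch of wait(), assuming the jobqueue package is installed, the following blocks on the default 'done' state and then reads the result. The variable names are illustrative.

```r
library(jobqueue)

jq  <- jobqueue(workers = 1)
job <- jq$run({ Sys.sleep(0.5); 'finished' })

job$wait()                 # default state = "done": block until the job finishes
done_flag <- job$is_done   # the result is ready once wait() has returned
value     <- job$result

jq$stop()
```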
stop()
Stop this job. If the job is running, its
worker will be restarted.
job_class$stop(reason = "job stopped by user", cls = NULL)
reason: A message to include in the 'interrupt' condition object that
will be returned as the job's result. Or a condition
object.
cls: Character vector of additional classes to prepend to
c('interrupt', 'condition').
This job, invisibly.
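The following sketch shows one way stop() might be used to cancel a long-running job and inspect what comes back. It assumes the jobqueue package is installed; the reason text and the 'cancelled' class are made-up values for illustration.

```r
library(jobqueue)

jq  <- jobqueue(workers = 1)
job <- jq$run({ Sys.sleep(10); 'never returned' })

# Cancel the job, tagging the interrupt with a custom class.
job$stop(reason = 'no longer needed', cls = 'cancelled')

res <- job$result   # an 'interrupt' condition object, not the string
print(class(res))

jq$stop()
```

A custom class makes it possible to tell this cancellation apart from other interrupts, such as timeouts, when handling results.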
Jobs go in. Results come out.
jobqueue( globals = NULL, packages = NULL, namespace = NULL, init = NULL, max_cpus = availableCores(), workers = ceiling(max_cpus * 1.2), timeout = NULL, hooks = NULL, reformat = NULL, signal = FALSE, cpus = 1L, stop_id = NULL, copy_id = NULL )
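| Argument | Description |
|---|---|
| globals | A named list of variables that all <job>$exprs will have access to. Alternatively, an object that can be coerced to a named list with as.list(), e.g. named vector, data.frame, or environment. |
| packages | Character vector of package names to load on workers. |
| namespace | The name of a package to attach to the worker's environment. |
| init | A call or R expression wrapped in curly braces to evaluate on each worker just once, immediately after start-up. Will have access to variables defined by globals and assets from packages and namespace. Returned value is ignored. |
| max_cpus | Total number of CPU cores that can be reserved by all running jobs (sum(<job>$cpus)). Does not enforce limits on actual CPU utilization. |
| workers | How many background worker processes to start. Set to more than max_cpus to enable standby workers to quickly swap out with workers that need to restart. |
| timeout | A named numeric vector indicating the maximum number of seconds allowed for each state the job passes through, or 'total' to apply a single timeout from 'submitted' to 'done'. Or a function (job) that returns the same. Example: timeout = c(total = 2.5, running = 1). See vignette('stops'). |
| hooks | A named list of functions to run when the job state changes, of the form hooks = list(created = function (job) {...}). Or a function (job) that returns the same. See vignette('hooks'). |
| reformat | Set reformat = function (job) to define what <job>$result should return. The default, reformat = NULL passes <job>$output to <job>$result unchanged. See vignette('results'). |
| signal | Should calling <job>$result signal on condition objects? When FALSE, <job>$result will return the object without taking additional action. Setting to TRUE or a character vector of condition classes, e.g. c('interrupt', 'error', 'warning'), will cause the equivalent of stop(<condition>) to be called when those conditions are produced. Alternatively, a function (job) that returns TRUE or FALSE. See vignette('results'). |
| cpus | The default number of CPU cores per job. |
| stop_id | If an existing job in the jobqueue has the same stop_id, that job will be stopped and return an 'interrupt' condition object as its result. See vignette('stops'). |
| copy_id | If an existing job in the jobqueue has the same copy_id, the newly submitted job will become a "proxy" for that earlier job, returning whatever result the earlier job returns. See vignette('stops'). |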
A jobqueue object.
    jq <- jobqueue(globals = list(N = 42), workers = 2)
    print(jq)
    job <- jq$run({ paste("N is", N) })
    job$result
    jq$stop()
Jobs go in. Results come out.
hooks: A named list of currently registered callback hooks.
jobs: Get or set - List of jobs currently managed by this
jobqueue.
state: The jobqueue's state: 'starting', 'idle',
'busy', 'stopped', or 'error'.
uid: A short string, e.g. 'Q1', that uniquely identifies this
jobqueue.
tmp: The jobqueue's temporary directory.
workers: The workers managed by this jobqueue.
cnd: The error that caused the jobqueue to stop.
new()
Creates a pool of background processes for handling $run() and
$submit() calls. These workers are initialized
according to the globals, packages, and init arguments.
jobqueue_class$new( globals = NULL, packages = NULL, namespace = NULL, init = NULL, max_cpus = availableCores(), workers = ceiling(max_cpus * 1.2), timeout = NULL, hooks = NULL, reformat = NULL, signal = FALSE, cpus = 1L, stop_id = NULL, copy_id = NULL )
globals: A named list of variables that all <job>$exprs will have
access to. Alternatively, an object that can be coerced to a named
list with as.list(), e.g. named vector, data.frame, or environment.
packages: Character vector of package names to load on
workers.
namespace: The name of a package to attach to the
worker's environment.
init: A call or R expression wrapped in curly braces to evaluate on
each worker just once, immediately after start-up.
Will have access to variables defined by globals and assets from
packages and namespace. Returned value is ignored.
max_cpus: Total number of CPU cores that can be reserved by all
running jobs (sum(<job>$cpus)). Does not enforce
limits on actual CPU utilization.
workers: How many background worker processes to
start. Set to more than max_cpus to enable standby
workers to quickly swap out with
workers that need to restart.
timeout, hooks, reformat, signal, cpus, stop_id, copy_id: Defaults for this jobqueue's $run() method.
Here only, stop_id and copy_id must be either a
function (job) or NULL. hooks can set
jobqueue, worker, and/or
job hooks - see the "Attaching" section in
vignette('hooks').
A jobqueue object.
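To illustrate how queue-wide defaults interact with per-job arguments, here is a small sketch. It assumes the jobqueue package is installed, and the list wrapper used for reformat is an arbitrary choice made for this example. The second job passes reformat = NULL explicitly, which is expected to take precedence because only NA arguments to $run() fall back to the queue's defaults.

```r
library(jobqueue)

# Queue-wide defaults: every job sees N and has its output wrapped in a list.
jq <- jobqueue(
  workers  = 1,
  globals  = list(N = 42),
  reformat = function (job) list(value = job$output) )

wrapped <- jq$run({ N + 1 })                    # uses the queue's reformat
plain   <- jq$run({ N + 1 }, reformat = NULL)   # explicit per-job override

wrapped_res <- wrapped$result
plain_res   <- plain$result

jq$stop()
```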
print()
Print method for a jobqueue.
jobqueue_class$print(...)
...: Arguments are not used currently.
run()
Creates a job object and submits it to the
jobqueue for running. Any NA arguments will be
replaced with their value from jobqueue_class$new().
jobqueue_class$run( expr, vars = list(), timeout = NA, hooks = NA, reformat = NA, signal = NA, cpus = NA, stop_id = NA, copy_id = NA, ... )
expr: A call or R expression wrapped in curly braces to evaluate on a
worker. Will have access to any variables defined
by vars, as well as the jobqueue's globals,
packages, and init configuration. See vignette('eval').
vars: A named list of variables to make available to expr during
evaluation. Alternatively, an object that can be coerced to a named
list with as.list(), e.g. named vector, data.frame, or environment.
Or a function (job) that returns such an object.
timeout: A named numeric vector indicating the maximum number of
seconds allowed for each state the job passes through,
or 'total' to apply a single timeout from 'submitted' to 'done'. Can
also limit the 'starting' state for workers. A
function (job) can be used in place of a number.
Example: timeout = c(total = 2.5, running = 1).
See vignette('stops').
hooks: A named list of functions to run when the job
state changes, of the form
hooks = list(created = function (job) {...}). Or a
function (job) that returns the same. Names of
job hooks are typically 'created',
'submitted', 'queued', 'dispatched', 'starting', 'running',
'done', or '*' (duplicates okay). See vignette('hooks').
reformat: Set reformat = function (job) to define what
<job>$result should return. The default, reformat = NULL passes
<job>$output to <job>$result unchanged.
See vignette('results').
signal: Should calling <job>$result signal on condition objects?
When FALSE, <job>$result will return the object without
taking additional action. Setting to TRUE or a character vector of
condition classes, e.g. c('interrupt', 'error', 'warning'), will
cause the equivalent of stop(<condition>) to be called when those
conditions are produced. Alternatively, a function (job) that
returns TRUE or FALSE. See vignette('results').
cpus: How many CPU cores to reserve for this job. Or a
function (job) that returns the same. Used to limit the number of
jobs running simultaneously to respect
<jobqueue>$max_cpus. Does not prevent a job from
using more CPUs than reserved.
stop_id: If an existing job in the
jobqueue has the same stop_id, that
job will be stopped and return an 'interrupt'
condition object as its result. stop_id can also be a
function (job) that returns the stop_id to assign to a given
job. A stop_id of NULL disables this feature.
See vignette('stops').
copy_id: If an existing job in the
jobqueue has the same copy_id, the newly
submitted job will become a "proxy" for that earlier
job, returning whatever result the earlier
job returns. copy_id can also be a function (job)
that returns the copy_id to assign to a given job.
A copy_id of NULL disables this feature.
See vignette('stops').
...: Arbitrary named values to add to the returned job
object.
The new job object.
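The vars, timeout, stop_id, and copy_id arguments above can be sketched together as follows. This is an illustrative example under the assumption that the jobqueue package is installed; the id strings 'search' and 'report' and all variable names are invented for the example.

```r
library(jobqueue)

jq <- jobqueue(workers = 1)

# vars: per-job variables made available to expr.
sum_job <- jq$run({ x + y }, vars = list(x = 1, y = 2))
sum_res <- sum_job$result

# timeout: interrupt the job if 'submitted' to 'done' takes over 0.5 seconds.
slow_job <- jq$run({ Sys.sleep(10); 'too slow' }, timeout = c(total = 0.5))
slow_res <- slow_job$result

# stop_id: submitting a job with the same stop_id stops the earlier one.
old_job <- jq$run({ Sys.sleep(10); 'old' }, stop_id = 'search')
new_job <- jq$run({ 'new' },                stop_id = 'search')
old_res <- old_job$result
new_res <- new_job$result

# copy_id: the later job becomes a proxy for the earlier one.
first      <- jq$run({ Sys.sleep(1); 'A' }, copy_id = 'report')
second     <- jq$run({ 'B' },               copy_id = 'report')
first_res  <- first$result
second_res <- second$result

jq$stop()
```

Here stop_id suits "latest request wins" situations, while copy_id avoids running the same work twice when an identical request is already in flight.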
submit()
Adds a job to the jobqueue for
running on a background process.
jobqueue_class$submit(job)
job: A job object, as created by job_class$new().
This jobqueue, invisibly.
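A short sketch of building a job by hand and submitting it, assuming the jobqueue package is installed. It also shows the job's state before submission and after completion; the variable names are illustrative.

```r
library(jobqueue)

jq  <- jobqueue(workers = 1)
job <- job_class$new({ x^2 }, vars = list(x = 4))

state_before <- job$state   # a freshly constructed job is 'created'

jq$submit(job)
value       <- job$result   # blocks until the job is finished
state_after <- job$state

jq$stop()
```

Constructing the job separately leaves room to attach callbacks with <job>$on() before the job starts moving through its states.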
wait()
Blocks until the jobqueue enters the given state.
jobqueue_class$wait(state = "idle", timeout = NULL, signal = TRUE)
This jobqueue, invisibly.
on()
Attach a callback function to execute when the
jobqueue enters state.
jobqueue_class$on(state, func)
A function that when called removes this callback from the
jobqueue.
stop()
jobqueue_class$stop(reason = "jobqueue shut down by user", cls = NULL)
This jobqueue, invisibly.
Where job expressions are evaluated.
hooks: A named list of currently registered callback hooks.
job: The currently running job.
ps: The ps::ps_handle() object for the background process.
state: The worker's state: 'starting', 'idle', 'busy',
or 'stopped'.
uid: A short string, e.g. 'W11', that uniquely identifies this
worker.
tmp: The worker's temporary directory.
cnd: The error that caused the worker to stop.
new()
Creates a background R process for running jobs.
worker_class$new( globals = NULL, packages = NULL, namespace = NULL, init = NULL, hooks = NULL, wait = TRUE, timeout = Inf )
globals: A named list of variables that all <job>$exprs will have
access to. Alternatively, an object that can be coerced to a named
list with as.list(), e.g. named vector, data.frame, or environment.
packages: Character vector of package names to load on
workers.
namespace: The name of a package to attach to the
worker's environment.
init: A call or R expression wrapped in curly braces to evaluate on
each worker just once, immediately after start-up.
Will have access to variables defined by globals and assets from
packages and namespace. Returned value is ignored.
hooks: A named list of functions to run when the
worker state changes, of the form
hooks = list(idle = function (worker) {...}). Names of
worker hooks are typically 'starting', 'idle',
'busy', 'stopped', or '*' (duplicates okay).
See vignette('hooks').
wait: If TRUE, blocks until the worker is 'idle'.
If FALSE, the worker object is returned in the
'starting' state.
timeout: How long to wait for the worker to finish
starting (in seconds). If NA, defaults to the worker_class$new()
argument.
A worker object.
print()
Print method for a worker.
worker_class$print(...)
...: Arguments are not used currently.
The worker, invisibly.
start()
Restarts a stopped worker.
worker_class$start(wait = TRUE, timeout = NA)
The worker, invisibly.
stop()
Stops a worker by terminating the background process
and calling <job>$stop(reason) on any jobs currently
assigned to this worker.
worker_class$stop(reason = "worker stopped by user", cls = NULL)
The worker, invisibly.
restart()
Restarts a worker by calling <worker>$stop(reason)
and <worker>$start() in succession.
worker_class$restart( wait = TRUE, timeout = NA, reason = "restarting worker", cls = NULL )
wait: If TRUE, blocks until the worker is 'idle'.
If FALSE, the worker object is returned in the
'starting' state.
timeout: How long to wait for the worker to finish
starting (in seconds). If NA, defaults to the worker_class$new()
argument.
reason: Passed to <job>$stop() for any jobs
currently managed by this worker.
cls: Passed to <job>$stop() for any jobs currently
managed by this worker.
The worker, invisibly.
on()
Attach a callback function to execute when the worker
enters state.
worker_class$on(state, func)
A function that when called removes this callback from the
worker.
wait()
Blocks until the worker enters the given state.
worker_class$wait(state = "idle", timeout = Inf, signal = TRUE)
This worker, invisibly.
run()
Assigns a job to this worker for
evaluation on the background process.
worker_class$run(job)
job: A job object, as created by job_class$new().
This worker, invisibly.
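For completeness, a worker can in principle be driven directly, without a jobqueue. The sketch below is an assumption-laden illustration rather than a recommended pattern: it assumes the jobqueue package is installed and that a job created with job_class$new() can be handed straight to <worker>$run() once the worker is 'idle'.

```r
library(jobqueue)

# wait = TRUE (the default) blocks until the worker reaches 'idle'.
w   <- worker_class$new(globals = list(N = 42))
job <- job_class$new({ N * 2 })

w$run(job)             # evaluate the job on this worker's background process
value <- job$result    # blocks until the job is finished

w$stop()
```

In most cases a jobqueue is the more convenient entry point, since it handles dispatching, restarts, and CPU reservations on the caller's behalf.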