Title: | Run Interruptible Code Asynchronously |
---|---|
Description: | Takes an R expression and returns a Job object with a $stop() method which can be called to terminate the background job. Also provides timeouts and other mechanisms for automatically terminating a background job. The result of the expression is available synchronously via $result or asynchronously with callbacks or through the 'promises' package framework. |
Authors: | Daniel P. Smith [aut, cre] |
Maintainer: | Daniel P. Smith |
License: | MIT + file LICENSE |
Version: | 1.5.1 |
Built: | 2025-02-28 18:30:44 UTC |
Source: | https://github.com/cmmr/jobqueue |
The Job object encapsulates an expression and its evaluation parameters. It also provides a way to check for and retrieve the result.
expr
R expression that will be run by this Job.
vars
Get or set - List of variables that will be placed into the expression's environment before evaluation.
reformat
Get or set - function (job) for defining <Job>$result.
signal
Get or set - Conditions to signal.
cpus
Get or set - Number of CPUs to reserve for evaluating expr.
timeout
Get or set - Time limits to apply to this Job.
proxy
Get or set - Job to proxy in place of running expr.
state
Get or set - The Job's state: 'created', 'submitted', 'queued', 'dispatched', 'starting', 'running', or 'done'. Assigning to <Job>$state will trigger callback hooks.
output
Get or set - Job's raw output. Assigning to <Job>$output will change the Job's state to 'done'.
result
Result of expr. Will block until Job is finished.
hooks
Currently registered callback hooks as a named list of functions. Set new hooks with <Job>$on().
is_done
TRUE or FALSE depending on if the Job's result is ready.
uid
A short string, e.g. 'J16', that uniquely identifies this Job.
new()
Creates a Job object defining how to run an expression on a background worker process.
Typically you won't need to call Job$new(). Instead, create a Queue and use <Queue>$run() to generate Job objects.
Job$new(expr, vars = NULL, timeout = NULL, hooks = NULL, reformat = NULL, signal = FALSE, cpus = 1L, ...)
expr
A call or R expression wrapped in curly braces to evaluate on a worker. Will have access to any variables defined by vars, as well as the Worker's globals, packages, and init configuration. See vignette('eval').
vars
A named list of variables to make available to expr during evaluation. Alternatively, an object that can be coerced to a named list with as.list(), e.g. named vector, data.frame, or environment. Or a function (job) that returns such an object.
timeout
A named numeric vector indicating the maximum number of seconds allowed for each state the job passes through, or 'total' to apply a single timeout from 'submitted' to 'done'. Or a function (job) that returns the same. Example: timeout = c(total = 2.5, running = 1). See vignette('stops').
hooks
A named list of functions to run when the Job state changes, of the form hooks = list(created = function (job) {...}). Or a function (job) that returns the same. Names of job hooks are typically 'created', 'submitted', 'queued', 'dispatched', 'starting', 'running', 'done', or '*' (duplicates okay). See vignette('hooks').
reformat
Set reformat = function (job) to define what <Job>$result should return. The default, reformat = NULL, passes <Job>$output to <Job>$result unchanged. See vignette('results').
signal
Should calling <Job>$result signal on condition objects? When FALSE, <Job>$result will return the object without taking additional action. Setting to TRUE or a character vector of condition classes, e.g. c('interrupt', 'error', 'warning'), will cause the equivalent of stop(<condition>) to be called when those conditions are produced. Alternatively, a function (job) that returns TRUE or FALSE. See vignette('results').
cpus
How many CPU cores to reserve for this Job. Or a function (job) that returns the same. Used to limit the number of Jobs running simultaneously to respect <Queue>$max_cpus. Does not prevent a Job from using more CPUs than reserved.
...
Arbitrary named values to add to the returned Job object.
A Job object.
print()
Print method for a Job.
Job$print(...)
...
Arguments are not used currently.
This Job, invisibly.
on()
Attach a callback function to execute when the Job enters state.
Job$on(state, func)
state
The name of a Job state. Typically one of:
'*' - Every time the state changes.
'.next' - Only one time, the next time the state changes.
'created' - After Job$new() initialization.
'submitted' - After <Job>$queue is assigned.
'queued' - After stop_id and copy_id are resolved.
'dispatched' - After <Job>$worker is assigned.
'starting' - Before evaluation begins.
'running' - After evaluation begins.
'done' - After <Job>$output is assigned.
Custom states can also be specified.
func
A function that accepts a Job object as input. You can call <Job>$stop() or edit <Job>$ values and the changes will be persisted (since Jobs are reference class objects). You can also edit/stop other queued jobs by modifying the Jobs in <Job>$queue$jobs. Return value is ignored.
A function that when called removes this callback from the Job.
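As a rough illustration of the hook mechanism described above, the sketch below records each state a Job passes through. It assumes the jobqueue package is installed; the variable names (seen, remove_hook) and the choice of max_cpus = 2L are illustrative only.

```r
library(jobqueue)

q <- Queue$new(max_cpus = 2L)

seen <- character(0)            # states observed by the hook (illustrative)
job  <- Job$new({ 1 + 1 })

# '*' fires on every state change; $on() returns a function that
# removes this callback again.
remove_hook <- job$on('*', function (job) seen <<- c(seen, job$state))

q$submit(job)
res <- job$result               # blocks until the Job is finished

remove_hook()                   # detach the callback
q$stop()                        # shut down the background workers
```

Keeping the function returned by $on() is the documented way to detach a callback later; which states end up in seen depends on when the hook was attached.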
wait()
Blocks until the Job enters the given state.
Job$wait(state = "done", timeout = NULL)
state
The name of a Job state. Typically one of:
'*' - Every time the state changes.
'.next' - Only one time, the next time the state changes.
'created' - After Job$new() initialization.
'submitted' - After <Job>$queue is assigned.
'queued' - After stop_id and copy_id are resolved.
'dispatched' - After <Job>$worker is assigned.
'starting' - Before evaluation begins.
'running' - After evaluation begins.
'done' - After <Job>$output is assigned.
Custom states can also be specified.
timeout
Stop the Job if it takes longer than this number of seconds, or NULL.
This Job, invisibly.
stop()
Stop this Job. If the Job is running, its Worker will be restarted.
Job$stop(reason = "job stopped by user", cls = NULL)
reason
A message to include in the 'interrupt' condition object that will be returned as the Job's result. Or a condition object.
cls
Character vector of additional classes to prepend to c('interrupt', 'condition').
This Job, invisibly.
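A minimal sketch of stopping a running Job, assuming the jobqueue package is installed. The reason text and the extra class 'cancelled' are hypothetical values chosen for illustration.

```r
library(jobqueue)

q <- Queue$new(max_cpus = 2L)

# A deliberately slow expression that we do not want to wait for.
job <- q$run({ Sys.sleep(60); 'finished' })

# Terminate it; 'cancelled' is prepended to c('interrupt', 'condition').
job$stop(reason = 'no longer needed', cls = 'cancelled')

# With the default signal = FALSE, $result returns the condition object
# rather than raising it.
res <- job$result
class(res)

q$stop()
```

Because signal defaults to FALSE, the caller can inspect the returned condition with inherits() instead of wrapping $result in tryCatch().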
The Queue object manages Jobs and Workers: Jobs go in, results come out.
hooks
A named list of currently registered callback hooks.
jobs
Get or set - List of Jobs currently managed by this Queue.
state
The Queue's state: 'starting', 'idle', 'busy', 'stopped', or 'error'.
uid
Get or set - Unique identifier, e.g. 'Q1'.
tmp
The Queue's temporary directory.
workers
Get or set - List of Workers used for processing Jobs.
cnd
The error that caused the Queue to stop.
new()
Creates a pool of background processes for handling $run() and $submit() calls. These workers are initialized according to the globals, packages, and init arguments.
Queue$new(globals = NULL, packages = NULL, namespace = NULL, init = NULL, max_cpus = availableCores(), workers = ceiling(max_cpus * 1.2), timeout = NULL, hooks = NULL, reformat = NULL, signal = FALSE, cpus = 1L, stop_id = NULL, copy_id = NULL)
globals
A named list of variables that every <Job>$expr will have access to. Alternatively, an object that can be coerced to a named list with as.list(), e.g. named vector, data.frame, or environment.
packages
Character vector of package names to load on workers.
namespace
The name of a package to attach to the worker's environment.
init
A call or R expression wrapped in curly braces to evaluate on each worker just once, immediately after start-up. Will have access to variables defined by globals and assets from packages and namespace. Returned value is ignored.
max_cpus
Total number of CPU cores that can be reserved by all running Jobs (sum(<Job>$cpus)). Does not enforce limits on actual CPU utilization.
workers
How many background Worker processes to start. Set to more than max_cpus to enable standby Workers to quickly swap out with Workers that need to restart.
timeout, hooks, reformat, signal, cpus, stop_id, copy_id
Defaults for this Queue's $run() method. Here only, stop_id and copy_id must be either a function (job) or NULL. hooks can set queue, worker, and/or job hooks - see the "Attaching" section in vignette('hooks').
A Queue object.
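The following sketch shows one way a Queue might be configured with shared globals and a default timeout, assuming the jobqueue package is installed. The variable name scale and the specific limits are illustrative, not recommendations.

```r
library(jobqueue)

# Every worker gets `scale`; every $run() call inherits the 30 s total limit
# unless it supplies its own timeout.
q <- Queue$new(
  globals  = list(scale = 10),
  max_cpus = 2L,
  timeout  = c(total = 30)
)

job <- q$run({ scale * 2 })   # `scale` comes from the Queue's globals
res <- job$result

q$stop()
```

Setting defaults on the Queue keeps individual $run() calls short; any argument passed to $run() directly takes the place of the Queue-level default.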
print()
Print method for a Queue.
Queue$print(...)
...
Arguments are not used currently.
run()
Creates a Job object and submits it to the queue for running. Any NA arguments will be replaced with their value from Queue$new().
Queue$run(expr, vars = list(), timeout = NA, hooks = NA, reformat = NA, signal = NA, cpus = NA, stop_id = NA, copy_id = NA, ...)
expr
A call or R expression wrapped in curly braces to evaluate on a worker. Will have access to any variables defined by vars, as well as the Worker's globals, packages, and init configuration. See vignette('eval').
vars
A named list of variables to make available to expr during evaluation. Alternatively, an object that can be coerced to a named list with as.list(), e.g. named vector, data.frame, or environment. Or a function (job) that returns such an object.
timeout
A named numeric vector indicating the maximum number of seconds allowed for each state the job passes through, or 'total' to apply a single timeout from 'submitted' to 'done'. Can also limit the 'starting' state for Workers. A function (job) can be used in place of a number. Example: timeout = c(total = 2.5, running = 1). See vignette('stops').
hooks
A named list of functions to run when the Job state changes, of the form hooks = list(created = function (job) {...}). Or a function (job) that returns the same. Names of job hooks are typically 'created', 'submitted', 'queued', 'dispatched', 'starting', 'running', 'done', or '*' (duplicates okay). See vignette('hooks').
reformat
Set reformat = function (job) to define what <Job>$result should return. The default, reformat = NULL, passes <Job>$output to <Job>$result unchanged. See vignette('results').
signal
Should calling <Job>$result signal on condition objects? When FALSE, <Job>$result will return the object without taking additional action. Setting to TRUE or a character vector of condition classes, e.g. c('interrupt', 'error', 'warning'), will cause the equivalent of stop(<condition>) to be called when those conditions are produced. Alternatively, a function (job) that returns TRUE or FALSE. See vignette('results').
cpus
How many CPU cores to reserve for this Job. Or a function (job) that returns the same. Used to limit the number of Jobs running simultaneously to respect <Queue>$max_cpus. Does not prevent a Job from using more CPUs than reserved.
stop_id
If an existing Job in the Queue has the same stop_id, that Job will be stopped and return an 'interrupt' condition object as its result. stop_id can also be a function (job) that returns the stop_id to assign to a given Job. A stop_id of NULL disables this feature. See vignette('stops').
copy_id
If an existing Job in the Queue has the same copy_id, the newly submitted Job will become a "proxy" for that earlier Job, returning whatever result the earlier Job returns. copy_id can also be a function (job) that returns the copy_id to assign to a given Job. A copy_id of NULL disables this feature. See vignette('stops').
...
Arbitrary named values to add to the returned Job object.
The new Job object.
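To make the vars, timeout, and stop_id arguments above more concrete, here is a hedged sketch assuming the jobqueue package is installed. The expressions, the 0.5 second limit, and the stop_id value 'search' are all illustrative.

```r
library(jobqueue)

q <- Queue$new(max_cpus = 2L)

# vars: per-Job variables placed into the expression's environment.
job     <- q$run({ x^2 }, vars = list(x = 4))
squared <- job$result

# timeout: a single 'total' limit from 'submitted' to 'done'.
slow      <- q$run({ Sys.sleep(10); 'done' }, timeout = c(total = 0.5))
timed_out <- slow$result      # an 'interrupt' condition object

# stop_id: submitting a second Job with the same id stops the first one.
first      <- q$run({ Sys.sleep(10); 'A' }, stop_id = 'search')
second     <- q$run({ 'B' },                stop_id = 'search')
first_res  <- first$result    # an 'interrupt' condition object
second_res <- second$result

q$stop()
```

The stop_id pattern suits cases where only the newest request matters, for example recomputing a result each time a user changes an input.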
submit()
Adds a Job to the Queue for running on a background process.
Queue$submit(job)
job
A Job object, as created by Job$new().
This Queue, invisibly.
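A short sketch of the two-step alternative to $run(): build a Job with Job$new() and hand it to the Queue with $submit(). It assumes the jobqueue package is installed; the variable name greeting is illustrative.

```r
library(jobqueue)

q <- Queue$new(max_cpus = 2L)

# Define the Job first; nothing runs until it is submitted.
job <- Job$new(
  expr = { paste(greeting, 'world') },
  vars = list(greeting = 'Hello')
)

q$submit(job)        # queue the Job for a background worker
job$wait()           # block until the Job reaches 'done' (the default state)

finished <- job$is_done
res      <- job$result

q$stop()
```

Creating the Job separately can be handy when hooks or other fields need to be adjusted before the Job enters the Queue.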
wait()
Blocks until the Queue enters the given state.
Queue$wait(state = "idle", timeout = NULL, signal = TRUE)
state
The name of a Queue state. Typically one of:
'*' - Every time the state changes.
'.next' - Only one time, the next time the state changes.
'starting' - Workers are starting.
'idle' - All workers are ready/idle.
'busy' - At least one worker is busy.
'stopped' - Shutdown is complete.
timeout
Stop the Queue if it takes longer than this number of seconds, or NULL.
signal
Raise an error if encountered (will also be recorded in <Queue>$cnd).
This Queue, invisibly.
on()
Attach a callback function to execute when the Queue enters state.
Queue$on(state, func)
state
The name of a Queue state. Typically one of:
'*' - Every time the state changes.
'.next' - Only one time, the next time the state changes.
'starting' - Workers are starting.
'idle' - All workers are ready/idle.
'busy' - At least one worker is busy.
'stopped' - Shutdown is complete.
func
A function that accepts a Queue object as input. Return value is ignored.
A function that when called removes this callback from the Queue.
stop()
Stop all jobs and workers.
Queue$stop(reason = "job queue shut down by user", cls = NULL)
reason
Passed to <Job>$stop() for any Jobs currently managed by this Queue.
cls
Passed to <Job>$stop() for any Jobs currently managed by this Queue.
This Queue, invisibly.
The Worker object is where Job expressions are evaluated.
hooks
A named list of currently registered callback hooks.
job
The currently running Job.
ps
The ps::ps_handle() object for the background process.
state
The Worker's state: 'starting', 'idle', 'busy', or 'stopped'.
uid
A short string, e.g. 'W11', that uniquely identifies this Worker.
tmp
The Worker's temporary directory.
cnd
The error that caused the Worker to stop.
new()
Creates a background R process for running Jobs.
Worker$new(globals = NULL, packages = NULL, namespace = NULL, init = NULL, hooks = NULL, wait = TRUE, timeout = Inf)
globals
A named list of variables that every <Job>$expr will have access to. Alternatively, an object that can be coerced to a named list with as.list(), e.g. named vector, data.frame, or environment.
packages
Character vector of package names to load on workers.
namespace
The name of a package to attach to the worker's environment.
init
A call or R expression wrapped in curly braces to evaluate on each worker just once, immediately after start-up. Will have access to variables defined by globals and assets from packages and namespace. Returned value is ignored.
hooks
A named list of functions to run when the Worker state changes, of the form hooks = list(idle = function (worker) {...}). Names of worker hooks are typically 'starting', 'idle', 'busy', 'stopped', or '*' (duplicates okay). See vignette('hooks').
wait
If TRUE, blocks until the Worker is 'idle'. If FALSE, the Worker object is returned in the 'starting' state.
timeout
How long to wait for the worker to finish starting (in seconds). If NA, defaults to the Worker$new() argument.
A Worker object.
print()
Print method for a Worker.
Worker$print(...)
...
Arguments are not used currently.
The Worker, invisibly.
start()
Restarts a stopped Worker.
Worker$start(wait = TRUE, timeout = NA)
wait
If TRUE, blocks until the Worker is 'idle'. If FALSE, the Worker object is returned in the 'starting' state.
timeout
How long to wait for the worker to finish starting (in seconds). If NA, defaults to the Worker$new() argument.
The Worker, invisibly.
stop()
Stops a Worker by terminating the background process and calling <Job>$stop(reason) on any Jobs currently assigned to this Worker.
Worker$stop(reason = "worker stopped by user", cls = NULL)
reason
Passed to <Job>$stop() for any Jobs currently managed by this Worker.
cls
Passed to <Job>$stop() for any Jobs currently managed by this Worker.
The Worker, invisibly.
restart()
Restarts a Worker by calling <Worker>$stop(reason) and <Worker>$start() in succession.
Worker$restart(wait = TRUE, timeout = NA, reason = "restarting worker", cls = NULL)
wait
If TRUE, blocks until the Worker is 'idle'. If FALSE, the Worker object is returned in the 'starting' state.
timeout
How long to wait for the worker to finish starting (in seconds). If NA, defaults to the Worker$new() argument.
reason
Passed to <Job>$stop() for any Jobs currently managed by this Worker.
cls
Passed to <Job>$stop() for any Jobs currently managed by this Worker.
The Worker, invisibly.
on()
Attach a callback function to execute when the Worker enters state.
Worker$on(state, func)
state
The name of a Worker state. Typically one of:
'*' - Every time the state changes.
'.next' - Only one time, the next time the state changes.
'starting' - Waiting for the background process to load.
'idle' - Waiting for Jobs to be $run().
'busy' - While a Job is running.
'stopped' - After <Worker>$stop() is called.
func
A function that accepts a Worker object as input. You can call <Worker>$stop() and other <Worker>$ methods.
A function that when called removes this callback from the Worker.
wait()
Blocks until the Worker enters the given state.
Worker$wait(state = "idle", timeout = Inf, signal = TRUE)
state
The name of a Worker state. Typically one of:
'*' - Every time the state changes.
'.next' - Only one time, the next time the state changes.
'starting' - Waiting for the background process to load.
'idle' - Waiting for Jobs to be $run().
'busy' - While a Job is running.
'stopped' - After <Worker>$stop() is called.
timeout
Stop the Worker if it takes longer than this number of seconds.
signal
Raise an error if encountered (will also be recorded in <Worker>$cnd).
This Worker, invisibly.
run()
Assigns a Job to this Worker for evaluation on the background process.
Worker$run(job)
job
A Job object, as created by Job$new().
This Worker, invisibly.
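For cases where a single background process is enough, a Worker can presumably be driven directly without a Queue. The sketch below assumes the jobqueue package is installed and that a Job created with Job$new() can be passed straight to <Worker>$run() as described above; the global named base is illustrative.

```r
library(jobqueue)

# wait = TRUE (the default) blocks until the Worker is 'idle'.
w <- Worker$new(globals = list(base = 100))

job <- Job$new({ base + 1 })   # `base` comes from the Worker's globals
w$run(job)                     # assign the Job to this Worker

res <- job$result              # blocks until the Job is finished

w$stop()                       # terminate the background process
```

A Queue is usually the more convenient entry point, since it handles dispatching, CPU reservations, and restarting Workers after a Job is stopped.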