---
title: "Interrupts"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Interrupts}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

## Overview

Interrupts in webqueue fall into three categories:

1. **Timeout** - set before the request starts processing.
2. **Replacement** - a later request replaces an earlier one.
3. **Custom** - your code calls a Job's `$stop()` method.

### Requests vs Jobs

A request (`req`) is an environment with the data sent from a web browser to the web server. A Job (`job`) is a [`jobqueue::Job` R6 object](https://cmmr.github.io/jobqueue/reference/Job.html) containing a single `req` along with the parameters for processing it. Some useful elements of each are:

* `req$ARGS`
* `req$COOKIES`
* `req$PATH_INFO`
* `req$REMOTE_ADDR`
* `job$result`
* `job$timeout`
* `job$on()`
* `job$stop()`

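
As a minimal sketch of how the request elements are read, the snippet below builds a stand-in request by hand, since real `req` objects are created by webqueue. The `mock_req` object, its field values, and the `describe_request()` helper are all hypothetical and exist only for illustration.

```r
# Hypothetical stand-in for a request; real ones are created by webqueue.
mock_req <- new.env()
mock_req$ARGS        <- list(x = 'hi')
mock_req$PATH_INFO   <- '/compute'
mock_req$REMOTE_ADDR <- '127.0.0.1'

# Hypothetical helper: summarise a request using the elements listed above.
describe_request <- function (req) {
  sprintf('%s from %s with %d argument(s)',
          req$PATH_INFO, req$REMOTE_ADDR, length(req$ARGS))
}

describe_request(mock_req)
#> [1] "/compute from 127.0.0.1 with 1 argument(s)"
```

Inside a `function (job)`, the same elements are reached through `job$req`, for example `job$req$PATH_INFO`.
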
Interrupts are always defined at the Job level.

## Setup

```r
library(webqueue)
library(RCurl)

handler <- function (req) {
  args <- req$ARGS                          # POST/GET parameters
  if (!is.null(args$s)) Sys.sleep(args$s)   # sleep for `s` seconds
  if (!is.null(args$x)) return (args$x)     # return `x` if present
  return ('hello')                          # default to 'hello'
}
```

To interrupt a job from within `handler`, call `stop()` or `cli::cli_abort()` as usual, or `return (webqueue::response())` to send a more informative message back.

## Timeout

### Basic

Let's start by limiting all Jobs to 1 second.

```r
wq <- WebQueue$new(handler, timeout = 1)

getURL('http://localhost:8080')
#> [1] "hello"

getURL('http://localhost:8080?x=hi')
#> [1] "hi"

getURL('http://localhost:8080?s=3')
#> [1] "timeout: total runtime exceeded 1 second\n"

wq$stop()
```

### Per State

Setting `timeout = 1` is shorthand for `timeout = c(total = 1)`, which starts the clock as soon as the job is created. If jobs are likely to wait a long time before running, consider `timeout = c(running = 1)`, which starts the clock when the job actually begins running. Or use `timeout = c(running = 1, queued = 60)` to also limit how long a job can spend waiting in the queue.

See the [jobqueue::Job](https://cmmr.github.io/jobqueue/reference/Job.html#arguments-2) reference page for information on all standard Job states.

```r
wq <- WebQueue$new(handler, timeout = c(queued = 1))

getURL('http://localhost:8080?s=3') # spends 3 seconds in 'running' state
#> [1] "hello"

wq$stop()
```

### Per Request

Perhaps some pages on your website need a different time limit.

```r
timeout <- function (job) {
  # Allow 5 seconds for '/compute', 1 second for everything else.
  if (job$req$PATH_INFO == '/compute') 5 else 1
}

wq <- WebQueue$new(handler, timeout = timeout)

getURL('http://localhost:8080/compute?s=3')
#> [1] "hello"

getURL('http://localhost:8080?s=3')
#> [1] "timeout: total runtime exceeded 1 second\n"

wq$stop()
```

## Replacement

Use the `stop_id` argument to run only the most recent request with a given hash.
For instance, if you have a session id (`sid`) you can use that as the request hash.

```r
stop_id <- function (job) { job$req$ARGS$sid }

wq <- WebQueue$new(handler, stop_id = stop_id)

# Fetch three URLs at the same time.
jq <- jobqueue::Queue$new(workers = 3L)
r1 <- jq$run({ RCurl::getURL('http://localhost:8080?sid=1&s=1&x=first') })
r2 <- jq$run({ RCurl::getURL('http://localhost:8080?sid=1&s=1&x=second') })
r3 <- jq$run({ RCurl::getURL('http://localhost:8080?sid=1&s=1&x=third') })

r1$result
#> [1] "superseded: duplicated stop_id\n"

r2$result
#> [1] "superseded: duplicated stop_id\n"

r3$result
#> [1] "third"

jq$stop()
wq$stop()
```

### Cancel

You can also cancel an actual request by sending a dummy request with the same `stop_id` hash.

## Custom

Anywhere you provide a `function (job)`, you can examine the Job and request, and call `job$stop()` as needed.

### IP Filter

To reject requests from IP addresses outside an allowed range:

```r
ip_check <- function (job) {
  ip <- job$req$REMOTE_ADDR
  if (!startsWith(ip, '192.168.'))
    job$stop(paste('Unauthorized IP Address:', ip))
}

wq <- WebQueue$new(handler, hooks = list(created = ip_check))

getURL('http://localhost:8080')
#> [1] "interrupt: Unauthorized IP Address: 127.0.0.1\n"

wq$stop()
```

Note: in practice, the `onHeaders` argument of `WebQueue$new()` is a more efficient place to do this particular check.

### Queue Limit

Once the job is assigned to a Queue, you can access the list of all jobs that are currently queued or running. Here, we'll refuse to add more than 2 jobs to the queue at once.

```r
qlimit <- function (job) {
  if (length(job$queue$jobs) > 2)
    job$stop('Queue is too full.')
}

wq <- WebQueue$new(handler, hooks = list(queued = qlimit))

# Fetch three URLs at the same time.
jq <- jobqueue::Queue$new(workers = 3L)
r1 <- jq$run({ RCurl::getURL('http://localhost:8080?s=1') })
r2 <- jq$run({ RCurl::getURL('http://localhost:8080?s=1') })
r3 <- jq$run({ RCurl::getURL('http://localhost:8080?s=1') })

r1$result
#> [1] "hello"

r2$result
#> [1] "hello"

r3$result
#> [1] "interrupt: Queue is too full.\n"

jq$stop()
wq$stop()
```

### Stop Other Jobs

Suppose an admin needs to stop all jobs for a particular user.

```r
stop_user <- function (job) {
  stop_uid <- job$req$ARGS$stop
  if (!is.null(stop_uid)) {
    # Stop every queued or running job that belongs to the given user.
    for (j in job$queue$jobs)
      if (j$req$ARGS$u == stop_uid)
        j$stop('Stopped by admin.')
    job$output <- 'done'
  }
}

wq <- WebQueue$new(handler, hooks = list(queued = stop_user))

# Fetch three URLs at the same time.
jq <- jobqueue::Queue$new(workers = 3L)
r1 <- jq$run({ RCurl::getURL('http://localhost:8080?u=1&s=10') })
r2 <- jq$run({ RCurl::getURL('http://localhost:8080?u=1&s=10') })
r3 <- jq$run({ RCurl::getURL('http://localhost:8080?u=2&stop=1') })

r1$result
#> [1] "interrupt: Stopped by admin.\n"

r2$result
#> [1] "interrupt: Stopped by admin.\n"

r3$result
#> [1] "done"

jq$stop()
wq$stop()
```
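
The `stop_user` hook compares `j$req$ARGS$u == stop_uid`, which works when every request carries a `u` parameter but raises an error in `if ()` when `u` is missing, because the comparison then has length zero. The sketch below shows one possible variant that uses `identical()` instead, and tries it out without a server. Everything here is an assumption made for illustration: `mock_job()` and `stop_user_safe()` are hypothetical names, and the mock imitates only the few fields the hook touches with plain environments; it is not part of jobqueue or webqueue.

```r
# Hypothetical stand-in for a Job: only the fields the hook touches.
mock_job <- function (args, queue = new.env()) {
  job <- new.env()
  job$req     <- list(ARGS = args)
  job$queue   <- queue
  job$stopped <- NULL   # records the stop reason, if any
  job$stop    <- function (reason) assign('stopped', reason, envir = job)
  job
}

# Variant of the hook: identical() skips requests without a `u` parameter.
stop_user_safe <- function (job) {
  stop_uid <- job$req$ARGS$stop
  if (!is.null(stop_uid)) {
    for (j in job$queue$jobs)
      if (identical(j$req$ARGS$u, stop_uid))
        j$stop('Stopped by admin.')
    job$output <- 'done'
  }
}

# Three mock jobs sharing one mock queue.
queue <- new.env()
user1 <- mock_job(list(u = '1', s = '10'), queue)
anon  <- mock_job(list(s = '10'), queue)             # no `u` parameter
admin <- mock_job(list(u = '2', stop = '1'), queue)
queue$jobs <- list(user1, anon, admin)

stop_user_safe(admin)

user1$stopped
#> [1] "Stopped by admin."

anon$stopped
#> NULL

admin$output
#> [1] "done"
```

Whether the stricter comparison is appropriate depends on how your application identifies users; the mock only checks the hook's branching logic, not how a real Job reacts to `$stop()`.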