msg_queue {interprocess}R Documentation

Send Text Messages Between Processes

Description

An interprocess message queue that ensures each message is delivered to only one reader, at which time the message is removed from the queue. Ideal for producer/consumer situations where the message defines work waiting to be processed. The message itself can be any scalar character, for example, a JSON string or path to an RDS file.

Usage

msg_queue(
  name = uid(),
  assert = NULL,
  max_count = 100,
  max_nchar = 128,
  cleanup = FALSE,
  file = NULL
)

## S3 method for class 'msg_queue'
with(data, expr, alt_expr = NULL, timeout_ms = Inf, ...)

Arguments

name

Unique ID. Alphanumeric, starting with a letter.

assert

Apply an additional constraint.

  • 'create' - Error if the message queue already exists.

  • 'exists' - Error if the message queue doesn't exist.

  • NULL - No constraint; create the message queue if it doesn't exist.

max_count

The maximum number of messages that can be stored in the queue at the same time. Attempting to send additional messages will cause send() to block or return FALSE. Ignored if the message queue already exists.

max_nchar

The maximum number of characters in each message. Attempting to send larger messages will throw an error. Ignored if the message queue already exists.

cleanup

Remove the message queue when the R session exits. If FALSE, the message queue will persist until ⁠$remove()⁠ is called or the operating system is restarted.

file

Use a hash of this file/directory path as the message queue name. The file itself will not be read or modified, and does not need to exist.

data

A msg_queue object.

expr

Expression to evaluate if a message is received. The message can be accessed by . in this context. See examples.

alt_expr

Expression to evaluate if timeout_ms is reached.

timeout_ms

Maximum time (in milliseconds) to block the process while waiting for the operation to succeed. Use 0 or Inf to return immediately or only when successful, respectively.

...

Not used.

Value

msg_queue() returns a msg_queue object with the following methods:

with() returns eval(expr) on success; eval(alt_expr) otherwise.

See Also

Other shared objects: mutex(), semaphore()

Examples


mq <- interprocess::msg_queue()
print(mq)

mq$send(paste('my favorite number is', floor(runif(1) * 100)))
mq$count()

mq$receive()
mq$receive(timeout_ms = 0)

mq$send('The Matrix has you...')
with(mq, paste('got message:', .), 'no messages', timeout_ms = 0)
with(mq, paste('got message:', .), 'no messages', timeout_ms = 0)

mq$remove()

[Package interprocess version 1.3.0 Index]