class POSIX_MQ

This class represents a POSIX message queue descriptor (mqd_t) object. It matches the C API for POSIX message queues closely.

See the README for examples on how to use it.

Constants

Attr

A Struct analogous to “struct mq_attr” in C. It may be passed as an argument to POSIX_MQ.new and POSIX_MQ#attr=. POSIX_MQ#attr returns an instance of this class.

See the mq_getattr(3) manpage for more information on the values.
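
As a rough sketch of the shape of this Struct, the stand-in below mirrors the four fields of struct mq_attr in the order that POSIX_MQ#attr passes them to the constructor. The name QueueAttr and its member names are assumptions made for illustration, so that the snippet runs without the extension loaded. With the real library you would use POSIX_MQ::Attr instead.

```ruby
# Stand-in for POSIX_MQ::Attr (assumed member names, modeled on struct mq_attr).
QueueAttr = Struct.new(:flags, :maxmsg, :msgsize, :curmsgs)

# Attributes one might request when creating a queue:
# blocking mode, at most 10 queued messages, each up to 8192 bytes.
creation_attr = QueueAttr.new(0, 10, 8192, 0)

# On an open queue only the flags field matters (see POSIX_MQ#attr=),
# so the remaining members can be left unset.
nonblocking_attr = QueueAttr.new(IO::NONBLOCK)
```

The exact limits accepted for maxmsg and msgsize are system-specific. The values above are placeholders.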

OPEN_MAX

The maximum number of open message queue descriptors supported by the system. This may be -1, in which case the limit is set dynamically at runtime. Consult your operating system documentation for system-specific information about this.

PRIO_MAX

The maximum priority that may be specified for POSIX_MQ#send. On POSIX-compliant systems, this is at least 31, but some systems allow higher limits. The minimum priority is always zero.

Public Class Methods

for_fd(socket) → mq

Adopts a socket as a POSIX message queue. Argument will be checked to ensure it is a POSIX message queue socket.

This is useful for adopting systemd sockets passed via the ListenMessageQueue directive. Returns a POSIX_MQ instance. This method is only available under Linux and FreeBSD and is not intended to be portable.
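
A minimal sketch of how the descriptor numbers might be discovered under systemd socket activation, assuming the usual LISTEN_PID and LISTEN_FDS environment variables and descriptors numbered from 3. The helper inherited_fds is hypothetical and not part of POSIX_MQ.

```ruby
# First descriptor passed by systemd socket activation (SD_LISTEN_FDS_START).
LISTEN_FDS_START = 3

# Returns the descriptor numbers systemd passed to this process, or an
# empty array when the environment does not target this process.
# `inherited_fds` is a hypothetical helper, not part of POSIX_MQ.
def inherited_fds(env = ENV, pid = Process.pid)
  return [] unless env["LISTEN_PID"].to_i == pid

  count = env["LISTEN_FDS"].to_i
  (LISTEN_FDS_START...(LISTEN_FDS_START + count)).to_a
end
```

Each number returned could then be handed to POSIX_MQ.for_fd, which raises a system call error if the descriptor is not a POSIX message queue.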

static VALUE for_fd(VALUE klass, VALUE socket)
{
        VALUE mqv = alloc(klass);
        struct posix_mq *mq = get(mqv, 0);
        mqd_t mqd;

        mq->name = Qnil;
        mqd = FD_TO_MQD(NUM2INT(socket));

        if (mq_getattr(mqd, &mq->attr) < 0)
                rb_sys_fail("provided file descriptor is not a POSIX MQ");

        mq->des = mqd;
        return mqv;
}
new(name [, flags [, mode [, mq_attr]]]) → mq

Opens a POSIX message queue given by name. name should start with a slash (“/”) for portable applications.

If a Symbol is given in place of integer flags, then:

  • :r is equivalent to IO::RDONLY

  • :w is equivalent to IO::CREAT|IO::WRONLY

  • :rw is equivalent to IO::CREAT|IO::RDWR

mode is an integer and only used when IO::CREAT is used. mq_attr is a POSIX_MQ::Attr and only used if IO::CREAT is used. If mq_attr is not specified when creating a queue, then the system defaults will be used.

See the manpage for mq_open(3) for more details on this function.
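
The flag handling described above can be sketched in plain Ruby as follows. The helper open_flags_for is hypothetical and exists only to illustrate the mapping. It is not how the extension is implemented, and the library performs this translation for you.

```ruby
# Maps the Symbol shorthands accepted by POSIX_MQ.new to integer open flags.
# `open_flags_for` is a hypothetical helper that mirrors the documented mapping.
def open_flags_for(flags)
  case flags
  when nil     then IO::RDONLY              # no flags given: read-only
  when :r      then IO::RDONLY
  when :w      then IO::CREAT | IO::WRONLY
  when :rw     then IO::CREAT | IO::RDWR
  when Integer then flags                   # already an integer flag mask
  else raise ArgumentError, "flags must be an Integer, :r, :w, or :rw"
  end
end
```

Note that :r never creates a queue, so opening a queue that does not exist yet with :r is expected to fail.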

static VALUE init(int argc, VALUE *argv, VALUE self)
{
        struct posix_mq *mq = get(self, 0);
        struct open_args x;
        VALUE name, oflags, mode, attr;

        rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);

        switch (TYPE(oflags)) {
        case T_NIL:
                x.oflags = O_RDONLY;
                break;
        case T_SYMBOL:
                if (oflags == sym_r)
                        x.oflags = O_RDONLY;
                else if (oflags == sym_w)
                        x.oflags = O_CREAT|O_WRONLY;
                else if (oflags == sym_rw)
                        x.oflags = O_CREAT|O_RDWR;
                else {
                        oflags = rb_inspect(oflags);
                        rb_raise(rb_eArgError,
                                 "symbol must be :r, :w, or :rw: %s",
                                 StringValuePtr(oflags));
                }
                break;
        case T_BIGNUM:
        case T_FIXNUM:
                x.oflags = NUM2INT(oflags);
                break;
        default:
                rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :rw");
        }

        x.name = StringValueCStr(name);
        x.argc = 2;

        switch (TYPE(mode)) {
        case T_FIXNUM:
                x.argc = 3;
                x.mode = NUM2UINT(mode);
                break;
        case T_NIL:
                if (x.oflags & O_CREAT) {
                        x.argc = 3;
                        x.mode = 0666;
                }
                break;
        default:
                rb_raise(rb_eArgError, "mode not an integer");
        }

        switch (TYPE(attr)) {
        case T_STRUCT:
                x.argc = 4;
                rstruct2mqattr(&x.attr, attr, 1);

                /* principle of least surprise */
                if (x.attr.mq_flags & O_NONBLOCK)
                        x.oflags |= O_NONBLOCK;
                break;
        case T_NIL:
                break;
        default:
                check_struct_type(attr);
        }

        (void)xopen(&x);
        mq->des = x.des;
        if (mq->des == MQD_INVALID) {
                switch (errno) {
                case ENOMEM:
                case EMFILE:
                case ENFILE:
                case ENOSPC:
                        rb_gc();
                        (void)xopen(&x);
                        mq->des = x.des;
                }
                if (mq->des == MQD_INVALID)
                        rb_sys_fail("mq_open");
        }

        mq->name = rb_str_new_frozen(name);
        if (x.oflags & O_NONBLOCK)
                mq->attr.mq_flags = O_NONBLOCK;

        return self;
}
open(*args) { |mq| ... }

Opens a POSIX message queue and yields it to the given block, closing the message queue when the block exits. If no block is given, the opened queue is returned instead. All arguments are passed to POSIX_MQ.new.
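
A small sketch of the block form, assuming the queue class is passed in as a parameter so the snippet is not tied to a loaded extension. The helper round_trip and the queue name used in the usage note are hypothetical.

```ruby
# Sends one message and reads it back, relying on the block form of
# `open` to close the descriptor even if an exception is raised.
# `queue_class` is a parameter for illustration; in real use it would
# be POSIX_MQ.
def round_trip(queue_class, name, message)
  queue_class.open(name, :rw) do |mq|
    mq << message
    mq.shift
  end
end
```

With the extension loaded, this could be called as round_trip(POSIX_MQ, "/example", "hello"). The value of the block becomes the return value of open.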

# File lib/posix_mq.rb, line 19
def self.open(*args)
  mq = new(*args)
  block_given? or return mq
  begin
    yield mq
  ensure
    mq.close unless mq.closed?
  end
end

Public Instance Methods

mq << string → mq

Inserts the given string into the message queue with a default priority of 0 and no timeout.

Returns itself so its calls may be chained. This use is recommended only for users who expect blocking behavior from the queue.
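
Because the queue is returned from each call, several messages can be appended in one expression. The helper enqueue_all below is a hypothetical illustration of that chaining, and it works with any object whose << returns the receiver.

```ruby
# Appends every message to the queue at the default priority (0).
# Relies on << returning the queue itself.
# `enqueue_all` is a hypothetical helper, not part of POSIX_MQ.
def enqueue_all(mq, messages)
  messages.reduce(mq) { |queue, message| queue << message }
end
```

Written by hand, the same chaining looks like mq << "first" << "second".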

static VALUE send0(VALUE self, VALUE buffer)
{
        struct posix_mq *mq = get(self, 1);
        struct rw_args x;

        setup_send_buffer(&x, buffer);
        x.des = mq->des;
        x.timeout = NULL;
        x.msg_prio = 0;

retry:
        WITHOUT_GVL(xsend, &x, RUBY_UBF_IO, 0);
        if (x.retval < 0) {
                if (errno == EINTR)
                        goto retry;
                rb_sys_fail("mq_send");
        }

        return self;
}
attr → mq_attr

Returns a POSIX_MQ::Attr struct containing the attributes of the message queue. See the mq_getattr(3) manpage for more details.

static VALUE getattr(VALUE self)
{
        struct posix_mq *mq = get(self, 1);

        if (mq_getattr(mq->des, &mq->attr) < 0)
                rb_sys_fail("mq_getattr");

        return rb_funcall(cAttr, id_new, 4,
                          LONG2NUM(mq->attr.mq_flags),
                          LONG2NUM(mq->attr.mq_maxmsg),
                          LONG2NUM(mq->attr.mq_msgsize),
                          LONG2NUM(mq->attr.mq_curmsgs));
}
attr = POSIX_MQ::Attr(IO::NONBLOCK) → mq_attr

Only the IO::NONBLOCK flag may be set or unset (zero) in this manner. See the mq_setattr(3) manpage for more details.

Consider using the POSIX_MQ#nonblock= method as it is easier and more natural to use.

static VALUE setattr(VALUE self, VALUE astruct)
{
        struct posix_mq *mq = get(self, 1);
        struct mq_attr newattr;

        rstruct2mqattr(&newattr, astruct, 0);

        if (mq_setattr(mq->des, &newattr, NULL) < 0)
                rb_sys_fail("mq_setattr");

        return astruct;
}
autoclose = boolean → boolean

Determines whether or not the mq will be closed automatically at finalization.

static VALUE setautoclose(VALUE self, VALUE autoclose)
{
        struct posix_mq *mq = get(self, 1);

        MQ_IO_SET_AUTOCLOSE(mq, autoclose);
        mq->autoclose = RTEST(autoclose) ? 1 : 0;
        return autoclose;
}
autoclose? → boolean

Returns whether or not the mq will be closed automatically at finalization.

static VALUE autoclose_p(VALUE self)
{
        struct posix_mq *mq = get(self, 1);

        return mq->autoclose ? Qtrue : Qfalse;
}
clone()

There's no point in ever cloning a POSIX_MQ object. All send/receive operations are atomic and only one native thread may be notified at a time.

Alias for: dup
close → nil

Closes the underlying message queue descriptor. If this descriptor had a registered notification request, the request will be removed so another descriptor or process may register a notification request. Message queue descriptors are automatically closed by garbage collection.

static VALUE _close(VALUE self)
{
        struct posix_mq *mq;

        if (IDEMPOTENT_IO_CLOSE) { /* defined in extconf.rb */
                mq = get(self, 0);
                if (!mq || (mq->des == MQD_INVALID))
                        return Qnil;
        } else {
                mq = get(self, 1);
        }

        if (! MQ_IO_CLOSE(mq)) {
                if (mq_close(mq->des) < 0)
                        rb_sys_fail("mq_close");
        }
        mq->des = MQD_INVALID;

        return Qnil;
}
closed? → true or false

Returns true if the message queue descriptor is closed and therefore unusable, otherwise false.

static VALUE closed(VALUE self)
{
        struct posix_mq *mq = get(self, 0);

        return mq->des == MQD_INVALID ? Qtrue : Qfalse;
}
dup()

There's no point in ever duping a POSIX_MQ object. All send/receive operations are atomic and only one native thread may be notified at a time.

# File lib/posix_mq.rb, line 73
def dup
  self
end
Also aliased as: clone
name → string

Returns the string name of the message queue associated with mq.

static VALUE name(VALUE self)
{
        struct posix_mq *mq = get(self, 0);

        if (NIL_P(mq->name)) {
                /*
                 * We could use readlink(2) on /proc/self/fd/N, but lots of
                 * care required.
                 * http://stackoverflow.com/questions/1188757/
                 */
                rb_raise(rb_eArgError, "can not get name of an adopted socket");
        }

        /* XXX compatibility: in retrospect, we could return a frozen string */
        return rb_str_dup(mq->name);
}
nonblock = boolean → boolean

Enables or disables non-blocking operation for the message queue descriptor. Errno::EAGAIN will be raised in situations where the queue would block. This is not compatible with timeout arguments to POSIX_MQ#send and POSIX_MQ#receive.
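
One way to use this setter is to switch a queue to non-blocking mode temporarily and restore the earlier mode afterwards. The helper with_nonblock below is a hypothetical sketch of that pattern and is not provided by the library.

```ruby
# Runs the block with the queue in non-blocking mode, then restores the
# previous mode, even if the block raises.
# `with_nonblock` is a hypothetical helper, not a library method.
def with_nonblock(mq)
  previous = mq.nonblock?
  mq.nonblock = true
  yield mq
ensure
  # `previous` is still nil if nonblock? itself raised; restore only a real value.
  mq.nonblock = previous unless previous.nil?
end
```

Inside the block, timeout arguments should be avoided, since non-blocking mode is not compatible with them.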

static VALUE setnonblock(VALUE self, VALUE nb)
{
        struct mq_attr newattr;
        struct posix_mq *mq = get(self, 1);

        if (nb == Qtrue)
                newattr.mq_flags = O_NONBLOCK;
        else if (nb == Qfalse)
                newattr.mq_flags = 0;
        else
                rb_raise(rb_eArgError, "must be true or false");

        if (mq_setattr(mq->des, &newattr, &mq->attr) < 0)
                rb_sys_fail("mq_setattr");

        mq->attr.mq_flags = newattr.mq_flags;

        return nb;
}
nonblock? → true or false

Returns the current non-blocking state of the message queue descriptor.

static VALUE nonblock_p(VALUE self)
{
        struct posix_mq *mq = get(self, 1);

        if (mq_getattr(mq->des, &mq->attr) < 0)
                rb_sys_fail("mq_getattr");
        return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
}
notify(&block)

Executes the given block upon reception of the next message in an empty queue. If the message queue is not empty, then this block will only be fired after the queue is emptied and repopulated with one message.

This block will only be executed upon the arrival of the first message and must be reset/reenabled for subsequent notifications. This block will execute in a separate Ruby Thread (and thus will safely have the GVL by default).

This method is only supported on platforms that implement SIGEV_THREAD functionality in mq_notify(3). So far we only know of glibc + Linux supporting this. Please let us know at <ruby-posix-mq@bogomips.org> if your platform can support this functionality and you are willing to test for us.

As far as we can tell, this method is neither very useful nor efficient. You would be better served using signals or just blocking. On Linux and FreeBSD, you can use POSIX_MQ with I/O multiplexing (IO.select, EventMachine), too.

# File lib/posix_mq.rb, line 49
def notify(&block)
  block.arity == 1 or
    raise ArgumentError, "arity of notify block must be 1"
  r, w = IO.pipe
  notify_exec(w, Thread.new(block) do |blk|
    begin
      begin
        r.read(1) or raise Errno::EINTR
      rescue Errno::EINTR, Errno::EAGAIN
        retry
      end
      blk.call(self)
    ensure
      notify_cleanup
      r.close rescue nil
      w.close rescue nil
    end
  end)
  nil
end
notify = signal → signal

Registers a notification request to deliver the given signal to the current process when a message is received. If signal is nil, it will unregister and disable the notification request to allow other processes to register a request. If signal is false, it will register a no-op notification request which will prevent other processes from registering a notification. If signal is an IO object, it will spawn a thread upon the arrival of the next message and write one “\0” byte to the file descriptor belonging to that IO object. Only one process may have a notification request for a queue at a time; Errno::EBUSY will be raised if a notification request is already registered for the queue.

Notifications are only fired once and processes must reregister for subsequent notifications.

For readers of the mq_notify(3) manpage, passing false is equivalent to SIGEV_NONE, and passing nil is equivalent to passing a NULL notification pointer to mq_notify(3).
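
The argument handling described above can be summarized with a small classifier. This is only an illustrative sketch: notify_request is a hypothetical helper, the returned labels are invented for this example, and resolving names through Signal.list is an approximation of whatever lookup the extension performs.

```ruby
# Classifies an argument to POSIX_MQ#notify= according to the rules above.
# Returns a two-element array: an invented label and an associated value.
# `notify_request` is a hypothetical helper, not part of POSIX_MQ.
def notify_request(arg)
  case arg
  when nil     then [:unregister, nil]   # NULL notification pointer
  when false   then [:none, nil]         # SIGEV_NONE: no-op registration
  when Integer then [:signal, arg]       # raw signal number
  when Symbol, String
    name = arg.to_s.sub(/\ASIG/, "")     # accept both "USR1" and "SIGUSR1"
    [:signal, Signal.list.fetch(name)]
  when IO      then [:write_byte, arg]   # one byte written on arrival
  else raise ArgumentError, "must be a signal, an IO, false, or nil"
  end
end
```

Because notifications fire only once, whatever handles the signal would need to assign to notify again to keep receiving them.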

static VALUE setnotify(VALUE self, VALUE arg)
{
        struct posix_mq *mq = get(self, 1);
        struct sigevent not;
        struct sigevent * notification = &not;
        VALUE rv = arg;

        notify_cleanup(self);
        not.sigev_notify = SIGEV_SIGNAL;

        switch (TYPE(arg)) {
        case T_FALSE:
                not.sigev_notify = SIGEV_NONE;
                break;
        case T_NIL:
                notification = NULL;
                break;
        case T_FIXNUM:
                not.sigev_signo = NUM2INT(arg);
                break;
        case T_SYMBOL:
        case T_STRING:
                not.sigev_signo = lookup_sig(arg);
                rv = INT2NUM(not.sigev_signo);
                break;
        default:
                rb_raise(rb_eArgError, "must be a signal or nil");
        }

        my_mq_notify(mq->des, notification);

        return rv;
}
receive([buffer, [timeout]]) → [ message, priority ]

Takes the highest priority message off the queue and returns an array containing the message as a String and the Integer priority of the message.

If the optional buffer is present, then it must be a String which will receive the data.

If the optional timeout is present, then it may be a Float or Integer specifying the timeout in seconds. Errno::ETIMEDOUT will be raised if the timeout has elapsed and there are no messages in the queue.

On some older systems, the timeout argument is not currently supported and may raise NotImplementedError if timeout is used.
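
To illustrate the ordering, here is a toy in-memory model of a POSIX message queue: the highest priority is delivered first, and messages of equal priority are delivered oldest first. ToyQueue is a hypothetical stand-in written for this example. Unlike the real receive, it returns nil on an empty queue rather than blocking.

```ruby
# Toy in-memory model of POSIX message queue ordering; not the real extension.
class ToyQueue
  def initialize
    @messages = []
    @sequence = 0
  end

  # Enqueues a message with an unsigned integer priority (default 0).
  def send(message, priority = 0)
    @messages << [priority, @sequence += 1, message]
    true
  end

  # Removes and returns [message, priority] for the oldest message of the
  # highest priority, or nil when the toy queue is empty.
  def receive
    entry = @messages.min_by { |priority, sequence, _| [-priority, sequence] }
    return nil unless entry

    @messages.delete(entry)
    [entry[2], entry[0]]
  end
end
```

Sending "low" at priority 1 followed by two messages at priority 5 yields both priority 5 messages in arrival order before "low".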

static VALUE receive(int argc, VALUE *argv, VALUE self)
{
        return _receive(PMQ_WANTARRAY, argc, argv, self);
}
send(string [,priority[, timeout]]) → true

Inserts the given string into the message queue with an optional, unsigned integer priority. If the optional timeout is specified, then Errno::ETIMEDOUT will be raised if the operation cannot complete before timeout seconds have elapsed. Without timeout, this method may block until the queue is writable.

On some older systems, the timeout argument is not currently supported and may raise NotImplementedError if timeout is used.

static VALUE my_send(int argc, VALUE *argv, VALUE self)
{
        return _send(0, argc, argv, self);
}
shift([buffer, [timeout]]) → message

Takes the highest priority message off the queue and returns the message as a String.

If the optional buffer is present, then it must be a String which will receive the data.

If the optional timeout is present, then it may be a Float or Integer specifying the timeout in seconds. Errno::ETIMEDOUT will be raised if the timeout has elapsed and there are no messages in the queue.

On some older systems, the timeout argument is not currently supported and may raise NotImplementedError if timeout is used.
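
A sketch of a timed read that falls back to a default value, assuming a queue in blocking mode. The helper shift_within is hypothetical. It passes an explicit String buffer because the timeout is the second positional argument.

```ruby
# Returns the next message, or `default` if none arrives within `seconds`.
# `shift_within` is a hypothetical helper; `mq` is expected to behave like
# a POSIX_MQ in blocking mode.
def shift_within(mq, seconds, default = nil)
  buffer = String.new            # String that receives the message data
  mq.shift(buffer, seconds)
rescue Errno::ETIMEDOUT
  default
end
```

On systems without timeout support, the NotImplementedError mentioned above would propagate to the caller, since this sketch does not rescue it.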

static VALUE shift(int argc, VALUE *argv, VALUE self)
{
        return _receive(0, argc, argv, self);
}
to_io → IO

Returns an IO.select-able IO object. This method is only available under Linux and FreeBSD and is not intended to be portable.
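
A minimal sketch of waiting on the queue with IO.select. The helper readable_within? is hypothetical and works with any object that responds to to_io.

```ruby
# Waits up to `seconds` for the queue's IO object to be reported readable.
# Returns true if it became readable, false if the wait timed out.
# `readable_within?` is a hypothetical helper, not part of POSIX_MQ.
def readable_within?(mq, seconds)
  ready, = IO.select([mq.to_io], nil, nil, seconds)
  !ready.nil?
end
```

Several queues and ordinary sockets can be watched together by passing all of their IO objects to one IO.select call.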

static VALUE to_io(VALUE self)
{
        struct posix_mq *mq = get(self, 1);
        int fd = MQD_TO_FD(mq->des);

        if (NIL_P(mq->io)) {
                mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));

                if (!mq->autoclose)
                        rb_funcall(mq->io, id_setautoclose, 1, Qfalse);
        }

        return mq->io;
}
tryreceive([buffer [, timeout]]) → [ message, priority ] or nil

Exactly like POSIX_MQ#receive, except it returns nil instead of raising Errno::EAGAIN when non-blocking operation is desired.

This does not guarantee non-blocking behavior; the message queue must be made non-blocking before calling this method.

static VALUE tryreceive(int argc, VALUE *argv, VALUE self)
{
        return _receive(PMQ_WANTARRAY|PMQ_TRY, argc, argv, self);
}
trysend(string [,priority[, timeout]]) → true or false

Exactly like POSIX_MQ#send, except it returns false instead of raising Errno::EAGAIN when non-blocking operation is desired and returns true on success instead of nil.

This does not guarantee non-blocking behavior; the message queue must be made non-blocking before calling this method.
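
As an illustration, the hypothetical helper offer_all below tries to enqueue a batch of messages and reports the ones that did not fit. It assumes the queue has already been made non-blocking.

```ruby
# Tries to enqueue each message without blocking and returns the messages
# that were rejected because the queue would have blocked.
# Assumes `mq` is already non-blocking; `offer_all` is a hypothetical helper.
def offer_all(mq, messages, priority = 0)
  messages.reject { |message| mq.trysend(message, priority) }
end
```

The caller can then decide whether to retry the rejected messages later or drop them.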

static VALUE trysend(int argc, VALUE *argv, VALUE self)
{
        return _send(PMQ_TRY, argc, argv, self);
}
tryshift([buffer [, timeout]]) → message or nil

Exactly like POSIX_MQ#shift, except it returns nil instead of raising Errno::EAGAIN when non-blocking operation is desired.

This does not guarantee non-blocking behavior; the message queue must be made non-blocking before calling this method.
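
A common use is draining whatever is currently queued without waiting for more. The helper drain below is a hypothetical sketch and assumes the queue was made non-blocking first.

```ruby
# Collects every message currently in the queue without blocking.
# Assumes `mq` was made non-blocking first; `drain` is a hypothetical helper.
def drain(mq)
  messages = []
  while (message = mq.tryshift)
    messages << message
  end
  messages
end
```

An empty String message is still truthy in Ruby, so the loop stops only when tryshift returns nil.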

static VALUE tryshift(int argc, VALUE *argv, VALUE self)
{
        return _receive(PMQ_TRY, argc, argv, self);
}