class SysVMQ

Constants

IPC_CREAT
IPC_EXCL
IPC_INFO
IPC_NOWAIT
IPC_RMID
IPC_SET
IPC_STAT

Public Class Methods

new(p1, p2, p3) click to toggle source
VALUE
sysvmq_initialize(VALUE self, VALUE key, VALUE buffer_size, VALUE flags)
{
  sysvmq_t* sysv;
  size_t msgbuf_size;

  // TODO: Also support string keys, so you can pass '0xDEADC0DE'
  Check_Type(key,   T_FIXNUM);
  Check_Type(flags, T_FIXNUM);
  Check_Type(buffer_size, T_FIXNUM);

  TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);

  // (key_t) is a 32-bit integer (int). It's defined as `int` (at least on OS X
  // and Linux). However, `FIX2INT()` (from Ruby) will complain if the key is
  // something in the range 2^31-2^32, because of the sign bit. We use UINT to
  // trick Ruby, so it won't complain.
  sysv->key = (key_t) FIX2UINT(key);

  while ((sysv->id = msgget(sysv->key, FIX2INT(flags))) < 0) {
    if (errno == EINTR) {
      rb_thread_wait_for(polling_interval); // TODO: Really necessary here?
      continue;
    }
    rb_sys_fail("Failed opening the message queue.");
  }

  // Allocate the msgbuf buffer once for the instance, to not allocate a buffer
  // for each message sent. This makes SysVMQ not thread-safe (requiring a
  // buffer for each thread), but is a reasonable trade-off for now for the
  // performance.
  sysv->buffer_size = (size_t) FIX2LONG(buffer_size + 1);
  msgbuf_size = sysv->buffer_size * sizeof(char) + sizeof(long);

  // Note that this is a zero-length array, so we size the struct to size of the
  // header (long, the mtype) and then the rest of the space for message buffer.
  sysv->msgbuf = (sysvmq_msgbuf_t*) xmalloc(msgbuf_size);

  return self;
}

Public Instance Methods

destroy() click to toggle source
static VALUE
sysvmq_destroy(VALUE self)
{
  VALUE argv[1];
  argv[0] = INT2FIX(IPC_RMID);
  return sysvmq_stats(1, argv, self);
}
receive(*args) click to toggle source
VALUE
sysvmq_receive(int argc, VALUE *argv, VALUE self)
{
  VALUE type  = INT2FIX(0);
  VALUE flags = INT2FIX(0);
  sysvmq_t* sysv;
  sysvmq_blocking_call_t blocking;

  if (argc > 2) {
    rb_raise(rb_eArgError, "Wrong number of arguments (0..2)");
  }

  if (argc >= 1) type  = argv[0];
  if (argc == 2) flags = argv[1];

  TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);

  Check_Type(type, T_FIXNUM);
  Check_Type(flags, T_FIXNUM);

  // Attach blocking call parameters to the struct passed to the blocking
  // function wrapper.
  blocking.flags  = FIX2INT(flags);
  blocking.type   = FIX2LONG(type);
  blocking.sysv   = sysv;
  // Initialize error so it's never a garbage value, if
  // `sysvmq_maybe_blocking_receive` was interrupted at a non-nice time.
  blocking.error  = UNINITIALIZED_ERROR;
  blocking.length = UNINITIALIZED_ERROR;

  if ((blocking.flags & IPC_NOWAIT) == IPC_NOWAIT) {
    while(sysvmq_maybe_blocking_receive(&blocking) == NULL && blocking.error < 0) {
      if (errno == EINTR) {
        continue;
      }

      rb_sys_fail("Failed recieving message from queue");
    }
  } else {
    // msgrcv(2) can block sending a message, if IPC_NOWAIT is not passed.
    // We unlock the GVL waiting for the call so other threads (e.g. signal
    // handling) can continue to work. Sets `length` on `blocking` with the size
    // of the message returned.
    while (WITHOUT_GVL(sysvmq_maybe_blocking_receive, &blocking, RUBY_UBF_IO, NULL) == NULL
            && blocking.error < 0) {
      if (errno == EINTR || blocking.error == UNINITIALIZED_ERROR) {
        continue;
      }

      rb_sys_fail("Failed receiving message from queue");
    }
  }

  // Guard it..
  assert(blocking.length != UNINITIALIZED_ERROR);

  // Reencode with default external encoding
  return rb_str_new(sysv->msgbuf->mtext, blocking.length);
}
send(*args) click to toggle source
VALUE
sysvmq_send(int argc, VALUE *argv, VALUE self)
{
  VALUE message;
  VALUE priority = INT2FIX(1);
  VALUE flags = INT2FIX(0);
  sysvmq_blocking_call_t blocking;
  sysvmq_t* sysv;

  if (argc > 3 || argc == 0) {
    rb_raise(rb_eArgError, "Wrong number of arguments (1..3)");
  }

  message  = argv[0];
  if (argc >= 2) priority = argv[1];
  if (argc == 3) flags    = argv[2];

  message = rb_funcall(message, rb_intern("to_s"), 0);

  TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);

  Check_Type(flags,    T_FIXNUM);
  Check_Type(priority, T_FIXNUM);
  // TODO: Call to_s on message if it responds to

  // Attach blocking call parameters to the struct passed to the blocking
  // function wrapper.
  blocking.flags = FIX2INT(flags);
  blocking.size  = RSTRING_LEN(message);
  blocking.sysv  = sysv;
  // See msgrcv(2) wrapper
  blocking.error  = UNINITIALIZED_ERROR;
  blocking.length = UNINITIALIZED_ERROR;

  // The buffer can be obtained from `sysvmq_maybe_blocking_send`, instead of
  // passing it, set it directly on the instance struct.
  sysv->msgbuf->mtype = FIX2INT(priority);

  if (blocking.size > sysv->buffer_size) {
    rb_raise(rb_eArgError, "Size of message is bigger than buffer size.");
  }

  // TODO: Can a string copy be avoided?
  memcpy(sysv->msgbuf->mtext, RSTRING_PTR(message), blocking.size);

  // Non-blocking call, skip the expensive GVL release/acquire
  if ((blocking.flags & IPC_NOWAIT) == IPC_NOWAIT) {
    while(sysvmq_maybe_blocking_send(&blocking) == NULL && blocking.error < 0) {
      if (errno == EINTR) {
        continue;
      }

      rb_sys_fail("Failed sending message to queue");
    }
  } else {
    // msgsnd(2) can block waiting for a message, if IPC_NOWAIT is not passed.
    // We unlock the GVL waiting for the call so other threads (e.g. signal
    // handling) can continue to work.
    while (WITHOUT_GVL(sysvmq_maybe_blocking_send, &blocking, RUBY_UBF_IO, NULL) == NULL
            && blocking.error < 0) {
      if (errno == EINTR || blocking.error == UNINITIALIZED_ERROR) {
        continue;
      }

      rb_sys_fail("Failed sending message to queue");
    }
  }

  return message;
}
stats(*args) click to toggle source
static VALUE
sysvmq_stats(int argc, VALUE *argv, VALUE self)
{
  struct msqid_ds info;
  VALUE info_hash;
  VALUE cmd;
  sysvmq_t* sysv;

  // Optional argument handling
  if (argc > 1) {
    rb_raise(rb_eArgError, "Wrong number of arguments (0..1)");
  }

  // Default to IPC_STAT
  cmd = argc == 1 ? argv[0] : INT2FIX(IPC_STAT);

  TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);

  // TODO: Does FIX2INT actually perform this check already?
  Check_Type(cmd, T_FIXNUM);

  while (msgctl(sysv->id, FIX2INT(cmd), &info) < 0) {
    if (errno == EINTR) {
      rb_thread_wait_for(polling_interval);
      continue;
    }
    rb_sys_fail("Failed executing msgctl(2) command.");
  }

  // Map values from struct to a hash
  // TODO: Add all the fields
  // TODO: They are probably not ints..
  info_hash = rb_hash_new();
  rb_hash_aset(info_hash, ID2SYM(rb_intern("count")),         INT2FIX(info.msg_qnum));
  rb_hash_aset(info_hash, ID2SYM(rb_intern("maximum_size")), INT2FIX(info.msg_qbytes));

  // TODO: Can probably make a better checker here for whether the struct
  // actually has the member.
  // TODO: BSD support?
#ifdef __linux__
  rb_hash_aset(info_hash, ID2SYM(rb_intern("size")), INT2FIX(info.__msg_cbytes));
#elif __APPLE__
  rb_hash_aset(info_hash, ID2SYM(rb_intern("size")), INT2FIX(info.msg_cbytes));
#endif

  return info_hash;
}