class MQueue

Constants

VERSION

Public Class Methods

new(queue_name, options = {}) click to toggle source
/*
 * MQueue.new(queue_name, options = {})
 *
 * Opens the POSIX message queue called queue_name and stores its descriptor
 * in the mqueue_t wrapped by self.
 *
 * Recognized option keys:
 *   :capacity    - value for mq_maxmsg  (defaults to 10)
 *   :max_msgsize - value for mq_msgsize (defaults to 4096)
 * The options hash is also handed to generate_flags() (defined elsewhere),
 * whose result is used as the open flags.
 *
 * Raises RuntimeError when the wrapped descriptor is not -1 on entry or when
 * mq_open fails, and TypeError when queue_name is not a String or a numeric
 * option cannot be converted to an integer.
 *
 * Returns self.
 */
VALUE
mqueue_initialize(int argc, VALUE* argv, VALUE self) {
  VALUE queue_name, options;
  mqueue_t* queue_ptr;
  const char* name;
  long capacity, max_msgsize;

  rb_scan_args(argc, argv, "1:", &queue_name, &options);
  TypedData_Get_Struct(self, mqueue_t, &mqueue_data_type, queue_ptr);

  /* NOTE(review): presumably the allocator seeds the descriptor with -1, so
   * any other value means initialize is being run a second time - confirm. */
  if (queue_ptr->queue_descriptor != (mqd_t)-1)
    rb_raise(rb_eRuntimeError, "Illegal initialization");

  if (TYPE(queue_name) != T_STRING)
    rb_raise(rb_eTypeError, "Invalid queue name, must be a string");
  if (TYPE(options) != T_HASH)
    options = rb_hash_new();

  /*
   * Every conversion that may raise happens before ruby_strdup() so that an
   * exception cannot leave a freshly duplicated name behind.
   *
   * NUM2LONG is used instead of FIX2INT: FIX2INT performs no type check and
   * would reinterpret the bits of a non-Fixnum VALUE (String, Float, Bignum)
   * as a number, whereas NUM2LONG raises for unsuitable values.
   */
  name = StringValueCStr(queue_name);
  capacity = NUM2LONG(rb_hash_lookup2(options,
        ID2SYM(rb_intern("capacity")),
        INT2NUM(10))
      );
  max_msgsize = NUM2LONG(rb_hash_lookup2(options,
        ID2SYM(rb_intern("max_msgsize")),
        INT2NUM(4096))
      );

  queue_ptr->queue_name = ruby_strdup(name);
  queue_ptr->attributes.mq_maxmsg = capacity;
  queue_ptr->attributes.mq_msgsize = max_msgsize;
  queue_ptr->attributes.mq_flags = generate_flags(options);

  /* The queue is opened with owner read/write permission only. */
  queue_ptr->queue_descriptor = mq_open(queue_ptr->queue_name,
        queue_ptr->attributes.mq_flags,
        S_IRUSR | S_IWUSR,
        &queue_ptr->attributes
      );

  if (queue_ptr->queue_descriptor == (mqd_t)-1)
    rb_raise(rb_eRuntimeError, "Unable to initialize");

  return self;
}

Public Instance Methods

capacity() click to toggle source
/*
 * Reports the mq_maxmsg attribute obtained from mq_getattr for this queue's
 * descriptor, or -1 when mq_getattr fails.
 */
VALUE
mqueue_capacity(VALUE self) {
  struct mq_attr attr;
  mqueue_t* queue;

  TypedData_Get_Struct(self, mqueue_t, &mqueue_data_type, queue);

  if (mq_getattr(queue->queue_descriptor, &attr) != -1)
    return INT2NUM(attr.mq_maxmsg);

  /* mq_getattr failed: signal it with -1 rather than raising. */
  return INT2NUM(-1);
}
delete() click to toggle source
/*
 * Unlinks the queue by its stored name with mq_unlink.
 * Returns true on success and false when mq_unlink reports an error.
 */
VALUE
mqueue_delete(VALUE self) {
  mqueue_t* queue;

  TypedData_Get_Struct(self, mqueue_t, &mqueue_data_type, queue);

  return mq_unlink(queue->queue_name) == -1 ? Qfalse : Qtrue;
}
max_msgsize() click to toggle source
/*
 * Reports the mq_msgsize attribute obtained from mq_getattr for this queue's
 * descriptor, or -1 when mq_getattr fails.
 */
VALUE
mqueue_max_msgsize(VALUE self) {
  struct mq_attr attr;
  mqueue_t* queue;
  int status;

  TypedData_Get_Struct(self, mqueue_t, &mqueue_data_type, queue);

  status = mq_getattr(queue->queue_descriptor, &attr);
  if (status == -1) {
    return INT2NUM(-1);
  }

  return INT2NUM(attr.mq_msgsize);
}
receive() click to toggle source
/*
 * Receives one message from the queue with mq_receive.
 *
 * Returns the message as a String holding exactly the received bytes, or
 * false when the queue attributes cannot be read or mq_receive fails.
 */
VALUE
mqueue_receive(VALUE self) {
  mqueue_t* queue_ptr;
  struct mq_attr attributes;
  VALUE message;
  ssize_t len;

  TypedData_Get_Struct(self, mqueue_t, &mqueue_data_type, queue_ptr);

  /*
   * Size the buffer from the queue's current attributes instead of the
   * values cached at construction: mq_receive fails with EMSGSIZE when the
   * buffer is smaller than the queue's real mq_msgsize, and the cached value
   * is only what was requested from mq_open.
   */
  if (mq_getattr(queue_ptr->queue_descriptor, &attributes) == -1)
    return Qfalse;

  /*
   * Receive straight into a Ruby-managed string rather than a stack VLA, so
   * a large mq_msgsize cannot overflow the stack and the buffer is reclaimed
   * by the GC on every exit path.
   */
  message = rb_str_new(NULL, attributes.mq_msgsize);

  len = mq_receive(queue_ptr->queue_descriptor, RSTRING_PTR(message),
                   (size_t)attributes.mq_msgsize, NULL);
  if (len == -1)
    return Qfalse;

  /* Trim the string to the number of bytes actually received. */
  rb_str_set_len(message, len);
  return message;
}
send(message) click to toggle source
/*
 * Sends message to the queue with mq_send at priority 0.
 *
 * message must be a String (or respond to to_str); anything else raises
 * TypeError. Returns true on success and false when mq_send fails.
 */
VALUE
mqueue_send(VALUE self, VALUE message) {
  mqueue_t* queue_ptr;
  const char* msg_ptr;
  size_t msg_len;

  TypedData_Get_Struct(self, mqueue_t, &mqueue_data_type, queue_ptr);

  /*
   * RSTRING_PTR/RSTRING_LEN do not check their argument; applying them to a
   * non-String VALUE is undefined behaviour. StringValue converts via to_str
   * or raises TypeError first.
   */
  StringValue(message);
  msg_ptr = RSTRING_PTR(message);
  msg_len = RSTRING_LEN(message);

  if (mq_send(queue_ptr->queue_descriptor, msg_ptr, msg_len, 0) == -1)
    return Qfalse;
  return Qtrue;
}
size() click to toggle source
/*
 * Reports the mq_curmsgs attribute obtained from mq_getattr for this queue's
 * descriptor, or -1 when mq_getattr fails.
 */
VALUE
mqueue_size(VALUE self) {
  struct mq_attr attr;
  mqueue_t* queue;
  int failed;

  TypedData_Get_Struct(self, mqueue_t, &mqueue_data_type, queue);

  failed = (mq_getattr(queue->queue_descriptor, &attr) == -1);

  return failed ? INT2NUM(-1) : INT2NUM(attr.mq_curmsgs);
}
timedreceive(timeout = 10) click to toggle source
/*
 * Receives one message with mq_timedreceive, giving up once `timeout`
 * seconds (default 10) have been added to the current CLOCK_REALTIME time.
 *
 * Returns the message as a String holding exactly the received bytes, or
 * false when the attributes or clock cannot be read or mq_timedreceive
 * fails (including on timeout).
 */
VALUE
mqueue_timedreceive(int argc, VALUE* argv, VALUE self) {
  VALUE timeout, message;
  mqueue_t* queue_ptr;
  struct mq_attr attributes;
  struct timespec abs_timeout;
  ssize_t len;
  int timeout_secs;

  rb_scan_args(argc, argv, "01", &timeout);

  /* Convert up front so a bad timeout raises before any work is done. */
  timeout_secs = NIL_P(timeout) ? 10 : NUM2INT(timeout);

  TypedData_Get_Struct(self, mqueue_t, &mqueue_data_type, queue_ptr);

  /*
   * Use the queue's current mq_msgsize rather than the value cached at
   * construction; a buffer smaller than the real limit makes the receive
   * fail with EMSGSIZE.
   */
  if (mq_getattr(queue_ptr->queue_descriptor, &attributes) == -1)
    return Qfalse;

  /* Ruby-managed buffer instead of a stack VLA of user-controlled size. */
  message = rb_str_new(NULL, attributes.mq_msgsize);

  if (clock_gettime(CLOCK_REALTIME, &abs_timeout) == -1)
    return Qfalse;

  abs_timeout.tv_sec += timeout_secs;

  len = mq_timedreceive(queue_ptr->queue_descriptor, RSTRING_PTR(message),
                        (size_t)attributes.mq_msgsize, NULL, &abs_timeout);
  if (len == -1)
    return Qfalse;

  /*
   * The received data is not NUL-terminated and may itself contain NUL
   * bytes, so the result must be built from the returned length rather than
   * scanned as a C string.
   */
  rb_str_set_len(message, len);
  return message;
}
timedsend(message, timeout = 10) click to toggle source
/*
 * Sends message with mq_timedsend at priority 0, giving up once `timeout`
 * seconds (default 10) have been added to the current CLOCK_REALTIME time.
 *
 * message must be a String (or respond to to_str); anything else raises
 * TypeError. Returns true on success and false when the clock cannot be
 * read or mq_timedsend fails (including on timeout).
 */
VALUE
mqueue_timedsend(int argc, VALUE* argv, VALUE self) {
  VALUE message, timeout;
  mqueue_t* queue_ptr;
  const char* msg_ptr;
  size_t msg_len;
  struct timespec abs_timeout;

  rb_scan_args(argc, argv, "11", &message, &timeout);

  if (NIL_P(timeout))
    timeout = INT2NUM(10);

  TypedData_Get_Struct(self, mqueue_t, &mqueue_data_type, queue_ptr);

  /*
   * RSTRING_PTR/RSTRING_LEN do not check their argument; applying them to a
   * non-String VALUE is undefined behaviour. StringValue converts via to_str
   * or raises TypeError first.
   */
  StringValue(message);
  msg_ptr = RSTRING_PTR(message);
  msg_len = RSTRING_LEN(message);

  if (clock_gettime(CLOCK_REALTIME, &abs_timeout) == -1)
    return Qfalse;

  abs_timeout.tv_sec += NUM2INT(timeout);

  if (mq_timedsend(queue_ptr->queue_descriptor, msg_ptr, msg_len, 0, &abs_timeout) == -1)
    return Qfalse;
  return Qtrue;
}