Returns new or historical messages from a stream for a consumer in a group. Blocks until a message is available otherwise.
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key…] ID [ID…]
The XREADGROUP command is a special version of the XREAD command with support for consumer groups. You will probably need to understand the XREAD command before this page makes sense.
Moreover, if you are new to streams, we recommend reading our introduction to Streams. Make sure you understand the concept of a consumer group from the introduction, so that how this command works is simpler to follow.
The difference between this command and the vanilla XREAD is that this one supports consumer groups.
Without consumer groups, just using XREAD, all the clients are served with all the entries arriving in a stream. With consumer groups and XREADGROUP, it is instead possible to create groups of clients that consume different parts of the messages arriving in a given stream. If, for instance, the stream gets the new entries A, B, and C and there are two consumers reading via a consumer group, one client will get, for instance, the messages A and C, and the other the message B, and so forth.
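The difference can be sketched with a toy in-memory model. This is not how the server is implemented: the function names are hypothetical, and the order in which consumers ask for entries (`request_order`) is an assumption made only to reproduce the A, B, C example above.

```python
from collections import defaultdict


def fan_out(entries, clients):
    """Plain XREAD model: every client is served every entry."""
    return {client: list(entries) for client in clients}


def group_delivery(entries, request_order):
    """Consumer-group model: each entry goes to exactly one consumer.

    `request_order` lists which consumer asks for the next entry.
    """
    delivered = defaultdict(list)
    for entry, consumer in zip(entries, request_order):
        delivered[consumer].append(entry)
    return dict(delivered)


everyone = fan_out(["A", "B", "C"], ["c1", "c2"])
shared = group_delivery(["A", "B", "C"], ["c1", "c2", "c1"])
```

Here `everyone` gives both clients all three entries, while `shared` gives A and C to `c1` and B to `c2`. In a real deployment the split depends on which consumer issues XREADGROUP first, not on a fixed rotation.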
Within a consumer group, a given consumer (that is, just a client consuming messages from the stream) has to identify itself with a unique consumer name, which is just a string.
One of the guarantees of consumer groups is that a given consumer can only see the history of messages that were delivered to it, so a message has just a single owner. However, there is a special feature called message claiming that allows other consumers to claim messages in case of a non-recoverable failure of some consumer. In order to implement such semantics, consumer groups require explicit acknowledgment of the messages successfully processed by the consumer, via the XACK command. This is needed because the stream tracks, for each consumer group, which consumer is processing which message.
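This is how to understand if you want to use a consumer group or not:
1. If you have a stream and multiple clients, and you want all the clients to get all the messages, you do not need a consumer group.
2. If you have a stream and multiple clients, and you want the messages partitioned across your clients, so that each client gets a subset of the messages arriving in the stream, you need a consumer group.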
From the point of view of the syntax, the commands are almost the same; however, XREADGROUP requires a special and mandatory option:
GROUP <group-name> <consumer-name>
The group name is just the name of a consumer group associated with the stream. The group is created using the XGROUP command. The consumer name is the string that is used by the client to identify itself inside the group. The consumer is automatically created inside the consumer group the first time it is seen. Different clients should select a different consumer name.
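As a minimal sketch of how the pieces of the syntax fit together, the helper below assembles the argument list for one call. The function name and its parameters are hypothetical and belong to no client library. The only rules it encodes come from the synopsis above: GROUP is mandatory, the other options are optional, and STREAMS is followed by all keys and then by one ID per key.

```python
def build_xreadgroup_args(group, consumer, streams, count=None, block_ms=None, noack=False):
    """Return the XREADGROUP command as a list of string arguments.

    `streams` maps each stream key to the ID to read from (">" or a concrete ID).
    """
    if not streams:
        raise ValueError("at least one stream key is required")
    # GROUP <group-name> <consumer-name> is the mandatory part.
    args = ["XREADGROUP", "GROUP", group, consumer]
    if count is not None:
        args += ["COUNT", str(count)]
    if block_ms is not None:
        args += ["BLOCK", str(block_ms)]
    if noack:
        args.append("NOACK")
    # All keys come first, followed by one ID per key in the same order.
    args.append("STREAMS")
    args += list(streams.keys())
    args += list(streams.values())
    return args


args = build_xreadgroup_args("mygroup", "alice", {"mystream": ">"}, count=10, block_ms=2000)
```

Joining `args` with spaces gives the same command line used in the pseudo-code on this page.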
When you read with XREADGROUP, the server will remember that a given message was delivered to you: the message will be stored inside the consumer group in what is called a Pending Entries List (PEL), that is, a list of message IDs delivered but not yet acknowledged.
The client will have to acknowledge the message processing using XACK in order for the pending entry to be removed from the PEL. The PEL can be inspected using the XPENDING command.
The NOACK option can be used to avoid adding the message to the PEL in cases where reliability is not a requirement and occasional message loss is acceptable. This is equivalent to acknowledging the message when it is read.
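The bookkeeping described above can be illustrated with a toy in-memory model. `ToyGroup` and its methods are hypothetical names and only mimic the behavior described on this page: a normal read records the message in the PEL, an acknowledgment removes it, and a NOACK read never records it.

```python
class ToyGroup:
    """In-memory model of one consumer group on one stream (illustration only)."""

    def __init__(self, entries):
        self.entries = list(entries)  # (id, fields) pairs not yet delivered
        self.pel = {}                 # message ID -> consumer that owns it

    def read_new(self, consumer, count=1, noack=False):
        """Deliver up to `count` new entries; track them unless noack is set."""
        batch, self.entries = self.entries[:count], self.entries[count:]
        if not noack:
            for message_id, _fields in batch:
                self.pel[message_id] = consumer
        return batch

    def ack(self, *message_ids):
        """Remove acknowledged IDs from the PEL; return how many were removed."""
        removed = 0
        for message_id in message_ids:
            if self.pel.pop(message_id, None) is not None:
                removed += 1
        return removed


group = ToyGroup([("1-0", {"f": "a"}), ("2-0", {"f": "b"}), ("3-0", {"f": "c"})])
group.read_new("alice", count=2)          # 1-0 and 2-0 are now pending for alice
group.ack("1-0")                          # 1-0 leaves the PEL
group.read_new("bob", count=1, noack=True)  # 3-0 is delivered but never tracked
```

After these calls only `2-0` remains pending. If `bob` crashed while processing `3-0`, nothing would record that the message was ever delivered, which is the trade-off NOACK makes.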
The ID to specify in the STREAMS option when using XREADGROUP can be one of the following two:
1. The special > ID, which means that the consumer wants to receive only messages that were never delivered to any other consumer. It just means: give me new messages.
2. Any other ID, such as 0: if the ID is not >, the command will just let the client access its pending entries, that is, messages delivered to it but not yet acknowledged. Note that in this case, both BLOCK and NOACK are ignored.
Like XREAD, the XREADGROUP command can be used in a blocking way. There are no differences in this regard.
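Two things happen when a message is delivered to a consumer:
1. If the message was never delivered to anyone, that is, it is a new message, it is added to the PEL (Pending Entries List).
2. If instead the message was already delivered to this consumer and it is just re-fetching the same message, the last delivery time is updated to the current time and the number of deliveries is incremented by one. You can access these message properties using the XPENDING command.
Normally you use the command like this in order to get new messages and process them. In pseudo-code: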
WHILE true
    entries = XREADGROUP GROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
    if entries == nil
        puts "Timeout... try again"
        CONTINUE
    end

    FOREACH entries AS stream_entries
        FOREACH stream_entries AS message
            process_message(message.id, message.fields)

            # ACK the message as processed
            XACK mystream $GroupName message.id
        END
    END
END
In this way the example consumer code will fetch only new messages, process them, and acknowledge them via XACK. However, the example code above is not complete, because it does not handle recovering after a crash. If we crash in the middle of processing messages, our messages will remain in the pending entries list, so we can access our history by initially giving XREADGROUP an ID of 0 and performing the same loop. Once the reply to an ID of 0 is an empty set of messages, we know that we have processed and acknowledged all the pending messages: we can start to use > as the ID, in order to get new messages and rejoin the consumers that are processing new things.
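The recovery procedure described above can be sketched in Python as follows. This assumes a client object exposing `xreadgroup(groupname, consumername, streams, count=None, block=None, noack=False)` and `xack(name, groupname, *ids)`, in the style of the redis-py and valkey-py clients, and a list-shaped reply of `[key, [(id, fields), ...]]` pairs. The function names, the `handle` callback, and the `rounds` bound are hypothetical.

```python
def drain_pending(client, stream, group, consumer, handle, batch=10):
    """Reprocess this consumer's pending entries by reading with ID 0.

    Returns the number of entries handled; stops when the history is empty.
    """
    handled = 0
    while True:
        # ID "0" asks for entries already delivered to us but not yet acknowledged.
        reply = client.xreadgroup(group, consumer, {stream: "0"}, count=batch)
        entries = [entry for _key, msgs in (reply or []) for entry in msgs]
        if not entries:
            return handled
        for message_id, fields in entries:
            handle(message_id, fields)
            client.xack(stream, group, message_id)  # removes the entry from the PEL
            handled += 1


def consume(client, stream, group, consumer, handle, rounds, block_ms=2000, batch=10):
    """Recover pending work first, then read new messages with the > ID."""
    drain_pending(client, stream, group, consumer, handle, batch)
    for _ in range(rounds):  # bounded here so the sketch terminates
        reply = client.xreadgroup(group, consumer, {stream: ">"}, count=batch, block=block_ms)
        for _key, msgs in reply or []:  # a timeout yields an empty reply
            for message_id, fields in msgs:
                handle(message_id, fields)
                client.xack(stream, group, message_id)
```

Because every handled entry is acknowledged before the next read, re-reading from ID 0 eventually returns an empty history and the loop moves on to new messages. A production consumer would also need to decide what to do when `handle` raises, which this sketch leaves to the caller.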
To see how the command actually replies, please check the XREAD command page.
Entries may be deleted from the stream due to trimming or explicit calls to XDEL at any time. By design, Valkey doesn’t prevent the deletion of entries that are present in the stream’s PELs. When this happens, the PELs retain the deleted entries’ IDs, but the actual entry payload is no longer available. Therefore, when reading such PEL entries, Valkey will return a null value in place of their respective data.
Example:
> XADD mystream 1 myfield mydata
"1-0"
> XGROUP CREATE mystream mygroup 0
OK
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1-0"
         2) 1) "myfield"
            2) "mydata"
> XDEL mystream 1-0
(integer) 1
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1-0"
         2) (nil)
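A consumer that re-reads its history may therefore want to separate entries whose payload is gone from those that can still be processed. The helper below is a minimal sketch with a hypothetical name. It assumes each pending entry arrives as an `(id, fields)` pair with `fields` set to `None` for a deleted entry. Client libraries may represent the null value differently, so check how yours decodes it.

```python
def split_deleted(pending_entries):
    """Partition pending entries into processable ones and IDs of deleted ones."""
    live, deleted_ids = [], []
    for message_id, fields in pending_entries:
        if fields is None:
            # The ID is still in the PEL, but the entry was trimmed or removed with XDEL.
            deleted_ids.append(message_id)
        else:
            live.append((message_id, fields))
    return live, deleted_ids


live, deleted_ids = split_deleted([("1-0", None), ("2-0", {"myfield": "mydata"})])
```

The IDs in `deleted_ids` can then be acknowledged with XACK so that they stop appearing in the consumer's history.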
Reading the Streams introduction is highly recommended in order to understand more about the overall behavior and semantics of streams.
One of the following (RESP2):
Array reply: an array where each element is an array composed of two elements containing the key name and the entries reported for that key. The entries reported are full stream entries, having IDs and the list of all the fields and values. Fields and values are guaranteed to be reported in the same order they were added by XADD.
Nil reply: if the BLOCK option is given and a timeout occurs, or if there is no stream that can be served.
One of the following (RESP3):
Map reply: a map of key-value elements where each element is composed of the key name and the entries reported for that key. The entries reported are full stream entries, having IDs and the list of all the fields and values. Fields and values are guaranteed to be reported in the same order they were added by XADD.
Null reply: if the BLOCK option is given and a timeout occurs, or if there is no stream that can be served.
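Since the two reply shapes carry the same information, application code can normalize them before processing. The function below is a sketch with a hypothetical name. It assumes the client hands back the array form as a list of `[key, entries]` pairs, the map form as a dict, and the nil or null reply as `None`.

```python
def normalize_reply(reply):
    """Return a dict mapping each stream key to its list of entries."""
    if reply is None:
        # Timeout with BLOCK, or no stream could be served.
        return {}
    if isinstance(reply, dict):
        # Map form: already keyed by stream name.
        return dict(reply)
    # Array form: a list of two-element [key, entries] items.
    return {key: entries for key, entries in reply}


array_form = [["mystream", [("1-0", {"myfield": "mydata"})]]]
map_form = {"mystream": [("1-0", {"myfield": "mydata"})]}
same = normalize_reply(array_form) == normalize_reply(map_form)
```

With this normalization `same` is true, and a timeout simply yields an empty dict that a processing loop can skip over.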
For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other hand, when XREADGROUP blocks, XADD pays O(N) time in order to serve the N clients blocked on the stream getting new data.
@blocking @slow @stream @write
XACK, XADD, XAUTOCLAIM, XCLAIM, XDEL, XGROUP, XGROUP CREATE, XGROUP CREATECONSUMER, XGROUP DELCONSUMER, XGROUP DESTROY, XGROUP HELP, XGROUP SETID, XINFO, XINFO CONSUMERS, XINFO GROUPS, XINFO HELP, XINFO STREAM, XLEN, XPENDING, XRANGE, XREAD, XREVRANGE, XSETID, XTRIM.