Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/phpredis/phpredis.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Grunder <michael.grunder@gmail.com>2018-09-29 21:59:01 +0300
committerGitHub <noreply@github.com>2018-09-29 21:59:01 +0300
commit2c9e0572361d5131f24fbc81a8f7baaafb671994 (patch)
tree6982b1e1f17b7cf2fc7e024652fad8212edadacd
parentbfd274712eeb372926d1106b3da3c4fc19c0a48a (diff)
Streams (#1413)
Streams API
-rw-r--r--.gitignore3
-rw-r--r--README.markdown322
-rw-r--r--cluster_library.c165
-rw-r--r--cluster_library.h67
-rw-r--r--common.h84
-rw-r--r--library.c402
-rw-r--r--library.h31
-rw-r--r--php_redis.h15
-rw-r--r--redis.c73
-rw-r--r--redis_array.c4
-rw-r--r--redis_cluster.c76
-rw-r--r--redis_cluster.h15
-rw-r--r--redis_commands.c643
-rw-r--r--redis_commands.h35
-rw-r--r--redis_session.c14
-rw-r--r--tests/RedisTest.php444
-rw-r--r--tests/TestSuite.php18
17 files changed, 2296 insertions, 115 deletions
diff --git a/.gitignore b/.gitignore
index 0462ff37..f672fcee 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,4 +14,5 @@ missing
autom4te.cache
mkinstalldirs
run-tests.php
-idea/* \ No newline at end of file
+idea/*
+.cquery
diff --git a/README.markdown b/README.markdown
index 56bdd754..1bce4964 100644
--- a/README.markdown
+++ b/README.markdown
@@ -3268,6 +3268,328 @@ array(1) {
}
~~~
+## Streams
+
+* [xAck](#xack) - Acknowledge one or more pending messages
+* [xAdd](#xadd) - Add a message to a stream
+* [xClaim](#xclaim) - Acquire ownership of a pending message
+* [xDel](#xdel) - Remove a message from a stream
+* [xGroup](#xgroup) - Manage consumer groups
+* [xInfo](#xinfo) - Get information about a stream
+* [xLen](#xlen) - Get the length of a stream
+* [xPending](#xpending) - Inspect pending messages in a stream
+* [xRange](#xrange) - Query a range of messages from a stream
+* [xRead](#xread) - Read message(s) from a stream
+* [xReadGroup](#xreadgroup) - Read stream messages with a group and consumer
+* [xRevRange](#xrevrange) - Query one or more messages from end to start
+* [xTrim](#xtrim) - Trim a stream's size
+
+### xAck
+-----
+
+##### *Prototype*
+~~~php
+$obj_redis->xAck($stream, $group, $arr_messages);
+~~~
+
+_**Description**_: Acknowledge one or more messages on behalf of a consumer group.
+
+##### *Return value*
+*long*: The number of messages Redis reports as acknowledged.
+
+##### *Example*
+~~~php
+$obj_redis->xAck('stream', 'group1', ['1530063064286-0', '1530063064286-1']);
+~~~
+
+### xAdd
+-----
+
+##### *Prototype*
+~~~php
+$obj_redis->xAdd($str_key, $str_id, $arr_message);
+~~~
+
+_**Description**_: Add a message to a stream
+
+##### *Return value*
+*String*: The added message ID
+
+##### *Example*
+~~~php
+$obj_redis->xAdd('mystream', "\*", ['field' => 'value']);
+~~~
+
+### xClaim
+-----
+
+##### *Prototype*
+~~~php
+$obj_redis->($str_key, $str_group, $str_consumer, $min_idle_time, [$arr_options]);
+~~~
+
+_**Description**_: Claim ownership of one or more pending messages.
+
+#### *Options Array*
+~~~php
+$options = [
+ /* Note: 'TIME', and 'IDLE' are mutually exclusive */
+ 'IDLE' => $value, /* Set the idle time to $value ms */,
+ 'TIME' => $value, /* Set the idle time to now - $value */
+ 'RETRYCOUNT' => $value, /* Update message retrycount to $value */
+ 'FORCE', /* Claim the message(s) even if they're not pending anywhere */
+ 'JUSTID', /* Instruct Redis to only return IDs */
+];
+~~~
+
+##### *Return value*
+*Array*: Either an array of message IDs along with corresponding data, or just an array of IDs (if the 'JUSTID' option was passed).
+
+##### *Example*
+~~~php
+$ids = ['1530113681011-0', '1530113681011-1', '1530113681011-2'];
+
+/* Without any options */
+$obj_redis->xClaim(
+ 'mystream', 'group1', 'myconsumer1', $ids
+);
+
+/* With options */
+$obj_redis->xClaim(
+ 'mystream', 'group1', 'myconsumer2', $ids,
+ [
+ 'IDLE' => time() * 1000,
+ 'RETRYCOUNT' => 5,
+ 'FORCE',
+ 'JUSTID'
+ ]
+);
+~~~
+
+### xDel
+-----
+
+##### *Prototype*
+~~~php
+$obj_redis->xDel($str_key, $arr_ids);
+~~~
+
+_**Description**_: Delete one or more messages from a stream.
+
+##### *Return value*
+*long*: The number of messages removed
+
+##### *Example*
+~~~php
+$obj_redis->xDel('mystream', ['1530115304877-0', '1530115305731-0']);
+~~~
+
+### xGroup
+-----
+
+##### *Prototype*
+~~~php
+$obj_redis->xGroup('HELP');
+$obj_redis->xGroup('SETID', $str_key, $str_group, $str_msg_id);
+$obj_redis->xGroup('DELGROUP', $str_key, $str_group);
+$obj_redis->xGroup('CREATE', $str_key, $str_group, $str_msg_id);
+$obj_redis->xGroup('DELCONSUMER', $str_key, $str_group, $str_consumer_name);
+~~~
+
+_**Description**_: This command is used in order to create, destroy, or manage consumer groups.
+
+##### *Return value*
+*Mixed*: This command returns different types depending on the specific XGROUP command executed.
+
+##### *Example*
+~~~php
+$obj_redis->xGroup('CREATE', 'mystream', 'mygroup');
+$obj_redis->xGroup('DELGROUP', 'mystream', 'mygroup');
+~~~
+
+### xInfo
+-----
+
+##### *Prototype*
+~~~php
+$obj_redis->xInfo('CONSUMERS', $str_stream, $str_group);
+$obj_redis->xInfo('GROUPS', $str_stream);
+$obj_redis->xInfo('STREAM', $str_stream);
+$obj_redis->xInfo('HELP');
+~~~
+
+_**Description**_: Get information about a stream or consumer groups.
+
+##### *Return value*
+*Mixed*: This command returns different types depending on which subcommand is used.
+
+##### *Example*
+~~~php
+$obj_redis->xInfo('STREAM', 'mystream');
+~~~
+
+### xLen
+-----
+
+##### *Prototype*
+~~~php
+$obj_redis->xLen($str_stream);
+~~~
+
+_**Description**_: Get the length of a given stream
+
+##### *Return value*
+*Long*: The number of messages in the stream.
+
+##### *Example*
+~~~php
+$obj_redis->xLen('mystream');
+~~~
+
+### xPending
+-----
+
+##### *Prototype*
+~~~php
+$obj_redis->xPending($str_stream, $str_group [, $i_start, $i_end, $i_count, $str_consumer]);
+~~~
+
+_**Description**_: Get information about pending messages in a given stream.
+
+##### *Return value*
+*Array*: Information about the pending messages, in various forms depending on the specific invocation of XPENDING.
+
+##### *Examples*
+~~~php
+$obj_redis->xPending('mystream', 'mygroup');
+$obj_redis->xPending('mystream', 'mygroup', 0, '+', 1, 'consumer-1');
+~~~
+
+### xRange
+-----
+
+##### *Prototype*
+~~~php
+$obj_redis->xRange($str_stream, $i_start, $i_end [, $i_count]);
+~~~
+
+_**Description**_: Get a range of messages from a given stream.
+
+##### *Return value*
+*Array*: The messages in the stream within the requested range.
+
+##### *Example*
+~~~php
+/* Get everything in this stream */
+$obj_redis->xRange('mystream', '-', '+');
+
+/* Only the first two messages */
+$obj_redis->xRange('mystream', '-', '+', 2);
+~~~
+
+### xRead
+-----
+
+##### *Prototype*
+~~~php
+$obj_redis->xRead($arr_streams [, $i_count, $i_block);
+~~~
+
+_**Description**_: Read data from one or more streams and only return IDs greater than sent in the command.
+
+##### *Return value*
+*Array*: The messages in the stream newer than the IDs passed to Redis (if any).
+
+##### *Example*
+~~~php
+$obj_redis->xRead(['stream1' => '1535222584555-0', 'stream2' => '1535222584555-0']);
+
+/* --- Possible output ---
+Array
+(
+ [stream1] => Array
+ (
+ [1535222584555-1] => Array
+ (
+ [key:1] => val:1
+ )
+
+ )
+
+ [stream2] => Array
+ (
+ [1535222584555-1] => Array
+ (
+ [key:1] => val:1
+ )
+
+ )
+
+)
+*/
+~~~
+
+### xReadGroup
+-----
+
+##### *Prototype*
+~~~php
+$obj_redis->xReadGroup($str_group, $str_consumer, $arr_streams [, $i_count, $i_block]);
+~~~
+
+_**Description**_: This method is similar to xRead except that it supports reading messages for a specific consumer group.
+
+##### *Return value*
+*Array*: The messages delivered to this consumer group (if any).
+
+##### *Examples*
+~~~php
+/* Consume messages for 'mygroup', 'consumer1' */
+$obj_redis->xReadGroup('mygroup', 'consumer1', ['s1' => 0, 's2' => 0]);
+
+/* Read a single message as 'consumer2' for up to a second until a message arrives. */
+$obj_redis->xReadGroup('mygroup', 'consumer2', ['s1' => 0, 's2' => 0], 1, 1000);
+~~~
+
+### xRevRange
+-----
+
+##### *Prototype*
+~~~php
+$obj_redis->xRevRange($str_stream, $i_end, $i_start [, $i_count]);
+~~~
+
+_**Description**_: This is identical to xRange except the results come back in reverse order. Also note that Redis reverses the order of "start" and "end".
+
+##### *Return value*
+*Array*: The messages in the range specified.
+
+##### *Example*
+~~~php
+$obj_redis->xRevRange('mystream', '+', '-');
+~~~
+
+### xTrim
+-----
+
+##### *Prototype*
+~~~php
+$obj_redis->xTrim($str_stream, $i_max_len [, $boo_approximate]);
+~~~
+
+_**Description**_: Trim the stream length to a given maximum. If the "approximate" flag is pasesed, Redis will use your size as a hint but only trim trees in whole nodes (this is more efficient).
+
+##### *Return value*
+*long*: The number of messages trimed from the stream.
+
+##### *Example*
+~~~php
+/* Trim to exactly 100 messages */
+$obj_redis->xTrim('mystream', 100);
+
+/* Let Redis approximate the trimming */
+$obj_redis->xTrim('mystream', 100, true);
+~~~
## Pub/sub
diff --git a/cluster_library.c b/cluster_library.c
index 8f157873..5f710c5e 100644
--- a/cluster_library.c
+++ b/cluster_library.c
@@ -103,7 +103,7 @@ void cluster_free_reply(clusterReply *reply, int free_data) {
for (i = 0; i < reply->elements && reply->element[i]; i++) {
cluster_free_reply(reply->element[i], free_data);
}
- efree(reply->element);
+ if (reply->element) efree(reply->element);
break;
default:
break;
@@ -113,7 +113,8 @@ void cluster_free_reply(clusterReply *reply, int free_data) {
static void
cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements,
- clusterReply **element, int *err TSRMLS_DC)
+ clusterReply **element, int status_strings,
+ int *err TSRMLS_DC)
{
int i;
size_t sz;
@@ -141,6 +142,7 @@ cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements,
return;
}
r->len = (long long)sz;
+ if (status_strings) r->str = estrndup(buf, r->len);
break;
case TYPE_INT:
r->integer = len;
@@ -155,11 +157,15 @@ cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements,
}
break;
case TYPE_MULTIBULK:
- r->element = ecalloc(r->len,sizeof(clusterReply*));
- r->elements = r->len;
- cluster_multibulk_resp_recursive(sock, r->elements, r->element,
- err TSRMLS_CC);
- if (*err) return;
+ if (r->len >= 0) {
+ r->elements = r->len;
+ if (r->len > 0) {
+ r->element = ecalloc(r->len,sizeof(clusterReply*));
+ cluster_multibulk_resp_recursive(sock, r->elements, r->element,
+ status_strings, err TSRMLS_CC);
+ }
+ if (*err) return;
+ }
break;
default:
*err = 1;
@@ -191,15 +197,17 @@ static RedisSock *cluster_slot_sock(redisCluster *c, unsigned short slot,
}
/* Read the response from a cluster */
-clusterReply *cluster_read_resp(redisCluster *c TSRMLS_DC) {
- return cluster_read_sock_resp(c->cmd_sock,c->reply_type,c->reply_len TSRMLS_CC);
+clusterReply *cluster_read_resp(redisCluster *c, int status_strings TSRMLS_DC) {
+ return cluster_read_sock_resp(c->cmd_sock, c->reply_type,
+ status_strings ? c->line_reply : NULL,
+ c->reply_len TSRMLS_CC);
}
/* Read any sort of response from the socket, having already issued the
* command and consumed the reply type and meta info (length) */
clusterReply*
cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type,
- size_t len TSRMLS_DC)
+ char *line_reply, size_t len TSRMLS_DC)
{
clusterReply *r;
@@ -214,6 +222,10 @@ cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type,
r->integer = len;
break;
case TYPE_LINE:
+ if (line_reply) {
+ r->str = estrndup(line_reply, len);
+ r->len = len;
+ }
case TYPE_ERR:
return r;
case TYPE_BULK:
@@ -229,7 +241,7 @@ cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type,
if (len != (size_t)-1) {
r->element = ecalloc(len, sizeof(clusterReply*)*len);
cluster_multibulk_resp_recursive(redis_sock, len, r->element,
- &err TSRMLS_CC);
+ line_reply != NULL, &err TSRMLS_CC);
}
break;
default:
@@ -618,7 +630,7 @@ clusterReply* cluster_get_slots(RedisSock *redis_sock TSRMLS_DC)
}
// Consume the rest of our response
- if ((r = cluster_read_sock_resp(redis_sock, type, len TSRMLS_CC)) == NULL ||
+ if ((r = cluster_read_sock_resp(redis_sock, type, NULL, len TSRMLS_CC)) == NULL ||
r->type != TYPE_MULTIBULK || r->elements < 1)
{
if (r) cluster_free_reply(r, 1);
@@ -1486,6 +1498,24 @@ PHP_REDIS_API void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,
efree(resp);
}
+PHP_REDIS_API void
+cluster_single_line_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx)
+{
+ char *p;
+
+ /* Cluster already has the reply so abort if this isn't a LINE response *or* if for
+ * some freaky reason we don't detect a null terminator */
+ if (c->reply_type != TYPE_LINE || !(p = memchr(c->line_reply,'\0',sizeof(c->line_reply)))) {
+ CLUSTER_RETURN_FALSE(c);
+ }
+
+ if (CLUSTER_IS_ATOMIC(c)) {
+ CLUSTER_RETURN_STRING(c, c->line_reply, p - c->line_reply);
+ } else {
+ add_next_index_stringl(&c->multi_resp, c->line_reply, p - c->line_reply);
+ }
+}
+
/* BULK response handler */
PHP_REDIS_API void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx)
@@ -1799,12 +1829,15 @@ static void cluster_mbulk_variant_resp(clusterReply *r, zval *z_ret)
add_next_index_long(z_ret, r->integer);
break;
case TYPE_LINE:
- add_next_index_bool(z_ret, 1);
+ if (r->str) {
+ add_next_index_stringl(z_ret, r->str, r->len);
+ } else {
+ add_next_index_bool(z_ret, 1);
+ }
break;
case TYPE_BULK:
if (r->len > -1) {
add_next_index_stringl(z_ret, r->str, r->len);
- efree(r->str);
} else {
add_next_index_null(z_ret);
}
@@ -1827,15 +1860,16 @@ static void cluster_mbulk_variant_resp(clusterReply *r, zval *z_ret)
/* Variant response handling, for things like EVAL and various other responses
* where we just map the replies from Redis type values to PHP ones directly. */
-PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
- void *ctx)
+static void
+cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+ int status_strings, void *ctx)
{
clusterReply *r;
zval zv, *z_arr = &zv;
int i;
// Make sure we can read it
- if ((r = cluster_read_resp(c TSRMLS_CC)) == NULL) {
+ if ((r = cluster_read_resp(c, status_strings TSRMLS_CC)) == NULL) {
CLUSTER_RETURN_FALSE(c);
}
@@ -1849,7 +1883,11 @@ PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisClust
RETVAL_FALSE;
break;
case TYPE_LINE:
- RETVAL_TRUE;
+ if (status_strings) {
+ RETVAL_STRINGL(r->str, r->len);
+ } else {
+ RETVAL_TRUE;
+ }
break;
case TYPE_BULK:
if (r->len < 0) {
@@ -1879,14 +1917,17 @@ PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisClust
add_next_index_bool(&c->multi_resp, 0);
break;
case TYPE_LINE:
- add_next_index_bool(&c->multi_resp, 1);
+ if (status_strings) {
+ add_next_index_stringl(&c->multi_resp, r->str, r->len);
+ } else {
+ add_next_index_bool(&c->multi_resp, 1);
+ }
break;
case TYPE_BULK:
if (r->len < 0) {
add_next_index_null(&c->multi_resp);
} else {
add_next_index_stringl(&c->multi_resp, r->str, r->len);
- efree(r->str);
}
break;
case TYPE_MULTIBULK:
@@ -1899,7 +1940,19 @@ PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisClust
}
// Free our response structs, but not allocated data itself
- cluster_free_reply(r, 0);
+ cluster_free_reply(r, 1);
+}
+
+PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+ void *ctx)
+{
+ cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 0, ctx);
+}
+
+PHP_REDIS_API void cluster_variant_resp_strings(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+ void *ctx)
+{
+ cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 1, ctx);
}
/* Generic MULTI BULK response processor */
@@ -2052,6 +2105,76 @@ PHP_REDIS_API void cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS, redisC
}
}
+/* XRANGE */
+PHP_REDIS_API void
+cluster_xrange_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
+ zval zv, *z_messages = &zv;
+
+ REDIS_MAKE_STD_ZVAL(z_messages);
+ array_init(z_messages);
+
+ c->cmd_sock->serializer = c->flags->serializer;
+ c->cmd_sock->compression = c->flags->compression;
+
+ if (redis_read_stream_messages(c->cmd_sock, c->reply_len, z_messages TSRMLS_CC) < 0) {
+ zval_dtor(z_messages);
+ REDIS_FREE_ZVAL(z_messages);
+ CLUSTER_RETURN_FALSE(c);
+ }
+
+ if (CLUSTER_IS_ATOMIC(c)) {
+ RETVAL_ZVAL(z_messages, 0, 1);
+ } else {
+ add_next_index_zval(&c->multi_resp, z_messages);
+ }
+}
+
+/* XREAD */
+PHP_REDIS_API void
+cluster_xread_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
+ zval zv, *z_streams = &zv;
+
+ REDIS_MAKE_STD_ZVAL(z_streams);
+ array_init(z_streams);
+
+ c->cmd_sock->serializer = c->flags->serializer;
+ c->cmd_sock->compression = c->flags->compression;
+
+ if (redis_read_stream_messages_multi(c->cmd_sock, c->reply_len, z_streams TSRMLS_CC) < 0) {
+ zval_dtor(z_streams);
+ REDIS_FREE_ZVAL(z_streams);
+ CLUSTER_RETURN_FALSE(c);
+ }
+
+ if (CLUSTER_IS_ATOMIC(c)) {
+ RETVAL_ZVAL(z_streams, 0, 1);
+ } else {
+ add_next_index_zval(&c->multi_resp, z_streams);
+ }
+}
+
+/* XCLAIM */
+PHP_REDIS_API void
+cluster_xclaim_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
+ zval zv, *z_msg = &zv;
+
+ REDIS_MAKE_STD_ZVAL(z_msg);
+ array_init(z_msg);
+
+ if (redis_read_xclaim_response(c->cmd_sock, c->reply_len, z_msg TSRMLS_CC) < 0) {
+ zval_dtor(z_msg);
+ REDIS_FREE_ZVAL(z_msg);
+ CLUSTER_RETURN_FALSE(c);
+ }
+
+ if (CLUSTER_IS_ATOMIC(c)) {
+ RETVAL_ZVAL(z_msg, 0, 1);
+ } else {
+ add_next_index_zval(&c->multi_resp, z_msg);
+ }
+
+}
+
/* MULTI BULK response loop where we might pull the next one */
PHP_REDIS_API zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, int pull, mbulk_cb cb, zval *z_ret)
diff --git a/cluster_library.h b/cluster_library.h
index 49285da7..01457832 100644
--- a/cluster_library.h
+++ b/cluster_library.h
@@ -230,7 +230,7 @@ typedef struct redisCluster {
/* One RedisSock struct for serialization and prefix information */
RedisSock *flags;
- /* The first line of our last reply, not including our reply type byte
+ /* The first line of our last reply, not including our reply type byte
* or the trailing \r\n */
char line_reply[1024];
@@ -283,7 +283,7 @@ typedef struct clusterDistList {
size_t len, size;
} clusterDistList;
-/* Context for things like MGET/MSET/MSETNX. When executing in MULTI mode,
+/* Context for things like MGET/MSET/MSETNX. When executing in MULTI mode,
* we'll want to re-integrate into one running array, except for the last
* command execution, in which we'll want to return the value (or add it) */
typedef struct clusterMultiCtx {
@@ -325,17 +325,17 @@ typedef struct clusterReply {
} clusterReply;
/* Direct variant response handler */
-clusterReply *cluster_read_resp(redisCluster *c TSRMLS_DC);
-clusterReply *cluster_read_sock_resp(RedisSock *redis_sock,
- REDIS_REPLY_TYPE type, size_t reply_len TSRMLS_DC);
+clusterReply *cluster_read_resp(redisCluster *c, int status_strings TSRMLS_DC);
+clusterReply *cluster_read_sock_resp(RedisSock *redis_sock,
+ REDIS_REPLY_TYPE type, char *line_reply, size_t reply_len TSRMLS_DC);
void cluster_free_reply(clusterReply *reply, int free_data);
/* Cluster distribution helpers for WATCH */
HashTable *cluster_dist_create();
void cluster_dist_free(HashTable *ht);
-int cluster_dist_add_key(redisCluster *c, HashTable *ht, char *key,
+int cluster_dist_add_key(redisCluster *c, HashTable *ht, char *key,
strlen_t key_len, clusterKeyVal **kv);
-void cluster_dist_add_val(redisCluster *c, clusterKeyVal *kv, zval *val
+void cluster_dist_add_val(redisCluster *c, clusterKeyVal *kv, zval *val
TSRMLS_DC);
/* Aggregation for multi commands like MGET, MSET, and MSETNX */
@@ -351,7 +351,7 @@ unsigned short cluster_hash_key(const char *key, int len);
/* Get the current time in miliseconds */
long long mstime(void);
-PHP_REDIS_API short cluster_send_command(redisCluster *c, short slot, const char *cmd,
+PHP_REDIS_API short cluster_send_command(redisCluster *c, short slot, const char *cmd,
int cmd_len TSRMLS_DC);
PHP_REDIS_API void cluster_disconnect(redisCluster *c, int force TSRMLS_DC);
@@ -363,7 +363,7 @@ PHP_REDIS_API int cluster_reset_multi(redisCluster *c);
PHP_REDIS_API short cluster_find_slot(redisCluster *c, const char *host,
unsigned short port);
-PHP_REDIS_API int cluster_send_slot(redisCluster *c, short slot, char *cmd,
+PHP_REDIS_API int cluster_send_slot(redisCluster *c, short slot, char *cmd,
int cmd_len, REDIS_REPLY_TYPE rtype TSRMLS_DC);
PHP_REDIS_API redisCluster *cluster_create(double timeout, double read_timeout,
@@ -379,28 +379,30 @@ PHP_REDIS_API char **cluster_sock_read_multibulk_reply(RedisSock *redis_sock,
/*
* Redis Cluster response handlers. Our response handlers generally take the
* following form:
- * PHP_REDIS_API void handler(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+ * PHP_REDIS_API void handler(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
* void *ctx)
*
* Reply handlers are responsible for setting the PHP return value (either to
* something valid, or FALSE in the case of some failures).
*/
-PHP_REDIS_API void cluster_bool_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+PHP_REDIS_API void cluster_bool_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx);
PHP_REDIS_API void cluster_ping_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx);
-PHP_REDIS_API void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+PHP_REDIS_API void cluster_single_line_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx);
-PHP_REDIS_API void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+PHP_REDIS_API void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx);
-PHP_REDIS_API void cluster_dbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+PHP_REDIS_API void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx);
-PHP_REDIS_API void cluster_1_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+PHP_REDIS_API void cluster_dbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx);
-PHP_REDIS_API void cluster_long_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+PHP_REDIS_API void cluster_1_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx);
-PHP_REDIS_API void cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+PHP_REDIS_API void cluster_long_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+ void *ctx);
+PHP_REDIS_API void cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx);
PHP_REDIS_API void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx);
@@ -411,28 +413,31 @@ PHP_REDIS_API void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster
PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
+PHP_REDIS_API void cluster_variant_resp_strings(INTERNAL_FUNCTION_PARAMETERS,
+ redisCluster *c, void *ctx);
+
/* MULTI BULK response functions */
-PHP_REDIS_API void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
+PHP_REDIS_API void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, mbulk_cb func, void *ctx);
-PHP_REDIS_API void cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,
+PHP_REDIS_API void cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
-PHP_REDIS_API void cluster_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
+PHP_REDIS_API void cluster_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
PHP_REDIS_API void cluster_mbulk_zipstr_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
PHP_REDIS_API void cluster_mbulk_zipdbl_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
-PHP_REDIS_API void cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS,
+PHP_REDIS_API void cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
PHP_REDIS_API void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
-PHP_REDIS_API zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
+PHP_REDIS_API zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, int pull, mbulk_cb cb, zval *z_ret);
/* Handlers for things like DEL/MGET/MSET/MSETNX */
-PHP_REDIS_API void cluster_del_resp(INTERNAL_FUNCTION_PARAMETERS,
+PHP_REDIS_API void cluster_del_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
-PHP_REDIS_API void cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS,
+PHP_REDIS_API void cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
PHP_REDIS_API void cluster_mset_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
@@ -448,13 +453,21 @@ PHP_REDIS_API void cluster_info_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
/* CLIENT LIST response handler */
-PHP_REDIS_API void cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS,
+PHP_REDIS_API void cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS,
+ redisCluster *c, void *ctx);
+
+/* Custom STREAM handlers */
+PHP_REDIS_API void cluster_xread_resp(INTERNAL_FUNCTION_PARAMETERS,
+ redisCluster *c, void *ctx);
+PHP_REDIS_API void cluster_xrange_resp(INTERNAL_FUNCTION_PARAMETERS,
+ redisCluster *c, void *ctx);
+PHP_REDIS_API void cluster_xclaim_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, void *ctx);
/* MULTI BULK processing callbacks */
-int mbulk_resp_loop(RedisSock *redis_sock, zval *z_result,
+int mbulk_resp_loop(RedisSock *redis_sock, zval *z_result,
long long count, void *ctx TSRMLS_DC);
-int mbulk_resp_loop_raw(RedisSock *redis_sock, zval *z_result,
+int mbulk_resp_loop_raw(RedisSock *redis_sock, zval *z_result,
long long count, void *ctx TSRMLS_DC);
int mbulk_resp_loop_zipstr(RedisSock *redis_sock, zval *z_result,
long long count, void *ctx TSRMLS_DC);
diff --git a/common.h b/common.h
index c52dd6d8..a1dbf28a 100644
--- a/common.h
+++ b/common.h
@@ -22,6 +22,9 @@ typedef struct {
char *val;
} zend_string;
+#define REDIS_MAKE_STD_ZVAL(zv) MAKE_STD_ZVAL(zv)
+#define REDIS_FREE_ZVAL(zv) (efree(zv))
+
#define ZSTR_VAL(s) (s)->val
#define ZSTR_LEN(s) (s)->len
@@ -432,6 +435,9 @@ typedef int strlen_t;
typedef size_t strlen_t;
#define PHPREDIS_ZVAL_IS_STRICT_FALSE(z) (Z_TYPE_P(z) == IS_FALSE)
#define PHPREDIS_GET_OBJECT(class_entry, z) (class_entry *)((char *)Z_OBJ_P(z) - XtOffsetOf(class_entry, std))
+
+#define REDIS_MAKE_STD_ZVAL(zv) do {} while(0)
+#define REDIS_FREE_ZVAL(zv) do {} while(0)
#endif
/* NULL check so Eclipse doesn't go crazy */
@@ -1084,4 +1090,82 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_georadiusbymember, 0, 0, 4)
ZEND_ARG_ARRAY_INFO(0, opts, 0)
ZEND_END_ARG_INFO()
+ZEND_BEGIN_ARG_INFO_EX(arginfo_xadd, 0, 0, 3)
+ ZEND_ARG_INFO(0, str_key)
+ ZEND_ARG_INFO(0, str_id)
+ ZEND_ARG_ARRAY_INFO(0, arr_fields, 0)
+ ZEND_ARG_INFO(0, i_maxlen)
+ ZEND_ARG_INFO(0, boo_approximate)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(arginfo_xpending, 0, 0, 2)
+ ZEND_ARG_INFO(0, str_key)
+ ZEND_ARG_INFO(0, str_group)
+ ZEND_ARG_INFO(0, str_start)
+ ZEND_ARG_INFO(0, str_end)
+ ZEND_ARG_INFO(0, i_count)
+ ZEND_ARG_INFO(0, str_consumer)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(arginfo_xrange, 0, 0, 3)
+ ZEND_ARG_INFO(0, str_key)
+ ZEND_ARG_INFO(0, str_start)
+ ZEND_ARG_INFO(0, str_end)
+ ZEND_ARG_INFO(0, i_count)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(arginfo_xread, 0, 0, 1)
+ ZEND_ARG_ARRAY_INFO(0, arr_streams, 0)
+ ZEND_ARG_INFO(0, i_count)
+ ZEND_ARG_INFO(0, i_block)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(arginfo_xreadgroup, 0, 0, 3)
+ ZEND_ARG_INFO(0, str_group)
+ ZEND_ARG_INFO(0, str_consumer)
+ ZEND_ARG_ARRAY_INFO(0, arr_streams, 0)
+ ZEND_ARG_INFO(0, i_count)
+ ZEND_ARG_INFO(0, i_block)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(arginfo_xack, 0, 0, 3)
+ ZEND_ARG_INFO(0, str_key)
+ ZEND_ARG_INFO(0, str_group)
+ ZEND_ARG_ARRAY_INFO(0, arr_ids, 0)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(arginfo_xclaim, 0, 0, 5)
+ ZEND_ARG_INFO(0, str_key)
+ ZEND_ARG_INFO(0, str_group)
+ ZEND_ARG_INFO(0, str_consumer)
+ ZEND_ARG_INFO(0, i_min_idle)
+ ZEND_ARG_ARRAY_INFO(0, arr_ids, 0)
+ ZEND_ARG_ARRAY_INFO(0, arr_opts, 0)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(arginfo_xgroup, 0, 0, 1)
+ ZEND_ARG_INFO(0, str_operation)
+ ZEND_ARG_INFO(0, str_key)
+ ZEND_ARG_INFO(0, str_arg1)
+ ZEND_ARG_INFO(0, str_arg2)
+ ZEND_ARG_INFO(0, str_arg3)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(arginfo_xinfo, 0, 0, 1)
+ ZEND_ARG_INFO(0, str_cmd)
+ ZEND_ARG_INFO(0, str_key)
+ ZEND_ARG_INFO(0, str_group)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(arginfo_xtrim, 0, 0, 2)
+ ZEND_ARG_INFO(0, str_key)
+ ZEND_ARG_INFO(0, i_maxlen)
+ ZEND_ARG_INFO(0, boo_approximate)
+ZEND_END_ARG_INFO()
+
+ZEND_BEGIN_ARG_INFO_EX(arginfo_xdel, 0, 0, 2)
+ ZEND_ARG_INFO(0, str_key)
+ ZEND_ARG_ARRAY_INFO(0, arr_ids, 0)
+ZEND_END_ARG_INFO()
+
#endif
diff --git a/library.c b/library.c
index 6fc88dc8..1d9f2ec4 100644
--- a/library.c
+++ b/library.c
@@ -114,6 +114,13 @@ static int resend_auth(RedisSock *redis_sock TSRMLS_DC) {
return 0;
}
+/* Helper function and macro to test a RedisSock error prefix. */
+#define REDIS_SOCK_ERRCMP_STATIC(rs, s) redis_sock_errcmp(rs, s, sizeof(s)-1)
+static int redis_sock_errcmp(RedisSock *redis_sock, const char *err, size_t errlen) {
+ return ZSTR_LEN(redis_sock->err) >= errlen &&
+ memcmp(ZSTR_VAL(redis_sock->err), err, errlen) == 0;
+}
+
/* Helper function that will throw an exception for a small number of ERR codes
* returned by Redis. Typically we just return FALSE to the caller in the event
* of an ERROR reply, but for the following error types:
@@ -124,11 +131,19 @@ static int resend_auth(RedisSock *redis_sock TSRMLS_DC) {
static void
redis_error_throw(RedisSock *redis_sock TSRMLS_DC)
{
- if (redis_sock != NULL && redis_sock->err != NULL &&
- memcmp(ZSTR_VAL(redis_sock->err), "ERR", sizeof("ERR") - 1) != 0 &&
- memcmp(ZSTR_VAL(redis_sock->err), "NOSCRIPT", sizeof("NOSCRIPT") - 1) != 0 &&
- memcmp(ZSTR_VAL(redis_sock->err), "WRONGTYPE", sizeof("WRONGTYPE") - 1) != 0
- ) {
+ /* Short circuit if we have no redis_sock or any error */
+ if (redis_sock == NULL || redis_sock->err == NULL)
+ return;
+
+ /* We may want to flip this logic and check for MASTERDOWN, AUTH,
+ * and LOADING but that may have side effects (esp for things like
+ * Disque) */
+ if (!REDIS_SOCK_ERRCMP_STATIC(redis_sock, "ERR") &&
+ !REDIS_SOCK_ERRCMP_STATIC(redis_sock, "NOSCRIPT") &&
+ !REDIS_SOCK_ERRCMP_STATIC(redis_sock, "WRONGTYPE") &&
+ !REDIS_SOCK_ERRCMP_STATIC(redis_sock, "BUSYGROUP") &&
+ !REDIS_SOCK_ERRCMP_STATIC(redis_sock, "NOGROUP"))
+ {
zend_throw_exception(redis_exception_ce, ZSTR_VAL(redis_sock->err), 0 TSRMLS_CC);
}
}
@@ -442,8 +457,7 @@ redis_sock_read_multibulk_reply_zval(INTERNAL_FUNCTION_PARAMETERS,
array_init(z_tab);
- redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab,
- numElems, UNSERIALIZE_ALL);
+ redis_mbulk_reply_loop(redis_sock, z_tab, numElems, UNSERIALIZE_ALL TSRMLS_CC);
return z_tab;
}
@@ -730,6 +744,24 @@ int redis_cmd_append_sstr_key(smart_string *str, char *key, strlen_t len, RedisS
return retval;
}
+/* Append an array key to a redis smart string command. This function
+ * handles the boilerplate conditionals around string or integer keys */
+int redis_cmd_append_sstr_arrkey(smart_string *cmd, zend_string *kstr, ulong idx)
+{
+ char *arg, kbuf[128];
+ int len;
+
+ if (kstr) {
+ len = ZSTR_LEN(kstr);
+ arg = ZSTR_VAL(kstr);
+ } else {
+ len = snprintf(kbuf, sizeof(kbuf), "%ld", (long)idx);
+ arg = (char*)kbuf;
+ }
+
+ return redis_cmd_append_sstr(cmd, arg, len);
+}
+
PHP_REDIS_API void redis_bulk_double_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx) {
char *response;
@@ -1137,6 +1169,30 @@ static void array_zip_values_and_scores(RedisSock *redis_sock, zval *z_tab,
}
static int
+read_mbulk_header(RedisSock *redis_sock, int *nelem TSRMLS_DC)
+{
+ char line[4096];
+ size_t len;
+
+ /* Throws exception on failure */
+ if (redis_sock_gets(redis_sock, line, sizeof(line)-1, &len TSRMLS_CC) < 0)
+ return -1;
+
+ if (line[0] != '*') {
+ if (IS_ATOMIC(redis_sock)) {
+ if (line[0] == '-') {
+ redis_sock_set_err(redis_sock, line+1, len-1);
+ }
+ }
+ return -1;
+ }
+
+ *nelem = atoi(line+1);
+
+ return 0;
+}
+
+static int
redis_mbulk_reply_zipped(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
zval *z_tab, int unserialize, int decode)
{
@@ -1164,8 +1220,7 @@ redis_mbulk_reply_zipped(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
array_init(z_multi_result); /* pre-allocate array for multi's results. */
/* Grab our key, value, key, value array */
- redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock,
- z_multi_result, numElems, unserialize);
+ redis_mbulk_reply_loop(redis_sock, z_multi_result, numElems, unserialize TSRMLS_CC);
/* Zip keys and values */
array_zip_values_and_scores(redis_sock, z_multi_result, decode TSRMLS_CC);
@@ -1179,6 +1234,243 @@ redis_mbulk_reply_zipped(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
return 0;
}
+/* Consume message ID */
+PHP_REDIS_API int
+redis_sock_read_single_line(RedisSock *redis_sock, char *buffer, size_t buflen,
+ size_t *linelen, int set_err TSRMLS_DC)
+{
+ REDIS_REPLY_TYPE type;
+ long info;
+
+ if (redis_read_reply_type(redis_sock, &type, &info TSRMLS_CC) < 0 ||
+ (type != TYPE_LINE && type != TYPE_ERR))
+ {
+ return -1;
+ }
+
+ if (redis_sock_gets(redis_sock, buffer, buflen, linelen TSRMLS_CC) < 0) {
+ return -1;
+ }
+
+ if (set_err && type == TYPE_ERR) {
+ if (IS_ATOMIC(redis_sock)) {
+ redis_sock_set_err(redis_sock, buffer, *linelen);
+ }
+ }
+
+ return type == TYPE_LINE ? 0 : -1;
+}
+
+/* Helper function to consume Redis stream message data. This is useful for
+ * multiple stream callers (e.g. XREAD[GROUP], and X[REV]RANGE handlers). */
+PHP_REDIS_API int
+redis_read_stream_messages(RedisSock *redis_sock, int count, zval *z_ret
+ TSRMLS_DC)
+{
+ zval zv, *z_message = &zv;
+ int i, mhdr, fields;
+ char id[1024];
+ size_t idlen;
+
+ /* Iterate over each message */
+ for (i = 0; i < count; i++) {
+ /* Consume inner multi-bulk header, message ID itself and finaly
+ * the multi-bulk header for field and values */
+ if ((read_mbulk_header(redis_sock, &mhdr TSRMLS_CC) < 0 || mhdr != 2) ||
+ redis_sock_read_single_line(redis_sock, id, sizeof(id), &idlen, 0 TSRMLS_CC) < 0 ||
+ (read_mbulk_header(redis_sock, &fields TSRMLS_CC) < 0 || fields % 2 != 0))
+ {
+ return -1;
+ }
+
+ REDIS_MAKE_STD_ZVAL(z_message);
+ array_init(z_message);
+
+ redis_mbulk_reply_loop(redis_sock, z_message, fields, UNSERIALIZE_VALS TSRMLS_CC);
+ array_zip_values_and_scores(redis_sock, z_message, SCORE_DECODE_NONE TSRMLS_CC);
+ add_assoc_zval_ex(z_ret, id, idlen, z_message);
+ }
+
+ return 0;
+}
+
+PHP_REDIS_API int
+redis_xrange_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ zval *z_tab, void *ctx)
+{
+ int messages;
+ zval zv, *z_messages = &zv;
+
+ REDIS_MAKE_STD_ZVAL(z_messages);
+ array_init(z_messages);
+
+ if (read_mbulk_header(redis_sock, &messages TSRMLS_CC) < 0 ||
+ redis_read_stream_messages(redis_sock, messages, z_messages TSRMLS_CC) < 0)
+ {
+ zval_dtor(z_messages);
+ REDIS_FREE_ZVAL(z_messages);
+ if (IS_ATOMIC(redis_sock)) {
+ RETVAL_FALSE;
+ } else {
+ add_next_index_bool(z_tab, 0);
+ }
+ return -1;
+ }
+
+ if (IS_ATOMIC(redis_sock)) {
+ RETVAL_ZVAL(z_messages, 0, 1);
+ } else {
+ add_next_index_zval(z_tab, z_messages);
+ }
+
+ return 0;
+}
+
+PHP_REDIS_API int
+redis_read_stream_messages_multi(RedisSock *redis_sock, int count, zval *z_streams
+ TSRMLS_DC)
+{
+ zval zv, *z_messages = &zv;
+ int i, shdr, messages;
+ char *id;
+ int idlen;
+
+ for (i = 0; i < count; i++) {
+ if ((read_mbulk_header(redis_sock, &shdr TSRMLS_CC) < 0 || shdr != 2) ||
+ (id = redis_sock_read(redis_sock, &idlen TSRMLS_CC)) == NULL ||
+ read_mbulk_header(redis_sock, &messages TSRMLS_CC) < 0)
+ {
+ if (id) efree(id);
+ return -1;
+ }
+
+ REDIS_MAKE_STD_ZVAL(z_messages);
+ array_init(z_messages);
+
+ if (redis_read_stream_messages(redis_sock, messages, z_messages TSRMLS_CC) < 0)
+ goto failure;
+
+ add_assoc_zval_ex(z_streams, id, idlen, z_messages);
+ efree(id);
+ }
+
+ return 0;
+failure:
+ efree(id);
+ zval_dtor(z_messages);
+ REDIS_FREE_ZVAL(z_messages);
+ return -1;
+}
+
+PHP_REDIS_API int
+redis_xread_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ zval *z_tab, void *ctx)
+{
+ zval zv, *z_rv = &zv;
+ int streams;
+
+ if (read_mbulk_header(redis_sock, &streams TSRMLS_CC) < 0)
+ goto failure;
+
+ REDIS_MAKE_STD_ZVAL(z_rv);
+ array_init(z_rv);
+
+ if (redis_read_stream_messages_multi(redis_sock, streams, z_rv TSRMLS_CC) < 0)
+ goto cleanup;
+
+ if (IS_ATOMIC(redis_sock)) {
+ RETVAL_ZVAL(z_rv, 0, 1);
+ } else {
+ add_next_index_zval(z_tab, z_rv);
+ }
+ return 0;
+
+cleanup:
+ zval_dtor(z_rv);
+ REDIS_FREE_ZVAL(z_rv);
+failure:
+ if (IS_ATOMIC(redis_sock)) {
+ RETVAL_FALSE;
+ } else {
+ add_next_index_bool(z_tab, 0);
+ }
+ return -1;
+}
+
+/* This helper function does that actual XCLAIM response handling, which can be used by both
+ * Redis and RedisCluster. Note that XCLAIM is somewhat unique in that its reply type depends
+ * on whether or not it was called with the JUSTID option */
+PHP_REDIS_API int
+redis_read_xclaim_response(RedisSock *redis_sock, int count, zval *rv TSRMLS_DC) {
+ zval zv, *z_msg = &zv;
+ REDIS_REPLY_TYPE type;
+ char id[1024];
+ int i, fields;
+ long li;
+ size_t idlen;
+
+ for (i = 0; i < count; i++) {
+ /* Consume inner reply type */
+ if (redis_read_reply_type(redis_sock, &type, &li TSRMLS_CC) < 0 ||
+ (type != TYPE_LINE && type != TYPE_MULTIBULK)) return -1;
+
+ if (type == TYPE_LINE) {
+ /* JUSTID variant */
+ if (redis_sock_gets(redis_sock, id, sizeof(id), &idlen TSRMLS_CC) < 0)
+ return -1;
+ add_next_index_stringl(rv, id, idlen);
+ } else {
+ if (li != 2 || redis_sock_read_single_line(redis_sock, id, sizeof(id), &idlen, 0 TSRMLS_CC) < 0 ||
+ (read_mbulk_header(redis_sock, &fields TSRMLS_CC) < 0 || fields % 2 != 0)) return -1;
+
+ REDIS_MAKE_STD_ZVAL(z_msg);
+ array_init(z_msg);
+
+ redis_mbulk_reply_loop(redis_sock, z_msg, fields, UNSERIALIZE_VALS TSRMLS_CC);
+ array_zip_values_and_scores(redis_sock, z_msg, SCORE_DECODE_NONE TSRMLS_CC);
+ add_assoc_zval_ex(rv, id, idlen, z_msg);
+ }
+ }
+
+ return 0;
+}
+
+PHP_REDIS_API int
+redis_xclaim_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ zval *z_tab, void *ctx)
+{
+ zval zv, *z_ret = &zv;
+ int messages;
+
+ /* All XCLAIM responses start multibulk */
+ if (read_mbulk_header(redis_sock, &messages TSRMLS_CC) < 0)
+ goto failure;
+
+ REDIS_MAKE_STD_ZVAL(z_ret);
+ array_init(z_ret);
+
+ if (redis_read_xclaim_response(redis_sock, messages, z_ret TSRMLS_CC) < 0) {
+ zval_dtor(z_ret);
+ REDIS_FREE_ZVAL(z_ret);
+ goto failure;
+ }
+
+ if (IS_ATOMIC(redis_sock)) {
+ RETVAL_ZVAL(z_ret, 0, 1);
+ } else {
+ add_next_index_zval(z_tab, z_ret);
+ }
+ return 0;
+
+failure:
+ if (IS_ATOMIC(redis_sock)) {
+ RETVAL_FALSE;
+ } else {
+ add_next_index_bool(z_tab, 0);
+ }
+ return -1;
+}
+
/* Zipped key => value reply but we don't touch anything (e.g. CONFIG GET) */
PHP_REDIS_API int redis_mbulk_reply_zipped_raw(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx)
{
@@ -1262,6 +1554,30 @@ PHP_REDIS_API void redis_string_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock
efree(response);
}
+PHP_REDIS_API
+void redis_single_line_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ zval *z_tab, void *ctx)
+{
+ char buffer[4096];
+ size_t len;
+
+ if (redis_sock_read_single_line(redis_sock, buffer, sizeof(buffer), &len, 1 TSRMLS_CC) < 0) {
+ if (IS_ATOMIC(redis_sock)) {
+ RETURN_FALSE;
+ } else {
+ add_next_index_bool(z_tab, 0);
+ }
+ return;
+ }
+
+ //str = estrndup(buffer, len);
+ if (IS_ATOMIC(redis_sock)) {
+ RETVAL_STRINGL(buffer, len);
+ } else {
+ add_next_index_stringl(z_tab, buffer, len);
+ }
+}
+
/* like string response, but never unserialized. */
PHP_REDIS_API void
redis_ping_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
@@ -1597,8 +1913,7 @@ PHP_REDIS_API int redis_sock_read_multibulk_reply(INTERNAL_FUNCTION_PARAMETERS,
#endif
array_init(z_multi_result); /* pre-allocate array for multi's results. */
- redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock,
- z_multi_result, numElems, UNSERIALIZE_ALL);
+ redis_mbulk_reply_loop(redis_sock, z_multi_result, numElems, UNSERIALIZE_ALL TSRMLS_CC);
if (IS_ATOMIC(redis_sock)) {
RETVAL_ZVAL(z_multi_result, 0, 1);
@@ -1640,8 +1955,7 @@ redis_mbulk_reply_raw(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval
#endif
array_init(z_multi_result); /* pre-allocate array for multi's results. */
- redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock,
- z_multi_result, numElems, UNSERIALIZE_NONE);
+ redis_mbulk_reply_loop(redis_sock, z_multi_result, numElems, UNSERIALIZE_NONE TSRMLS_CC);
if (IS_ATOMIC(redis_sock)) {
RETVAL_ZVAL(z_multi_result, 0, 1);
@@ -1653,8 +1967,8 @@ redis_mbulk_reply_raw(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval
}
PHP_REDIS_API void
-redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
- zval *z_tab, int count, int unserialize)
+redis_mbulk_reply_loop(RedisSock *redis_sock, zval *z_tab, int count,
+ int unserialize TSRMLS_DC)
{
char *line;
int i, len;
@@ -2090,32 +2404,27 @@ redis_read_reply_type(RedisSock *redis_sock, REDIS_REPLY_TYPE *reply_type,
/*
* Read a single line response, having already consumed the reply-type byte
*/
-PHP_REDIS_API int
+static int
redis_read_variant_line(RedisSock *redis_sock, REDIS_REPLY_TYPE reply_type,
- zval *z_ret TSRMLS_DC)
+ int as_string, zval *z_ret TSRMLS_DC)
{
// Buffer to read our single line reply
char inbuf[4096];
- size_t line_size;
+ size_t len;
/* Attempt to read our single line reply */
- if(redis_sock_gets(redis_sock, inbuf, sizeof(inbuf), &line_size TSRMLS_CC) < 0) {
+ if(redis_sock_gets(redis_sock, inbuf, sizeof(inbuf), &len TSRMLS_CC) < 0) {
return -1;
}
- // If this is an error response, check if it is a SYNC error, and throw in
- // that case
+ /* Throw exception on SYNC error otherwise just set error string */
if(reply_type == TYPE_ERR) {
- /* Set our last error */
- redis_sock_set_err(redis_sock, inbuf, line_size);
-
- /* Handle throwable errors */
+ redis_sock_set_err(redis_sock, inbuf, len);
redis_error_throw(redis_sock TSRMLS_CC);
-
- /* Set our response to FALSE */
ZVAL_FALSE(z_ret);
+ } else if (as_string) {
+ ZVAL_STRINGL(z_ret, inbuf, len);
} else {
- /* Set our response to TRUE */
ZVAL_TRUE(z_ret);
}
@@ -2140,8 +2449,8 @@ redis_read_variant_bulk(RedisSock *redis_sock, int size, zval *z_ret
}
PHP_REDIS_API int
-redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, zval *z_ret
- TSRMLS_DC)
+redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, int status_strings,
+ zval *z_ret TSRMLS_DC)
{
long reply_info;
REDIS_REPLY_TYPE reply_type;
@@ -2166,8 +2475,8 @@ redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, zval *z_ret
#if (PHP_MAJOR_VERSION < 7)
ALLOC_INIT_ZVAL(z_subelem);
#endif
- redis_read_variant_line(redis_sock, reply_type, z_subelem
- TSRMLS_CC);
+ redis_read_variant_line(redis_sock, reply_type, status_strings,
+ z_subelem TSRMLS_CC);
add_next_index_zval(z_ret, z_subelem);
break;
case TYPE_INT:
@@ -2192,7 +2501,7 @@ redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, zval *z_ret
array_init(z_subelem);
add_next_index_zval(z_ret, z_subelem);
redis_read_multibulk_recursive(redis_sock, reply_info,
- z_subelem TSRMLS_CC);
+ status_strings, z_subelem TSRMLS_CC);
break;
default:
// Stop the compiler from whinging
@@ -2206,9 +2515,9 @@ redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, zval *z_ret
return 0;
}
-PHP_REDIS_API int
-redis_read_variant_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
- zval *z_tab, void *ctx)
+static int
+variant_reply_generic(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ int status_strings, zval *z_tab, void *ctx)
{
// Reply type, and reply size vars
REDIS_REPLY_TYPE reply_type;
@@ -2229,7 +2538,7 @@ redis_read_variant_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
switch(reply_type) {
case TYPE_ERR:
case TYPE_LINE:
- redis_read_variant_line(redis_sock, reply_type, z_ret TSRMLS_CC);
+ redis_read_variant_line(redis_sock, reply_type, status_strings, z_ret TSRMLS_CC);
break;
case TYPE_INT:
ZVAL_LONG(z_ret, reply_info);
@@ -2243,9 +2552,8 @@ redis_read_variant_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
// If we've got more than zero elements, parse our multi bulk
// response recursively
- if(reply_info > -1) {
- redis_read_multibulk_recursive(redis_sock, reply_info, z_ret
- TSRMLS_CC);
+ if (reply_info > -1) {
+ redis_read_multibulk_recursive(redis_sock, reply_info, status_strings, z_ret TSRMLS_CC);
}
break;
default:
@@ -2269,4 +2577,18 @@ redis_read_variant_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
return 0;
}
+PHP_REDIS_API int
+redis_read_variant_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ zval *z_tab, void *ctx)
+{
+ return variant_reply_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, 0, z_tab, ctx);
+}
+
+PHP_REDIS_API int
+redis_read_variant_reply_strings(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ zval *z_tab, void *ctx)
+{
+ return variant_reply_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, 1, z_tab, ctx);
+}
+
/* vim: set tabstop=4 softtabstop=4 expandtab shiftwidth=4: */
diff --git a/library.h b/library.h
index 2c3e5da7..1c936dd5 100644
--- a/library.h
+++ b/library.h
@@ -21,6 +21,7 @@ int redis_cmd_append_sstr_long(smart_string *str, long append);
int redis_cmd_append_sstr_dbl(smart_string *str, double value);
int redis_cmd_append_sstr_zval(smart_string *str, zval *z, RedisSock *redis_sock TSRMLS_DC);
int redis_cmd_append_sstr_key(smart_string *str, char *key, strlen_t len, RedisSock *redis_sock, short *slot);
+int redis_cmd_append_sstr_arrkey(smart_string *cmd, zend_string *kstr, ulong idx);
PHP_REDIS_API int redis_spprintf(RedisSock *redis_sock, short *slot TSRMLS_DC, char **ret, char *kw, char *fmt, ...);
@@ -33,6 +34,8 @@ PHP_REDIS_API void redis_boolean_response_impl(INTERNAL_FUNCTION_PARAMETERS, Red
PHP_REDIS_API void redis_boolean_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx);
PHP_REDIS_API void redis_bulk_double_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx);
PHP_REDIS_API void redis_string_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx);
+PHP_REDIS_API void redis_single_line_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ zval *z_tab, void *ctx);
PHP_REDIS_API void redis_ping_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx);
PHP_REDIS_API void redis_info_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx);
PHP_REDIS_API void redis_parse_info_response(char *response, zval *z_ret);
@@ -43,9 +46,13 @@ PHP_REDIS_API int redis_sock_connect(RedisSock *redis_sock TSRMLS_DC);
PHP_REDIS_API int redis_sock_server_open(RedisSock *redis_sock TSRMLS_DC);
PHP_REDIS_API int redis_sock_disconnect(RedisSock *redis_sock, int force TSRMLS_DC);
PHP_REDIS_API zval *redis_sock_read_multibulk_reply_zval(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab);
+PHP_REDIS_API int redis_sock_read_single_line(RedisSock *redis_sock, char *buffer,
+ size_t buflen, size_t *linelen, int set_err TSRMLS_DC);
PHP_REDIS_API char *redis_sock_read_bulk_reply(RedisSock *redis_sock, int bytes TSRMLS_DC);
PHP_REDIS_API int redis_sock_read_multibulk_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *_z_tab, void *ctx);
-PHP_REDIS_API void redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, int count, int unserialize);
+//PHP_REDIS_API void redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, int count, int unserialize);
+PHP_REDIS_API void redis_mbulk_reply_loop(RedisSock *redis_sock, zval *z_tab, int count, int unserialize TSRMLS_DC);
+
PHP_REDIS_API int redis_mbulk_reply_raw(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx);
PHP_REDIS_API int redis_mbulk_reply_zipped_raw(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx);
@@ -56,7 +63,15 @@ PHP_REDIS_API int redis_mbulk_reply_assoc(INTERNAL_FUNCTION_PARAMETERS, RedisSoc
PHP_REDIS_API int redis_sock_read_scan_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, REDIS_SCAN_TYPE type, zend_long *iter);
-PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
+
+PHP_REDIS_API int redis_xrange_reply(INTERNAL_FUNCTION_PARAMETERS,
+ RedisSock *redis_sock, zval *z_tab, void *ctx);
+PHP_REDIS_API int redis_xread_reply(INTERNAL_FUNCTION_PARAMETERS,
+ RedisSock *redis_sock, zval *z_tab, void *ctx);
+PHP_REDIS_API int redis_xclaim_reply(INTERNAL_FUNCTION_PARAMETERS,
+ RedisSock *redis_sock, zval *z_tab, void *ctx);
+
+PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
RedisSock *redis_sock, zval *z_tab, void *ctx);
PHP_REDIS_API int redis_unsubscribe_response(INTERNAL_FUNCTION_PARAMETERS,
RedisSock *redis_sock, zval *z_tab, void *ctx);
@@ -78,16 +93,22 @@ redis_unserialize(RedisSock *redis_sock, const char *val, int val_len, zval *z_r
PHP_REDIS_API int redis_pack(RedisSock *redis_sock, zval *z, char **val, strlen_t *val_len TSRMLS_DC);
PHP_REDIS_API int redis_unpack(RedisSock *redis_sock, const char *val, int val_len, zval *z_ret TSRMLS_DC);
+PHP_REDIS_API int
+redis_read_stream_messages(RedisSock *redis_sock, int count, zval *z_ret TSRMLS_DC);
+PHP_REDIS_API int
+redis_read_stream_messages_multi(RedisSock *redis_sock, int count, zval *z_ret TSRMLS_DC);
+PHP_REDIS_API int
+redis_read_xclaim_response(RedisSock *redis_sock, int count, zval *rv TSRMLS_DC);
+
/*
* Variant Read methods, mostly to implement eval
*/
PHP_REDIS_API int redis_read_reply_type(RedisSock *redis_sock, REDIS_REPLY_TYPE *reply_type, long *reply_info TSRMLS_DC);
-PHP_REDIS_API int redis_read_variant_line(RedisSock *redis_sock, REDIS_REPLY_TYPE reply_type, zval *z_ret TSRMLS_DC);
PHP_REDIS_API int redis_read_variant_bulk(RedisSock *redis_sock, int size, zval *z_ret TSRMLS_DC);
-PHP_REDIS_API int redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, zval *z_ret TSRMLS_DC);
+PHP_REDIS_API int redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, int status_strings, zval *z_ret TSRMLS_DC);
PHP_REDIS_API int redis_read_variant_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx);
-
+PHP_REDIS_API int redis_read_variant_reply_strings(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx);
PHP_REDIS_API void redis_client_list_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab);
#endif
diff --git a/php_redis.h b/php_redis.h
index b076e51b..d85c13ff 100644
--- a/php_redis.h
+++ b/php_redis.h
@@ -222,6 +222,21 @@ PHP_METHOD(Redis, pfadd);
PHP_METHOD(Redis, pfcount);
PHP_METHOD(Redis, pfmerge);
+/* STREAMS */
+PHP_METHOD(Redis, xack);
+PHP_METHOD(Redis, xadd);
+PHP_METHOD(Redis, xclaim);
+PHP_METHOD(Redis, xdel);
+PHP_METHOD(Redis, xgroup);
+PHP_METHOD(Redis, xinfo);
+PHP_METHOD(Redis, xlen);
+PHP_METHOD(Redis, xpending);
+PHP_METHOD(Redis, xrange);
+PHP_METHOD(Redis, xread);
+PHP_METHOD(Redis, xreadgroup);
+PHP_METHOD(Redis, xrevrange);
+PHP_METHOD(Redis, xtrim);
+
/* Reflection */
PHP_METHOD(Redis, getHost);
PHP_METHOD(Redis, getPort);
diff --git a/redis.c b/redis.c
index f44004fc..e25b65d7 100644
--- a/redis.c
+++ b/redis.c
@@ -402,6 +402,19 @@ static zend_function_entry redis_functions[] = {
PHP_ME(Redis, unwatch, arginfo_void, ZEND_ACC_PUBLIC)
PHP_ME(Redis, wait, arginfo_wait, ZEND_ACC_PUBLIC)
PHP_ME(Redis, watch, arginfo_watch, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, xack, arginfo_xack, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, xadd, arginfo_xadd, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, xclaim, arginfo_xclaim, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, xdel, arginfo_xdel, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, xgroup, arginfo_xgroup, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, xinfo, arginfo_xinfo, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, xlen, arginfo_key, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, xpending, arginfo_xpending, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, xrange, arginfo_xrange, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, xread, arginfo_xread, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, xreadgroup, arginfo_xreadgroup, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, xrevrange, arginfo_xrange, ZEND_ACC_PUBLIC)
+ PHP_ME(Redis, xtrim, arginfo_xtrim, ZEND_ACC_PUBLIC)
PHP_ME(Redis, zAdd, arginfo_zadd, ZEND_ACC_PUBLIC)
PHP_ME(Redis, zCard, arginfo_key, ZEND_ACC_PUBLIC)
PHP_ME(Redis, zCount, arginfo_key_min_max, ZEND_ACC_PUBLIC)
@@ -1413,7 +1426,7 @@ PHP_METHOD(Redis, sAdd)
/* {{{ proto boolean Redis::sAddArray(string key, array $values) */
PHP_METHOD(Redis, sAddArray) {
- REDIS_PROCESS_KW_CMD("SADD", redis_key_arr_cmd, redis_long_response);
+ REDIS_PROCESS_KW_CMD("SADD", redis_key_val_arr_cmd, redis_long_response);
} /* }}} */
/* {{{ proto int Redis::sSize(string key) */
@@ -2441,7 +2454,7 @@ redis_sock_read_multibulk_multi_reply_loop(INTERNAL_FUNCTION_PARAMETERS,
add_next_index_zval(z_tab, z_ret);
int num = atol(inbuf + 1);
- if (num > 0 && redis_read_multibulk_recursive(redis_sock, num, z_ret TSRMLS_CC) < 0) {
+ if (num > 0 && redis_read_multibulk_recursive(redis_sock, num, 0, z_ret TSRMLS_CC) < 0) {
}
if (fi) fi = fi->next;
}
@@ -3573,4 +3586,60 @@ PHP_METHOD(Redis, georadiusbymember) {
REDIS_PROCESS_CMD(georadiusbymember, redis_read_variant_reply);
}
+/*
+ * Streams
+ */
+
+PHP_METHOD(Redis, xack) {
+ REDIS_PROCESS_CMD(xack, redis_long_response);
+}
+
+PHP_METHOD(Redis, xadd) {
+ REDIS_PROCESS_CMD(xadd, redis_single_line_reply);
+}
+
+PHP_METHOD(Redis, xclaim) {
+ REDIS_PROCESS_CMD(xclaim, redis_xclaim_reply);
+}
+
+PHP_METHOD(Redis, xdel) {
+ REDIS_PROCESS_KW_CMD("XDEL", redis_key_str_arr_cmd, redis_long_response);
+}
+
+PHP_METHOD(Redis, xgroup) {
+ REDIS_PROCESS_CMD(xgroup, redis_read_variant_reply);
+}
+
+PHP_METHOD(Redis, xinfo) {
+ REDIS_PROCESS_CMD(xinfo, redis_read_variant_reply);
+}
+
+PHP_METHOD(Redis, xlen) {
+ REDIS_PROCESS_KW_CMD("XLEN", redis_key_cmd, redis_long_response);
+}
+
+PHP_METHOD(Redis, xpending) {
+ REDIS_PROCESS_CMD(xpending, redis_read_variant_reply_strings);
+}
+
+PHP_METHOD(Redis, xrange) {
+ REDIS_PROCESS_KW_CMD("XRANGE", redis_xrange_cmd, redis_xrange_reply);
+}
+
+PHP_METHOD(Redis, xread) {
+ REDIS_PROCESS_CMD(xread, redis_xread_reply);
+}
+
+PHP_METHOD(Redis, xreadgroup) {
+ REDIS_PROCESS_CMD(xreadgroup, redis_xread_reply);
+}
+
+PHP_METHOD(Redis, xrevrange) {
+ REDIS_PROCESS_KW_CMD("XREVRANGE", redis_xrange_cmd, redis_xrange_reply);
+}
+
+PHP_METHOD(Redis, xtrim) {
+ REDIS_PROCESS_CMD(xtrim, redis_long_response);
+}
+
/* vim: set tabstop=4 softtabstop=4 expandtab shiftwidth=4: */
diff --git a/redis_array.c b/redis_array.c
index 9f00a74c..217ab63e 100644
--- a/redis_array.c
+++ b/redis_array.c
@@ -735,7 +735,6 @@ PHP_METHOD(RedisArray, keys)
RedisArray *ra;
char *pattern;
strlen_t pattern_len;
- int i;
/* Make sure the prototype is correct */
if(zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Os",
@@ -764,7 +763,6 @@ PHP_METHOD(RedisArray, keys)
PHP_METHOD(RedisArray, getOption)
{
zval *object, z_fun, z_args[1];
- int i;
RedisArray *ra;
zend_long opt;
@@ -791,7 +789,6 @@ PHP_METHOD(RedisArray, getOption)
PHP_METHOD(RedisArray, setOption)
{
zval *object, z_fun, z_args[2];
- int i;
RedisArray *ra;
zend_long opt;
char *val_str;
@@ -822,7 +819,6 @@ PHP_METHOD(RedisArray, setOption)
PHP_METHOD(RedisArray, select)
{
zval *object, z_fun, z_args[1];
- int i;
RedisArray *ra;
zend_long opt;
diff --git a/redis_cluster.c b/redis_cluster.c
index 4271b9d9..130b961a 100644
--- a/redis_cluster.c
+++ b/redis_cluster.c
@@ -252,6 +252,19 @@ zend_function_entry redis_cluster_functions[] = {
PHP_ME(RedisCluster, unlink, arginfo_del, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, unwatch, arginfo_void, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, watch, arginfo_watch, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, xack, arginfo_xack, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, xadd, arginfo_xadd, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, xclaim, arginfo_xclaim, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, xdel, arginfo_xdel, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, xgroup, arginfo_xgroup, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, xinfo, arginfo_xinfo, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, xlen, arginfo_key, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, xpending, arginfo_xpending, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, xrange, arginfo_xrange, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, xread, arginfo_xread, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, xreadgroup, arginfo_xreadgroup, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, xrevrange, arginfo_xrange, ZEND_ACC_PUBLIC)
+ PHP_ME(RedisCluster, xtrim, arginfo_xtrim, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, zadd, arginfo_zadd, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, zcard, arginfo_key, ZEND_ACC_PUBLIC)
PHP_ME(RedisCluster, zcount, arginfo_key_min_max, ZEND_ACC_PUBLIC)
@@ -1101,7 +1114,7 @@ PHP_METHOD(RedisCluster, keys) {
}
/* Ensure we can get a response */
- resp = cluster_read_resp(c TSRMLS_CC);
+ resp = cluster_read_resp(c, 0 TSRMLS_CC);
if (!resp) {
php_error_docref(0 TSRMLS_CC, E_WARNING,
"Can't read response from %s:%d", ZSTR_VAL(node->sock->host),
@@ -1305,7 +1318,7 @@ PHP_METHOD(RedisCluster, sadd) {
/* {{{ proto long RedisCluster::saddarray(string key, array values) */
PHP_METHOD(RedisCluster, saddarray) {
- CLUSTER_PROCESS_KW_CMD("SADD", redis_key_arr_cmd, cluster_long_resp, 0);
+ CLUSTER_PROCESS_KW_CMD("SADD", redis_key_val_arr_cmd, cluster_long_resp, 0);
}
/* }}} */
@@ -2963,6 +2976,65 @@ PHP_METHOD(RedisCluster, ping) {
}
/* }}} */
+/* {{{ proto long RedisCluster::xack(string key, string group, array ids) }}} */
+PHP_METHOD(RedisCluster, xack) {
+ CLUSTER_PROCESS_CMD(xack, cluster_long_resp, 0);
+}
+
+/* {{{ proto string RedisCluster::xadd(string key, string id, array field_values) }}} */
+PHP_METHOD(RedisCluster, xadd) {
+ CLUSTER_PROCESS_CMD(xadd, cluster_single_line_resp, 0);
+}
+
+/* {{{ proto array RedisCluster::xclaim(string key, string group, string consumer,
+ * long min_idle_time, array ids, array options) */
+PHP_METHOD(RedisCluster, xclaim) {
+ CLUSTER_PROCESS_CMD(xclaim, cluster_xclaim_resp, 0);
+}
+
+PHP_METHOD(RedisCluster, xdel) {
+ CLUSTER_PROCESS_KW_CMD("XDEL", redis_key_str_arr_cmd, cluster_long_resp, 0);
+}
+
+/* {{{ proto variant RedisCluster::xgroup(string op, [string key, string arg1, string arg2]) }}} */
+PHP_METHOD(RedisCluster, xgroup) {
+ CLUSTER_PROCESS_CMD(xgroup, cluster_variant_resp, 0);
+}
+
+/* {{{ proto variant RedisCluster::xinfo(string op, [string arg1, string arg2]); */
+PHP_METHOD(RedisCluster, xinfo) {
+ CLUSTER_PROCESS_CMD(xinfo, cluster_variant_resp, 0);
+}
+
+/* {{{ proto string RedisCluster::xlen(string key) }}} */
+PHP_METHOD(RedisCluster, xlen) {
+ CLUSTER_PROCESS_KW_CMD("XLEN", redis_key_cmd, cluster_long_resp, 1);
+}
+
+PHP_METHOD(RedisCluster, xpending) {
+ CLUSTER_PROCESS_CMD(xpending, cluster_variant_resp_strings, 1);
+}
+
+PHP_METHOD(RedisCluster, xrange) {
+ CLUSTER_PROCESS_KW_CMD("XRANGE", redis_xrange_cmd, cluster_xrange_resp, 1);
+}
+
+PHP_METHOD(RedisCluster, xrevrange) {
+ CLUSTER_PROCESS_KW_CMD("XREVRANGE", redis_xrange_cmd, cluster_xrange_resp, 1);
+}
+
+PHP_METHOD(RedisCluster, xread) {
+ CLUSTER_PROCESS_CMD(xread, cluster_xread_resp, 1);
+}
+
+PHP_METHOD(RedisCluster, xreadgroup) {
+ CLUSTER_PROCESS_CMD(xreadgroup, cluster_xread_resp, 0);
+}
+
+PHP_METHOD(RedisCluster, xtrim) {
+ CLUSTER_PROCESS_CMD(xtrim, cluster_long_resp, 0);
+}
+
/* {{{ proto string RedisCluster::echo(string key, string msg)
* proto string RedisCluster::echo(array host_port, string msg) */
PHP_METHOD(RedisCluster, echo) {
diff --git a/redis_cluster.h b/redis_cluster.h
index 74903e67..b4d0a8ba 100644
--- a/redis_cluster.h
+++ b/redis_cluster.h
@@ -263,6 +263,21 @@ PHP_METHOD(RedisCluster, zscan);
PHP_METHOD(RedisCluster, hscan);
PHP_METHOD(RedisCluster, sscan);
+/* STREAMS */
+PHP_METHOD(RedisCluster, xack);
+PHP_METHOD(RedisCluster, xadd);
+PHP_METHOD(RedisCluster, xclaim);
+PHP_METHOD(RedisCluster, xdel);
+PHP_METHOD(RedisCluster, xgroup);
+PHP_METHOD(RedisCluster, xinfo);
+PHP_METHOD(RedisCluster, xlen);
+PHP_METHOD(RedisCluster, xpending);
+PHP_METHOD(RedisCluster, xrange);
+PHP_METHOD(RedisCluster, xread);
+PHP_METHOD(RedisCluster, xreadgroup);
+PHP_METHOD(RedisCluster, xrevrange);
+PHP_METHOD(RedisCluster, xtrim);
+
/* Transactions */
PHP_METHOD(RedisCluster, multi);
PHP_METHOD(RedisCluster, exec);
diff --git a/redis_commands.c b/redis_commands.c
index c47669ad..e9d26c3d 100644
--- a/redis_commands.c
+++ b/redis_commands.c
@@ -1050,13 +1050,16 @@ int redis_key_varval_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
}
/* Commands that take a key and then an array of values */
-int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
- char *kw, char **cmd, int *cmd_len, short *slot,
- void **ctx)
+#define VAL_TYPE_VALUES 1
+#define VAL_TYPE_STRINGS 2
+static int gen_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, int valtype, char **cmd, int *cmd_len,
+ short *slot, void **ctx)
{
zval *z_arr, *z_val;
HashTable *ht_arr;
smart_string cmdstr = {0};
+ zend_string *zstr;
int key_free, val_free, argc = 1;
strlen_t val_len, key_len;
char *key, *val;
@@ -1080,10 +1083,17 @@ int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
if (key_free) efree(key);
/* Iterate our hash table, serializing and appending values */
+ assert(valtype == VAL_TYPE_VALUES || valtype == VAL_TYPE_STRINGS);
ZEND_HASH_FOREACH_VAL(ht_arr, z_val) {
- val_free = redis_pack(redis_sock, z_val, &val, &val_len TSRMLS_CC);
- redis_cmd_append_sstr(&cmdstr, val, val_len);
- if (val_free) efree(val);
+ if (valtype == VAL_TYPE_VALUES) {
+ val_free = redis_pack(redis_sock, z_val, &val, &val_len TSRMLS_CC);
+ redis_cmd_append_sstr(&cmdstr, val, val_len);
+ if (val_free) efree(val);
+ } else {
+ zstr = zval_get_string(z_val);
+ redis_cmd_append_sstr(&cmdstr, ZSTR_VAL(zstr), ZSTR_LEN(zstr));
+ zend_string_release(zstr);
+ }
} ZEND_HASH_FOREACH_END();
*cmd_len = cmdstr.len;
@@ -1092,6 +1102,22 @@ int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
return SUCCESS;
}
+int redis_key_val_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, char **cmd, int *cmd_len, short *slot,
+ void **ctx)
+{
+ return gen_key_arr_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, kw,
+ VAL_TYPE_VALUES, cmd, cmd_len, slot, ctx);
+}
+
+int redis_key_str_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, char **cmd, int *cmd_len, short *slot,
+ void **ctx)
+{
+ return gen_key_arr_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, kw,
+ VAL_TYPE_STRINGS, cmd, cmd_len, slot, ctx);
+}
+
/* Generic function that takes a variable number of keys, with an optional
* timeout value. This can handle various SUNION/SUNIONSTORE/BRPOP type
* commands. */
@@ -3147,6 +3173,611 @@ int redis_command_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
return SUCCESS;
}
+/* XADD */
+int redis_xadd_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ smart_string cmdstr = {0};
+ zend_string *arrkey;
+ zval *z_fields, *value;
+ zend_long maxlen = 0;
+ zend_bool approx = 0;
+ ulong idx;
+ HashTable *ht_fields;
+ int fcount, argc;
+ char *key, *id;
+ strlen_t keylen, idlen;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ssa|lb", &key, &keylen,
+ &id, &idlen, &z_fields, &maxlen, &approx) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ /* At least one field and string are required */
+ ht_fields = Z_ARRVAL_P(z_fields);
+ if ((fcount = zend_hash_num_elements(ht_fields)) == 0) {
+ return FAILURE;
+ }
+
+ if (maxlen < 0 || (maxlen == 0 && approx != 0)) {
+ php_error_docref(NULL TSRMLS_CC, E_WARNING,
+ "Warning: Invalid MAXLEN argument or approximate flag");
+ }
+
+
+ /* Calculate argc for XADD. It's a bit complex because we've got
+ * an optional MAXLEN argument which can either take the form MAXLEN N
+ * or MAXLEN ~ N */
+ argc = 2 + (fcount*2) + (maxlen > 0 ? (approx ? 3 : 2) : 0);
+
+ /* XADD key ID field string [field string ...] */
+ REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XADD");
+ redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot);
+
+ /* Now append our MAXLEN bits if we've got them */
+ if (maxlen > 0) {
+ REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "MAXLEN");
+ REDIS_CMD_APPEND_SSTR_OPT_STATIC(&cmdstr, approx, "~");
+ redis_cmd_append_sstr_long(&cmdstr, maxlen);
+ }
+
+ /* Now append ID and field(s) */
+ redis_cmd_append_sstr(&cmdstr, id, idlen);
+ ZEND_HASH_FOREACH_KEY_VAL(ht_fields, idx, arrkey, value) {
+ redis_cmd_append_sstr_arrkey(&cmdstr, arrkey, idx);
+ redis_cmd_append_sstr_zval(&cmdstr, value, redis_sock TSRMLS_CC);
+ } ZEND_HASH_FOREACH_END();
+
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+// XPENDING key group [start end count] [consumer]
+int redis_xpending_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ smart_string cmdstr = {0};
+ char *key, *group, *start = NULL, *end = NULL, *consumer = NULL;
+ strlen_t keylen, grouplen, startlen, endlen, consumerlen;
+ int argc;
+ zend_long count = -1;
+
+ // XPENDING mystream group55 - + 10 consumer-123
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss|ssls", &key,
+ &keylen, &group, &grouplen, &start, &startlen,
+ &end, &endlen, &count, &consumer, &consumerlen)
+ == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ /* If we've been passed a start argument, we also need end and count */
+ if (start != NULL && (end == NULL || count < 0)) {
+ return FAILURE;
+ }
+
+ /* Calculate argc. It's either 2, 5, or 6 */
+ argc = 2 + (start != NULL ? 3 + (consumer != NULL) : 0);
+
+ /* Construct command and add required arguments */
+ REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XPENDING");
+ redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot);
+ redis_cmd_append_sstr(&cmdstr, group, grouplen);
+
+ /* Add optional argumentst */
+ if (start) {
+ redis_cmd_append_sstr(&cmdstr, start, startlen);
+ redis_cmd_append_sstr(&cmdstr, end, endlen);
+ redis_cmd_append_sstr_long(&cmdstr, (long)count);
+
+ /* Finally add consumer if we have it */
+ if (consumer) redis_cmd_append_sstr(&cmdstr, consumer, consumerlen);
+ }
+
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+/* X[REV]RANGE key start end [COUNT count] */
+int redis_xrange_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, char **cmd, int *cmd_len, short *slot,
+ void **ctx)
+{
+ smart_string cmdstr = {0};
+ char *key, *start, *end;
+ strlen_t keylen, startlen, endlen;
+ zend_long count = -1;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sss|l", &key, &keylen,
+ &start, &startlen, &end, &endlen, &count)
+ == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ redis_cmd_init_sstr(&cmdstr, 3 + (2 * (count > -1)), kw, strlen(kw));
+ redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot);
+ redis_cmd_append_sstr(&cmdstr, start, startlen);
+ redis_cmd_append_sstr(&cmdstr, end, endlen);
+
+ if (count > -1) {
+ redis_cmd_append_sstr(&cmdstr, "COUNT", sizeof("COUNT")-1);
+ redis_cmd_append_sstr_long(&cmdstr, count);
+ }
+
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+/* Helper function to take an associative array and append the Redis
+ * STREAMS stream [stream...] id [id ...] arguments to a command string. */
+static int
+append_stream_args(smart_string *cmdstr, HashTable *ht, RedisSock *redis_sock,
+ short *slot TSRMLS_DC)
+{
+ char *kptr, kbuf[40];
+ int klen, i, pos = 0;
+ zend_string *key, *idstr;
+ short oldslot;
+ zval **id;
+ ulong idx;
+
+ /* Append STREAM qualifier */
+ REDIS_CMD_APPEND_SSTR_STATIC(cmdstr, "STREAMS");
+
+ /* Sentinel slot */
+ if (slot) oldslot = -1;
+
+ /* Allocate memory to keep IDs */
+ id = emalloc(sizeof(*id) * zend_hash_num_elements(ht));
+
+ /* Iterate over our stream => id array appending streams and retaining each
+ * value for final arguments */
+ ZEND_HASH_FOREACH_KEY_VAL(ht, idx, key, id[pos++]) {
+ if (key) {
+ klen = ZSTR_LEN(key);
+ kptr = ZSTR_VAL(key);
+ } else {
+ klen = snprintf(kbuf, sizeof(kbuf), "%ld", (long)idx);
+ kptr = (char*)kbuf;
+ }
+
+ /* Append stream key */
+ redis_cmd_append_sstr_key(cmdstr, kptr, klen, redis_sock, slot);
+
+ /* Protect the user against CROSSSLOT to avoid confusion */
+ if (slot) {
+ if (oldslot != -1 && *slot != oldslot) {
+ php_error_docref(NULL TSRMLS_CC, E_WARNING,
+ "Warning, not all keys hash to the same slot!");
+ efree(id);
+ return FAILURE;
+ }
+ oldslot = *slot;
+ }
+ } ZEND_HASH_FOREACH_END();
+
+ /* Add our IDs */
+ for (i = 0; i < pos; i++) {
+ idstr = zval_get_string(id[i]);
+ redis_cmd_append_sstr(cmdstr, ZSTR_VAL(idstr), ZSTR_LEN(idstr));
+ zend_string_release(idstr);
+ }
+
+ /* Clean up ID container array */
+ efree(id);
+
+ return 0;
+}
+
+/* XREAD [COUNT count] [BLOCK ms] STREAMS key [key ...] id [id ...] */
+int redis_xread_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ smart_string cmdstr = {0};
+ zend_long count = -1, block = -1;
+ zval *z_streams;
+ int argc, scount;
+ HashTable *kt;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a|ll", &z_streams,
+ &count, &block) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ /* At least one stream and ID is required */
+ kt = Z_ARRVAL_P(z_streams);
+ if ((scount = zend_hash_num_elements(kt)) < 1) {
+ return FAILURE;
+ }
+
+ /* Calculate argc and start constructing command */
+ argc = 1 + (2 * scount) + (2 * (count > -1)) + (2 * (block > -1));
+ REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XREAD");
+
+ /* Append COUNT if we have it */
+ if (count > -1) {
+ REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "COUNT");
+ redis_cmd_append_sstr_long(&cmdstr, count);
+ }
+
+ /* Append BLOCK if we have it */
+ if (block > -1) {
+ REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "BLOCK");
+ redis_cmd_append_sstr_long(&cmdstr, block);
+ }
+
+ /* Append final STREAM key [key ...] id [id ...] arguments */
+ if (append_stream_args(&cmdstr, kt, redis_sock, slot TSRMLS_CC) < 0) {
+ efree(cmdstr.c);
+ return FAILURE;
+ }
+
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+/* XREADGROUP GROUP group consumer [COUNT count] [BLOCK ms]
+ * STREAMS key [key ...] id [id ...] */
+int redis_xreadgroup_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ smart_string cmdstr = {0};
+ zval *z_streams;
+ HashTable *kt;
+ char *group, *consumer;
+ strlen_t grouplen, consumerlen;
+ int scount, argc;
+ zend_long count = -1, block = -1;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ssa|ll", &group,
+ &grouplen, &consumer, &consumerlen, &z_streams,
+ &count, &block) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ /* Redis requires at least one stream */
+ kt = Z_ARRVAL_P(z_streams);
+ if ((scount = zend_hash_num_elements(kt)) < 1) {
+ return FAILURE;
+ }
+
+ /* Calculate argc and start constructing commands */
+ argc = 4 + (2 * scount) + (2 * (count > -1)) + (2 * (block > -1));
+ REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XREADGROUP");
+
+ /* Group and consumer */
+ REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "GROUP");
+ redis_cmd_append_sstr(&cmdstr, group, grouplen);
+ redis_cmd_append_sstr(&cmdstr, consumer, consumerlen);
+
+ /* Append COUNT if we have it */
+ if (count > -1) {
+ REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "COUNT");
+ redis_cmd_append_sstr_long(&cmdstr, count);
+ }
+
+ /* Append BLOCK argument if we have it */
+ if (block > -1) {
+ REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "BLOCK");
+ redis_cmd_append_sstr_long(&cmdstr, block);
+ }
+
+ /* Finally append stream and id args */
+ if (append_stream_args(&cmdstr, kt, redis_sock, slot TSRMLS_CC) < 0) {
+ efree(cmdstr.c);
+ return FAILURE;
+ }
+
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+/* XACK key group id [id ...] */
+int redis_xack_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ smart_string cmdstr = {0};
+ char *key, *group;
+ strlen_t keylen, grouplen;
+ zend_string *idstr;
+ zval *z_ids, *z_id;
+ HashTable *ht_ids;
+ int idcount;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ssa", &key, &keylen,
+ &group, &grouplen, &z_ids) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ ht_ids = Z_ARRVAL_P(z_ids);
+ if ((idcount = zend_hash_num_elements(ht_ids)) < 1) {
+ return FAILURE;
+ }
+
+ /* Create command and add initial arguments */
+ REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, 2 + idcount, "XACK");
+ redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot);
+ redis_cmd_append_sstr(&cmdstr, group, grouplen);
+
+ /* Append IDs */
+ ZEND_HASH_FOREACH_VAL(ht_ids, z_id) {
+ idstr = zval_get_string(z_id);
+ redis_cmd_append_sstr(&cmdstr, ZSTR_VAL(idstr), ZSTR_LEN(idstr));
+ zend_string_release(idstr);
+ } ZEND_HASH_FOREACH_END();
+
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+/* XCLAIM options container */
+typedef struct xclaimOptions {
+ struct {
+ char *type;
+ zend_long time;
+ } idle;
+ zend_long retrycount;
+ int force;
+ int justid;
+} xclaimOptions;
+
+/* Helper to extract XCLAIM options */
+static void get_xclaim_options(zval *z_arr, xclaimOptions *opt TSRMLS_DC) {
+ HashTable *ht;
+ zend_string *zkey;
+ char *kval;
+ strlen_t klen;
+ ulong idx;
+ zval *zv;
+
+ PHPREDIS_NOTUSED(idx);
+
+ /* Initialize options array to sane defaults */
+ memset(opt, 0, sizeof(*opt));
+ opt->retrycount = -1;
+ opt->idle.time = -1;
+
+ /* Early return if we don't have any options */
+ if (z_arr == NULL)
+ return;
+
+ /* Iterate over our options array */
+ ht = Z_ARRVAL_P(z_arr);
+ ZEND_HASH_FOREACH_KEY_VAL(ht, idx, zkey, zv) {
+ if (zkey) {
+ /* Every key => value xclaim option requires a long and Redis
+ * treats -1 as not being passed so skip negative values too. */
+ if (Z_TYPE_P(zv) != IS_LONG || Z_LVAL_P(zv) < 0)
+ continue;
+
+ kval = ZSTR_VAL(zkey);
+ klen = ZSTR_LEN(zkey);
+ if (klen == 4) {
+ if (!strncasecmp(kval, "TIME", 4)) {
+ opt->idle.type = "TIME";
+ opt->idle.time = Z_LVAL_P(zv);
+ } else if (!strncasecmp(kval, "IDLE", 4)) {
+ opt->idle.type = "IDLE";
+ opt->idle.time = Z_LVAL_P(zv);
+ }
+ } else if (klen == 10 && !strncasecmp(kval, "RETRYCOUNT", 10)) {
+ opt->retrycount = Z_LVAL_P(zv);
+ }
+ } else {
+ if (Z_TYPE_P(zv) == IS_STRING) {
+ kval = Z_STRVAL_P(zv);
+ klen = Z_STRLEN_P(zv);
+ if (klen == 5 && !strncasecmp(kval, "FORCE", 5)) {
+ opt->force = 1;
+ } else if (klen == 6 && !strncasecmp(kval, "JUSTID", 6)) {
+ opt->justid = 1;
+ }
+ }
+ }
+ } ZEND_HASH_FOREACH_END();
+}
+
+/* Count argc for any options we may have */
+static int xclaim_options_argc(xclaimOptions *opt) {
+ int argc = 0;
+
+ if (opt->idle.type != NULL && opt->idle.time != -1)
+ argc += 2;
+ if (opt->retrycount != -1)
+ argc += 2;
+ if (opt->force)
+ argc++;
+ if (opt->justid)
+ argc++;
+
+ return argc;
+}
+
+/* Append XCLAIM options */
+static void append_xclaim_options(smart_string *cmd, xclaimOptions *opt) {
+ /* IDLE/TIME long */
+ if (opt->idle.type != NULL && opt->idle.time != -1) {
+ redis_cmd_append_sstr(cmd, opt->idle.type, strlen(opt->idle.type));
+ redis_cmd_append_sstr_long(cmd, opt->idle.time);
+ }
+
+ /* RETRYCOUNT */
+ if (opt->retrycount != -1) {
+ REDIS_CMD_APPEND_SSTR_STATIC(cmd, "RETRYCOUNT");
+ redis_cmd_append_sstr_long(cmd, opt->retrycount);
+ }
+
+ /* FORCE and JUSTID */
+ if (opt->force)
+ REDIS_CMD_APPEND_SSTR_STATIC(cmd, "FORCE");
+ if (opt->justid)
+ REDIS_CMD_APPEND_SSTR_STATIC(cmd, "JUSTID");
+}
+
+/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
+ [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
+ [FORCE] [JUSTID] */
+int redis_xclaim_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ smart_string cmdstr = {0};
+ char *key, *group, *consumer;
+ strlen_t keylen, grouplen, consumerlen;
+ zend_long min_idle;
+ int argc, id_count;
+ zval *z_ids, *z_id, *z_opts = NULL;
+ zend_string *zstr;
+ HashTable *ht_ids;
+ xclaimOptions opts;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sssla|a", &key, &keylen,
+ &group, &grouplen, &consumer, &consumerlen, &min_idle,
+ &z_ids, &z_opts) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ /* At least one id is required */
+ ht_ids = Z_ARRVAL_P(z_ids);
+ if ((id_count = zend_hash_num_elements(ht_ids)) < 1) {
+ return FAILURE;
+ }
+
+ /* Extract options array if we've got them */
+ get_xclaim_options(z_opts, &opts TSRMLS_CC);
+
+ /* Now we have enough information to calculate argc */
+ argc = 4 + id_count + xclaim_options_argc(&opts);
+
+ /* Start constructing our command */
+ REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XCLAIM");
+ redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot);
+ redis_cmd_append_sstr(&cmdstr, group, grouplen);
+ redis_cmd_append_sstr(&cmdstr, consumer, consumerlen);
+ redis_cmd_append_sstr_long(&cmdstr, min_idle);
+
+ /* Add IDs */
+ ZEND_HASH_FOREACH_VAL(ht_ids, z_id) {
+ zstr = zval_get_string(z_id);
+ redis_cmd_append_sstr(&cmdstr, ZSTR_VAL(zstr), ZSTR_LEN(zstr));
+ zend_string_release(zstr);
+ } ZEND_HASH_FOREACH_END();
+
+ /* Finally add our options */
+ append_xclaim_options(&cmdstr, &opts);
+
+ /* Success */
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+/* XGROUP HELP
+ * XGROUP SETID key group id
+ * XGROUP DELGROUP key groupname
+ * XGROUP CREATE key groupname id
+ * XGROUP DELCONSUMER key groupname consumername */
+int redis_xgroup_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ char *op, *key = NULL, *arg1 = NULL, *arg2 = NULL;
+ strlen_t oplen, keylen, arg1len, arg2len;
+ int argc = ZEND_NUM_ARGS();
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|ssss", &op, &oplen,
+ &key, &keylen, &arg1, &arg1len, &arg2, &arg2len)
+ == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ if (argc == 1 && oplen == 4 && !strncasecmp(op, "HELP", 4)) {
+ *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XGROUP", "s", "HELP", 4);
+ return SUCCESS;
+ } else if (argc == 4 && ((oplen == 5 && !strncasecmp(op, "SETID", 5)) ||
+ (oplen == 6 && !strncasecmp(op, "CREATE", 6)) ||
+ (oplen == 11 && !strncasecmp(op, "DELCONSUMER", 11))))
+ {
+ *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XGROUP", "skss", op, oplen, key, keylen,
+ arg1, arg1len, arg2, arg2len);
+ return SUCCESS;
+ } else if (argc == 3 && ((oplen == 7 && !strncasecmp(op, "DELGROUP", 7)))) {
+ *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XGROUP", "sks", op, oplen, key,
+ keylen, arg1, arg1len);
+ return SUCCESS;
+ }
+
+ /* Didn't detect any valid XGROUP command pattern */
+ return FAILURE;
+}
+
+
+
+/* XINFO CONSUMERS key group
+ * XINFO GROUPS key
+ * XINFO STREAM key
+ * XINFO HELP */
+int redis_xinfo_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ char *op, *key, *arg;
+ strlen_t oplen, keylen, arglen;
+ char fmt[4];
+ int argc = ZEND_NUM_ARGS();
+
+ if (argc > 3 || zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|ss",
+ &op, &oplen, &key, &keylen, &arg,
+ &arglen) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ /* Our format is simply "s", "sk" or "sks" depending on argc */
+ memcpy(fmt, "sks", sizeof("sks")-1);
+ fmt[argc] = '\0';
+
+ *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XINFO", fmt, op, oplen, key, keylen,
+ arg, arglen);
+ return SUCCESS;
+}
+
+/* XTRIM MAXLEN [~] count */
+int redis_xtrim_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ char *key;
+ strlen_t keylen;
+ zend_long maxlen;
+ zend_bool approx = 0;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sl|b", &key, &keylen,
+ &maxlen, &approx) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ if (approx) {
+ *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XTRIM", "kssl", key, keylen,
+ "MAXLEN", 6, "~", 1, maxlen);
+ } else {
+ *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XTRIM", "ksl", key, keylen,
+ "MAXLEN", 6, maxlen);
+ }
+
+ return SUCCESS;
+}
+
/*
* Redis commands that don't deal with the server at all. The RedisSock*
* pointer is the only thing retreived differently, so we just take that
diff --git a/redis_commands.h b/redis_commands.h
index 413b868e..2f3e53de 100644
--- a/redis_commands.h
+++ b/redis_commands.h
@@ -72,7 +72,10 @@ int redis_key_dbl_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
int redis_key_varval_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
char *kw, char **cmd, int *cmd_len, short *slot, void **ctx);
-int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+int redis_key_val_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, char **cmd, int *cmd_len, short *slot, void **ctx);
+
+int redis_key_str_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
char *kw, char **cmd, int *cmd_len, short *slot, void **ctx);
/* Construct SCAN and similar commands, as well as check iterator */
@@ -112,6 +115,9 @@ int redis_eval_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
int redis_flush_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
char *kw, char **cmd, int *cmd_len, short *slot, void **ctx);
+int redis_xrange_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, char **cmd, int *cmd_len, short *slot, void **ctx);
+
/* Commands which need a unique construction mechanism. This is either because
* they don't share a signature with any other command, or because there is
* specific processing we do (e.g. verifying subarguments) that make them
@@ -256,6 +262,33 @@ int redis_georadiusbymember_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_s
int redis_migrate_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
char **cmd, int *cmd_len, short *slot, void **ctx);
+int redis_xadd_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx);
+
+int redis_xclaim_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx);
+
+int redis_xpending_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx);
+
+int redis_xack_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx);
+
+int redis_xgroup_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx);
+
+int redis_xinfo_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx);
+
+int redis_xread_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx);
+
+int redis_xreadgroup_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx);
+
+int redis_xtrim_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx);
+
/* Commands that don't communicate with Redis at all (such as getOption,
* setOption, _prefix, _serialize, etc). These can be handled in one place
* with the method of grabbing our RedisSock* object in different ways
diff --git a/redis_session.c b/redis_session.c
index 08022cb1..7f1a2656 100644
--- a/redis_session.c
+++ b/redis_session.c
@@ -1061,7 +1061,7 @@ PS_READ_FUNC(rediscluster) {
redisCluster *c = PS_GET_MOD_DATA();
clusterReply *reply;
char *cmd, *skey;
- int cmdlen, skeylen;
+ int cmdlen, skeylen, free_flag;
short slot;
/* Set up our command and slot information */
@@ -1084,7 +1084,7 @@ PS_READ_FUNC(rediscluster) {
efree(cmd);
/* Attempt to read reply */
- reply = cluster_read_resp(c TSRMLS_CC);
+ reply = cluster_read_resp(c, 0 TSRMLS_CC);
if (!reply || c->err) {
if (reply) cluster_free_reply(reply, 1);
return FAILURE;
@@ -1099,16 +1099,20 @@ PS_READ_FUNC(rediscluster) {
*val = reply->str;
*vallen = reply->len;
}
+
+ free_flag = 0;
#else
if (reply->str == NULL) {
*val = ZSTR_EMPTY_ALLOC();
} else {
*val = zend_string_init(reply->str, reply->len, 0);
}
+
+ free_flag = 1;
#endif
/* Clean up */
- cluster_free_reply(reply, 0);
+ cluster_free_reply(reply, free_flag);
/* Success! */
return SUCCESS;
@@ -1148,7 +1152,7 @@ PS_WRITE_FUNC(rediscluster) {
efree(cmd);
/* Attempt to read reply */
- reply = cluster_read_resp(c TSRMLS_CC);
+ reply = cluster_read_resp(c, 0 TSRMLS_CC);
if (!reply || c->err) {
if (reply) cluster_free_reply(reply, 1);
return FAILURE;
@@ -1188,7 +1192,7 @@ PS_DESTROY_FUNC(rediscluster) {
efree(cmd);
/* Attempt to read reply */
- reply = cluster_read_resp(c TSRMLS_CC);
+ reply = cluster_read_resp(c, 0 TSRMLS_CC);
if (!reply || c->err) {
if (reply) cluster_free_reply(reply, 1);
return FAILURE;
diff --git a/tests/RedisTest.php b/tests/RedisTest.php
index b5da01e8..df650ee5 100644
--- a/tests/RedisTest.php
+++ b/tests/RedisTest.php
@@ -16,6 +16,11 @@ class Redis_Test extends TestSuite
'Cupertino' => Array(-122.032182, 37.322998)
);
+ protected $serializers = Array(
+ Redis::SERIALIZER_NONE,
+ Redis::SERIALIZER_PHP,
+ );
+
/**
* @var Redis
*/
@@ -35,12 +40,20 @@ class Redis_Test extends TestSuite
$this->redis = $this->newInstance();
$info = $this->redis->info();
$this->version = (isset($info['redis_version'])?$info['redis_version']:'0.0.0');
+
+ if (defined('Redis::SERIALIZER_IGBINARY')) {
+ $this->serializers[] = Redis::SERIALIZER_IGBINARY;
+ }
}
protected function minVersionCheck($version) {
return version_compare($this->version, $version, "ge");
}
+ protected function mstime() {
+ return round(microtime(true)*1000);
+ }
+
protected function newInstance() {
$r = new Redis();
@@ -5223,6 +5236,437 @@ class Redis_Test extends TestSuite
$this->assertEquals($this->redis->lrange('mylist', 0, -1), Array('A','B','C','D'));
}
+ /* STREAMS */
+
+ protected function addStreamEntries($key, $count) {
+ $ids = Array();
+
+ $this->redis->del($key);
+
+ for ($i = 0; $i < $count; $i++) {
+ $ids[] = $this->redis->xAdd($key, '*', Array('field' => "value:$i"));
+ }
+
+ return $ids;
+ }
+
+ protected function addStreamsAndGroups($arr_streams, $count, $arr_groups) {
+ $ids = Array();
+
+ foreach ($arr_streams as $str_stream) {
+ $ids[$str_stream] = $this->addStreamEntries($str_stream, $count);
+ foreach ($arr_groups as $str_group => $str_id) {
+ $this->redis->xGroup('CREATE', $str_stream, $str_group, $str_id);
+ }
+ }
+
+ return $ids;
+ }
+
+ public function testXAdd() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ $this->redis->del('stream');
+ for ($i = 0; $i < 5; $i++) {
+ $id = $this->redis->xAdd("stream", '*', Array('k1' => 'v1', 'k2' => 'v2'));
+ $this->assertEquals($this->redis->xLen('stream'), $i+1);
+
+ /* Redis should return <timestamp>-<sequence> */
+ $bits = explode('-', $id);
+ $this->assertEquals(count($bits), 2);
+ $this->assertTrue(is_numeric($bits[0]));
+ $this->assertTrue(is_numeric($bits[1]));
+ }
+
+ /* Test an absolute maximum length */
+ for ($i = 0; $i < 100; $i++) {
+ $this->redis->xAdd('stream', '*', Array('k' => 'v'), 10);
+ }
+ $this->assertEquals($this->redis->xLen('stream'), 10);
+
+ /* Not the greatest test but I'm unsure if approximate trimming is
+ * totally deterministic, so just make sure we are able to add with
+ * an approximate maxlen argument structure */
+ $id = $this->redis->xAdd('stream', '*', Array('k' => 'v'), 10, true);
+ $this->assertEquals(count(explode('-', $id)), 2);
+
+ /* Empty message should fail */
+ $this->redis->xAdd('stream', '*', Array());
+ }
+
+ protected function doXRangeTest($reverse) {
+ $key = '{stream}';
+
+ if ($reverse) {
+ list($cmd,$a1,$a2) = Array('xRevRange', '+', 0);
+ } else {
+ list($cmd,$a1,$a2) = Array('xRange', 0, '+');
+ }
+
+ $this->redis->del($key);
+ for ($i = 0; $i < 3; $i++) {
+ $msg = Array('field' => "value:$i");
+ $id = $this->redis->xAdd($key, '*', $msg);
+ $rows[$id] = $msg;
+ }
+
+ $messages = $this->redis->$cmd($key, $a1, $a2);
+ $this->assertEquals(count($messages), 3);
+
+ $i = $reverse ? 2 : 0;
+ foreach ($messages as $seq => $v) {
+ $this->assertEquals(count(explode('-', $seq)), 2);
+ $this->assertEquals($v, Array('field' => "value:$i"));
+ $i += $reverse ? -1 : 1;
+ }
+
+ /* Test COUNT option */
+ for ($count = 1; $count <= 3; $count++) {
+ $messages = $this->redis->$cmd($key, $a1, $a2, $count);
+ $this->assertEquals(count($messages), $count);
+ }
+ }
+
+ public function testXRange() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ foreach (Array(false, true) as $reverse) {
+ foreach ($this->serializers as $serializer) {
+ foreach (Array(NULL, 'prefix:') as $prefix) {
+ $this->redis->setOption(Redis::OPT_PREFIX, $prefix);
+ $this->redis->setOption(Redis::OPT_SERIALIZER, $serializer);
+ $this->doXRangeTest($reverse);
+ }
+ }
+ }
+ }
+
+ protected function testXLen() {
+ if (!$this->minVersionCheck("5.0"))
+ $this->markTestSkipped();
+
+ $this->redis->del('{stream}');
+ for ($i = 0; $i < 5; $i++) {
+ $this->redis->xadd('{stream}', '*', Array('foo' => 'bar'));
+ $this->assertEquals($this->redis->xLen('{stream}'), $i+1);
+ }
+ }
+
+ public function testXGroup() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ $this->addStreamEntries('s', 2);
+
+ /* CREATE */
+ $this->assertTrue($this->redis->xGroup('CREATE', 's', 'mygroup', '$'));
+ $this->assertFalse($this->redis->xGroup('CREATE', 's', 'mygroup', 'BAD_ID'));
+
+ /* BUSYGROUP */
+ $this->redis->xGroup('CREATE', 's', 'mygroup', '$');
+ $this->assertTrue(strpos($this->redis->getLastError(), 'BUSYGROUP') === 0);
+
+ /* SETID */
+ $this->assertTrue($this->redis->xGroup('SETID', 's', 'mygroup', '$'));
+ $this->assertFalse($this->redis->xGroup('SETID', 's', 'mygroup', 'BAD_ID'));
+
+ $this->assertEquals($this->redis->xGroup('DELCONSUMER', 's', 'mygroup', 'myconsumer'),0);
+
+ /* DELGROUP not yet implemented in Redis */
+ }
+
+ public function testXAck() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ for ($n = 1; $n <= 3; $n++) {
+ $this->addStreamsAndGroups(Array('{s}'), 3, Array('g1' => 0));
+ $msg = $this->redis->xReadGroup('g1', 'c1', Array('{s}' => 0));
+
+ /* Extract IDs */
+ $smsg = array_shift($msg);
+ $ids = array_keys($smsg);
+
+ /* Now ACK $n messages */
+ $ids = array_slice($ids, 0, $n);
+ $this->assertEquals($this->redis->xAck('{s}', 'g1', $ids), $n);
+ }
+
+ /* Verify sending no IDs is a failure */
+ $this->assertFalse($this->redis->xAck('{s}', 'g1', Array()));
+ }
+
+ protected function doXReadTest() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ $row = Array('f1' => 'v1', 'f2' => 'v2');
+ $msgdata = Array(
+ '{stream}-1' => $row,
+ '{stream}-2' => $row,
+ );
+
+ /* Append a bit of data and populate STREAM queries */
+ $this->redis->del(array_keys($msgdata));
+ foreach ($msgdata as $key => $message) {
+ for ($r = 0; $r < 2; $r++) {
+ $id = $this->redis->xAdd($key, '*', $message);
+ $qresult[$key][$id] = $message;
+ }
+ $qzero[$key] = 0;
+ $qnew[$key] = '$';
+ $keys[] = $key;
+ }
+
+ /* Everything from both streams */
+ $rmsg = $this->redis->xRead($qzero);
+ $this->assertEquals($rmsg, $qresult);
+
+ /* Test COUNT option */
+ for ($count = 1; $count <= 2; $count++) {
+ $rmsg = $this->redis->xRead($qzero, $count);
+ foreach ($keys as $key) {
+ $this->assertEquals(count($rmsg[$key]), $count);
+ }
+ }
+
+ /* Should be empty (no new entries) */
+ $this->assertEquals(count($this->redis->xRead($qnew)),0);
+
+ /* Test against a specific ID */
+ $id = $this->redis->xAdd('{stream}-1', '*', $row);
+ $new_id = $this->redis->xAdd('{stream}-1', '*', Array('final' => 'row'));
+ $rmsg = $this->redis->xRead(Array('{stream}-1' => $id));
+ $this->assertEquals(
+ $this->redis->xRead(Array('{stream}-1' => $id)),
+ Array('{stream}-1' => Array($new_id => Array('final' => 'row')))
+ );
+
+ /* Emtpy query should fail */
+ $this->assertFalse($this->redis->xRead(Array()));
+ }
+
+ public function testXRead() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ foreach ($this->serializers as $serializer) {
+ $this->redis->setOption(Redis::OPT_SERIALIZER, $serializer);
+ $this->doXReadTest();
+ }
+
+ /* Don't need to test BLOCK multiple times */
+ $m1 = round(microtime(true)*1000);
+ $this->redis->xRead(Array('somestream' => '$'), -1, 100);
+ $m2 = round(microtime(true)*1000);
+ $this->assertTrue($m2 - $m1 >= 100);
+ }
+
+ protected function compareStreamIds($redis, $control) {
+ foreach ($control as $stream => $ids) {
+ $rcount = count($redis[$stream]);
+ $lcount = count($control[$stream]);
+
+ /* We should have the same number of messages */
+ $this->assertEquals($rcount, $lcount);
+
+ /* We should have the exact same IDs */
+ foreach ($ids as $k => $id) {
+ $this->assertTrue(isset($redis[$stream][$id]));
+ }
+ }
+ }
+
+ public function testXReadGroup() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ /* Create some streams and groups */
+ $streams = Array('{s}-1', '{s}-2');
+ $qstreams = Array('{s}-1' => 0, '{s}-2' => 0);
+ $groups = Array('g1' => 0, 'g2' => 0);
+
+ $ids = $this->addStreamsAndGroups($streams, 3, $groups);
+
+ /* Test that we get get the IDs we should */
+ foreach (Array('g1', 'g2') as $group) {
+ foreach ($ids as $stream => $messages) {
+ while ($ids[$stream]) {
+ /* Read more messages */
+ $resp = $this->redis->xReadGroup($group, 'consumer', $qstreams);
+
+ /* They should match with our local control array */
+ $this->compareStreamIds($resp, $ids);
+
+ /* Remove a message from our control *and* XACK it in Redis */
+ $id = array_shift($ids[$stream]);
+ $this->redis->xAck($stream, $group, Array($id));
+ }
+ }
+ }
+
+ /* Test COUNT option */
+ for ($c = 1; $c <= 3; $c++) {
+ $this->addStreamsAndGroups($streams, 3, $groups);
+ $resp = $this->redis->xReadGroup('g1', 'consumer', $qstreams, $c);
+
+ foreach ($resp as $stream => $smsg) {
+ $this->assertEquals(count($smsg), $c);
+ }
+ }
+
+ /* Finally test BLOCK with a sloppy timing test */
+ $t1 = $this->mstime();
+ $qnew = Array('{s}-1' => '>', '{s}-2' => '>');
+ $this->redis->xReadGroup('g1', 'c1', $qnew, -1, 100);
+ $t2 = $this->mstime();
+ $this->assertTrue($t2 - $t1 >= 100);
+ }
+
+ public function testXPending() {
+ if (!$this->minVersionCheck("5.0")) {
+ return $this->markTestSkipped();
+ }
+
+ $rows = 5;
+ $this->addStreamsAndGroups(Array('s'), $rows, Array('group' => 0));
+
+ $msg = $this->redis->xReadGroup('group', 'consumer', Array('s' => 0));
+ $ids = array_keys($msg['s']);
+
+ for ($n = count($ids); $n >= 0; $n--) {
+ $xp = $this->redis->xPending('s', 'group');
+ $this->assertEquals($xp[0], count($ids));
+
+ /* Verify we're seeing the IDs themselves */
+ for ($idx = 1; $idx <= 2; $idx++) {
+ if ($xp[$idx]) {
+ $this->assertPatternMatch($xp[$idx], "/^[0-9].*-[0-9].*/");
+ }
+ }
+
+ if ($ids) {
+ $id = array_shift($ids);
+ $this->redis->xAck('s', 'group', Array($id));
+ }
+ }
+ }
+
+ public function testXDel() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ for ($n = 5; $n > 0; $n--) {
+ $ids = $this->addStreamEntries('s', 5);
+ $todel = array_slice($ids, 0, $n);
+ $this->assertEquals($this->redis->xDel('s', $todel), count($todel));
+ }
+
+ /* Empty array should fail */
+ $this->assertFalse($this->redis->xDel('s', Array()));
+ }
+
+ public function testXTrim() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ for ($maxlen = 0; $maxlen <= 50; $maxlen += 10) {
+ $this->addStreamEntries('stream', 100);
+ $trimmed = $this->redis->xTrim('stream', $maxlen);
+ $this->assertEquals($trimmed, 100 - $maxlen);
+ }
+
+ /* APPROX trimming isn't easily deterministic, so just make sure we
+ can call it with the flag */
+ $this->addStreamEntries('stream', 100);
+ $this->assertFalse($this->redis->xTrim('stream', 1, true) === false);
+ }
+
+ /* XCLAIM is one of the most complicated commands, with a great deal of different options
+ * The following test attempts to verify every combination of every possible option. */
+ public function testXClaim() {
+ if (!$this->minVersionCheck("5.0"))
+ return $this->markTestSkipped();
+
+ foreach (Array(0, 100) as $min_idle_time) {
+ foreach (Array(false, true) as $justid) {
+ foreach (Array(0, 10) as $retrycount) {
+ /* We need to test not passing TIME/IDLE as well as passing either */
+ if ($min_idle_time == 0) {
+ $topts = Array(Array(), Array('IDLE', 1000000), Array('TIME', time() * 1000));
+ } else {
+ $topts = Array(NULL);
+ }
+
+ foreach ($topts as $tinfo) {
+ if ($tinfo) {
+ list($ttype, $tvalue) = $tinfo;
+ } else {
+ $ttype = NULL; $tvalue = NULL;
+ }
+
+ /* Add some messages and create a group */
+ $this->addStreamsAndGroups(Array('s'), 10, Array('group1' => 0));
+
+ /* Create a second stream we can FORCE ownership on */
+ $fids = $this->addStreamsAndGroups(Array('f'), 10, Array('group1' => 0));
+ $fids = $fids['f'];
+
+ /* Have consumer 'Mike' read the messages */
+ $oids = $this->redis->xReadGroup('group1', 'Mike', Array('s' => 0));
+ $oids = array_keys($oids['s']); /* We're only dealing with stream 's' */
+
+ /* Construct our options array */
+ $opts = Array();
+ if ($justid) $opts[] = 'JUSTID';
+ if ($retrycount) $opts['RETRYCOUNT'] = $retrycount;
+ if ($tvalue !== NULL) $opts[$ttype] = $tvalue;
+
+ /* Now have pavlo XCLAIM them */
+ $cids = $this->redis->xClaim('s', 'group1', 'Pavlo', $min_idle_time, $oids, $opts);
+ if (!$justid) $cids = array_keys($cids);
+
+ if ($min_idle_time == 0) {
+ $this->assertEquals($cids, $oids);
+
+ /* Append the FORCE option to our second stream where we have not already
+ * assigned to a PEL group */
+ $opts[] = 'FORCE';
+ $freturn = $this->redis->xClaim('f', 'group1', 'Test', 0, $fids, $opts);
+ if (!$justid) $freturn = array_keys($freturn);
+ $this->assertEquals($freturn, $fids);
+
+ if ($retrycount || $tvalue !== NULL) {
+ $pending = $this->redis->xPending('s', 'group1', 0, '+', 1, 'Pavlo');
+
+ if ($retrycount) {
+ $this->assertEquals($pending[0][3], $retrycount);
+ }
+ if ($tvalue !== NULL) {
+ if ($ttype == 'IDLE') {
+ /* If testing IDLE the value must be >= what we set */
+ $this->assertTrue($pending[0][2] >= $tvalue);
+ } else {
+ /* Timing tests are notoriously irritating but I don't see
+ * how we'll get >= 20,000 ms between XCLAIM and XPENDING no
+ * matter how slow the machine/VM running the tests is */
+ $this->assertTrue($pending[0][2] <= 20000);
+ }
+ }
+ }
+ } else {
+ /* We're verifying that we get no messages when we've set 100 seconds
+ * as our idle time, which should match nothing */
+ $this->assertEquals($cids, Array());
+ }
+ }
+ }
+ }
+ }
+ }
+
public function testSession_savedToRedis()
{
$this->setSessionHandler();
diff --git a/tests/TestSuite.php b/tests/TestSuite.php
index 461ac7dc..c2d68259 100644
--- a/tests/TestSuite.php
+++ b/tests/TestSuite.php
@@ -65,7 +65,14 @@ class TestSuite {
}
protected function assertFalse($bool) {
- return $this->assertTrue(!$bool);
+ if(!$bool)
+ return true;
+
+ $bt = debug_backtrace(false);
+ self::$errors []= sprintf("Assertion failed: %s:%d (%s)\n",
+ $bt[0]["file"], $bt[0]["line"], $bt[1]["function"]);
+
+ return false;
}
protected function assertTrue($bool) {
@@ -99,6 +106,15 @@ class TestSuite {
$bt[0]["file"], $bt[0]["line"], $bt[1]["function"]);
}
+ protected function assertPatternMatch($str_test, $str_regex) {
+ if (preg_match($str_regex, $str_test))
+ return;
+
+ $bt = debug_backtrace(false);
+ self::$errors []= sprintf("Assertion failed ('%s' doesnt match '%s'): %s:%d (%s)\n",
+ $str_test, $str_regex, $bt[0]["file"], $bt[0]["line"], $bt[1]["function"]);
+ }
+
protected function markTestSkipped($msg='') {
$bt = debug_backtrace(false);
self::$warnings []= sprintf("Skipped test: %s:%d (%s) %s\n",