diff options
author | Michael Grunder <michael.grunder@gmail.com> | 2018-09-29 21:59:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-29 21:59:01 +0300 |
commit | 2c9e0572361d5131f24fbc81a8f7baaafb671994 (patch) | |
tree | 6982b1e1f17b7cf2fc7e024652fad8212edadacd | |
parent | bfd274712eeb372926d1106b3da3c4fc19c0a48a (diff) |
Streams (#1413)
Streams API
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | README.markdown | 322 | ||||
-rw-r--r-- | cluster_library.c | 165 | ||||
-rw-r--r-- | cluster_library.h | 67 | ||||
-rw-r--r-- | common.h | 84 | ||||
-rw-r--r-- | library.c | 402 | ||||
-rw-r--r-- | library.h | 31 | ||||
-rw-r--r-- | php_redis.h | 15 | ||||
-rw-r--r-- | redis.c | 73 | ||||
-rw-r--r-- | redis_array.c | 4 | ||||
-rw-r--r-- | redis_cluster.c | 76 | ||||
-rw-r--r-- | redis_cluster.h | 15 | ||||
-rw-r--r-- | redis_commands.c | 643 | ||||
-rw-r--r-- | redis_commands.h | 35 | ||||
-rw-r--r-- | redis_session.c | 14 | ||||
-rw-r--r-- | tests/RedisTest.php | 444 | ||||
-rw-r--r-- | tests/TestSuite.php | 18 |
17 files changed, 2296 insertions, 115 deletions
@@ -14,4 +14,5 @@ missing autom4te.cache mkinstalldirs run-tests.php -idea/*
\ No newline at end of file +idea/* +.cquery diff --git a/README.markdown b/README.markdown index 56bdd754..1bce4964 100644 --- a/README.markdown +++ b/README.markdown @@ -3268,6 +3268,328 @@ array(1) { } ~~~ +## Streams + +* [xAck](#xack) - Acknowledge one or more pending messages +* [xAdd](#xadd) - Add a message to a stream +* [xClaim](#xclaim) - Acquire ownership of a pending message +* [xDel](#xdel) - Remove a message from a stream +* [xGroup](#xgroup) - Manage consumer groups +* [xInfo](#xinfo) - Get information about a stream +* [xLen](#xlen) - Get the length of a stream +* [xPending](#xpending) - Inspect pending messages in a stream +* [xRange](#xrange) - Query a range of messages from a stream +* [xRead](#xread) - Read message(s) from a stream +* [xReadGroup](#xreadgroup) - Read stream messages with a group and consumer +* [xRevRange](#xrevrange) - Query one or more messages from end to start +* [xTrim](#xtrim) - Trim a stream's size + +### xAck +----- + +##### *Prototype* +~~~php +$obj_redis->xAck($stream, $group, $arr_messages); +~~~ + +_**Description**_: Acknowledge one or more messages on behalf of a consumer group. + +##### *Return value* +*long*: The number of messages Redis reports as acknowledged. + +##### *Example* +~~~php +$obj_redis->xAck('stream', 'group1', ['1530063064286-0', '1530063064286-1']); +~~~ + +### xAdd +----- + +##### *Prototype* +~~~php +$obj_redis->xAdd($str_key, $str_id, $arr_message); +~~~ + +_**Description**_: Add a message to a stream + +##### *Return value* +*String*: The added message ID + +##### *Example* +~~~php +$obj_redis->xAdd('mystream', "\*", ['field' => 'value']); +~~~ + +### xClaim +----- + +##### *Prototype* +~~~php +$obj_redis->($str_key, $str_group, $str_consumer, $min_idle_time, [$arr_options]); +~~~ + +_**Description**_: Claim ownership of one or more pending messages. + +#### *Options Array* +~~~php +$options = [ + /* Note: 'TIME', and 'IDLE' are mutually exclusive */ + 'IDLE' => $value, /* Set the idle time to $value ms */, + 'TIME' => $value, /* Set the idle time to now - $value */ + 'RETRYCOUNT' => $value, /* Update message retrycount to $value */ + 'FORCE', /* Claim the message(s) even if they're not pending anywhere */ + 'JUSTID', /* Instruct Redis to only return IDs */ +]; +~~~ + +##### *Return value* +*Array*: Either an array of message IDs along with corresponding data, or just an array of IDs (if the 'JUSTID' option was passed). + +##### *Example* +~~~php +$ids = ['1530113681011-0', '1530113681011-1', '1530113681011-2']; + +/* Without any options */ +$obj_redis->xClaim( + 'mystream', 'group1', 'myconsumer1', $ids +); + +/* With options */ +$obj_redis->xClaim( + 'mystream', 'group1', 'myconsumer2', $ids, + [ + 'IDLE' => time() * 1000, + 'RETRYCOUNT' => 5, + 'FORCE', + 'JUSTID' + ] +); +~~~ + +### xDel +----- + +##### *Prototype* +~~~php +$obj_redis->xDel($str_key, $arr_ids); +~~~ + +_**Description**_: Delete one or more messages from a stream. + +##### *Return value* +*long*: The number of messages removed + +##### *Example* +~~~php +$obj_redis->xDel('mystream', ['1530115304877-0', '1530115305731-0']); +~~~ + +### xGroup +----- + +##### *Prototype* +~~~php +$obj_redis->xGroup('HELP'); +$obj_redis->xGroup('SETID', $str_key, $str_group, $str_msg_id); +$obj_redis->xGroup('DELGROUP', $str_key, $str_group); +$obj_redis->xGroup('CREATE', $str_key, $str_group, $str_msg_id); +$obj_redis->xGroup('DELCONSUMER', $str_key, $str_group, $str_consumer_name); +~~~ + +_**Description**_: This command is used in order to create, destroy, or manage consumer groups. + +##### *Return value* +*Mixed*: This command returns different types depending on the specific XGROUP command executed. + +##### *Example* +~~~php +$obj_redis->xGroup('CREATE', 'mystream', 'mygroup'); +$obj_redis->xGroup('DELGROUP', 'mystream', 'mygroup'); +~~~ + +### xInfo +----- + +##### *Prototype* +~~~php +$obj_redis->xInfo('CONSUMERS', $str_stream, $str_group); +$obj_redis->xInfo('GROUPS', $str_stream); +$obj_redis->xInfo('STREAM', $str_stream); +$obj_redis->xInfo('HELP'); +~~~ + +_**Description**_: Get information about a stream or consumer groups. + +##### *Return value* +*Mixed*: This command returns different types depending on which subcommand is used. + +##### *Example* +~~~php +$obj_redis->xInfo('STREAM', 'mystream'); +~~~ + +### xLen +----- + +##### *Prototype* +~~~php +$obj_redis->xLen($str_stream); +~~~ + +_**Description**_: Get the length of a given stream + +##### *Return value* +*Long*: The number of messages in the stream. + +##### *Example* +~~~php +$obj_redis->xLen('mystream'); +~~~ + +### xPending +----- + +##### *Prototype* +~~~php +$obj_redis->xPending($str_stream, $str_group [, $i_start, $i_end, $i_count, $str_consumer]); +~~~ + +_**Description**_: Get information about pending messages in a given stream. + +##### *Return value* +*Array*: Information about the pending messages, in various forms depending on the specific invocation of XPENDING. + +##### *Examples* +~~~php +$obj_redis->xPending('mystream', 'mygroup'); +$obj_redis->xPending('mystream', 'mygroup', 0, '+', 1, 'consumer-1'); +~~~ + +### xRange +----- + +##### *Prototype* +~~~php +$obj_redis->xRange($str_stream, $i_start, $i_end [, $i_count]); +~~~ + +_**Description**_: Get a range of messages from a given stream. + +##### *Return value* +*Array*: The messages in the stream within the requested range. + +##### *Example* +~~~php +/* Get everything in this stream */ +$obj_redis->xRange('mystream', '-', '+'); + +/* Only the first two messages */ +$obj_redis->xRange('mystream', '-', '+', 2); +~~~ + +### xRead +----- + +##### *Prototype* +~~~php +$obj_redis->xRead($arr_streams [, $i_count, $i_block); +~~~ + +_**Description**_: Read data from one or more streams and only return IDs greater than sent in the command. + +##### *Return value* +*Array*: The messages in the stream newer than the IDs passed to Redis (if any). + +##### *Example* +~~~php +$obj_redis->xRead(['stream1' => '1535222584555-0', 'stream2' => '1535222584555-0']); + +/* --- Possible output --- +Array +( + [stream1] => Array + ( + [1535222584555-1] => Array + ( + [key:1] => val:1 + ) + + ) + + [stream2] => Array + ( + [1535222584555-1] => Array + ( + [key:1] => val:1 + ) + + ) + +) +*/ +~~~ + +### xReadGroup +----- + +##### *Prototype* +~~~php +$obj_redis->xReadGroup($str_group, $str_consumer, $arr_streams [, $i_count, $i_block]); +~~~ + +_**Description**_: This method is similar to xRead except that it supports reading messages for a specific consumer group. + +##### *Return value* +*Array*: The messages delivered to this consumer group (if any). + +##### *Examples* +~~~php +/* Consume messages for 'mygroup', 'consumer1' */ +$obj_redis->xReadGroup('mygroup', 'consumer1', ['s1' => 0, 's2' => 0]); + +/* Read a single message as 'consumer2' for up to a second until a message arrives. */ +$obj_redis->xReadGroup('mygroup', 'consumer2', ['s1' => 0, 's2' => 0], 1, 1000); +~~~ + +### xRevRange +----- + +##### *Prototype* +~~~php +$obj_redis->xRevRange($str_stream, $i_end, $i_start [, $i_count]); +~~~ + +_**Description**_: This is identical to xRange except the results come back in reverse order. Also note that Redis reverses the order of "start" and "end". + +##### *Return value* +*Array*: The messages in the range specified. + +##### *Example* +~~~php +$obj_redis->xRevRange('mystream', '+', '-'); +~~~ + +### xTrim +----- + +##### *Prototype* +~~~php +$obj_redis->xTrim($str_stream, $i_max_len [, $boo_approximate]); +~~~ + +_**Description**_: Trim the stream length to a given maximum. If the "approximate" flag is pasesed, Redis will use your size as a hint but only trim trees in whole nodes (this is more efficient). + +##### *Return value* +*long*: The number of messages trimed from the stream. + +##### *Example* +~~~php +/* Trim to exactly 100 messages */ +$obj_redis->xTrim('mystream', 100); + +/* Let Redis approximate the trimming */ +$obj_redis->xTrim('mystream', 100, true); +~~~ ## Pub/sub diff --git a/cluster_library.c b/cluster_library.c index 8f157873..5f710c5e 100644 --- a/cluster_library.c +++ b/cluster_library.c @@ -103,7 +103,7 @@ void cluster_free_reply(clusterReply *reply, int free_data) { for (i = 0; i < reply->elements && reply->element[i]; i++) { cluster_free_reply(reply->element[i], free_data); } - efree(reply->element); + if (reply->element) efree(reply->element); break; default: break; @@ -113,7 +113,8 @@ void cluster_free_reply(clusterReply *reply, int free_data) { static void cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements, - clusterReply **element, int *err TSRMLS_DC) + clusterReply **element, int status_strings, + int *err TSRMLS_DC) { int i; size_t sz; @@ -141,6 +142,7 @@ cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements, return; } r->len = (long long)sz; + if (status_strings) r->str = estrndup(buf, r->len); break; case TYPE_INT: r->integer = len; @@ -155,11 +157,15 @@ cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements, } break; case TYPE_MULTIBULK: - r->element = ecalloc(r->len,sizeof(clusterReply*)); - r->elements = r->len; - cluster_multibulk_resp_recursive(sock, r->elements, r->element, - err TSRMLS_CC); - if (*err) return; + if (r->len >= 0) { + r->elements = r->len; + if (r->len > 0) { + r->element = ecalloc(r->len,sizeof(clusterReply*)); + cluster_multibulk_resp_recursive(sock, r->elements, r->element, + status_strings, err TSRMLS_CC); + } + if (*err) return; + } break; default: *err = 1; @@ -191,15 +197,17 @@ static RedisSock *cluster_slot_sock(redisCluster *c, unsigned short slot, } /* Read the response from a cluster */ -clusterReply *cluster_read_resp(redisCluster *c TSRMLS_DC) { - return cluster_read_sock_resp(c->cmd_sock,c->reply_type,c->reply_len TSRMLS_CC); +clusterReply *cluster_read_resp(redisCluster *c, int status_strings TSRMLS_DC) { + return cluster_read_sock_resp(c->cmd_sock, c->reply_type, + status_strings ? c->line_reply : NULL, + c->reply_len TSRMLS_CC); } /* Read any sort of response from the socket, having already issued the * command and consumed the reply type and meta info (length) */ clusterReply* cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type, - size_t len TSRMLS_DC) + char *line_reply, size_t len TSRMLS_DC) { clusterReply *r; @@ -214,6 +222,10 @@ cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type, r->integer = len; break; case TYPE_LINE: + if (line_reply) { + r->str = estrndup(line_reply, len); + r->len = len; + } case TYPE_ERR: return r; case TYPE_BULK: @@ -229,7 +241,7 @@ cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type, if (len != (size_t)-1) { r->element = ecalloc(len, sizeof(clusterReply*)*len); cluster_multibulk_resp_recursive(redis_sock, len, r->element, - &err TSRMLS_CC); + line_reply != NULL, &err TSRMLS_CC); } break; default: @@ -618,7 +630,7 @@ clusterReply* cluster_get_slots(RedisSock *redis_sock TSRMLS_DC) } // Consume the rest of our response - if ((r = cluster_read_sock_resp(redis_sock, type, len TSRMLS_CC)) == NULL || + if ((r = cluster_read_sock_resp(redis_sock, type, NULL, len TSRMLS_CC)) == NULL || r->type != TYPE_MULTIBULK || r->elements < 1) { if (r) cluster_free_reply(r, 1); @@ -1486,6 +1498,24 @@ PHP_REDIS_API void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, efree(resp); } +PHP_REDIS_API void +cluster_single_line_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) +{ + char *p; + + /* Cluster already has the reply so abort if this isn't a LINE response *or* if for + * some freaky reason we don't detect a null terminator */ + if (c->reply_type != TYPE_LINE || !(p = memchr(c->line_reply,'\0',sizeof(c->line_reply)))) { + CLUSTER_RETURN_FALSE(c); + } + + if (CLUSTER_IS_ATOMIC(c)) { + CLUSTER_RETURN_STRING(c, c->line_reply, p - c->line_reply); + } else { + add_next_index_stringl(&c->multi_resp, c->line_reply, p - c->line_reply); + } +} + /* BULK response handler */ PHP_REDIS_API void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) @@ -1799,12 +1829,15 @@ static void cluster_mbulk_variant_resp(clusterReply *r, zval *z_ret) add_next_index_long(z_ret, r->integer); break; case TYPE_LINE: - add_next_index_bool(z_ret, 1); + if (r->str) { + add_next_index_stringl(z_ret, r->str, r->len); + } else { + add_next_index_bool(z_ret, 1); + } break; case TYPE_BULK: if (r->len > -1) { add_next_index_stringl(z_ret, r->str, r->len); - efree(r->str); } else { add_next_index_null(z_ret); } @@ -1827,15 +1860,16 @@ static void cluster_mbulk_variant_resp(clusterReply *r, zval *z_ret) /* Variant response handling, for things like EVAL and various other responses * where we just map the replies from Redis type values to PHP ones directly. */ -PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, - void *ctx) +static void +cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, + int status_strings, void *ctx) { clusterReply *r; zval zv, *z_arr = &zv; int i; // Make sure we can read it - if ((r = cluster_read_resp(c TSRMLS_CC)) == NULL) { + if ((r = cluster_read_resp(c, status_strings TSRMLS_CC)) == NULL) { CLUSTER_RETURN_FALSE(c); } @@ -1849,7 +1883,11 @@ PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisClust RETVAL_FALSE; break; case TYPE_LINE: - RETVAL_TRUE; + if (status_strings) { + RETVAL_STRINGL(r->str, r->len); + } else { + RETVAL_TRUE; + } break; case TYPE_BULK: if (r->len < 0) { @@ -1879,14 +1917,17 @@ PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisClust add_next_index_bool(&c->multi_resp, 0); break; case TYPE_LINE: - add_next_index_bool(&c->multi_resp, 1); + if (status_strings) { + add_next_index_stringl(&c->multi_resp, r->str, r->len); + } else { + add_next_index_bool(&c->multi_resp, 1); + } break; case TYPE_BULK: if (r->len < 0) { add_next_index_null(&c->multi_resp); } else { add_next_index_stringl(&c->multi_resp, r->str, r->len); - efree(r->str); } break; case TYPE_MULTIBULK: @@ -1899,7 +1940,19 @@ PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisClust } // Free our response structs, but not allocated data itself - cluster_free_reply(r, 0); + cluster_free_reply(r, 1); +} + +PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, + void *ctx) +{ + cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 0, ctx); +} + +PHP_REDIS_API void cluster_variant_resp_strings(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, + void *ctx) +{ + cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 1, ctx); } /* Generic MULTI BULK response processor */ @@ -2052,6 +2105,76 @@ PHP_REDIS_API void cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS, redisC } } +/* XRANGE */ +PHP_REDIS_API void +cluster_xrange_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) { + zval zv, *z_messages = &zv; + + REDIS_MAKE_STD_ZVAL(z_messages); + array_init(z_messages); + + c->cmd_sock->serializer = c->flags->serializer; + c->cmd_sock->compression = c->flags->compression; + + if (redis_read_stream_messages(c->cmd_sock, c->reply_len, z_messages TSRMLS_CC) < 0) { + zval_dtor(z_messages); + REDIS_FREE_ZVAL(z_messages); + CLUSTER_RETURN_FALSE(c); + } + + if (CLUSTER_IS_ATOMIC(c)) { + RETVAL_ZVAL(z_messages, 0, 1); + } else { + add_next_index_zval(&c->multi_resp, z_messages); + } +} + +/* XREAD */ +PHP_REDIS_API void +cluster_xread_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) { + zval zv, *z_streams = &zv; + + REDIS_MAKE_STD_ZVAL(z_streams); + array_init(z_streams); + + c->cmd_sock->serializer = c->flags->serializer; + c->cmd_sock->compression = c->flags->compression; + + if (redis_read_stream_messages_multi(c->cmd_sock, c->reply_len, z_streams TSRMLS_CC) < 0) { + zval_dtor(z_streams); + REDIS_FREE_ZVAL(z_streams); + CLUSTER_RETURN_FALSE(c); + } + + if (CLUSTER_IS_ATOMIC(c)) { + RETVAL_ZVAL(z_streams, 0, 1); + } else { + add_next_index_zval(&c->multi_resp, z_streams); + } +} + +/* XCLAIM */ +PHP_REDIS_API void +cluster_xclaim_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) { + zval zv, *z_msg = &zv; + + REDIS_MAKE_STD_ZVAL(z_msg); + array_init(z_msg); + + if (redis_read_xclaim_response(c->cmd_sock, c->reply_len, z_msg TSRMLS_CC) < 0) { + zval_dtor(z_msg); + REDIS_FREE_ZVAL(z_msg); + CLUSTER_RETURN_FALSE(c); + } + + if (CLUSTER_IS_ATOMIC(c)) { + RETVAL_ZVAL(z_msg, 0, 1); + } else { + add_next_index_zval(&c->multi_resp, z_msg); + } + +} + /* MULTI BULK response loop where we might pull the next one */ PHP_REDIS_API zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, int pull, mbulk_cb cb, zval *z_ret) diff --git a/cluster_library.h b/cluster_library.h index 49285da7..01457832 100644 --- a/cluster_library.h +++ b/cluster_library.h @@ -230,7 +230,7 @@ typedef struct redisCluster { /* One RedisSock struct for serialization and prefix information */ RedisSock *flags; - /* The first line of our last reply, not including our reply type byte + /* The first line of our last reply, not including our reply type byte * or the trailing \r\n */ char line_reply[1024]; @@ -283,7 +283,7 @@ typedef struct clusterDistList { size_t len, size; } clusterDistList; -/* Context for things like MGET/MSET/MSETNX. When executing in MULTI mode, +/* Context for things like MGET/MSET/MSETNX. When executing in MULTI mode, * we'll want to re-integrate into one running array, except for the last * command execution, in which we'll want to return the value (or add it) */ typedef struct clusterMultiCtx { @@ -325,17 +325,17 @@ typedef struct clusterReply { } clusterReply; /* Direct variant response handler */ -clusterReply *cluster_read_resp(redisCluster *c TSRMLS_DC); -clusterReply *cluster_read_sock_resp(RedisSock *redis_sock, - REDIS_REPLY_TYPE type, size_t reply_len TSRMLS_DC); +clusterReply *cluster_read_resp(redisCluster *c, int status_strings TSRMLS_DC); +clusterReply *cluster_read_sock_resp(RedisSock *redis_sock, + REDIS_REPLY_TYPE type, char *line_reply, size_t reply_len TSRMLS_DC); void cluster_free_reply(clusterReply *reply, int free_data); /* Cluster distribution helpers for WATCH */ HashTable *cluster_dist_create(); void cluster_dist_free(HashTable *ht); -int cluster_dist_add_key(redisCluster *c, HashTable *ht, char *key, +int cluster_dist_add_key(redisCluster *c, HashTable *ht, char *key, strlen_t key_len, clusterKeyVal **kv); -void cluster_dist_add_val(redisCluster *c, clusterKeyVal *kv, zval *val +void cluster_dist_add_val(redisCluster *c, clusterKeyVal *kv, zval *val TSRMLS_DC); /* Aggregation for multi commands like MGET, MSET, and MSETNX */ @@ -351,7 +351,7 @@ unsigned short cluster_hash_key(const char *key, int len); /* Get the current time in miliseconds */ long long mstime(void); -PHP_REDIS_API short cluster_send_command(redisCluster *c, short slot, const char *cmd, +PHP_REDIS_API short cluster_send_command(redisCluster *c, short slot, const char *cmd, int cmd_len TSRMLS_DC); PHP_REDIS_API void cluster_disconnect(redisCluster *c, int force TSRMLS_DC); @@ -363,7 +363,7 @@ PHP_REDIS_API int cluster_reset_multi(redisCluster *c); PHP_REDIS_API short cluster_find_slot(redisCluster *c, const char *host, unsigned short port); -PHP_REDIS_API int cluster_send_slot(redisCluster *c, short slot, char *cmd, +PHP_REDIS_API int cluster_send_slot(redisCluster *c, short slot, char *cmd, int cmd_len, REDIS_REPLY_TYPE rtype TSRMLS_DC); PHP_REDIS_API redisCluster *cluster_create(double timeout, double read_timeout, @@ -379,28 +379,30 @@ PHP_REDIS_API char **cluster_sock_read_multibulk_reply(RedisSock *redis_sock, /* * Redis Cluster response handlers. Our response handlers generally take the * following form: - * PHP_REDIS_API void handler(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, + * PHP_REDIS_API void handler(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, * void *ctx) * * Reply handlers are responsible for setting the PHP return value (either to * something valid, or FALSE in the case of some failures). */ -PHP_REDIS_API void cluster_bool_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, +PHP_REDIS_API void cluster_bool_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); PHP_REDIS_API void cluster_ping_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); -PHP_REDIS_API void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, +PHP_REDIS_API void cluster_single_line_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); -PHP_REDIS_API void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, +PHP_REDIS_API void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); -PHP_REDIS_API void cluster_dbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, +PHP_REDIS_API void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); -PHP_REDIS_API void cluster_1_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, +PHP_REDIS_API void cluster_dbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); -PHP_REDIS_API void cluster_long_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, +PHP_REDIS_API void cluster_1_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); -PHP_REDIS_API void cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, +PHP_REDIS_API void cluster_long_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, + void *ctx); +PHP_REDIS_API void cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); PHP_REDIS_API void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); @@ -411,28 +413,31 @@ PHP_REDIS_API void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); +PHP_REDIS_API void cluster_variant_resp_strings(INTERNAL_FUNCTION_PARAMETERS, + redisCluster *c, void *ctx); + /* MULTI BULK response functions */ -PHP_REDIS_API void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, +PHP_REDIS_API void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, mbulk_cb func, void *ctx); -PHP_REDIS_API void cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, +PHP_REDIS_API void cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); -PHP_REDIS_API void cluster_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, +PHP_REDIS_API void cluster_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); PHP_REDIS_API void cluster_mbulk_zipstr_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); PHP_REDIS_API void cluster_mbulk_zipdbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); -PHP_REDIS_API void cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS, +PHP_REDIS_API void cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); PHP_REDIS_API void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); -PHP_REDIS_API zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, +PHP_REDIS_API zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, int pull, mbulk_cb cb, zval *z_ret); /* Handlers for things like DEL/MGET/MSET/MSETNX */ -PHP_REDIS_API void cluster_del_resp(INTERNAL_FUNCTION_PARAMETERS, +PHP_REDIS_API void cluster_del_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); -PHP_REDIS_API void cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS, +PHP_REDIS_API void cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); PHP_REDIS_API void cluster_mset_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); @@ -448,13 +453,21 @@ PHP_REDIS_API void cluster_info_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); /* CLIENT LIST response handler */ -PHP_REDIS_API void cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS, +PHP_REDIS_API void cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS, + redisCluster *c, void *ctx); + +/* Custom STREAM handlers */ +PHP_REDIS_API void cluster_xread_resp(INTERNAL_FUNCTION_PARAMETERS, + redisCluster *c, void *ctx); +PHP_REDIS_API void cluster_xrange_resp(INTERNAL_FUNCTION_PARAMETERS, + redisCluster *c, void *ctx); +PHP_REDIS_API void cluster_xclaim_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx); /* MULTI BULK processing callbacks */ -int mbulk_resp_loop(RedisSock *redis_sock, zval *z_result, +int mbulk_resp_loop(RedisSock *redis_sock, zval *z_result, long long count, void *ctx TSRMLS_DC); -int mbulk_resp_loop_raw(RedisSock *redis_sock, zval *z_result, +int mbulk_resp_loop_raw(RedisSock *redis_sock, zval *z_result, long long count, void *ctx TSRMLS_DC); int mbulk_resp_loop_zipstr(RedisSock *redis_sock, zval *z_result, long long count, void *ctx TSRMLS_DC); @@ -22,6 +22,9 @@ typedef struct { char *val; } zend_string; +#define REDIS_MAKE_STD_ZVAL(zv) MAKE_STD_ZVAL(zv) +#define REDIS_FREE_ZVAL(zv) (efree(zv)) + #define ZSTR_VAL(s) (s)->val #define ZSTR_LEN(s) (s)->len @@ -432,6 +435,9 @@ typedef int strlen_t; typedef size_t strlen_t; #define PHPREDIS_ZVAL_IS_STRICT_FALSE(z) (Z_TYPE_P(z) == IS_FALSE) #define PHPREDIS_GET_OBJECT(class_entry, z) (class_entry *)((char *)Z_OBJ_P(z) - XtOffsetOf(class_entry, std)) + +#define REDIS_MAKE_STD_ZVAL(zv) do {} while(0) +#define REDIS_FREE_ZVAL(zv) do {} while(0) #endif /* NULL check so Eclipse doesn't go crazy */ @@ -1084,4 +1090,82 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_georadiusbymember, 0, 0, 4) ZEND_ARG_ARRAY_INFO(0, opts, 0) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_INFO_EX(arginfo_xadd, 0, 0, 3) + ZEND_ARG_INFO(0, str_key) + ZEND_ARG_INFO(0, str_id) + ZEND_ARG_ARRAY_INFO(0, arr_fields, 0) + ZEND_ARG_INFO(0, i_maxlen) + ZEND_ARG_INFO(0, boo_approximate) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_xpending, 0, 0, 2) + ZEND_ARG_INFO(0, str_key) + ZEND_ARG_INFO(0, str_group) + ZEND_ARG_INFO(0, str_start) + ZEND_ARG_INFO(0, str_end) + ZEND_ARG_INFO(0, i_count) + ZEND_ARG_INFO(0, str_consumer) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_xrange, 0, 0, 3) + ZEND_ARG_INFO(0, str_key) + ZEND_ARG_INFO(0, str_start) + ZEND_ARG_INFO(0, str_end) + ZEND_ARG_INFO(0, i_count) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_xread, 0, 0, 1) + ZEND_ARG_ARRAY_INFO(0, arr_streams, 0) + ZEND_ARG_INFO(0, i_count) + ZEND_ARG_INFO(0, i_block) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_xreadgroup, 0, 0, 3) + ZEND_ARG_INFO(0, str_group) + ZEND_ARG_INFO(0, str_consumer) + ZEND_ARG_ARRAY_INFO(0, arr_streams, 0) + ZEND_ARG_INFO(0, i_count) + ZEND_ARG_INFO(0, i_block) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_xack, 0, 0, 3) + ZEND_ARG_INFO(0, str_key) + ZEND_ARG_INFO(0, str_group) + ZEND_ARG_ARRAY_INFO(0, arr_ids, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_xclaim, 0, 0, 5) + ZEND_ARG_INFO(0, str_key) + ZEND_ARG_INFO(0, str_group) + ZEND_ARG_INFO(0, str_consumer) + ZEND_ARG_INFO(0, i_min_idle) + ZEND_ARG_ARRAY_INFO(0, arr_ids, 0) + ZEND_ARG_ARRAY_INFO(0, arr_opts, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_xgroup, 0, 0, 1) + ZEND_ARG_INFO(0, str_operation) + ZEND_ARG_INFO(0, str_key) + ZEND_ARG_INFO(0, str_arg1) + ZEND_ARG_INFO(0, str_arg2) + ZEND_ARG_INFO(0, str_arg3) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_xinfo, 0, 0, 1) + ZEND_ARG_INFO(0, str_cmd) + ZEND_ARG_INFO(0, str_key) + ZEND_ARG_INFO(0, str_group) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_xtrim, 0, 0, 2) + ZEND_ARG_INFO(0, str_key) + ZEND_ARG_INFO(0, i_maxlen) + ZEND_ARG_INFO(0, boo_approximate) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_xdel, 0, 0, 2) + ZEND_ARG_INFO(0, str_key) + ZEND_ARG_ARRAY_INFO(0, arr_ids, 0) +ZEND_END_ARG_INFO() + #endif @@ -114,6 +114,13 @@ static int resend_auth(RedisSock *redis_sock TSRMLS_DC) { return 0; } +/* Helper function and macro to test a RedisSock error prefix. */ +#define REDIS_SOCK_ERRCMP_STATIC(rs, s) redis_sock_errcmp(rs, s, sizeof(s)-1) +static int redis_sock_errcmp(RedisSock *redis_sock, const char *err, size_t errlen) { + return ZSTR_LEN(redis_sock->err) >= errlen && + memcmp(ZSTR_VAL(redis_sock->err), err, errlen) == 0; +} + /* Helper function that will throw an exception for a small number of ERR codes * returned by Redis. Typically we just return FALSE to the caller in the event * of an ERROR reply, but for the following error types: @@ -124,11 +131,19 @@ static int resend_auth(RedisSock *redis_sock TSRMLS_DC) { static void redis_error_throw(RedisSock *redis_sock TSRMLS_DC) { - if (redis_sock != NULL && redis_sock->err != NULL && - memcmp(ZSTR_VAL(redis_sock->err), "ERR", sizeof("ERR") - 1) != 0 && - memcmp(ZSTR_VAL(redis_sock->err), "NOSCRIPT", sizeof("NOSCRIPT") - 1) != 0 && - memcmp(ZSTR_VAL(redis_sock->err), "WRONGTYPE", sizeof("WRONGTYPE") - 1) != 0 - ) { + /* Short circuit if we have no redis_sock or any error */ + if (redis_sock == NULL || redis_sock->err == NULL) + return; + + /* We may want to flip this logic and check for MASTERDOWN, AUTH, + * and LOADING but that may have side effects (esp for things like + * Disque) */ + if (!REDIS_SOCK_ERRCMP_STATIC(redis_sock, "ERR") && + !REDIS_SOCK_ERRCMP_STATIC(redis_sock, "NOSCRIPT") && + !REDIS_SOCK_ERRCMP_STATIC(redis_sock, "WRONGTYPE") && + !REDIS_SOCK_ERRCMP_STATIC(redis_sock, "BUSYGROUP") && + !REDIS_SOCK_ERRCMP_STATIC(redis_sock, "NOGROUP")) + { zend_throw_exception(redis_exception_ce, ZSTR_VAL(redis_sock->err), 0 TSRMLS_CC); } } @@ -442,8 +457,7 @@ redis_sock_read_multibulk_reply_zval(INTERNAL_FUNCTION_PARAMETERS, array_init(z_tab); - redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, z_tab, - numElems, UNSERIALIZE_ALL); + redis_mbulk_reply_loop(redis_sock, z_tab, numElems, UNSERIALIZE_ALL TSRMLS_CC); return z_tab; } @@ -730,6 +744,24 @@ int redis_cmd_append_sstr_key(smart_string *str, char *key, strlen_t len, RedisS return retval; } +/* Append an array key to a redis smart string command. This function + * handles the boilerplate conditionals around string or integer keys */ +int redis_cmd_append_sstr_arrkey(smart_string *cmd, zend_string *kstr, ulong idx) +{ + char *arg, kbuf[128]; + int len; + + if (kstr) { + len = ZSTR_LEN(kstr); + arg = ZSTR_VAL(kstr); + } else { + len = snprintf(kbuf, sizeof(kbuf), "%ld", (long)idx); + arg = (char*)kbuf; + } + + return redis_cmd_append_sstr(cmd, arg, len); +} + PHP_REDIS_API void redis_bulk_double_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx) { char *response; @@ -1137,6 +1169,30 @@ static void array_zip_values_and_scores(RedisSock *redis_sock, zval *z_tab, } static int +read_mbulk_header(RedisSock *redis_sock, int *nelem TSRMLS_DC) +{ + char line[4096]; + size_t len; + + /* Throws exception on failure */ + if (redis_sock_gets(redis_sock, line, sizeof(line)-1, &len TSRMLS_CC) < 0) + return -1; + + if (line[0] != '*') { + if (IS_ATOMIC(redis_sock)) { + if (line[0] == '-') { + redis_sock_set_err(redis_sock, line+1, len-1); + } + } + return -1; + } + + *nelem = atoi(line+1); + + return 0; +} + +static int redis_mbulk_reply_zipped(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, int unserialize, int decode) { @@ -1164,8 +1220,7 @@ redis_mbulk_reply_zipped(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, array_init(z_multi_result); /* pre-allocate array for multi's results. */ /* Grab our key, value, key, value array */ - redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, - z_multi_result, numElems, unserialize); + redis_mbulk_reply_loop(redis_sock, z_multi_result, numElems, unserialize TSRMLS_CC); /* Zip keys and values */ array_zip_values_and_scores(redis_sock, z_multi_result, decode TSRMLS_CC); @@ -1179,6 +1234,243 @@ redis_mbulk_reply_zipped(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, return 0; } +/* Consume message ID */ +PHP_REDIS_API int +redis_sock_read_single_line(RedisSock *redis_sock, char *buffer, size_t buflen, + size_t *linelen, int set_err TSRMLS_DC) +{ + REDIS_REPLY_TYPE type; + long info; + + if (redis_read_reply_type(redis_sock, &type, &info TSRMLS_CC) < 0 || + (type != TYPE_LINE && type != TYPE_ERR)) + { + return -1; + } + + if (redis_sock_gets(redis_sock, buffer, buflen, linelen TSRMLS_CC) < 0) { + return -1; + } + + if (set_err && type == TYPE_ERR) { + if (IS_ATOMIC(redis_sock)) { + redis_sock_set_err(redis_sock, buffer, *linelen); + } + } + + return type == TYPE_LINE ? 0 : -1; +} + +/* Helper function to consume Redis stream message data. This is useful for + * multiple stream callers (e.g. XREAD[GROUP], and X[REV]RANGE handlers). */ +PHP_REDIS_API int +redis_read_stream_messages(RedisSock *redis_sock, int count, zval *z_ret + TSRMLS_DC) +{ + zval zv, *z_message = &zv; + int i, mhdr, fields; + char id[1024]; + size_t idlen; + + /* Iterate over each message */ + for (i = 0; i < count; i++) { + /* Consume inner multi-bulk header, message ID itself and finaly + * the multi-bulk header for field and values */ + if ((read_mbulk_header(redis_sock, &mhdr TSRMLS_CC) < 0 || mhdr != 2) || + redis_sock_read_single_line(redis_sock, id, sizeof(id), &idlen, 0 TSRMLS_CC) < 0 || + (read_mbulk_header(redis_sock, &fields TSRMLS_CC) < 0 || fields % 2 != 0)) + { + return -1; + } + + REDIS_MAKE_STD_ZVAL(z_message); + array_init(z_message); + + redis_mbulk_reply_loop(redis_sock, z_message, fields, UNSERIALIZE_VALS TSRMLS_CC); + array_zip_values_and_scores(redis_sock, z_message, SCORE_DECODE_NONE TSRMLS_CC); + add_assoc_zval_ex(z_ret, id, idlen, z_message); + } + + return 0; +} + +PHP_REDIS_API int +redis_xrange_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + zval *z_tab, void *ctx) +{ + int messages; + zval zv, *z_messages = &zv; + + REDIS_MAKE_STD_ZVAL(z_messages); + array_init(z_messages); + + if (read_mbulk_header(redis_sock, &messages TSRMLS_CC) < 0 || + redis_read_stream_messages(redis_sock, messages, z_messages TSRMLS_CC) < 0) + { + zval_dtor(z_messages); + REDIS_FREE_ZVAL(z_messages); + if (IS_ATOMIC(redis_sock)) { + RETVAL_FALSE; + } else { + add_next_index_bool(z_tab, 0); + } + return -1; + } + + if (IS_ATOMIC(redis_sock)) { + RETVAL_ZVAL(z_messages, 0, 1); + } else { + add_next_index_zval(z_tab, z_messages); + } + + return 0; +} + +PHP_REDIS_API int +redis_read_stream_messages_multi(RedisSock *redis_sock, int count, zval *z_streams + TSRMLS_DC) +{ + zval zv, *z_messages = &zv; + int i, shdr, messages; + char *id; + int idlen; + + for (i = 0; i < count; i++) { + if ((read_mbulk_header(redis_sock, &shdr TSRMLS_CC) < 0 || shdr != 2) || + (id = redis_sock_read(redis_sock, &idlen TSRMLS_CC)) == NULL || + read_mbulk_header(redis_sock, &messages TSRMLS_CC) < 0) + { + if (id) efree(id); + return -1; + } + + REDIS_MAKE_STD_ZVAL(z_messages); + array_init(z_messages); + + if (redis_read_stream_messages(redis_sock, messages, z_messages TSRMLS_CC) < 0) + goto failure; + + add_assoc_zval_ex(z_streams, id, idlen, z_messages); + efree(id); + } + + return 0; +failure: + efree(id); + zval_dtor(z_messages); + REDIS_FREE_ZVAL(z_messages); + return -1; +} + +PHP_REDIS_API int +redis_xread_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + zval *z_tab, void *ctx) +{ + zval zv, *z_rv = &zv; + int streams; + + if (read_mbulk_header(redis_sock, &streams TSRMLS_CC) < 0) + goto failure; + + REDIS_MAKE_STD_ZVAL(z_rv); + array_init(z_rv); + + if (redis_read_stream_messages_multi(redis_sock, streams, z_rv TSRMLS_CC) < 0) + goto cleanup; + + if (IS_ATOMIC(redis_sock)) { + RETVAL_ZVAL(z_rv, 0, 1); + } else { + add_next_index_zval(z_tab, z_rv); + } + return 0; + +cleanup: + zval_dtor(z_rv); + REDIS_FREE_ZVAL(z_rv); +failure: + if (IS_ATOMIC(redis_sock)) { + RETVAL_FALSE; + } else { + add_next_index_bool(z_tab, 0); + } + return -1; +} + +/* This helper function does that actual XCLAIM response handling, which can be used by both + * Redis and RedisCluster. Note that XCLAIM is somewhat unique in that its reply type depends + * on whether or not it was called with the JUSTID option */ +PHP_REDIS_API int +redis_read_xclaim_response(RedisSock *redis_sock, int count, zval *rv TSRMLS_DC) { + zval zv, *z_msg = &zv; + REDIS_REPLY_TYPE type; + char id[1024]; + int i, fields; + long li; + size_t idlen; + + for (i = 0; i < count; i++) { + /* Consume inner reply type */ + if (redis_read_reply_type(redis_sock, &type, &li TSRMLS_CC) < 0 || + (type != TYPE_LINE && type != TYPE_MULTIBULK)) return -1; + + if (type == TYPE_LINE) { + /* JUSTID variant */ + if (redis_sock_gets(redis_sock, id, sizeof(id), &idlen TSRMLS_CC) < 0) + return -1; + add_next_index_stringl(rv, id, idlen); + } else { + if (li != 2 || redis_sock_read_single_line(redis_sock, id, sizeof(id), &idlen, 0 TSRMLS_CC) < 0 || + (read_mbulk_header(redis_sock, &fields TSRMLS_CC) < 0 || fields % 2 != 0)) return -1; + + REDIS_MAKE_STD_ZVAL(z_msg); + array_init(z_msg); + + redis_mbulk_reply_loop(redis_sock, z_msg, fields, UNSERIALIZE_VALS TSRMLS_CC); + array_zip_values_and_scores(redis_sock, z_msg, SCORE_DECODE_NONE TSRMLS_CC); + add_assoc_zval_ex(rv, id, idlen, z_msg); + } + } + + return 0; +} + +PHP_REDIS_API int +redis_xclaim_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + zval *z_tab, void *ctx) +{ + zval zv, *z_ret = &zv; + int messages; + + /* All XCLAIM responses start multibulk */ + if (read_mbulk_header(redis_sock, &messages TSRMLS_CC) < 0) + goto failure; + + REDIS_MAKE_STD_ZVAL(z_ret); + array_init(z_ret); + + if (redis_read_xclaim_response(redis_sock, messages, z_ret TSRMLS_CC) < 0) { + zval_dtor(z_ret); + REDIS_FREE_ZVAL(z_ret); + goto failure; + } + + if (IS_ATOMIC(redis_sock)) { + RETVAL_ZVAL(z_ret, 0, 1); + } else { + add_next_index_zval(z_tab, z_ret); + } + return 0; + +failure: + if (IS_ATOMIC(redis_sock)) { + RETVAL_FALSE; + } else { + add_next_index_bool(z_tab, 0); + } + return -1; +} + /* Zipped key => value reply but we don't touch anything (e.g. CONFIG GET) */ PHP_REDIS_API int redis_mbulk_reply_zipped_raw(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx) { @@ -1262,6 +1554,30 @@ PHP_REDIS_API void redis_string_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock efree(response); } +PHP_REDIS_API +void redis_single_line_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + zval *z_tab, void *ctx) +{ + char buffer[4096]; + size_t len; + + if (redis_sock_read_single_line(redis_sock, buffer, sizeof(buffer), &len, 1 TSRMLS_CC) < 0) { + if (IS_ATOMIC(redis_sock)) { + RETURN_FALSE; + } else { + add_next_index_bool(z_tab, 0); + } + return; + } + + //str = estrndup(buffer, len); + if (IS_ATOMIC(redis_sock)) { + RETVAL_STRINGL(buffer, len); + } else { + add_next_index_stringl(z_tab, buffer, len); + } +} + /* like string response, but never unserialized. */ PHP_REDIS_API void redis_ping_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, @@ -1597,8 +1913,7 @@ PHP_REDIS_API int redis_sock_read_multibulk_reply(INTERNAL_FUNCTION_PARAMETERS, #endif array_init(z_multi_result); /* pre-allocate array for multi's results. */ - redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, - z_multi_result, numElems, UNSERIALIZE_ALL); + redis_mbulk_reply_loop(redis_sock, z_multi_result, numElems, UNSERIALIZE_ALL TSRMLS_CC); if (IS_ATOMIC(redis_sock)) { RETVAL_ZVAL(z_multi_result, 0, 1); @@ -1640,8 +1955,7 @@ redis_mbulk_reply_raw(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval #endif array_init(z_multi_result); /* pre-allocate array for multi's results. */ - redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, - z_multi_result, numElems, UNSERIALIZE_NONE); + redis_mbulk_reply_loop(redis_sock, z_multi_result, numElems, UNSERIALIZE_NONE TSRMLS_CC); if (IS_ATOMIC(redis_sock)) { RETVAL_ZVAL(z_multi_result, 0, 1); @@ -1653,8 +1967,8 @@ redis_mbulk_reply_raw(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval } PHP_REDIS_API void -redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, - zval *z_tab, int count, int unserialize) +redis_mbulk_reply_loop(RedisSock *redis_sock, zval *z_tab, int count, + int unserialize TSRMLS_DC) { char *line; int i, len; @@ -2090,32 +2404,27 @@ redis_read_reply_type(RedisSock *redis_sock, REDIS_REPLY_TYPE *reply_type, /* * Read a single line response, having already consumed the reply-type byte */ -PHP_REDIS_API int +static int redis_read_variant_line(RedisSock *redis_sock, REDIS_REPLY_TYPE reply_type, - zval *z_ret TSRMLS_DC) + int as_string, zval *z_ret TSRMLS_DC) { // Buffer to read our single line reply char inbuf[4096]; - size_t line_size; + size_t len; /* Attempt to read our single line reply */ - if(redis_sock_gets(redis_sock, inbuf, sizeof(inbuf), &line_size TSRMLS_CC) < 0) { + if(redis_sock_gets(redis_sock, inbuf, sizeof(inbuf), &len TSRMLS_CC) < 0) { return -1; } - // If this is an error response, check if it is a SYNC error, and throw in - // that case + /* Throw exception on SYNC error otherwise just set error string */ if(reply_type == TYPE_ERR) { - /* Set our last error */ - redis_sock_set_err(redis_sock, inbuf, line_size); - - /* Handle throwable errors */ + redis_sock_set_err(redis_sock, inbuf, len); redis_error_throw(redis_sock TSRMLS_CC); - - /* Set our response to FALSE */ ZVAL_FALSE(z_ret); + } else if (as_string) { + ZVAL_STRINGL(z_ret, inbuf, len); } else { - /* Set our response to TRUE */ ZVAL_TRUE(z_ret); } @@ -2140,8 +2449,8 @@ redis_read_variant_bulk(RedisSock *redis_sock, int size, zval *z_ret } PHP_REDIS_API int -redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, zval *z_ret - TSRMLS_DC) +redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, int status_strings, + zval *z_ret TSRMLS_DC) { long reply_info; REDIS_REPLY_TYPE reply_type; @@ -2166,8 +2475,8 @@ redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, zval *z_ret #if (PHP_MAJOR_VERSION < 7) ALLOC_INIT_ZVAL(z_subelem); #endif - redis_read_variant_line(redis_sock, reply_type, z_subelem - TSRMLS_CC); + redis_read_variant_line(redis_sock, reply_type, status_strings, + z_subelem TSRMLS_CC); add_next_index_zval(z_ret, z_subelem); break; case TYPE_INT: @@ -2192,7 +2501,7 @@ redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, zval *z_ret array_init(z_subelem); add_next_index_zval(z_ret, z_subelem); redis_read_multibulk_recursive(redis_sock, reply_info, - z_subelem TSRMLS_CC); + status_strings, z_subelem TSRMLS_CC); break; default: // Stop the compiler from whinging @@ -2206,9 +2515,9 @@ redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, zval *z_ret return 0; } -PHP_REDIS_API int -redis_read_variant_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, - zval *z_tab, void *ctx) +static int +variant_reply_generic(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + int status_strings, zval *z_tab, void *ctx) { // Reply type, and reply size vars REDIS_REPLY_TYPE reply_type; @@ -2229,7 +2538,7 @@ redis_read_variant_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, switch(reply_type) { case TYPE_ERR: case TYPE_LINE: - redis_read_variant_line(redis_sock, reply_type, z_ret TSRMLS_CC); + redis_read_variant_line(redis_sock, reply_type, status_strings, z_ret TSRMLS_CC); break; case TYPE_INT: ZVAL_LONG(z_ret, reply_info); @@ -2243,9 +2552,8 @@ redis_read_variant_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, // If we've got more than zero elements, parse our multi bulk // response recursively - if(reply_info > -1) { - redis_read_multibulk_recursive(redis_sock, reply_info, z_ret - TSRMLS_CC); + if (reply_info > -1) { + redis_read_multibulk_recursive(redis_sock, reply_info, status_strings, z_ret TSRMLS_CC); } break; default: @@ -2269,4 +2577,18 @@ redis_read_variant_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, return 0; } +PHP_REDIS_API int +redis_read_variant_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + zval *z_tab, void *ctx) +{ + return variant_reply_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, 0, z_tab, ctx); +} + +PHP_REDIS_API int +redis_read_variant_reply_strings(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + zval *z_tab, void *ctx) +{ + return variant_reply_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, 1, z_tab, ctx); +} + /* vim: set tabstop=4 softtabstop=4 expandtab shiftwidth=4: */ @@ -21,6 +21,7 @@ int redis_cmd_append_sstr_long(smart_string *str, long append); int redis_cmd_append_sstr_dbl(smart_string *str, double value); int redis_cmd_append_sstr_zval(smart_string *str, zval *z, RedisSock *redis_sock TSRMLS_DC); int redis_cmd_append_sstr_key(smart_string *str, char *key, strlen_t len, RedisSock *redis_sock, short *slot); +int redis_cmd_append_sstr_arrkey(smart_string *cmd, zend_string *kstr, ulong idx); PHP_REDIS_API int redis_spprintf(RedisSock *redis_sock, short *slot TSRMLS_DC, char **ret, char *kw, char *fmt, ...); @@ -33,6 +34,8 @@ PHP_REDIS_API void redis_boolean_response_impl(INTERNAL_FUNCTION_PARAMETERS, Red PHP_REDIS_API void redis_boolean_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); PHP_REDIS_API void redis_bulk_double_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); PHP_REDIS_API void redis_string_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); +PHP_REDIS_API void redis_single_line_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + zval *z_tab, void *ctx); PHP_REDIS_API void redis_ping_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); PHP_REDIS_API void redis_info_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); PHP_REDIS_API void redis_parse_info_response(char *response, zval *z_ret); @@ -43,9 +46,13 @@ PHP_REDIS_API int redis_sock_connect(RedisSock *redis_sock TSRMLS_DC); PHP_REDIS_API int redis_sock_server_open(RedisSock *redis_sock TSRMLS_DC); PHP_REDIS_API int redis_sock_disconnect(RedisSock *redis_sock, int force TSRMLS_DC); PHP_REDIS_API zval *redis_sock_read_multibulk_reply_zval(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab); +PHP_REDIS_API int redis_sock_read_single_line(RedisSock *redis_sock, char *buffer, + size_t buflen, size_t *linelen, int set_err TSRMLS_DC); PHP_REDIS_API char *redis_sock_read_bulk_reply(RedisSock *redis_sock, int bytes TSRMLS_DC); PHP_REDIS_API int redis_sock_read_multibulk_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *_z_tab, void *ctx); -PHP_REDIS_API void redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, int count, int unserialize); +//PHP_REDIS_API void redis_mbulk_reply_loop(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, int count, int unserialize); +PHP_REDIS_API void redis_mbulk_reply_loop(RedisSock *redis_sock, zval *z_tab, int count, int unserialize TSRMLS_DC); + PHP_REDIS_API int redis_mbulk_reply_raw(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); PHP_REDIS_API int redis_mbulk_reply_zipped_raw(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); @@ -56,7 +63,15 @@ PHP_REDIS_API int redis_mbulk_reply_assoc(INTERNAL_FUNCTION_PARAMETERS, RedisSoc PHP_REDIS_API int redis_sock_read_scan_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, REDIS_SCAN_TYPE type, zend_long *iter); -PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS, + +PHP_REDIS_API int redis_xrange_reply(INTERNAL_FUNCTION_PARAMETERS, + RedisSock *redis_sock, zval *z_tab, void *ctx); +PHP_REDIS_API int redis_xread_reply(INTERNAL_FUNCTION_PARAMETERS, + RedisSock *redis_sock, zval *z_tab, void *ctx); +PHP_REDIS_API int redis_xclaim_reply(INTERNAL_FUNCTION_PARAMETERS, + RedisSock *redis_sock, zval *z_tab, void *ctx); + +PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); PHP_REDIS_API int redis_unsubscribe_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); @@ -78,16 +93,22 @@ redis_unserialize(RedisSock *redis_sock, const char *val, int val_len, zval *z_r PHP_REDIS_API int redis_pack(RedisSock *redis_sock, zval *z, char **val, strlen_t *val_len TSRMLS_DC); PHP_REDIS_API int redis_unpack(RedisSock *redis_sock, const char *val, int val_len, zval *z_ret TSRMLS_DC); +PHP_REDIS_API int +redis_read_stream_messages(RedisSock *redis_sock, int count, zval *z_ret TSRMLS_DC); +PHP_REDIS_API int +redis_read_stream_messages_multi(RedisSock *redis_sock, int count, zval *z_ret TSRMLS_DC); +PHP_REDIS_API int +redis_read_xclaim_response(RedisSock *redis_sock, int count, zval *rv TSRMLS_DC); + /* * Variant Read methods, mostly to implement eval */ PHP_REDIS_API int redis_read_reply_type(RedisSock *redis_sock, REDIS_REPLY_TYPE *reply_type, long *reply_info TSRMLS_DC); -PHP_REDIS_API int redis_read_variant_line(RedisSock *redis_sock, REDIS_REPLY_TYPE reply_type, zval *z_ret TSRMLS_DC); PHP_REDIS_API int redis_read_variant_bulk(RedisSock *redis_sock, int size, zval *z_ret TSRMLS_DC); -PHP_REDIS_API int redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, zval *z_ret TSRMLS_DC); +PHP_REDIS_API int redis_read_multibulk_recursive(RedisSock *redis_sock, int elements, int status_strings, zval *z_ret TSRMLS_DC); PHP_REDIS_API int redis_read_variant_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); - +PHP_REDIS_API int redis_read_variant_reply_strings(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); PHP_REDIS_API void redis_client_list_reply(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab); #endif diff --git a/php_redis.h b/php_redis.h index b076e51b..d85c13ff 100644 --- a/php_redis.h +++ b/php_redis.h @@ -222,6 +222,21 @@ PHP_METHOD(Redis, pfadd); PHP_METHOD(Redis, pfcount); PHP_METHOD(Redis, pfmerge); +/* STREAMS */ +PHP_METHOD(Redis, xack); +PHP_METHOD(Redis, xadd); +PHP_METHOD(Redis, xclaim); +PHP_METHOD(Redis, xdel); +PHP_METHOD(Redis, xgroup); +PHP_METHOD(Redis, xinfo); +PHP_METHOD(Redis, xlen); +PHP_METHOD(Redis, xpending); +PHP_METHOD(Redis, xrange); +PHP_METHOD(Redis, xread); +PHP_METHOD(Redis, xreadgroup); +PHP_METHOD(Redis, xrevrange); +PHP_METHOD(Redis, xtrim); + /* Reflection */ PHP_METHOD(Redis, getHost); PHP_METHOD(Redis, getPort); @@ -402,6 +402,19 @@ static zend_function_entry redis_functions[] = { PHP_ME(Redis, unwatch, arginfo_void, ZEND_ACC_PUBLIC) PHP_ME(Redis, wait, arginfo_wait, ZEND_ACC_PUBLIC) PHP_ME(Redis, watch, arginfo_watch, ZEND_ACC_PUBLIC) + PHP_ME(Redis, xack, arginfo_xack, ZEND_ACC_PUBLIC) + PHP_ME(Redis, xadd, arginfo_xadd, ZEND_ACC_PUBLIC) + PHP_ME(Redis, xclaim, arginfo_xclaim, ZEND_ACC_PUBLIC) + PHP_ME(Redis, xdel, arginfo_xdel, ZEND_ACC_PUBLIC) + PHP_ME(Redis, xgroup, arginfo_xgroup, ZEND_ACC_PUBLIC) + PHP_ME(Redis, xinfo, arginfo_xinfo, ZEND_ACC_PUBLIC) + PHP_ME(Redis, xlen, arginfo_key, ZEND_ACC_PUBLIC) + PHP_ME(Redis, xpending, arginfo_xpending, ZEND_ACC_PUBLIC) + PHP_ME(Redis, xrange, arginfo_xrange, ZEND_ACC_PUBLIC) + PHP_ME(Redis, xread, arginfo_xread, ZEND_ACC_PUBLIC) + PHP_ME(Redis, xreadgroup, arginfo_xreadgroup, ZEND_ACC_PUBLIC) + PHP_ME(Redis, xrevrange, arginfo_xrange, ZEND_ACC_PUBLIC) + PHP_ME(Redis, xtrim, arginfo_xtrim, ZEND_ACC_PUBLIC) PHP_ME(Redis, zAdd, arginfo_zadd, ZEND_ACC_PUBLIC) PHP_ME(Redis, zCard, arginfo_key, ZEND_ACC_PUBLIC) PHP_ME(Redis, zCount, arginfo_key_min_max, ZEND_ACC_PUBLIC) @@ -1413,7 +1426,7 @@ PHP_METHOD(Redis, sAdd) /* {{{ proto boolean Redis::sAddArray(string key, array $values) */ PHP_METHOD(Redis, sAddArray) { - REDIS_PROCESS_KW_CMD("SADD", redis_key_arr_cmd, redis_long_response); + REDIS_PROCESS_KW_CMD("SADD", redis_key_val_arr_cmd, redis_long_response); } /* }}} */ /* {{{ proto int Redis::sSize(string key) */ @@ -2441,7 +2454,7 @@ redis_sock_read_multibulk_multi_reply_loop(INTERNAL_FUNCTION_PARAMETERS, add_next_index_zval(z_tab, z_ret); int num = atol(inbuf + 1); - if (num > 0 && redis_read_multibulk_recursive(redis_sock, num, z_ret TSRMLS_CC) < 0) { + if (num > 0 && redis_read_multibulk_recursive(redis_sock, num, 0, z_ret TSRMLS_CC) < 0) { } if (fi) fi = fi->next; } @@ -3573,4 +3586,60 @@ PHP_METHOD(Redis, georadiusbymember) { REDIS_PROCESS_CMD(georadiusbymember, redis_read_variant_reply); } +/* + * Streams + */ + +PHP_METHOD(Redis, xack) { + REDIS_PROCESS_CMD(xack, redis_long_response); +} + +PHP_METHOD(Redis, xadd) { + REDIS_PROCESS_CMD(xadd, redis_single_line_reply); +} + +PHP_METHOD(Redis, xclaim) { + REDIS_PROCESS_CMD(xclaim, redis_xclaim_reply); +} + +PHP_METHOD(Redis, xdel) { + REDIS_PROCESS_KW_CMD("XDEL", redis_key_str_arr_cmd, redis_long_response); +} + +PHP_METHOD(Redis, xgroup) { + REDIS_PROCESS_CMD(xgroup, redis_read_variant_reply); +} + +PHP_METHOD(Redis, xinfo) { + REDIS_PROCESS_CMD(xinfo, redis_read_variant_reply); +} + +PHP_METHOD(Redis, xlen) { + REDIS_PROCESS_KW_CMD("XLEN", redis_key_cmd, redis_long_response); +} + +PHP_METHOD(Redis, xpending) { + REDIS_PROCESS_CMD(xpending, redis_read_variant_reply_strings); +} + +PHP_METHOD(Redis, xrange) { + REDIS_PROCESS_KW_CMD("XRANGE", redis_xrange_cmd, redis_xrange_reply); +} + +PHP_METHOD(Redis, xread) { + REDIS_PROCESS_CMD(xread, redis_xread_reply); +} + +PHP_METHOD(Redis, xreadgroup) { + REDIS_PROCESS_CMD(xreadgroup, redis_xread_reply); +} + +PHP_METHOD(Redis, xrevrange) { + REDIS_PROCESS_KW_CMD("XREVRANGE", redis_xrange_cmd, redis_xrange_reply); +} + +PHP_METHOD(Redis, xtrim) { + REDIS_PROCESS_CMD(xtrim, redis_long_response); +} + /* vim: set tabstop=4 softtabstop=4 expandtab shiftwidth=4: */ diff --git a/redis_array.c b/redis_array.c index 9f00a74c..217ab63e 100644 --- a/redis_array.c +++ b/redis_array.c @@ -735,7 +735,6 @@ PHP_METHOD(RedisArray, keys) RedisArray *ra; char *pattern; strlen_t pattern_len; - int i; /* Make sure the prototype is correct */ if(zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Os", @@ -764,7 +763,6 @@ PHP_METHOD(RedisArray, keys) PHP_METHOD(RedisArray, getOption) { zval *object, z_fun, z_args[1]; - int i; RedisArray *ra; zend_long opt; @@ -791,7 +789,6 @@ PHP_METHOD(RedisArray, getOption) PHP_METHOD(RedisArray, setOption) { zval *object, z_fun, z_args[2]; - int i; RedisArray *ra; zend_long opt; char *val_str; @@ -822,7 +819,6 @@ PHP_METHOD(RedisArray, setOption) PHP_METHOD(RedisArray, select) { zval *object, z_fun, z_args[1]; - int i; RedisArray *ra; zend_long opt; diff --git a/redis_cluster.c b/redis_cluster.c index 4271b9d9..130b961a 100644 --- a/redis_cluster.c +++ b/redis_cluster.c @@ -252,6 +252,19 @@ zend_function_entry redis_cluster_functions[] = { PHP_ME(RedisCluster, unlink, arginfo_del, ZEND_ACC_PUBLIC) PHP_ME(RedisCluster, unwatch, arginfo_void, ZEND_ACC_PUBLIC) PHP_ME(RedisCluster, watch, arginfo_watch, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, xack, arginfo_xack, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, xadd, arginfo_xadd, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, xclaim, arginfo_xclaim, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, xdel, arginfo_xdel, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, xgroup, arginfo_xgroup, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, xinfo, arginfo_xinfo, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, xlen, arginfo_key, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, xpending, arginfo_xpending, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, xrange, arginfo_xrange, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, xread, arginfo_xread, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, xreadgroup, arginfo_xreadgroup, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, xrevrange, arginfo_xrange, ZEND_ACC_PUBLIC) + PHP_ME(RedisCluster, xtrim, arginfo_xtrim, ZEND_ACC_PUBLIC) PHP_ME(RedisCluster, zadd, arginfo_zadd, ZEND_ACC_PUBLIC) PHP_ME(RedisCluster, zcard, arginfo_key, ZEND_ACC_PUBLIC) PHP_ME(RedisCluster, zcount, arginfo_key_min_max, ZEND_ACC_PUBLIC) @@ -1101,7 +1114,7 @@ PHP_METHOD(RedisCluster, keys) { } /* Ensure we can get a response */ - resp = cluster_read_resp(c TSRMLS_CC); + resp = cluster_read_resp(c, 0 TSRMLS_CC); if (!resp) { php_error_docref(0 TSRMLS_CC, E_WARNING, "Can't read response from %s:%d", ZSTR_VAL(node->sock->host), @@ -1305,7 +1318,7 @@ PHP_METHOD(RedisCluster, sadd) { /* {{{ proto long RedisCluster::saddarray(string key, array values) */ PHP_METHOD(RedisCluster, saddarray) { - CLUSTER_PROCESS_KW_CMD("SADD", redis_key_arr_cmd, cluster_long_resp, 0); + CLUSTER_PROCESS_KW_CMD("SADD", redis_key_val_arr_cmd, cluster_long_resp, 0); } /* }}} */ @@ -2963,6 +2976,65 @@ PHP_METHOD(RedisCluster, ping) { } /* }}} */ +/* {{{ proto long RedisCluster::xack(string key, string group, array ids) }}} */ +PHP_METHOD(RedisCluster, xack) { + CLUSTER_PROCESS_CMD(xack, cluster_long_resp, 0); +} + +/* {{{ proto string RedisCluster::xadd(string key, string id, array field_values) }}} */ +PHP_METHOD(RedisCluster, xadd) { + CLUSTER_PROCESS_CMD(xadd, cluster_single_line_resp, 0); +} + +/* {{{ proto array RedisCluster::xclaim(string key, string group, string consumer, + * long min_idle_time, array ids, array options) */ +PHP_METHOD(RedisCluster, xclaim) { + CLUSTER_PROCESS_CMD(xclaim, cluster_xclaim_resp, 0); +} + +PHP_METHOD(RedisCluster, xdel) { + CLUSTER_PROCESS_KW_CMD("XDEL", redis_key_str_arr_cmd, cluster_long_resp, 0); +} + +/* {{{ proto variant RedisCluster::xgroup(string op, [string key, string arg1, string arg2]) }}} */ +PHP_METHOD(RedisCluster, xgroup) { + CLUSTER_PROCESS_CMD(xgroup, cluster_variant_resp, 0); +} + +/* {{{ proto variant RedisCluster::xinfo(string op, [string arg1, string arg2]); */ +PHP_METHOD(RedisCluster, xinfo) { + CLUSTER_PROCESS_CMD(xinfo, cluster_variant_resp, 0); +} + +/* {{{ proto string RedisCluster::xlen(string key) }}} */ +PHP_METHOD(RedisCluster, xlen) { + CLUSTER_PROCESS_KW_CMD("XLEN", redis_key_cmd, cluster_long_resp, 1); +} + +PHP_METHOD(RedisCluster, xpending) { + CLUSTER_PROCESS_CMD(xpending, cluster_variant_resp_strings, 1); +} + +PHP_METHOD(RedisCluster, xrange) { + CLUSTER_PROCESS_KW_CMD("XRANGE", redis_xrange_cmd, cluster_xrange_resp, 1); +} + +PHP_METHOD(RedisCluster, xrevrange) { + CLUSTER_PROCESS_KW_CMD("XREVRANGE", redis_xrange_cmd, cluster_xrange_resp, 1); +} + +PHP_METHOD(RedisCluster, xread) { + CLUSTER_PROCESS_CMD(xread, cluster_xread_resp, 1); +} + +PHP_METHOD(RedisCluster, xreadgroup) { + CLUSTER_PROCESS_CMD(xreadgroup, cluster_xread_resp, 0); +} + +PHP_METHOD(RedisCluster, xtrim) { + CLUSTER_PROCESS_CMD(xtrim, cluster_long_resp, 0); +} + /* {{{ proto string RedisCluster::echo(string key, string msg) * proto string RedisCluster::echo(array host_port, string msg) */ PHP_METHOD(RedisCluster, echo) { diff --git a/redis_cluster.h b/redis_cluster.h index 74903e67..b4d0a8ba 100644 --- a/redis_cluster.h +++ b/redis_cluster.h @@ -263,6 +263,21 @@ PHP_METHOD(RedisCluster, zscan); PHP_METHOD(RedisCluster, hscan); PHP_METHOD(RedisCluster, sscan); +/* STREAMS */ +PHP_METHOD(RedisCluster, xack); +PHP_METHOD(RedisCluster, xadd); +PHP_METHOD(RedisCluster, xclaim); +PHP_METHOD(RedisCluster, xdel); +PHP_METHOD(RedisCluster, xgroup); +PHP_METHOD(RedisCluster, xinfo); +PHP_METHOD(RedisCluster, xlen); +PHP_METHOD(RedisCluster, xpending); +PHP_METHOD(RedisCluster, xrange); +PHP_METHOD(RedisCluster, xread); +PHP_METHOD(RedisCluster, xreadgroup); +PHP_METHOD(RedisCluster, xrevrange); +PHP_METHOD(RedisCluster, xtrim); + /* Transactions */ PHP_METHOD(RedisCluster, multi); PHP_METHOD(RedisCluster, exec); diff --git a/redis_commands.c b/redis_commands.c index c47669ad..e9d26c3d 100644 --- a/redis_commands.c +++ b/redis_commands.c @@ -1050,13 +1050,16 @@ int redis_key_varval_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, } /* Commands that take a key and then an array of values */ -int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, - char *kw, char **cmd, int *cmd_len, short *slot, - void **ctx) +#define VAL_TYPE_VALUES 1 +#define VAL_TYPE_STRINGS 2 +static int gen_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char *kw, int valtype, char **cmd, int *cmd_len, + short *slot, void **ctx) { zval *z_arr, *z_val; HashTable *ht_arr; smart_string cmdstr = {0}; + zend_string *zstr; int key_free, val_free, argc = 1; strlen_t val_len, key_len; char *key, *val; @@ -1080,10 +1083,17 @@ int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, if (key_free) efree(key); /* Iterate our hash table, serializing and appending values */ + assert(valtype == VAL_TYPE_VALUES || valtype == VAL_TYPE_STRINGS); ZEND_HASH_FOREACH_VAL(ht_arr, z_val) { - val_free = redis_pack(redis_sock, z_val, &val, &val_len TSRMLS_CC); - redis_cmd_append_sstr(&cmdstr, val, val_len); - if (val_free) efree(val); + if (valtype == VAL_TYPE_VALUES) { + val_free = redis_pack(redis_sock, z_val, &val, &val_len TSRMLS_CC); + redis_cmd_append_sstr(&cmdstr, val, val_len); + if (val_free) efree(val); + } else { + zstr = zval_get_string(z_val); + redis_cmd_append_sstr(&cmdstr, ZSTR_VAL(zstr), ZSTR_LEN(zstr)); + zend_string_release(zstr); + } } ZEND_HASH_FOREACH_END(); *cmd_len = cmdstr.len; @@ -1092,6 +1102,22 @@ int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, return SUCCESS; } +int redis_key_val_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char *kw, char **cmd, int *cmd_len, short *slot, + void **ctx) +{ + return gen_key_arr_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, kw, + VAL_TYPE_VALUES, cmd, cmd_len, slot, ctx); +} + +int redis_key_str_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char *kw, char **cmd, int *cmd_len, short *slot, + void **ctx) +{ + return gen_key_arr_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, kw, + VAL_TYPE_STRINGS, cmd, cmd_len, slot, ctx); +} + /* Generic function that takes a variable number of keys, with an optional * timeout value. This can handle various SUNION/SUNIONSTORE/BRPOP type * commands. */ @@ -3147,6 +3173,611 @@ int redis_command_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, return SUCCESS; } +/* XADD */ +int redis_xadd_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + smart_string cmdstr = {0}; + zend_string *arrkey; + zval *z_fields, *value; + zend_long maxlen = 0; + zend_bool approx = 0; + ulong idx; + HashTable *ht_fields; + int fcount, argc; + char *key, *id; + strlen_t keylen, idlen; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ssa|lb", &key, &keylen, + &id, &idlen, &z_fields, &maxlen, &approx) == FAILURE) + { + return FAILURE; + } + + /* At least one field and string are required */ + ht_fields = Z_ARRVAL_P(z_fields); + if ((fcount = zend_hash_num_elements(ht_fields)) == 0) { + return FAILURE; + } + + if (maxlen < 0 || (maxlen == 0 && approx != 0)) { + php_error_docref(NULL TSRMLS_CC, E_WARNING, + "Warning: Invalid MAXLEN argument or approximate flag"); + } + + + /* Calculate argc for XADD. It's a bit complex because we've got + * an optional MAXLEN argument which can either take the form MAXLEN N + * or MAXLEN ~ N */ + argc = 2 + (fcount*2) + (maxlen > 0 ? (approx ? 3 : 2) : 0); + + /* XADD key ID field string [field string ...] */ + REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XADD"); + redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot); + + /* Now append our MAXLEN bits if we've got them */ + if (maxlen > 0) { + REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "MAXLEN"); + REDIS_CMD_APPEND_SSTR_OPT_STATIC(&cmdstr, approx, "~"); + redis_cmd_append_sstr_long(&cmdstr, maxlen); + } + + /* Now append ID and field(s) */ + redis_cmd_append_sstr(&cmdstr, id, idlen); + ZEND_HASH_FOREACH_KEY_VAL(ht_fields, idx, arrkey, value) { + redis_cmd_append_sstr_arrkey(&cmdstr, arrkey, idx); + redis_cmd_append_sstr_zval(&cmdstr, value, redis_sock TSRMLS_CC); + } ZEND_HASH_FOREACH_END(); + + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +// XPENDING key group [start end count] [consumer] +int redis_xpending_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + smart_string cmdstr = {0}; + char *key, *group, *start = NULL, *end = NULL, *consumer = NULL; + strlen_t keylen, grouplen, startlen, endlen, consumerlen; + int argc; + zend_long count = -1; + + // XPENDING mystream group55 - + 10 consumer-123 + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss|ssls", &key, + &keylen, &group, &grouplen, &start, &startlen, + &end, &endlen, &count, &consumer, &consumerlen) + == FAILURE) + { + return FAILURE; + } + + /* If we've been passed a start argument, we also need end and count */ + if (start != NULL && (end == NULL || count < 0)) { + return FAILURE; + } + + /* Calculate argc. It's either 2, 5, or 6 */ + argc = 2 + (start != NULL ? 3 + (consumer != NULL) : 0); + + /* Construct command and add required arguments */ + REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XPENDING"); + redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot); + redis_cmd_append_sstr(&cmdstr, group, grouplen); + + /* Add optional argumentst */ + if (start) { + redis_cmd_append_sstr(&cmdstr, start, startlen); + redis_cmd_append_sstr(&cmdstr, end, endlen); + redis_cmd_append_sstr_long(&cmdstr, (long)count); + + /* Finally add consumer if we have it */ + if (consumer) redis_cmd_append_sstr(&cmdstr, consumer, consumerlen); + } + + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +/* X[REV]RANGE key start end [COUNT count] */ +int redis_xrange_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char *kw, char **cmd, int *cmd_len, short *slot, + void **ctx) +{ + smart_string cmdstr = {0}; + char *key, *start, *end; + strlen_t keylen, startlen, endlen; + zend_long count = -1; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sss|l", &key, &keylen, + &start, &startlen, &end, &endlen, &count) + == FAILURE) + { + return FAILURE; + } + + redis_cmd_init_sstr(&cmdstr, 3 + (2 * (count > -1)), kw, strlen(kw)); + redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot); + redis_cmd_append_sstr(&cmdstr, start, startlen); + redis_cmd_append_sstr(&cmdstr, end, endlen); + + if (count > -1) { + redis_cmd_append_sstr(&cmdstr, "COUNT", sizeof("COUNT")-1); + redis_cmd_append_sstr_long(&cmdstr, count); + } + + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +/* Helper function to take an associative array and append the Redis + * STREAMS stream [stream...] id [id ...] arguments to a command string. */ +static int +append_stream_args(smart_string *cmdstr, HashTable *ht, RedisSock *redis_sock, + short *slot TSRMLS_DC) +{ + char *kptr, kbuf[40]; + int klen, i, pos = 0; + zend_string *key, *idstr; + short oldslot; + zval **id; + ulong idx; + + /* Append STREAM qualifier */ + REDIS_CMD_APPEND_SSTR_STATIC(cmdstr, "STREAMS"); + + /* Sentinel slot */ + if (slot) oldslot = -1; + + /* Allocate memory to keep IDs */ + id = emalloc(sizeof(*id) * zend_hash_num_elements(ht)); + + /* Iterate over our stream => id array appending streams and retaining each + * value for final arguments */ + ZEND_HASH_FOREACH_KEY_VAL(ht, idx, key, id[pos++]) { + if (key) { + klen = ZSTR_LEN(key); + kptr = ZSTR_VAL(key); + } else { + klen = snprintf(kbuf, sizeof(kbuf), "%ld", (long)idx); + kptr = (char*)kbuf; + } + + /* Append stream key */ + redis_cmd_append_sstr_key(cmdstr, kptr, klen, redis_sock, slot); + + /* Protect the user against CROSSSLOT to avoid confusion */ + if (slot) { + if (oldslot != -1 && *slot != oldslot) { + php_error_docref(NULL TSRMLS_CC, E_WARNING, + "Warning, not all keys hash to the same slot!"); + efree(id); + return FAILURE; + } + oldslot = *slot; + } + } ZEND_HASH_FOREACH_END(); + + /* Add our IDs */ + for (i = 0; i < pos; i++) { + idstr = zval_get_string(id[i]); + redis_cmd_append_sstr(cmdstr, ZSTR_VAL(idstr), ZSTR_LEN(idstr)); + zend_string_release(idstr); + } + + /* Clean up ID container array */ + efree(id); + + return 0; +} + +/* XREAD [COUNT count] [BLOCK ms] STREAMS key [key ...] id [id ...] */ +int redis_xread_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + smart_string cmdstr = {0}; + zend_long count = -1, block = -1; + zval *z_streams; + int argc, scount; + HashTable *kt; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a|ll", &z_streams, + &count, &block) == FAILURE) + { + return FAILURE; + } + + /* At least one stream and ID is required */ + kt = Z_ARRVAL_P(z_streams); + if ((scount = zend_hash_num_elements(kt)) < 1) { + return FAILURE; + } + + /* Calculate argc and start constructing command */ + argc = 1 + (2 * scount) + (2 * (count > -1)) + (2 * (block > -1)); + REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XREAD"); + + /* Append COUNT if we have it */ + if (count > -1) { + REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "COUNT"); + redis_cmd_append_sstr_long(&cmdstr, count); + } + + /* Append BLOCK if we have it */ + if (block > -1) { + REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "BLOCK"); + redis_cmd_append_sstr_long(&cmdstr, block); + } + + /* Append final STREAM key [key ...] id [id ...] arguments */ + if (append_stream_args(&cmdstr, kt, redis_sock, slot TSRMLS_CC) < 0) { + efree(cmdstr.c); + return FAILURE; + } + + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +/* XREADGROUP GROUP group consumer [COUNT count] [BLOCK ms] + * STREAMS key [key ...] id [id ...] */ +int redis_xreadgroup_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + smart_string cmdstr = {0}; + zval *z_streams; + HashTable *kt; + char *group, *consumer; + strlen_t grouplen, consumerlen; + int scount, argc; + zend_long count = -1, block = -1; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ssa|ll", &group, + &grouplen, &consumer, &consumerlen, &z_streams, + &count, &block) == FAILURE) + { + return FAILURE; + } + + /* Redis requires at least one stream */ + kt = Z_ARRVAL_P(z_streams); + if ((scount = zend_hash_num_elements(kt)) < 1) { + return FAILURE; + } + + /* Calculate argc and start constructing commands */ + argc = 4 + (2 * scount) + (2 * (count > -1)) + (2 * (block > -1)); + REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XREADGROUP"); + + /* Group and consumer */ + REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "GROUP"); + redis_cmd_append_sstr(&cmdstr, group, grouplen); + redis_cmd_append_sstr(&cmdstr, consumer, consumerlen); + + /* Append COUNT if we have it */ + if (count > -1) { + REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "COUNT"); + redis_cmd_append_sstr_long(&cmdstr, count); + } + + /* Append BLOCK argument if we have it */ + if (block > -1) { + REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "BLOCK"); + redis_cmd_append_sstr_long(&cmdstr, block); + } + + /* Finally append stream and id args */ + if (append_stream_args(&cmdstr, kt, redis_sock, slot TSRMLS_CC) < 0) { + efree(cmdstr.c); + return FAILURE; + } + + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +/* XACK key group id [id ...] */ +int redis_xack_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + smart_string cmdstr = {0}; + char *key, *group; + strlen_t keylen, grouplen; + zend_string *idstr; + zval *z_ids, *z_id; + HashTable *ht_ids; + int idcount; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ssa", &key, &keylen, + &group, &grouplen, &z_ids) == FAILURE) + { + return FAILURE; + } + + ht_ids = Z_ARRVAL_P(z_ids); + if ((idcount = zend_hash_num_elements(ht_ids)) < 1) { + return FAILURE; + } + + /* Create command and add initial arguments */ + REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, 2 + idcount, "XACK"); + redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot); + redis_cmd_append_sstr(&cmdstr, group, grouplen); + + /* Append IDs */ + ZEND_HASH_FOREACH_VAL(ht_ids, z_id) { + idstr = zval_get_string(z_id); + redis_cmd_append_sstr(&cmdstr, ZSTR_VAL(idstr), ZSTR_LEN(idstr)); + zend_string_release(idstr); + } ZEND_HASH_FOREACH_END(); + + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +/* XCLAIM options container */ +typedef struct xclaimOptions { + struct { + char *type; + zend_long time; + } idle; + zend_long retrycount; + int force; + int justid; +} xclaimOptions; + +/* Helper to extract XCLAIM options */ +static void get_xclaim_options(zval *z_arr, xclaimOptions *opt TSRMLS_DC) { + HashTable *ht; + zend_string *zkey; + char *kval; + strlen_t klen; + ulong idx; + zval *zv; + + PHPREDIS_NOTUSED(idx); + + /* Initialize options array to sane defaults */ + memset(opt, 0, sizeof(*opt)); + opt->retrycount = -1; + opt->idle.time = -1; + + /* Early return if we don't have any options */ + if (z_arr == NULL) + return; + + /* Iterate over our options array */ + ht = Z_ARRVAL_P(z_arr); + ZEND_HASH_FOREACH_KEY_VAL(ht, idx, zkey, zv) { + if (zkey) { + /* Every key => value xclaim option requires a long and Redis + * treats -1 as not being passed so skip negative values too. */ + if (Z_TYPE_P(zv) != IS_LONG || Z_LVAL_P(zv) < 0) + continue; + + kval = ZSTR_VAL(zkey); + klen = ZSTR_LEN(zkey); + if (klen == 4) { + if (!strncasecmp(kval, "TIME", 4)) { + opt->idle.type = "TIME"; + opt->idle.time = Z_LVAL_P(zv); + } else if (!strncasecmp(kval, "IDLE", 4)) { + opt->idle.type = "IDLE"; + opt->idle.time = Z_LVAL_P(zv); + } + } else if (klen == 10 && !strncasecmp(kval, "RETRYCOUNT", 10)) { + opt->retrycount = Z_LVAL_P(zv); + } + } else { + if (Z_TYPE_P(zv) == IS_STRING) { + kval = Z_STRVAL_P(zv); + klen = Z_STRLEN_P(zv); + if (klen == 5 && !strncasecmp(kval, "FORCE", 5)) { + opt->force = 1; + } else if (klen == 6 && !strncasecmp(kval, "JUSTID", 6)) { + opt->justid = 1; + } + } + } + } ZEND_HASH_FOREACH_END(); +} + +/* Count argc for any options we may have */ +static int xclaim_options_argc(xclaimOptions *opt) { + int argc = 0; + + if (opt->idle.type != NULL && opt->idle.time != -1) + argc += 2; + if (opt->retrycount != -1) + argc += 2; + if (opt->force) + argc++; + if (opt->justid) + argc++; + + return argc; +} + +/* Append XCLAIM options */ +static void append_xclaim_options(smart_string *cmd, xclaimOptions *opt) { + /* IDLE/TIME long */ + if (opt->idle.type != NULL && opt->idle.time != -1) { + redis_cmd_append_sstr(cmd, opt->idle.type, strlen(opt->idle.type)); + redis_cmd_append_sstr_long(cmd, opt->idle.time); + } + + /* RETRYCOUNT */ + if (opt->retrycount != -1) { + REDIS_CMD_APPEND_SSTR_STATIC(cmd, "RETRYCOUNT"); + redis_cmd_append_sstr_long(cmd, opt->retrycount); + } + + /* FORCE and JUSTID */ + if (opt->force) + REDIS_CMD_APPEND_SSTR_STATIC(cmd, "FORCE"); + if (opt->justid) + REDIS_CMD_APPEND_SSTR_STATIC(cmd, "JUSTID"); +} + +/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> + [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>] + [FORCE] [JUSTID] */ +int redis_xclaim_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + smart_string cmdstr = {0}; + char *key, *group, *consumer; + strlen_t keylen, grouplen, consumerlen; + zend_long min_idle; + int argc, id_count; + zval *z_ids, *z_id, *z_opts = NULL; + zend_string *zstr; + HashTable *ht_ids; + xclaimOptions opts; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sssla|a", &key, &keylen, + &group, &grouplen, &consumer, &consumerlen, &min_idle, + &z_ids, &z_opts) == FAILURE) + { + return FAILURE; + } + + /* At least one id is required */ + ht_ids = Z_ARRVAL_P(z_ids); + if ((id_count = zend_hash_num_elements(ht_ids)) < 1) { + return FAILURE; + } + + /* Extract options array if we've got them */ + get_xclaim_options(z_opts, &opts TSRMLS_CC); + + /* Now we have enough information to calculate argc */ + argc = 4 + id_count + xclaim_options_argc(&opts); + + /* Start constructing our command */ + REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XCLAIM"); + redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot); + redis_cmd_append_sstr(&cmdstr, group, grouplen); + redis_cmd_append_sstr(&cmdstr, consumer, consumerlen); + redis_cmd_append_sstr_long(&cmdstr, min_idle); + + /* Add IDs */ + ZEND_HASH_FOREACH_VAL(ht_ids, z_id) { + zstr = zval_get_string(z_id); + redis_cmd_append_sstr(&cmdstr, ZSTR_VAL(zstr), ZSTR_LEN(zstr)); + zend_string_release(zstr); + } ZEND_HASH_FOREACH_END(); + + /* Finally add our options */ + append_xclaim_options(&cmdstr, &opts); + + /* Success */ + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +/* XGROUP HELP + * XGROUP SETID key group id + * XGROUP DELGROUP key groupname + * XGROUP CREATE key groupname id + * XGROUP DELCONSUMER key groupname consumername */ +int redis_xgroup_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + char *op, *key = NULL, *arg1 = NULL, *arg2 = NULL; + strlen_t oplen, keylen, arg1len, arg2len; + int argc = ZEND_NUM_ARGS(); + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|ssss", &op, &oplen, + &key, &keylen, &arg1, &arg1len, &arg2, &arg2len) + == FAILURE) + { + return FAILURE; + } + + if (argc == 1 && oplen == 4 && !strncasecmp(op, "HELP", 4)) { + *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XGROUP", "s", "HELP", 4); + return SUCCESS; + } else if (argc == 4 && ((oplen == 5 && !strncasecmp(op, "SETID", 5)) || + (oplen == 6 && !strncasecmp(op, "CREATE", 6)) || + (oplen == 11 && !strncasecmp(op, "DELCONSUMER", 11)))) + { + *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XGROUP", "skss", op, oplen, key, keylen, + arg1, arg1len, arg2, arg2len); + return SUCCESS; + } else if (argc == 3 && ((oplen == 7 && !strncasecmp(op, "DELGROUP", 7)))) { + *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XGROUP", "sks", op, oplen, key, + keylen, arg1, arg1len); + return SUCCESS; + } + + /* Didn't detect any valid XGROUP command pattern */ + return FAILURE; +} + + + +/* XINFO CONSUMERS key group + * XINFO GROUPS key + * XINFO STREAM key + * XINFO HELP */ +int redis_xinfo_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + char *op, *key, *arg; + strlen_t oplen, keylen, arglen; + char fmt[4]; + int argc = ZEND_NUM_ARGS(); + + if (argc > 3 || zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|ss", + &op, &oplen, &key, &keylen, &arg, + &arglen) == FAILURE) + { + return FAILURE; + } + + /* Our format is simply "s", "sk" or "sks" depending on argc */ + memcpy(fmt, "sks", sizeof("sks")-1); + fmt[argc] = '\0'; + + *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XINFO", fmt, op, oplen, key, keylen, + arg, arglen); + return SUCCESS; +} + +/* XTRIM MAXLEN [~] count */ +int redis_xtrim_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + char *key; + strlen_t keylen; + zend_long maxlen; + zend_bool approx = 0; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sl|b", &key, &keylen, + &maxlen, &approx) == FAILURE) + { + return FAILURE; + } + + if (approx) { + *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XTRIM", "kssl", key, keylen, + "MAXLEN", 6, "~", 1, maxlen); + } else { + *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XTRIM", "ksl", key, keylen, + "MAXLEN", 6, maxlen); + } + + return SUCCESS; +} + /* * Redis commands that don't deal with the server at all. The RedisSock* * pointer is the only thing retreived differently, so we just take that diff --git a/redis_commands.h b/redis_commands.h index 413b868e..2f3e53de 100644 --- a/redis_commands.h +++ b/redis_commands.h @@ -72,7 +72,10 @@ int redis_key_dbl_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, int redis_key_varval_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, char *kw, char **cmd, int *cmd_len, short *slot, void **ctx); -int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, +int redis_key_val_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char *kw, char **cmd, int *cmd_len, short *slot, void **ctx); + +int redis_key_str_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, char *kw, char **cmd, int *cmd_len, short *slot, void **ctx); /* Construct SCAN and similar commands, as well as check iterator */ @@ -112,6 +115,9 @@ int redis_eval_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, int redis_flush_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, char *kw, char **cmd, int *cmd_len, short *slot, void **ctx); +int redis_xrange_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char *kw, char **cmd, int *cmd_len, short *slot, void **ctx); + /* Commands which need a unique construction mechanism. This is either because * they don't share a signature with any other command, or because there is * specific processing we do (e.g. verifying subarguments) that make them @@ -256,6 +262,33 @@ int redis_georadiusbymember_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_s int redis_migrate_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, char **cmd, int *cmd_len, short *slot, void **ctx); +int redis_xadd_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx); + +int redis_xclaim_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx); + +int redis_xpending_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx); + +int redis_xack_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx); + +int redis_xgroup_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx); + +int redis_xinfo_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx); + +int redis_xread_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx); + +int redis_xreadgroup_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx); + +int redis_xtrim_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx); + /* Commands that don't communicate with Redis at all (such as getOption, * setOption, _prefix, _serialize, etc). These can be handled in one place * with the method of grabbing our RedisSock* object in different ways diff --git a/redis_session.c b/redis_session.c index 08022cb1..7f1a2656 100644 --- a/redis_session.c +++ b/redis_session.c @@ -1061,7 +1061,7 @@ PS_READ_FUNC(rediscluster) { redisCluster *c = PS_GET_MOD_DATA(); clusterReply *reply; char *cmd, *skey; - int cmdlen, skeylen; + int cmdlen, skeylen, free_flag; short slot; /* Set up our command and slot information */ @@ -1084,7 +1084,7 @@ PS_READ_FUNC(rediscluster) { efree(cmd); /* Attempt to read reply */ - reply = cluster_read_resp(c TSRMLS_CC); + reply = cluster_read_resp(c, 0 TSRMLS_CC); if (!reply || c->err) { if (reply) cluster_free_reply(reply, 1); return FAILURE; @@ -1099,16 +1099,20 @@ PS_READ_FUNC(rediscluster) { *val = reply->str; *vallen = reply->len; } + + free_flag = 0; #else if (reply->str == NULL) { *val = ZSTR_EMPTY_ALLOC(); } else { *val = zend_string_init(reply->str, reply->len, 0); } + + free_flag = 1; #endif /* Clean up */ - cluster_free_reply(reply, 0); + cluster_free_reply(reply, free_flag); /* Success! */ return SUCCESS; @@ -1148,7 +1152,7 @@ PS_WRITE_FUNC(rediscluster) { efree(cmd); /* Attempt to read reply */ - reply = cluster_read_resp(c TSRMLS_CC); + reply = cluster_read_resp(c, 0 TSRMLS_CC); if (!reply || c->err) { if (reply) cluster_free_reply(reply, 1); return FAILURE; @@ -1188,7 +1192,7 @@ PS_DESTROY_FUNC(rediscluster) { efree(cmd); /* Attempt to read reply */ - reply = cluster_read_resp(c TSRMLS_CC); + reply = cluster_read_resp(c, 0 TSRMLS_CC); if (!reply || c->err) { if (reply) cluster_free_reply(reply, 1); return FAILURE; diff --git a/tests/RedisTest.php b/tests/RedisTest.php index b5da01e8..df650ee5 100644 --- a/tests/RedisTest.php +++ b/tests/RedisTest.php @@ -16,6 +16,11 @@ class Redis_Test extends TestSuite 'Cupertino' => Array(-122.032182, 37.322998) ); + protected $serializers = Array( + Redis::SERIALIZER_NONE, + Redis::SERIALIZER_PHP, + ); + /** * @var Redis */ @@ -35,12 +40,20 @@ class Redis_Test extends TestSuite $this->redis = $this->newInstance(); $info = $this->redis->info(); $this->version = (isset($info['redis_version'])?$info['redis_version']:'0.0.0'); + + if (defined('Redis::SERIALIZER_IGBINARY')) { + $this->serializers[] = Redis::SERIALIZER_IGBINARY; + } } protected function minVersionCheck($version) { return version_compare($this->version, $version, "ge"); } + protected function mstime() { + return round(microtime(true)*1000); + } + protected function newInstance() { $r = new Redis(); @@ -5223,6 +5236,437 @@ class Redis_Test extends TestSuite $this->assertEquals($this->redis->lrange('mylist', 0, -1), Array('A','B','C','D')); } + /* STREAMS */ + + protected function addStreamEntries($key, $count) { + $ids = Array(); + + $this->redis->del($key); + + for ($i = 0; $i < $count; $i++) { + $ids[] = $this->redis->xAdd($key, '*', Array('field' => "value:$i")); + } + + return $ids; + } + + protected function addStreamsAndGroups($arr_streams, $count, $arr_groups) { + $ids = Array(); + + foreach ($arr_streams as $str_stream) { + $ids[$str_stream] = $this->addStreamEntries($str_stream, $count); + foreach ($arr_groups as $str_group => $str_id) { + $this->redis->xGroup('CREATE', $str_stream, $str_group, $str_id); + } + } + + return $ids; + } + + public function testXAdd() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + $this->redis->del('stream'); + for ($i = 0; $i < 5; $i++) { + $id = $this->redis->xAdd("stream", '*', Array('k1' => 'v1', 'k2' => 'v2')); + $this->assertEquals($this->redis->xLen('stream'), $i+1); + + /* Redis should return <timestamp>-<sequence> */ + $bits = explode('-', $id); + $this->assertEquals(count($bits), 2); + $this->assertTrue(is_numeric($bits[0])); + $this->assertTrue(is_numeric($bits[1])); + } + + /* Test an absolute maximum length */ + for ($i = 0; $i < 100; $i++) { + $this->redis->xAdd('stream', '*', Array('k' => 'v'), 10); + } + $this->assertEquals($this->redis->xLen('stream'), 10); + + /* Not the greatest test but I'm unsure if approximate trimming is + * totally deterministic, so just make sure we are able to add with + * an approximate maxlen argument structure */ + $id = $this->redis->xAdd('stream', '*', Array('k' => 'v'), 10, true); + $this->assertEquals(count(explode('-', $id)), 2); + + /* Empty message should fail */ + $this->redis->xAdd('stream', '*', Array()); + } + + protected function doXRangeTest($reverse) { + $key = '{stream}'; + + if ($reverse) { + list($cmd,$a1,$a2) = Array('xRevRange', '+', 0); + } else { + list($cmd,$a1,$a2) = Array('xRange', 0, '+'); + } + + $this->redis->del($key); + for ($i = 0; $i < 3; $i++) { + $msg = Array('field' => "value:$i"); + $id = $this->redis->xAdd($key, '*', $msg); + $rows[$id] = $msg; + } + + $messages = $this->redis->$cmd($key, $a1, $a2); + $this->assertEquals(count($messages), 3); + + $i = $reverse ? 2 : 0; + foreach ($messages as $seq => $v) { + $this->assertEquals(count(explode('-', $seq)), 2); + $this->assertEquals($v, Array('field' => "value:$i")); + $i += $reverse ? -1 : 1; + } + + /* Test COUNT option */ + for ($count = 1; $count <= 3; $count++) { + $messages = $this->redis->$cmd($key, $a1, $a2, $count); + $this->assertEquals(count($messages), $count); + } + } + + public function testXRange() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + foreach (Array(false, true) as $reverse) { + foreach ($this->serializers as $serializer) { + foreach (Array(NULL, 'prefix:') as $prefix) { + $this->redis->setOption(Redis::OPT_PREFIX, $prefix); + $this->redis->setOption(Redis::OPT_SERIALIZER, $serializer); + $this->doXRangeTest($reverse); + } + } + } + } + + protected function testXLen() { + if (!$this->minVersionCheck("5.0")) + $this->markTestSkipped(); + + $this->redis->del('{stream}'); + for ($i = 0; $i < 5; $i++) { + $this->redis->xadd('{stream}', '*', Array('foo' => 'bar')); + $this->assertEquals($this->redis->xLen('{stream}'), $i+1); + } + } + + public function testXGroup() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + $this->addStreamEntries('s', 2); + + /* CREATE */ + $this->assertTrue($this->redis->xGroup('CREATE', 's', 'mygroup', '$')); + $this->assertFalse($this->redis->xGroup('CREATE', 's', 'mygroup', 'BAD_ID')); + + /* BUSYGROUP */ + $this->redis->xGroup('CREATE', 's', 'mygroup', '$'); + $this->assertTrue(strpos($this->redis->getLastError(), 'BUSYGROUP') === 0); + + /* SETID */ + $this->assertTrue($this->redis->xGroup('SETID', 's', 'mygroup', '$')); + $this->assertFalse($this->redis->xGroup('SETID', 's', 'mygroup', 'BAD_ID')); + + $this->assertEquals($this->redis->xGroup('DELCONSUMER', 's', 'mygroup', 'myconsumer'),0); + + /* DELGROUP not yet implemented in Redis */ + } + + public function testXAck() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + for ($n = 1; $n <= 3; $n++) { + $this->addStreamsAndGroups(Array('{s}'), 3, Array('g1' => 0)); + $msg = $this->redis->xReadGroup('g1', 'c1', Array('{s}' => 0)); + + /* Extract IDs */ + $smsg = array_shift($msg); + $ids = array_keys($smsg); + + /* Now ACK $n messages */ + $ids = array_slice($ids, 0, $n); + $this->assertEquals($this->redis->xAck('{s}', 'g1', $ids), $n); + } + + /* Verify sending no IDs is a failure */ + $this->assertFalse($this->redis->xAck('{s}', 'g1', Array())); + } + + protected function doXReadTest() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + $row = Array('f1' => 'v1', 'f2' => 'v2'); + $msgdata = Array( + '{stream}-1' => $row, + '{stream}-2' => $row, + ); + + /* Append a bit of data and populate STREAM queries */ + $this->redis->del(array_keys($msgdata)); + foreach ($msgdata as $key => $message) { + for ($r = 0; $r < 2; $r++) { + $id = $this->redis->xAdd($key, '*', $message); + $qresult[$key][$id] = $message; + } + $qzero[$key] = 0; + $qnew[$key] = '$'; + $keys[] = $key; + } + + /* Everything from both streams */ + $rmsg = $this->redis->xRead($qzero); + $this->assertEquals($rmsg, $qresult); + + /* Test COUNT option */ + for ($count = 1; $count <= 2; $count++) { + $rmsg = $this->redis->xRead($qzero, $count); + foreach ($keys as $key) { + $this->assertEquals(count($rmsg[$key]), $count); + } + } + + /* Should be empty (no new entries) */ + $this->assertEquals(count($this->redis->xRead($qnew)),0); + + /* Test against a specific ID */ + $id = $this->redis->xAdd('{stream}-1', '*', $row); + $new_id = $this->redis->xAdd('{stream}-1', '*', Array('final' => 'row')); + $rmsg = $this->redis->xRead(Array('{stream}-1' => $id)); + $this->assertEquals( + $this->redis->xRead(Array('{stream}-1' => $id)), + Array('{stream}-1' => Array($new_id => Array('final' => 'row'))) + ); + + /* Emtpy query should fail */ + $this->assertFalse($this->redis->xRead(Array())); + } + + public function testXRead() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + foreach ($this->serializers as $serializer) { + $this->redis->setOption(Redis::OPT_SERIALIZER, $serializer); + $this->doXReadTest(); + } + + /* Don't need to test BLOCK multiple times */ + $m1 = round(microtime(true)*1000); + $this->redis->xRead(Array('somestream' => '$'), -1, 100); + $m2 = round(microtime(true)*1000); + $this->assertTrue($m2 - $m1 >= 100); + } + + protected function compareStreamIds($redis, $control) { + foreach ($control as $stream => $ids) { + $rcount = count($redis[$stream]); + $lcount = count($control[$stream]); + + /* We should have the same number of messages */ + $this->assertEquals($rcount, $lcount); + + /* We should have the exact same IDs */ + foreach ($ids as $k => $id) { + $this->assertTrue(isset($redis[$stream][$id])); + } + } + } + + public function testXReadGroup() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + /* Create some streams and groups */ + $streams = Array('{s}-1', '{s}-2'); + $qstreams = Array('{s}-1' => 0, '{s}-2' => 0); + $groups = Array('g1' => 0, 'g2' => 0); + + $ids = $this->addStreamsAndGroups($streams, 3, $groups); + + /* Test that we get get the IDs we should */ + foreach (Array('g1', 'g2') as $group) { + foreach ($ids as $stream => $messages) { + while ($ids[$stream]) { + /* Read more messages */ + $resp = $this->redis->xReadGroup($group, 'consumer', $qstreams); + + /* They should match with our local control array */ + $this->compareStreamIds($resp, $ids); + + /* Remove a message from our control *and* XACK it in Redis */ + $id = array_shift($ids[$stream]); + $this->redis->xAck($stream, $group, Array($id)); + } + } + } + + /* Test COUNT option */ + for ($c = 1; $c <= 3; $c++) { + $this->addStreamsAndGroups($streams, 3, $groups); + $resp = $this->redis->xReadGroup('g1', 'consumer', $qstreams, $c); + + foreach ($resp as $stream => $smsg) { + $this->assertEquals(count($smsg), $c); + } + } + + /* Finally test BLOCK with a sloppy timing test */ + $t1 = $this->mstime(); + $qnew = Array('{s}-1' => '>', '{s}-2' => '>'); + $this->redis->xReadGroup('g1', 'c1', $qnew, -1, 100); + $t2 = $this->mstime(); + $this->assertTrue($t2 - $t1 >= 100); + } + + public function testXPending() { + if (!$this->minVersionCheck("5.0")) { + return $this->markTestSkipped(); + } + + $rows = 5; + $this->addStreamsAndGroups(Array('s'), $rows, Array('group' => 0)); + + $msg = $this->redis->xReadGroup('group', 'consumer', Array('s' => 0)); + $ids = array_keys($msg['s']); + + for ($n = count($ids); $n >= 0; $n--) { + $xp = $this->redis->xPending('s', 'group'); + $this->assertEquals($xp[0], count($ids)); + + /* Verify we're seeing the IDs themselves */ + for ($idx = 1; $idx <= 2; $idx++) { + if ($xp[$idx]) { + $this->assertPatternMatch($xp[$idx], "/^[0-9].*-[0-9].*/"); + } + } + + if ($ids) { + $id = array_shift($ids); + $this->redis->xAck('s', 'group', Array($id)); + } + } + } + + public function testXDel() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + for ($n = 5; $n > 0; $n--) { + $ids = $this->addStreamEntries('s', 5); + $todel = array_slice($ids, 0, $n); + $this->assertEquals($this->redis->xDel('s', $todel), count($todel)); + } + + /* Empty array should fail */ + $this->assertFalse($this->redis->xDel('s', Array())); + } + + public function testXTrim() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + for ($maxlen = 0; $maxlen <= 50; $maxlen += 10) { + $this->addStreamEntries('stream', 100); + $trimmed = $this->redis->xTrim('stream', $maxlen); + $this->assertEquals($trimmed, 100 - $maxlen); + } + + /* APPROX trimming isn't easily deterministic, so just make sure we + can call it with the flag */ + $this->addStreamEntries('stream', 100); + $this->assertFalse($this->redis->xTrim('stream', 1, true) === false); + } + + /* XCLAIM is one of the most complicated commands, with a great deal of different options + * The following test attempts to verify every combination of every possible option. */ + public function testXClaim() { + if (!$this->minVersionCheck("5.0")) + return $this->markTestSkipped(); + + foreach (Array(0, 100) as $min_idle_time) { + foreach (Array(false, true) as $justid) { + foreach (Array(0, 10) as $retrycount) { + /* We need to test not passing TIME/IDLE as well as passing either */ + if ($min_idle_time == 0) { + $topts = Array(Array(), Array('IDLE', 1000000), Array('TIME', time() * 1000)); + } else { + $topts = Array(NULL); + } + + foreach ($topts as $tinfo) { + if ($tinfo) { + list($ttype, $tvalue) = $tinfo; + } else { + $ttype = NULL; $tvalue = NULL; + } + + /* Add some messages and create a group */ + $this->addStreamsAndGroups(Array('s'), 10, Array('group1' => 0)); + + /* Create a second stream we can FORCE ownership on */ + $fids = $this->addStreamsAndGroups(Array('f'), 10, Array('group1' => 0)); + $fids = $fids['f']; + + /* Have consumer 'Mike' read the messages */ + $oids = $this->redis->xReadGroup('group1', 'Mike', Array('s' => 0)); + $oids = array_keys($oids['s']); /* We're only dealing with stream 's' */ + + /* Construct our options array */ + $opts = Array(); + if ($justid) $opts[] = 'JUSTID'; + if ($retrycount) $opts['RETRYCOUNT'] = $retrycount; + if ($tvalue !== NULL) $opts[$ttype] = $tvalue; + + /* Now have pavlo XCLAIM them */ + $cids = $this->redis->xClaim('s', 'group1', 'Pavlo', $min_idle_time, $oids, $opts); + if (!$justid) $cids = array_keys($cids); + + if ($min_idle_time == 0) { + $this->assertEquals($cids, $oids); + + /* Append the FORCE option to our second stream where we have not already + * assigned to a PEL group */ + $opts[] = 'FORCE'; + $freturn = $this->redis->xClaim('f', 'group1', 'Test', 0, $fids, $opts); + if (!$justid) $freturn = array_keys($freturn); + $this->assertEquals($freturn, $fids); + + if ($retrycount || $tvalue !== NULL) { + $pending = $this->redis->xPending('s', 'group1', 0, '+', 1, 'Pavlo'); + + if ($retrycount) { + $this->assertEquals($pending[0][3], $retrycount); + } + if ($tvalue !== NULL) { + if ($ttype == 'IDLE') { + /* If testing IDLE the value must be >= what we set */ + $this->assertTrue($pending[0][2] >= $tvalue); + } else { + /* Timing tests are notoriously irritating but I don't see + * how we'll get >= 20,000 ms between XCLAIM and XPENDING no + * matter how slow the machine/VM running the tests is */ + $this->assertTrue($pending[0][2] <= 20000); + } + } + } + } else { + /* We're verifying that we get no messages when we've set 100 seconds + * as our idle time, which should match nothing */ + $this->assertEquals($cids, Array()); + } + } + } + } + } + } + public function testSession_savedToRedis() { $this->setSessionHandler(); diff --git a/tests/TestSuite.php b/tests/TestSuite.php index 461ac7dc..c2d68259 100644 --- a/tests/TestSuite.php +++ b/tests/TestSuite.php @@ -65,7 +65,14 @@ class TestSuite { } protected function assertFalse($bool) { - return $this->assertTrue(!$bool); + if(!$bool) + return true; + + $bt = debug_backtrace(false); + self::$errors []= sprintf("Assertion failed: %s:%d (%s)\n", + $bt[0]["file"], $bt[0]["line"], $bt[1]["function"]); + + return false; } protected function assertTrue($bool) { @@ -99,6 +106,15 @@ class TestSuite { $bt[0]["file"], $bt[0]["line"], $bt[1]["function"]); } + protected function assertPatternMatch($str_test, $str_regex) { + if (preg_match($str_regex, $str_test)) + return; + + $bt = debug_backtrace(false); + self::$errors []= sprintf("Assertion failed ('%s' doesnt match '%s'): %s:%d (%s)\n", + $str_test, $str_regex, $bt[0]["file"], $bt[0]["line"], $bt[1]["function"]); + } + protected function markTestSkipped($msg='') { $bt = debug_backtrace(false); self::$warnings []= sprintf("Skipped test: %s:%d (%s) %s\n", |