Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/phpredis/phpredis.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Grunder <michael.grunder@gmail.com>2018-09-29 21:59:01 +0300
committerGitHub <noreply@github.com>2018-09-29 21:59:01 +0300
commit2c9e0572361d5131f24fbc81a8f7baaafb671994 (patch)
tree6982b1e1f17b7cf2fc7e024652fad8212edadacd /redis_commands.c
parentbfd274712eeb372926d1106b3da3c4fc19c0a48a (diff)
Streams (#1413)
Streams API
Diffstat (limited to 'redis_commands.c')
-rw-r--r--redis_commands.c643
1 files changed, 637 insertions, 6 deletions
diff --git a/redis_commands.c b/redis_commands.c
index c47669ad..e9d26c3d 100644
--- a/redis_commands.c
+++ b/redis_commands.c
@@ -1050,13 +1050,16 @@ int redis_key_varval_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
}
/* Commands that take a key and then an array of values */
-int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
- char *kw, char **cmd, int *cmd_len, short *slot,
- void **ctx)
+#define VAL_TYPE_VALUES 1
+#define VAL_TYPE_STRINGS 2
+static int gen_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, int valtype, char **cmd, int *cmd_len,
+ short *slot, void **ctx)
{
zval *z_arr, *z_val;
HashTable *ht_arr;
smart_string cmdstr = {0};
+ zend_string *zstr;
int key_free, val_free, argc = 1;
strlen_t val_len, key_len;
char *key, *val;
@@ -1080,10 +1083,17 @@ int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
if (key_free) efree(key);
/* Iterate our hash table, serializing and appending values */
+ assert(valtype == VAL_TYPE_VALUES || valtype == VAL_TYPE_STRINGS);
ZEND_HASH_FOREACH_VAL(ht_arr, z_val) {
- val_free = redis_pack(redis_sock, z_val, &val, &val_len TSRMLS_CC);
- redis_cmd_append_sstr(&cmdstr, val, val_len);
- if (val_free) efree(val);
+ if (valtype == VAL_TYPE_VALUES) {
+ val_free = redis_pack(redis_sock, z_val, &val, &val_len TSRMLS_CC);
+ redis_cmd_append_sstr(&cmdstr, val, val_len);
+ if (val_free) efree(val);
+ } else {
+ zstr = zval_get_string(z_val);
+ redis_cmd_append_sstr(&cmdstr, ZSTR_VAL(zstr), ZSTR_LEN(zstr));
+ zend_string_release(zstr);
+ }
} ZEND_HASH_FOREACH_END();
*cmd_len = cmdstr.len;
@@ -1092,6 +1102,22 @@ int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
return SUCCESS;
}
+int redis_key_val_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, char **cmd, int *cmd_len, short *slot,
+ void **ctx)
+{
+ return gen_key_arr_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, kw,
+ VAL_TYPE_VALUES, cmd, cmd_len, slot, ctx);
+}
+
+int redis_key_str_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, char **cmd, int *cmd_len, short *slot,
+ void **ctx)
+{
+ return gen_key_arr_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, kw,
+ VAL_TYPE_STRINGS, cmd, cmd_len, slot, ctx);
+}
+
/* Generic function that takes a variable number of keys, with an optional
* timeout value. This can handle various SUNION/SUNIONSTORE/BRPOP type
* commands. */
@@ -3147,6 +3173,611 @@ int redis_command_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
return SUCCESS;
}
+/* XADD */
+int redis_xadd_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ smart_string cmdstr = {0};
+ zend_string *arrkey;
+ zval *z_fields, *value;
+ zend_long maxlen = 0;
+ zend_bool approx = 0;
+ ulong idx;
+ HashTable *ht_fields;
+ int fcount, argc;
+ char *key, *id;
+ strlen_t keylen, idlen;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ssa|lb", &key, &keylen,
+ &id, &idlen, &z_fields, &maxlen, &approx) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ /* At least one field and string are required */
+ ht_fields = Z_ARRVAL_P(z_fields);
+ if ((fcount = zend_hash_num_elements(ht_fields)) == 0) {
+ return FAILURE;
+ }
+
+ if (maxlen < 0 || (maxlen == 0 && approx != 0)) {
+ php_error_docref(NULL TSRMLS_CC, E_WARNING,
+ "Warning: Invalid MAXLEN argument or approximate flag");
+ }
+
+
+ /* Calculate argc for XADD. It's a bit complex because we've got
+ * an optional MAXLEN argument which can either take the form MAXLEN N
+ * or MAXLEN ~ N */
+ argc = 2 + (fcount*2) + (maxlen > 0 ? (approx ? 3 : 2) : 0);
+
+ /* XADD key ID field string [field string ...] */
+ REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XADD");
+ redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot);
+
+ /* Now append our MAXLEN bits if we've got them */
+ if (maxlen > 0) {
+ REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "MAXLEN");
+ REDIS_CMD_APPEND_SSTR_OPT_STATIC(&cmdstr, approx, "~");
+ redis_cmd_append_sstr_long(&cmdstr, maxlen);
+ }
+
+ /* Now append ID and field(s) */
+ redis_cmd_append_sstr(&cmdstr, id, idlen);
+ ZEND_HASH_FOREACH_KEY_VAL(ht_fields, idx, arrkey, value) {
+ redis_cmd_append_sstr_arrkey(&cmdstr, arrkey, idx);
+ redis_cmd_append_sstr_zval(&cmdstr, value, redis_sock TSRMLS_CC);
+ } ZEND_HASH_FOREACH_END();
+
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+// XPENDING key group [start end count] [consumer]
+int redis_xpending_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ smart_string cmdstr = {0};
+ char *key, *group, *start = NULL, *end = NULL, *consumer = NULL;
+ strlen_t keylen, grouplen, startlen, endlen, consumerlen;
+ int argc;
+ zend_long count = -1;
+
+ // XPENDING mystream group55 - + 10 consumer-123
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss|ssls", &key,
+ &keylen, &group, &grouplen, &start, &startlen,
+ &end, &endlen, &count, &consumer, &consumerlen)
+ == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ /* If we've been passed a start argument, we also need end and count */
+ if (start != NULL && (end == NULL || count < 0)) {
+ return FAILURE;
+ }
+
+ /* Calculate argc. It's either 2, 5, or 6 */
+ argc = 2 + (start != NULL ? 3 + (consumer != NULL) : 0);
+
+ /* Construct command and add required arguments */
+ REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XPENDING");
+ redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot);
+ redis_cmd_append_sstr(&cmdstr, group, grouplen);
+
+ /* Add optional argumentst */
+ if (start) {
+ redis_cmd_append_sstr(&cmdstr, start, startlen);
+ redis_cmd_append_sstr(&cmdstr, end, endlen);
+ redis_cmd_append_sstr_long(&cmdstr, (long)count);
+
+ /* Finally add consumer if we have it */
+ if (consumer) redis_cmd_append_sstr(&cmdstr, consumer, consumerlen);
+ }
+
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+/* X[REV]RANGE key start end [COUNT count] */
+int redis_xrange_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char *kw, char **cmd, int *cmd_len, short *slot,
+ void **ctx)
+{
+ smart_string cmdstr = {0};
+ char *key, *start, *end;
+ strlen_t keylen, startlen, endlen;
+ zend_long count = -1;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sss|l", &key, &keylen,
+ &start, &startlen, &end, &endlen, &count)
+ == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ redis_cmd_init_sstr(&cmdstr, 3 + (2 * (count > -1)), kw, strlen(kw));
+ redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot);
+ redis_cmd_append_sstr(&cmdstr, start, startlen);
+ redis_cmd_append_sstr(&cmdstr, end, endlen);
+
+ if (count > -1) {
+ redis_cmd_append_sstr(&cmdstr, "COUNT", sizeof("COUNT")-1);
+ redis_cmd_append_sstr_long(&cmdstr, count);
+ }
+
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+/* Helper function to take an associative array and append the Redis
+ * STREAMS stream [stream...] id [id ...] arguments to a command string. */
+static int
+append_stream_args(smart_string *cmdstr, HashTable *ht, RedisSock *redis_sock,
+ short *slot TSRMLS_DC)
+{
+ char *kptr, kbuf[40];
+ int klen, i, pos = 0;
+ zend_string *key, *idstr;
+ short oldslot;
+ zval **id;
+ ulong idx;
+
+ /* Append STREAM qualifier */
+ REDIS_CMD_APPEND_SSTR_STATIC(cmdstr, "STREAMS");
+
+ /* Sentinel slot */
+ if (slot) oldslot = -1;
+
+ /* Allocate memory to keep IDs */
+ id = emalloc(sizeof(*id) * zend_hash_num_elements(ht));
+
+ /* Iterate over our stream => id array appending streams and retaining each
+ * value for final arguments */
+ ZEND_HASH_FOREACH_KEY_VAL(ht, idx, key, id[pos++]) {
+ if (key) {
+ klen = ZSTR_LEN(key);
+ kptr = ZSTR_VAL(key);
+ } else {
+ klen = snprintf(kbuf, sizeof(kbuf), "%ld", (long)idx);
+ kptr = (char*)kbuf;
+ }
+
+ /* Append stream key */
+ redis_cmd_append_sstr_key(cmdstr, kptr, klen, redis_sock, slot);
+
+ /* Protect the user against CROSSSLOT to avoid confusion */
+ if (slot) {
+ if (oldslot != -1 && *slot != oldslot) {
+ php_error_docref(NULL TSRMLS_CC, E_WARNING,
+ "Warning, not all keys hash to the same slot!");
+ efree(id);
+ return FAILURE;
+ }
+ oldslot = *slot;
+ }
+ } ZEND_HASH_FOREACH_END();
+
+ /* Add our IDs */
+ for (i = 0; i < pos; i++) {
+ idstr = zval_get_string(id[i]);
+ redis_cmd_append_sstr(cmdstr, ZSTR_VAL(idstr), ZSTR_LEN(idstr));
+ zend_string_release(idstr);
+ }
+
+ /* Clean up ID container array */
+ efree(id);
+
+ return 0;
+}
+
+/* XREAD [COUNT count] [BLOCK ms] STREAMS key [key ...] id [id ...] */
+int redis_xread_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ smart_string cmdstr = {0};
+ zend_long count = -1, block = -1;
+ zval *z_streams;
+ int argc, scount;
+ HashTable *kt;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a|ll", &z_streams,
+ &count, &block) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ /* At least one stream and ID is required */
+ kt = Z_ARRVAL_P(z_streams);
+ if ((scount = zend_hash_num_elements(kt)) < 1) {
+ return FAILURE;
+ }
+
+ /* Calculate argc and start constructing command */
+ argc = 1 + (2 * scount) + (2 * (count > -1)) + (2 * (block > -1));
+ REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XREAD");
+
+ /* Append COUNT if we have it */
+ if (count > -1) {
+ REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "COUNT");
+ redis_cmd_append_sstr_long(&cmdstr, count);
+ }
+
+ /* Append BLOCK if we have it */
+ if (block > -1) {
+ REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "BLOCK");
+ redis_cmd_append_sstr_long(&cmdstr, block);
+ }
+
+ /* Append final STREAM key [key ...] id [id ...] arguments */
+ if (append_stream_args(&cmdstr, kt, redis_sock, slot TSRMLS_CC) < 0) {
+ efree(cmdstr.c);
+ return FAILURE;
+ }
+
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+/* XREADGROUP GROUP group consumer [COUNT count] [BLOCK ms]
+ * STREAMS key [key ...] id [id ...] */
+int redis_xreadgroup_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ smart_string cmdstr = {0};
+ zval *z_streams;
+ HashTable *kt;
+ char *group, *consumer;
+ strlen_t grouplen, consumerlen;
+ int scount, argc;
+ zend_long count = -1, block = -1;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ssa|ll", &group,
+ &grouplen, &consumer, &consumerlen, &z_streams,
+ &count, &block) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ /* Redis requires at least one stream */
+ kt = Z_ARRVAL_P(z_streams);
+ if ((scount = zend_hash_num_elements(kt)) < 1) {
+ return FAILURE;
+ }
+
+ /* Calculate argc and start constructing commands */
+ argc = 4 + (2 * scount) + (2 * (count > -1)) + (2 * (block > -1));
+ REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XREADGROUP");
+
+ /* Group and consumer */
+ REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "GROUP");
+ redis_cmd_append_sstr(&cmdstr, group, grouplen);
+ redis_cmd_append_sstr(&cmdstr, consumer, consumerlen);
+
+ /* Append COUNT if we have it */
+ if (count > -1) {
+ REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "COUNT");
+ redis_cmd_append_sstr_long(&cmdstr, count);
+ }
+
+ /* Append BLOCK argument if we have it */
+ if (block > -1) {
+ REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "BLOCK");
+ redis_cmd_append_sstr_long(&cmdstr, block);
+ }
+
+ /* Finally append stream and id args */
+ if (append_stream_args(&cmdstr, kt, redis_sock, slot TSRMLS_CC) < 0) {
+ efree(cmdstr.c);
+ return FAILURE;
+ }
+
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+/* XACK key group id [id ...] */
+int redis_xack_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ smart_string cmdstr = {0};
+ char *key, *group;
+ strlen_t keylen, grouplen;
+ zend_string *idstr;
+ zval *z_ids, *z_id;
+ HashTable *ht_ids;
+ int idcount;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ssa", &key, &keylen,
+ &group, &grouplen, &z_ids) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ ht_ids = Z_ARRVAL_P(z_ids);
+ if ((idcount = zend_hash_num_elements(ht_ids)) < 1) {
+ return FAILURE;
+ }
+
+ /* Create command and add initial arguments */
+ REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, 2 + idcount, "XACK");
+ redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot);
+ redis_cmd_append_sstr(&cmdstr, group, grouplen);
+
+ /* Append IDs */
+ ZEND_HASH_FOREACH_VAL(ht_ids, z_id) {
+ idstr = zval_get_string(z_id);
+ redis_cmd_append_sstr(&cmdstr, ZSTR_VAL(idstr), ZSTR_LEN(idstr));
+ zend_string_release(idstr);
+ } ZEND_HASH_FOREACH_END();
+
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+/* XCLAIM options container */
+typedef struct xclaimOptions {
+ struct {
+ char *type;
+ zend_long time;
+ } idle;
+ zend_long retrycount;
+ int force;
+ int justid;
+} xclaimOptions;
+
+/* Helper to extract XCLAIM options */
+static void get_xclaim_options(zval *z_arr, xclaimOptions *opt TSRMLS_DC) {
+ HashTable *ht;
+ zend_string *zkey;
+ char *kval;
+ strlen_t klen;
+ ulong idx;
+ zval *zv;
+
+ PHPREDIS_NOTUSED(idx);
+
+ /* Initialize options array to sane defaults */
+ memset(opt, 0, sizeof(*opt));
+ opt->retrycount = -1;
+ opt->idle.time = -1;
+
+ /* Early return if we don't have any options */
+ if (z_arr == NULL)
+ return;
+
+ /* Iterate over our options array */
+ ht = Z_ARRVAL_P(z_arr);
+ ZEND_HASH_FOREACH_KEY_VAL(ht, idx, zkey, zv) {
+ if (zkey) {
+ /* Every key => value xclaim option requires a long and Redis
+ * treats -1 as not being passed so skip negative values too. */
+ if (Z_TYPE_P(zv) != IS_LONG || Z_LVAL_P(zv) < 0)
+ continue;
+
+ kval = ZSTR_VAL(zkey);
+ klen = ZSTR_LEN(zkey);
+ if (klen == 4) {
+ if (!strncasecmp(kval, "TIME", 4)) {
+ opt->idle.type = "TIME";
+ opt->idle.time = Z_LVAL_P(zv);
+ } else if (!strncasecmp(kval, "IDLE", 4)) {
+ opt->idle.type = "IDLE";
+ opt->idle.time = Z_LVAL_P(zv);
+ }
+ } else if (klen == 10 && !strncasecmp(kval, "RETRYCOUNT", 10)) {
+ opt->retrycount = Z_LVAL_P(zv);
+ }
+ } else {
+ if (Z_TYPE_P(zv) == IS_STRING) {
+ kval = Z_STRVAL_P(zv);
+ klen = Z_STRLEN_P(zv);
+ if (klen == 5 && !strncasecmp(kval, "FORCE", 5)) {
+ opt->force = 1;
+ } else if (klen == 6 && !strncasecmp(kval, "JUSTID", 6)) {
+ opt->justid = 1;
+ }
+ }
+ }
+ } ZEND_HASH_FOREACH_END();
+}
+
+/* Count argc for any options we may have */
+static int xclaim_options_argc(xclaimOptions *opt) {
+ int argc = 0;
+
+ if (opt->idle.type != NULL && opt->idle.time != -1)
+ argc += 2;
+ if (opt->retrycount != -1)
+ argc += 2;
+ if (opt->force)
+ argc++;
+ if (opt->justid)
+ argc++;
+
+ return argc;
+}
+
+/* Append XCLAIM options */
+static void append_xclaim_options(smart_string *cmd, xclaimOptions *opt) {
+ /* IDLE/TIME long */
+ if (opt->idle.type != NULL && opt->idle.time != -1) {
+ redis_cmd_append_sstr(cmd, opt->idle.type, strlen(opt->idle.type));
+ redis_cmd_append_sstr_long(cmd, opt->idle.time);
+ }
+
+ /* RETRYCOUNT */
+ if (opt->retrycount != -1) {
+ REDIS_CMD_APPEND_SSTR_STATIC(cmd, "RETRYCOUNT");
+ redis_cmd_append_sstr_long(cmd, opt->retrycount);
+ }
+
+ /* FORCE and JUSTID */
+ if (opt->force)
+ REDIS_CMD_APPEND_SSTR_STATIC(cmd, "FORCE");
+ if (opt->justid)
+ REDIS_CMD_APPEND_SSTR_STATIC(cmd, "JUSTID");
+}
+
+/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
+ [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
+ [FORCE] [JUSTID] */
+int redis_xclaim_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ smart_string cmdstr = {0};
+ char *key, *group, *consumer;
+ strlen_t keylen, grouplen, consumerlen;
+ zend_long min_idle;
+ int argc, id_count;
+ zval *z_ids, *z_id, *z_opts = NULL;
+ zend_string *zstr;
+ HashTable *ht_ids;
+ xclaimOptions opts;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sssla|a", &key, &keylen,
+ &group, &grouplen, &consumer, &consumerlen, &min_idle,
+ &z_ids, &z_opts) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ /* At least one id is required */
+ ht_ids = Z_ARRVAL_P(z_ids);
+ if ((id_count = zend_hash_num_elements(ht_ids)) < 1) {
+ return FAILURE;
+ }
+
+ /* Extract options array if we've got them */
+ get_xclaim_options(z_opts, &opts TSRMLS_CC);
+
+ /* Now we have enough information to calculate argc */
+ argc = 4 + id_count + xclaim_options_argc(&opts);
+
+ /* Start constructing our command */
+ REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XCLAIM");
+ redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot);
+ redis_cmd_append_sstr(&cmdstr, group, grouplen);
+ redis_cmd_append_sstr(&cmdstr, consumer, consumerlen);
+ redis_cmd_append_sstr_long(&cmdstr, min_idle);
+
+ /* Add IDs */
+ ZEND_HASH_FOREACH_VAL(ht_ids, z_id) {
+ zstr = zval_get_string(z_id);
+ redis_cmd_append_sstr(&cmdstr, ZSTR_VAL(zstr), ZSTR_LEN(zstr));
+ zend_string_release(zstr);
+ } ZEND_HASH_FOREACH_END();
+
+ /* Finally add our options */
+ append_xclaim_options(&cmdstr, &opts);
+
+ /* Success */
+ *cmd = cmdstr.c;
+ *cmd_len = cmdstr.len;
+ return SUCCESS;
+}
+
+/* XGROUP HELP
+ * XGROUP SETID key group id
+ * XGROUP DELGROUP key groupname
+ * XGROUP CREATE key groupname id
+ * XGROUP DELCONSUMER key groupname consumername */
+int redis_xgroup_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ char *op, *key = NULL, *arg1 = NULL, *arg2 = NULL;
+ strlen_t oplen, keylen, arg1len, arg2len;
+ int argc = ZEND_NUM_ARGS();
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|ssss", &op, &oplen,
+ &key, &keylen, &arg1, &arg1len, &arg2, &arg2len)
+ == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ if (argc == 1 && oplen == 4 && !strncasecmp(op, "HELP", 4)) {
+ *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XGROUP", "s", "HELP", 4);
+ return SUCCESS;
+ } else if (argc == 4 && ((oplen == 5 && !strncasecmp(op, "SETID", 5)) ||
+ (oplen == 6 && !strncasecmp(op, "CREATE", 6)) ||
+ (oplen == 11 && !strncasecmp(op, "DELCONSUMER", 11))))
+ {
+ *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XGROUP", "skss", op, oplen, key, keylen,
+ arg1, arg1len, arg2, arg2len);
+ return SUCCESS;
+ } else if (argc == 3 && ((oplen == 7 && !strncasecmp(op, "DELGROUP", 7)))) {
+ *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XGROUP", "sks", op, oplen, key,
+ keylen, arg1, arg1len);
+ return SUCCESS;
+ }
+
+ /* Didn't detect any valid XGROUP command pattern */
+ return FAILURE;
+}
+
+
+
+/* XINFO CONSUMERS key group
+ * XINFO GROUPS key
+ * XINFO STREAM key
+ * XINFO HELP */
+int redis_xinfo_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ char *op, *key, *arg;
+ strlen_t oplen, keylen, arglen;
+ char fmt[4];
+ int argc = ZEND_NUM_ARGS();
+
+ if (argc > 3 || zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|ss",
+ &op, &oplen, &key, &keylen, &arg,
+ &arglen) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ /* Our format is simply "s", "sk" or "sks" depending on argc */
+ memcpy(fmt, "sks", sizeof("sks")-1);
+ fmt[argc] = '\0';
+
+ *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XINFO", fmt, op, oplen, key, keylen,
+ arg, arglen);
+ return SUCCESS;
+}
+
+/* XTRIM MAXLEN [~] count */
+int redis_xtrim_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
+ char **cmd, int *cmd_len, short *slot, void **ctx)
+{
+ char *key;
+ strlen_t keylen;
+ zend_long maxlen;
+ zend_bool approx = 0;
+
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sl|b", &key, &keylen,
+ &maxlen, &approx) == FAILURE)
+ {
+ return FAILURE;
+ }
+
+ if (approx) {
+ *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XTRIM", "kssl", key, keylen,
+ "MAXLEN", 6, "~", 1, maxlen);
+ } else {
+ *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XTRIM", "ksl", key, keylen,
+ "MAXLEN", 6, maxlen);
+ }
+
+ return SUCCESS;
+}
+
/*
* Redis commands that don't deal with the server at all. The RedisSock*
* pointer is the only thing retreived differently, so we just take that