diff options
author | Michael Grunder <michael.grunder@gmail.com> | 2018-09-29 21:59:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-29 21:59:01 +0300 |
commit | 2c9e0572361d5131f24fbc81a8f7baaafb671994 (patch) | |
tree | 6982b1e1f17b7cf2fc7e024652fad8212edadacd /redis_commands.c | |
parent | bfd274712eeb372926d1106b3da3c4fc19c0a48a (diff) |
Streams (#1413)
Streams API
Diffstat (limited to 'redis_commands.c')
-rw-r--r-- | redis_commands.c | 643 |
1 files changed, 637 insertions, 6 deletions
diff --git a/redis_commands.c b/redis_commands.c index c47669ad..e9d26c3d 100644 --- a/redis_commands.c +++ b/redis_commands.c @@ -1050,13 +1050,16 @@ int redis_key_varval_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, } /* Commands that take a key and then an array of values */ -int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, - char *kw, char **cmd, int *cmd_len, short *slot, - void **ctx) +#define VAL_TYPE_VALUES 1 +#define VAL_TYPE_STRINGS 2 +static int gen_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char *kw, int valtype, char **cmd, int *cmd_len, + short *slot, void **ctx) { zval *z_arr, *z_val; HashTable *ht_arr; smart_string cmdstr = {0}; + zend_string *zstr; int key_free, val_free, argc = 1; strlen_t val_len, key_len; char *key, *val; @@ -1080,10 +1083,17 @@ int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, if (key_free) efree(key); /* Iterate our hash table, serializing and appending values */ + assert(valtype == VAL_TYPE_VALUES || valtype == VAL_TYPE_STRINGS); ZEND_HASH_FOREACH_VAL(ht_arr, z_val) { - val_free = redis_pack(redis_sock, z_val, &val, &val_len TSRMLS_CC); - redis_cmd_append_sstr(&cmdstr, val, val_len); - if (val_free) efree(val); + if (valtype == VAL_TYPE_VALUES) { + val_free = redis_pack(redis_sock, z_val, &val, &val_len TSRMLS_CC); + redis_cmd_append_sstr(&cmdstr, val, val_len); + if (val_free) efree(val); + } else { + zstr = zval_get_string(z_val); + redis_cmd_append_sstr(&cmdstr, ZSTR_VAL(zstr), ZSTR_LEN(zstr)); + zend_string_release(zstr); + } } ZEND_HASH_FOREACH_END(); *cmd_len = cmdstr.len; @@ -1092,6 +1102,22 @@ int redis_key_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, return SUCCESS; } +int redis_key_val_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char *kw, char **cmd, int *cmd_len, short *slot, + void **ctx) +{ + return gen_key_arr_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, kw, + VAL_TYPE_VALUES, cmd, cmd_len, slot, ctx); +} + +int redis_key_str_arr_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char *kw, char **cmd, int *cmd_len, short *slot, + void **ctx) +{ + return gen_key_arr_cmd(INTERNAL_FUNCTION_PARAM_PASSTHRU, redis_sock, kw, + VAL_TYPE_STRINGS, cmd, cmd_len, slot, ctx); +} + /* Generic function that takes a variable number of keys, with an optional * timeout value. This can handle various SUNION/SUNIONSTORE/BRPOP type * commands. */ @@ -3147,6 +3173,611 @@ int redis_command_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, return SUCCESS; } +/* XADD */ +int redis_xadd_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + smart_string cmdstr = {0}; + zend_string *arrkey; + zval *z_fields, *value; + zend_long maxlen = 0; + zend_bool approx = 0; + ulong idx; + HashTable *ht_fields; + int fcount, argc; + char *key, *id; + strlen_t keylen, idlen; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ssa|lb", &key, &keylen, + &id, &idlen, &z_fields, &maxlen, &approx) == FAILURE) + { + return FAILURE; + } + + /* At least one field and string are required */ + ht_fields = Z_ARRVAL_P(z_fields); + if ((fcount = zend_hash_num_elements(ht_fields)) == 0) { + return FAILURE; + } + + if (maxlen < 0 || (maxlen == 0 && approx != 0)) { + php_error_docref(NULL TSRMLS_CC, E_WARNING, + "Warning: Invalid MAXLEN argument or approximate flag"); + } + + + /* Calculate argc for XADD. It's a bit complex because we've got + * an optional MAXLEN argument which can either take the form MAXLEN N + * or MAXLEN ~ N */ + argc = 2 + (fcount*2) + (maxlen > 0 ? (approx ? 3 : 2) : 0); + + /* XADD key ID field string [field string ...] */ + REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XADD"); + redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot); + + /* Now append our MAXLEN bits if we've got them */ + if (maxlen > 0) { + REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "MAXLEN"); + REDIS_CMD_APPEND_SSTR_OPT_STATIC(&cmdstr, approx, "~"); + redis_cmd_append_sstr_long(&cmdstr, maxlen); + } + + /* Now append ID and field(s) */ + redis_cmd_append_sstr(&cmdstr, id, idlen); + ZEND_HASH_FOREACH_KEY_VAL(ht_fields, idx, arrkey, value) { + redis_cmd_append_sstr_arrkey(&cmdstr, arrkey, idx); + redis_cmd_append_sstr_zval(&cmdstr, value, redis_sock TSRMLS_CC); + } ZEND_HASH_FOREACH_END(); + + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +// XPENDING key group [start end count] [consumer] +int redis_xpending_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + smart_string cmdstr = {0}; + char *key, *group, *start = NULL, *end = NULL, *consumer = NULL; + strlen_t keylen, grouplen, startlen, endlen, consumerlen; + int argc; + zend_long count = -1; + + // XPENDING mystream group55 - + 10 consumer-123 + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ss|ssls", &key, + &keylen, &group, &grouplen, &start, &startlen, + &end, &endlen, &count, &consumer, &consumerlen) + == FAILURE) + { + return FAILURE; + } + + /* If we've been passed a start argument, we also need end and count */ + if (start != NULL && (end == NULL || count < 0)) { + return FAILURE; + } + + /* Calculate argc. It's either 2, 5, or 6 */ + argc = 2 + (start != NULL ? 3 + (consumer != NULL) : 0); + + /* Construct command and add required arguments */ + REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XPENDING"); + redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot); + redis_cmd_append_sstr(&cmdstr, group, grouplen); + + /* Add optional argumentst */ + if (start) { + redis_cmd_append_sstr(&cmdstr, start, startlen); + redis_cmd_append_sstr(&cmdstr, end, endlen); + redis_cmd_append_sstr_long(&cmdstr, (long)count); + + /* Finally add consumer if we have it */ + if (consumer) redis_cmd_append_sstr(&cmdstr, consumer, consumerlen); + } + + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +/* X[REV]RANGE key start end [COUNT count] */ +int redis_xrange_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char *kw, char **cmd, int *cmd_len, short *slot, + void **ctx) +{ + smart_string cmdstr = {0}; + char *key, *start, *end; + strlen_t keylen, startlen, endlen; + zend_long count = -1; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sss|l", &key, &keylen, + &start, &startlen, &end, &endlen, &count) + == FAILURE) + { + return FAILURE; + } + + redis_cmd_init_sstr(&cmdstr, 3 + (2 * (count > -1)), kw, strlen(kw)); + redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot); + redis_cmd_append_sstr(&cmdstr, start, startlen); + redis_cmd_append_sstr(&cmdstr, end, endlen); + + if (count > -1) { + redis_cmd_append_sstr(&cmdstr, "COUNT", sizeof("COUNT")-1); + redis_cmd_append_sstr_long(&cmdstr, count); + } + + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +/* Helper function to take an associative array and append the Redis + * STREAMS stream [stream...] id [id ...] arguments to a command string. */ +static int +append_stream_args(smart_string *cmdstr, HashTable *ht, RedisSock *redis_sock, + short *slot TSRMLS_DC) +{ + char *kptr, kbuf[40]; + int klen, i, pos = 0; + zend_string *key, *idstr; + short oldslot; + zval **id; + ulong idx; + + /* Append STREAM qualifier */ + REDIS_CMD_APPEND_SSTR_STATIC(cmdstr, "STREAMS"); + + /* Sentinel slot */ + if (slot) oldslot = -1; + + /* Allocate memory to keep IDs */ + id = emalloc(sizeof(*id) * zend_hash_num_elements(ht)); + + /* Iterate over our stream => id array appending streams and retaining each + * value for final arguments */ + ZEND_HASH_FOREACH_KEY_VAL(ht, idx, key, id[pos++]) { + if (key) { + klen = ZSTR_LEN(key); + kptr = ZSTR_VAL(key); + } else { + klen = snprintf(kbuf, sizeof(kbuf), "%ld", (long)idx); + kptr = (char*)kbuf; + } + + /* Append stream key */ + redis_cmd_append_sstr_key(cmdstr, kptr, klen, redis_sock, slot); + + /* Protect the user against CROSSSLOT to avoid confusion */ + if (slot) { + if (oldslot != -1 && *slot != oldslot) { + php_error_docref(NULL TSRMLS_CC, E_WARNING, + "Warning, not all keys hash to the same slot!"); + efree(id); + return FAILURE; + } + oldslot = *slot; + } + } ZEND_HASH_FOREACH_END(); + + /* Add our IDs */ + for (i = 0; i < pos; i++) { + idstr = zval_get_string(id[i]); + redis_cmd_append_sstr(cmdstr, ZSTR_VAL(idstr), ZSTR_LEN(idstr)); + zend_string_release(idstr); + } + + /* Clean up ID container array */ + efree(id); + + return 0; +} + +/* XREAD [COUNT count] [BLOCK ms] STREAMS key [key ...] id [id ...] */ +int redis_xread_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + smart_string cmdstr = {0}; + zend_long count = -1, block = -1; + zval *z_streams; + int argc, scount; + HashTable *kt; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a|ll", &z_streams, + &count, &block) == FAILURE) + { + return FAILURE; + } + + /* At least one stream and ID is required */ + kt = Z_ARRVAL_P(z_streams); + if ((scount = zend_hash_num_elements(kt)) < 1) { + return FAILURE; + } + + /* Calculate argc and start constructing command */ + argc = 1 + (2 * scount) + (2 * (count > -1)) + (2 * (block > -1)); + REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XREAD"); + + /* Append COUNT if we have it */ + if (count > -1) { + REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "COUNT"); + redis_cmd_append_sstr_long(&cmdstr, count); + } + + /* Append BLOCK if we have it */ + if (block > -1) { + REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "BLOCK"); + redis_cmd_append_sstr_long(&cmdstr, block); + } + + /* Append final STREAM key [key ...] id [id ...] arguments */ + if (append_stream_args(&cmdstr, kt, redis_sock, slot TSRMLS_CC) < 0) { + efree(cmdstr.c); + return FAILURE; + } + + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +/* XREADGROUP GROUP group consumer [COUNT count] [BLOCK ms] + * STREAMS key [key ...] id [id ...] */ +int redis_xreadgroup_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + smart_string cmdstr = {0}; + zval *z_streams; + HashTable *kt; + char *group, *consumer; + strlen_t grouplen, consumerlen; + int scount, argc; + zend_long count = -1, block = -1; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ssa|ll", &group, + &grouplen, &consumer, &consumerlen, &z_streams, + &count, &block) == FAILURE) + { + return FAILURE; + } + + /* Redis requires at least one stream */ + kt = Z_ARRVAL_P(z_streams); + if ((scount = zend_hash_num_elements(kt)) < 1) { + return FAILURE; + } + + /* Calculate argc and start constructing commands */ + argc = 4 + (2 * scount) + (2 * (count > -1)) + (2 * (block > -1)); + REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XREADGROUP"); + + /* Group and consumer */ + REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "GROUP"); + redis_cmd_append_sstr(&cmdstr, group, grouplen); + redis_cmd_append_sstr(&cmdstr, consumer, consumerlen); + + /* Append COUNT if we have it */ + if (count > -1) { + REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "COUNT"); + redis_cmd_append_sstr_long(&cmdstr, count); + } + + /* Append BLOCK argument if we have it */ + if (block > -1) { + REDIS_CMD_APPEND_SSTR_STATIC(&cmdstr, "BLOCK"); + redis_cmd_append_sstr_long(&cmdstr, block); + } + + /* Finally append stream and id args */ + if (append_stream_args(&cmdstr, kt, redis_sock, slot TSRMLS_CC) < 0) { + efree(cmdstr.c); + return FAILURE; + } + + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +/* XACK key group id [id ...] */ +int redis_xack_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + smart_string cmdstr = {0}; + char *key, *group; + strlen_t keylen, grouplen; + zend_string *idstr; + zval *z_ids, *z_id; + HashTable *ht_ids; + int idcount; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ssa", &key, &keylen, + &group, &grouplen, &z_ids) == FAILURE) + { + return FAILURE; + } + + ht_ids = Z_ARRVAL_P(z_ids); + if ((idcount = zend_hash_num_elements(ht_ids)) < 1) { + return FAILURE; + } + + /* Create command and add initial arguments */ + REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, 2 + idcount, "XACK"); + redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot); + redis_cmd_append_sstr(&cmdstr, group, grouplen); + + /* Append IDs */ + ZEND_HASH_FOREACH_VAL(ht_ids, z_id) { + idstr = zval_get_string(z_id); + redis_cmd_append_sstr(&cmdstr, ZSTR_VAL(idstr), ZSTR_LEN(idstr)); + zend_string_release(idstr); + } ZEND_HASH_FOREACH_END(); + + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +/* XCLAIM options container */ +typedef struct xclaimOptions { + struct { + char *type; + zend_long time; + } idle; + zend_long retrycount; + int force; + int justid; +} xclaimOptions; + +/* Helper to extract XCLAIM options */ +static void get_xclaim_options(zval *z_arr, xclaimOptions *opt TSRMLS_DC) { + HashTable *ht; + zend_string *zkey; + char *kval; + strlen_t klen; + ulong idx; + zval *zv; + + PHPREDIS_NOTUSED(idx); + + /* Initialize options array to sane defaults */ + memset(opt, 0, sizeof(*opt)); + opt->retrycount = -1; + opt->idle.time = -1; + + /* Early return if we don't have any options */ + if (z_arr == NULL) + return; + + /* Iterate over our options array */ + ht = Z_ARRVAL_P(z_arr); + ZEND_HASH_FOREACH_KEY_VAL(ht, idx, zkey, zv) { + if (zkey) { + /* Every key => value xclaim option requires a long and Redis + * treats -1 as not being passed so skip negative values too. */ + if (Z_TYPE_P(zv) != IS_LONG || Z_LVAL_P(zv) < 0) + continue; + + kval = ZSTR_VAL(zkey); + klen = ZSTR_LEN(zkey); + if (klen == 4) { + if (!strncasecmp(kval, "TIME", 4)) { + opt->idle.type = "TIME"; + opt->idle.time = Z_LVAL_P(zv); + } else if (!strncasecmp(kval, "IDLE", 4)) { + opt->idle.type = "IDLE"; + opt->idle.time = Z_LVAL_P(zv); + } + } else if (klen == 10 && !strncasecmp(kval, "RETRYCOUNT", 10)) { + opt->retrycount = Z_LVAL_P(zv); + } + } else { + if (Z_TYPE_P(zv) == IS_STRING) { + kval = Z_STRVAL_P(zv); + klen = Z_STRLEN_P(zv); + if (klen == 5 && !strncasecmp(kval, "FORCE", 5)) { + opt->force = 1; + } else if (klen == 6 && !strncasecmp(kval, "JUSTID", 6)) { + opt->justid = 1; + } + } + } + } ZEND_HASH_FOREACH_END(); +} + +/* Count argc for any options we may have */ +static int xclaim_options_argc(xclaimOptions *opt) { + int argc = 0; + + if (opt->idle.type != NULL && opt->idle.time != -1) + argc += 2; + if (opt->retrycount != -1) + argc += 2; + if (opt->force) + argc++; + if (opt->justid) + argc++; + + return argc; +} + +/* Append XCLAIM options */ +static void append_xclaim_options(smart_string *cmd, xclaimOptions *opt) { + /* IDLE/TIME long */ + if (opt->idle.type != NULL && opt->idle.time != -1) { + redis_cmd_append_sstr(cmd, opt->idle.type, strlen(opt->idle.type)); + redis_cmd_append_sstr_long(cmd, opt->idle.time); + } + + /* RETRYCOUNT */ + if (opt->retrycount != -1) { + REDIS_CMD_APPEND_SSTR_STATIC(cmd, "RETRYCOUNT"); + redis_cmd_append_sstr_long(cmd, opt->retrycount); + } + + /* FORCE and JUSTID */ + if (opt->force) + REDIS_CMD_APPEND_SSTR_STATIC(cmd, "FORCE"); + if (opt->justid) + REDIS_CMD_APPEND_SSTR_STATIC(cmd, "JUSTID"); +} + +/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> + [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>] + [FORCE] [JUSTID] */ +int redis_xclaim_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + smart_string cmdstr = {0}; + char *key, *group, *consumer; + strlen_t keylen, grouplen, consumerlen; + zend_long min_idle; + int argc, id_count; + zval *z_ids, *z_id, *z_opts = NULL; + zend_string *zstr; + HashTable *ht_ids; + xclaimOptions opts; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sssla|a", &key, &keylen, + &group, &grouplen, &consumer, &consumerlen, &min_idle, + &z_ids, &z_opts) == FAILURE) + { + return FAILURE; + } + + /* At least one id is required */ + ht_ids = Z_ARRVAL_P(z_ids); + if ((id_count = zend_hash_num_elements(ht_ids)) < 1) { + return FAILURE; + } + + /* Extract options array if we've got them */ + get_xclaim_options(z_opts, &opts TSRMLS_CC); + + /* Now we have enough information to calculate argc */ + argc = 4 + id_count + xclaim_options_argc(&opts); + + /* Start constructing our command */ + REDIS_CMD_INIT_SSTR_STATIC(&cmdstr, argc, "XCLAIM"); + redis_cmd_append_sstr_key(&cmdstr, key, keylen, redis_sock, slot); + redis_cmd_append_sstr(&cmdstr, group, grouplen); + redis_cmd_append_sstr(&cmdstr, consumer, consumerlen); + redis_cmd_append_sstr_long(&cmdstr, min_idle); + + /* Add IDs */ + ZEND_HASH_FOREACH_VAL(ht_ids, z_id) { + zstr = zval_get_string(z_id); + redis_cmd_append_sstr(&cmdstr, ZSTR_VAL(zstr), ZSTR_LEN(zstr)); + zend_string_release(zstr); + } ZEND_HASH_FOREACH_END(); + + /* Finally add our options */ + append_xclaim_options(&cmdstr, &opts); + + /* Success */ + *cmd = cmdstr.c; + *cmd_len = cmdstr.len; + return SUCCESS; +} + +/* XGROUP HELP + * XGROUP SETID key group id + * XGROUP DELGROUP key groupname + * XGROUP CREATE key groupname id + * XGROUP DELCONSUMER key groupname consumername */ +int redis_xgroup_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + char *op, *key = NULL, *arg1 = NULL, *arg2 = NULL; + strlen_t oplen, keylen, arg1len, arg2len; + int argc = ZEND_NUM_ARGS(); + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|ssss", &op, &oplen, + &key, &keylen, &arg1, &arg1len, &arg2, &arg2len) + == FAILURE) + { + return FAILURE; + } + + if (argc == 1 && oplen == 4 && !strncasecmp(op, "HELP", 4)) { + *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XGROUP", "s", "HELP", 4); + return SUCCESS; + } else if (argc == 4 && ((oplen == 5 && !strncasecmp(op, "SETID", 5)) || + (oplen == 6 && !strncasecmp(op, "CREATE", 6)) || + (oplen == 11 && !strncasecmp(op, "DELCONSUMER", 11)))) + { + *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XGROUP", "skss", op, oplen, key, keylen, + arg1, arg1len, arg2, arg2len); + return SUCCESS; + } else if (argc == 3 && ((oplen == 7 && !strncasecmp(op, "DELGROUP", 7)))) { + *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XGROUP", "sks", op, oplen, key, + keylen, arg1, arg1len); + return SUCCESS; + } + + /* Didn't detect any valid XGROUP command pattern */ + return FAILURE; +} + + + +/* XINFO CONSUMERS key group + * XINFO GROUPS key + * XINFO STREAM key + * XINFO HELP */ +int redis_xinfo_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + char *op, *key, *arg; + strlen_t oplen, keylen, arglen; + char fmt[4]; + int argc = ZEND_NUM_ARGS(); + + if (argc > 3 || zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|ss", + &op, &oplen, &key, &keylen, &arg, + &arglen) == FAILURE) + { + return FAILURE; + } + + /* Our format is simply "s", "sk" or "sks" depending on argc */ + memcpy(fmt, "sks", sizeof("sks")-1); + fmt[argc] = '\0'; + + *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XINFO", fmt, op, oplen, key, keylen, + arg, arglen); + return SUCCESS; +} + +/* XTRIM MAXLEN [~] count */ +int redis_xtrim_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, + char **cmd, int *cmd_len, short *slot, void **ctx) +{ + char *key; + strlen_t keylen; + zend_long maxlen; + zend_bool approx = 0; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sl|b", &key, &keylen, + &maxlen, &approx) == FAILURE) + { + return FAILURE; + } + + if (approx) { + *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XTRIM", "kssl", key, keylen, + "MAXLEN", 6, "~", 1, maxlen); + } else { + *cmd_len = REDIS_CMD_SPPRINTF(cmd, "XTRIM", "ksl", key, keylen, + "MAXLEN", 6, maxlen); + } + + return SUCCESS; +} + /* * Redis commands that don't deal with the server at all. The RedisSock* * pointer is the only thing retreived differently, so we just take that |