diff options
author | michael-grunder <michael.grunder@gmail.com> | 2014-06-16 04:13:54 +0400 |
---|---|---|
committer | michael-grunder <michael.grunder@gmail.com> | 2015-05-06 01:02:03 +0300 |
commit | e4bab084b630100658070ddc830fda8852679ca8 (patch) | |
tree | 4846625b8b2c81373392f99831973e2c72308910 /cluster_library.c | |
parent | e31f3aa1cb31b0c1b5817a65f26ea75b49ce16cc (diff) |
MGET
Initial commit of logic for the MGET command, which will distribute
the commands across nodes in the cluster, based on how the slots hash.
The response for MGET in cluster differs from Redis proper in a couple
of ways.
1) Except for a critical failure (e.g. being unable to write the
command *anywhere*, or internal PHP HashTable malfunctions,
the client will always get an array back, even on within the
processing.
2) This array will correspond 1-1 with the keys requested, so that
the client can use the keys passed in array as a sentinel for
the responses.
3) When NO key is found, the resulting value will be NULL, whereas
in the event of an error it will be FALSE.
Diffstat (limited to 'cluster_library.c')
-rw-r--r-- | cluster_library.c | 93 |
1 files changed, 80 insertions, 13 deletions
diff --git a/cluster_library.c b/cluster_library.c index 014d2bb5..f8471cfe 100644 --- a/cluster_library.c +++ b/cluster_library.c @@ -152,6 +152,25 @@ void cluster_dist_add_val(redisCluster *c, clusterKeyVal *kv, zval *z_val kv->val_free = val_free; } +/* Free allocated memory for a clusterMultiCmd */ +void cluster_multi_free(clusterMultiCmd *mc) { + efree(mc->cmd.c); + efree(mc->args.c); +} + +/* Add an argument to a clusterMultiCmd */ +void cluster_multi_add(clusterMultiCmd *mc, char *data, int data_len) { + mc->argc++; + redis_cmd_append_sstr(&(mc->args), data, data_len); +} + +/* Finalize a clusterMutliCmd by constructing the whole thing */ +void cluster_multi_fini(clusterMultiCmd *mc) { + mc->cmd.len = 0; + redis_cmd_init_sstr(&(mc->cmd), mc->argc, mc->kw, mc->kw_len); + smart_str_appendl(&(mc->cmd), mc->args.c, mc->args.len); +} + /* Set our last error string encountered */ static void cluster_set_err(redisCluster *c, char *err, int err_len) { @@ -1619,7 +1638,6 @@ PHPAPI void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, RETURN_FALSE; } - // Override reply slot, process response, move on c->reply_slot = fi->slot; fi->callback(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, fi->ctx); fi = fi->next; @@ -1631,12 +1649,56 @@ PHPAPI void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, efree(c->multi_resp); } +/* Generic handler for things like MGET/MSET/MSETNX */ +PHPAPI void cluster_mgetset_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, + redisCluster *c, mbulk_cb func, + void *ctx) +{ + clusterMultiCtx *mctx = (clusterMultiCtx*)ctx; + + // Protect against an invalid response type, -1 response length, and failure + // to consume the responses. + short fail = c->reply_type != TYPE_MULTIBULK || c->reply_len == -1 || + func(SLOT_SOCK(c,c->reply_slot), mctx->z_multi, c->reply_len, + NULL TSRMLS_CC)==FAILURE; + + // If we had a failure, pad results with FALSE to indicate failure. Non + // existant keys (e.g. for MGET will come back as NULL) + if(fail) { + while(mctx->count--) { + add_next_index_bool(mctx->z_multi, 0); + } + } + + // If this is the tail of our multi command, we can set our returns + if(mctx->last) { + if(CLUSTER_IS_ATOMIC(c)) { + *return_value = *(mctx->z_multi); + efree(mctx->z_multi); + } else { + add_next_index_zval(c->multi_resp, mctx->z_multi); + } + } + + // Clean up this context item + efree(mctx); +} + +/* Response for MGET */ +PHPAPI void +cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, + void *ctx) +{ + cluster_mgetset_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, + mbulk_resp_loop, ctx); +} + /* Raw MULTI BULK reply */ PHPAPI void cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) { - cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, - c, mbulk_resp_loop_raw, NULL); + cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, + mbulk_resp_loop_raw, NULL); } /* Unserialize all the things */ @@ -1644,8 +1706,8 @@ PHPAPI void cluster_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) { - cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, - c, mbulk_resp_loop, NULL); + cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, + mbulk_resp_loop, NULL); } /* For handling responses where we get key, value, key, value that @@ -1653,16 +1715,16 @@ cluster_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, PHPAPI void cluster_mbulk_zipstr_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) { - cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, - c, mbulk_resp_loop_zipstr, NULL); + cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, + mbulk_resp_loop_zipstr, NULL); } /* Handling key,value to key=>value where the values are doubles */ PHPAPI void cluster_mbulk_zipdbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) { - cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, - c, mbulk_resp_loop_zipdbl, NULL); + cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, + mbulk_resp_loop_zipdbl, NULL); } /* Associate multi bulk response (for HMGET really) */ @@ -1713,11 +1775,16 @@ int mbulk_resp_loop(RedisSock *redis_sock, zval *z_result, line = redis_sock_read(redis_sock, &line_len TSRMLS_CC); if(line == NULL) return FAILURE; - if(redis_unserialize(redis_sock, line, line_len, &z TSRMLS_CC)==1) { - add_next_index_zval(z_result, z); - efree(line); + if(line_len > 0) { + if(redis_unserialize(redis_sock, line, line_len, &z TSRMLS_CC)==1) { + add_next_index_zval(z_result, z); + efree(line); + } else { + add_next_index_stringl(z_result, line, line_len, 0); + } } else { - add_next_index_stringl(z_result, line, line_len, 0); + efree(line); + add_next_index_null(z_result); } } |