diff options
author | Michael Grunder <michael.grunder@gmail.com> | 2018-09-29 21:59:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-29 21:59:01 +0300 |
commit | 2c9e0572361d5131f24fbc81a8f7baaafb671994 (patch) | |
tree | 6982b1e1f17b7cf2fc7e024652fad8212edadacd /cluster_library.c | |
parent | bfd274712eeb372926d1106b3da3c4fc19c0a48a (diff) |
Streams (#1413)
Streams API
Diffstat (limited to 'cluster_library.c')
-rw-r--r-- | cluster_library.c | 165 |
1 files changed, 144 insertions, 21 deletions
diff --git a/cluster_library.c b/cluster_library.c index 8f157873..5f710c5e 100644 --- a/cluster_library.c +++ b/cluster_library.c @@ -103,7 +103,7 @@ void cluster_free_reply(clusterReply *reply, int free_data) { for (i = 0; i < reply->elements && reply->element[i]; i++) { cluster_free_reply(reply->element[i], free_data); } - efree(reply->element); + if (reply->element) efree(reply->element); break; default: break; @@ -113,7 +113,8 @@ void cluster_free_reply(clusterReply *reply, int free_data) { static void cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements, - clusterReply **element, int *err TSRMLS_DC) + clusterReply **element, int status_strings, + int *err TSRMLS_DC) { int i; size_t sz; @@ -141,6 +142,7 @@ cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements, return; } r->len = (long long)sz; + if (status_strings) r->str = estrndup(buf, r->len); break; case TYPE_INT: r->integer = len; @@ -155,11 +157,15 @@ cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements, } break; case TYPE_MULTIBULK: - r->element = ecalloc(r->len,sizeof(clusterReply*)); - r->elements = r->len; - cluster_multibulk_resp_recursive(sock, r->elements, r->element, - err TSRMLS_CC); - if (*err) return; + if (r->len >= 0) { + r->elements = r->len; + if (r->len > 0) { + r->element = ecalloc(r->len,sizeof(clusterReply*)); + cluster_multibulk_resp_recursive(sock, r->elements, r->element, + status_strings, err TSRMLS_CC); + } + if (*err) return; + } break; default: *err = 1; @@ -191,15 +197,17 @@ static RedisSock *cluster_slot_sock(redisCluster *c, unsigned short slot, } /* Read the response from a cluster */ -clusterReply *cluster_read_resp(redisCluster *c TSRMLS_DC) { - return cluster_read_sock_resp(c->cmd_sock,c->reply_type,c->reply_len TSRMLS_CC); +clusterReply *cluster_read_resp(redisCluster *c, int status_strings TSRMLS_DC) { + return cluster_read_sock_resp(c->cmd_sock, c->reply_type, + status_strings ? c->line_reply : NULL, + c->reply_len TSRMLS_CC); } /* Read any sort of response from the socket, having already issued the * command and consumed the reply type and meta info (length) */ clusterReply* cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type, - size_t len TSRMLS_DC) + char *line_reply, size_t len TSRMLS_DC) { clusterReply *r; @@ -214,6 +222,10 @@ cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type, r->integer = len; break; case TYPE_LINE: + if (line_reply) { + r->str = estrndup(line_reply, len); + r->len = len; + } case TYPE_ERR: return r; case TYPE_BULK: @@ -229,7 +241,7 @@ cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type, if (len != (size_t)-1) { r->element = ecalloc(len, sizeof(clusterReply*)*len); cluster_multibulk_resp_recursive(redis_sock, len, r->element, - &err TSRMLS_CC); + line_reply != NULL, &err TSRMLS_CC); } break; default: @@ -618,7 +630,7 @@ clusterReply* cluster_get_slots(RedisSock *redis_sock TSRMLS_DC) } // Consume the rest of our response - if ((r = cluster_read_sock_resp(redis_sock, type, len TSRMLS_CC)) == NULL || + if ((r = cluster_read_sock_resp(redis_sock, type, NULL, len TSRMLS_CC)) == NULL || r->type != TYPE_MULTIBULK || r->elements < 1) { if (r) cluster_free_reply(r, 1); @@ -1486,6 +1498,24 @@ PHP_REDIS_API void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, efree(resp); } +PHP_REDIS_API void +cluster_single_line_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) +{ + char *p; + + /* Cluster already has the reply so abort if this isn't a LINE response *or* if for + * some freaky reason we don't detect a null terminator */ + if (c->reply_type != TYPE_LINE || !(p = memchr(c->line_reply,'\0',sizeof(c->line_reply)))) { + CLUSTER_RETURN_FALSE(c); + } + + if (CLUSTER_IS_ATOMIC(c)) { + CLUSTER_RETURN_STRING(c, c->line_reply, p - c->line_reply); + } else { + add_next_index_stringl(&c->multi_resp, c->line_reply, p - c->line_reply); + } +} + /* BULK response handler */ PHP_REDIS_API void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) @@ -1799,12 +1829,15 @@ static void cluster_mbulk_variant_resp(clusterReply *r, zval *z_ret) add_next_index_long(z_ret, r->integer); break; case TYPE_LINE: - add_next_index_bool(z_ret, 1); + if (r->str) { + add_next_index_stringl(z_ret, r->str, r->len); + } else { + add_next_index_bool(z_ret, 1); + } break; case TYPE_BULK: if (r->len > -1) { add_next_index_stringl(z_ret, r->str, r->len); - efree(r->str); } else { add_next_index_null(z_ret); } @@ -1827,15 +1860,16 @@ static void cluster_mbulk_variant_resp(clusterReply *r, zval *z_ret) /* Variant response handling, for things like EVAL and various other responses * where we just map the replies from Redis type values to PHP ones directly. */ -PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, - void *ctx) +static void +cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, + int status_strings, void *ctx) { clusterReply *r; zval zv, *z_arr = &zv; int i; // Make sure we can read it - if ((r = cluster_read_resp(c TSRMLS_CC)) == NULL) { + if ((r = cluster_read_resp(c, status_strings TSRMLS_CC)) == NULL) { CLUSTER_RETURN_FALSE(c); } @@ -1849,7 +1883,11 @@ PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisClust RETVAL_FALSE; break; case TYPE_LINE: - RETVAL_TRUE; + if (status_strings) { + RETVAL_STRINGL(r->str, r->len); + } else { + RETVAL_TRUE; + } break; case TYPE_BULK: if (r->len < 0) { @@ -1879,14 +1917,17 @@ PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisClust add_next_index_bool(&c->multi_resp, 0); break; case TYPE_LINE: - add_next_index_bool(&c->multi_resp, 1); + if (status_strings) { + add_next_index_stringl(&c->multi_resp, r->str, r->len); + } else { + add_next_index_bool(&c->multi_resp, 1); + } break; case TYPE_BULK: if (r->len < 0) { add_next_index_null(&c->multi_resp); } else { add_next_index_stringl(&c->multi_resp, r->str, r->len); - efree(r->str); } break; case TYPE_MULTIBULK: @@ -1899,7 +1940,19 @@ PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisClust } // Free our response structs, but not allocated data itself - cluster_free_reply(r, 0); + cluster_free_reply(r, 1); +} + +PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, + void *ctx) +{ + cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 0, ctx); +} + +PHP_REDIS_API void cluster_variant_resp_strings(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, + void *ctx) +{ + cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 1, ctx); } /* Generic MULTI BULK response processor */ @@ -2052,6 +2105,76 @@ PHP_REDIS_API void cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS, redisC } } +/* XRANGE */ +PHP_REDIS_API void +cluster_xrange_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) { + zval zv, *z_messages = &zv; + + REDIS_MAKE_STD_ZVAL(z_messages); + array_init(z_messages); + + c->cmd_sock->serializer = c->flags->serializer; + c->cmd_sock->compression = c->flags->compression; + + if (redis_read_stream_messages(c->cmd_sock, c->reply_len, z_messages TSRMLS_CC) < 0) { + zval_dtor(z_messages); + REDIS_FREE_ZVAL(z_messages); + CLUSTER_RETURN_FALSE(c); + } + + if (CLUSTER_IS_ATOMIC(c)) { + RETVAL_ZVAL(z_messages, 0, 1); + } else { + add_next_index_zval(&c->multi_resp, z_messages); + } +} + +/* XREAD */ +PHP_REDIS_API void +cluster_xread_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) { + zval zv, *z_streams = &zv; + + REDIS_MAKE_STD_ZVAL(z_streams); + array_init(z_streams); + + c->cmd_sock->serializer = c->flags->serializer; + c->cmd_sock->compression = c->flags->compression; + + if (redis_read_stream_messages_multi(c->cmd_sock, c->reply_len, z_streams TSRMLS_CC) < 0) { + zval_dtor(z_streams); + REDIS_FREE_ZVAL(z_streams); + CLUSTER_RETURN_FALSE(c); + } + + if (CLUSTER_IS_ATOMIC(c)) { + RETVAL_ZVAL(z_streams, 0, 1); + } else { + add_next_index_zval(&c->multi_resp, z_streams); + } +} + +/* XCLAIM */ +PHP_REDIS_API void +cluster_xclaim_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) { + zval zv, *z_msg = &zv; + + REDIS_MAKE_STD_ZVAL(z_msg); + array_init(z_msg); + + if (redis_read_xclaim_response(c->cmd_sock, c->reply_len, z_msg TSRMLS_CC) < 0) { + zval_dtor(z_msg); + REDIS_FREE_ZVAL(z_msg); + CLUSTER_RETURN_FALSE(c); + } + + if (CLUSTER_IS_ATOMIC(c)) { + RETVAL_ZVAL(z_msg, 0, 1); + } else { + add_next_index_zval(&c->multi_resp, z_msg); + } + +} + /* MULTI BULK response loop where we might pull the next one */ PHP_REDIS_API zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, int pull, mbulk_cb cb, zval *z_ret) |