Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/phpredis/phpredis.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Grunder <michael.grunder@gmail.com>2018-09-29 21:59:01 +0300
committerGitHub <noreply@github.com>2018-09-29 21:59:01 +0300
commit2c9e0572361d5131f24fbc81a8f7baaafb671994 (patch)
tree6982b1e1f17b7cf2fc7e024652fad8212edadacd /cluster_library.c
parentbfd274712eeb372926d1106b3da3c4fc19c0a48a (diff)
Streams (#1413)
Streams API
Diffstat (limited to 'cluster_library.c')
-rw-r--r--cluster_library.c165
1 files changed, 144 insertions, 21 deletions
diff --git a/cluster_library.c b/cluster_library.c
index 8f157873..5f710c5e 100644
--- a/cluster_library.c
+++ b/cluster_library.c
@@ -103,7 +103,7 @@ void cluster_free_reply(clusterReply *reply, int free_data) {
for (i = 0; i < reply->elements && reply->element[i]; i++) {
cluster_free_reply(reply->element[i], free_data);
}
- efree(reply->element);
+ if (reply->element) efree(reply->element);
break;
default:
break;
@@ -113,7 +113,8 @@ void cluster_free_reply(clusterReply *reply, int free_data) {
static void
cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements,
- clusterReply **element, int *err TSRMLS_DC)
+ clusterReply **element, int status_strings,
+ int *err TSRMLS_DC)
{
int i;
size_t sz;
@@ -141,6 +142,7 @@ cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements,
return;
}
r->len = (long long)sz;
+ if (status_strings) r->str = estrndup(buf, r->len);
break;
case TYPE_INT:
r->integer = len;
@@ -155,11 +157,15 @@ cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements,
}
break;
case TYPE_MULTIBULK:
- r->element = ecalloc(r->len,sizeof(clusterReply*));
- r->elements = r->len;
- cluster_multibulk_resp_recursive(sock, r->elements, r->element,
- err TSRMLS_CC);
- if (*err) return;
+ if (r->len >= 0) {
+ r->elements = r->len;
+ if (r->len > 0) {
+ r->element = ecalloc(r->len,sizeof(clusterReply*));
+ cluster_multibulk_resp_recursive(sock, r->elements, r->element,
+ status_strings, err TSRMLS_CC);
+ }
+ if (*err) return;
+ }
break;
default:
*err = 1;
@@ -191,15 +197,17 @@ static RedisSock *cluster_slot_sock(redisCluster *c, unsigned short slot,
}
/* Read the response from a cluster */
-clusterReply *cluster_read_resp(redisCluster *c TSRMLS_DC) {
- return cluster_read_sock_resp(c->cmd_sock,c->reply_type,c->reply_len TSRMLS_CC);
+clusterReply *cluster_read_resp(redisCluster *c, int status_strings TSRMLS_DC) {
+ return cluster_read_sock_resp(c->cmd_sock, c->reply_type,
+ status_strings ? c->line_reply : NULL,
+ c->reply_len TSRMLS_CC);
}
/* Read any sort of response from the socket, having already issued the
* command and consumed the reply type and meta info (length) */
clusterReply*
cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type,
- size_t len TSRMLS_DC)
+ char *line_reply, size_t len TSRMLS_DC)
{
clusterReply *r;
@@ -214,6 +222,10 @@ cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type,
r->integer = len;
break;
case TYPE_LINE:
+ if (line_reply) {
+ r->str = estrndup(line_reply, len);
+ r->len = len;
+ }
case TYPE_ERR:
return r;
case TYPE_BULK:
@@ -229,7 +241,7 @@ cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type,
if (len != (size_t)-1) {
r->element = ecalloc(len, sizeof(clusterReply*)*len);
cluster_multibulk_resp_recursive(redis_sock, len, r->element,
- &err TSRMLS_CC);
+ line_reply != NULL, &err TSRMLS_CC);
}
break;
default:
@@ -618,7 +630,7 @@ clusterReply* cluster_get_slots(RedisSock *redis_sock TSRMLS_DC)
}
// Consume the rest of our response
- if ((r = cluster_read_sock_resp(redis_sock, type, len TSRMLS_CC)) == NULL ||
+ if ((r = cluster_read_sock_resp(redis_sock, type, NULL, len TSRMLS_CC)) == NULL ||
r->type != TYPE_MULTIBULK || r->elements < 1)
{
if (r) cluster_free_reply(r, 1);
@@ -1486,6 +1498,24 @@ PHP_REDIS_API void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,
efree(resp);
}
+PHP_REDIS_API void
+cluster_single_line_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx)
+{
+ char *p;
+
+ /* Cluster already has the reply so abort if this isn't a LINE response *or* if for
+ * some freaky reason we don't detect a null terminator */
+ if (c->reply_type != TYPE_LINE || !(p = memchr(c->line_reply,'\0',sizeof(c->line_reply)))) {
+ CLUSTER_RETURN_FALSE(c);
+ }
+
+ if (CLUSTER_IS_ATOMIC(c)) {
+ CLUSTER_RETURN_STRING(c, c->line_reply, p - c->line_reply);
+ } else {
+ add_next_index_stringl(&c->multi_resp, c->line_reply, p - c->line_reply);
+ }
+}
+
/* BULK response handler */
PHP_REDIS_API void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
void *ctx)
@@ -1799,12 +1829,15 @@ static void cluster_mbulk_variant_resp(clusterReply *r, zval *z_ret)
add_next_index_long(z_ret, r->integer);
break;
case TYPE_LINE:
- add_next_index_bool(z_ret, 1);
+ if (r->str) {
+ add_next_index_stringl(z_ret, r->str, r->len);
+ } else {
+ add_next_index_bool(z_ret, 1);
+ }
break;
case TYPE_BULK:
if (r->len > -1) {
add_next_index_stringl(z_ret, r->str, r->len);
- efree(r->str);
} else {
add_next_index_null(z_ret);
}
@@ -1827,15 +1860,16 @@ static void cluster_mbulk_variant_resp(clusterReply *r, zval *z_ret)
/* Variant response handling, for things like EVAL and various other responses
* where we just map the replies from Redis type values to PHP ones directly. */
-PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
- void *ctx)
+static void
+cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+ int status_strings, void *ctx)
{
clusterReply *r;
zval zv, *z_arr = &zv;
int i;
// Make sure we can read it
- if ((r = cluster_read_resp(c TSRMLS_CC)) == NULL) {
+ if ((r = cluster_read_resp(c, status_strings TSRMLS_CC)) == NULL) {
CLUSTER_RETURN_FALSE(c);
}
@@ -1849,7 +1883,11 @@ PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisClust
RETVAL_FALSE;
break;
case TYPE_LINE:
- RETVAL_TRUE;
+ if (status_strings) {
+ RETVAL_STRINGL(r->str, r->len);
+ } else {
+ RETVAL_TRUE;
+ }
break;
case TYPE_BULK:
if (r->len < 0) {
@@ -1879,14 +1917,17 @@ PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisClust
add_next_index_bool(&c->multi_resp, 0);
break;
case TYPE_LINE:
- add_next_index_bool(&c->multi_resp, 1);
+ if (status_strings) {
+ add_next_index_stringl(&c->multi_resp, r->str, r->len);
+ } else {
+ add_next_index_bool(&c->multi_resp, 1);
+ }
break;
case TYPE_BULK:
if (r->len < 0) {
add_next_index_null(&c->multi_resp);
} else {
add_next_index_stringl(&c->multi_resp, r->str, r->len);
- efree(r->str);
}
break;
case TYPE_MULTIBULK:
@@ -1899,7 +1940,19 @@ PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisClust
}
// Free our response structs, but not allocated data itself
- cluster_free_reply(r, 0);
+ cluster_free_reply(r, 1);
+}
+
+PHP_REDIS_API void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+ void *ctx)
+{
+ cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 0, ctx);
+}
+
+PHP_REDIS_API void cluster_variant_resp_strings(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
+ void *ctx)
+{
+ cluster_variant_resp_generic(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, 1, ctx);
}
/* Generic MULTI BULK response processor */
@@ -2052,6 +2105,76 @@ PHP_REDIS_API void cluster_client_list_resp(INTERNAL_FUNCTION_PARAMETERS, redisC
}
}
+/* XRANGE */
+PHP_REDIS_API void
+cluster_xrange_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
+ zval zv, *z_messages = &zv;
+
+ REDIS_MAKE_STD_ZVAL(z_messages);
+ array_init(z_messages);
+
+ c->cmd_sock->serializer = c->flags->serializer;
+ c->cmd_sock->compression = c->flags->compression;
+
+ if (redis_read_stream_messages(c->cmd_sock, c->reply_len, z_messages TSRMLS_CC) < 0) {
+ zval_dtor(z_messages);
+ REDIS_FREE_ZVAL(z_messages);
+ CLUSTER_RETURN_FALSE(c);
+ }
+
+ if (CLUSTER_IS_ATOMIC(c)) {
+ RETVAL_ZVAL(z_messages, 0, 1);
+ } else {
+ add_next_index_zval(&c->multi_resp, z_messages);
+ }
+}
+
+/* XREAD */
+PHP_REDIS_API void
+cluster_xread_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
+ zval zv, *z_streams = &zv;
+
+ REDIS_MAKE_STD_ZVAL(z_streams);
+ array_init(z_streams);
+
+ c->cmd_sock->serializer = c->flags->serializer;
+ c->cmd_sock->compression = c->flags->compression;
+
+ if (redis_read_stream_messages_multi(c->cmd_sock, c->reply_len, z_streams TSRMLS_CC) < 0) {
+ zval_dtor(z_streams);
+ REDIS_FREE_ZVAL(z_streams);
+ CLUSTER_RETURN_FALSE(c);
+ }
+
+ if (CLUSTER_IS_ATOMIC(c)) {
+ RETVAL_ZVAL(z_streams, 0, 1);
+ } else {
+ add_next_index_zval(&c->multi_resp, z_streams);
+ }
+}
+
+/* XCLAIM */
+PHP_REDIS_API void
+cluster_xclaim_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, void *ctx) {
+ zval zv, *z_msg = &zv;
+
+ REDIS_MAKE_STD_ZVAL(z_msg);
+ array_init(z_msg);
+
+ if (redis_read_xclaim_response(c->cmd_sock, c->reply_len, z_msg TSRMLS_CC) < 0) {
+ zval_dtor(z_msg);
+ REDIS_FREE_ZVAL(z_msg);
+ CLUSTER_RETURN_FALSE(c);
+ }
+
+ if (CLUSTER_IS_ATOMIC(c)) {
+ RETVAL_ZVAL(z_msg, 0, 1);
+ } else {
+ add_next_index_zval(&c->multi_resp, z_msg);
+ }
+
+}
+
/* MULTI BULK response loop where we might pull the next one */
PHP_REDIS_API zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
redisCluster *c, int pull, mbulk_cb cb, zval *z_ret)