Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/phpredis/phpredis.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormichael-grunder <michael.grunder@gmail.com>2015-02-26 18:43:20 +0300
committermichael-grunder <michael.grunder@gmail.com>2015-05-06 01:05:30 +0300
commit043b360651651a5a7c53a93f2de4cd5c4989b6dc (patch)
tree625ebb0b99176f9049d144159e15aee84ea0610b /cluster_library.c
parent0bb500c1e0f845cceaddca877504ec8c478837b7 (diff)
More updates for auto-failover logic
Diffstat (limited to 'cluster_library.c')
-rw-r--r--cluster_library.c544
1 files changed, 309 insertions, 235 deletions
diff --git a/cluster_library.c b/cluster_library.c
index 4fecbd94..cc40f37f 100644
--- a/cluster_library.c
+++ b/cluster_library.c
@@ -40,10 +40,6 @@ static void cluster_log(char *fmt, ...)
fprintf(stderr, "%s\n", buffer);
}
-/* Direct handling of variant replies, in a hiredis like way. These methods
- * are used for non userland facing commands, as well as passed through from
- * them when the reply is just variant (e.g. eval) */
-
/* Debug function to dump a clusterReply structure recursively */
static void dump_reply(clusterReply *reply, int indent) {
smart_str buf = {0};
@@ -171,10 +167,28 @@ cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements,
}
}
+/* Look up the socket that serves `slot`. A slaveidx of 0 selects the master;
+ * any other value is used as the key into the slot's slave table. Returns
+ * NULL when the slot has no slave table or nothing is stored at that index. */
+static RedisSock *cluster_slot_sock(redisCluster *c, unsigned short slot,
+                                    ulong slaveidx)
+{
+    redisClusterNode **slave = NULL;
+
+    /* Index zero always means the master */
+    if (slaveidx == 0) return SLOT_SOCK(c, slot);
+
+    /* This slot has no slave table at all */
+    if (!SLOT_SLAVES(c, slot)) return NULL;
+
+    /* Hand back the slave's socket if the index is populated */
+    if (zend_hash_index_find(SLOT_SLAVES(c,slot), slaveidx,
+                             (void**)&slave) == SUCCESS)
+    {
+        return (*slave)->sock;
+    }
+
+    /* Unknown slave index */
+    return NULL;
+}
+
/* Read the response from a cluster */
clusterReply *cluster_read_resp(redisCluster *c TSRMLS_DC) {
- return cluster_read_sock_resp(SLOT_SOCK(c,c->reply_slot), c->reply_type,
- c->reply_len TSRMLS_CC);
+ return cluster_read_sock_resp(c->cmd_sock,c->reply_type,c->reply_len TSRMLS_CC);
}
/* Read any sort of response from the socket, having already issued the
@@ -229,9 +243,78 @@ cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type,
return r;
}
-/* Cluster key distribution helpers. For a small handlful of commands, we want
+/*
+ * Helpers to send various 'control' type commands to a specific node, e.g.
+ * MULTI, ASKING, READONLY, READWRITE, etc
+ */
+
+/* Write `cmd` (cmd_len bytes) straight to redis_sock and confirm that the
+ * reply is of the expected `type`. Returns 0 when both steps succeed and -1
+ * otherwise. */
+static int cluster_send_direct(RedisSock *redis_sock, char *cmd, int cmd_len,
+                               REDIS_REPLY_TYPE type TSRMLS_DC)
+{
+    /* The payload has to go out first; the reply is not checked if it fails */
+    if (!CLUSTER_SEND_PAYLOAD(redis_sock,cmd,cmd_len)) {
+        return -1;
+    }
+
+    /* The reply must be of the type the caller asked for */
+    if (!CLUSTER_VALIDATE_REPLY_TYPE(redis_sock, type)) {
+        return -1;
+    }
+
+    return 0;
+}
+
+/* Issue ASKING on redis_sock and require a TYPE_LINE reply. While in an ASK
+ * redirection the command has to be preceded by ASKING on the target node.
+ * Returns 0 on success, -1 on failure. */
+static int cluster_send_asking(RedisSock *redis_sock TSRMLS_DC) {
+    const int asking_len = sizeof(RESP_ASKING_CMD)-1;
+
+    return cluster_send_direct(redis_sock, RESP_ASKING_CMD, asking_len,
+                               TYPE_LINE TSRMLS_CC);
+}
+
+/* Put redis_sock into READONLY mode unless it is already flagged as such.
+ * Returns 0 if the socket is (now) readonly and -1 if the command failed; the
+ * socket's readonly flag is updated to reflect the outcome. */
+static int cluster_send_readonly(RedisSock *redis_sock TSRMLS_DC) {
+    int rc;
+
+    /* Nothing to send when the flag is already set */
+    if (redis_sock->readonly) {
+        return 0;
+    }
+
+    rc = cluster_send_direct(redis_sock, RESP_READONLY_CMD,
+                             sizeof(RESP_READONLY_CMD)-1, TYPE_LINE TSRMLS_CC);
+
+    /* Remember the mode only when the command went through */
+    redis_sock->readonly = (rc == 0);
+
+    return rc;
+}
+
+/* Send MULTI to the master socket mapped to `slot` and require a TYPE_LINE
+ * reply. On success the socket in c->cmd_sock is flagged as being in MULTI
+ * mode and 0 is returned; otherwise -1 is returned.
+ *
+ * NOTE(review): the command goes to SLOT_SOCK(c,slot) while the flag is set on
+ * c->cmd_sock; this presumably relies on callers keeping the two in sync --
+ * confirm. */
+static int cluster_send_multi(redisCluster *c, short slot TSRMLS_DC) {
+ if (cluster_send_direct(SLOT_SOCK(c,slot), RESP_MULTI_CMD,
+ sizeof(RESP_MULTI_CMD)-1, TYPE_LINE TSRMLS_CC)==0)
+ {
+ c->cmd_sock->mode = MULTI;
+ return 0;
+ }
+ return -1;
+}
+
+/* Send EXEC to the master socket mapped to `slot` and require a
+ * TYPE_MULTIBULK reply. Returns c->reply_len on success, or -1 if the command
+ * could not be sent or the reply type was wrong.
+ *
+ * NOTE(review): nothing visible in this helper updates c->reply_len; confirm
+ * that it holds the multi-bulk count by the time it is returned. */
+PHPAPI int cluster_send_exec(redisCluster *c, short slot TSRMLS_DC) {
+    /* cluster_send_direct() returns 0 on success, so compare explicitly */
+    if (cluster_send_direct(SLOT_SOCK(c,slot), RESP_EXEC_CMD,
+        sizeof(RESP_EXEC_CMD)-1, TYPE_MULTIBULK TSRMLS_CC) == 0)
+    {
+        return c->reply_len;
+    }
+    return -1;
+}
+
+/* Send DISCARD to the master socket mapped to `slot` and require a TYPE_LINE
+ * reply. Returns 0 on success, or -1 if the command could not be sent or the
+ * reply type was wrong. */
+PHPAPI int cluster_send_discard(redisCluster *c, short slot TSRMLS_DC) {
+    /* cluster_send_direct() returns 0 on success, so compare explicitly */
+    if (cluster_send_direct(SLOT_SOCK(c,slot), RESP_DISCARD_CMD,
+        sizeof(RESP_DISCARD_CMD)-1, TYPE_LINE TSRMLS_CC) == 0)
+    {
+        return 0;
+    }
+    return -1;
+}
+
+/*
+ * Cluster key distribution helpers. For a small handful of commands, we want
* to distribute them across 1-N nodes. These methods provide simple containers
- * for the purposes of splitting keys/values in this way */
+ * for the purposes of splitting keys/values in this way
+ * */
/* Free cluster distribution list inside a HashTable */
static void cluster_dist_free_ht(void *p) {
@@ -312,7 +395,7 @@ int cluster_dist_add_key(redisCluster *c, HashTable *ht, char *key,
// Prefix our key and hash it
key_free = redis_key_prefix(c->flags, &key, &key_len);
- slot = cluster_hash_key(key, key_len);
+ slot = cluster_hash_key(key, key_len);
// We can't do this if we don't fully understand the keyspace
if(c->master[slot] == NULL) {
@@ -553,14 +636,19 @@ cluster_node_create(redisCluster *c, char *host, size_t host_len,
PHPAPI int
cluster_node_add_slave(redisClusterNode *master, redisClusterNode *slave)
{
+ ulong index;
+
// Allocate our slaves hash table if we haven't yet
if(!master->slaves) {
ALLOC_HASHTABLE(master->slaves);
zend_hash_init(master->slaves, 0, NULL, ht_free_slave, 0);
+ index = 1;
+ } else {
+ index = master->slaves->nNextFreeElement;
}
- return zend_hash_next_index_insert(master->slaves, (void*)&slave,
- sizeof(redisClusterNode*), NULL)!=SUCCESS;
+ return zend_hash_index_update(master->slaves, index, (void*)&slave,
+ sizeof(redisClusterNode*), NULL) != SUCCESS;
}
/* Sanity check/validation for CLUSTER SLOTS command */
@@ -646,64 +734,32 @@ PHPAPI void cluster_free_node(redisClusterNode *node) {
efree(node);
}
-/* When we're in an ASK redirection state, Redis Cluster wants us to send
- * the command only after starting our query with ASKING, or it'll just
- * bounce us back and forth until the slots have migrated */
-static int cluster_send_asking(RedisSock *redis_sock TSRMLS_DC)
-{
- char buf[255];
-
- // Make sure we can send the request
- if(redis_check_eof(redis_sock, 1 TSRMLS_CC) ||
- php_stream_write(redis_sock->stream, RESP_ASKING_CMD,
- sizeof(RESP_ASKING_CMD)-1) != sizeof(RESP_ASKING_CMD)-1)
- {
- return -1;
- }
-
- // Read our reply type
- if((redis_check_eof(redis_sock, 1 TSRMLS_CC) == - 1) ||
- (php_stream_getc(redis_sock->stream) != TYPE_LINE)) {
- return -1;
- }
-
- // Consume the rest of our response
- if(!php_stream_gets(redis_sock->stream, buf, sizeof(buf))) {
- return -1;
- }
-
- // Success
- return 0;
-}
-
-/* Get a RedisSock object from the host and port where we have been directed
- * from an ASK response. We'll first see if we have connected to this node
- * already, and return that. If not, we create it and add it to our nodes. */
-static RedisSock *cluster_get_asking_sock(redisCluster *c TSRMLS_DC) {
+/* Get or create a redisClusterNode that corresponds to the asking redirection */
+static redisClusterNode *cluster_get_asking_node(redisCluster *c TSRMLS_DC) {
redisClusterNode **ppNode;
char key[1024];
int key_len;
- // It'll be hashed as host:port in our nodes HashTable
- key_len = snprintf(key, sizeof(key), "%s:%u", c->redir_host,
- c->redir_port);
+ /* Hashed by host:port */
+ key_len = snprintf(key, sizeof(key), "%s:%u", c->redir_host, c->redir_port);
- // See if we've already attached to it
- if(zend_hash_find(c->nodes, key, key_len+1, (void**)&ppNode)==SUCCESS)
- {
- return (*ppNode)->sock;
+ /* See if we've already attached to it */
+ if (zend_hash_find(c->nodes, key, key_len+1, (void**)&ppNode) == SUCCESS) {
+ return *ppNode;
}
- // Create a redisClusterNode
+ /* This host:port is unknown to us, so add it */
*ppNode = cluster_node_create(c, c->redir_host, c->redir_host_len,
c->redir_port, c->redir_slot, 0);
- // Now add it to the nodes we have
- zend_hash_update(c->nodes, key, key_len+1, (void*)ppNode,
- sizeof(redisClusterNode*), NULL);
+ /* Return the node */
+ return *ppNode;
+}
- // Return the RedisSock
- return (*ppNode)->sock;
+/* Get or create a node at the host:port we were asked to check, and return the
+ * redis_sock for it. */
+static RedisSock *cluster_get_asking_sock(redisCluster *c TSRMLS_DC) {
+ return cluster_get_asking_node(c TSRMLS_CC)->sock;
}
/* Initialize seeds */
@@ -800,13 +856,9 @@ cluster_map_keyspace(redisCluster *c TSRMLS_DC) {
static int cluster_set_redirection(redisCluster* c, char *msg, int moved)
{
char *host, *port;
-
- // Move past MOVED or ASK
- if(moved) {
- msg += MOVED_LEN;
- } else {
- msg += ASK_LEN;
- }
+
+ /* Move past "MOVED" or "ASK" */
+ msg += moved ? MOVED_LEN : ASK_LEN;
// We need a slot seperator
if(!(host = strchr(msg, ' '))) return -1;
@@ -837,8 +889,8 @@ static int cluster_set_redirection(redisCluster* c, char *msg, int moved)
*
* This function will return -1 on a critical error (e.g. parse/communication
* error, 0 if no redirection was encountered, and 1 if the data was moved. */
-static int cluster_check_response(redisCluster *c, unsigned short slot,
- REDIS_REPLY_TYPE *reply_type TSRMLS_DC)
+static int cluster_check_response(redisCluster *c, REDIS_REPLY_TYPE *reply_type
+ TSRMLS_DC)
{
size_t sz;
@@ -846,8 +898,8 @@ static int cluster_check_response(redisCluster *c, unsigned short slot,
CLUSTER_CLEAR_ERROR(c);
CLUSTER_CLEAR_REPLY(c);
- if(-1 == redis_check_eof(SLOT_SOCK(c,slot), 1 TSRMLS_CC) ||
- EOF == (*reply_type = php_stream_getc(SLOT_STREAM(c,slot))))
+ if(-1 == redis_check_eof(c->cmd_sock, 1 TSRMLS_CC) ||
+ EOF == (*reply_type = php_stream_getc(c->cmd_sock->stream)))
{
return -1;
}
@@ -858,7 +910,7 @@ static int cluster_check_response(redisCluster *c, unsigned short slot,
int moved;
// Attempt to read the error
- if(!php_stream_gets(SLOT_STREAM(c,slot), inbuf, sizeof(inbuf))) {
+ if(!php_stream_gets(c->cmd_sock->stream, inbuf, sizeof(inbuf))) {
return -1;
}
@@ -879,7 +931,7 @@ static int cluster_check_response(redisCluster *c, unsigned short slot,
}
// Fetch the first line of our response from Redis.
- if(redis_sock_gets(SLOT_SOCK(c,slot),c->line_reply,sizeof(c->line_reply),
+ if(redis_sock_gets(c->cmd_sock,c->line_reply,sizeof(c->line_reply),
&sz TSRMLS_CC)<0)
{
return -1;
@@ -910,36 +962,129 @@ PHPAPI void cluster_disconnect(redisCluster *c TSRMLS_DC) {
}
}
-/* Attempt to write to a cluster node. If the node is NULL (e.g. it's been
- * umapped, we keep falling back until we run out of nodes to try */
-static int cluster_sock_write(redisCluster *c, unsigned short slot,
- const char *cmd, size_t sz, int direct TSRMLS_DC)
+/* Fisher-Yates shuffle for integer array.
+ *
+ * Permutes the `len` entries of `array` in place, drawing randomness from
+ * rand(). Arrays of fewer than two entries are left untouched. */
+static void fyshuffle(int *array, size_t len) {
+ int temp, n = len;
+ size_t r;
+
+ /* Randomize */
+ while (n > 1) {
+ /* Pick r in [0, n-1] using the current n; the post-decrement then makes
+ * n the index of the last not-yet-placed element, which is swapped with
+ * array[r] below. */
+ r = ((int)((double)n-- * (rand() / (RAND_MAX+1.0))));
+ temp = array[n];
+ array[n] = array[r];
+ array[r] = temp;
+ };
+}
+
+/* Attempt to write our command to the master for c->cmd_slot and any slaves
+ * attached to it, trying them in random order until one write succeeds.
+ *
+ * When `nomaster` is non-zero the master is skipped and only slaves are
+ * tried. Slaves are sent READONLY (unless already flagged) before the command,
+ * and a slave is skipped if that fails. On success c->cmd_sock is pointed at
+ * the socket that accepted the command and 0 is returned; -1 means that no
+ * candidate could be written to. */
+static int cluster_dist_write(redisCluster *c, const char *cmd, size_t sz,
+                              int nomaster TSRMLS_DC)
{
+    int i, count=1, *nodes;
RedisSock *redis_sock;
+
+    /* One entry for the master plus one for each slave attached to it */
+    if (c->master[c->cmd_slot]->slaves) {
+        count += zend_hash_num_elements(c->master[c->cmd_slot]->slaves);
+    }
+    nodes = emalloc(sizeof(int)*count);
+
+    /* Index 0 stands for the master and 1..count-1 for its slaves; shuffle
+     * the indexes so that candidates are tried in random order. */
+    for (i = 0; i < count; i++) nodes[i] = i;
+    fyshuffle(nodes, count);
+
+    /* Iterate through our nodes until we find one we can write to or fail */
+    for (i = 0; i < count; i++) {
+        /* After the shuffle the master can sit at any position, so it has to
+         * be recognised by its index value rather than by array position. */
+        if (nomaster && nodes[i] == 0) continue;
+
+        /* Get the socket for this index (0 is the master) */
+        redis_sock = cluster_slot_sock(c, c->cmd_slot, nodes[i]);
+        if (!redis_sock) continue;
+
+        /* Connect to this node if we haven't already */
+        CLUSTER_LAZY_CONNECT(redis_sock);
+
+        /* If we're not on the master, attempt to send the READONLY command to
+         * this slave, and skip it if that fails */
+        if (nodes[i] == 0 || redis_sock->readonly ||
+            cluster_send_readonly(redis_sock TSRMLS_CC) == 0)
+        {
+            /* Attempt to send the command */
+            if (CLUSTER_SEND_PAYLOAD(redis_sock, cmd, sz)) {
+                c->cmd_sock = redis_sock;
+                efree(nodes);
+                return 0;
+            }
+        }
+    }
+
+    /* Clean up our shuffled array */
+    efree(nodes);
+
+    /* Couldn't send to the master or any slave */
+    return -1;
+}
+
+/* Attempt to write our command to the current c->cmd_sock socket. For write
+ * commands, we attempt to query the master for this slot, and in the event of
+ * a failure, try to query every remaining node for a redirection.
+ *
+ * If we're issuing a readonly command, we use one of three strategies, depending
+ * on our redisCluster->failover setting.
+ *
+ * REDIS_FAILOVER_NONE:
+ * The command is treated just like a write command, and will only be executed
+ * against the known master for this slot.
+ * REDIS_FAILOVER_ERROR:
+ * If we're unable to communicate with this slot's master, we attempt the query
+ * against any slaves (at random) that this master has.
+ * REDIS_FAILOVER_DISTRIBUTE:
+ * We pick at random from the master and any slaves it has. This option is
+ * used to load balance read queries against N slaves.
+ *
+ * Once we are able to find a node we can write to, we check for MOVED or
+ * ASKING redirection, such that the keyspace can be updated.
+*/
+static int cluster_sock_write(redisCluster *c, const char *cmd, size_t sz,
+ int direct TSRMLS_DC)
+{
redisClusterNode **seed_node;
+ RedisSock *redis_sock;
+ int failover;
- /* If we're not in ASK redirection, use the slot requested, otherwise send
- * our ASKING command and use the asking slot. */
- if(c->redir_type != REDIR_ASK) {
- redis_sock = SLOT_SOCK(c,slot);
- } else {
+ /* First try the socket requested */
+ redis_sock = c->cmd_sock;
+
+ /* Readonly is irrelevant if we're not configured to failover */
+ failover = c->readonly && c->failover != REDIS_FAILOVER_NONE ?
+ c->failover : REDIS_FAILOVER_NONE;
+
+ /* If in ASK redirection, get/create the node for that host:port, otherwise
+ * just use the command socket. */
+ if(c->redir_type == REDIR_ASK) {
redis_sock = cluster_get_asking_sock(c TSRMLS_CC);
if(cluster_send_asking(redis_sock TSRMLS_CC)<0) {
return -1;
}
}
- /* If the lazy_connect flag is still set, we've not actually connected to
- * this node, so do that now. */
- CLUSTER_LAZY_CONNECT(redis_sock);
-
- /* First attempt to write it to the slot that's been requested */
- if(redis_sock && redis_sock->stream &&
- !redis_check_eof(redis_sock, 1 TSRMLS_CC) &&
- php_stream_write(redis_sock->stream, cmd, sz)==sz)
- {
- // We were able to write it
- return slot;
+ /* Attempt to send our command payload to the cluster. If we're not set up
+ * to failover, just try the master. If we're configured to failover on
+ * error, try the master and then fall back to any slaves. When we're set
+ * up to distribute the commands, try to write to any node on this slot
+ * at random. */
+ if (failover == REDIS_FAILOVER_NONE) {
+ /* Success if we can send our payload to the master */
+ CLUSTER_LAZY_CONNECT(redis_sock);
+ if (CLUSTER_SEND_PAYLOAD(redis_sock, cmd, sz)) return 0;
+ } else if (failover == REDIS_FAILOVER_ERROR) {
+ /* Try the master, then fall back to any slaves we may have */
+ CLUSTER_LAZY_CONNECT(redis_sock);
+ if (CLUSTER_SEND_PAYLOAD(redis_sock, cmd, sz) ||
+ !cluster_dist_write(c, cmd, sz, 1 TSRMLS_CC)) return 0;
+ } else if (!cluster_dist_write(c, cmd, sz, 0 TSRMLS_CC)) {
+ /* We were able to write to a master or slave at random */
+ return 0;
}
/* Don't fall back if direct communication with this slot is required. */
@@ -953,20 +1098,17 @@ static int cluster_sock_write(redisCluster *c, unsigned short slot,
/* Grab node */
zend_hash_get_current_data(c->nodes, (void**)&seed_node);
- /* Skip this node if it's the one that failed */
- if((*seed_node)->sock == redis_sock) continue;
-
- /* Skip slave nodes */
- if((*seed_node)->slave) continue;
+ /* Skip this node if it's the one that failed, or if it's a slave */
+ if((*seed_node)->sock == redis_sock || (*seed_node)->slave) continue;
+ /* Connect to this node if we haven't already */
CLUSTER_LAZY_CONNECT((*seed_node)->sock);
/* Attempt to write our request to this node */
- if(!redis_check_eof((*seed_node)->sock, 1 TSRMLS_CC) &&
- php_stream_write((*seed_node)->sock->stream, cmd, sz)==sz)
- {
- /* Just return the first slot we think this node handles */
- return (*seed_node)->slot;
+ if (CLUSTER_SEND_PAYLOAD((*seed_node)->sock, cmd, sz)) {
+ c->cmd_slot = (*seed_node)->slot;
+ c->cmd_sock = (*seed_node)->sock;
+ return 0;
}
}
@@ -1038,44 +1180,6 @@ static void cluster_update_slot(redisCluster *c TSRMLS_DC) {
node->slave = 0;
}
-/* Send EXEC to a specific slot */
-PHPAPI int cluster_send_exec(redisCluster *c, short slot TSRMLS_DC) {
- /* We have to be able to write this to the slot requested */
- if(cluster_sock_write(c, slot, RESP_EXEC_CMD, sizeof(RESP_EXEC_CMD)-1, 1
- TSRMLS_CC)==-1)
- {
- return -1;
- }
-
- /* We have to get a proper response from the slot to continue */
- if(cluster_check_response(c, slot, &c->reply_type TSRMLS_CC)!=0 ||
- c->reply_type != TYPE_MULTIBULK)
- {
- return -1;
- }
-
- // Return the number of multi-bulk replies
- return c->reply_len;
-}
-
-/* Send DISCARD to a specific slot */
-PHPAPI int cluster_send_discard(redisCluster *c, short slot TSRMLS_DC) {
- if(cluster_sock_write(c, slot, RESP_DISCARD_CMD, sizeof(RESP_DISCARD_CMD)-1,
- 1 TSRMLS_CC)==-1)
- {
- return -1;
- }
-
- if(cluster_check_response(c, slot, &c->reply_type TSRMLS_CC)!=0 ||
- c->reply_type != TYPE_LINE)
- {
- return -1;
- }
-
- return 0;
-}
-
-
/* Abort any transaction in process, by sending DISCARD to any nodes that
* have active transactions in progress. If we can't send DISCARD, we need
* to disconnect as it would leave us in an undefined state. */
@@ -1102,27 +1206,6 @@ PHPAPI int cluster_abort_exec(redisCluster *c TSRMLS_DC) {
return 0;
}
-/* Send MULTI to a given slot and consume the response. If we can't send the
- * command OR we get an error in our response, we have to fail. */
-static int cluster_send_multi(redisCluster *c, short slot TSRMLS_DC) {
- /* We have to be able to communicate with the node we want */
- if(cluster_sock_write(c, slot, RESP_MULTI_CMD, sizeof(RESP_MULTI_CMD)-1, 1
- TSRMLS_CC)==-1)
- {
- return -1;
- }
-
- /* We have to get a proper response */
- if(cluster_check_response(c, slot, &c->reply_type TSRMLS_CC)!=0 ||
- c->reply_type != TYPE_LINE)
- {
- return -1;
- }
-
- /* Success */
- return 0;
-}
-
/* Iterate through our slots, looking for the host/port in question. This
* should perform well enough as in almost all situations, a few or a few
* dozen servers will map all the slots */
@@ -1148,21 +1231,20 @@ PHPAPI short cluster_find_slot(redisCluster *c, const char *host,
PHPAPI int cluster_send_slot(redisCluster *c, short slot, char *cmd,
int cmd_len, REDIS_REPLY_TYPE rtype TSRMLS_DC)
{
- // Try only this node
- if(cluster_sock_write(c, slot, cmd, cmd_len, 1 TSRMLS_CC)==-1) {
- return -1;
- }
-
- // Check our response and verify the type unless passed in as TYPE_EOF
- if(cluster_check_response(c, slot, &c->reply_type TSRMLS_CC)!=0 ||
- (rtype != TYPE_EOF && rtype != c->reply_type))
- {
+ /* Point our cluster to this slot and its socket */
+ c->cmd_slot = slot;
+ c->cmd_sock = SLOT_SOCK(c, slot);
+
+ /* Try the slot */
+ if(cluster_sock_write(c, cmd, cmd_len, 1 TSRMLS_CC)==-1) {
return -1;
}
- // Update our reply slot
- c->reply_slot = slot;
+ /* Check our response */
+ if(cluster_check_response(c, &c->reply_type TSRMLS_CC)!=0 ||
+ (rtype != TYPE_EOF && rtype != c->reply_type)) return -1;
+ /* Success */
return 0;
}
@@ -1174,60 +1256,59 @@ PHPAPI short cluster_send_command(redisCluster *c, short slot, const char *cmd,
int resp, timedout=0;
long msstart;
- /* Grab the current time in milliseconds */
+ /* Set the slot we're operating against as well as its socket. These can
+ * change during our request loop if we have a master failure and are
+ * configured to fall back to slave nodes, or if we have to fall back to
+ * a different slot due to no nodes serving this slot being reachable. */
+ c->cmd_slot = slot;
+ c->cmd_sock = SLOT_SOCK(c, slot);
+
+ /* Get the current time in milliseconds to handle any timeout */
msstart = mstime();
- /* Our main cluster request/reply loop. This loop runs until we're able
- * to get a valid reply from a node, hit our "request" timeout, or encounter
- * a CLUSTERDOWN state from Redis cluster. */
+ /* Our main cluster request/reply loop. This loop runs until we're able to
+ * get a valid reply from a node, hit our "request" timeout, or encounter a
+ * CLUSTERDOWN state from Redis Cluster. */
do {
- /* Send MULTI to the node if we haven't yet. */
- if(c->flags->mode == MULTI && SLOT_SOCK(c,slot)->mode != MULTI) {
+ /* Send MULTI to the socket if we're in MULTI mode but haven't yet */
+ if (c->flags->mode == MULTI && CMD_SOCK(c)->mode != MULTI) {
/* We have to fail if we can't send MULTI to the node */
- if(cluster_send_multi(c, slot TSRMLS_CC)==-1) {
+ if (cluster_send_multi(c, slot TSRMLS_CC) == -1) {
zend_throw_exception(redis_cluster_exception_ce,
- "Unable to enter MULTI mode on required slot",
+ "Unable to enter MULTI mode on requested slot",
0 TSRMLS_CC);
return -1;
}
-
- /* This node is now inside a transaction */
- SLOT_SOCK(c,slot)->mode = MULTI;
}
- /* Attempt to send the command to the slot requested */
- if((slot = cluster_sock_write(c, slot, cmd, cmd_len, 0 TSRMLS_CC))==-1)
- {
- /* We have no choice but to throw an exception. We can't communicate
- * with any node at all. */
+ /* Attempt to deliver our command to the node, and that failing, to any
+ * node until we find one that is available. */
+ if (cluster_sock_write(c, cmd, cmd_len, 0 TSRMLS_CC) == -1) {
+ /* We have to abort, as no nodes are reachable */
zend_throw_exception(redis_cluster_exception_ce,
"Can't communicate with any node in the cluster",
0 TSRMLS_CC);
return -1;
}
- /* Check the response from the slot we ended up querying. */
- resp = cluster_check_response(c, slot, &c->reply_type TSRMLS_CC);
+ /* Now check the response from the node we queried. */
+ resp = cluster_check_response(c, &c->reply_type TSRMLS_CC);
/* Handle MOVED or ASKING redirection */
- if(resp == 1) {
- /* If we get a MOVED response inside of a transaction, we have to
- abort, because the transaction would be invalid. */
- if(c->flags->mode == MULTI) {
- zend_throw_exception(redis_cluster_exception_ce,
- "Can't process MULTI sequence when cluster is resharding",
- 0 TSRMLS_CC);
- return -1;
- }
-
- // In case of a MOVED redirection, update our node mapping
- if(c->redir_type == REDIR_MOVED) {
- cluster_update_slot(c TSRMLS_CC);
- }
- slot = c->redir_slot;
+ if (resp == 1) {
+ /* Abort if we're in a transaction as it will be invalid */
+ if (c->flags->mode == MULTI) {
+ zend_throw_exception(redis_cluster_exception_ce,
+ "Can't process MULTI sequence when cluster is resharding",
+ 0 TSRMLS_CC);
+ return -1;
+ }
+
+ /* Update mapping if the data has MOVED */
+ if (c->redir_type == REDIR_MOVED) cluster_update_slot(c TSRMLS_CC);
}
- /* If we didn't get a valid response see if we've now timed out */
+ /* Figure out if we've timed out trying to read or write the data */
timedout = resp && c->waitms ? mstime() - msstart >= c->waitms : 0;
} while(resp != 0 && !c->clusterdown && !timedout);
@@ -1241,9 +1322,7 @@ PHPAPI short cluster_send_command(redisCluster *c, short slot, const char *cmd,
"Timed out attempting to find data in the correct node!", 0 TSRMLS_CC);
}
- // Inform the cluster where to read the rest of our response,
- // and clear out redirection flag.
- c->reply_slot = slot;
+ /* Clear redirection flag */
c->redir_type = REDIR_NONE;
// Success, return the slot where data exists.
@@ -1266,8 +1345,7 @@ PHPAPI void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,
// Make sure we can read the response
if(c->reply_type != TYPE_BULK ||
- (resp = redis_sock_read_bulk_reply(SLOT_SOCK(c,c->reply_slot),
- c->reply_len TSRMLS_CC))==NULL)
+ (resp = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len TSRMLS_CC))==NULL)
{
if(c->flags->mode != MULTI) {
RETURN_FALSE;
@@ -1289,8 +1367,7 @@ PHPAPI void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
// Make sure we can read the response
if(c->reply_type != TYPE_BULK ||
- (resp = redis_sock_read_bulk_reply(SLOT_SOCK(c,c->reply_slot),
- c->reply_len TSRMLS_CC))==NULL)
+ (resp = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len TSRMLS_CC))==NULL)
{
CLUSTER_RETURN_FALSE(c);
}
@@ -1317,8 +1394,7 @@ PHPAPI void cluster_dbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
// Make sure we can read the response
if(c->reply_type != TYPE_BULK ||
- (resp = redis_sock_read_bulk_reply(SLOT_SOCK(c,c->reply_slot),
- c->reply_len TSRMLS_CC))==NULL)
+ (resp = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len TSRMLS_CC))==NULL)
{
CLUSTER_RETURN_FALSE(c);
}
@@ -1441,7 +1517,7 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
sctx->cb.no_separation = 0;
/* We're in a subscribe loop */
- c->subscribed_slot = c->reply_slot;
+ c->subscribed_slot = c->cmd_slot;
/* Multibulk response, {[pattern], type, channel, payload} */
while(1) {
@@ -1700,9 +1776,7 @@ PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
array_init(z_result);
// Call our specified callback
- if(cb(SLOT_SOCK(c,c->reply_slot), z_result, c->reply_len, ctx TSRMLS_CC)
- ==FAILURE)
- {
+ if(cb(c->cmd_sock, z_result, c->reply_len, ctx TSRMLS_CC)==FAILURE) {
zval_dtor(z_result);
FREE_ZVAL(z_result);
CLUSTER_RETURN_FALSE(c);
@@ -1730,15 +1804,14 @@ PHPAPI int cluster_scan_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
}
// Read the BULK size
- if(cluster_check_response(c, c->reply_slot, &c->reply_type TSRMLS_CC),0 ||
+ if(cluster_check_response(c, &c->reply_type TSRMLS_CC),0 ||
c->reply_type != TYPE_BULK)
{
return FAILURE;
}
// Read the iterator
- if((pit = redis_sock_read_bulk_reply(SLOT_SOCK(c,c->reply_slot),
- c->reply_len TSRMLS_CC))==NULL)
+ if((pit = redis_sock_read_bulk_reply(c->cmd_sock,c->reply_len TSRMLS_CC))==NULL)
{
return FAILURE;
}
@@ -1748,7 +1821,7 @@ PHPAPI int cluster_scan_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
efree(pit);
// We'll need another MULTIBULK response for the payload
- if(cluster_check_response(c, c->reply_slot, &c->reply_type TSRMLS_CC)<0)
+ if(cluster_check_response(c, &c->reply_type TSRMLS_CC)<0)
{
return FAILURE;
}
@@ -1783,8 +1856,7 @@ PHPAPI void cluster_info_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c,
char *info;
// Read our bulk response
- if((info = redis_sock_read_bulk_reply(SLOT_SOCK(c,c->reply_slot),
- c->reply_len TSRMLS_CC))==NULL)
+ if((info = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len TSRMLS_CC))==NULL)
{
CLUSTER_RETURN_FALSE(c);
}
@@ -1810,8 +1882,7 @@ PHPAPI zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
// Pull our next response if directed
if(pull) {
- if(cluster_check_response(c, c->reply_slot, &c->reply_type
- TSRMLS_CC)<0)
+ if(cluster_check_response(c, &c->reply_type TSRMLS_CC)<0)
{
return NULL;
}
@@ -1826,9 +1897,7 @@ PHPAPI zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
array_init(z_result);
// Call our callback
- if(cb(SLOT_SOCK(c,c->reply_slot), z_result, c->reply_len, NULL TSRMLS_CC)
- ==FAILURE)
- {
+ if(cb(c->cmd_sock, z_result, c->reply_len, NULL TSRMLS_CC)==FAILURE) {
zval_dtor(z_result);
FREE_ZVAL(z_result);
return NULL;
@@ -1846,13 +1915,18 @@ PHPAPI void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
clusterFoldItem *fi = c->multi_head;
while(fi) {
- if(cluster_check_response(c, fi->slot, &c->reply_type TSRMLS_CC)<0) {
+ /* Set the slot where we should look for responses. We don't allow
+ * failover inside a transaction, so it will be the master we have
+ * mapped. */
+ c->cmd_slot = fi->slot;
+ c->cmd_sock = SLOT_SOCK(c, fi->slot);
+
+ if(cluster_check_response(c, &c->reply_type TSRMLS_CC)<0) {
zval_dtor(c->multi_resp);
efree(c->multi_resp);
RETURN_FALSE;
}
- c->reply_slot = fi->slot;
fi->callback(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, fi->ctx);
fi = fi->next;
}
@@ -1869,11 +1943,10 @@ PHPAPI void cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS,
{
clusterMultiCtx *mctx = (clusterMultiCtx*)ctx;
- // Protect against an invalid response type, -1 response length, and failure
- // to consume the responses.
+ /* Protect against an invalid response type, -1 response length, and failure
+ * to consume the responses. */
short fail = c->reply_type != TYPE_MULTIBULK || c->reply_len == -1 ||
- mbulk_resp_loop(SLOT_SOCK(c,c->reply_slot), mctx->z_multi,
- c->reply_len, NULL TSRMLS_CC)==FAILURE;
+ mbulk_resp_loop(c->cmd_sock, mctx->z_multi, c->reply_len, NULL TSRMLS_CC)==FAILURE;
// If we had a failure, pad results with FALSE to indicate failure. Non
// existant keys (e.g. for MGET will come back as NULL)
@@ -2203,4 +2276,5 @@ int mbulk_resp_loop_assoc(RedisSock *redis_sock, zval *z_result,
// Success!
return SUCCESS;
}
+
/* vim: set tabstop=4 softtabstops=4 noexpandtab shiftwidth=4: */