diff options
author | michael-grunder <michael.grunder@gmail.com> | 2015-02-26 18:43:20 +0300 |
---|---|---|
committer | michael-grunder <michael.grunder@gmail.com> | 2015-05-06 01:05:30 +0300 |
commit | 043b360651651a5a7c53a93f2de4cd5c4989b6dc (patch) | |
tree | 625ebb0b99176f9049d144159e15aee84ea0610b /cluster_library.c | |
parent | 0bb500c1e0f845cceaddca877504ec8c478837b7 (diff) |
More updates for auto-failover logic
Diffstat (limited to 'cluster_library.c')
-rw-r--r-- | cluster_library.c | 544 |
1 files changed, 309 insertions, 235 deletions
diff --git a/cluster_library.c b/cluster_library.c index 4fecbd94..cc40f37f 100644 --- a/cluster_library.c +++ b/cluster_library.c @@ -40,10 +40,6 @@ static void cluster_log(char *fmt, ...) fprintf(stderr, "%s\n", buffer); } -/* Direct handling of variant replies, in a hiredis like way. These methods - * are used for non userland facing commands, as well as passed through from - * them when the reply is just variant (e.g. eval) */ - /* Debug function to dump a clusterReply structure recursively */ static void dump_reply(clusterReply *reply, int indent) { smart_str buf = {0}; @@ -171,10 +167,28 @@ cluster_multibulk_resp_recursive(RedisSock *sock, size_t elements, } } +/* Return the socket for a slot and slave index */ +static RedisSock *cluster_slot_sock(redisCluster *c, unsigned short slot, + ulong slaveidx) +{ + redisClusterNode **node; + + /* Return the master if we're not looking for a slave */ + if (slaveidx == 0) { + return SLOT_SOCK(c, slot); + } + + /* Abort if we can't find this slave */ + if (!SLOT_SLAVES(c, slot) || zend_hash_index_find(SLOT_SLAVES(c,slot), + slaveidx, (void**)&node)==FAILURE) return NULL; + + /* Success, return the slave */ + return (*node)->sock; +} + /* Read the response from a cluster */ clusterReply *cluster_read_resp(redisCluster *c TSRMLS_DC) { - return cluster_read_sock_resp(SLOT_SOCK(c,c->reply_slot), c->reply_type, - c->reply_len TSRMLS_CC); + return cluster_read_sock_resp(c->cmd_sock,c->reply_type,c->reply_len TSRMLS_CC); } /* Read any sort of response from the socket, having already issued the @@ -229,9 +243,78 @@ cluster_read_sock_resp(RedisSock *redis_sock, REDIS_REPLY_TYPE type, return r; } -/* Cluster key distribution helpers. For a small handlful of commands, we want +/* + * Helpers to send various 'control type commands to a specific node, e.g. + * MULTI, ASKING, READONLY, READWRITE, etc + */ + +/* Send a command to the specific socket and validate reply type */ +static int cluster_send_direct(RedisSock *redis_sock, char *cmd, int cmd_len, + REDIS_REPLY_TYPE type TSRMLS_DC) +{ + /* Send the command and validate the reply type */ + if (!CLUSTER_SEND_PAYLOAD(redis_sock,cmd,cmd_len) || + !CLUSTER_VALIDATE_REPLY_TYPE(redis_sock, type)) return -1; + + /* Success! */ + return 0; +} + +static int cluster_send_asking(RedisSock *redis_sock TSRMLS_DC) { + return cluster_send_direct(redis_sock, RESP_ASKING_CMD, + sizeof(RESP_ASKING_CMD)-1, TYPE_LINE TSRMLS_CC); +} + +static int cluster_send_readonly(RedisSock *redis_sock TSRMLS_DC) { + int ret; + + /* We don't have to do anything if we're already in readonly mode */ + if (redis_sock->readonly) return 0; + + /* Return success if we can send it */ + ret = cluster_send_direct(redis_sock, RESP_READONLY_CMD, + sizeof(RESP_READONLY_CMD)-1, TYPE_LINE TSRMLS_CC); + + /* Flag this socket as READONLY if our command worked */ + redis_sock->readonly = !ret; + + /* Return the result of our send */ + return ret; +} + +static int cluster_send_multi(redisCluster *c, short slot TSRMLS_DC) { + if (cluster_send_direct(SLOT_SOCK(c,slot), RESP_MULTI_CMD, + sizeof(RESP_MULTI_CMD)-1, TYPE_LINE TSRMLS_CC)==0) + { + c->cmd_sock->mode = MULTI; + return 0; + } + return -1; +} + +PHPAPI int cluster_send_exec(redisCluster *c, short slot TSRMLS_DC) { + if (cluster_send_direct(SLOT_SOCK(c,slot), RESP_EXEC_CMD, + sizeof(RESP_EXEC_CMD)-1, TYPE_MULTIBULK TSRMLS_CC)) + { + return c->reply_len; + } + return -1; +} + +PHPAPI int cluster_send_discard(redisCluster *c, short slot TSRMLS_DC) { + if (cluster_send_direct(SLOT_SOCK(c,slot), RESP_DISCARD_CMD, + sizeof(RESP_DISCARD_CMD)-1, TYPE_LINE TSRMLS_CC)) + { + return 0; + } + return -1; +} + +/* + * Cluster key distribution helpers. For a small handlful of commands, we want * to distribute them across 1-N nodes. These methods provide simple containers - * for the purposes of splitting keys/values in this way */ + * for the purposes of splitting keys/values in this way + * */ /* Free cluster distribution list inside a HashTable */ static void cluster_dist_free_ht(void *p) { @@ -312,7 +395,7 @@ int cluster_dist_add_key(redisCluster *c, HashTable *ht, char *key, // Prefix our key and hash it key_free = redis_key_prefix(c->flags, &key, &key_len); - slot = cluster_hash_key(key, key_len); + slot = cluster_hash_key(key, key_len); // We can't do this if we don't fully understand the keyspace if(c->master[slot] == NULL) { @@ -553,14 +636,19 @@ cluster_node_create(redisCluster *c, char *host, size_t host_len, PHPAPI int cluster_node_add_slave(redisClusterNode *master, redisClusterNode *slave) { + ulong index; + // Allocate our slaves hash table if we haven't yet if(!master->slaves) { ALLOC_HASHTABLE(master->slaves); zend_hash_init(master->slaves, 0, NULL, ht_free_slave, 0); + index = 1; + } else { + index = master->slaves->nNextFreeElement; } - return zend_hash_next_index_insert(master->slaves, (void*)&slave, - sizeof(redisClusterNode*), NULL)!=SUCCESS; + return zend_hash_index_update(master->slaves, index, (void*)&slave, + sizeof(redisClusterNode*), NULL) != SUCCESS; } /* Sanity check/validation for CLUSTER SLOTS command */ @@ -646,64 +734,32 @@ PHPAPI void cluster_free_node(redisClusterNode *node) { efree(node); } -/* When we're in an ASK redirection state, Redis Cluster wants us to send - * the command only after starting our query with ASKING, or it'll just - * bounce us back and forth until the slots have migrated */ -static int cluster_send_asking(RedisSock *redis_sock TSRMLS_DC) -{ - char buf[255]; - - // Make sure we can send the request - if(redis_check_eof(redis_sock, 1 TSRMLS_CC) || - php_stream_write(redis_sock->stream, RESP_ASKING_CMD, - sizeof(RESP_ASKING_CMD)-1) != sizeof(RESP_ASKING_CMD)-1) - { - return -1; - } - - // Read our reply type - if((redis_check_eof(redis_sock, 1 TSRMLS_CC) == - 1) || - (php_stream_getc(redis_sock->stream) != TYPE_LINE)) { - return -1; - } - - // Consume the rest of our response - if(!php_stream_gets(redis_sock->stream, buf, sizeof(buf))) { - return -1; - } - - // Success - return 0; -} - -/* Get a RedisSock object from the host and port where we have been directed - * from an ASK response. We'll first see if we have connected to this node - * already, and return that. If not, we create it and add it to our nodes. */ -static RedisSock *cluster_get_asking_sock(redisCluster *c TSRMLS_DC) { +/* Get or create a redisClusterNode that corresponds to the asking redirection */ +static redisClusterNode *cluster_get_asking_node(redisCluster *c TSRMLS_DC) { redisClusterNode **ppNode; char key[1024]; int key_len; - // It'll be hashed as host:port in our nodes HashTable - key_len = snprintf(key, sizeof(key), "%s:%u", c->redir_host, - c->redir_port); + /* Hashed by host:port */ + key_len = snprintf(key, sizeof(key), "%s:%u", c->redir_host, c->redir_port); - // See if we've already attached to it - if(zend_hash_find(c->nodes, key, key_len+1, (void**)&ppNode)==SUCCESS) - { - return (*ppNode)->sock; + /* See if we've already attached to it */ + if (zend_hash_find(c->nodes, key, key_len+1, (void**)&ppNode) == SUCCESS) { + return *ppNode; } - // Create a redisClusterNode + /* This host:port is unknown to us, so add it */ *ppNode = cluster_node_create(c, c->redir_host, c->redir_host_len, c->redir_port, c->redir_slot, 0); - // Now add it to the nodes we have - zend_hash_update(c->nodes, key, key_len+1, (void*)ppNode, - sizeof(redisClusterNode*), NULL); + /* Return the node */ + return *ppNode; +} - // Return the RedisSock - return (*ppNode)->sock; +/* Get or create a node at the host:port we were asked to check, and return the + * redis_sock for it. */ +static RedisSock *cluster_get_asking_sock(redisCluster *c TSRMLS_DC) { + return cluster_get_asking_node(c TSRMLS_CC)->sock; } /* Initialize seeds */ @@ -800,13 +856,9 @@ cluster_map_keyspace(redisCluster *c TSRMLS_DC) { static int cluster_set_redirection(redisCluster* c, char *msg, int moved) { char *host, *port; - - // Move past MOVED or ASK - if(moved) { - msg += MOVED_LEN; - } else { - msg += ASK_LEN; - } + + /* Move past "MOVED" or "ASK */ + msg += moved ? MOVED_LEN : ASK_LEN; // We need a slot seperator if(!(host = strchr(msg, ' '))) return -1; @@ -837,8 +889,8 @@ static int cluster_set_redirection(redisCluster* c, char *msg, int moved) * * This function will return -1 on a critical error (e.g. parse/communication * error, 0 if no redirection was encountered, and 1 if the data was moved. */ -static int cluster_check_response(redisCluster *c, unsigned short slot, - REDIS_REPLY_TYPE *reply_type TSRMLS_DC) +static int cluster_check_response(redisCluster *c, REDIS_REPLY_TYPE *reply_type + TSRMLS_DC) { size_t sz; @@ -846,8 +898,8 @@ static int cluster_check_response(redisCluster *c, unsigned short slot, CLUSTER_CLEAR_ERROR(c); CLUSTER_CLEAR_REPLY(c); - if(-1 == redis_check_eof(SLOT_SOCK(c,slot), 1 TSRMLS_CC) || - EOF == (*reply_type = php_stream_getc(SLOT_STREAM(c,slot)))) + if(-1 == redis_check_eof(c->cmd_sock, 1 TSRMLS_CC) || + EOF == (*reply_type = php_stream_getc(c->cmd_sock->stream))) { return -1; } @@ -858,7 +910,7 @@ static int cluster_check_response(redisCluster *c, unsigned short slot, int moved; // Attempt to read the error - if(!php_stream_gets(SLOT_STREAM(c,slot), inbuf, sizeof(inbuf))) { + if(!php_stream_gets(c->cmd_sock->stream, inbuf, sizeof(inbuf))) { return -1; } @@ -879,7 +931,7 @@ static int cluster_check_response(redisCluster *c, unsigned short slot, } // Fetch the first line of our response from Redis. - if(redis_sock_gets(SLOT_SOCK(c,slot),c->line_reply,sizeof(c->line_reply), + if(redis_sock_gets(c->cmd_sock,c->line_reply,sizeof(c->line_reply), &sz TSRMLS_CC)<0) { return -1; @@ -910,36 +962,129 @@ PHPAPI void cluster_disconnect(redisCluster *c TSRMLS_DC) { } } -/* Attempt to write to a cluster node. If the node is NULL (e.g. it's been - * umapped, we keep falling back until we run out of nodes to try */ -static int cluster_sock_write(redisCluster *c, unsigned short slot, - const char *cmd, size_t sz, int direct TSRMLS_DC) +/* Fisher-Yates shuffle for integer array */ +static void fyshuffle(int *array, size_t len) { + int temp, n = len; + size_t r; + + /* Randomize */ + while (n > 1) { + r = ((int)((double)n-- * (rand() / (RAND_MAX+1.0)))); + temp = array[n]; + array[n] = array[r]; + array[r] = temp; + }; +} + +/* This method attempts to write our command at random to the master and any + * attached slaves, until we either successufly do so, or fail. */ +static int cluster_dist_write(redisCluster *c, const char *cmd, size_t sz, + int nomaster TSRMLS_DC) { + int i, count=1, *nodes; RedisSock *redis_sock; + + /* Allocate enough memory for the master and all of our slaves */ + if (c->master[c->cmd_slot]->slaves) { + count += zend_hash_num_elements(c->master[c->cmd_slot]->slaves); + } + nodes = emalloc(sizeof(int)*count); + + /* Populate our array with the master and each of it's slaves, then + * randomize them, so we will pick from the master or some slave. */ + for (i = 0; i < count; i++) nodes[i] = i; + fyshuffle(nodes, count); + + /* Iterate through our nodes until we find one we can write to or fail */ + for (i = nomaster; i < count; i++) { + /* Get the slave for this index */ + redis_sock = cluster_slot_sock(c, c->cmd_slot, nodes[i]); + if (!redis_sock) continue; + + /* Connect to this node if we haven't already */ + CLUSTER_LAZY_CONNECT(redis_sock); + + /* If we're not on the master, attempt to send the READONLY commadn to + * this slave, and skip it if that fails */ + if (nodes[i] == 0 || redis_sock->readonly || + cluster_send_readonly(redis_sock TSRMLS_CC) == 0) + { + /* Attempt to send the command */ + if (CLUSTER_SEND_PAYLOAD(redis_sock, cmd, sz)) { + c->cmd_sock = redis_sock; + efree(nodes); + return 0; + } + } + } + + /* Clean up our shuffled array */ + efree(nodes); + + /* Couldn't send to the master or any slave */ + return -1; +} + +/* Attempt to write our command to the current c->cmd_sock socket. For write + * commands, we attempt to query the master for this slot, and in the event of + * a failure, try to query every remaining node for a redirection. + * + * If we're issuing a readonly command, we use one of three strategies, depending + * on our redisCluster->failover setting. + * + * REDIS_FAILOVER_NONE: + * The command is treated just like a write command, and will only be executed + * against the known master for this slot. + * REDIS_FAILOVER_ERROR: + * If we're unable to communicate with this slot's master, we attempt the query + * against any slaves (at random) that this master has. + * REDIS_FAILOVER_DISTRIBUTE: + * We pick at random from the master and any slaves it has. This option is + * used to load balance read queries against N slaves. + * + * Once we are able to find a node we can write to, we check for MOVED or + * ASKING redirection, such that the keyspace can be updated. +*/ +static int cluster_sock_write(redisCluster *c, const char *cmd, size_t sz, + int direct TSRMLS_DC) +{ redisClusterNode **seed_node; + RedisSock *redis_sock; + int failover; - /* If we're not in ASK redirection, use the slot requested, otherwise send - * our ASKING command and use the asking slot. */ - if(c->redir_type != REDIR_ASK) { - redis_sock = SLOT_SOCK(c,slot); - } else { + /* First try the socket requested */ + redis_sock = c->cmd_sock; + + /* Readonly is irrelevant if we're not configured to failover */ + failover = c->readonly && c->failover != REDIS_FAILOVER_NONE ? + c->failover : REDIS_FAILOVER_NONE; + + /* If in ASK redirection, get/create the node for that host:port, otherwise + * just use the command socket. */ + if(c->redir_type == REDIR_ASK) { redis_sock = cluster_get_asking_sock(c TSRMLS_CC); if(cluster_send_asking(redis_sock TSRMLS_CC)<0) { return -1; } } - /* If the lazy_connect flag is still set, we've not actually connected to - * this node, so do that now. */ - CLUSTER_LAZY_CONNECT(redis_sock); - - /* First attempt to write it to the slot that's been requested */ - if(redis_sock && redis_sock->stream && - !redis_check_eof(redis_sock, 1 TSRMLS_CC) && - php_stream_write(redis_sock->stream, cmd, sz)==sz) - { - // We were able to write it - return slot; + /* Attempt to send our command payload to the cluster. If we're not set up + * to failover, just try the master. If we're configured to failover on + * error, try the master and then fall back to any slaves. When we're set + * up to distribute the commands, try to write to any node on this slot + * at random. */ + if (failover == REDIS_FAILOVER_NONE) { + /* Success if we can send our payload to the master */ + CLUSTER_LAZY_CONNECT(redis_sock); + if (CLUSTER_SEND_PAYLOAD(redis_sock, cmd, sz)) return 0; + } else if (failover == REDIS_FAILOVER_ERROR) { + /* Try the master, then fall back to any slaves we may have */ + CLUSTER_LAZY_CONNECT(redis_sock); + if (CLUSTER_SEND_PAYLOAD(redis_sock, cmd, sz) || + !cluster_dist_write(c, cmd, sz, 1 TSRMLS_CC)) return 0; + } else if (!cluster_dist_write(c, cmd, sz, 0 TSRMLS_CC)) { + /* We were able to write to a master or slave at random */ + return 0; } /* Don't fall back if direct communication with this slot is required. */ @@ -953,20 +1098,17 @@ static int cluster_sock_write(redisCluster *c, unsigned short slot, /* Grab node */ zend_hash_get_current_data(c->nodes, (void**)&seed_node); - /* Skip this node if it's the one that failed */ - if((*seed_node)->sock == redis_sock) continue; - - /* Skip slave nodes */ - if((*seed_node)->slave) continue; + /* Skip this node if it's the one that failed, or if it's a slave */ + if((*seed_node)->sock == redis_sock || (*seed_node)->slave) continue; + /* Connect to this node if we haven't already */ CLUSTER_LAZY_CONNECT((*seed_node)->sock); /* Attempt to write our request to this node */ - if(!redis_check_eof((*seed_node)->sock, 1 TSRMLS_CC) && - php_stream_write((*seed_node)->sock->stream, cmd, sz)==sz) - { - /* Just return the first slot we think this node handles */ - return (*seed_node)->slot; + if (CLUSTER_SEND_PAYLOAD((*seed_node)->sock, cmd, sz)) { + c->cmd_slot = (*seed_node)->slot; + c->cmd_sock = (*seed_node)->sock; + return 0; } } @@ -1038,44 +1180,6 @@ static void cluster_update_slot(redisCluster *c TSRMLS_DC) { node->slave = 0; } -/* Send EXEC to a specific slot */ -PHPAPI int cluster_send_exec(redisCluster *c, short slot TSRMLS_DC) { - /* We have to be able to write this to the slot requested */ - if(cluster_sock_write(c, slot, RESP_EXEC_CMD, sizeof(RESP_EXEC_CMD)-1, 1 - TSRMLS_CC)==-1) - { - return -1; - } - - /* We have to get a proper response from the slot to continue */ - if(cluster_check_response(c, slot, &c->reply_type TSRMLS_CC)!=0 || - c->reply_type != TYPE_MULTIBULK) - { - return -1; - } - - // Return the number of multi-bulk replies - return c->reply_len; -} - -/* Send DISCARD to a specific slot */ -PHPAPI int cluster_send_discard(redisCluster *c, short slot TSRMLS_DC) { - if(cluster_sock_write(c, slot, RESP_DISCARD_CMD, sizeof(RESP_DISCARD_CMD)-1, - 1 TSRMLS_CC)==-1) - { - return -1; - } - - if(cluster_check_response(c, slot, &c->reply_type TSRMLS_CC)!=0 || - c->reply_type != TYPE_LINE) - { - return -1; - } - - return 0; -} - - /* Abort any transaction in process, by sending DISCARD to any nodes that * have active transactions in progress. If we can't send DISCARD, we need * to disconnect as it would leave us in an undefined state. */ @@ -1102,27 +1206,6 @@ PHPAPI int cluster_abort_exec(redisCluster *c TSRMLS_DC) { return 0; } -/* Send MULTI to a given slot and consume the response. If we can't send the - * command OR we get an error in our response, we have to fail. */ -static int cluster_send_multi(redisCluster *c, short slot TSRMLS_DC) { - /* We have to be able to communicate with the node we want */ - if(cluster_sock_write(c, slot, RESP_MULTI_CMD, sizeof(RESP_MULTI_CMD)-1, 1 - TSRMLS_CC)==-1) - { - return -1; - } - - /* We have to get a proper response */ - if(cluster_check_response(c, slot, &c->reply_type TSRMLS_CC)!=0 || - c->reply_type != TYPE_LINE) - { - return -1; - } - - /* Success */ - return 0; -} - /* Iterate through our slots, looking for the host/port in question. This * should perform well enough as in almost all situations, a few or a few * dozen servers will map all the slots */ @@ -1148,21 +1231,20 @@ PHPAPI short cluster_find_slot(redisCluster *c, const char *host, PHPAPI int cluster_send_slot(redisCluster *c, short slot, char *cmd, int cmd_len, REDIS_REPLY_TYPE rtype TSRMLS_DC) { - // Try only this node - if(cluster_sock_write(c, slot, cmd, cmd_len, 1 TSRMLS_CC)==-1) { - return -1; - } - - // Check our response and verify the type unless passed in as TYPE_EOF - if(cluster_check_response(c, slot, &c->reply_type TSRMLS_CC)!=0 || - (rtype != TYPE_EOF && rtype != c->reply_type)) - { + /* Point our cluster to this slot and it's socket */ + c->cmd_slot = slot; + c->cmd_sock = SLOT_SOCK(c, slot); + + /* Try the slot */ + if(cluster_sock_write(c, cmd, cmd_len, 1 TSRMLS_CC)==-1) { return -1; } - // Update our reply slot - c->reply_slot = slot; + /* Check our response */ + if(cluster_check_response(c, &c->reply_type TSRMLS_CC)!=0 || + (rtype != TYPE_EOF && rtype != c->reply_type)) return -1; + /* Success */ return 0; } @@ -1174,60 +1256,59 @@ PHPAPI short cluster_send_command(redisCluster *c, short slot, const char *cmd, int resp, timedout=0; long msstart; - /* Grab the current time in milliseconds */ + /* Set the slot we're operating against as well as it's socket. These can + * change during our request loop if we have a master failure and are + * configured to fall back to slave nodes, or if we have to fall back to + * a different slot due to no nodes serving this slot being reachable. */ + c->cmd_slot = slot; + c->cmd_sock = SLOT_SOCK(c, slot); + + /* Get the current time in milliseconds to handle any timeout */ msstart = mstime(); - /* Our main cluster request/reply loop. This loop runs until we're able - * to get a valid reply from a node, hit our "request" timeout, or encounter - * a CLUSTERDOWN state from Redis cluster. */ + /* Our main cluster request/reply loop. This loop runs until we're able to + * get a valid reply from a node, hit our "request" timeout, or enounter a + * CLUSTERDOWN state from Redis Cluster. */ do { - /* Send MULTI to the node if we haven't yet. */ - if(c->flags->mode == MULTI && SLOT_SOCK(c,slot)->mode != MULTI) { + /* Send MULTI to the socket if we're in MULTI mode but haven't yet */ + if (c->flags->mode == MULTI && CMD_SOCK(c)->mode != MULTI) { /* We have to fail if we can't send MULTI to the node */ - if(cluster_send_multi(c, slot TSRMLS_CC)==-1) { + if (cluster_send_multi(c, slot TSRMLS_CC) == -1) { zend_throw_exception(redis_cluster_exception_ce, - "Unable to enter MULTI mode on required slot", + "Unable to enter MULTI mode on requested slot", 0 TSRMLS_CC); return -1; } - - /* This node is now inside a transaction */ - SLOT_SOCK(c,slot)->mode = MULTI; } - /* Attempt to send the command to the slot requested */ - if((slot = cluster_sock_write(c, slot, cmd, cmd_len, 0 TSRMLS_CC))==-1) - { - /* We have no choice but to throw an exception. We can't communicate - * with any node at all. */ + /* Attempt to deliver our command to the node, and that failing, to any + * node until we find one that is available. */ + if (cluster_sock_write(c, cmd, cmd_len, 0 TSRMLS_CC) == -1) { + /* We have to abort, as no nodes are reachable */ zend_throw_exception(redis_cluster_exception_ce, "Can't communicate with any node in the cluster", 0 TSRMLS_CC); return -1; } - /* Check the response from the slot we ended up querying. */ - resp = cluster_check_response(c, slot, &c->reply_type TSRMLS_CC); + /* Now check the response from the node we queried. */ + resp = cluster_check_response(c, &c->reply_type TSRMLS_CC); /* Handle MOVED or ASKING redirection */ - if(resp == 1) { - /* If we get a MOVED response inside of a transaction, we have to - abort, because the transaction would be invalid. */ - if(c->flags->mode == MULTI) { - zend_throw_exception(redis_cluster_exception_ce, - "Can't process MULTI sequence when cluster is resharding", - 0 TSRMLS_CC); - return -1; - } - - // In case of a MOVED redirection, update our node mapping - if(c->redir_type == REDIR_MOVED) { - cluster_update_slot(c TSRMLS_CC); - } - slot = c->redir_slot; + if (resp == 1) { + /* Abort if we're in a transaction as it will be invalid */ + if (c->flags->mode == MULTI) { + zend_throw_exception(redis_cluster_exception_ce, + "Can't process MULTI sequence when cluster is resharding", + 0 TSRMLS_CC); + return -1; + } + + /* Update mapping if the data has MOVED */ + if (c->redir_type == REDIR_MOVED) cluster_update_slot(c TSRMLS_CC); } - /* If we didn't get a valid response see if we've now timed out */ + /* Figure out if we've timed out trying to read or write the data */ timedout = resp && c->waitms ? mstime() - msstart >= c->waitms : 0; } while(resp != 0 && !c->clusterdown && !timedout); @@ -1241,9 +1322,7 @@ PHPAPI short cluster_send_command(redisCluster *c, short slot, const char *cmd, "Timed out attempting to find data in the correct node!", 0 TSRMLS_CC); } - // Inform the cluster where to read the rest of our response, - // and clear out redirection flag. - c->reply_slot = slot; + /* Clear redirection flag */ c->redir_type = REDIR_NONE; // Success, return the slot where data exists. @@ -1266,8 +1345,7 @@ PHPAPI void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS, // Make sure we can read the response if(c->reply_type != TYPE_BULK || - (resp = redis_sock_read_bulk_reply(SLOT_SOCK(c,c->reply_slot), - c->reply_len TSRMLS_CC))==NULL) + (resp = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len TSRMLS_CC))==NULL) { if(c->flags->mode != MULTI) { RETURN_FALSE; @@ -1289,8 +1367,7 @@ PHPAPI void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, // Make sure we can read the response if(c->reply_type != TYPE_BULK || - (resp = redis_sock_read_bulk_reply(SLOT_SOCK(c,c->reply_slot), - c->reply_len TSRMLS_CC))==NULL) + (resp = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len TSRMLS_CC))==NULL) { CLUSTER_RETURN_FALSE(c); } @@ -1317,8 +1394,7 @@ PHPAPI void cluster_dbl_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, // Make sure we can read the response if(c->reply_type != TYPE_BULK || - (resp = redis_sock_read_bulk_reply(SLOT_SOCK(c,c->reply_slot), - c->reply_len TSRMLS_CC))==NULL) + (resp = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len TSRMLS_CC))==NULL) { CLUSTER_RETURN_FALSE(c); } @@ -1441,7 +1517,7 @@ PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, sctx->cb.no_separation = 0; /* We're in a subscribe loop */ - c->subscribed_slot = c->reply_slot; + c->subscribed_slot = c->cmd_slot; /* Multibulk response, {[pattern], type, channel, payload} */ while(1) { @@ -1700,9 +1776,7 @@ PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, array_init(z_result); // Call our specified callback - if(cb(SLOT_SOCK(c,c->reply_slot), z_result, c->reply_len, ctx TSRMLS_CC) - ==FAILURE) - { + if(cb(c->cmd_sock, z_result, c->reply_len, ctx TSRMLS_CC)==FAILURE) { zval_dtor(z_result); FREE_ZVAL(z_result); CLUSTER_RETURN_FALSE(c); @@ -1730,15 +1804,14 @@ PHPAPI int cluster_scan_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, } // Read the BULK size - if(cluster_check_response(c, c->reply_slot, &c->reply_type TSRMLS_CC),0 || + if(cluster_check_response(c, &c->reply_type TSRMLS_CC),0 || c->reply_type != TYPE_BULK) { return FAILURE; } // Read the iterator - if((pit = redis_sock_read_bulk_reply(SLOT_SOCK(c,c->reply_slot), - c->reply_len TSRMLS_CC))==NULL) + if((pit = redis_sock_read_bulk_reply(c->cmd_sock,c->reply_len TSRMLS_CC))==NULL) { return FAILURE; } @@ -1748,7 +1821,7 @@ PHPAPI int cluster_scan_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, efree(pit); // We'll need another MULTIBULK response for the payload - if(cluster_check_response(c, c->reply_slot, &c->reply_type TSRMLS_CC)<0) + if(cluster_check_response(c, &c->reply_type TSRMLS_CC)<0) { return FAILURE; } @@ -1783,8 +1856,7 @@ PHPAPI void cluster_info_resp(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, char *info; // Read our bulk response - if((info = redis_sock_read_bulk_reply(SLOT_SOCK(c,c->reply_slot), - c->reply_len TSRMLS_CC))==NULL) + if((info = redis_sock_read_bulk_reply(c->cmd_sock, c->reply_len TSRMLS_CC))==NULL) { CLUSTER_RETURN_FALSE(c); } @@ -1810,8 +1882,7 @@ PHPAPI zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, // Pull our next response if directed if(pull) { - if(cluster_check_response(c, c->reply_slot, &c->reply_type - TSRMLS_CC)<0) + if(cluster_check_response(c, &c->reply_type TSRMLS_CC)<0) { return NULL; } @@ -1826,9 +1897,7 @@ PHPAPI zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, array_init(z_result); // Call our callback - if(cb(SLOT_SOCK(c,c->reply_slot), z_result, c->reply_len, NULL TSRMLS_CC) - ==FAILURE) - { + if(cb(c->cmd_sock, z_result, c->reply_len, NULL TSRMLS_CC)==FAILURE) { zval_dtor(z_result); FREE_ZVAL(z_result); return NULL; @@ -1846,13 +1915,18 @@ PHPAPI void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS, clusterFoldItem *fi = c->multi_head; while(fi) { - if(cluster_check_response(c, fi->slot, &c->reply_type TSRMLS_CC)<0) { + /* Set the slot where we should look for responses. We don't allow + * failover inside a transaction, so it will be the master we have + * mapped. */ + c->cmd_slot = fi->slot; + c->cmd_sock = SLOT_SOCK(c, fi->slot); + + if(cluster_check_response(c, &c->reply_type TSRMLS_CC)<0) { zval_dtor(c->multi_resp); efree(c->multi_resp); RETURN_FALSE; } - c->reply_slot = fi->slot; fi->callback(INTERNAL_FUNCTION_PARAM_PASSTHRU, c, fi->ctx); fi = fi->next; } @@ -1869,11 +1943,10 @@ PHPAPI void cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS, { clusterMultiCtx *mctx = (clusterMultiCtx*)ctx; - // Protect against an invalid response type, -1 response length, and failure - // to consume the responses. + /* Protect against an invalid response type, -1 response length, and failure + * to consume the responses. */ short fail = c->reply_type != TYPE_MULTIBULK || c->reply_len == -1 || - mbulk_resp_loop(SLOT_SOCK(c,c->reply_slot), mctx->z_multi, - c->reply_len, NULL TSRMLS_CC)==FAILURE; + mbulk_resp_loop(c->cmd_sock, mctx->z_multi, c->reply_len, NULL TSRMLS_CC)==FAILURE; // If we had a failure, pad results with FALSE to indicate failure. Non // existant keys (e.g. for MGET will come back as NULL) @@ -2203,4 +2276,5 @@ int mbulk_resp_loop_assoc(RedisSock *redis_sock, zval *z_result, // Success! return SUCCESS; } + /* vim: set tabstop=4 softtabstops=4 noexpandtab shiftwidth=4: */ |