From 0bb500c1e0f845cceaddca877504ec8c478837b7 Mon Sep 17 00:00:00 2001 From: michael-grunder Date: Mon, 23 Feb 2015 15:32:35 -0800 Subject: Added options for slave failover --- cluster_library.c | 86 ++++++++++++++++++++++++++----------------------------- 1 file changed, 41 insertions(+), 45 deletions(-) (limited to 'cluster_library.c') diff --git a/cluster_library.c b/cluster_library.c index 373d806c..4fecbd94 100644 --- a/cluster_library.c +++ b/cluster_library.c @@ -913,30 +913,27 @@ PHPAPI void cluster_disconnect(redisCluster *c TSRMLS_DC) { /* Attempt to write to a cluster node. If the node is NULL (e.g. it's been * umapped, we keep falling back until we run out of nodes to try */ static int cluster_sock_write(redisCluster *c, unsigned short slot, - const char *cmd, size_t sz, int - direct TSRMLS_DC) + const char *cmd, size_t sz, int direct TSRMLS_DC) { RedisSock *redis_sock; redisClusterNode **seed_node; - // If we're not in ASK redirection, use the slot requested, otherwise - // send our ASKING command and use the asking slot. + /* If we're not in ASK redirection, use the slot requested, otherwise send + * our ASKING command and use the asking slot. */ if(c->redir_type != REDIR_ASK) { redis_sock = SLOT_SOCK(c,slot); } else { redis_sock = cluster_get_asking_sock(c TSRMLS_CC); - - // Redis Cluster wants this command preceded by the "ASKING" command if(cluster_send_asking(redis_sock TSRMLS_CC)<0) { return -1; } } - // If the lazy_connect flag is still set, we've not actually - // connected to this node, so do that now. + /* If the lazy_connect flag is still set, we've not actually connected to + * this node, so do that now. */ CLUSTER_LAZY_CONNECT(redis_sock); - // First attempt to write it to the slot that's been requested + /* First attempt to write it to the slot that's been requested */ if(redis_sock && redis_sock->stream && !redis_check_eof(redis_sock, 1 TSRMLS_CC) && php_stream_write(redis_sock->stream, cmd, sz)==sz) @@ -945,10 +942,10 @@ static int cluster_sock_write(redisCluster *c, unsigned short slot, return slot; } - // Don't fall back if direct communication with this slot is required. + /* Don't fall back if direct communication with this slot is required. */ if(direct) return -1; - // Fall back by attempting the request against every known node + /* Fall back by attempting the request against every known node */ for(zend_hash_internal_pointer_reset(c->nodes); zend_hash_has_more_elements(c->nodes)==SUCCESS; zend_hash_move_forward(c->nodes)) @@ -959,22 +956,21 @@ static int cluster_sock_write(redisCluster *c, unsigned short slot, /* Skip this node if it's the one that failed */ if((*seed_node)->sock == redis_sock) continue; - // TODO: Allow for failure/redirection queries to be sent - // to slave nodes, but for now, stick with masters. + /* Skip slave nodes */ if((*seed_node)->slave) continue; CLUSTER_LAZY_CONNECT((*seed_node)->sock); - // Attempt to write our request to this node + /* Attempt to write our request to this node */ if(!redis_check_eof((*seed_node)->sock, 1 TSRMLS_CC) && php_stream_write((*seed_node)->sock->stream, cmd, sz)==sz) { - // Just return the first slot we think this node handles + /* Just return the first slot we think this node handles */ return (*seed_node)->slot; } } - // We were unable to write to any node in our cluster + /* We were unable to write to any node in our cluster */ return -1; } @@ -992,7 +988,7 @@ static redisClusterNode *cluster_find_node(redisCluster *c, const char *host, return *ret; } - // Not found + /* Not found */ return NULL; } @@ -1001,57 +997,57 @@ static redisClusterNode *cluster_find_node(redisCluster *c, const char *host, static void cluster_update_slot(redisCluster *c TSRMLS_DC) { redisClusterNode *node; - // Do we already have the new slot mapped + /* Do we already have the new slot mapped */ if(c->master[c->redir_slot]) { - // No need to do anything if it's the same node + /* No need to do anything if it's the same node */ if(!CLUSTER_REDIR_CMP(c)) { return; } - // Check to see if we have this new node mapped + /* Check to see if we have this new node mapped */ node = cluster_find_node(c, c->redir_host, c->redir_port); if(node) { - // Just point to this slot + /* Just point to this slot */ c->master[c->redir_slot] = node; } else { - // Create our node + /* Create our node */ node = cluster_node_create(c, c->redir_host, c->redir_host_len, c->redir_port, c->redir_slot, 0); - // Now point our slot at the node + /* Now point our slot at the node */ c->master[c->redir_slot] = node; } } else { - // Check to see if the ip and port are mapped + /* Check to see if the ip and port are mapped */ node = cluster_find_node(c, c->redir_host, c->redir_port); if(!node) { node = cluster_node_create(c, c->redir_host, c->redir_host_len, c->redir_port, c->redir_slot, 0); } - // Map the slot to this node + /* Map the slot to this node */ c->master[c->redir_slot] = node; } - // Update slot inside of node, so it can be found for command sending + /* Update slot inside of node, so it can be found for command sending */ node->slot = c->redir_slot; - // Make sure we unflag this node as a slave, as Redis Cluster will only - // ever direct us to master nodes. + /* Make sure we unflag this node as a slave, as Redis Cluster will only ever + * direct us to master nodes. */ node->slave = 0; } /* Send EXEC to a specific slot */ PHPAPI int cluster_send_exec(redisCluster *c, short slot TSRMLS_DC) { - // We have to be able to write this to the slot requested + /* We have to be able to write this to the slot requested */ if(cluster_sock_write(c, slot, RESP_EXEC_CMD, sizeof(RESP_EXEC_CMD)-1, 1 TSRMLS_CC)==-1) { return -1; } - // We have to get a proper response from the slot to continue + /* We have to get a proper response from the slot to continue */ if(cluster_check_response(c, slot, &c->reply_type TSRMLS_CC)!=0 || c->reply_type != TYPE_MULTIBULK) { @@ -1086,7 +1082,7 @@ PHPAPI int cluster_send_discard(redisCluster *c, short slot TSRMLS_DC) { PHPAPI int cluster_abort_exec(redisCluster *c TSRMLS_DC) { clusterFoldItem *fi = c->multi_head; - // Loop through our fold items + /* Loop through our fold items */ while(fi) { if(SLOT_SOCK(c,fi->slot)->mode == MULTI) { if(cluster_send_discard(c, fi->slot TSRMLS_CC)<0) { @@ -1099,31 +1095,31 @@ PHPAPI int cluster_abort_exec(redisCluster *c TSRMLS_DC) { fi = fi->next; } - // Update our overall cluster state + /* Update our overall cluster state */ c->flags->mode = ATOMIC; - // Success + /* Success */ return 0; } /* Send MULTI to a given slot and consume the response. If we can't send the * command OR we get an error in our response, we have to fail. */ static int cluster_send_multi(redisCluster *c, short slot TSRMLS_DC) { - // We have to be able to communicate with the node we want + /* We have to be able to communicate with the node we want */ if(cluster_sock_write(c, slot, RESP_MULTI_CMD, sizeof(RESP_MULTI_CMD)-1, 1 TSRMLS_CC)==-1) { return -1; } - // We have to get a proper response + /* We have to get a proper response */ if(cluster_check_response(c, slot, &c->reply_type TSRMLS_CC)!=0 || c->reply_type != TYPE_LINE) { return -1; } - // Success + /* Success */ return 0; } @@ -1185,9 +1181,9 @@ PHPAPI short cluster_send_command(redisCluster *c, short slot, const char *cmd, * to get a valid reply from a node, hit our "request" timeout, or encounter * a CLUSTERDOWN state from Redis cluster. */ do { - // Send MULTI to the node if we haven't yet. + /* Send MULTI to the node if we haven't yet. */ if(c->flags->mode == MULTI && SLOT_SOCK(c,slot)->mode != MULTI) { - // We have to fail if we can't send MULTI to the node + /* We have to fail if we can't send MULTI to the node */ if(cluster_send_multi(c, slot TSRMLS_CC)==-1) { zend_throw_exception(redis_cluster_exception_ce, "Unable to enter MULTI mode on required slot", @@ -1195,28 +1191,28 @@ PHPAPI short cluster_send_command(redisCluster *c, short slot, const char *cmd, return -1; } - // This node is now inside a transaction + /* This node is now inside a transaction */ SLOT_SOCK(c,slot)->mode = MULTI; } - // Attempt to send the command to the slot requested + /* Attempt to send the command to the slot requested */ if((slot = cluster_sock_write(c, slot, cmd, cmd_len, 0 TSRMLS_CC))==-1) { - // We have no choice but to throw an exception. We - // can't communicate with any node at all. + /* We have no choice but to throw an exception. We can't communicate + * with any node at all. */ zend_throw_exception(redis_cluster_exception_ce, "Can't communicate with any node in the cluster", 0 TSRMLS_CC); return -1; } - // Check the response from the slot we ended up querying. + /* Check the response from the slot we ended up querying. */ resp = cluster_check_response(c, slot, &c->reply_type TSRMLS_CC); /* Handle MOVED or ASKING redirection */ if(resp == 1) { - // If we get a MOVED response inside of a transaction, we have to - // abort, because the transaction would be invalid. + /* If we get a MOVED response inside of a transaction, we have to + abort, because the transaction would be invalid. */ if(c->flags->mode == MULTI) { zend_throw_exception(redis_cluster_exception_ce, "Can't process MULTI sequence when cluster is resharding", -- cgit v1.2.3