Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/phpredis/phpredis.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormichael-grunder <michael.grunder@gmail.com>2014-05-28 01:12:19 +0400
committermichael-grunder <michael.grunder@gmail.com>2015-05-05 23:37:18 +0300
commita8dbe68a211439b37a499ad1828c006fa337c7b5 (patch)
treefb9c30976b95a5d1e1e901c21665f6c41d82c249 /cluster_library.c
parent7ac92792319246965d90545cd7a0dbb896232982 (diff)
Initial commit of RedisCluster send/receive loop for commands.
Every time we communicate with the cluster we have to determine which slot to try, and then, if we get a MOVED or ASK redirection, retry at the new slot. In the case of MOVED, we'll update the node mapping we cache locally. For ASK, we'll leave the mapping unchanged but retry at the slot where we were directed.
Diffstat (limited to 'cluster_library.c')
-rw-r--r--cluster_library.c343
1 files changed, 339 insertions, 4 deletions
diff --git a/cluster_library.c b/cluster_library.c
index 0b2964f3..24ea4b9e 100644
--- a/cluster_library.c
+++ b/cluster_library.c
@@ -7,6 +7,28 @@
extern zend_class_entry *redis_cluster_exception_ce;
+/* Set our last error string encountered */
+static void cluster_set_err(redisCluster *c, char *err, int err_len)
+{
+ if(err && err_len>0) {
+ if(c->err == NULL) {
+ c->err = emalloc(err_len+1);
+ } else if(err_len > c->err_len) {
+ c->err = erealloc(c->err, err_len + 1);
+ }
+ memcpy(c->err,err,err_len);
+ c->err[err_len]='\0';
+ c->err_len = err_len;
+ } else {
+ if(c->err) {
+ efree(c->err);
+ }
+ c->err = NULL;
+ c->err_len = 0;
+ }
+}
+
+/* Free a cluster info structure */
static void free_cluster_info(clusterNodeInfo *info) {
if(info->name) {
efree(info->name);
@@ -63,7 +85,8 @@ unsigned short cluster_hash_key_zval(zval *z_key) {
const char *kptr;
char buf[255];
int klen;
-
+
+ // Switch based on ZVAL type
switch(Z_TYPE_P(z_key)) {
case IS_STRING:
kptr = Z_STRVAL_P(z_key);
@@ -165,7 +188,8 @@ cluster_parse_node_line(RedisSock *sock, char *line, clusterNodeInfo *info) {
efree(array);
return -1;
}
- info->master_name = estrndup(array[CLUSTER_NODES_MASTER_HASH], CLUSTER_NAME_LEN);
+ info->master_name = estrndup(array[CLUSTER_NODES_MASTER_HASH],
+ CLUSTER_NAME_LEN);
} else {
info->master_name = NULL;
}
@@ -221,8 +245,9 @@ static clusterNodeInfo
}
// Make sure we've got a bulk reply and we can read it
- if(redis_read_reply_type(redis_sock, &type, len TSRMLS_CC)<0 || type!=TYPE_BULK ||
- (reply = redis_sock_read_bulk_reply(redis_sock, *len TSRMLS_CC))==NULL)
+ if(redis_read_reply_type(redis_sock, &type, len TSRMLS_CC)<0 ||
+ type!=TYPE_BULK || (reply = redis_sock_read_bulk_reply(redis_sock,
+ *len TSRMLS_CC))==NULL)
{
efree(cmd);
return NULL;
@@ -279,6 +304,27 @@ cluster_node_create(redisCluster *cluster, clusterNodeInfo *info) {
return node;
}
+/* Convenience wrapper around cluster_node_create() for callers that hold the
+ * individual node attributes rather than a clusterNodeInfo.
+ *
+ * The arguments are packed into a zero-initialised, stack-allocated
+ * clusterNodeInfo which is handed to cluster_node_create(); whatever that
+ * function returns is returned unchanged. The string arguments are stored as
+ * plain pointers (constness cast away), they are not duplicated here. */
+static redisClusterNode*
+cluster_node_create_ex(redisCluster *c, const char *name,
+                       const char *master_name, const char *host,
+                       int host_len, unsigned short port,
+                       unsigned short start_slot,
+                       unsigned short end_slot)
+{
+    clusterNodeInfo node_info = {0};
+
+    // Slot range served by the node
+    node_info.start_slot  = start_slot;
+    node_info.end_slot    = end_slot;
+
+    // Network location
+    node_info.host        = (char*)host;
+    node_info.host_len    = host_len;
+    node_info.port        = port;
+
+    // Identity within the cluster
+    node_info.name        = (char*)name;
+    node_info.master_name = (char*)master_name;
+
+    return cluster_node_create(c, &node_info);
+}
+
/* Free a redisClusterNode structure */
PHPAPI void cluster_free_node(redisClusterNode *node) {
efree(node->name);
@@ -306,6 +352,47 @@ void cluster_free_info_array(clusterNodeInfo **array, int count) {
efree(array);
}
+/* Get a RedisSock object for the host and port we were directed to by an
+ * ASK response (c->redir_host / c->redir_port).
+ *
+ * The nodes HashTable is keyed by "host:port". If a node is already stored
+ * under that key its socket is returned. Otherwise a new node covering only
+ * c->redir_slot is created, stored in c->nodes, and its socket is returned.
+ *
+ * Returns NULL if the "host:port" key cannot be built without truncation. */
+static RedisSock *cluster_get_asking_sock(redisCluster *c TSRMLS_DC) {
+    redisClusterNode **ppNode, *node;
+    clusterNodeInfo pInfo = {0};
+    char key[1024];
+    int key_len;
+
+    // It'll be hashed as host:port in our nodes HashTable
+    key_len = snprintf(key, sizeof(key), "%s:%u", c->redir_host,
+                       c->redir_port);
+
+    // snprintf reports the length it wanted to write; refuse a truncated key
+    // so we never hand zend_hash_* a length larger than the buffer
+    if(key_len < 0 || key_len >= (int)sizeof(key)) {
+        return NULL;
+    }
+
+    // See if we've already attached to it
+    if(zend_hash_find(c->nodes, key, key_len+1, (void**)&ppNode)==SUCCESS)
+    {
+        return (*ppNode)->sock;
+    }
+
+    // We have yet to encounter this host:port so create
+    pInfo.name = NULL;
+    pInfo.master_name = NULL;
+    pInfo.host = c->redir_host;
+    pInfo.host_len = strlen(c->redir_host);
+    pInfo.port = c->redir_port;
+    pInfo.start_slot = c->redir_slot;
+    pInfo.end_slot = c->redir_slot;
+
+    // Create a redisClusterNode. This is kept in a local pointer: ppNode is
+    // not set when the lookup above fails, so it must not be written through.
+    node = cluster_node_create(c, &pInfo);
+
+    // Now add it to the nodes we have (the table stores the pointer value)
+    zend_hash_update(c->nodes, key, key_len+1, (void*)&node,
+                     sizeof(redisClusterNode*), NULL);
+
+    // Return the RedisSock
+    return node->sock;
+}
+
/* Attach a slave to a cluster node */
int
cluster_node_add_slave(redisCluster *cluster, redisClusterNode *master,
@@ -536,4 +623,252 @@ cluster_map_keyspace(redisCluster *cluster TSRMLS_DC) {
return 0;
}
+/* Look up a node in c->nodes by its "host:port" key.
+ *
+ * Returns the stored redisClusterNode pointer when the key is present, or
+ * NULL when no node is stored under that host and port. */
+static redisClusterNode *cluster_find_node(redisCluster *c, const char *host,
+                                           unsigned short port)
+{
+    char lookup[1024];
+    redisClusterNode **found = NULL;
+    int lookup_len = snprintf(lookup, sizeof(lookup), "%s:%d", host, port);
+
+    // Bail out early when the key is unknown
+    if(zend_hash_find(c->nodes, lookup, lookup_len+1, (void**)&found)
+       != SUCCESS)
+    {
+        return NULL;
+    }
+
+    // The table holds pointers to nodes, so unwrap one level
+    return *found;
+}
+
+/* Once we write a command to a node in our cluster, this function will check
+ * the reply type and extract information from those that will specify a length
+ * bit. If we encounter an error condition, we'll check for MOVED or ASK
+ * redirection, parsing out slot, host and port so the caller can take
+ * appropriate action.
+ *
+ * In the case of a non MOVED/ASK error, we will set our cluster error
+ * condition so GetLastError can be queried by the client.
+ *
+ * On a redirection, c->redir_type, c->redir_host, c->redir_host_len,
+ * c->redir_slot and c->redir_port are filled in. For INTEGER, BULK and
+ * MULTI BULK replies *reply_len receives the number parsed from the line
+ * following the type byte.
+ *
+ * This function will return -1 on a critical error (e.g. parse/communication
+ * error), 0 if no redirection was encountered, and 1 if the data was moved. */
+static int cluster_check_response(redisCluster *c, unsigned short slot,
+                                  REDIS_REPLY_TYPE *reply_type,
+                                  int *reply_len TSRMLS_DC)
+{
+    int type_ch;
+
+    // Check for general socket EOF
+    if(-1 == redis_check_eof(SLOT(c,slot)->sock TSRMLS_CC)) {
+        return -1;
+    }
+
+    // Read the reply type byte. Only EOF is a communications error; any
+    // other value is the type marker itself.
+    if((type_ch = php_stream_getc(SLOT_STREAM(c,slot))) == EOF) {
+        return -1;
+    }
+    *reply_type = type_ch;
+
+    // In the event of an ERROR, check if it's a MOVED/ASK error
+    if(*reply_type == TYPE_ERR) {
+        char inbuf[1024];
+        char *pslot, *phost, *pport;
+        size_t line_len;
+        int moved, host_len;
+
+        // Attempt to read the error (NULL signals a failed read)
+        if(!php_stream_gets(SLOT_STREAM(c,slot), inbuf, sizeof(inbuf))) {
+            return -1;
+        }
+
+        // Anything other than MOVED or ASK is an ordinary error reply
+        moved = IS_MOVED(inbuf);
+        if(!moved && !IS_ASK(inbuf)) {
+            // Capture the error string Redis returned, dropping one leading
+            // byte and the trailing two bytes (presumably CRLF). A line too
+            // short to hold that clears the stored error instead.
+            // NOTE(review): the type byte was already consumed above, so
+            // skipping inbuf[0] may drop the first character of the
+            // message - confirm against the IS_MOVED/MOVED_LEN definitions.
+            line_len = strlen(inbuf);
+            cluster_set_err(c, inbuf+1,
+                            line_len > 3 ? (int)(line_len - 3) : 0);
+            return 0;
+        }
+
+        // Move past the MOVED or ASK error message
+        pslot = inbuf + (moved ? MOVED_LEN : ASK_LEN);
+
+        // We will need to see a slot separator
+        if(!(phost = strchr(pslot, ' '))) {
+            return -1;
+        }
+
+        // Null terminate at the separator
+        *phost++ = '\0';
+
+        // We'll need to see host:port separator
+        if(!(pport = strchr(phost, ':'))) {
+            return -1;
+        }
+
+        // Null terminate here
+        *pport++ = '\0';
+
+        // Length of the host, clamped to what redir_host can hold
+        host_len = (int)(pport - phost - 1);
+        if(host_len > (int)sizeof(c->redir_host) - 1) {
+            host_len = (int)sizeof(c->redir_host) - 1;
+        }
+
+        // Set our cluster redirection information. strncpy does not
+        // terminate on truncation, so terminate explicitly.
+        c->redir_type = moved ? REDIR_MOVED : REDIR_ASK;
+        strncpy(c->redir_host, phost, sizeof(c->redir_host) - 1);
+        c->redir_host[sizeof(c->redir_host) - 1] = '\0';
+        c->redir_host_len = host_len;
+        c->redir_slot = (unsigned short)atoi(pslot);
+        c->redir_port = (unsigned short)atoi(pport);
+
+        // Data moved
+        return 1;
+    }
+
+    // For BULK, MULTI BULK, or simply INTEGER response types we can get
+    // the response length.
+    if(*reply_type == TYPE_INT || *reply_type == TYPE_BULK ||
+       *reply_type == TYPE_MULTIBULK)
+    {
+        char inbuf[255];
+
+        if(!php_stream_gets(SLOT_STREAM(c,slot), inbuf, sizeof(inbuf))) {
+            return -1;
+        }
+
+        // Size information
+        *reply_len = atoi(inbuf);
+    }
+
+    // Clear out any previous error, and return that the data is here
+    cluster_set_err(c, NULL, 0);
+    return 0;
+}
+
+/* Attempt to write a command to a cluster node. If the node is NULL (e.g.
+ * it's been unmapped), or the write fails, we keep falling back until we
+ * run out of nodes to try.
+ *
+ * When c->redir_type is REDIR_ASK the command is first sent to the node
+ * named by the redirection; otherwise it is sent to the node mapped at slot.
+ *
+ * Returns the slot the command was written for (the requested slot when the
+ * first attempt succeeds, otherwise the fallback slot that accepted it), or
+ * -1 if no node could be written to. */
+static int cluster_sock_write(redisCluster *c, unsigned short slot,
+                              const char *cmd, size_t sz TSRMLS_DC)
+{
+    RedisSock *redis_sock;
+    int i;
+
+    // If we're in an ASK redirection state, attempt a connection to that
+    // host and port. Otherwise, try on the requested slot, provided it is
+    // still mapped (presumably SLOT_SOCK dereferences c->master[slot]).
+    if(c->redir_type == REDIR_ASK) {
+        redis_sock = cluster_get_asking_sock(c TSRMLS_CC);
+    } else {
+        redis_sock = c->master[slot] ? SLOT_SOCK(c,slot) : NULL;
+    }
+
+    // First attempt to write it to the slot that's been requested. The
+    // write only counts as successful when every byte was accepted.
+    if(redis_sock && !redis_check_eof(redis_sock TSRMLS_CC) &&
+       php_stream_write(redis_sock->stream, cmd, sz) == sz)
+    {
+        // We were able to write it; report the slot so the caller reads
+        // the reply from the same place
+        return slot;
+    }
+
+    // Fall back by attempting to write the request to other nodes
+    // TODO: Randomize the slots we request from
+    for(i=0;i<REDIS_CLUSTER_SLOTS;i++) {
+        // Skip slots that currently have no node mapped
+        if(!c->master[i]) {
+            continue;
+        }
+
+        redis_sock = SLOT_SOCK(c,i);
+
+        // Attempt the write to this node
+        if(redis_sock && !redis_check_eof(redis_sock TSRMLS_CC) &&
+           php_stream_write(redis_sock->stream, cmd, sz) == sz)
+        {
+            // Return the slot where we actually sent the request
+            return i;
+        }
+    }
+
+    // We were unable to write to any node in our cluster
+    return -1;
+}
+
+/* Provided a redisCluster object and the slot where we thought data was,
+ * update our node mapping using the redirection stored on the cluster
+ * (c->redir_slot, c->redir_host, c->redir_host_len, c->redir_port).
+ *
+ * The original slot is always unmapped first. The redirected slot is then
+ * pointed at the node for redir_host:redir_port, reusing a known node when
+ * cluster_find_node() has one and creating a single-slot node otherwise.
+ * If the redirected slot is still mapped and CLUSTER_REDIR_CMP() reports no
+ * host/port change, the mapping is left untouched. */
+static void cluster_update_slot(redisCluster *c, short orig_slot TSRMLS_DC) {
+    redisClusterNode *node;
+
+    // Invalidate original slot
+    c->master[orig_slot] = NULL;
+
+    // Slot already mapped and neither host nor port changed: nothing to do.
+    // (When orig_slot == c->redir_slot the slot was just cleared above, so
+    // this never short-circuits and the slot is always re-mapped.)
+    if(c->master[c->redir_slot] && !CLUSTER_REDIR_CMP(c)) {
+        return;
+    }
+
+    // Check to see if the host and port are already known to us
+    node = cluster_find_node(c, c->redir_host, c->redir_port);
+    if(!node) {
+        // Create our node
+        // NOTE(review): the new node is not added to c->nodes here, so a
+        // later cluster_find_node() will not see it - confirm intended.
+        node = cluster_node_create_ex(c, NULL, NULL, c->redir_host,
+            c->redir_host_len, c->redir_port, c->redir_slot,
+            c->redir_slot);
+    }
+
+    // Map the slot to this node
+    c->master[c->redir_slot] = node;
+}
+
+/* Send a command to given slot in our cluster. If we get a MOVED
+ * or ASK error we attempt to send the command to the node as
+ * directed.
+ *
+ * The command is written with cluster_sock_write() and the reply header is
+ * inspected with cluster_check_response(); this repeats until the response
+ * check reports no redirection and no communication error.
+ *
+ * Returns the slot the command ended up being sent for, or -1 (after
+ * throwing a RedisCluster exception) when no node could be written to.
+ * c->redir_type is reset to REDIR_NONE on both paths. */
+PHPAPI short cluster_send_command(redisCluster *c, short slot,
+                                  const char *cmd, int cmd_len TSRMLS_DC)
+{
+    REDIS_REPLY_TYPE reply_type;
+    int resp, reply_len, rslot = slot;
+
+    // Issue commands until we find the right node or fail
+    do {
+        // Attempt to send the command to the slot requested
+        if((slot = cluster_sock_write(c, slot, cmd, cmd_len
+                                      TSRMLS_CC))==-1)
+        {
+            // Don't leave a stale ASK/MOVED state behind for the next
+            // command issued on this object
+            c->redir_type = REDIR_NONE;
+
+            // We have no choice but to throw an exception. We
+            // can't communicate with any node at all.
+            zend_throw_exception(redis_cluster_exception_ce,
+                "Can't communicate with any node in the cluster",
+                0 TSRMLS_CC);
+            return -1;
+        }
+
+        // Check the response from the slot we ended up querying.
+        resp = cluster_check_response(c, slot, &reply_type, &reply_len
+                                      TSRMLS_CC);
+
+        // If we're getting an error condition, impose a slight delay before
+        // we try again (e.g. server went down, election in process). If the
+        // data has been moved, update node configuration, and if ASK has been
+        // encountered, we'll just try again at that slot.
+        if(resp == -1) {
+            // TODO: More robust error handling, count errors and ultimately
+            // fail? As written this retries without any upper bound.
+            sleep(1);
+        } else if(resp == 1) {
+            // In case of a MOVED redirection, update our node mapping,
+            // invalidating the slot the caller originally asked for
+            if(c->redir_type == REDIR_MOVED) {
+                cluster_update_slot(c, rslot TSRMLS_CC);
+            }
+        }
+    } while(resp != 0);
+
+    // Clear out redirection flag
+    c->redir_type = REDIR_NONE;
+
+    // Success, return the slot where data exists.
+    return slot;
+}
+
/* vim: set tabstop=4 softtabstops=4 noexpandtab shiftwidth=4: */