Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/phpredis/phpredis.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormichael-grunder <michael.grunder@gmail.com>2014-06-14 03:41:37 +0400
committermichael-grunder <michael.grunder@gmail.com>2015-05-06 01:00:51 +0300
commit2451c75ed8fbb11d4f444a2c68dc69e5941f8e95 (patch)
treefb69d90838a3c7ea59c5bf1f94f9de1622e7c4c4 /cluster_library.c
parent846f945f7df803d003d8c6c45e628e72ac736f7c (diff)
WATCH command
Implemented the WATCH command for RedisCluster. This command can take any number of keys, so phpredis splits the request across the cluster in the best way it can. For every key in a multiple key command, Redis Cluster requires that they all hash to the same SLOT, else it will return a CROSSLOT error and fail. For WATCH in RedisCluster, a few things to note: * The command will fail if phpredis is out of sync with the keyspace. This is because we'll need to know where to deliver each command, or can't possibly deliver them correctlhy. * The command will fail if any command delivery failures occur on any node. This is the case either for a normal communication error or if RedisCluster returns to us MOVED/ASK redirection.
Diffstat (limited to 'cluster_library.c')
-rw-r--r--cluster_library.c143
1 files changed, 143 insertions, 0 deletions
diff --git a/cluster_library.c b/cluster_library.c
index f5c77eca..de407f43 100644
--- a/cluster_library.c
+++ b/cluster_library.c
@@ -28,6 +28,129 @@ static void cluster_dump_nodes(redisCluster *c) {
}
}
+/* Cluster key distribution helpers. For a small handlful of commands, we
+ * want to distribute them across 1-N nodes. These methods provide
+ * simple containers for the purposes of splitting keys/values in this way */
+
+/* Free cluster distribution list inside a HashTable */
+static void cluster_dist_free_ht(void *p) {
+ clusterDistList *dl = *(clusterDistList**)p;
+ int i;
+
+ for(i=0; i < dl->len; i++) {
+ if(dl->entry[i].key_free) efree(dl->entry[i].key);
+ if(dl->entry[i].val_free) efree(dl->entry[i].val);
+ }
+
+ efree(dl->entry);
+ efree(dl);
+}
+
+/* Spin up a HashTable that will contain distribution lists */
+HashTable *cluster_dist_create() {
+ HashTable *ret;
+
+ ALLOC_HASHTABLE(ret);
+ zend_hash_init(ret, 0, NULL, cluster_dist_free_ht, 0);
+
+ return ret;
+}
+
+/* Free distribution list */
+void cluster_dist_free(HashTable *ht) {
+ zend_hash_destroy(ht);
+ efree(ht);
+}
+
+/* Create a clusterDistList object */
+static clusterDistList *cluster_dl_create() {
+ clusterDistList *dl;
+
+ dl = emalloc(sizeof(clusterDistList));
+ dl->entry = emalloc(CLUSTER_KEYDIST_ALLOC * sizeof(clusterKeyVal));
+ dl->size = CLUSTER_KEYDIST_ALLOC;
+ dl->len = 0;
+
+ return dl;
+}
+
+/* Add a key to a dist list, returning the keval entry */
+static clusterKeyVal *cluster_dl_add_key(clusterDistList *dl, char *key,
+ int key_len, int key_free)
+{
+ // Reallocate if required
+ if(dl->len==dl->size) {
+ dl->entry = erealloc(dl->entry, sizeof(clusterKeyVal) * dl->size * 2);
+ dl->size *= 2;
+ }
+
+ // Set key info
+ dl->entry[dl->len].key = key;
+ dl->entry[dl->len].key_len = key_len;
+ dl->entry[dl->len].key_free = key_free;
+
+ // NULL out any values
+ dl->entry[dl->len].val = NULL;
+ dl->entry[dl->len].val_len = 0;
+ dl->entry[dl->len].val_free = 0;
+
+ return &(dl->entry[dl->len++]);
+}
+
+/* Add a key, returning a pointer to the entry where passed for easy adding
+ * of values to match this key */
+int cluster_dist_add_key(redisCluster *c, HashTable *ht, char *key,
+ int key_len, clusterKeyVal **kv)
+{
+ int key_free;
+ short slot;
+ clusterDistList **ppdl, *dl;
+ clusterKeyVal *retptr;
+
+ // Prefix our key and hash it
+ key_free = redis_key_prefix(c->flags, &key, &key_len);
+ slot = cluster_hash_key(key, key_len);
+
+ // We can't do this if we don't fully understand the keyspace
+ if(c->master[slot] == NULL) {
+ if(key_free) efree(key);
+ return FAILURE;
+ }
+
+ // Look for this slot
+ if(zend_hash_index_find(ht, (ulong)slot, (void**)&ppdl)==FAILURE) {
+ dl = cluster_dl_create();
+ zend_hash_index_update(ht, (ulong)slot, (void**)&dl,
+ sizeof(clusterDistList*), NULL);
+ } else {
+ dl = *ppdl;
+ }
+
+ // Now actually add this key
+ retptr = cluster_dl_add_key(dl, key, key_len, key_free);
+
+ // Push our return pointer if requested
+ if(kv) *kv = retptr;
+
+ return SUCCESS;
+}
+
+/* Provided a clusterKeyVal, add a value */
+void cluster_dist_add_val(redisCluster *c, clusterKeyVal *kv, zval *z_val
+ TSRMLS_CC)
+{
+ char *val;
+ int val_len, val_free;
+
+ // Serialize our value
+ val_free = redis_serialize(c->flags, z_val, &val, &val_len TSRMLS_CC);
+
+ // Attach it to the provied keyval entry
+ kv->val = val;
+ kv->val_len = val_len;
+ kv->val_free = val_free;
+}
+
/* Set our last error string encountered */
static void cluster_set_err(redisCluster *c, char *err, int err_len)
{
@@ -1006,6 +1129,26 @@ static int cluster_send_multi(redisCluster *c, short slot TSRMLS_DC) {
return 0;
}
+/* Send a command to a specific node (without falling back), in addition we
+ * can check for a reply type if not sent as TYPE_EOF */
+PHPAPI int cluster_send_direct(redisCluster *c, short slot, char *cmd,
+ int cmd_len, REDIS_REPLY_TYPE rtype TSRMLS_DC)
+{
+ // Try only this node
+ if(cluster_sock_write(c, slot, cmd, cmd_len, 1 TSRMLS_CC)==-1) {
+ return -1;
+ }
+
+ // Require a +OK response
+ if(cluster_check_response(c, slot, &c->reply_type TSRMLS_CC)!=0 ||
+ (rtype != TYPE_EOF && rtype != c->reply_type))
+ {
+ return -1;
+ }
+
+ return 0;
+}
+
/* Send a command to given slot in our cluster. If we get a MOVED or ASK error
* we attempt to send the command to the node as directed. */
PHPAPI short cluster_send_command(redisCluster *c, short slot,