Welcome to mirror list, hosted at ThFree Co, Russian Federation.

cluster_library.h - github.com/phpredis/phpredis.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 26df6b2e40ceaee801e350d8ea36ef8dfddd7c02 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
#ifndef _PHPREDIS_CLUSTER_LIBRARY_H
#define _PHPREDIS_CLUSTER_LIBRARY_H

#include "common.h"

#ifdef ZTS
#include "TSRM.h"
#endif

/* Total number of hash slots in a Redis Cluster, and that count minus one
 * (the highest slot index), which is used when locating a key's slot. */
#define REDIS_CLUSTER_SLOTS 16384
#define REDIS_CLUSTER_MOD   (REDIS_CLUSTER_SLOTS - 1)

/* Smallest element count of a valid CLUSTER NODES line, followed by the
 * smallest count we expect when the line also carries slots. */
#define CLUSTER_MIN_NODE_LINE   8
#define CLUSTER_MIN_SLOTS_COUNT 9

/* Number of characters in a cluster name */
#define CLUSTER_NAME_LEN    40

/* Zero-based positions of the parts of our cluster nodes command output.
 * CLUSTER_SLOTS is the position that follows the last fixed part. */
#define CLUSTER_NODES_HASH         0
#define CLUSTER_NODES_HOST_PORT    1
#define CLUSTER_NODES_TYPE         2
#define CLUSTER_NODES_MASTER_HASH  3
#define CLUSTER_NODES_PING         4
#define CLUSTER_NODES_PONG         5
#define CLUSTER_NODES_EPOCH        6
#define CLUSTER_NODES_CONNECTED    7
#define CLUSTER_SLOTS              8

/* Fully encoded RESP requests for various fixed commands.  Each literal is
 * written as adjacent pieces (array header, bulk length, payload) which the
 * compiler concatenates into a single string. */
#define RESP_MULTI_CMD         "*1\r\n" "$5\r\n" "MULTI\r\n"
#define RESP_EXEC_CMD          "*1\r\n" "$4\r\n" "EXEC\r\n"
#define RESP_DISCARD_CMD       "*1\r\n" "$7\r\n" "DISCARD\r\n"
#define RESP_UNWATCH_CMD       "*1\r\n" "$7\r\n" "UNWATCH\r\n"
#define RESP_CLUSTER_SLOTS_CMD "*2\r\n" "$7\r\n" "CLUSTER\r\n" "$5\r\n" "SLOTS\r\n"
#define RESP_ASKING_CMD        "*1\r\n" "$6\r\n" "ASKING\r\n"

/* MOVED/ASK comparison macros.
 * Each evaluates to non-zero when the buffer `p` begins with the redirection
 * keyword followed by a single space ("MOVED " or "ASK ").  `p` is expanded
 * several times, so it must not have side effects.  The && chain stops at
 * the first mismatching byte, so a NUL-terminated string shorter than the
 * prefix is never read past its terminator. */
#define IS_MOVED(p) ((p)[0]=='M' && (p)[1]=='O' && (p)[2]=='V' && \
                     (p)[3]=='E' && (p)[4]=='D' && (p)[5]==' ')
/* "ASK " occupies bytes 0..3: 'A', 'S', 'K', ' ' */
#define IS_ASK(p)   ((p)[0]=='A' && (p)[1]=='S' && (p)[2]=='K' && (p)[3]==' ')

/* Byte lengths of the "MOVED " and "ASK " prefixes: the trailing space is
 * counted, the terminating NUL is not. */
#define MOVED_LEN (sizeof("MOVED ") - 1)
#define ASK_LEN   (sizeof("ASK ") - 1)

/* Starting allocation size for a key distribution container */
#define CLUSTER_KEYDIST_ALLOC   8

/* Slot/RedisSock/RedisSock->stream macros.
 * Given a cluster pointer `c` and a slot index `s`, these yield the node
 * stored in c->master[s], that node's sock member, and that sock's stream
 * member.  No bounds or NULL checks are performed.  Arguments are
 * parenthesized so expressions such as `&cluster` or `i + 1` expand
 * correctly. */
#define SLOT(c,s)        ((c)->master[(s)])
#define SLOT_SOCK(c,s)   (SLOT(c,s)->sock)
#define SLOT_STREAM(c,s) (SLOT_SOCK(c,s)->stream)

/* Compare redirection slot information with what we have.
 * Evaluates to non-zero when the socket currently mapped to c->redir_slot
 * differs from the stored redirection target in port, host length, or host
 * bytes; zero when all three match.  `c` is expanded several times, so it
 * must not have side effects. */
#define CLUSTER_REDIR_CMP(c) \
    (SLOT_SOCK(c,(c)->redir_slot)->port != (c)->redir_port || \
    strlen(SLOT_SOCK(c,(c)->redir_slot)->host) != \
        (size_t)(c)->redir_host_len || \
    memcmp(SLOT_SOCK(c,(c)->redir_slot)->host,(c)->redir_host, \
        (c)->redir_host_len))

/* Lazy connect logic.
 * When socket `s` still has its lazy_connect flag set, clear the flag and
 * open the socket with redis_sock_server_open().  The result of that call is
 * not checked here.  The body is wrapped in do/while(0) so the macro behaves
 * as one statement inside unbraced if/else; invoke it with a trailing
 * semicolon.  `s` is expanded more than once. */
#define CLUSTER_LAZY_CONNECT(s) \
    do { \
        if ((s)->lazy_connect) { \
            (s)->lazy_connect = 0; \
            redis_sock_server_open((s), 1 TSRMLS_CC); \
        } \
    } while (0)

/* Clear out our "last error".
 * If c->err is set, release it with efree() and reset c->err to NULL and
 * c->err_len to 0; otherwise do nothing.  Wrapped in do/while(0) so the
 * macro is a single statement (safe in unbraced if/else); invoke it with a
 * trailing semicolon.  `c` is expanded more than once. */
#define CLUSTER_CLEAR_ERROR(c) \
    do { \
        if ((c)->err) { \
            efree((c)->err); \
            (c)->err = NULL; \
            (c)->err_len = 0; \
        } \
    } while (0)

/* Reset our last single line reply buffer and length.
 * Writes a NUL into the first byte of c->line_reply and zeroes c->reply_len.
 * Both assignments live inside do/while(0) so that an unbraced
 * `if (x) CLUSTER_CLEAR_REPLY(c);` guards both of them; invoke the macro
 * with a trailing semicolon. */
#define CLUSTER_CLEAR_REPLY(c) \
    do { \
        *(c)->line_reply = '\0'; \
        (c)->reply_len = 0; \
    } while (0)

/* Helper to determine whether we are outside MULTI mode: evaluates to
 * non-zero when c->flags->mode is anything other than MULTI. */
#define CLUSTER_IS_ATOMIC(c) ((c)->flags->mode != MULTI)

/* Helper that either returns false or adds false in multi mode.
 * Outside MULTI mode this expands Zend's RETURN_FALSE; in MULTI mode it
 * appends a false entry to c->multi_resp and then returns from the
 * enclosing function.  Wrapped in do/while(0) so it is a single statement
 * (safe in unbraced if/else); invoke it with a trailing semicolon. */
#define CLUSTER_RETURN_FALSE(c) \
    do { \
        if (CLUSTER_IS_ATOMIC(c)) { \
            RETURN_FALSE; \
        } else { \
            add_next_index_bool((c)->multi_resp, 0); \
            return; \
        } \
    } while (0)

/* Helper to either return a bool value or add it to MULTI response.
 * Outside MULTI mode: RETURN_TRUE when b equals 1, RETURN_FALSE otherwise.
 * In MULTI mode: b is appended to c->multi_resp and, unlike
 * CLUSTER_RETURN_FALSE, the macro issues no `return` of its own on that
 * path.  Wrapped in do/while(0) so it is a single statement (safe in
 * unbraced if/else); invoke it with a trailing semicolon. */
#define CLUSTER_RETURN_BOOL(c, b) \
    do { \
        if (CLUSTER_IS_ATOMIC(c)) { \
            if ((b) == 1) { \
                RETURN_TRUE; \
            } else { \
                RETURN_FALSE; \
            } \
        } else { \
            add_next_index_bool((c)->multi_resp, (b)); \
        } \
    } while (0)

/* Helper to respond with a double or add it to our MULTI response.
 * Outside MULTI mode this expands RETURN_DOUBLE(d); in MULTI mode d is
 * appended to c->multi_resp and the macro issues no `return` of its own.
 * Wrapped in do/while(0) so it is a single statement (safe in unbraced
 * if/else); invoke it with a trailing semicolon. */
#define CLUSTER_RETURN_DOUBLE(c, d) \
    do { \
        if (CLUSTER_IS_ATOMIC(c)) { \
            RETURN_DOUBLE(d); \
        } else { \
            add_next_index_double((c)->multi_resp, (d)); \
        } \
    } while (0)

/* Helper to return a string value.
 * Outside MULTI mode this expands RETURN_STRINGL(str, len, 0); in MULTI
 * mode the string is appended to c->multi_resp with the same trailing 0
 * and the macro issues no `return` of its own.
 * NOTE(review): the trailing 0 looks like Zend's "duplicate" flag, which
 * would mean the zval takes the buffer without copying -- confirm against
 * the Zend API version in use.
 * Wrapped in do/while(0) so it is a single statement (safe in unbraced
 * if/else); invoke it with a trailing semicolon. */
#define CLUSTER_RETURN_STRING(c, str, len) \
    do { \
        if (CLUSTER_IS_ATOMIC(c)) { \
            RETURN_STRINGL(str, len, 0); \
        } else { \
            add_next_index_stringl((c)->multi_resp, (str), (len), 0); \
        } \
    } while (0)

/* Return a LONG value.
 * Outside MULTI mode this expands RETURN_LONG(val); in MULTI mode val is
 * appended to c->multi_resp and the macro issues no `return` of its own.
 * Wrapped in do/while(0) so it is a single statement (safe in unbraced
 * if/else); invoke it with a trailing semicolon. */
#define CLUSTER_RETURN_LONG(c, val) \
    do { \
        if (CLUSTER_IS_ATOMIC(c)) { \
            RETURN_LONG(val); \
        } else { \
            add_next_index_long((c)->multi_resp, (val)); \
        } \
    } while (0)

/* Macro to clear out a clusterMultiCmd structure.
 * `mc` is a POINTER to the structure.  Sets cmd.len, args.len and argc to
 * zero; nothing is freed here.  Wrapped in do/while(0) so all three
 * assignments form one statement; invoke it with a trailing semicolon. */
#define CLUSTER_MULTI_CLEAR(mc) \
    do { \
        (mc)->cmd.len  = 0; \
        (mc)->args.len = 0; \
        (mc)->argc     = 0; \
    } while (0)

/* Initialize a clusterMultiCmd with a keyword and length.
 * `mc` is the structure ITSELF (not a pointer); its kw and kw_len members
 * are assigned and no other member is touched.  Wrapped in do/while(0) so
 * both assignments form one statement; invoke it with a trailing
 * semicolon. */
#define CLUSTER_MULTI_INIT(mc, keyword, keyword_len) \
    do { \
        (mc).kw     = (keyword); \
        (mc).kw_len = (keyword_len); \
    } while (0)

/* Kinds of cluster redirection: none, MOVED, or ASK */
typedef enum CLUSTER_REDIR_TYPE {
    REDIR_NONE  = 0,
    REDIR_MOVED = 1,
    REDIR_ASK   = 2
} CLUSTER_REDIR_TYPE;

/* MULTI BULK response callback typedef */
typedef int  (*mbulk_cb)(RedisSock*,zval*,long long, void* TSRMLS_DC);

/* Specific destructor to free a cluster object */
// void redis_destructor_redis_cluster(zend_rsrc_list_entry *rsrc TSRMLS_DC);

/* A Redis Cluster node.  The `slave` member records whether this entry is a
 * slave; the `slaves` table holds any slaves attached to this node. */
typedef struct redisClusterNode {
    /* The Redis socket used to talk to this node */
    RedisSock *sock;

    /* A slot where one of these lives (one slot this node is known by) */
    short slot;

    /* Is this a slave node */
    unsigned short slave;

    /* A HashTable containing any slaves
     * NOTE(review): whether this may be NULL when there are none is not
     * visible from this header -- confirm in the implementation. */
    HashTable *slaves;
} redisClusterNode;

/* Forward declaration: redisCluster stores clusterFoldItem pointers
 * (multi_head / multi_curr) before the struct itself is defined. */
typedef struct clusterFoldItem clusterFoldItem;

/* RedisCluster implementation structure.
 * Groups the connection settings, the slot-to-master map, MULTI bookkeeping,
 * the last error, and details of the most recent reply and redirection. */
typedef struct redisCluster {
    /* Object reference for Zend (first member of the structure) */
    zend_object std;

    /* Timeout and read timeout */
    double timeout;
    double read_timeout;

    /* Hash table of seed host/ports */
    HashTable *seeds;

    /* RedisCluster masters, by direct slot: one entry per hash slot, indexed
     * by slot number (this is what the SLOT() macro reads) */
    redisClusterNode *master[REDIS_CLUSTER_SLOTS];

    /* All RedisCluster objects we've created/are connected to */
    HashTable *nodes;

    /* Transaction handling linked list, and where we are as we EXEC */
    clusterFoldItem *multi_head;
    clusterFoldItem *multi_curr;

    /* Variable to store MULTI response (the CLUSTER_RETURN_* macros append
     * to it when not in atomic mode) */
    zval *multi_resp;

    /* How many failures have we had in a row */
    int failures;

    /* The last ERROR we encountered, and its length; released and reset by
     * CLUSTER_CLEAR_ERROR() */
    char *err;
    int err_len;

    /* The slot where we should read replies */
    short reply_slot;

    /* The slot where we're subscribed */
    short subscribed_slot;

    /* One RedisSock* struct for serialization and prefix information; its
     * mode member is also what CLUSTER_IS_ATOMIC() inspects */
    RedisSock *flags;

    /* Cluster distribution mode (speed, vs. maintaining order of execution) */
    short dist_mode;

    /* The first line of our last reply, not including our reply type byte
     * or the trailing \r\n */
    char line_reply[1024];

    /* The last reply type and length or integer response we got */
    REDIS_REPLY_TYPE reply_type;
    long long reply_len;

    /* Last MOVED or ASK redirection response information: the redirection
     * kind, then the target host (with its length), slot and port.  These
     * are the values CLUSTER_REDIR_CMP() compares against the slot map. */
    CLUSTER_REDIR_TYPE redir_type;
    char               redir_host[255];
    int                redir_host_len;
    unsigned short     redir_slot;
    unsigned short     redir_port;
} redisCluster;

/* RedisCluster response processing callback type: receives the standard
 * Zend function parameters, the cluster object, and an opaque context. */
typedef void (*cluster_cb)(INTERNAL_FUNCTION_PARAMETERS,
                           redisCluster *c, void *ctx);

/* Context for processing transactions.
 * One node of a singly linked list (chained through `next`); redisCluster
 * keeps the list in its multi_head / multi_curr members. */
struct clusterFoldItem {
    /* Response processing callback */
    cluster_cb callback;

    /* The slot where this response was sent */
    short slot;

    /* Any context we need to send to our callback */
    void *ctx;

    /* Next item in our list
     * NOTE(review): presumably NULL on the last item -- confirm. */
    struct clusterFoldItem *next;
};

/* A key together with an optional value.  Each string has a length field
 * and a flag saying whether it needs freeing. */
typedef struct clusterKeyVal {
    char *key;      /* Key bytes */
    char *val;      /* Value bytes */
    int  key_len;   /* Length of key */
    int  val_len;   /* Length of val */
    int  key_free;  /* Does key need freeing */
    int  val_free;  /* Does val need freeing */
} clusterKeyVal;

/* List of keys (and possibly values) used when a command has to be spread
 * over more than one node (e.g. WATCH, MGET, MSET, etc).
 * NOTE(review): len/size look like "entries in use" and "entries allocated"
 * respectively -- confirm against the implementation. */
typedef struct clusterDistList {
    clusterKeyVal *entry;
    size_t len;
    size_t size;
} clusterDistList;

/* Context for things like MGET/MSET/MSETNX.  When executing in MULTI mode,
 * we'll want to re-integrate the per-node results into one running array,
 * except for the last command execution, in which we'll want to return the
 * value (or add it). */
typedef struct clusterMultiCtx {
    /* Our running array */
    zval *z_multi;

    /* How many keys did we request for this bit */
    int count;

    /* Is this the last entry (non-zero marks the final command) */
    short last;
} clusterMultiCtx;

/* Container for things like MGET, MSET, and MSETNX, which split the command
 * into a header and payload while aggregating to a specific slot.
 * CLUSTER_MULTI_INIT() sets kw/kw_len; CLUSTER_MULTI_CLEAR() zeroes
 * cmd.len, args.len and argc. */
typedef struct clusterMultiCmd {
    /* Keyword and keyword length */
    char *kw;
    int  kw_len;

    /* Arguments in our payload */
    int argc;

    /* The full command, built into cmd, and args as we aggregate */
    smart_str cmd;
    smart_str args;
} clusterMultiCmd;

/* Hiredis-like structure for processing any sort of reply Redis Cluster might
 * give us, including N-level-deep nested multi-bulk replies.  Unlike hiredis,
 * we don't encode errors here, as that's handled in the cluster structure. */
typedef struct clusterReply {
    REDIS_REPLY_TYPE type;         /* Our reply type */
    long long integer;             /* Integer reply */
    size_t len;                    /* Length of our string */
    char *str;                     /* String reply */
    size_t elements;               /* Count of array elements */
    struct clusterReply **element; /* Array elements (nested replies) */
} clusterReply;

/* Direct variant response handling.
 * The two readers hand back a clusterReply pointer; cluster_free_reply()
 * accepts such a pointer together with a free_data flag.
 * NOTE(review): presumably every reply obtained here must be released with
 * cluster_free_reply() -- confirm against the implementation. */
clusterReply *cluster_read_resp(redisCluster *c TSRMLS_DC);
clusterReply *cluster_read_sock_resp(RedisSock *redis_sock,
                                     REDIS_REPLY_TYPE type,
                                     size_t reply_len TSRMLS_DC);
void cluster_free_reply(clusterReply *reply, int free_data);

/* Cluster distribution helpers for WATCH.
 * cluster_dist_create() hands back a HashTable that the add/free helpers
 * below operate on.  cluster_dist_add_key() takes a key plus an out
 * parameter `kv`; cluster_dist_add_val() takes such a clusterKeyVal and a
 * value zval.
 * NOTE(review): the meaning of the int result of cluster_dist_add_key() is
 * not visible from this header -- confirm in the implementation.
 *
 * Declarations take the thread-safety context through TSRMLS_DC;
 * TSRMLS_CC is the call-site form and must not appear in a prototype. */
HashTable *cluster_dist_create(void);
void cluster_dist_free(HashTable *ht);
int cluster_dist_add_key(redisCluster *c, HashTable *ht, char *key,
    int key_len, clusterKeyVal **kv);
void cluster_dist_add_val(redisCluster *c, clusterKeyVal *kv, zval *val
    TSRMLS_DC);

/* Aggregation helpers for multi-key commands such as MGET, MSET, and MSETNX.
 * All of them operate on a caller-supplied clusterMultiCmd. */
void cluster_multi_init(clusterMultiCmd *mc, char *kw, int kw_len);
void cluster_multi_add(clusterMultiCmd *mc, char *data, int data_len);
void cluster_multi_fini(clusterMultiCmd *mc);
void cluster_multi_free(clusterMultiCmd *mc);

/* Hash a key to its slot, using the Redis Cluster hash algorithm.  One
 * variant takes a raw buffer and length, the other a zval. */
unsigned short cluster_hash_key(const char *key, int len);
unsigned short cluster_hash_key_zval(zval *key);

/* Send an already-built command buffer (cmd, cmd_len) for the given slot.
 * NOTE(review): the meaning of the short result is not visible from this
 * header -- confirm in the implementation. */
PHPAPI short cluster_send_command(redisCluster *c, short slot,
                                  const char *cmd, int cmd_len TSRMLS_DC);

/* Disconnect the cluster object `c` */
PHPAPI void cluster_disconnect(redisCluster *c
                               TSRMLS_DC);

/* Transaction helpers: EXEC / DISCARD addressed to one slot, plus aborting
 * an EXEC and resetting MULTI state for the whole cluster object. */
PHPAPI int cluster_send_exec(redisCluster *c,
                             short slot TSRMLS_DC);
PHPAPI int cluster_send_discard(redisCluster *c,
                                short slot TSRMLS_DC);
PHPAPI int cluster_abort_exec(redisCluster *c
                              TSRMLS_DC);
PHPAPI int cluster_reset_multi(redisCluster *c
                               );

/* Slot helpers: look up a slot from a host/port pair, and send a command to
 * a given slot together with an expected reply type. */
PHPAPI short cluster_find_slot(redisCluster *c,
                               const char *host, unsigned short port);
PHPAPI int cluster_send_slot(redisCluster *c, short slot,
                             char *cmd, int cmd_len,
                             REDIS_REPLY_TYPE rtype TSRMLS_DC);

/* Seed initialization, keyspace mapping, and node release */
PHPAPI int cluster_init_seeds(redisCluster *c,
                              HashTable *ht_seeds);
PHPAPI int cluster_map_keyspace(redisCluster *c
                                TSRMLS_DC);
PHPAPI void cluster_free_node(redisClusterNode *node
                              );

/* Read a multi-bulk reply from a socket as an array of strings; `len` is an
 * out parameter.
 * NOTE(review): ownership of the returned array is not visible from this
 * header -- confirm in the implementation. */
PHPAPI char **cluster_sock_read_multibulk_reply(RedisSock *redis_sock,
                                                int *len TSRMLS_DC);

/*
 * Redis Cluster response handlers.  Our response handlers generally take the
 * following form:
 *      PHPAPI void handler(INTERNAL_FUNCTION_PARAMETERS, redisCluster *c, 
 *          void *ctx)
 *
 * Reply handlers are responsible for setting the PHP return value (either to
 * something valid, or FALSE in the case of some failures).
 */

/* Reply handlers matching the cluster_cb signature */
PHPAPI void cluster_bool_resp(INTERNAL_FUNCTION_PARAMETERS,
                              redisCluster *c, void *ctx);
PHPAPI void cluster_bulk_resp(INTERNAL_FUNCTION_PARAMETERS,
                              redisCluster *c, void *ctx);
PHPAPI void cluster_bulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,
                                  redisCluster *c, void *ctx);
PHPAPI void cluster_dbl_resp(INTERNAL_FUNCTION_PARAMETERS,
                             redisCluster *c, void *ctx);
PHPAPI void cluster_1_resp(INTERNAL_FUNCTION_PARAMETERS,
                           redisCluster *c, void *ctx);
PHPAPI void cluster_long_resp(INTERNAL_FUNCTION_PARAMETERS,
                              redisCluster *c, void *ctx);
PHPAPI void cluster_type_resp(INTERNAL_FUNCTION_PARAMETERS,
                              redisCluster *c, void *ctx);
PHPAPI void cluster_sub_resp(INTERNAL_FUNCTION_PARAMETERS,
                             redisCluster *c, void *ctx);
PHPAPI void cluster_unsub_resp(INTERNAL_FUNCTION_PARAMETERS,
                               redisCluster *c, void *ctx);

/* Generic/variant reply handler, for stuff like EVAL whose reply shape is
 * not known ahead of time */
PHPAPI void cluster_variant_resp(INTERNAL_FUNCTION_PARAMETERS,
                                 redisCluster *c,
                                 void *ctx);

/* MULTI BULK response functions.
 * cluster_gen_mbulk_resp() additionally takes the mbulk_cb to run;
 * cluster_zval_mbulk_resp() hands back a zval pointer instead of following
 * the cluster_cb shape.  The rest match cluster_cb. */
PHPAPI void cluster_gen_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
                                   redisCluster *c, mbulk_cb func,
                                   void *ctx);
PHPAPI void cluster_mbulk_raw_resp(INTERNAL_FUNCTION_PARAMETERS,
                                   redisCluster *c, void *ctx);
PHPAPI void cluster_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
                               redisCluster *c, void *ctx);
PHPAPI void cluster_mbulk_zipstr_resp(INTERNAL_FUNCTION_PARAMETERS,
                                      redisCluster *c, void *ctx);
PHPAPI void cluster_mbulk_zipdbl_resp(INTERNAL_FUNCTION_PARAMETERS,
                                      redisCluster *c, void *ctx);
PHPAPI void cluster_mbulk_assoc_resp(INTERNAL_FUNCTION_PARAMETERS,
                                     redisCluster *c, void *ctx);
PHPAPI void cluster_multi_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
                                     redisCluster *c, void *ctx);
PHPAPI zval *cluster_zval_mbulk_resp(INTERNAL_FUNCTION_PARAMETERS,
                                     redisCluster *c, int pull,
                                     mbulk_cb cb);

/* Reply handlers for multi-key commands like DEL/MGET/MSET/MSETNX; each
 * matches the cluster_cb signature. */
PHPAPI void cluster_del_resp(INTERNAL_FUNCTION_PARAMETERS,
                             redisCluster *c, void *ctx);
PHPAPI void cluster_mbulk_mget_resp(INTERNAL_FUNCTION_PARAMETERS,
                                    redisCluster *c, void *ctx);
PHPAPI void cluster_mset_resp(INTERNAL_FUNCTION_PARAMETERS,
                              redisCluster *c, void *ctx);
PHPAPI void cluster_msetnx_resp(INTERNAL_FUNCTION_PARAMETERS,
                                redisCluster *c, void *ctx);

/* Response handler for ZSCAN, SSCAN, and HSCAN.  Takes the scan type and a
 * pointer `it` to a long.
 * NOTE(review): `it` is presumably the scan iterator/cursor updated by the
 * handler -- confirm in the implementation. */
PHPAPI int cluster_kscan_resp(INTERNAL_FUNCTION_PARAMETERS,
                              redisCluster *c, REDIS_SCAN_TYPE type,
                              long *it);

/* MULTI BULK processing callbacks; every one matches the mbulk_cb type */
int mbulk_resp_loop(RedisSock *redis_sock, zval *z_result,
                    long long count, void *ctx TSRMLS_DC);
int mbulk_resp_loop_raw(RedisSock *redis_sock, zval *z_result,
                        long long count, void *ctx TSRMLS_DC);
int mbulk_resp_loop_zipstr(RedisSock *redis_sock, zval *z_result,
                           long long count, void *ctx TSRMLS_DC);
int mbulk_resp_loop_zipdbl(RedisSock *redis_sock, zval *z_result,
                           long long count, void *ctx TSRMLS_DC);
int mbulk_resp_loop_assoc(RedisSock *redis_sock, zval *z_result,
                          long long count, void *ctx TSRMLS_DC);

#endif

/* vim: set tabstop=4 softtabstop=4 noexpandtab shiftwidth=4: */