diff options
author | Emmanuel Merali <emmanuel@mobli.com> | 2013-01-29 13:47:36 +0400 |
---|---|---|
committer | Emmanuel Merali <emmanuel@mobli.com> | 2013-01-29 13:47:36 +0400 |
commit | d4c7f6413136a44d4afdfa416b7fe5553dce97d0 (patch) | |
tree | 83653ba6e8b5f4a81975634e36b7095486f1af47 | |
parent | 9ba000c54cc47d470366c59af87ddf9d4dd454bc (diff) |
New select DB command to RedisArray - Added retry delay on reconnect
Added the possibility to delay each reconnection attempt, including a
random factor to prevent several or many concurrent connections from
trying to reconnect at the same time.
Added the select command to RedisArray to select a DB on every
connections in one instruction.
Also, fixed a compiler warning:
redis_array_impl.c:1115:15: warning: incompatible pointer types
assigning to 'zval **' (aka 'struct _zval_struct **') from 'zval
**(*)[2]' [-Wincompatible-pointer-types]
-rw-r--r-- | common.h | 1 | ||||
-rw-r--r-- | library.c | 14 | ||||
-rw-r--r-- | library.h | 2 | ||||
-rw-r--r-- | redis.c | 15 | ||||
-rw-r--r-- | redis_array.c | 57 | ||||
-rw-r--r-- | redis_array.h | 1 | ||||
-rw-r--r-- | redis_array_impl.c | 33 | ||||
-rw-r--r-- | redis_array_impl.h | 4 | ||||
-rw-r--r-- | redis_session.c | 9 |
9 files changed, 112 insertions, 24 deletions
@@ -156,6 +156,7 @@ typedef struct { char *host; short port; double timeout; + long retry_interval; int failed; int status; int persistent; @@ -38,8 +38,9 @@ PHPAPI int redis_check_eof(RedisSock *redis_sock TSRMLS_DC) int eof; int count = 0; - if (!redis_sock->stream) + if (!redis_sock->stream) { return -1; + } eof = php_stream_eof(redis_sock->stream); for (; eof; count++) { @@ -60,6 +61,12 @@ PHPAPI int redis_check_eof(RedisSock *redis_sock TSRMLS_DC) redis_sock->mode = ATOMIC; redis_sock->watching = 0; } + // Wait for a while before trying to reconnect + if (redis_sock->retry_interval) { + // Random factor to avoid having several (or many) concurrent connections trying to reconnect at the same time + long retry_interval = (count ? redis_sock->retry_interval : (random() % redis_sock->retry_interval)); + usleep(retry_interval); + } redis_sock_connect(redis_sock TSRMLS_CC); /* reconnect */ if(redis_sock->stream) { /* check for EOF again. */ eof = php_stream_eof(redis_sock->stream); @@ -821,7 +828,8 @@ PHPAPI void redis_ping_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_s * redis_sock_create */ PHPAPI RedisSock* redis_sock_create(char *host, int host_len, unsigned short port, - double timeout, int persistent, char *persistent_id) + double timeout, int persistent, char *persistent_id, + long retry_interval) { RedisSock *redis_sock; @@ -831,7 +839,7 @@ PHPAPI RedisSock* redis_sock_create(char *host, int host_len, unsigned short por redis_sock->status = REDIS_SOCK_STATUS_DISCONNECTED; redis_sock->watching = 0; redis_sock->dbNumber = 0; - + redis_sock->retry_interval = retry_interval * 1000; redis_sock->persistent = persistent; if(persistent_id) { @@ -19,7 +19,7 @@ PHPAPI void redis_string_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis PHPAPI void redis_ping_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); PHPAPI void redis_info_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); PHPAPI void redis_type_response(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock, zval *z_tab, void *ctx); -PHPAPI RedisSock* redis_sock_create(char *host, int host_len, unsigned short port, double timeout, int persistent, char *persistent_id); +PHPAPI RedisSock* redis_sock_create(char *host, int host_len, unsigned short port, double timeout, int persistent, char *persistent_id, long retry_interval); PHPAPI int redis_sock_connect(RedisSock *redis_sock TSRMLS_DC); PHPAPI int redis_sock_server_open(RedisSock *redis_sock, int force_connect TSRMLS_DC); PHPAPI int redis_sock_disconnect(RedisSock *redis_sock TSRMLS_DC); @@ -520,7 +520,7 @@ PHP_METHOD(Redis,__destruct) { } } -/* {{{ proto boolean Redis::connect(string host, int port [, double timeout]) +/* {{{ proto boolean Redis::connect(string host, int port [, double timeout [, long retry_interval]]) */ PHP_METHOD(Redis, connect) { @@ -556,6 +556,7 @@ PHPAPI int redis_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent) { int host_len, id; char *host = NULL; long port = -1; + long retry_interval = 0; char *persistent_id = NULL; int persistent_id_len = -1; @@ -568,9 +569,10 @@ PHPAPI int redis_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent) { persistent = 0; #endif - if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Os|lds", + if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Os|ldsl", &object, redis_ce, &host, &host_len, &port, - &timeout, &persistent_id, &persistent_id_len) == FAILURE) { + &timeout, &persistent_id, &persistent_id_len, + &retry_interval) == FAILURE) { return FAILURE; } @@ -579,6 +581,11 @@ PHPAPI int redis_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent) { return FAILURE; } + if (retry_interval < 0L || retry_interval > INT_MAX) { + zend_throw_exception(redis_exception_ce, "Invalid retry interval", 0 TSRMLS_CC); + return FAILURE; + } + if(port == -1 && host_len && host[0] != '/') { /* not unix socket, set to default value */ port = 6379; } @@ -595,7 +602,7 @@ PHPAPI int redis_connect(INTERNAL_FUNCTION_PARAMETERS, int persistent) { zend_clear_exception(TSRMLS_C); /* clear exception triggered by non-existent socket during connect(). */ } - redis_sock = redis_sock_create(host, host_len, port, timeout, persistent, persistent_id); + redis_sock = redis_sock_create(host, host_len, port, timeout, persistent, persistent_id, retry_interval); if (redis_sock_server_open(redis_sock, 1 TSRMLS_CC) < 0) { redis_free_socket(redis_sock); diff --git a/redis_array.c b/redis_array.c index d0460f10..be95c50d 100644 --- a/redis_array.c +++ b/redis_array.c @@ -51,6 +51,7 @@ zend_function_entry redis_array_functions[] = { PHP_ME(RedisArray, _rehash, NULL, ZEND_ACC_PUBLIC) /* special implementation for a few functions */ + PHP_ME(RedisArray, select, NULL, ZEND_ACC_PUBLIC) PHP_ME(RedisArray, info, NULL, ZEND_ACC_PUBLIC) PHP_ME(RedisArray, ping, NULL, ZEND_ACC_PUBLIC) PHP_ME(RedisArray, mget, NULL, ZEND_ACC_PUBLIC) @@ -192,6 +193,7 @@ PHP_METHOD(RedisArray, __construct) RedisArray *ra = NULL; zend_bool b_index = 0, b_autorehash = 0; HashTable *hPrev = NULL, *hOpts = NULL; + long l_retry_interval = 0; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z|a", &z0, &z_opts) == FAILURE) { RETURN_FALSE; @@ -232,6 +234,19 @@ PHP_METHOD(RedisArray, __construct) if(FAILURE != zend_hash_find(hOpts, "autorehash", sizeof("autorehash"), (void**)&zpData) && Z_TYPE_PP(zpData) == IS_BOOL) { b_autorehash = Z_BVAL_PP(zpData); } + + /* extract retry_interval option. */ + zval **z_retry_interval_pp; + if (FAILURE != zend_hash_find(hOpts, "retry_interval", sizeof("retry_interval"), (void**)&z_retry_interval_pp)) { + if (Z_TYPE_PP(z_retry_interval_pp) == IS_LONG || Z_TYPE_PP(z_retry_interval_pp) == IS_STRING) { + if (Z_TYPE_PP(z_retry_interval_pp) == IS_LONG) { + l_retry_interval = Z_LVAL_PP(z_retry_interval_pp); + } + else { + l_retry_interval = atol(Z_STRVAL_PP(z_retry_interval_pp)); + } + } + } } /* extract either name of list of hosts from z0 */ @@ -241,7 +256,7 @@ PHP_METHOD(RedisArray, __construct) break; case IS_ARRAY: - ra = ra_make_array(Z_ARRVAL_P(z0), z_fun, z_dist, hPrev, b_index TSRMLS_CC); + ra = ra_make_array(Z_ARRVAL_P(z0), z_fun, z_dist, hPrev, b_index, l_retry_interval TSRMLS_CC); break; default: @@ -688,6 +703,46 @@ PHP_METHOD(RedisArray, setOption) efree(z_args[0]); efree(z_args[1]); } + +PHP_METHOD(RedisArray, select) +{ + zval *object, z_fun, *z_tmp, *z_args[2]; + int i; + RedisArray *ra; + long opt; + + if (zend_parse_method_parameters(ZEND_NUM_ARGS() TSRMLS_CC, getThis(), "Ol", + &object, redis_array_ce, &opt) == FAILURE) { + RETURN_FALSE; + } + + if (redis_array_get(object, &ra TSRMLS_CC) < 0) { + RETURN_FALSE; + } + + /* prepare call */ + ZVAL_STRING(&z_fun, "select", 0); + + /* copy args */ + MAKE_STD_ZVAL(z_args[0]); + ZVAL_LONG(z_args[0], opt); + + array_init(return_value); + for(i = 0; i < ra->count; ++i) { + + MAKE_STD_ZVAL(z_tmp); + + /* Call each node in turn */ + call_user_function(&redis_ce->function_table, &ra->redis[i], + &z_fun, z_tmp, 1, z_args TSRMLS_CC); + + add_assoc_zval(return_value, ra->hosts[i], z_tmp); + } + + /* cleanup */ + efree(z_args[0]); +} + #define HANDLE_MULTI_EXEC(cmd) do {\ if (redis_array_get(getThis(), &ra TSRMLS_CC) >= 0 && ra->z_multi_exec) {\ int i, num_varargs;\ diff --git a/redis_array.h b/redis_array.h index bc7fdd88..b2c7d86a 100644 --- a/redis_array.h +++ b/redis_array.h @@ -15,6 +15,7 @@ PHP_METHOD(RedisArray, _function); PHP_METHOD(RedisArray, _distributor); PHP_METHOD(RedisArray, _rehash); +PHP_METHOD(RedisArray, select); PHP_METHOD(RedisArray, info); PHP_METHOD(RedisArray, ping); PHP_METHOD(RedisArray, mget); diff --git a/redis_array_impl.c b/redis_array_impl.c index d5370c82..c38e2fe6 100644 --- a/redis_array_impl.c +++ b/redis_array_impl.c @@ -29,7 +29,7 @@ extern int le_redis_sock; extern zend_class_entry *redis_ce; RedisArray* -ra_load_hosts(RedisArray *ra, HashTable *hosts TSRMLS_DC) +ra_load_hosts(RedisArray *ra, HashTable *hosts, long retry_interval TSRMLS_DC) { int i, host_len, id; int count = zend_hash_num_elements(hosts); @@ -67,7 +67,7 @@ ra_load_hosts(RedisArray *ra, HashTable *hosts TSRMLS_DC) call_user_function(&redis_ce->function_table, &ra->redis[i], &z_cons, &z_ret, 0, NULL TSRMLS_CC); /* create socket */ - redis_sock = redis_sock_create(host, host_len, port, 0, 0, NULL); /* TODO: persistence? */ + redis_sock = redis_sock_create(host, host_len, port, 0, 0, NULL, retry_interval); /* TODO: persistence? */ /* connect */ redis_sock_server_open(redis_sock, 1 TSRMLS_CC); @@ -158,9 +158,11 @@ RedisArray *ra_load_array(const char *name TSRMLS_DC) { zval *z_params_funs, **z_data_pp, *z_fun = NULL, *z_dist = NULL; zval *z_params_index; zval *z_params_autorehash; + zval *z_params_retry_interval; RedisArray *ra = NULL; zend_bool b_index = 0, b_autorehash = 0; + long l_retry_interval = 0; HashTable *hHosts = NULL, *hPrev = NULL; /* find entry */ @@ -223,8 +225,23 @@ RedisArray *ra_load_array(const char *name TSRMLS_DC) { } } + /* find retry interval option */ + MAKE_STD_ZVAL(z_params_retry_interval); + array_init(z_params_retry_interval); + sapi_module.treat_data(PARSE_STRING, estrdup(INI_STR("redis.arrays.retryinterval")), z_params_retry_interval TSRMLS_CC); + if (zend_hash_find(Z_ARRVAL_P(z_params_retry_interval), name, strlen(name) + 1, (void **) &z_data_pp) != FAILURE) { + if (Z_TYPE_PP(z_data_pp) == IS_LONG || Z_TYPE_PP(z_data_pp) == IS_STRING) { + if (Z_TYPE_PP(z_data_pp) == IS_LONG) { + l_retry_interval = Z_LVAL_PP(z_data_pp); + } + else { + l_retry_interval = atol(Z_STRVAL_PP(z_data_pp)); + } + } + } + /* create RedisArray object */ - ra = ra_make_array(hHosts, z_fun, z_dist, hPrev, b_index TSRMLS_CC); + ra = ra_make_array(hHosts, z_fun, z_dist, hPrev, b_index, l_retry_interval TSRMLS_CC); ra->auto_rehash = b_autorehash; /* cleanup */ @@ -238,12 +255,14 @@ RedisArray *ra_load_array(const char *name TSRMLS_DC) { efree(z_params_index); zval_dtor(z_params_autorehash); efree(z_params_autorehash); + zval_dtor(z_params_retry_interval); + efree(z_params_retry_interval); return ra; } RedisArray * -ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev, zend_bool b_index TSRMLS_DC) { +ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev, zend_bool b_index, long retry_interval TSRMLS_DC) { int count = zend_hash_num_elements(hosts); @@ -261,10 +280,10 @@ ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev /* init array data structures */ ra_init_function_table(ra); - if(NULL == ra_load_hosts(ra, hosts TSRMLS_CC)) { + if(NULL == ra_load_hosts(ra, hosts, retry_interval TSRMLS_CC)) { return NULL; } - ra->prev = hosts_prev ? ra_make_array(hosts_prev, z_fun, z_dist, NULL, b_index TSRMLS_CC) : NULL; + ra->prev = hosts_prev ? ra_make_array(hosts_prev, z_fun, z_dist, NULL, b_index, retry_interval TSRMLS_CC) : NULL; /* copy function if provided */ if(z_fun) { @@ -1112,7 +1131,7 @@ static void zval_rehash_callback(zend_fcall_info *z_cb, zend_fcall_info_cache *z zval *z_host, *z_count; z_cb->retval_ptr_ptr = &z_ret; - z_cb->params = &z_args; + z_cb->params = (struct _zval_struct ***)&z_args; z_cb->param_count = 2; z_cb->no_separation = 0; diff --git a/redis_array_impl.h b/redis_array_impl.h index 0e00258c..8dd5201b 100644 --- a/redis_array_impl.h +++ b/redis_array_impl.h @@ -5,9 +5,9 @@ #include "common.h" #include "redis_array.h" -RedisArray* ra_load_hosts(RedisArray *ra, HashTable *hosts TSRMLS_DC); +RedisArray *ra_load_hosts(RedisArray *ra, HashTable *hosts, long retry_interval TSRMLS_DC); RedisArray *ra_load_array(const char *name TSRMLS_DC); -RedisArray *ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev, zend_bool b_index TSRMLS_DC); +RedisArray *ra_make_array(HashTable *hosts, zval *z_fun, zval *z_dist, HashTable *hosts_prev, zend_bool b_index, long retry_interval TSRMLS_DC); zval *ra_find_node_by_name(RedisArray *ra, const char *host, int host_len TSRMLS_DC); zval *ra_find_node(RedisArray *ra, const char *key, int key_len, int *out_pos TSRMLS_DC); void ra_init_function_table(RedisArray *ra); diff --git a/redis_session.c b/redis_session.c index 95d90533..009bac51 100644 --- a/redis_session.c +++ b/redis_session.c @@ -206,6 +206,7 @@ PS_OPEN_FUNC(redis) int persistent = 0; int database = -1; char *prefix = NULL, *auth = NULL, *persistent_id = NULL; + long retry_interval = 0; /* translate unix: into file: */ if (!strncmp(save_path+i, "unix:", sizeof("unix:")-1)) { @@ -240,7 +241,6 @@ PS_OPEN_FUNC(redis) convert_to_long_ex(param); weight = Z_LVAL_PP(param); } - if (zend_hash_find(Z_ARRVAL_P(params), "timeout", sizeof("timeout"), (void **) ¶m) != FAILURE) { timeout = atof(Z_STRVAL_PP(param)); } @@ -260,13 +260,10 @@ PS_OPEN_FUNC(redis) convert_to_long_ex(param); database = Z_LVAL_PP(param); } - - /* // not supported yet if (zend_hash_find(Z_ARRVAL_P(params), "retry_interval", sizeof("retry_interval"), (void **) ¶m) != FAILURE) { convert_to_long_ex(param); retry_interval = Z_LVAL_PP(param); } - */ zval_ptr_dtor(¶ms); } @@ -280,9 +277,9 @@ PS_OPEN_FUNC(redis) RedisSock *redis_sock; if(url->host) { - redis_sock = redis_sock_create(url->host, strlen(url->host), url->port, timeout, persistent, persistent_id); + redis_sock = redis_sock_create(url->host, strlen(url->host), url->port, timeout, persistent, persistent_id, retry_interval); } else { /* unix */ - redis_sock = redis_sock_create(url->path, strlen(url->path), 0, timeout, persistent, persistent_id); + redis_sock = redis_sock_create(url->path, strlen(url->path), 0, timeout, persistent, persistent_id, retry_interval); } redis_pool_add(pool, redis_sock, weight, database, prefix, auth TSRMLS_CC); |