diff options
Diffstat (limited to 'src/zabbix_server/lld/lld_manager.c')
-rw-r--r-- | src/zabbix_server/lld/lld_manager.c | 577 |
1 files changed, 577 insertions, 0 deletions
diff --git a/src/zabbix_server/lld/lld_manager.c b/src/zabbix_server/lld/lld_manager.c new file mode 100644 index 00000000000..acbd3b49fa0 --- /dev/null +++ b/src/zabbix_server/lld/lld_manager.c @@ -0,0 +1,577 @@ +/* +** Zabbix +** Copyright (C) 2001-2019 Zabbix SIA +** +** This program is free software; you can redistribute it and/or modify +** it under the terms of the GNU General Public License as published by +** the Free Software Foundation; either version 2 of the License, or +** (at your option) any later version. +** +** This program is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with this program; if not, write to the Free Software +** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +**/ + +#include "common.h" + +#include "zbxself.h" +#include "log.h" +#include "zbxipcservice.h" +#include "lld_manager.h" +#include "lld_protocol.h" + +extern unsigned char process_type, program_type; +extern int server_num, process_num; + +extern int CONFIG_LLDWORKER_FORKS; + +/* + * The LLD queue is organized as a queue (rule_queue binary heap) of LLD rules, + * sorted by their oldest value timestamps. The values are stored in linked lists, + * each rule having its own list of values. Values inside list are not sorted, so + * in the case a LLD rule received a value with past timestamp, it will be processed + * in queuing order, not the value chronological order. + * + * During processing the rule with oldest value is popped from queue and sent + * to a free worker. After processing the rule worker sends done response and + * manager removes the oldest value from rule's value list. If there are no more + * values in the list the rule is removed from the index (rule_index hashset), + * otherwise the rule is enqueued back in LLD queue. + * + */ + +typedef struct zbx_lld_value +{ + char *value; + char *error; + zbx_timespec_t ts; + + zbx_uint64_t lastlogsize; + int mtime; + unsigned char meta; + + struct zbx_lld_value *next; +} +zbx_lld_data_t; + +/* queue of values for one LLD rule */ +typedef struct +{ + /* the LLD rule id */ + zbx_uint64_t itemid; + + /* the oldest value in queue */ + zbx_lld_data_t *tail; + + /* the newest value in queue */ + zbx_lld_data_t *head; +} +zbx_lld_rule_t; + +typedef struct +{ + /* workers vector, created during manager initialization */ + zbx_vector_ptr_t workers; + + /* free workers */ + zbx_queue_ptr_t free_workers; + + /* workers indexed by IPC service clients */ + zbx_hashset_t workers_client; + + /* the next worker index to be assigned to new IPC service clients */ + int next_worker_index; + + /* index of queued LLD rules */ + zbx_hashset_t rule_index; + + /* LLD rule queue, ordered by the oldest values */ + zbx_binary_heap_t rule_queue; + + /* the number of queued LLD rules */ + zbx_uint64_t queued_num; + +} +zbx_lld_manager_t; + +typedef struct +{ + zbx_ipc_client_t *client; + zbx_lld_rule_t *rule; +} +zbx_lld_worker_t; + +/* workers_client hashset support */ +static zbx_hash_t worker_hash_func(const void *d) +{ + const zbx_lld_worker_t *worker = *(const zbx_lld_worker_t **)d; + + zbx_hash_t hash = ZBX_DEFAULT_PTR_HASH_FUNC(&worker->client); + + return hash; +} + +static int worker_compare_func(const void *d1, const void *d2) +{ + const zbx_lld_worker_t *p1 = *(const zbx_lld_worker_t **)d1; + const zbx_lld_worker_t *p2 = *(const zbx_lld_worker_t **)d2; + + ZBX_RETURN_IF_NOT_EQUAL(p1->client, p2->client); + return 0; +} + +/* rule_queue binary heap support */ +static int rule_elem_compare_func(const void *d1, const void *d2) +{ + const zbx_binary_heap_elem_t *e1 = (const zbx_binary_heap_elem_t *)d1; + const zbx_binary_heap_elem_t *e2 = (const zbx_binary_heap_elem_t *)d2; + + const zbx_lld_rule_t *rule1 = (const zbx_lld_rule_t *)e1->data; + const zbx_lld_rule_t *rule2 = (const zbx_lld_rule_t *)e2->data; + + /* compare by timestamp of the oldest value */ + return zbx_timespec_compare(&rule1->head->ts, &rule2->head->ts); +} + +/****************************************************************************** + * * + * Function: lld_data_free * + * * + * Purpose: frees LLD data * + * * + ******************************************************************************/ +static void lld_data_free(zbx_lld_data_t *data) +{ + zbx_free(data->value); + zbx_free(data->error); + zbx_free(data); +} + +/****************************************************************************** + * * + * Function: lld_rule_clear * + * * + * Purpose: clears LLD rule * + * * + ******************************************************************************/ +static void lld_rule_clear(zbx_lld_rule_t *rule) +{ + zbx_lld_data_t *data; + + while (NULL != rule->head) + { + data = rule->head; + rule->head = data->next; + lld_data_free(data); + } +} + +/****************************************************************************** + * * + * Function: lld_worker_free * + * * + * Purpose: frees LLD worker * + * * + ******************************************************************************/ +static void lld_worker_free(zbx_lld_worker_t *worker) +{ + zbx_free(worker); +} + +/****************************************************************************** + * * + * Function: lld_manager_init * + * * + * Purpose: initializes LLD manager * + * * + * Parameters: manager - [IN] the manager to initialize * + * * + ******************************************************************************/ +static void lld_manager_init(zbx_lld_manager_t *manager) +{ + const char *__function_name = "lld_init"; + int i; + zbx_lld_worker_t *worker; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s() workers:%d", __function_name, CONFIG_LLDWORKER_FORKS); + + zbx_vector_ptr_create(&manager->workers); + zbx_queue_ptr_create(&manager->free_workers); + zbx_hashset_create(&manager->workers_client, 0, worker_hash_func, worker_compare_func); + + zbx_hashset_create_ext(&manager->rule_index, 0, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, + (zbx_clean_func_t)lld_rule_clear, + ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC); + + zbx_binary_heap_create(&manager->rule_queue, rule_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY); + + manager->next_worker_index = 0; + + for (i = 0; i < CONFIG_LLDWORKER_FORKS; i++) + { + worker = (zbx_lld_worker_t *)zbx_malloc(NULL, sizeof(zbx_lld_worker_t)); + + worker->client = NULL; + + zbx_vector_ptr_append(&manager->workers, worker); + } + + manager->queued_num = 0; + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name); +} + +/****************************************************************************** + * * + * Function: lld_manager_destroy * + * * + * Purpose: destroys LLD manager * + * * + * Parameters: manager - [IN] the manager to destroy * + * * + ******************************************************************************/ +static void lld_manager_destroy(zbx_lld_manager_t *manager) +{ + zbx_binary_heap_destroy(&manager->rule_queue); + zbx_hashset_destroy(&manager->rule_index); + zbx_queue_ptr_destroy(&manager->free_workers); + zbx_hashset_destroy(&manager->workers_client); + zbx_vector_ptr_clear_ext(&manager->workers, (zbx_clean_func_t)lld_worker_free); + zbx_vector_ptr_destroy(&manager->workers); +} + +/****************************************************************************** + * * + * Function: lld_get_worker_by_client * + * * + * Purpose: returns worker by connected IPC client data * + * * + * Parameters: manager - [IN] the manager * + * client - [IN] the connected worker * + * * + * Return value: The LLD worker * + * * + ******************************************************************************/ +static zbx_lld_worker_t *lld_get_worker_by_client(zbx_lld_manager_t *manager, zbx_ipc_client_t *client) +{ + zbx_lld_worker_t **worker, worker_local, *plocal = &worker_local; + + plocal->client = client; + worker = (zbx_lld_worker_t **)zbx_hashset_search(&manager->workers_client, &plocal); + + if (NULL == worker) + { + THIS_SHOULD_NEVER_HAPPEN; + exit(EXIT_FAILURE); + } + + return *worker; +} + +/****************************************************************************** + * * + * Function: lld_register_worker * + * * + * Purpose: registers worker * + * * + * Parameters: manager - [IN] the manager * + * client - [IN] the connected worker IPC client data * + * message - [IN] the received message * + * * + ******************************************************************************/ +static void lld_register_worker(zbx_lld_manager_t *manager, zbx_ipc_client_t *client, + const zbx_ipc_message_t *message) +{ + const char *__function_name = "lld_register_worker"; + zbx_lld_worker_t *worker; + pid_t ppid; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name); + + memcpy(&ppid, message->data, sizeof(ppid)); + + if (ppid != getppid()) + { + zbx_ipc_client_close(client); + zabbix_log(LOG_LEVEL_DEBUG, "refusing connection from foreign process"); + } + else + { + if (manager->next_worker_index == manager->workers.values_num) + { + THIS_SHOULD_NEVER_HAPPEN; + exit(EXIT_FAILURE); + } + + worker = (zbx_lld_worker_t *)manager->workers.values[manager->next_worker_index++]; + worker->client = client; + + zbx_hashset_insert(&manager->workers_client, &worker, sizeof(zbx_lld_worker_t *)); + zbx_queue_ptr_push(&manager->free_workers, worker); + } + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name); +} + +/****************************************************************************** + * * + * Function: lld_queue_rule * + * * + * Purpose: queues LLD rule * + * * + * Parameters: manager - [IN] the LLD manager * + * rule - [IN] the LLD rule * + * * + ******************************************************************************/ +static void lld_queue_rule(zbx_lld_manager_t *manager, zbx_lld_rule_t *rule) +{ + zbx_binary_heap_elem_t elem = {rule->itemid, rule}; + + zbx_binary_heap_insert(&manager->rule_queue, &elem); +} + +/****************************************************************************** + * * + * Function: lld_queue_request * + * * + * Purpose: queues low level discovery request * + * * + * Parameters: manager - [IN] the LLD manager * + * message - [IN] the message with LLD request * + * * + ******************************************************************************/ +static void lld_queue_request(zbx_lld_manager_t *manager, const zbx_ipc_message_t *message) +{ + const char *__function_name = "lld_queue_request"; + + zbx_uint64_t itemid; + zbx_lld_rule_t *rule; + zbx_lld_data_t *data; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name); + + data = (zbx_lld_data_t *)zbx_malloc(NULL, sizeof(zbx_lld_data_t)); + data->next = NULL; + zbx_lld_deserialize_item_value(message->data, &itemid, &data->value, &data->ts, &data->meta, &data->lastlogsize, + &data->mtime, &data->error); + + zabbix_log(LOG_LEVEL_DEBUG, "queuing discovery rule:" ZBX_FS_UI64, itemid); + + if (NULL == (rule = zbx_hashset_search(&manager->rule_index, &itemid))) + { + zbx_lld_rule_t rule_local = {itemid, data, data}; + + rule = zbx_hashset_insert(&manager->rule_index, &rule_local, sizeof(rule_local)); + lld_queue_rule(manager, rule); + } + else + { + rule->tail->next = data; + rule->tail = data; + } + + manager->queued_num++; + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name); +} + +/****************************************************************************** + * * + * Function: lld_process_next_request * + * * + * Purpose: processes next LLD request from queue * + * * + * Parameters: manager - [IN] the LLD manager * + * worker - [IN] the target worker * + * * + ******************************************************************************/ +static void lld_process_next_request(zbx_lld_manager_t *manager, zbx_lld_worker_t *worker) +{ + zbx_binary_heap_elem_t *elem; + unsigned char *buf; + zbx_uint32_t buf_len; + zbx_lld_data_t *data; + + elem = zbx_binary_heap_find_min(&manager->rule_queue); + worker->rule = (zbx_lld_rule_t *)elem->data; + zbx_binary_heap_remove_min(&manager->rule_queue); + + data = worker->rule->head; + buf_len = zbx_lld_serialize_item_value(&buf, worker->rule->itemid, data->value, &data->ts, data->meta, + data->lastlogsize, data->mtime, data->error); + zbx_ipc_client_send(worker->client, ZBX_IPC_LLD_TASK, buf, buf_len); + zbx_free(buf); +} + +/****************************************************************************** + * * + * Function: lld_process_queue * + * * + * Purpose: sends queued LLD rules to free workers * + * * + * Parameters: manager - [IN] the LLD manager * + * * + ******************************************************************************/ +static void lld_process_queue(zbx_lld_manager_t *manager) +{ + zbx_lld_worker_t *worker; + + while (SUCCEED != zbx_binary_heap_empty(&manager->rule_queue)) + { + if (NULL == (worker = zbx_queue_ptr_pop(&manager->free_workers))) + break; + + lld_process_next_request(manager, worker); + } +} + +/****************************************************************************** + * * + * Function: lld_process_result * + * * + * Purpose: processes LLD worker 'done' response * + * * + * Parameters: manager - [IN] the LLD manager * + * Parameters: client - [IN] the worker's IPC client connection * + * * + ******************************************************************************/ +static void lld_process_result(zbx_lld_manager_t *manager, zbx_ipc_client_t *client) +{ + const char *__function_name = "lld_process_result"; + + zbx_lld_worker_t *worker; + zbx_lld_rule_t *rule; + zbx_lld_data_t *data; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name); + + worker = lld_get_worker_by_client(manager, client); + + zabbix_log(LOG_LEVEL_DEBUG, "discovery rule:" ZBX_FS_UI64 " has been processed", worker->rule->itemid); + + rule = worker->rule; + worker->rule = NULL; + + data = rule->head; + rule->head = rule->head->next; + + if (NULL == rule->head) + zbx_hashset_remove_direct(&manager->rule_index, rule); + else + lld_queue_rule(manager, rule); + + lld_data_free(data); + + if (SUCCEED != zbx_binary_heap_empty(&manager->rule_queue)) + lld_process_next_request(manager, worker); + else + zbx_queue_ptr_push(&manager->free_workers, worker); + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name); +} + +/****************************************************************************** + * * + * Function: lld_manager_thread * + * * + * Purpose: main processing loop * + * * + ******************************************************************************/ +ZBX_THREAD_ENTRY(lld_manager_thread, args) +{ +#define STAT_INTERVAL 5 /* if a process is busy and does not sleep then update status not faster than */ + /* once in STAT_INTERVAL seconds */ + + zbx_ipc_service_t lld_service; + char *error = NULL; + zbx_ipc_client_t *client; + zbx_ipc_message_t *message; + double time_stat, time_now, sec; + zbx_lld_manager_t manager; + zbx_uint64_t processed_num = 0; + + process_type = ((zbx_thread_args_t *)args)->process_type; + server_num = ((zbx_thread_args_t *)args)->server_num; + process_num = ((zbx_thread_args_t *)args)->process_num; + + zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num); + + zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type), + server_num, get_process_type_string(process_type), process_num); + + if (FAIL == zbx_ipc_service_start(&lld_service, ZBX_IPC_SERVICE_LLD, &error)) + { + zabbix_log(LOG_LEVEL_CRIT, "cannot start LLD manager service: %s", error); + zbx_free(error); + exit(EXIT_FAILURE); + } + + lld_manager_init(&manager); + + /* initialize statistics */ + time_stat = zbx_time(); + + zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num); + + update_selfmon_counter(ZBX_PROCESS_STATE_BUSY); + + for (;;) + { + time_now = zbx_time(); + + if (STAT_INTERVAL < time_now - time_stat) + { + zbx_setproctitle("%s #%d [processed " ZBX_FS_UI64 " LLD rules during " ZBX_FS_DBL " sec]", + get_process_type_string(process_type), process_num, processed_num, + time_now - time_stat); + + time_stat = time_now; + processed_num = 0; + } + + update_selfmon_counter(ZBX_PROCESS_STATE_IDLE); + zbx_ipc_service_recv(&lld_service, 1, &client, &message); + update_selfmon_counter(ZBX_PROCESS_STATE_BUSY); + + sec = zbx_time(); + zbx_update_env(sec); + + if (NULL != message) + { + switch (message->code) + { + case ZBX_IPC_LLD_REGISTER: + lld_register_worker(&manager, client, message); + break; + case ZBX_IPC_LLD_REQUEST: + lld_queue_request(&manager, message); + lld_process_queue(&manager); + break; + case ZBX_IPC_LLD_DONE: + lld_process_result(&manager, client); + processed_num++; + manager.queued_num--; + break; + case ZBX_IPC_LLD_QUEUE: + zbx_ipc_client_send(client, message->code, (unsigned char *)&manager.queued_num, + sizeof(zbx_uint64_t)); + break; + } + + zbx_ipc_message_free(message); + } + + if (NULL != client) + zbx_ipc_client_release(client); + } + + zbx_ipc_service_close(&lld_service); + lld_manager_destroy(&manager); + + return 0; +} |