diff options
Diffstat (limited to 'src/zabbix_server/lld/lld_worker.c')
-rw-r--r-- | src/zabbix_server/lld/lld_worker.c | 255 |
1 files changed, 255 insertions, 0 deletions
diff --git a/src/zabbix_server/lld/lld_worker.c b/src/zabbix_server/lld/lld_worker.c new file mode 100644 index 00000000000..b4329cd5104 --- /dev/null +++ b/src/zabbix_server/lld/lld_worker.c @@ -0,0 +1,255 @@ +/* +** Zabbix +** Copyright (C) 2001-2019 Zabbix SIA +** +** This program is free software; you can redistribute it and/or modify +** it under the terms of the GNU General Public License as published by +** the Free Software Foundation; either version 2 of the License, or +** (at your option) any later version. +** +** This program is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with this program; if not, write to the Free Software +** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +**/ + +#include "common.h" +#include "db.h" +#include "log.h" +#include "zbxipcservice.h" +#include "zbxself.h" +#include "dbcache.h" +#include "proxy.h" +#include "../events.h" + +#include "lld_worker.h" +#include "lld_protocol.h" + +extern unsigned char process_type, program_type; +extern int server_num, process_num; + +/****************************************************************************** + * * + * Function: lld_register_worker * + * * + * Purpose: registers lld worker with lld manager * + * * + * Parameters: socket - [IN] the connections socket * + * * + ******************************************************************************/ +static void lld_register_worker(zbx_ipc_socket_t *socket) +{ + pid_t ppid; + + ppid = getppid(); + + zbx_ipc_socket_write(socket, ZBX_IPC_LLD_REGISTER, (unsigned char *)&ppid, sizeof(ppid)); +} + +/****************************************************************************** + * * + * Function: lld_process_task * + * * + * Purpose: processes lld task and updates rule state/error in configuration * + * cache and database * + * * + * Parameters: message - [IN] the message with LLD request * + * * + ******************************************************************************/ +static void lld_process_task(zbx_ipc_message_t *message) +{ + const char *__function_name = "lld_process_task"; + + zbx_uint64_t itemid, lastlogsize; + char *value, *error; + zbx_timespec_t ts; + zbx_item_diff_t diff; + DC_ITEM item; + int errcode, mtime; + unsigned char state, meta; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name); + + zbx_lld_deserialize_item_value(message->data, &itemid, &value, &ts, &meta, &lastlogsize, &mtime, &error); + + DCconfig_get_items_by_itemids(&item, &itemid, &errcode, 1); + if (SUCCEED != errcode) + goto out; + + zabbix_log(LOG_LEVEL_DEBUG, "processing discovery rule:" ZBX_FS_UI64, itemid); + + diff.flags = ZBX_FLAGS_ITEM_DIFF_UNSET; + + if (NULL != error || NULL != value) + { + if (NULL == error && SUCCEED == lld_process_discovery_rule(itemid, value, &error)) + state = ITEM_STATE_NORMAL; + else + state = ITEM_STATE_NOTSUPPORTED; + + if (state != item.state) + { + diff.state = state; + diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE; + + if (ITEM_STATE_NORMAL == state) + { + zabbix_log(LOG_LEVEL_WARNING, "discovery rule \"%s:%s\" became supported", + item.host.host, item.key_orig); + + zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_LLDRULE, itemid, &ts, + ITEM_STATE_NORMAL, NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL); + } + else + { + zabbix_log(LOG_LEVEL_WARNING, "discovery rule \"%s:%s\" became not supported: %s", + item.host.host, item.key_orig, error); + + zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_LLDRULE, itemid, &ts, + ITEM_STATE_NOTSUPPORTED, NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0, + error); + } + + zbx_process_events(NULL, NULL); + zbx_clean_events(); + } + + /* with successful LLD processing LLD error will be set to empty string */ + if (NULL != error && 0 != strcmp(error, item.error)) + { + diff.error = error; + diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR; + } + } + + if (0 != meta) + { + if (item.lastlogsize != lastlogsize) + { + diff.lastlogsize = lastlogsize; + diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE; + } + if (item.mtime != mtime) + { + diff.mtime = mtime; + diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME; + } + } + + if (ZBX_FLAGS_ITEM_DIFF_UNSET != diff.flags) + { + zbx_vector_ptr_t diffs; + char *sql = NULL; + size_t sql_alloc = 0, sql_offset = 0; + + zbx_vector_ptr_create(&diffs); + diff.itemid = itemid; + zbx_vector_ptr_append(&diffs, &diff); + + DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset); + zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, &diffs); + DBend_multiple_update(&sql, &sql_alloc, &sql_offset); + DBexecute("%s", sql); + + DCconfig_items_apply_changes(&diffs); + + zbx_vector_ptr_destroy(&diffs); + zbx_free(sql); + } + + DCconfig_clean_items(&item, &errcode, 1); +out: + zbx_free(value); + zbx_free(error); + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name); +} + + +ZBX_THREAD_ENTRY(lld_worker_thread, args) +{ +#define STAT_INTERVAL 5 /* if a process is busy and does not sleep then update status not faster than */ + /* once in STAT_INTERVAL seconds */ + + char *error = NULL; + zbx_ipc_socket_t lld_socket; + zbx_ipc_message_t message; + double time_stat, time_idle = 0, time_now, time_read; + zbx_uint64_t processed_num = 0; + + process_type = ((zbx_thread_args_t *)args)->process_type; + server_num = ((zbx_thread_args_t *)args)->server_num; + process_num = ((zbx_thread_args_t *)args)->process_num; + + zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type), + server_num, get_process_type_string(process_type), process_num); + + zbx_setproctitle("%s [connecting to the database]", get_process_type_string(process_type)); + + zbx_ipc_message_init(&message); + + if (FAIL == zbx_ipc_socket_open(&lld_socket, ZBX_IPC_SERVICE_LLD, 10, &error)) + { + zabbix_log(LOG_LEVEL_CRIT, "cannot connect to lld manager service: %s", error); + zbx_free(error); + exit(EXIT_FAILURE); + } + + lld_register_worker(&lld_socket); + + time_stat = zbx_time(); + + + DBconnect(ZBX_DB_CONNECT_NORMAL); + + zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num); + + update_selfmon_counter(ZBX_PROCESS_STATE_BUSY); + + for (;;) + { + time_now = zbx_time(); + + if (STAT_INTERVAL < time_now - time_stat) + { + zbx_setproctitle("%s #%d [processed " ZBX_FS_UI64 " LLD rules, idle " ZBX_FS_DBL " sec during " + ZBX_FS_DBL " sec]", get_process_type_string(process_type), process_num, + processed_num, time_idle, time_now - time_stat); + + time_stat = time_now; + time_idle = 0; + processed_num = 0; + } + + update_selfmon_counter(ZBX_PROCESS_STATE_IDLE); + if (SUCCEED != zbx_ipc_socket_read(&lld_socket, &message)) + { + zabbix_log(LOG_LEVEL_CRIT, "cannot read LLD manager service request"); + exit(EXIT_FAILURE); + } + update_selfmon_counter(ZBX_PROCESS_STATE_BUSY); + + time_read = zbx_time(); + time_idle += time_read - time_now; + zbx_update_env(time_read); + + switch (message.code) + { + case ZBX_IPC_LLD_TASK: + lld_process_task(&message); + zbx_ipc_socket_write(&lld_socket, ZBX_IPC_LLD_DONE, NULL, 0); + processed_num++; + break; + } + + zbx_ipc_message_clean(&message); + } + + DBclose(); + + zbx_ipc_socket_close(&lld_socket); +} |