diff options
Diffstat (limited to 'src/libs/zbxcachehistory/dbcache.c')
-rw-r--r-- | src/libs/zbxcachehistory/dbcache.c | 5195 |
1 files changed, 5195 insertions, 0 deletions
diff --git a/src/libs/zbxcachehistory/dbcache.c b/src/libs/zbxcachehistory/dbcache.c new file mode 100644 index 00000000000..d90d0cb7a50 --- /dev/null +++ b/src/libs/zbxcachehistory/dbcache.c @@ -0,0 +1,5195 @@ +/* +** Zabbix +** Copyright (C) 2001-2022 Zabbix SIA +** +** This program is free software; you can redistribute it and/or modify +** it under the terms of the GNU General Public License as published by +** the Free Software Foundation; either version 2 of the License, or +** (at your option) any later version. +** +** This program is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with this program; if not, write to the Free Software +** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +**/ + +#include "zbxcachehistory.h" +#include "zbxcachevalue.h" + +#include "log.h" +#include "zbxmutexs.h" +#include "zbxserver.h" +#include "events.h" +#include "zbxmodules.h" +#include "module.h" +#include "zbxexport.h" +#include "zbxnix.h" +#include "zbxavailability.h" +#include "zbxtrends.h" +#include "zbxnum.h" +#include "zbxsysinfo.h" + +static zbx_shmem_info_t *hc_index_mem = NULL; +static zbx_shmem_info_t *hc_mem = NULL; +static zbx_shmem_info_t *trend_mem = NULL; + +#define LOCK_CACHE zbx_mutex_lock(cache_lock) +#define UNLOCK_CACHE zbx_mutex_unlock(cache_lock) +#define LOCK_TRENDS zbx_mutex_lock(trends_lock) +#define UNLOCK_TRENDS zbx_mutex_unlock(trends_lock) +#define LOCK_CACHE_IDS zbx_mutex_lock(cache_ids_lock) +#define UNLOCK_CACHE_IDS zbx_mutex_unlock(cache_ids_lock) + +static zbx_mutex_t cache_lock = ZBX_MUTEX_NULL; +static zbx_mutex_t trends_lock = ZBX_MUTEX_NULL; +static zbx_mutex_t cache_ids_lock = ZBX_MUTEX_NULL; + +static char *sql = NULL; +static size_t sql_alloc = 4 * ZBX_KIBIBYTE; + +extern unsigned char program_type; +extern int CONFIG_DOUBLE_PRECISION; +extern char *CONFIG_EXPORT_DIR; + +#define ZBX_IDS_SIZE 10 + +#define ZBX_HC_ITEMS_INIT_SIZE 1000 + +#define ZBX_TRENDS_CLEANUP_TIME ((SEC_PER_HOUR * 55) / 60) + +/* the maximum time spent synchronizing history */ +#define ZBX_HC_SYNC_TIME_MAX 10 + +/* the maximum number of items in one synchronization batch */ +#define ZBX_HC_SYNC_MAX 1000 +#define ZBX_HC_TIMER_MAX (ZBX_HC_SYNC_MAX / 2) +#define ZBX_HC_TIMER_SOFT_MAX (ZBX_HC_TIMER_MAX - 10) + +/* the minimum processed item percentage of item candidates to continue synchronizing */ +#define ZBX_HC_SYNC_MIN_PCNT 10 + +/* the maximum number of characters for history cache values */ +#define ZBX_HISTORY_VALUE_LEN (1024 * 64) + +#define ZBX_DC_FLAGS_NOT_FOR_HISTORY (ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOHISTORY) +#define ZBX_DC_FLAGS_NOT_FOR_TRENDS (ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOTRENDS) +#define ZBX_DC_FLAGS_NOT_FOR_MODULES (ZBX_DC_FLAGS_NOT_FOR_HISTORY | ZBX_DC_FLAG_LLD) +#define ZBX_DC_FLAGS_NOT_FOR_EXPORT (ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF) + +#define ZBX_HC_PROXYQUEUE_STATE_NORMAL 0 +#define ZBX_HC_PROXYQUEUE_STATE_WAIT 1 + +typedef struct +{ + char table_name[ZBX_TABLENAME_LEN_MAX]; + zbx_uint64_t lastid; +} +ZBX_DC_ID; + +typedef struct +{ + ZBX_DC_ID id[ZBX_IDS_SIZE]; +} +ZBX_DC_IDS; + +static ZBX_DC_IDS *ids = NULL; + +typedef struct +{ + zbx_list_t list; + zbx_hashset_t index; + int state; +} +zbx_hc_proxyqueue_t; + +typedef struct +{ + zbx_hashset_t trends; + ZBX_DC_STATS stats; + + zbx_hashset_t history_items; + zbx_binary_heap_t history_queue; + + int history_num; + int trends_num; + int trends_last_cleanup_hour; + int history_num_total; + int history_progress_ts; + + unsigned char db_trigger_queue_lock; + + zbx_hc_proxyqueue_t proxyqueue; + int proxy_history_count; +} +ZBX_DC_CACHE; + +static ZBX_DC_CACHE *cache = NULL; + +/* local history cache */ +#define ZBX_MAX_VALUES_LOCAL 256 +#define ZBX_STRUCT_REALLOC_STEP 8 +#define ZBX_STRING_REALLOC_STEP ZBX_KIBIBYTE + +typedef struct +{ + size_t pvalue; + size_t len; +} +dc_value_str_t; + +typedef struct +{ + double value_dbl; + zbx_uint64_t value_uint; + dc_value_str_t value_str; +} +dc_value_t; + +typedef struct +{ + zbx_uint64_t itemid; + dc_value_t value; + zbx_timespec_t ts; + dc_value_str_t source; /* for log items only */ + zbx_uint64_t lastlogsize; + int timestamp; /* for log items only */ + int severity; /* for log items only */ + int logeventid; /* for log items only */ + int mtime; + unsigned char item_value_type; + unsigned char value_type; + unsigned char state; + unsigned char flags; /* see ZBX_DC_FLAG_* above */ +} +dc_item_value_t; + +static char *string_values = NULL; +static size_t string_values_alloc = 0, string_values_offset = 0; +static dc_item_value_t *item_values = NULL; +static size_t item_values_alloc = 0, item_values_num = 0; + +static void hc_add_item_values(dc_item_value_t *values, int values_num); +static void hc_pop_items(zbx_vector_ptr_t *history_items); +static void hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items); +static void hc_push_items(zbx_vector_ptr_t *history_items); +static void hc_free_item_values(ZBX_DC_HISTORY *history, int history_num); +static void hc_queue_item(zbx_hc_item_t *item); +static int hc_queue_elem_compare_func(const void *d1, const void *d2); +static int hc_queue_get_size(void); +static int hc_get_history_compression_age(void); + +ZBX_PTR_VECTOR_DECL(item_tag, zbx_tag_t) +ZBX_PTR_VECTOR_IMPL(item_tag, zbx_tag_t) + +ZBX_PTR_VECTOR_IMPL(tags, zbx_tag_t*) + +/****************************************************************************** + * * + * Purpose: retrieves all internal metrics of the database cache * + * * + * Parameters: stats - [OUT] write cache metrics * + * * + ******************************************************************************/ +void DCget_stats_all(zbx_wcache_info_t *wcache_info) +{ + LOCK_CACHE; + + wcache_info->stats = cache->stats; + wcache_info->history_free = hc_mem->free_size; + wcache_info->history_total = hc_mem->total_size; + wcache_info->index_free = hc_index_mem->free_size; + wcache_info->index_total = hc_index_mem->total_size; + + if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) + { + wcache_info->trend_free = trend_mem->free_size; + wcache_info->trend_total = trend_mem->orig_size; + } + + UNLOCK_CACHE; +} + +/****************************************************************************** + * * + * Purpose: get statistics of the database cache * + * * + ******************************************************************************/ +void *DCget_stats(int request) +{ + static zbx_uint64_t value_uint; + static double value_double; + void *ret; + + LOCK_CACHE; + + switch (request) + { + case ZBX_STATS_HISTORY_COUNTER: + value_uint = cache->stats.history_counter; + ret = (void *)&value_uint; + break; + case ZBX_STATS_HISTORY_FLOAT_COUNTER: + value_uint = cache->stats.history_float_counter; + ret = (void *)&value_uint; + break; + case ZBX_STATS_HISTORY_UINT_COUNTER: + value_uint = cache->stats.history_uint_counter; + ret = (void *)&value_uint; + break; + case ZBX_STATS_HISTORY_STR_COUNTER: + value_uint = cache->stats.history_str_counter; + ret = (void *)&value_uint; + break; + case ZBX_STATS_HISTORY_LOG_COUNTER: + value_uint = cache->stats.history_log_counter; + ret = (void *)&value_uint; + break; + case ZBX_STATS_HISTORY_TEXT_COUNTER: + value_uint = cache->stats.history_text_counter; + ret = (void *)&value_uint; + break; + case ZBX_STATS_NOTSUPPORTED_COUNTER: + value_uint = cache->stats.notsupported_counter; + ret = (void *)&value_uint; + break; + case ZBX_STATS_HISTORY_TOTAL: + value_uint = hc_mem->total_size; + ret = (void *)&value_uint; + break; + case ZBX_STATS_HISTORY_USED: + value_uint = hc_mem->total_size - hc_mem->free_size; + ret = (void *)&value_uint; + break; + case ZBX_STATS_HISTORY_FREE: + value_uint = hc_mem->free_size; + ret = (void *)&value_uint; + break; + case ZBX_STATS_HISTORY_PUSED: + value_double = 100 * (double)(hc_mem->total_size - hc_mem->free_size) / hc_mem->total_size; + ret = (void *)&value_double; + break; + case ZBX_STATS_HISTORY_PFREE: + value_double = 100 * (double)hc_mem->free_size / hc_mem->total_size; + ret = (void *)&value_double; + break; + case ZBX_STATS_TREND_TOTAL: + value_uint = trend_mem->orig_size; + ret = (void *)&value_uint; + break; + case ZBX_STATS_TREND_USED: + value_uint = trend_mem->orig_size - trend_mem->free_size; + ret = (void *)&value_uint; + break; + case ZBX_STATS_TREND_FREE: + value_uint = trend_mem->free_size; + ret = (void *)&value_uint; + break; + case ZBX_STATS_TREND_PUSED: + value_double = 100 * (double)(trend_mem->orig_size - trend_mem->free_size) / + trend_mem->orig_size; + ret = (void *)&value_double; + break; + case ZBX_STATS_TREND_PFREE: + value_double = 100 * (double)trend_mem->free_size / trend_mem->orig_size; + ret = (void *)&value_double; + break; + case ZBX_STATS_HISTORY_INDEX_TOTAL: + value_uint = hc_index_mem->total_size; + ret = (void *)&value_uint; + break; + case ZBX_STATS_HISTORY_INDEX_USED: + value_uint = hc_index_mem->total_size - hc_index_mem->free_size; + ret = (void *)&value_uint; + break; + case ZBX_STATS_HISTORY_INDEX_FREE: + value_uint = hc_index_mem->free_size; + ret = (void *)&value_uint; + break; + case ZBX_STATS_HISTORY_INDEX_PUSED: + value_double = 100 * (double)(hc_index_mem->total_size - hc_index_mem->free_size) / + hc_index_mem->total_size; + ret = (void *)&value_double; + break; + case ZBX_STATS_HISTORY_INDEX_PFREE: + value_double = 100 * (double)hc_index_mem->free_size / hc_index_mem->total_size; + ret = (void *)&value_double; + break; + default: + ret = NULL; + } + + UNLOCK_CACHE; + + return ret; +} + +/****************************************************************************** + * * + * Purpose: find existing or add new structure and return pointer * + * * + * Return value: pointer to a trend structure * + * * + ******************************************************************************/ +static ZBX_DC_TREND *DCget_trend(zbx_uint64_t itemid) +{ + ZBX_DC_TREND *ptr, trend; + + if (NULL != (ptr = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &itemid))) + return ptr; + + memset(&trend, 0, sizeof(ZBX_DC_TREND)); + trend.itemid = itemid; + + return (ZBX_DC_TREND *)zbx_hashset_insert(&cache->trends, &trend, sizeof(ZBX_DC_TREND)); +} + +/****************************************************************************** + * * + * Purpose: apply disable_from changes to cache * + * * + ******************************************************************************/ +static void DCupdate_trends(zbx_vector_uint64_pair_t *trends_diff) +{ + int i; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + LOCK_TRENDS; + + for (i = 0; i < trends_diff->values_num; i++) + { + ZBX_DC_TREND *trend; + + if (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &trends_diff->values[i].first))) + trend->disable_from = trends_diff->values[i].second; + } + + UNLOCK_TRENDS; + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +/****************************************************************************** + * * + * Purpose: helper function for DCflush trends * + * * + ******************************************************************************/ +static void dc_insert_trends_in_db(ZBX_DC_TREND *trends, int trends_num, unsigned char value_type, + const char *table_name, int clock) +{ + ZBX_DC_TREND *trend; + int i; + zbx_db_insert_t db_insert; + + zbx_db_insert_prepare(&db_insert, table_name, "itemid", "clock", "num", "value_min", "value_avg", + "value_max", NULL); + + for (i = 0; i < trends_num; i++) + { + trend = &trends[i]; + + if (0 == trend->itemid) + continue; + + if (clock != trend->clock || value_type != trend->value_type) + continue; + + if (ITEM_VALUE_TYPE_FLOAT == value_type) + { + zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num, + trend->value_min.dbl, trend->value_avg.dbl, trend->value_max.dbl); + } + else + { + zbx_uint128_t avg; + + /* calculate the trend average value */ + zbx_udiv128_64(&avg, &trend->value_avg.ui64, trend->num); + + zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num, + trend->value_min.ui64, avg.lo, trend->value_max.ui64); + } + + trend->itemid = 0; + } + + zbx_db_insert_execute(&db_insert); + zbx_db_insert_clean(&db_insert); +} + +/****************************************************************************** + * * + * Purpose: Update trends disable_until for items without trends data past or * + * equal the specified clock * + * * + * Comments: A helper function for DCflush trends * + * * + ******************************************************************************/ +static void dc_remove_updated_trends(ZBX_DC_TREND *trends, int trends_num, const char *table_name, + int value_type, zbx_uint64_t *itemids, int *itemids_num, int clock) +{ + int i, j, clocks_num, now, age; + ZBX_DC_TREND *trend; + zbx_uint64_t itemid; + size_t sql_offset; + DB_RESULT result; + DB_ROW row; + int clocks[] = {SEC_PER_DAY, SEC_PER_WEEK, SEC_PER_MONTH, SEC_PER_YEAR, INT_MAX}; + + now = time(NULL); + age = now - clock; + for (clocks_num = 0; age > clocks[clocks_num]; clocks_num++) + clocks[clocks_num] = now - clocks[clocks_num]; + clocks[clocks_num] = clock; + + /* remove itemids with trends data past or equal the clock */ + for (j = 0; j <= clocks_num && 0 < *itemids_num; j++) + { + sql_offset = 0; + zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, + "select distinct itemid" + " from %s" + " where clock>=%d and", + table_name, clocks[j]); + + if (0 < j) + zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " clock<%d and", clocks[j - 1]); + + DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, *itemids_num); + + result = DBselect("%s", sql); + + while (NULL != (row = DBfetch(result))) + { + ZBX_STR2UINT64(itemid, row[0]); + uint64_array_remove(itemids, itemids_num, &itemid, 1); + } + DBfree_result(result); + } + + /* update trends disable_until for the leftover itemids */ + while (0 != *itemids_num) + { + itemid = itemids[--*itemids_num]; + + for (i = 0; i < trends_num; i++) + { + trend = &trends[i]; + + if (itemid != trend->itemid) + continue; + + if (clock != trend->clock || value_type != trend->value_type) + continue; + + trend->disable_from = clock; + break; + } + } +} + +/****************************************************************************** + * * + * Purpose: helper function for DCflush trends * + * * + ******************************************************************************/ +static void dc_trends_update_float(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset) +{ + history_value_t value_min, value_avg, value_max; + + value_min.dbl = atof(row[2]); + value_avg.dbl = atof(row[3]); + value_max.dbl = atof(row[4]); + + if (value_min.dbl < trend->value_min.dbl) + trend->value_min.dbl = value_min.dbl; + + if (value_max.dbl > trend->value_max.dbl) + trend->value_max.dbl = value_max.dbl; + + trend->value_avg.dbl = trend->value_avg.dbl / (trend->num + num) * trend->num + + value_avg.dbl / (trend->num + num) * num; + trend->num += num; + + zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "update trends set" + " num=%d,value_min=" ZBX_FS_DBL64_SQL ",value_avg=" ZBX_FS_DBL64_SQL + ",value_max=" ZBX_FS_DBL64_SQL + " where itemid=" ZBX_FS_UI64 " and clock=%d;\n", + trend->num, trend->value_min.dbl, trend->value_avg.dbl, trend->value_max.dbl, + trend->itemid, trend->clock); +} + +/****************************************************************************** + * * + * Purpose: helper function for DCflush trends * + * * + ******************************************************************************/ +static void dc_trends_update_uint(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset) +{ + history_value_t value_min, value_avg, value_max; + zbx_uint128_t avg; + + ZBX_STR2UINT64(value_min.ui64, row[2]); + ZBX_STR2UINT64(value_avg.ui64, row[3]); + ZBX_STR2UINT64(value_max.ui64, row[4]); + + if (value_min.ui64 < trend->value_min.ui64) + trend->value_min.ui64 = value_min.ui64; + if (value_max.ui64 > trend->value_max.ui64) + trend->value_max.ui64 = value_max.ui64; + + /* calculate the trend average value */ + zbx_umul64_64(&avg, num, value_avg.ui64); + zbx_uinc128_128(&trend->value_avg.ui64, &avg); + zbx_udiv128_64(&avg, &trend->value_avg.ui64, trend->num + num); + + trend->num += num; + + zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, + "update trends_uint set num=%d,value_min=" ZBX_FS_UI64 ",value_avg=" + ZBX_FS_UI64 ",value_max=" ZBX_FS_UI64 " where itemid=" ZBX_FS_UI64 + " and clock=%d;\n", + trend->num, + trend->value_min.ui64, + avg.lo, + trend->value_max.ui64, + trend->itemid, + trend->clock); +} + +/****************************************************************************** + * * + * Purpose: helper function for DCflush trends * + * * + ******************************************************************************/ +static void dc_trends_fetch_and_update(ZBX_DC_TREND *trends, int trends_num, zbx_uint64_t *itemids, + int itemids_num, int *inserts_num, unsigned char value_type, + const char *table_name, int clock) +{ + + int i, num; + DB_RESULT result; + DB_ROW row; + zbx_uint64_t itemid; + ZBX_DC_TREND *trend; + size_t sql_offset; + + sql_offset = 0; + zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, + "select itemid,num,value_min,value_avg,value_max" + " from %s" + " where clock=%d and", + table_name, clock); + + DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, itemids_num); + + result = DBselect("%s order by itemid,clock", sql); + + sql_offset = 0; + zbx_DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset); + + while (NULL != (row = DBfetch(result))) + { + ZBX_STR2UINT64(itemid, row[0]); + + for (i = 0; i < trends_num; i++) + { + trend = &trends[i]; + + if (itemid != trend->itemid) + continue; + + if (clock != trend->clock || value_type != trend->value_type) + continue; + + break; + } + + if (i == trends_num) + { + THIS_SHOULD_NEVER_HAPPEN; + continue; + } + + num = atoi(row[1]); + + if (value_type == ITEM_VALUE_TYPE_FLOAT) + dc_trends_update_float(trend, row, num, &sql_offset); + else + dc_trends_update_uint(trend, row, num, &sql_offset); + + trend->itemid = 0; + + --*inserts_num; + + DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset); + } + + DBfree_result(result); + + zbx_DBend_multiple_update(&sql, &sql_alloc, &sql_offset); + + if (sql_offset > 16) /* In ORACLE always present begin..end; */ + DBexecute("%s", sql); +} + +/****************************************************************************** + * * + * Purpose: flush trend to the database * + * * + ******************************************************************************/ +static void DBflush_trends(ZBX_DC_TREND *trends, int *trends_num, zbx_vector_uint64_pair_t *trends_diff) +{ + int num, i, clock, inserts_num = 0, itemids_alloc, itemids_num = 0, trends_to = *trends_num; + unsigned char value_type; + zbx_uint64_t *itemids = NULL; + ZBX_DC_TREND *trend = NULL; + const char *table_name; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __func__, *trends_num); + + clock = trends[0].clock; + value_type = trends[0].value_type; + + switch (value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + table_name = "trends"; + break; + case ITEM_VALUE_TYPE_UINT64: + table_name = "trends_uint"; + break; + default: + assert(0); + } + + itemids_alloc = MIN(ZBX_HC_SYNC_MAX, *trends_num); + itemids = (zbx_uint64_t *)zbx_malloc(itemids, itemids_alloc * sizeof(zbx_uint64_t)); + + for (i = 0; i < *trends_num; i++) + { + trend = &trends[i]; + + if (clock != trend->clock || value_type != trend->value_type) + continue; + + inserts_num++; + + if (0 != trend->disable_from) + continue; + + uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64); + + if (ZBX_HC_SYNC_MAX == itemids_num) + { + trends_to = i + 1; + break; + } + } + + if (0 != itemids_num) + { + dc_remove_updated_trends(trends, trends_to, table_name, value_type, itemids, + &itemids_num, clock); + } + + for (i = 0; i < trends_to; i++) + { + trend = &trends[i]; + + if (clock != trend->clock || value_type != trend->value_type) + continue; + + if (0 != trend->disable_from && clock >= trend->disable_from) + continue; + + uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64); + } + + if (0 != itemids_num) + { + dc_trends_fetch_and_update(trends, trends_to, itemids, itemids_num, + &inserts_num, value_type, table_name, clock); + } + + zbx_free(itemids); + + /* if 'trends' is not a primary trends buffer */ + if (NULL != trends_diff) + { + /* we update it too */ + for (i = 0; i < trends_to; i++) + { + zbx_uint64_pair_t pair; + + if (0 == trends[i].itemid) + continue; + + if (clock != trends[i].clock || value_type != trends[i].value_type) + continue; + + if (0 == trends[i].disable_from || trends[i].disable_from > clock) + continue; + + pair.first = trends[i].itemid; + pair.second = clock + SEC_PER_HOUR; + zbx_vector_uint64_pair_append(trends_diff, pair); + } + } + + if (0 != inserts_num) + dc_insert_trends_in_db(trends, trends_to, value_type, table_name, clock); + + /* clean trends */ + for (i = 0, num = 0; i < *trends_num; i++) + { + if (0 == trends[i].itemid) + continue; + + memcpy(&trends[num++], &trends[i], sizeof(ZBX_DC_TREND)); + } + *trends_num = num; + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +/****************************************************************************** + * * + * Purpose: move trend to the array of trends for flushing to DB * + * * + ******************************************************************************/ +static void DCflush_trend(ZBX_DC_TREND *trend, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num) +{ + if (*trends_num == *trends_alloc) + { + *trends_alloc += 256; + *trends = (ZBX_DC_TREND *)zbx_realloc(*trends, *trends_alloc * sizeof(ZBX_DC_TREND)); + } + + memcpy(&(*trends)[*trends_num], trend, sizeof(ZBX_DC_TREND)); + (*trends_num)++; + + trend->clock = 0; + trend->num = 0; + memset(&trend->value_min, 0, sizeof(history_value_t)); + memset(&trend->value_avg, 0, sizeof(value_avg_t)); + memset(&trend->value_max, 0, sizeof(history_value_t)); +} + +/****************************************************************************** + * * + * Purpose: add new value to the trends * + * * + ******************************************************************************/ +static void DCadd_trend(const ZBX_DC_HISTORY *history, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num) +{ + ZBX_DC_TREND *trend = NULL; + int hour; + + hour = history->ts.sec - history->ts.sec % SEC_PER_HOUR; + + trend = DCget_trend(history->itemid); + + if (trend->num > 0 && (trend->clock != hour || trend->value_type != history->value_type) && + SUCCEED == zbx_history_requires_trends(trend->value_type)) + { + DCflush_trend(trend, trends, trends_alloc, trends_num); + } + + trend->value_type = history->value_type; + trend->clock = hour; + + switch (trend->value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + if (trend->num == 0 || history->value.dbl < trend->value_min.dbl) + trend->value_min.dbl = history->value.dbl; + if (trend->num == 0 || history->value.dbl > trend->value_max.dbl) + trend->value_max.dbl = history->value.dbl; + trend->value_avg.dbl += history->value.dbl / (trend->num + 1) - + trend->value_avg.dbl / (trend->num + 1); + break; + case ITEM_VALUE_TYPE_UINT64: + if (trend->num == 0 || history->value.ui64 < trend->value_min.ui64) + trend->value_min.ui64 = history->value.ui64; + if (trend->num == 0 || history->value.ui64 > trend->value_max.ui64) + trend->value_max.ui64 = history->value.ui64; + zbx_uinc128_64(&trend->value_avg.ui64, history->value.ui64); + break; + } + trend->num++; +} + +/****************************************************************************** + * * + * Purpose: update trends cache and get list of trends to flush into database * + * * + * Parameters: history - [IN] array of history data * + * history_num - [IN] number of history structures * + * trends - [OUT] list of trends to flush into database * + * trends_num - [OUT] number of trends * + * compression_age - [IN] history compression age * + * * + ******************************************************************************/ +static void DCmass_update_trends(const ZBX_DC_HISTORY *history, int history_num, ZBX_DC_TREND **trends, + int *trends_num, int compression_age) +{ + static int last_trend_discard = 0; + zbx_timespec_t ts; + int trends_alloc = 0, i, hour, seconds; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + zbx_timespec(&ts); + seconds = ts.sec % SEC_PER_HOUR; + hour = ts.sec - seconds; + + LOCK_TRENDS; + + for (i = 0; i < history_num; i++) + { + const ZBX_DC_HISTORY *h = &history[i]; + + if (0 != (ZBX_DC_FLAGS_NOT_FOR_TRENDS & h->flags)) + continue; + + DCadd_trend(h, trends, &trends_alloc, trends_num); + } + + if (cache->trends_last_cleanup_hour < hour && ZBX_TRENDS_CLEANUP_TIME < seconds) + { + zbx_hashset_iter_t iter; + ZBX_DC_TREND *trend; + + zbx_hashset_iter_reset(&cache->trends, &iter); + + while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter))) + { + if (trend->clock == hour) + continue; + + /* discard trend items that are older than compression age */ + if (0 != compression_age && trend->clock < compression_age) + { + if (SEC_PER_HOUR < (ts.sec - last_trend_discard)) /* log once per hour */ + { + zabbix_log(LOG_LEVEL_TRACE, "discarding trends that are pointing to" + " compressed history period"); + last_trend_discard = ts.sec; + } + } + else if (SUCCEED == zbx_history_requires_trends(trend->value_type)) + DCflush_trend(trend, trends, &trends_alloc, trends_num); + + zbx_hashset_iter_remove(&iter); + } + + cache->trends_last_cleanup_hour = hour; + } + + UNLOCK_TRENDS; + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +static int zbx_trend_compare(const void *d1, const void *d2) +{ + const ZBX_DC_TREND *p1 = (const ZBX_DC_TREND *)d1; + const ZBX_DC_TREND *p2 = (const ZBX_DC_TREND *)d2; + + ZBX_RETURN_IF_NOT_EQUAL(p1->itemid, p2->itemid); + ZBX_RETURN_IF_NOT_EQUAL(p1->clock, p2->clock); + + return 0; +} + +/****************************************************************************** + * * + * Purpose: prepare history data using items from configuration cache * + * * + * Parameters: trends - [IN] trends from cache to be added to database * + * trends_num - [IN] number of trends to add to database * + * trends_diff - [OUT] disable_from updates * + * * + ******************************************************************************/ +static void DBmass_update_trends(const ZBX_DC_TREND *trends, int trends_num, + zbx_vector_uint64_pair_t *trends_diff) +{ + ZBX_DC_TREND *trends_tmp; + + if (0 != trends_num) + { + trends_tmp = (ZBX_DC_TREND *)zbx_malloc(NULL, trends_num * sizeof(ZBX_DC_TREND)); + memcpy(trends_tmp, trends, trends_num * sizeof(ZBX_DC_TREND)); + qsort(trends_tmp, trends_num, sizeof(ZBX_DC_TREND), zbx_trend_compare); + + while (0 < trends_num) + DBflush_trends(trends_tmp, &trends_num, trends_diff); + + zbx_free(trends_tmp); + } +} + +typedef struct +{ + zbx_uint64_t hostid; + zbx_vector_ptr_t groups; +} +zbx_host_info_t; + +/****************************************************************************** + * * + * Purpose: frees resources allocated to store host groups names * + * * + * Parameters: host_info - [IN] host information * + * * + ******************************************************************************/ +static void zbx_host_info_clean(zbx_host_info_t *host_info) +{ + zbx_vector_ptr_clear_ext(&host_info->groups, zbx_ptr_free); + zbx_vector_ptr_destroy(&host_info->groups); +} + +/****************************************************************************** + * * + * Purpose: get hosts groups names * + * * + * Parameters: hosts_info - [IN/OUT] output names of host groups for a host * + * hostids - [IN] hosts identifiers * + * * + ******************************************************************************/ +static void db_get_hosts_info_by_hostid(zbx_hashset_t *hosts_info, const zbx_vector_uint64_t *hostids) +{ + int i; + size_t sql_offset = 0; + DB_RESULT result; + DB_ROW row; + + for (i = 0; i < hostids->values_num; i++) + { + zbx_host_info_t host_info = {.hostid = hostids->values[i]}; + + zbx_vector_ptr_create(&host_info.groups); + zbx_hashset_insert(hosts_info, &host_info, sizeof(host_info)); + } + + zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, + "select distinct hg.hostid,g.name" + " from hstgrp g,hosts_groups hg" + " where g.groupid=hg.groupid" + " and"); + + DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "hg.hostid", hostids->values, hostids->values_num); + + result = DBselect("%s", sql); + + while (NULL != (row = DBfetch(result))) + { + zbx_uint64_t hostid; + zbx_host_info_t *host_info; + + ZBX_DBROW2UINT64(hostid, row[0]); + + if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &hostid))) + { + THIS_SHOULD_NEVER_HAPPEN; + continue; + } + + zbx_vector_ptr_append(&host_info->groups, zbx_strdup(NULL, row[1])); + } + DBfree_result(result); +} + +typedef struct +{ + zbx_uint64_t itemid; + char *name; + DC_ITEM *item; + zbx_vector_item_tag_t item_tags; +} +zbx_item_info_t; + +/****************************************************************************** + * * + * Purpose: get items name and item tags * + * * + * Parameters: items_info - [IN/OUT] output item name and item tags * + * itemids - [IN] the item identifiers * + * * + ******************************************************************************/ +static void db_get_items_info_by_itemid(zbx_hashset_t *items_info, const zbx_vector_uint64_t *itemids) +{ + size_t sql_offset = 0; + DB_RESULT result; + DB_ROW row; + + zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select itemid,name from items where"); + DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids->values, itemids->values_num); + + result = DBselect("%s", sql); + + while (NULL != (row = DBfetch(result))) + { + zbx_uint64_t itemid; + zbx_item_info_t *item_info; + + ZBX_DBROW2UINT64(itemid, row[0]); + + if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid))) + { + THIS_SHOULD_NEVER_HAPPEN; + continue; + } + + item_info->name = zbx_strdup(item_info->name, row[1]); + } + DBfree_result(result); + + sql_offset = 0; + zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, + "select itemid,tag,value from item_tag where"); + + DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids->values, itemids->values_num); + + result = DBselect("%s", sql); + + while (NULL != (row = DBfetch(result))) + { + zbx_uint64_t itemid; + zbx_item_info_t *item_info; + zbx_tag_t item_tag; + + ZBX_DBROW2UINT64(itemid, row[0]); + + if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid))) + { + THIS_SHOULD_NEVER_HAPPEN; + continue; + } + + item_tag.tag = zbx_strdup(NULL, row[1]); + item_tag.value = zbx_strdup(NULL, row[2]); + zbx_vector_item_tag_append(&item_info->item_tags, item_tag); + } + DBfree_result(result); +} + +/****************************************************************************** + * * + * Purpose: frees resources allocated to store item tag * + * * + * Parameters: item_tag - [IN] item tag * + * * + ******************************************************************************/ +static void item_tag_free(zbx_tag_t item_tag) +{ + zbx_free(item_tag.tag); + zbx_free(item_tag.value); +} + +/****************************************************************************** + * * + * Purpose: frees resources allocated to store item tags and name * + * * + * Parameters: item_info - [IN] item information * + * * + ******************************************************************************/ +static void zbx_item_info_clean(zbx_item_info_t *item_info) +{ + zbx_vector_item_tag_clear_ext(&item_info->item_tags, item_tag_free); + zbx_vector_item_tag_destroy(&item_info->item_tags); + zbx_free(item_info->name); +} + +/****************************************************************************** + * * + * Purpose: export trends * + * * + * Parameters: trends - [IN] trends from cache * + * trends_num - [IN] number of trends * + * hosts_info - [IN] hosts groups names * + * items_info - [IN] item names and tags * + * * + ******************************************************************************/ +static void DCexport_trends(const ZBX_DC_TREND *trends, int trends_num, zbx_hashset_t *hosts_info, + zbx_hashset_t *items_info) +{ + struct zbx_json json; + const ZBX_DC_TREND *trend = NULL; + int i, j; + const DC_ITEM *item; + zbx_host_info_t *host_info; + zbx_item_info_t *item_info; + zbx_uint128_t avg; /* calculate the trend average value */ + + zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN); + + for (i = 0; i < trends_num; i++) + { + trend = &trends[i]; + + if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &trend->itemid))) + continue; + + item = item_info->item; + + if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid))) + { + THIS_SHOULD_NEVER_HAPPEN; + continue; + } + + zbx_json_clean(&json); + + zbx_json_addobject(&json,ZBX_PROTO_TAG_HOST); + zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.host, ZBX_JSON_TYPE_STRING); + zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item->host.name, ZBX_JSON_TYPE_STRING); + zbx_json_close(&json); + + zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS); + + for (j = 0; j < host_info->groups.values_num; j++) + zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING); + + zbx_json_close(&json); + + zbx_json_addarray(&json, ZBX_PROTO_TAG_ITEM_TAGS); + + for (j = 0; j < item_info->item_tags.values_num; j++) + { + zbx_tag_t item_tag = item_info->item_tags.values[j]; + + zbx_json_addobject(&json, NULL); + zbx_json_addstring(&json, ZBX_PROTO_TAG_TAG, item_tag.tag, ZBX_JSON_TYPE_STRING); + zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, item_tag.value, ZBX_JSON_TYPE_STRING); + zbx_json_close(&json); + } + + zbx_json_close(&json); + zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid); + + if (NULL != item_info->name) + zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING); + + zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, trend->clock); + zbx_json_addint64(&json, ZBX_PROTO_TAG_COUNT, trend->num); + + switch (trend->value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + zbx_json_addfloat(&json, ZBX_PROTO_TAG_MIN, trend->value_min.dbl); + zbx_json_addfloat(&json, ZBX_PROTO_TAG_AVG, trend->value_avg.dbl); + zbx_json_addfloat(&json, ZBX_PROTO_TAG_MAX, trend->value_max.dbl); + break; + case ITEM_VALUE_TYPE_UINT64: + zbx_json_adduint64(&json, ZBX_PROTO_TAG_MIN, trend->value_min.ui64); + zbx_udiv128_64(&avg, &trend->value_avg.ui64, trend->num); + zbx_json_adduint64(&json, ZBX_PROTO_TAG_AVG, avg.lo); + zbx_json_adduint64(&json, ZBX_PROTO_TAG_MAX, trend->value_max.ui64); + break; + default: + THIS_SHOULD_NEVER_HAPPEN; + } + + zbx_json_adduint64(&json, ZBX_PROTO_TAG_TYPE, trend->value_type); + zbx_trends_export_write(json.buffer, json.buffer_size); + } + + zbx_trends_export_flush(); + zbx_json_free(&json); +} + +/****************************************************************************** + * * + * Purpose: export history * + * * + * Parameters: history - [IN/OUT] array of history data * + * history_num - [IN] number of history structures * + * hosts_info - [IN] hosts groups names * + * items_info - [IN] item names and tags * + * * + ******************************************************************************/ +static void DCexport_history(const ZBX_DC_HISTORY *history, int history_num, zbx_hashset_t *hosts_info, + zbx_hashset_t *items_info) +{ + const ZBX_DC_HISTORY *h; + const DC_ITEM *item; + int i, j; + zbx_host_info_t *host_info; + zbx_item_info_t *item_info; + struct zbx_json json; + + zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN); + + for (i = 0; i < history_num; i++) + { + h = &history[i]; + + if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & h->flags)) + continue; + + if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &h->itemid))) + { + THIS_SHOULD_NEVER_HAPPEN; + continue; + } + + item = item_info->item; + + if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid))) + { + THIS_SHOULD_NEVER_HAPPEN; + continue; + } + + zbx_json_clean(&json); + + zbx_json_addobject(&json,ZBX_PROTO_TAG_HOST); + zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.host, ZBX_JSON_TYPE_STRING); + zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item->host.name, ZBX_JSON_TYPE_STRING); + zbx_json_close(&json); + + zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS); + + for (j = 0; j < host_info->groups.values_num; j++) + zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING); + + zbx_json_close(&json); + + zbx_json_addarray(&json, ZBX_PROTO_TAG_ITEM_TAGS); + + for (j = 0; j < item_info->item_tags.values_num; j++) + { + zbx_tag_t item_tag = item_info->item_tags.values[j]; + + zbx_json_addobject(&json, NULL); + zbx_json_addstring(&json, ZBX_PROTO_TAG_TAG, item_tag.tag, ZBX_JSON_TYPE_STRING); + zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, item_tag.value, ZBX_JSON_TYPE_STRING); + zbx_json_close(&json); + } + + zbx_json_close(&json); + zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid); + + if (NULL != item_info->name) + zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING); + + zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, h->ts.sec); + zbx_json_addint64(&json, ZBX_PROTO_TAG_NS, h->ts.ns); + + switch (h->value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + zbx_json_addfloat(&json, ZBX_PROTO_TAG_VALUE, h->value.dbl); + break; + case ITEM_VALUE_TYPE_UINT64: + zbx_json_adduint64(&json, ZBX_PROTO_TAG_VALUE, h->value.ui64); + break; + case ITEM_VALUE_TYPE_STR: + zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING); + break; + case ITEM_VALUE_TYPE_TEXT: + zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING); + break; + case ITEM_VALUE_TYPE_LOG: + zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGTIMESTAMP, h->value.log->timestamp); + zbx_json_addstring(&json, ZBX_PROTO_TAG_LOGSOURCE, + ZBX_NULL2EMPTY_STR(h->value.log->source), ZBX_JSON_TYPE_STRING); + zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGSEVERITY, h->value.log->severity); + zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGEVENTID, h->value.log->logeventid); + zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.log->value, + ZBX_JSON_TYPE_STRING); + break; + default: + THIS_SHOULD_NEVER_HAPPEN; + } + + zbx_json_adduint64(&json, ZBX_PROTO_TAG_TYPE, h->value_type); + zbx_history_export_write(json.buffer, json.buffer_size); + } + + zbx_history_export_flush(); + zbx_json_free(&json); +} + +/****************************************************************************** + * * + * Purpose: export history and trends * + * * + * Parameters: history - [IN/OUT] array of history data * + * history_num - [IN] number of history structures * + * itemids - [IN] the item identifiers * + * (used for item lookup) * + * items - [IN] the items * + * errcodes - [IN] item error codes * + * trends - [IN] trends from cache * + * trends_num - [IN] number of trends * + * * + ******************************************************************************/ +static void DCexport_history_and_trends(const ZBX_DC_HISTORY *history, int history_num, + const zbx_vector_uint64_t *itemids, DC_ITEM *items, const int *errcodes, const ZBX_DC_TREND *trends, + int trends_num) +{ + int i, index; + zbx_vector_uint64_t hostids, item_info_ids; + zbx_hashset_t hosts_info, items_info; + DC_ITEM *item; + zbx_item_info_t item_info; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d trends_num:%d", __func__, history_num, trends_num); + + zbx_vector_uint64_create(&hostids); + zbx_vector_uint64_create(&item_info_ids); + zbx_hashset_create_ext(&items_info, itemids->values_num, ZBX_DEFAULT_UINT64_HASH_FUNC, + ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_item_info_clean, + ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC); + + for (i = 0; i < history_num; i++) + { + const ZBX_DC_HISTORY *h = &history[i]; + + if (0 != (ZBX_DC_FLAGS_NOT_FOR_EXPORT & h->flags)) + continue; + + if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, h->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC))) + { + THIS_SHOULD_NEVER_HAPPEN; + continue; + } + + if (SUCCEED != errcodes[index]) + continue; + + item = &items[index]; + + zbx_vector_uint64_append(&hostids, item->host.hostid); + zbx_vector_uint64_append(&item_info_ids, item->itemid); + + item_info.itemid = item->itemid; + item_info.name = NULL; + item_info.item = item; + zbx_vector_item_tag_create(&item_info.item_tags); + zbx_hashset_insert(&items_info, &item_info, sizeof(item_info)); + } + + if (0 == history_num) + { + for (i = 0; i < trends_num; i++) + { + const ZBX_DC_TREND *trend = &trends[i]; + + if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, trend->itemid, + ZBX_DEFAULT_UINT64_COMPARE_FUNC))) + { + THIS_SHOULD_NEVER_HAPPEN; + continue; + } + + if (SUCCEED != errcodes[index]) + continue; + + item = &items[index]; + + zbx_vector_uint64_append(&hostids, item->host.hostid); + zbx_vector_uint64_append(&item_info_ids, item->itemid); + + item_info.itemid = item->itemid; + item_info.name = NULL; + item_info.item = item; + zbx_vector_item_tag_create(&item_info.item_tags); + zbx_hashset_insert(&items_info, &item_info, sizeof(item_info)); + } + } + + if (0 == item_info_ids.values_num) + goto clean; + + zbx_vector_uint64_sort(&item_info_ids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); + zbx_vector_uint64_sort(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); + zbx_vector_uint64_uniq(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); + + zbx_hashset_create_ext(&hosts_info, hostids.values_num, ZBX_DEFAULT_UINT64_HASH_FUNC, + ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_host_info_clean, + ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC); + + db_get_hosts_info_by_hostid(&hosts_info, &hostids); + + db_get_items_info_by_itemid(&items_info, &item_info_ids); + + if (0 != history_num) + DCexport_history(history, history_num, &hosts_info, &items_info); + + if (0 != trends_num) + DCexport_trends(trends, trends_num, &hosts_info, &items_info); + + zbx_hashset_destroy(&hosts_info); +clean: + zbx_hashset_destroy(&items_info); + zbx_vector_uint64_destroy(&item_info_ids); + zbx_vector_uint64_destroy(&hostids); + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +/****************************************************************************** + * * + * Purpose: export all trends * + * * + * Parameters: trends - [IN] trends from cache * + * trends_num - [IN] number of trends * + * * + ******************************************************************************/ +static void DCexport_all_trends(const ZBX_DC_TREND *trends, int trends_num) +{ + DC_ITEM *items; + zbx_vector_uint64_t itemids; + int *errcodes, i, num; + + zabbix_log(LOG_LEVEL_WARNING, "exporting trend data..."); + + while (0 < trends_num) + { + num = MIN(ZBX_HC_SYNC_MAX, trends_num); + + items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * (size_t)num); + errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)num); + + zbx_vector_uint64_create(&itemids); + zbx_vector_uint64_reserve(&itemids, num); + + for (i = 0; i < num; i++) + zbx_vector_uint64_append(&itemids, trends[i].itemid); + + zbx_vector_uint64_sort(&itemids, ZBX_DEFAULT_UINT64_COMPARE_FUNC); + + DCconfig_get_items_by_itemids(items, itemids.values, errcodes, num); + + DCexport_history_and_trends(NULL, 0, &itemids, items, errcodes, trends, num); + + DCconfig_clean_items(items, errcodes, num); + zbx_vector_uint64_destroy(&itemids); + zbx_free(items); + zbx_free(errcodes); + + trends += num; + trends_num -= num; + } + + zabbix_log(LOG_LEVEL_WARNING, "exporting trend data done"); +} + +/****************************************************************************** + * * + * Purpose: flush all trends to the database * + * * + ******************************************************************************/ +static void DCsync_trends(void) +{ + zbx_hashset_iter_t iter; + ZBX_DC_TREND *trends = NULL, *trend; + int trends_alloc = 0, trends_num = 0, compression_age; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __func__, cache->trends_num); + + compression_age = hc_get_history_compression_age(); + + zabbix_log(LOG_LEVEL_WARNING, "syncing trend data..."); + + LOCK_TRENDS; + + zbx_hashset_iter_reset(&cache->trends, &iter); + + while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter))) + { + if (SUCCEED == zbx_history_requires_trends(trend->value_type) && trend->clock >= compression_age) + DCflush_trend(trend, &trends, &trends_alloc, &trends_num); + } + + UNLOCK_TRENDS; + + if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS) && 0 != trends_num) + DCexport_all_trends(trends, trends_num); + + if (0 < trends_num) + qsort(trends, trends_num, sizeof(ZBX_DC_TREND), zbx_trend_compare); + + DBbegin(); + + while (trends_num > 0) + DBflush_trends(trends, &trends_num, NULL); + + DBcommit(); + + zbx_free(trends); + + zabbix_log(LOG_LEVEL_WARNING, "syncing trend data done"); + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +#define ZBX_FLAGS_TRIGGER_CREATE_NOTHING 0x00 +#define ZBX_FLAGS_TRIGGER_CREATE_TRIGGER_EVENT 0x01 +#define ZBX_FLAGS_TRIGGER_CREATE_INTERNAL_EVENT 0x02 +#define ZBX_FLAGS_TRIGGER_CREATE_EVENT \ + (ZBX_FLAGS_TRIGGER_CREATE_TRIGGER_EVENT | ZBX_FLAGS_TRIGGER_CREATE_INTERNAL_EVENT) + +/****************************************************************************** + * * + * Purpose: 1) calculate changeset of trigger fields to be updated * + * 2) generate events * + * * + * Parameters: trigger - [IN] the trigger to process * + * diffs - [OUT] the vector with trigger changes * + * * + * Return value: SUCCEED - trigger processed successfully * + * FAIL - no changes * + * * + * Comments: Trigger dependency checks will be done during event processing. * + * * + * Event generation depending on trigger value/state changes: * + * * + * From \ To | OK | OK(?) | PROBLEM | PROBLEM(?) | NONE * + *----------------------------------------------------------------------------* + * OK | . | I | E | I | . * + * | | | | | * + * OK(?) | I | . | E,I | - | I * + * | | | | | * + * PROBLEM | E | I | E(m) | I | . * + * | | | | | * + * PROBLEM(?) | E,I | - | E(m),I | . | I * + * * + * Legend: * + * 'E' - trigger event * + * 'I' - internal event * + * '.' - nothing * + * '-' - should never happen * + * * + ******************************************************************************/ +static int zbx_process_trigger(struct _DC_TRIGGER *trigger, zbx_vector_ptr_t *diffs) +{ + const char *new_error; + int new_state, new_value, ret = FAIL; + zbx_uint64_t flags = ZBX_FLAGS_TRIGGER_DIFF_UNSET, event_flags = ZBX_FLAGS_TRIGGER_CREATE_NOTHING; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s() triggerid:" ZBX_FS_UI64 " value:%d(%d) new_value:%d", + __func__, trigger->triggerid, trigger->value, trigger->state, trigger->new_value); + + if (TRIGGER_VALUE_UNKNOWN == trigger->new_value) + { + new_state = TRIGGER_STATE_UNKNOWN; + new_value = trigger->value; + } + else + { + new_state = TRIGGER_STATE_NORMAL; + new_value = trigger->new_value; + } + new_error = (NULL == trigger->new_error ? "" : trigger->new_error); + + if (trigger->state != new_state) + { + flags |= ZBX_FLAGS_TRIGGER_DIFF_UPDATE_STATE; + event_flags |= ZBX_FLAGS_TRIGGER_CREATE_INTERNAL_EVENT; + } + + if (0 != strcmp(trigger->error, new_error)) + flags |= ZBX_FLAGS_TRIGGER_DIFF_UPDATE_ERROR; + + if (TRIGGER_STATE_NORMAL == new_state) + { + if (TRIGGER_VALUE_PROBLEM == new_value) + { + if (TRIGGER_VALUE_OK == trigger->value || TRIGGER_TYPE_MULTIPLE_TRUE == trigger->type) + event_flags |= ZBX_FLAGS_TRIGGER_CREATE_TRIGGER_EVENT; + } + else if (TRIGGER_VALUE_OK == new_value) + { + if (TRIGGER_VALUE_PROBLEM == trigger->value || 0 == trigger->lastchange) + event_flags |= ZBX_FLAGS_TRIGGER_CREATE_TRIGGER_EVENT; + } + } + + /* check if there is something to be updated */ + if (0 == (flags & ZBX_FLAGS_TRIGGER_DIFF_UPDATE) && 0 == (event_flags & ZBX_FLAGS_TRIGGER_CREATE_EVENT)) + goto out; + + if (0 != (event_flags & ZBX_FLAGS_TRIGGER_CREATE_TRIGGER_EVENT)) + { + zbx_add_event(EVENT_SOURCE_TRIGGERS, EVENT_OBJECT_TRIGGER, trigger->triggerid, + &trigger->timespec, new_value, trigger->description, + trigger->expression, trigger->recovery_expression, + trigger->priority, trigger->type, &trigger->tags, + trigger->correlation_mode, trigger->correlation_tag, trigger->value, trigger->opdata, + trigger->event_name, NULL); + } + + if (0 != (event_flags & ZBX_FLAGS_TRIGGER_CREATE_INTERNAL_EVENT)) + { + zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_TRIGGER, trigger->triggerid, + &trigger->timespec, new_state, NULL, trigger->expression, + trigger->recovery_expression, 0, 0, &trigger->tags, 0, NULL, 0, NULL, NULL, + new_error); + } + + zbx_append_trigger_diff(diffs, trigger->triggerid, trigger->priority, flags, trigger->value, new_state, + trigger->timespec.sec, new_error); + + ret = SUCCEED; +out: + zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s flags:" ZBX_FS_UI64, __func__, zbx_result_string(ret), + flags); + + return ret; +} + +/****************************************************************************** + * * + * Comments: helper function for zbx_process_triggers() * + * * + ******************************************************************************/ +static int zbx_trigger_topoindex_compare(const void *d1, const void *d2) +{ + const DC_TRIGGER *t1 = *(const DC_TRIGGER * const *)d1; + const DC_TRIGGER *t2 = *(const DC_TRIGGER * const *)d2; + + ZBX_RETURN_IF_NOT_EQUAL(t1->topoindex, t2->topoindex); + + return 0; +} + +/****************************************************************************** + * * + * Purpose: process triggers - calculates property changeset and generates * + * events * + * * + * Parameters: triggers - [IN] the triggers to process * + * trigger_diff - [OUT] the trigger changeset * + * * + * Comments: The trigger_diff changeset must be cleaned by the caller: * + * zbx_vector_ptr_clear_ext(trigger_diff, * + * (zbx_clean_func_t)zbx_trigger_diff_free); * + * * + ******************************************************************************/ +static void zbx_process_triggers(zbx_vector_ptr_t *triggers, zbx_vector_ptr_t *trigger_diff) +{ + int i; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s() values_num:%d", __func__, triggers->values_num); + + if (0 == triggers->values_num) + goto out; + + zbx_vector_ptr_sort(triggers, zbx_trigger_topoindex_compare); + + for (i = 0; i < triggers->values_num; i++) + zbx_process_trigger((struct _DC_TRIGGER *)triggers->values[i], trigger_diff); + + zbx_vector_ptr_sort(trigger_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); +out: + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +/****************************************************************************** + * * + * Purpose: re-calculate and update values of triggers related to the items * + * * + * Parameters: history - [IN] array of history data * + * history_num - [IN] number of history structures * + * history_itemids - [IN] the item identifiers * + * (used for item lookup) * + * history_items - [IN] the items * + * history_errcodes - [IN] item error codes * + * timers - [IN] the trigger timers * + * trigger_diff - [OUT] trigger updates * + * * + ******************************************************************************/ +static void recalculate_triggers(const ZBX_DC_HISTORY *history, int history_num, + const zbx_vector_uint64_t *history_itemids, const DC_ITEM *history_items, const int *history_errcodes, + const zbx_vector_ptr_t *timers, zbx_vector_ptr_t *trigger_diff, zbx_uint64_t *itemids, + zbx_timespec_t *timespecs, zbx_hashset_t *trigger_info, zbx_vector_ptr_t *trigger_order) +{ + int i, item_num = 0, timers_num = 0; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + if (0 != history_num) + { + for (i = 0; i < history_num; i++) + { + const ZBX_DC_HISTORY *h = &history[i]; + + if (0 != (ZBX_DC_FLAG_NOVALUE & h->flags)) + continue; + + itemids[item_num] = h->itemid; + timespecs[item_num] = h->ts; + item_num++; + } + } + + for (i = 0; i < timers->values_num; i++) + { + zbx_trigger_timer_t *timer = (zbx_trigger_timer_t *)timers->values[i]; + + if (0 != timer->lock) + timers_num++; + } + + if (0 == item_num && 0 == timers_num) + goto out; + + zbx_hashset_reserve(trigger_info, MAX(100, 2 * item_num + timers_num)); + + zbx_vector_ptr_reserve(trigger_order, trigger_info->num_slots); + + if (0 != item_num) + { + DCconfig_get_triggers_by_itemids(trigger_info, trigger_order, itemids, timespecs, item_num); + zbx_prepare_triggers((DC_TRIGGER **)trigger_order->values, trigger_order->values_num); + zbx_determine_items_in_expressions(trigger_order, itemids, item_num); + } + + if (0 != timers_num) + { + int offset = trigger_order->values_num; + + zbx_dc_get_triggers_by_timers(trigger_info, trigger_order, timers); + + if (offset != trigger_order->values_num) + { + zbx_prepare_triggers((DC_TRIGGER **)trigger_order->values + offset, + trigger_order->values_num - offset); + } + } + + zbx_vector_ptr_sort(trigger_order, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); + zbx_evaluate_expressions(trigger_order, history_itemids, history_items, history_errcodes); + zbx_process_triggers(trigger_order, trigger_diff); + + DCfree_triggers(trigger_order); + + zbx_hashset_clear(trigger_info); + zbx_vector_ptr_clear(trigger_order); +out: + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +static void DCinventory_value_add(zbx_vector_ptr_t *inventory_values, const DC_ITEM *item, ZBX_DC_HISTORY *h) +{ + char value[MAX_BUFFER_LEN]; + const char *inventory_field; + zbx_inventory_value_t *inventory_value; + + if (ITEM_STATE_NOTSUPPORTED == h->state) + return; + + if (HOST_INVENTORY_AUTOMATIC != item->host.inventory_mode) + return; + + if (0 != (ZBX_DC_FLAG_UNDEF & h->flags) || 0 != (ZBX_DC_FLAG_NOVALUE & h->flags) || + NULL == (inventory_field = DBget_inventory_field(item->inventory_link))) + { + return; + } + + switch (h->value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + zbx_print_double(value, sizeof(value), h->value.dbl); + break; + case ITEM_VALUE_TYPE_UINT64: + zbx_snprintf(value, sizeof(value), ZBX_FS_UI64, h->value.ui64); + break; + case ITEM_VALUE_TYPE_STR: + case ITEM_VALUE_TYPE_TEXT: + zbx_strscpy(value, h->value.str); + break; + default: + return; + } + + zbx_format_value(value, sizeof(value), item->valuemapid, ZBX_NULL2EMPTY_STR(item->units), h->value_type); + + inventory_value = (zbx_inventory_value_t *)zbx_malloc(NULL, sizeof(zbx_inventory_value_t)); + + inventory_value->hostid = item->host.hostid; + inventory_value->idx = item->inventory_link - 1; + inventory_value->field_name = inventory_field; + inventory_value->value = zbx_strdup(NULL, value); + + zbx_vector_ptr_append(inventory_values, inventory_value); +} + +static void DCadd_update_inventory_sql(size_t *sql_offset, const zbx_vector_ptr_t *inventory_values) +{ + char *value_esc; + int i; + + for (i = 0; i < inventory_values->values_num; i++) + { + const zbx_inventory_value_t *inventory_value = (zbx_inventory_value_t *)inventory_values->values[i]; + + value_esc = DBdyn_escape_field("host_inventory", inventory_value->field_name, inventory_value->value); + + zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, + "update host_inventory set %s='%s' where hostid=" ZBX_FS_UI64 ";\n", + inventory_value->field_name, value_esc, inventory_value->hostid); + + DBexecute_overflowed_sql(&sql, &sql_alloc, sql_offset); + + zbx_free(value_esc); + } +} + +static void DCinventory_value_free(zbx_inventory_value_t *inventory_value) +{ + zbx_free(inventory_value->value); + zbx_free(inventory_value); +} + +/****************************************************************************** + * * + * Purpose: frees resources allocated to store str/text/log value * + * * + * Parameters: history - [IN] the history data * + * history_num - [IN] the number of values in history data * + * * + ******************************************************************************/ +static void dc_history_clean_value(ZBX_DC_HISTORY *history) +{ + if (ITEM_STATE_NOTSUPPORTED == history->state) + { + zbx_free(history->value.err); + return; + } + + if (0 != (ZBX_DC_FLAG_NOVALUE & history->flags)) + return; + + switch (history->value_type) + { + case ITEM_VALUE_TYPE_LOG: + zbx_free(history->value.log->value); + zbx_free(history->value.log->source); + zbx_free(history->value.log); + break; + case ITEM_VALUE_TYPE_STR: + case ITEM_VALUE_TYPE_TEXT: + zbx_free(history->value.str); + break; + } +} + +/****************************************************************************** + * * + * Purpose: frees resources allocated to store str/text/log values * + * * + * Parameters: history - [IN] the history data * + * history_num - [IN] the number of values in history data * + * * + ******************************************************************************/ +static void hc_free_item_values(ZBX_DC_HISTORY *history, int history_num) +{ + int i; + + for (i = 0; i < history_num; i++) + dc_history_clean_value(&history[i]); +} + +/****************************************************************************** + * * + * Purpose: sets history data to notsupported * + * * + * Parameters: history - [IN] the history data * + * errmsg - [IN] the error message * + * * + * Comments: The error message is stored directly and freed with when history * + * data is cleaned. * + * * + ******************************************************************************/ +static void dc_history_set_error(ZBX_DC_HISTORY *hdata, char *errmsg) +{ + dc_history_clean_value(hdata); + hdata->value.err = errmsg; + hdata->state = ITEM_STATE_NOTSUPPORTED; + hdata->flags |= ZBX_DC_FLAG_UNDEF; +} + +/****************************************************************************** + * * + * Purpose: sets history data value * + * * + * Parameters: hdata - [IN/OUT] the history data * + * value_type - [IN] the item value type * + * value - [IN] the value to set * + * * + ******************************************************************************/ +static void dc_history_set_value(ZBX_DC_HISTORY *hdata, unsigned char value_type, zbx_variant_t *value) +{ + char *errmsg = NULL; + + if (FAIL == zbx_variant_to_value_type(value, value_type, CONFIG_DOUBLE_PRECISION, &errmsg)) + { + dc_history_set_error(hdata, errmsg); + return; + } + + switch (value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + dc_history_clean_value(hdata); + hdata->value.dbl = value->data.dbl; + break; + case ITEM_VALUE_TYPE_UINT64: + dc_history_clean_value(hdata); + hdata->value.ui64 = value->data.ui64; + break; + case ITEM_VALUE_TYPE_STR: + dc_history_clean_value(hdata); + hdata->value.str = value->data.str; + hdata->value.str[zbx_db_strlen_n(hdata->value.str, ZBX_HISTORY_STR_VALUE_LEN)] = '\0'; + break; + case ITEM_VALUE_TYPE_TEXT: + dc_history_clean_value(hdata); + hdata->value.str = value->data.str; + hdata->value.str[zbx_db_strlen_n(hdata->value.str, ZBX_HISTORY_TEXT_VALUE_LEN)] = '\0'; + break; + case ITEM_VALUE_TYPE_LOG: + if (ITEM_VALUE_TYPE_LOG != hdata->value_type) + { + dc_history_clean_value(hdata); + hdata->value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t)); + memset(hdata->value.log, 0, sizeof(zbx_log_value_t)); + } + hdata->value.log->value = value->data.str; + hdata->value.str[zbx_db_strlen_n(hdata->value.str, ZBX_HISTORY_LOG_VALUE_LEN)] = '\0'; + } + + hdata->value_type = value_type; + zbx_variant_set_none(value); +} + +/****************************************************************************** + * * + * Purpose: normalize item value by performing truncation of long text * + * values and changes value format according to the item value type * + * * + * Parameters: item - [IN] the item * + * hdata - [IN/OUT] the historical data to process * + * * + ******************************************************************************/ +static void normalize_item_value(const DC_ITEM *item, ZBX_DC_HISTORY *hdata) +{ + char *logvalue; + zbx_variant_t value_var; + + if (0 != (hdata->flags & ZBX_DC_FLAG_NOVALUE)) + return; + + if (ITEM_STATE_NOTSUPPORTED == hdata->state) + return; + + if (0 == (hdata->flags & ZBX_DC_FLAG_NOHISTORY)) + hdata->ttl = item->history_sec; + + if (item->value_type == hdata->value_type) + { + /* truncate text based values if necessary */ + switch (hdata->value_type) + { + case ITEM_VALUE_TYPE_STR: + hdata->value.str[zbx_db_strlen_n(hdata->value.str, ZBX_HISTORY_STR_VALUE_LEN)] = '\0'; + break; + case ITEM_VALUE_TYPE_TEXT: + hdata->value.str[zbx_db_strlen_n(hdata->value.str, ZBX_HISTORY_TEXT_VALUE_LEN)] = '\0'; + break; + case ITEM_VALUE_TYPE_LOG: + logvalue = hdata->value.log->value; + logvalue[zbx_db_strlen_n(logvalue, ZBX_HISTORY_LOG_VALUE_LEN)] = '\0'; + break; + case ITEM_VALUE_TYPE_FLOAT: + if (FAIL == zbx_validate_value_dbl(hdata->value.dbl, CONFIG_DOUBLE_PRECISION)) + { + char buffer[ZBX_MAX_DOUBLE_LEN + 1]; + + dc_history_set_error(hdata, zbx_dsprintf(NULL, + "Value %s is too small or too large.", + zbx_print_double(buffer, sizeof(buffer), hdata->value.dbl))); + } + break; + } + return; + } + + switch (hdata->value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + zbx_variant_set_dbl(&value_var, hdata->value.dbl); + break; + case ITEM_VALUE_TYPE_UINT64: + zbx_variant_set_ui64(&value_var, hdata->value.ui64); + break; + case ITEM_VALUE_TYPE_STR: + case ITEM_VALUE_TYPE_TEXT: + zbx_variant_set_str(&value_var, hdata->value.str); + hdata->value.str = NULL; + break; + case ITEM_VALUE_TYPE_LOG: + zbx_variant_set_str(&value_var, hdata->value.log->value); + hdata->value.log->value = NULL; + break; + default: + THIS_SHOULD_NEVER_HAPPEN; + return; + } + + dc_history_set_value(hdata, item->value_type, &value_var); + zbx_variant_clear(&value_var); +} + +/****************************************************************************** + * * + * Purpose: calculates what item fields must be updated * + * * + * Parameters: item - [IN] the item * + * h - [IN] the historical data to process * + * * + * Return value: The update data. This data must be freed by the caller. * + * * + * Comments: Will generate internal events when item state switches. * + * * + ******************************************************************************/ +static zbx_item_diff_t *calculate_item_update(DC_ITEM *item, const ZBX_DC_HISTORY *h) +{ + zbx_uint64_t flags = 0; + const char *item_error = NULL; + zbx_item_diff_t *diff; + + if (0 != (ZBX_DC_FLAG_META & h->flags)) + { + if (item->lastlogsize != h->lastlogsize) + flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE; + + if (item->mtime != h->mtime) + flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME; + } + + if (h->state != item->state) + { + flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE; + + if (ITEM_STATE_NOTSUPPORTED == h->state) + { + zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became not supported: %s", + item->host.host, item->key_orig, h->value.str); + + zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state, NULL, + NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL, NULL, h->value.err); + + if (0 != strcmp(ZBX_NULL2EMPTY_STR(item->error), h->value.err)) + item_error = h->value.err; + } + else + { + zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became supported", + item->host.host, item->key_orig); + + /* we know it's EVENT_OBJECT_ITEM because LLDRULE that becomes */ + /* supported is handled in lld_process_discovery_rule() */ + zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state, + NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL, NULL, NULL); + + item_error = ""; + } + } + else if (ITEM_STATE_NOTSUPPORTED == h->state && 0 != strcmp(ZBX_NULL2EMPTY_STR(item->error), h->value.err)) + { + zabbix_log(LOG_LEVEL_WARNING, "error reason for \"%s:%s\" changed: %s", item->host.host, + item->key_orig, h->value.err); + + item_error = h->value.err; + } + + if (NULL != item_error) + flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR; + + if (0 == flags) + return NULL; + + diff = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t)); + diff->itemid = item->itemid; + diff->flags = flags; + + if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE & flags)) + diff->lastlogsize = h->lastlogsize; + + if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME & flags)) + diff->mtime = h->mtime; + + if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE & flags)) + { + diff->state = h->state; + item->state = h->state; + } + + if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR & flags)) + diff->error = item_error; + + return diff; +} + +/****************************************************************************** + * * + * Purpose: update item data and inventory in database * + * * + * Parameters: item_diff - item changes * + * inventory_values - inventory values * + * * + ******************************************************************************/ +static void DBmass_update_items(const zbx_vector_ptr_t *item_diff, const zbx_vector_ptr_t *inventory_values) +{ + size_t sql_offset = 0; + int i; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + for (i = 0; i < item_diff->values_num; i++) + { + zbx_item_diff_t *diff; + + diff = (zbx_item_diff_t *)item_diff->values[i]; + if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_DB & diff->flags)) + break; + } + + if (i != item_diff->values_num || 0 != inventory_values->values_num) + { + zbx_DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset); + + if (i != item_diff->values_num) + { + zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, item_diff, + ZBX_FLAGS_ITEM_DIFF_UPDATE_DB); + } + + if (0 != inventory_values->values_num) + DCadd_update_inventory_sql(&sql_offset, inventory_values); + + zbx_DBend_multiple_update(&sql, &sql_alloc, &sql_offset); + + if (sql_offset > 16) /* In ORACLE always present begin..end; */ + DBexecute("%s", sql); + + DCconfig_update_inventory_values(inventory_values); + } + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +/****************************************************************************** + * * + * Purpose: prepare itemdiff after receiving new values * + * * + * Parameters: history - array of history data * + * history_num - number of history structures * + * item_diff - vector to store prepared diff * + * * + ******************************************************************************/ +static void DCmass_proxy_prepare_itemdiff(ZBX_DC_HISTORY *history, int history_num, zbx_vector_ptr_t *item_diff) +{ + int i; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + zbx_vector_ptr_reserve(item_diff, history_num); + + for (i = 0; i < history_num; i++) + { + zbx_item_diff_t *diff = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t)); + + diff->itemid = history[i].itemid; + diff->state = history[i].state; + diff->flags = ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE; + + if (0 != (ZBX_DC_FLAG_META & history[i].flags)) + { + diff->lastlogsize = history[i].lastlogsize; + diff->mtime = history[i].mtime; + diff->flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE | ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME; + } + + zbx_vector_ptr_append(item_diff, diff); + } + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +/****************************************************************************** + * * + * Purpose: update items info after new value is received * + * * + * Parameters: item_diff - diff of items to be updated * + * * + ******************************************************************************/ +static void DBmass_proxy_update_items(zbx_vector_ptr_t *item_diff) +{ + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + if (0 != item_diff->values_num) + { + size_t sql_offset = 0; + + zbx_vector_ptr_sort(item_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); + + zbx_DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset); + + zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, item_diff, + ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE | ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME); + + zbx_DBend_multiple_update(&sql, &sql_alloc, &sql_offset); + + if (sql_offset > 16) /* In ORACLE always present begin..end; */ + DBexecute("%s", sql); + } + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +typedef struct +{ + char *table_name; + char *sql; + size_t sql_alloc, sql_offset; +} +zbx_history_dupl_select_t; + +static int history_value_compare_func(const void *d1, const void *d2) +{ + const ZBX_DC_HISTORY *i1 = *(const ZBX_DC_HISTORY **)d1; + const ZBX_DC_HISTORY *i2 = *(const ZBX_DC_HISTORY **)d2; + + ZBX_RETURN_IF_NOT_EQUAL(i1->itemid, i2->itemid); + ZBX_RETURN_IF_NOT_EQUAL(i1->value_type, i2->value_type); + ZBX_RETURN_IF_NOT_EQUAL(i1->ts.sec, i2->ts.sec); + ZBX_RETURN_IF_NOT_EQUAL(i1->ts.ns, i2->ts.ns); + + return 0; +} + +static void vc_flag_duplicates(zbx_vector_ptr_t *history_index, zbx_vector_ptr_t *duplicates) +{ + int i; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + for (i = 0; i < duplicates->values_num; i++) + { + int idx_cached; + + if (FAIL != (idx_cached = zbx_vector_ptr_bsearch(history_index, duplicates->values[i], + history_value_compare_func))) + { + ZBX_DC_HISTORY *cached_value = (ZBX_DC_HISTORY *)history_index->values[idx_cached]; + + dc_history_clean_value(cached_value); + cached_value->flags |= ZBX_DC_FLAGS_NOT_FOR_HISTORY; + } + } + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +static void db_fetch_duplicates(zbx_history_dupl_select_t *query, unsigned char value_type, + zbx_vector_ptr_t *duplicates) +{ + DB_RESULT result; + DB_ROW row; + + if (NULL == query->sql) + return; + + result = DBselect("%s", query->sql); + + while (NULL != (row = DBfetch(result))) + { + ZBX_DC_HISTORY *d = (ZBX_DC_HISTORY *)zbx_malloc(NULL, sizeof(ZBX_DC_HISTORY)); + + ZBX_STR2UINT64(d->itemid, row[0]); + d->ts.sec = atoi(row[1]); + d->ts.ns = atoi(row[2]); + + d->value_type = value_type; + + zbx_vector_ptr_append(duplicates, d); + } + DBfree_result(result); + + zbx_free(query->sql); +} + +static void remove_history_duplicates(zbx_vector_ptr_t *history) +{ + int i; + zbx_history_dupl_select_t select_flt = {.table_name = "history"}, + select_uint = {.table_name = "history_uint"}, + select_str = {.table_name = "history_str"}, + select_log = {.table_name = "history_log"}, + select_text = {.table_name = "history_text"}; + zbx_vector_ptr_t duplicates, history_index; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + zbx_vector_ptr_create(&duplicates); + zbx_vector_ptr_create(&history_index); + + zbx_vector_ptr_append_array(&history_index, history->values, history->values_num); + zbx_vector_ptr_sort(&history_index, history_value_compare_func); + + for (i = 0; i < history_index.values_num; i++) + { + ZBX_DC_HISTORY *h = history_index.values[i]; + zbx_history_dupl_select_t *select_ptr; + char *separator = " or"; + + if (h->value_type == ITEM_VALUE_TYPE_FLOAT) + select_ptr = &select_flt; + else if (h->value_type == ITEM_VALUE_TYPE_UINT64) + select_ptr = &select_uint; + else if (h->value_type == ITEM_VALUE_TYPE_STR) + select_ptr = &select_str; + else if (h->value_type == ITEM_VALUE_TYPE_LOG) + select_ptr = &select_log; + else if (h->value_type == ITEM_VALUE_TYPE_TEXT) + select_ptr = &select_text; + else + continue; + + if (NULL == select_ptr->sql) + { + zbx_snprintf_alloc(&select_ptr->sql, &select_ptr->sql_alloc, &select_ptr->sql_offset, + "select itemid,clock,ns" + " from %s" + " where", select_ptr->table_name); + separator = ""; + } + + zbx_snprintf_alloc(&select_ptr->sql, &select_ptr->sql_alloc, &select_ptr->sql_offset, + "%s (itemid=" ZBX_FS_UI64 " and clock=%d and ns=%d)", separator , h->itemid, + h->ts.sec, h->ts.ns); + } + + db_fetch_duplicates(&select_flt, ITEM_VALUE_TYPE_FLOAT, &duplicates); + db_fetch_duplicates(&select_uint, ITEM_VALUE_TYPE_UINT64, &duplicates); + db_fetch_duplicates(&select_str, ITEM_VALUE_TYPE_STR, &duplicates); + db_fetch_duplicates(&select_log, ITEM_VALUE_TYPE_LOG, &duplicates); + db_fetch_duplicates(&select_text, ITEM_VALUE_TYPE_TEXT, &duplicates); + + vc_flag_duplicates(&history_index, &duplicates); + + zbx_vector_ptr_clear_ext(&duplicates, (zbx_clean_func_t)zbx_ptr_free); + zbx_vector_ptr_destroy(&duplicates); + zbx_vector_ptr_destroy(&history_index); + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +static int add_history(ZBX_DC_HISTORY *history, int history_num, zbx_vector_ptr_t *history_values, int *ret_flush) +{ + int i, ret = SUCCEED; + + for (i = 0; i < history_num; i++) + { + ZBX_DC_HISTORY *h = &history[i]; + + if (0 != (ZBX_DC_FLAGS_NOT_FOR_HISTORY & h->flags)) + continue; + + zbx_vector_ptr_append(history_values, h); + } + + if (0 != history_values->values_num) + ret = zbx_vc_add_values(history_values, ret_flush); + + return ret; +} + +/****************************************************************************** + * * + * Purpose: inserting new history data after new value is received * + * * + * Parameters: history - array of history data * + * history_num - number of history structures * + * * + ******************************************************************************/ +static int DBmass_add_history(ZBX_DC_HISTORY *history, int history_num) +{ + int ret, ret_flush = FLUSH_SUCCEED, num; + zbx_vector_ptr_t history_values; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + zbx_vector_ptr_create(&history_values); + zbx_vector_ptr_reserve(&history_values, history_num); + + if (FAIL == (ret = add_history(history, history_num, &history_values, &ret_flush)) && + FLUSH_DUPL_REJECTED == ret_flush) + { + num = history_values.values_num; + remove_history_duplicates(&history_values); + zbx_vector_ptr_clear(&history_values); + + if (SUCCEED == (ret = add_history(history, history_num, &history_values, &ret_flush))) + zabbix_log(LOG_LEVEL_WARNING, "skipped %d duplicates", num - history_values.values_num); + } + + zbx_vector_ptr_destroy(&history_values); + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); + + return ret; +} + +/****************************************************************************** + * * + * Purpose: helper function for DCmass_proxy_add_history() * + * * + * Comment: this function is meant for items with value_type other than * + * ITEM_VALUE_TYPE_LOG not containing meta information in result * + * * + ******************************************************************************/ +static void dc_add_proxy_history(ZBX_DC_HISTORY *history, int history_num) +{ + int i, now, history_count = 0; + unsigned int flags; + char buffer[64], *pvalue; + zbx_db_insert_t db_insert; + + now = (int)time(NULL); + zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "flags", "write_clock", + NULL); + + for (i = 0; i < history_num; i++) + { + const ZBX_DC_HISTORY *h = &history[i]; + + if (0 != (h->flags & ZBX_DC_FLAG_UNDEF)) + continue; + + if (0 != (h->flags & ZBX_DC_FLAG_META)) + continue; + + if (ITEM_STATE_NOTSUPPORTED == h->state) + continue; + + if (0 == (h->flags & ZBX_DC_FLAG_NOVALUE)) + { + switch (h->value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl); + break; + case ITEM_VALUE_TYPE_UINT64: + zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64); + break; + case ITEM_VALUE_TYPE_STR: + case ITEM_VALUE_TYPE_TEXT: + pvalue = h->value.str; + break; + case ITEM_VALUE_TYPE_LOG: + continue; + default: + THIS_SHOULD_NEVER_HAPPEN; + continue; + } + flags = 0; + } + else + { + flags = PROXY_HISTORY_FLAG_NOVALUE; + pvalue = (char *)""; + } + + history_count++; + zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue, flags, now); + } + + change_proxy_history_count(history_count); + zbx_db_insert_execute(&db_insert); + zbx_db_insert_clean(&db_insert); +} + +/****************************************************************************** + * * + * Purpose: helper function for DCmass_proxy_add_history() * + * * + * Comment: this function is meant for items with value_type other than * + * ITEM_VALUE_TYPE_LOG containing meta information in result * + * * + ******************************************************************************/ +static void dc_add_proxy_history_meta(ZBX_DC_HISTORY *history, int history_num) +{ + int i, now, history_count = 0; + char buffer[64], *pvalue; + zbx_db_insert_t db_insert; + + now = (int)time(NULL); + zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "lastlogsize", "mtime", + "flags", "write_clock", NULL); + + for (i = 0; i < history_num; i++) + { + unsigned int flags = PROXY_HISTORY_FLAG_META; + const ZBX_DC_HISTORY *h = &history[i]; + + if (ITEM_STATE_NOTSUPPORTED == h->state) + continue; + + if (0 != (h->flags & ZBX_DC_FLAG_UNDEF)) + continue; + + if (0 == (h->flags & ZBX_DC_FLAG_META)) + continue; + + if (ITEM_VALUE_TYPE_LOG == h->value_type) + continue; + + if (0 == (h->flags & ZBX_DC_FLAG_NOVALUE)) + { + switch (h->value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl); + break; + case ITEM_VALUE_TYPE_UINT64: + zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64); + break; + case ITEM_VALUE_TYPE_STR: + case ITEM_VALUE_TYPE_TEXT: + pvalue = h->value.str; + break; + default: + THIS_SHOULD_NEVER_HAPPEN; + continue; + } + } + else + { + flags |= PROXY_HISTORY_FLAG_NOVALUE; + pvalue = (char *)""; + } + + history_count++; + zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue, h->lastlogsize, h->mtime, + flags, now); + } + + change_proxy_history_count(history_count); + zbx_db_insert_execute(&db_insert); + zbx_db_insert_clean(&db_insert); +} + +/****************************************************************************** + * * + * Purpose: helper function for DCmass_proxy_add_history() * + * * + * Comment: this function is meant for items with value_type * + * ITEM_VALUE_TYPE_LOG * + * * + ******************************************************************************/ +static void dc_add_proxy_history_log(ZBX_DC_HISTORY *history, int history_num) +{ + int i, now, history_count = 0; + zbx_db_insert_t db_insert; + + now = (int)time(NULL); + + /* see hc_copy_history_data() for fields that might be uninitialized and need special handling here */ + zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "timestamp", "source", "severity", + "value", "logeventid", "lastlogsize", "mtime", "flags", "write_clock", NULL); + + for (i = 0; i < history_num; i++) + { + unsigned int flags; + zbx_uint64_t lastlogsize; + int mtime; + const ZBX_DC_HISTORY *h = &history[i]; + + if (ITEM_STATE_NOTSUPPORTED == h->state) + continue; + + if (ITEM_VALUE_TYPE_LOG != h->value_type) + continue; + + if (0 == (h->flags & ZBX_DC_FLAG_NOVALUE)) + { + zbx_log_value_t *log = h->value.log; + + if (0 != (h->flags & ZBX_DC_FLAG_META)) + { + flags = PROXY_HISTORY_FLAG_META; + lastlogsize = h->lastlogsize; + mtime = h->mtime; + } + else + { + flags = 0; + lastlogsize = 0; + mtime = 0; + } + + zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, log->timestamp, + ZBX_NULL2EMPTY_STR(log->source), log->severity, log->value, log->logeventid, + lastlogsize, mtime, flags, now); + } + else + { + /* sent to server only if not 0, see proxy_get_history_data() */ + const int unset_if_novalue = 0; + + flags = PROXY_HISTORY_FLAG_META | PROXY_HISTORY_FLAG_NOVALUE; + + zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, unset_if_novalue, "", + unset_if_novalue, "", unset_if_novalue, h->lastlogsize, h->mtime, flags, now); + } + history_count++; + } + + change_proxy_history_count(history_count); + zbx_db_insert_execute(&db_insert); + zbx_db_insert_clean(&db_insert); +} + +/****************************************************************************** + * * + * Purpose: helper function for DCmass_proxy_add_history() * + * * + ******************************************************************************/ +static void dc_add_proxy_history_notsupported(ZBX_DC_HISTORY *history, int history_num) +{ + int i, now, history_count = 0; + zbx_db_insert_t db_insert; + + now = (int)time(NULL); + zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "state", "write_clock", + NULL); + + for (i = 0; i < history_num; i++) + { + const ZBX_DC_HISTORY *h = &history[i]; + + if (ITEM_STATE_NOTSUPPORTED != h->state) + continue; + + history_count++; + zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, ZBX_NULL2EMPTY_STR(h->value.err), + (int)h->state, now); + } + + change_proxy_history_count(history_count); + zbx_db_insert_execute(&db_insert); + zbx_db_insert_clean(&db_insert); +} + +/****************************************************************************** + * * + * Purpose: inserting new history data after new value is received * + * * + * Parameters: history - array of history data * + * history_num - number of history structures * + * * + ******************************************************************************/ +static void DBmass_proxy_add_history(ZBX_DC_HISTORY *history, int history_num) +{ + int i, h_num = 0, h_meta_num = 0, hlog_num = 0, notsupported_num = 0; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + for (i = 0; i < history_num; i++) + { + const ZBX_DC_HISTORY *h = &history[i]; + + if (ITEM_STATE_NOTSUPPORTED == h->state) + { + notsupported_num++; + continue; + } + + switch (h->value_type) + { + case ITEM_VALUE_TYPE_LOG: + hlog_num++; + break; + case ITEM_VALUE_TYPE_FLOAT: + case ITEM_VALUE_TYPE_UINT64: + case ITEM_VALUE_TYPE_STR: + case ITEM_VALUE_TYPE_TEXT: + if (0 != (h->flags & ZBX_DC_FLAG_META)) + h_meta_num++; + else + h_num++; + break; + case ITEM_VALUE_TYPE_NONE: + h_num++; + break; + default: + THIS_SHOULD_NEVER_HAPPEN; + } + } + + if (0 != h_num) + dc_add_proxy_history(history, history_num); + + if (0 != h_meta_num) + dc_add_proxy_history_meta(history, history_num); + + if (0 != hlog_num) + dc_add_proxy_history_log(history, history_num); + + if (0 != notsupported_num) + dc_add_proxy_history_notsupported(history, history_num); + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +/****************************************************************************** + * * + * Purpose: prepare history data using items from configuration cache and * + * generate item changes to be applied and host inventory values to * + * be added * + * * + * Parameters: history - [IN/OUT] array of history data * + * itemids - [IN] the item identifiers * + * (used for item lookup) * + * items - [IN] the items * + * errcodes - [IN] item error codes * + * history_num - [IN] number of history structures * + * item_diff - [OUT] the changes in item data * + * inventory_values - [OUT] the inventory values to add * + * compression_age - [IN] history compression age * + * proxy_subscribtions - [IN] history compression age * + * * + ******************************************************************************/ +static void DCmass_prepare_history(ZBX_DC_HISTORY *history, DC_ITEM *items, const int *errcodes, int history_num, + zbx_vector_ptr_t *item_diff, zbx_vector_ptr_t *inventory_values, int compression_age, + zbx_vector_uint64_pair_t *proxy_subscribtions) +{ + static time_t last_history_discard = 0; + time_t now; + int i; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, history_num); + + now = time(NULL); + + for (i = 0; i < history_num; i++) + { + ZBX_DC_HISTORY *h = &history[i]; + DC_ITEM *item; + zbx_item_diff_t *diff; + + /* discard history items that are older than compression age */ + if (0 != compression_age && h->ts.sec < compression_age) + { + if (SEC_PER_HOUR < (now - last_history_discard)) /* log once per hour */ + { + zabbix_log(LOG_LEVEL_TRACE, "discarding history that is pointing to" + " compressed history period"); + last_history_discard = now; + } + + h->flags |= ZBX_DC_FLAG_UNDEF; + continue; + } + + if (SUCCEED != errcodes[i]) + { + h->flags |= ZBX_DC_FLAG_UNDEF; + continue; + } + + item = &items[i]; + + if (ITEM_STATUS_ACTIVE != item->status || HOST_STATUS_MONITORED != item->host.status) + { + h->flags |= ZBX_DC_FLAG_UNDEF; + continue; + } + + if (0 == item->history) + { + h->flags |= ZBX_DC_FLAG_NOHISTORY; + } + else if (now - h->ts.sec > item->history_sec) + { + h->flags |= ZBX_DC_FLAG_NOHISTORY; + zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside history " + "storage period", item->host.host, item->key_orig, + zbx_date2str(h->ts.sec, NULL), zbx_time2str(h->ts.sec, NULL)); + } + + if (ITEM_VALUE_TYPE_FLOAT == item->value_type || ITEM_VALUE_TYPE_UINT64 == item->value_type) + { + if (0 == item->trends) + { + h->flags |= ZBX_DC_FLAG_NOTRENDS; + } + else if (now - h->ts.sec > item->trends_sec) + { + h->flags |= ZBX_DC_FLAG_NOTRENDS; + zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside " + "trends storage period", item->host.host, item->key_orig, + zbx_date2str(h->ts.sec, NULL), zbx_time2str(h->ts.sec, NULL)); + } + } + else + h->flags |= ZBX_DC_FLAG_NOTRENDS; + + normalize_item_value(item, h); + + /* calculate item update and update already retrieved item status for trigger calculation */ + if (NULL != (diff = calculate_item_update(item, h))) + zbx_vector_ptr_append(item_diff, diff); + + DCinventory_value_add(inventory_values, item, h); + + if (0 != item->host.proxy_hostid && FAIL == is_item_processed_by_server(item->type, item->key_orig)) + { + zbx_uint64_pair_t p = {item->host.proxy_hostid, h->ts.sec}; + + zbx_vector_uint64_pair_append(proxy_subscribtions, p); + } + } + + zbx_vector_ptr_sort(inventory_values, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); + zbx_vector_ptr_sort(item_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +/****************************************************************************** + * * + * Purpose: prepare history data to share them with loadable modules, sort * + * data by type skipping low-level discovery data, meta information * + * updates and notsupported items * + * * + * Parameters: history - [IN] array of history data * + * history_num - [IN] number of history structures * + * history_<type> - [OUT] array of historical data of a * + * specific data type * + * history_<type>_num - [OUT] number of values of a specific * + * data type * + * * + ******************************************************************************/ +static void DCmodule_prepare_history(ZBX_DC_HISTORY *history, int history_num, ZBX_HISTORY_FLOAT *history_float, + int *history_float_num, ZBX_HISTORY_INTEGER *history_integer, int *history_integer_num, + ZBX_HISTORY_STRING *history_string, int *history_string_num, ZBX_HISTORY_TEXT *history_text, + int *history_text_num, ZBX_HISTORY_LOG *history_log, int *history_log_num) +{ + ZBX_DC_HISTORY *h; + ZBX_HISTORY_FLOAT *h_float; + ZBX_HISTORY_INTEGER *h_integer; + ZBX_HISTORY_STRING *h_string; + ZBX_HISTORY_TEXT *h_text; + ZBX_HISTORY_LOG *h_log; + int i; + const zbx_log_value_t *log; + + *history_float_num = 0; + *history_integer_num = 0; + *history_string_num = 0; + *history_text_num = 0; + *history_log_num = 0; + + for (i = 0; i < history_num; i++) + { + h = &history[i]; + + if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & h->flags)) + continue; + + switch (h->value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + if (NULL == history_float_cbs) + continue; + + h_float = &history_float[(*history_float_num)++]; + h_float->itemid = h->itemid; + h_float->clock = h->ts.sec; + h_float->ns = h->ts.ns; + h_float->value = h->value.dbl; + break; + case ITEM_VALUE_TYPE_UINT64: + if (NULL == history_integer_cbs) + continue; + + h_integer = &history_integer[(*history_integer_num)++]; + h_integer->itemid = h->itemid; + h_integer->clock = h->ts.sec; + h_integer->ns = h->ts.ns; + h_integer->value = h->value.ui64; + break; + case ITEM_VALUE_TYPE_STR: + if (NULL == history_string_cbs) + continue; + + h_string = &history_string[(*history_string_num)++]; + h_string->itemid = h->itemid; + h_string->clock = h->ts.sec; + h_string->ns = h->ts.ns; + h_string->value = h->value.str; + break; + case ITEM_VALUE_TYPE_TEXT: + if (NULL == history_text_cbs) + continue; + + h_text = &history_text[(*history_text_num)++]; + h_text->itemid = h->itemid; + h_text->clock = h->ts.sec; + h_text->ns = h->ts.ns; + h_text->value = h->value.str; + break; + case ITEM_VALUE_TYPE_LOG: + if (NULL == history_log_cbs) + continue; + + log = h->value.log; + h_log = &history_log[(*history_log_num)++]; + h_log->itemid = h->itemid; + h_log->clock = h->ts.sec; + h_log->ns = h->ts.ns; + h_log->value = log->value; + h_log->source = ZBX_NULL2EMPTY_STR(log->source); + h_log->timestamp = log->timestamp; + h_log->logeventid = log->logeventid; + h_log->severity = log->severity; + break; + default: + THIS_SHOULD_NEVER_HAPPEN; + } + } +} + +static void DCmodule_sync_history(int history_float_num, int history_integer_num, int history_string_num, + int history_text_num, int history_log_num, ZBX_HISTORY_FLOAT *history_float, + ZBX_HISTORY_INTEGER *history_integer, ZBX_HISTORY_STRING *history_string, + ZBX_HISTORY_TEXT *history_text, ZBX_HISTORY_LOG *history_log) +{ + if (0 != history_float_num) + { + int i; + + zabbix_log(LOG_LEVEL_DEBUG, "syncing float history data with modules..."); + + for (i = 0; NULL != history_float_cbs[i].module; i++) + { + zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_float_cbs[i].module->name); + history_float_cbs[i].history_float_cb(history_float, history_float_num); + } + + zabbix_log(LOG_LEVEL_DEBUG, "synced %d float values with modules", history_float_num); + } + + if (0 != history_integer_num) + { + int i; + + zabbix_log(LOG_LEVEL_DEBUG, "syncing integer history data with modules..."); + + for (i = 0; NULL != history_integer_cbs[i].module; i++) + { + zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_integer_cbs[i].module->name); + history_integer_cbs[i].history_integer_cb(history_integer, history_integer_num); + } + + zabbix_log(LOG_LEVEL_DEBUG, "synced %d integer values with modules", history_integer_num); + } + + if (0 != history_string_num) + { + int i; + + zabbix_log(LOG_LEVEL_DEBUG, "syncing string history data with modules..."); + + for (i = 0; NULL != history_string_cbs[i].module; i++) + { + zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_string_cbs[i].module->name); + history_string_cbs[i].history_string_cb(history_string, history_string_num); + } + + zabbix_log(LOG_LEVEL_DEBUG, "synced %d string values with modules", history_string_num); + } + + if (0 != history_text_num) + { + int i; + + zabbix_log(LOG_LEVEL_DEBUG, "syncing text history data with modules..."); + + for (i = 0; NULL != history_text_cbs[i].module; i++) + { + zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_text_cbs[i].module->name); + history_text_cbs[i].history_text_cb(history_text, history_text_num); + } + + zabbix_log(LOG_LEVEL_DEBUG, "synced %d text values with modules", history_text_num); + } + + if (0 != history_log_num) + { + int i; + + zabbix_log(LOG_LEVEL_DEBUG, "syncing log history data with modules..."); + + for (i = 0; NULL != history_log_cbs[i].module; i++) + { + zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_log_cbs[i].module->name); + history_log_cbs[i].history_log_cb(history_log, history_log_num); + } + + zabbix_log(LOG_LEVEL_DEBUG, "synced %d log values with modules", history_log_num); + } +} + +/****************************************************************************** + * * + * Purpose: prepares history update by checking which values must be stored * + * * + * Parameters: history - [IN/OUT] the history values * + * history_num - [IN] the number of history values * + * * + ******************************************************************************/ +static void proxy_prepare_history(ZBX_DC_HISTORY *history, int history_num) +{ + int i, *errcodes; + DC_ITEM *items; + zbx_vector_uint64_t itemids; + + zbx_vector_uint64_create(&itemids); + zbx_vector_uint64_reserve(&itemids, history_num); + + for (i = 0; i < history_num; i++) + zbx_vector_uint64_append(&itemids, history[i].itemid); + + items = (DC_ITEM *)zbx_calloc(NULL, 1, sizeof(DC_ITEM) * (size_t)history_num); + errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)history_num); + + DCconfig_get_items_by_itemids_partial(items, itemids.values, errcodes, itemids.values_num, + ZBX_ITEM_GET_HOUSEKEEPING | ZBX_ITEM_GET_MISC); + + for (i = 0; i < history_num; i++) + { + if (SUCCEED != errcodes[i]) + continue; + + /* store items with enabled history */ + if (0 != items[i].history) + continue; + + /* store numeric items to handle data conversion errors on server and trends */ + if (ITEM_VALUE_TYPE_FLOAT == items[i].value_type || ITEM_VALUE_TYPE_UINT64 == items[i].value_type) + continue; + + /* store discovery rules */ + if (0 != (items[i].flags & ZBX_FLAG_DISCOVERY_RULE)) + continue; + + /* store errors or first value after an error */ + if (ITEM_STATE_NOTSUPPORTED == history[i].state || ITEM_STATE_NOTSUPPORTED == items[i].state) + continue; + + /* store items linked to host inventory */ + if (0 != items[i].inventory_link) + continue; + + dc_history_clean_value(history + i); + + /* all checks passed, item value must not be stored in proxy history/sent to server */ + history[i].flags |= ZBX_DC_FLAG_NOVALUE; + } + + DCconfig_clean_items(items, errcodes, history_num); + zbx_free(items); + zbx_free(errcodes); + zbx_vector_uint64_destroy(&itemids); +} + +static void sync_proxy_history(int *total_num, int *more) +{ + int history_num, txn_rc; + time_t sync_start; + zbx_vector_ptr_t history_items; + zbx_vector_ptr_t item_diff; + ZBX_DC_HISTORY history[ZBX_HC_SYNC_MAX]; + + zbx_vector_ptr_create(&history_items); + zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX); + zbx_vector_ptr_create(&item_diff); + + sync_start = time(NULL); + + do + { + *more = ZBX_SYNC_DONE; + + LOCK_CACHE; + + hc_pop_items(&history_items); /* select and take items out of history cache */ + history_num = history_items.values_num; + + UNLOCK_CACHE; + + if (0 == history_num) + break; + + hc_get_item_values(history, &history_items); /* copy item data from history cache */ + proxy_prepare_history(history, history_items.values_num); + + DCmass_proxy_prepare_itemdiff(history, history_num, &item_diff); + + do + { + DBbegin(); + + DBmass_proxy_add_history(history, history_num); + DBmass_proxy_update_items(&item_diff); + } + while (ZBX_DB_DOWN == (txn_rc = DBcommit())); + + LOCK_CACHE; + + hc_push_items(&history_items); /* return items to history cache */ + + if (ZBX_DB_FAIL != txn_rc) + { + if (0 != item_diff.values_num) + DCconfig_items_apply_changes(&item_diff); + + cache->history_num -= history_num; + + if (0 != hc_queue_get_size()) + *more = ZBX_SYNC_MORE; + + UNLOCK_CACHE; + + *total_num += history_num; + + hc_free_item_values(history, history_num); + } + else + { + *more = ZBX_SYNC_MORE; + UNLOCK_CACHE; + } + + zbx_vector_ptr_clear(&history_items); + zbx_vector_ptr_clear_ext(&item_diff, zbx_default_mem_free_func); + + /* Exit from sync loop if we have spent too much time here */ + /* unless we are doing full sync. This is done to allow */ + /* syncer process to update their statistics. */ + } + while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start); + + zbx_vector_ptr_destroy(&item_diff); + zbx_vector_ptr_destroy(&history_items); +} + +/****************************************************************************** + * * + * Purpose: flush history cache to database, process triggers of flushed * + * and timer triggers from timer queue * + * * + * Parameters: sync_timeout - [IN] the timeout in seconds * + * values_num - [IN/OUT] the number of synced values * + * triggers_num - [IN/OUT] the number of processed timers * + * more - [OUT] a flag indicating the cache emptiness: * + * ZBX_SYNC_DONE - nothing to sync, go idle * + * ZBX_SYNC_MORE - more data to sync * + * * + * Comments: This function loops syncing history values by 1k batches and * + * processing timer triggers by batches of 500 triggers. * + * Unless full sync is being done the loop is aborted if either * + * timeout has passed or there are no more data to process. * + * The last is assumed when the following is true: * + * a) history cache is empty or less than 10% of batch values were * + * processed (the other items were locked by triggers) * + * b) less than 500 (full batch) timer triggers were processed * + * * + ******************************************************************************/ +static void sync_server_history(int *values_num, int *triggers_num, int *more) +{ + static ZBX_HISTORY_FLOAT *history_float; + static ZBX_HISTORY_INTEGER *history_integer; + static ZBX_HISTORY_STRING *history_string; + static ZBX_HISTORY_TEXT *history_text; + static ZBX_HISTORY_LOG *history_log; + static int module_enabled = FAIL; + int i, history_num, history_float_num, history_integer_num, history_string_num, + history_text_num, history_log_num, txn_error, compression_age; + unsigned int item_retrieve_mode; + time_t sync_start; + zbx_vector_uint64_t triggerids ; + zbx_vector_ptr_t history_items, trigger_diff, item_diff, inventory_values, trigger_timers, + trigger_order; + zbx_vector_uint64_pair_t trends_diff, proxy_subscribtions; + ZBX_DC_HISTORY history[ZBX_HC_SYNC_MAX]; + zbx_uint64_t trigger_itemids[ZBX_HC_SYNC_MAX]; + zbx_timespec_t trigger_timespecs[ZBX_HC_SYNC_MAX]; + DC_ITEM *items = NULL; + int *errcodes = NULL; + zbx_vector_uint64_t itemids; + zbx_hashset_t trigger_info; + + item_retrieve_mode = NULL == CONFIG_EXPORT_DIR ? ZBX_ITEM_GET_SYNC : ZBX_ITEM_GET_SYNC_EXPORT; + + if (NULL == history_float && NULL != history_float_cbs) + { + module_enabled = SUCCEED; + history_float = (ZBX_HISTORY_FLOAT *)zbx_malloc(history_float, + ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_FLOAT)); + } + + if (NULL == history_integer && NULL != history_integer_cbs) + { + module_enabled = SUCCEED; + history_integer = (ZBX_HISTORY_INTEGER *)zbx_malloc(history_integer, + ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_INTEGER)); + } + + if (NULL == history_string && NULL != history_string_cbs) + { + module_enabled = SUCCEED; + history_string = (ZBX_HISTORY_STRING *)zbx_malloc(history_string, + ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_STRING)); + } + + if (NULL == history_text && NULL != history_text_cbs) + { + module_enabled = SUCCEED; + history_text = (ZBX_HISTORY_TEXT *)zbx_malloc(history_text, + ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_TEXT)); + } + + if (NULL == history_log && NULL != history_log_cbs) + { + module_enabled = SUCCEED; + history_log = (ZBX_HISTORY_LOG *)zbx_malloc(history_log, + ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_LOG)); + } + + compression_age = hc_get_history_compression_age(); + + zbx_vector_ptr_create(&inventory_values); + zbx_vector_ptr_create(&item_diff); + zbx_vector_ptr_create(&trigger_diff); + zbx_vector_uint64_pair_create(&trends_diff); + zbx_vector_uint64_pair_create(&proxy_subscribtions); + + zbx_vector_uint64_create(&triggerids); + zbx_vector_uint64_reserve(&triggerids, ZBX_HC_SYNC_MAX); + + zbx_vector_ptr_create(&trigger_timers); + zbx_vector_ptr_reserve(&trigger_timers, ZBX_HC_TIMER_MAX); + + zbx_vector_ptr_create(&history_items); + zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX); + + zbx_vector_ptr_create(&trigger_order); + zbx_hashset_create(&trigger_info, 100, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC); + + zbx_vector_uint64_create(&itemids); + + sync_start = time(NULL); + + do + { + int trends_num = 0, timers_num = 0, ret = SUCCEED; + ZBX_DC_TREND *trends = NULL; + + *more = ZBX_SYNC_DONE; + + LOCK_CACHE; + hc_pop_items(&history_items); /* select and take items out of history cache */ + UNLOCK_CACHE; + + if (0 != history_items.values_num) + { + if (0 == (history_num = DCconfig_lock_triggers_by_history_items(&history_items, &triggerids))) + { + LOCK_CACHE; + hc_push_items(&history_items); + UNLOCK_CACHE; + zbx_vector_ptr_clear(&history_items); + } + } + else + history_num = 0; + + if (0 != history_num) + { + zbx_dc_um_handle_t *um_handle; + + zbx_vector_ptr_sort(&history_items, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC); + hc_get_item_values(history, &history_items); /* copy item data from history cache */ + + if (NULL == items) + items = (DC_ITEM *)zbx_calloc(NULL, 1, sizeof(DC_ITEM) * (size_t)ZBX_HC_SYNC_MAX); + + if (NULL == errcodes) + errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)ZBX_HC_SYNC_MAX); + + zbx_vector_uint64_reserve(&itemids, history_num); + + for (i = 0; i < history_num; i++) + zbx_vector_uint64_append(&itemids, history[i].itemid); + + DCconfig_get_items_by_itemids_partial(items, itemids.values, errcodes, history_num, + item_retrieve_mode); + + um_handle = zbx_dc_open_user_macros(); + + DCmass_prepare_history(history, items, errcodes, history_num, &item_diff, + &inventory_values, compression_age, &proxy_subscribtions); + + if (FAIL != (ret = DBmass_add_history(history, history_num))) + { + DCconfig_items_apply_changes(&item_diff); + DCmass_update_trends(history, history_num, &trends, &trends_num, compression_age); + + if (0 != trends_num) + zbx_tfc_invalidate_trends(trends, trends_num); + + do + { + DBbegin(); + + DBmass_update_items(&item_diff, &inventory_values); + DBmass_update_trends(trends, trends_num, &trends_diff); + + /* process internal events generated by DCmass_prepare_history() */ + zbx_process_events(NULL, NULL); + + if (ZBX_DB_OK == (txn_error = DBcommit())) + DCupdate_trends(&trends_diff); + else + zbx_reset_event_recovery(); + + zbx_vector_uint64_pair_clear(&trends_diff); + } + while (ZBX_DB_DOWN == txn_error); + } + + zbx_dc_close_user_macros(um_handle); + + zbx_clean_events(); + + zbx_vector_ptr_clear_ext(&inventory_values, (zbx_clean_func_t)DCinventory_value_free); + zbx_vector_ptr_clear_ext(&item_diff, (zbx_clean_func_t)zbx_ptr_free); + } + + if (FAIL != ret) + { + /* don't process trigger timers when server is shutting down */ + if (ZBX_IS_RUNNING()) + { + zbx_dc_get_trigger_timers(&trigger_timers, time(NULL), ZBX_HC_TIMER_SOFT_MAX, + ZBX_HC_TIMER_MAX); + } + + timers_num = trigger_timers.values_num; + + if (ZBX_HC_TIMER_SOFT_MAX <= timers_num) + *more = ZBX_SYNC_MORE; + + if (0 != history_num || 0 != timers_num) + { + for (i = 0; i < trigger_timers.values_num; i++) + { + zbx_trigger_timer_t *timer = (zbx_trigger_timer_t *)trigger_timers.values[i]; + + if (0 != timer->lock) + zbx_vector_uint64_append(&triggerids, timer->triggerid); + } + + do + { + DBbegin(); + + recalculate_triggers(history, history_num, &itemids, items, errcodes, + &trigger_timers, &trigger_diff, trigger_itemids, + trigger_timespecs, &trigger_info, &trigger_order); + + /* process trigger events generated by recalculate_triggers() */ + zbx_process_events(&trigger_diff, &triggerids); + if (0 != trigger_diff.values_num) + zbx_db_save_trigger_changes(&trigger_diff); + + if (ZBX_DB_OK == (txn_error = DBcommit())) + DCconfig_triggers_apply_changes(&trigger_diff); + else + zbx_clean_events(); + + zbx_vector_ptr_clear_ext(&trigger_diff, (zbx_clean_func_t)zbx_trigger_diff_free); + } + while (ZBX_DB_DOWN == txn_error); + + if (ZBX_DB_OK == txn_error) + zbx_events_update_itservices(); + } + } + + if (0 != triggerids.values_num) + { + *triggers_num += triggerids.values_num; + DCconfig_unlock_triggers(&triggerids); + zbx_vector_uint64_clear(&triggerids); + } + + if (0 != trigger_timers.values_num) + { + zbx_dc_reschedule_trigger_timers(&trigger_timers, time(NULL)); + zbx_vector_ptr_clear(&trigger_timers); + } + + if (0 != proxy_subscribtions.values_num) + { + zbx_vector_uint64_pair_sort(&proxy_subscribtions, ZBX_DEFAULT_UINT64_COMPARE_FUNC); + zbx_dc_proxy_update_nodata(&proxy_subscribtions); + zbx_vector_uint64_pair_clear(&proxy_subscribtions); + } + + if (0 != history_num) + { + LOCK_CACHE; + hc_push_items(&history_items); /* return items to history cache */ + cache->history_num -= history_num; + + if (0 != hc_queue_get_size()) + { + /* Continue sync if enough of sync candidates were processed */ + /* (meaning most of sync candidates are not locked by triggers). */ + /* Otherwise better to wait a bit for other syncers to unlock */ + /* items rather than trying and failing to sync locked items over */ + /* and over again. */ + if (ZBX_HC_SYNC_MIN_PCNT <= history_num * 100 / history_items.values_num) + *more = ZBX_SYNC_MORE; + } + + UNLOCK_CACHE; + + *values_num += history_num; + } + + if (FAIL != ret) + { + if (0 != history_num) + { + const ZBX_DC_HISTORY *phistory = NULL; + const ZBX_DC_TREND *ptrends = NULL; + int history_num_loc = 0, trends_num_loc = 0; + + if (SUCCEED == module_enabled) + { + DCmodule_prepare_history(history, history_num, history_float, &history_float_num, + history_integer, &history_integer_num, history_string, + &history_string_num, history_text, &history_text_num, history_log, + &history_log_num); + + DCmodule_sync_history(history_float_num, history_integer_num, history_string_num, + history_text_num, history_log_num, history_float, history_integer, + history_string, history_text, history_log); + } + + if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_HISTORY)) + { + phistory = history; + history_num_loc = history_num; + } + + if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS)) + { + ptrends = trends; + trends_num_loc = trends_num; + } + + if (NULL != phistory || NULL != ptrends) + { + DCexport_history_and_trends(phistory, history_num_loc, &itemids, items, + errcodes, ptrends, trends_num_loc); + } + } + + if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_EVENTS)) + zbx_export_events(); + } + + if (0 != history_num || 0 != timers_num) + zbx_clean_events(); + + if (0 != history_num) + { + zbx_free(trends); + DCconfig_clean_items(items, errcodes, history_num); + + zbx_vector_ptr_clear(&history_items); + hc_free_item_values(history, history_num); + } + + zbx_vector_uint64_clear(&itemids); + + /* Exit from sync loop if we have spent too much time here. */ + /* This is done to allow syncer process to update its statistics. */ + } + while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start); + + zbx_free(items); + zbx_free(errcodes); + + zbx_vector_ptr_destroy(&trigger_order); + zbx_hashset_destroy(&trigger_info); + + zbx_vector_uint64_destroy(&itemids); + zbx_vector_ptr_destroy(&history_items); + zbx_vector_ptr_destroy(&inventory_values); + zbx_vector_ptr_destroy(&item_diff); + zbx_vector_ptr_destroy(&trigger_diff); + zbx_vector_uint64_pair_destroy(&trends_diff); + zbx_vector_uint64_pair_destroy(&proxy_subscribtions); + + zbx_vector_ptr_destroy(&trigger_timers); + zbx_vector_uint64_destroy(&triggerids); +} + +/****************************************************************************** + * * + * Purpose: writes updates and new data from history cache to database * + * * + * Comments: This function is used to flush history cache at server/proxy * + * exit. * + * Other processes are already terminated, so cache locking is * + * unnecessary. * + * * + ******************************************************************************/ +static void sync_history_cache_full(void) +{ + int values_num = 0, triggers_num = 0, more; + zbx_hashset_iter_t iter; + zbx_hc_item_t *item; + zbx_binary_heap_t tmp_history_queue; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, cache->history_num); + + /* History index cache might be full without any space left for queueing items from history index to */ + /* history queue. The solution: replace the shared-memory history queue with heap-allocated one. Add */ + /* all items from history index to the new history queue. */ + /* */ + /* Assertions that must be true. */ + /* * This is the main server or proxy process, */ + /* * There are no other users of history index cache stored in shared memory. Other processes */ + /* should have quit by this point. */ + /* * other parts of the program do not hold pointers to the elements of history queue that is */ + /* stored in the shared memory. */ + + if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) + { + /* unlock all triggers before full sync so no items are locked by triggers */ + DCconfig_unlock_all_triggers(); + } + + tmp_history_queue = cache->history_queue; + + zbx_binary_heap_create(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY); + zbx_hashset_iter_reset(&cache->history_items, &iter); + + /* add all items from history index to the new history queue */ + while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter))) + { + if (NULL != item->tail) + { + item->status = ZBX_HC_ITEM_STATUS_NORMAL; + hc_queue_item(item); + } + } + + if (0 != hc_queue_get_size()) + { + zabbix_log(LOG_LEVEL_WARNING, "syncing history data..."); + + do + { + if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) + sync_server_history(&values_num, &triggers_num, &more); + else + sync_proxy_history(&values_num, &more); + + zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%", + (double)values_num / (cache->history_num + values_num) * 100); + } + while (0 != hc_queue_get_size()); + + zabbix_log(LOG_LEVEL_WARNING, "syncing history data done"); + } + + zbx_binary_heap_destroy(&cache->history_queue); + cache->history_queue = tmp_history_queue; + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +/****************************************************************************** + * * + * Purpose: log progress of syncing history data * + * * + ******************************************************************************/ +void zbx_log_sync_history_cache_progress(void) +{ + double pcnt = -1.0; + int ts_last, ts_next, sec; + + LOCK_CACHE; + + if (INT_MAX == cache->history_progress_ts) + { + UNLOCK_CACHE; + return; + } + + ts_last = cache->history_progress_ts; + sec = time(NULL); + + if (0 == cache->history_progress_ts) + { + cache->history_num_total = cache->history_num; + cache->history_progress_ts = sec; + } + + if (ZBX_HC_SYNC_TIME_MAX <= sec - cache->history_progress_ts || 0 == cache->history_num) + { + if (0 != cache->history_num_total) + pcnt = 100 * (double)(cache->history_num_total - cache->history_num) / cache->history_num_total; + + cache->history_progress_ts = (0 == cache->history_num ? INT_MAX : sec); + } + + ts_next = cache->history_progress_ts; + + UNLOCK_CACHE; + + if (0 == ts_last) + zabbix_log(LOG_LEVEL_WARNING, "syncing history data in progress... "); + + if (-1.0 != pcnt) + zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%", pcnt); + + if (INT_MAX == ts_next) + zabbix_log(LOG_LEVEL_WARNING, "syncing history data done"); +} + +/****************************************************************************** + * * + * Purpose: writes updates and new data from history cache to database * + * * + * Parameters: values_num - [OUT] the number of synced values * + * more - [OUT] a flag indicating the cache emptiness: * + * ZBX_SYNC_DONE - nothing to sync, go idle * + * ZBX_SYNC_MORE - more data to sync * + * * + ******************************************************************************/ +void zbx_sync_history_cache(int *values_num, int *triggers_num, int *more) +{ + zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, cache->history_num); + + *values_num = 0; + *triggers_num = 0; + + if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) + sync_server_history(values_num, triggers_num, more); + else + sync_proxy_history(values_num, more); +} + +/****************************************************************************** + * * + * local history cache * + * * + ******************************************************************************/ +static void dc_string_buffer_realloc(size_t len) +{ + if (string_values_alloc >= string_values_offset + len) + return; + + do + { + string_values_alloc += ZBX_STRING_REALLOC_STEP; + } + while (string_values_alloc < string_values_offset + len); + + string_values = (char *)zbx_realloc(string_values, string_values_alloc); +} + +static dc_item_value_t *dc_local_get_history_slot(void) +{ + if (ZBX_MAX_VALUES_LOCAL == item_values_num) + dc_flush_history(); + + if (item_values_alloc == item_values_num) + { + item_values_alloc += ZBX_STRUCT_REALLOC_STEP; + item_values = (dc_item_value_t *)zbx_realloc(item_values, item_values_alloc * sizeof(dc_item_value_t)); + } + + return &item_values[item_values_num++]; +} + +static void dc_local_add_history_dbl(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts, + double value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags) +{ + dc_item_value_t *item_value; + + item_value = dc_local_get_history_slot(); + + item_value->itemid = itemid; + item_value->ts = *ts; + item_value->item_value_type = item_value_type; + item_value->value_type = ITEM_VALUE_TYPE_FLOAT; + item_value->state = ITEM_STATE_NORMAL; + item_value->flags = flags; + + if (0 != (item_value->flags & ZBX_DC_FLAG_META)) + { + item_value->lastlogsize = lastlogsize; + item_value->mtime = mtime; + } + + if (0 == (item_value->flags & ZBX_DC_FLAG_NOVALUE)) + item_value->value.value_dbl = value_orig; +} + +static void dc_local_add_history_uint(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts, + zbx_uint64_t value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags) +{ + dc_item_value_t *item_value; + + item_value = dc_local_get_history_slot(); + + item_value->itemid = itemid; + item_value->ts = *ts; + item_value->item_value_type = item_value_type; + item_value->value_type = ITEM_VALUE_TYPE_UINT64; + item_value->state = ITEM_STATE_NORMAL; + item_value->flags = flags; + + if (0 != (item_value->flags & ZBX_DC_FLAG_META)) + { + item_value->lastlogsize = lastlogsize; + item_value->mtime = mtime; + } + + if (0 == (item_value->flags & ZBX_DC_FLAG_NOVALUE)) + item_value->value.value_uint = value_orig; +} + +static void dc_local_add_history_text(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts, + const char *value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags) +{ + dc_item_value_t *item_value; + + item_value = dc_local_get_history_slot(); + + item_value->itemid = itemid; + item_value->ts = *ts; + item_value->item_value_type = item_value_type; + item_value->value_type = ITEM_VALUE_TYPE_TEXT; + item_value->state = ITEM_STATE_NORMAL; + item_value->flags = flags; + + if (0 != (item_value->flags & ZBX_DC_FLAG_META)) + { + item_value->lastlogsize = lastlogsize; + item_value->mtime = mtime; + } + + if (0 == (item_value->flags & ZBX_DC_FLAG_NOVALUE)) + { + item_value->value.value_str.len = zbx_db_strlen_n(value_orig, ZBX_HISTORY_VALUE_LEN) + 1; + dc_string_buffer_realloc(item_value->value.value_str.len); + + item_value->value.value_str.pvalue = string_values_offset; + memcpy(&string_values[string_values_offset], value_orig, item_value->value.value_str.len); + string_values_offset += item_value->value.value_str.len; + } + else + item_value->value.value_str.len = 0; +} + +static void dc_local_add_history_log(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts, + const zbx_log_t *log, zbx_uint64_t lastlogsize, int mtime, unsigned char flags) +{ + dc_item_value_t *item_value; + + item_value = dc_local_get_history_slot(); + + item_value->itemid = itemid; + item_value->ts = *ts; + item_value->item_value_type = item_value_type; + item_value->value_type = ITEM_VALUE_TYPE_LOG; + item_value->state = ITEM_STATE_NORMAL; + + item_value->flags = flags; + + if (0 != (item_value->flags & ZBX_DC_FLAG_META)) + { + item_value->lastlogsize = lastlogsize; + item_value->mtime = mtime; + } + + if (0 == (item_value->flags & ZBX_DC_FLAG_NOVALUE)) + { + item_value->severity = log->severity; + item_value->logeventid = log->logeventid; + item_value->timestamp = log->timestamp; + + item_value->value.value_str.len = zbx_db_strlen_n(log->value, ZBX_HISTORY_VALUE_LEN) + 1; + + if (NULL != log->source && '\0' != *log->source) + item_value->source.len = zbx_db_strlen_n(log->source, ZBX_HISTORY_LOG_SOURCE_LEN) + 1; + else + item_value->source.len = 0; + } + else + { + item_value->value.value_str.len = 0; + item_value->source.len = 0; + } + + if (0 != item_value->value.value_str.len + item_value->source.len) + { + dc_string_buffer_realloc(item_value->value.value_str.len + item_value->source.len); + + if (0 != item_value->value.value_str.len) + { + item_value->value.value_str.pvalue = string_values_offset; + memcpy(&string_values[string_values_offset], log->value, item_value->value.value_str.len); + string_values_offset += item_value->value.value_str.len; + } + + if (0 != item_value->source.len) + { + item_value->source.pvalue = string_values_offset; + memcpy(&string_values[string_values_offset], log->source, item_value->source.len); + string_values_offset += item_value->source.len; + } + } +} + +static void dc_local_add_history_notsupported(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *error, + zbx_uint64_t lastlogsize, int mtime, unsigned char flags) +{ + dc_item_value_t *item_value; + + item_value = dc_local_get_history_slot(); + + item_value->itemid = itemid; + item_value->ts = *ts; + item_value->state = ITEM_STATE_NOTSUPPORTED; + item_value->flags = flags; + + if (0 != (item_value->flags & ZBX_DC_FLAG_META)) + { + item_value->lastlogsize = lastlogsize; + item_value->mtime = mtime; + } + + item_value->value.value_str.len = zbx_db_strlen_n(error, ZBX_ITEM_ERROR_LEN) + 1; + dc_string_buffer_realloc(item_value->value.value_str.len); + item_value->value.value_str.pvalue = string_values_offset; + memcpy(&string_values[string_values_offset], error, item_value->value.value_str.len); + string_values_offset += item_value->value.value_str.len; +} + +static void dc_local_add_history_lld(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *value_orig) +{ + dc_item_value_t *item_value; + + item_value = dc_local_get_history_slot(); + + item_value->itemid = itemid; + item_value->ts = *ts; + item_value->state = ITEM_STATE_NORMAL; + item_value->flags = ZBX_DC_FLAG_LLD; + item_value->value.value_str.len = strlen(value_orig) + 1; + + dc_string_buffer_realloc(item_value->value.value_str.len); + item_value->value.value_str.pvalue = string_values_offset; + memcpy(&string_values[string_values_offset], value_orig, item_value->value.value_str.len); + string_values_offset += item_value->value.value_str.len; +} + +static void dc_local_add_history_empty(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts, + unsigned char flags) +{ + dc_item_value_t *item_value; + + item_value = dc_local_get_history_slot(); + + item_value->itemid = itemid; + item_value->ts = *ts; + item_value->item_value_type = item_value_type; + item_value->value_type = ITEM_VALUE_TYPE_NONE; + item_value->state = ITEM_STATE_NORMAL; + item_value->flags = flags; +} + +/****************************************************************************** + * * + * Purpose: add new value to the cache * + * * + * Parameters: itemid - [IN] the itemid * + * item_value_type - [IN] the item value type * + * item_flags - [IN] the item flags (e. g. lld rule) * + * result - [IN] agent result containing the value * + * to add * + * ts - [IN] the value timestamp * + * state - [IN] the item state * + * error - [IN] the error message in case item state * + * is ITEM_STATE_NOTSUPPORTED * + * * + ******************************************************************************/ +void dc_add_history(zbx_uint64_t itemid, unsigned char item_value_type, unsigned char item_flags, + AGENT_RESULT *result, const zbx_timespec_t *ts, unsigned char state, const char *error) +{ + unsigned char value_flags; + + if (ITEM_STATE_NOTSUPPORTED == state) + { + zbx_uint64_t lastlogsize; + int mtime; + + if (NULL != result && 0 != ZBX_ISSET_META(result)) + { + value_flags = ZBX_DC_FLAG_META; + lastlogsize = result->lastlogsize; + mtime = result->mtime; + } + else + { + value_flags = 0; + lastlogsize = 0; + mtime = 0; + } + dc_local_add_history_notsupported(itemid, ts, error, lastlogsize, mtime, value_flags); + return; + } + + if (NULL == result) + return; + + /* allow proxy to send timestamps of empty (throttled etc) values to update nextchecks for queue */ + if (!ZBX_ISSET_VALUE(result) && !ZBX_ISSET_META(result) && 0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) + return; + + value_flags = 0; + + if (!ZBX_ISSET_VALUE(result)) + value_flags |= ZBX_DC_FLAG_NOVALUE; + + if (ZBX_ISSET_META(result)) + value_flags |= ZBX_DC_FLAG_META; + + /* Add data to the local history cache if: */ + /* 1) the NOVALUE flag is set (data contains either meta information or timestamp) */ + /* 2) the NOVALUE flag is not set and value conversion succeeded */ + + if (0 == (value_flags & ZBX_DC_FLAG_NOVALUE)) + { + if (0 != (ZBX_FLAG_DISCOVERY_RULE & item_flags)) + { + if (NULL == ZBX_GET_TEXT_RESULT(result)) + return; + + /* proxy stores low-level discovery (lld) values in db */ + if (0 == (ZBX_PROGRAM_TYPE_SERVER & program_type)) + dc_local_add_history_lld(itemid, ts, result->text); + + return; + } + + if (ZBX_ISSET_LOG(result)) + { + dc_local_add_history_log(itemid, item_value_type, ts, result->log, result->lastlogsize, + result->mtime, value_flags); + } + else if (ZBX_ISSET_UI64(result)) + { + dc_local_add_history_uint(itemid, item_value_type, ts, result->ui64, result->lastlogsize, + result->mtime, value_flags); + } + else if (ZBX_ISSET_DBL(result)) + { + dc_local_add_history_dbl(itemid, item_value_type, ts, result->dbl, result->lastlogsize, + result->mtime, value_flags); + } + else if (ZBX_ISSET_STR(result)) + { + dc_local_add_history_text(itemid, item_value_type, ts, result->str, result->lastlogsize, + result->mtime, value_flags); + } + else if (ZBX_ISSET_TEXT(result)) + { + dc_local_add_history_text(itemid, item_value_type, ts, result->text, result->lastlogsize, + result->mtime, value_flags); + } + else + { + THIS_SHOULD_NEVER_HAPPEN; + } + } + else + { + if (0 != (value_flags & ZBX_DC_FLAG_META)) + { + dc_local_add_history_log(itemid, item_value_type, ts, NULL, result->lastlogsize, result->mtime, + value_flags); + } + else + dc_local_add_history_empty(itemid, item_value_type, ts, value_flags); + } +} + +void dc_flush_history(void) +{ + if (0 == item_values_num) + return; + + LOCK_CACHE; + + hc_add_item_values(item_values, item_values_num); + + cache->history_num += item_values_num; + + UNLOCK_CACHE; + + item_values_num = 0; + string_values_offset = 0; +} + +/****************************************************************************** + * * + * history cache storage * + * * + ******************************************************************************/ +ZBX_SHMEM_FUNC_IMPL(__hc_index, hc_index_mem) +ZBX_SHMEM_FUNC_IMPL(__hc, hc_mem) + +/****************************************************************************** + * * + * Purpose: compares history queue elements * + * * + ******************************************************************************/ +static int hc_queue_elem_compare_func(const void *d1, const void *d2) +{ + const zbx_binary_heap_elem_t *e1 = (const zbx_binary_heap_elem_t *)d1; + const zbx_binary_heap_elem_t *e2 = (const zbx_binary_heap_elem_t *)d2; + + const zbx_hc_item_t *item1 = (const zbx_hc_item_t *)e1->data; + const zbx_hc_item_t *item2 = (const zbx_hc_item_t *)e2->data; + + /* compare by timestamp of the oldest value */ + return zbx_timespec_compare(&item1->tail->ts, &item2->tail->ts); +} + +/****************************************************************************** + * * + * Purpose: free history item data allocated in history cache * + * * + * Parameters: data - [IN] history item data * + * * + ******************************************************************************/ +static void hc_free_data(zbx_hc_data_t *data) +{ + if (ITEM_STATE_NOTSUPPORTED == data->state) + { + __hc_shmem_free_func(data->value.str); + } + else + { + if (0 == (data->flags & ZBX_DC_FLAG_NOVALUE)) + { + switch (data->value_type) + { + case ITEM_VALUE_TYPE_STR: + case ITEM_VALUE_TYPE_TEXT: + __hc_shmem_free_func(data->value.str); + break; + case ITEM_VALUE_TYPE_LOG: + __hc_shmem_free_func(data->value.log->value); + + if (NULL != data->value.log->source) + __hc_shmem_free_func(data->value.log->source); + + __hc_shmem_free_func(data->value.log); + break; + } + } + } + + __hc_shmem_free_func(data); +} + +/****************************************************************************** + * * + * Purpose: put back item into history queue * + * * + * Parameters: data - [IN] history item data * + * * + ******************************************************************************/ +static void hc_queue_item(zbx_hc_item_t *item) +{ + zbx_binary_heap_elem_t elem = {item->itemid, (const void *)item}; + + zbx_binary_heap_insert(&cache->history_queue, &elem); +} + +/****************************************************************************** + * * + * Purpose: returns history item by itemid * + * * + * Parameters: itemid - [IN] the item id * + * * + * Return value: the history item or NULL if the requested item is not in * + * history cache * + * * + ******************************************************************************/ +static zbx_hc_item_t *hc_get_item(zbx_uint64_t itemid) +{ + return (zbx_hc_item_t *)zbx_hashset_search(&cache->history_items, &itemid); +} + +/****************************************************************************** + * * + * Purpose: adds a new item to history cache * + * * + * Parameters: itemid - [IN] the item id * + * [IN] the item data * + * * + * Return value: the added history item * + * * + ******************************************************************************/ +static zbx_hc_item_t *hc_add_item(zbx_uint64_t itemid, zbx_hc_data_t *data) +{ + zbx_hc_item_t item_local = {itemid, ZBX_HC_ITEM_STATUS_NORMAL, 0, data, data}; + + return (zbx_hc_item_t *)zbx_hashset_insert(&cache->history_items, &item_local, sizeof(item_local)); +} + +/****************************************************************************** + * * + * Purpose: copies string value to history cache * + * * + * Parameters: str - [IN] the string value * + * * + * Return value: the copied string or NULL if there was not enough memory * + * * + ******************************************************************************/ +static char *hc_mem_value_str_dup(const dc_value_str_t *str) +{ + char *ptr; + + if (NULL == (ptr = (char *)__hc_shmem_malloc_func(NULL, str->len))) + return NULL; + + memcpy(ptr, &string_values[str->pvalue], str->len - 1); + ptr[str->len - 1] = '\0'; + + return ptr; +} + +/****************************************************************************** + * * + * Purpose: clones string value into history data memory * + * * + * Parameters: dst - [IN/OUT] a reference to the cloned value * + * str - [IN] the string value to clone * + * * + * Return value: SUCCESS - either there was no need to clone the string * + * (it was empty or already cloned) or the string was * + * cloned successfully * + * FAIL - not enough memory * + * * + * Comments: This function can be called in loop with the same dst value * + * until it finishes cloning string value. * + * * + ******************************************************************************/ +static int hc_clone_history_str_data(char **dst, const dc_value_str_t *str) +{ + if (0 == str->len) + return SUCCEED; + + if (NULL != *dst) + return SUCCEED; + + if (NULL != (*dst = hc_mem_value_str_dup(str))) + return SUCCEED; + + return FAIL; +} + +/****************************************************************************** + * * + * Purpose: clones log value into history data memory * + * * + * Parameters: dst - [IN/OUT] a reference to the cloned value * + * item_value - [IN] the log value to clone * + * * + * Return value: SUCCESS - the log value was cloned successfully * + * FAIL - not enough memory * + * * + * Comments: This function can be called in loop with the same dst value * + * until it finishes cloning log value. * + * * + ******************************************************************************/ +static int hc_clone_history_log_data(zbx_log_value_t **dst, const dc_item_value_t *item_value) +{ + if (NULL == *dst) + { + /* using realloc instead of malloc just to suppress 'not used' warning for realloc */ + if (NULL == (*dst = (zbx_log_value_t *)__hc_shmem_realloc_func(NULL, sizeof(zbx_log_value_t)))) + return FAIL; + + memset(*dst, 0, sizeof(zbx_log_value_t)); + } + + if (SUCCEED != hc_clone_history_str_data(&(*dst)->value, &item_value->value.value_str)) + return FAIL; + + if (SUCCEED != hc_clone_history_str_data(&(*dst)->source, &item_value->source)) + return FAIL; + + (*dst)->logeventid = item_value->logeventid; + (*dst)->severity = item_value->severity; + (*dst)->timestamp = item_value->timestamp; + + return SUCCEED; +} + +/****************************************************************************** + * * + * Purpose: clones item value from local cache into history cache * + * * + * Parameters: data - [IN/OUT] a reference to the cloned value * + * item_value - [IN] the item value * + * * + * Return value: SUCCESS - the item value was cloned successfully * + * FAIL - not enough memory * + * * + * Comments: This function can be called in loop with the same data value * + * until it finishes cloning item value. * + * * + ******************************************************************************/ +static int hc_clone_history_data(zbx_hc_data_t **data, const dc_item_value_t *item_value) +{ + if (NULL == *data) + { + if (NULL == (*data = (zbx_hc_data_t *)__hc_shmem_malloc_func(NULL, sizeof(zbx_hc_data_t)))) + return FAIL; + + memset(*data, 0, sizeof(zbx_hc_data_t)); + + (*data)->state = item_value->state; + (*data)->ts = item_value->ts; + (*data)->flags = item_value->flags; + } + + if (0 != (ZBX_DC_FLAG_META & item_value->flags)) + { + (*data)->lastlogsize = item_value->lastlogsize; + (*data)->mtime = item_value->mtime; + } + + if (ITEM_STATE_NOTSUPPORTED == item_value->state) + { + if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str))) + return FAIL; + + (*data)->value_type = item_value->value_type; + cache->stats.notsupported_counter++; + + return SUCCEED; + } + + if (0 != (ZBX_DC_FLAG_LLD & item_value->flags)) + { + if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str))) + return FAIL; + + (*data)->value_type = ITEM_VALUE_TYPE_TEXT; + + cache->stats.history_text_counter++; + cache->stats.history_counter++; + + return SUCCEED; + } + + if (0 == (ZBX_DC_FLAG_NOVALUE & item_value->flags)) + { + switch (item_value->value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + (*data)->value.dbl = item_value->value.value_dbl; + break; + case ITEM_VALUE_TYPE_UINT64: + (*data)->value.ui64 = item_value->value.value_uint; + break; + case ITEM_VALUE_TYPE_STR: + if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str, + &item_value->value.value_str)) + { + return FAIL; + } + break; + case ITEM_VALUE_TYPE_TEXT: + if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str, + &item_value->value.value_str)) + { + return FAIL; + } + break; + case ITEM_VALUE_TYPE_LOG: + if (SUCCEED != hc_clone_history_log_data(&(*data)->value.log, item_value)) + return FAIL; + break; + } + + switch (item_value->item_value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + cache->stats.history_float_counter++; + break; + case ITEM_VALUE_TYPE_UINT64: + cache->stats.history_uint_counter++; + break; + case ITEM_VALUE_TYPE_STR: + cache->stats.history_str_counter++; + break; + case ITEM_VALUE_TYPE_TEXT: + cache->stats.history_text_counter++; + break; + case ITEM_VALUE_TYPE_LOG: + cache->stats.history_log_counter++; + break; + } + + cache->stats.history_counter++; + } + + (*data)->value_type = item_value->value_type; + + return SUCCEED; +} + +/****************************************************************************** + * * + * Purpose: adds item values to the history cache * + * * + * Parameters: values - [IN] the item values to add * + * values_num - [IN] the number of item values to add * + * * + * Comments: If the history cache is full this function will wait until * + * history syncers processes values freeing enough space to store * + * the new value. * + * * + ******************************************************************************/ +static void hc_add_item_values(dc_item_value_t *values, int values_num) +{ + dc_item_value_t *item_value; + int i; + zbx_hc_item_t *item; + + for (i = 0; i < values_num; i++) + { + zbx_hc_data_t *data = NULL; + + item_value = &values[i]; + + /* a record with metadata and no value can be dropped if */ + /* the metadata update is copied to the last queued value */ + if (NULL != (item = hc_get_item(item_value->itemid)) && + 0 != (item_value->flags & ZBX_DC_FLAG_NOVALUE) && + 0 != (item_value->flags & ZBX_DC_FLAG_META)) + { + /* skip metadata updates when only one value is queued, */ + /* because the item might be already being processed */ + if (item->head != item->tail) + { + item->head->lastlogsize = item_value->lastlogsize; + item->head->mtime = item_value->mtime; + item->head->flags |= ZBX_DC_FLAG_META; + continue; + } + } + + if (SUCCEED != hc_clone_history_data(&data, item_value)) + { + do + { + UNLOCK_CACHE; + + zabbix_log(LOG_LEVEL_DEBUG, "History cache is full. Sleeping for 1 second."); + sleep(1); + + LOCK_CACHE; + } + while (SUCCEED != hc_clone_history_data(&data, item_value)); + + item = hc_get_item(item_value->itemid); + } + + if (NULL == item) + { + item = hc_add_item(item_value->itemid, data); + hc_queue_item(item); + } + else + { + item->head->next = data; + item->head = data; + } + item->values_num++; + } +} + +/****************************************************************************** + * * + * Purpose: copies item value from history cache into the specified history * + * value * + * * + * Parameters: history - [OUT] the history value * + * itemid - [IN] the item identifier * + * data - [IN] the history data to copy * + * * + * Comments: handling of uninitialized fields in dc_add_proxy_history_log() * + * * + ******************************************************************************/ +static void hc_copy_history_data(ZBX_DC_HISTORY *history, zbx_uint64_t itemid, zbx_hc_data_t *data) +{ + history->itemid = itemid; + history->ts = data->ts; + history->state = data->state; + history->flags = data->flags; + history->lastlogsize = data->lastlogsize; + history->mtime = data->mtime; + + if (ITEM_STATE_NOTSUPPORTED == data->state) + { + history->value.err = zbx_strdup(NULL, data->value.str); + history->flags |= ZBX_DC_FLAG_UNDEF; + return; + } + + history->value_type = data->value_type; + + if (0 == (ZBX_DC_FLAG_NOVALUE & data->flags)) + { + switch (data->value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + history->value.dbl = data->value.dbl; + break; + case ITEM_VALUE_TYPE_UINT64: + history->value.ui64 = data->value.ui64; + break; + case ITEM_VALUE_TYPE_STR: + case ITEM_VALUE_TYPE_TEXT: + history->value.str = zbx_strdup(NULL, data->value.str); + break; + case ITEM_VALUE_TYPE_LOG: + history->value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t)); + history->value.log->value = zbx_strdup(NULL, data->value.log->value); + + if (NULL != data->value.log->source) + history->value.log->source = zbx_strdup(NULL, data->value.log->source); + else + history->value.log->source = NULL; + + history->value.log->timestamp = data->value.log->timestamp; + history->value.log->severity = data->value.log->severity; + history->value.log->logeventid = data->value.log->logeventid; + + break; + } + } +} + +/****************************************************************************** + * * + * Purpose: pops the next batch of history items from cache for processing * + * * + * Parameters: history_items - [OUT] the locked history items * + * * + * Comments: The history_items must be returned back to history cache with * + * hc_push_items() function after they have been processed. * + * * + ******************************************************************************/ +static void hc_pop_items(zbx_vector_ptr_t *history_items) +{ + zbx_binary_heap_elem_t *elem; + zbx_hc_item_t *item; + + while (ZBX_HC_SYNC_MAX > history_items->values_num && FAIL == zbx_binary_heap_empty(&cache->history_queue)) + { + elem = zbx_binary_heap_find_min(&cache->history_queue); + item = (zbx_hc_item_t *)elem->data; + zbx_vector_ptr_append(history_items, item); + + zbx_binary_heap_remove_min(&cache->history_queue); + } +} + +/****************************************************************************** + * * + * Purpose: gets item history values * + * * + * Parameters: history - [OUT] the history values * + * history_items - [IN] the history items * + * * + ******************************************************************************/ +static void hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items) +{ + int i, history_num = 0; + zbx_hc_item_t *item; + + /* we don't need to lock history cache because no other processes can */ + /* change item's history data until it is pushed back to history queue */ + for (i = 0; i < history_items->values_num; i++) + { + item = (zbx_hc_item_t *)history_items->values[i]; + + if (ZBX_HC_ITEM_STATUS_BUSY == item->status) + continue; + + hc_copy_history_data(&history[history_num++], item->itemid, item->tail); + } +} + +/****************************************************************************** + * * + * Purpose: push back the processed history items into history cache * + * * + * Parameters: history_items - [IN] the history items containing processed * + * (available) and busy items * + * * + * Comments: This function removes processed value from history cache. * + * If there is no more data for this item, then the item itself is * + * removed from history index. * + * * + ******************************************************************************/ +void hc_push_items(zbx_vector_ptr_t *history_items) +{ + int i; + zbx_hc_item_t *item; + zbx_hc_data_t *data_free; + + for (i = 0; i < history_items->values_num; i++) + { + item = (zbx_hc_item_t *)history_items->values[i]; + + switch (item->status) + { + case ZBX_HC_ITEM_STATUS_BUSY: + /* reset item status before returning it to queue */ + item->status = ZBX_HC_ITEM_STATUS_NORMAL; + hc_queue_item(item); + break; + case ZBX_HC_ITEM_STATUS_NORMAL: + item->values_num--; + data_free = item->tail; + item->tail = item->tail->next; + hc_free_data(data_free); + if (NULL == item->tail) + zbx_hashset_remove(&cache->history_items, item); + else + hc_queue_item(item); + break; + } + } +} + +/****************************************************************************** + * * + * Purpose: retrieve the size of history queue * + * * + ******************************************************************************/ +int hc_queue_get_size(void) +{ + return cache->history_queue.elems_num; +} + +int hc_get_history_compression_age(void) +{ +#if defined(HAVE_POSTGRESQL) + zbx_config_t cfg; + int compression_age = 0; + + zbx_config_get(&cfg, ZBX_CONFIG_FLAGS_DB_EXTENSION); + + if (ON == cfg.db.history_compression_status && 0 != cfg.db.history_compress_older) + { + compression_age = (int)time(NULL) - cfg.db.history_compress_older; + } + + zbx_config_clean(&cfg); + + return compression_age; +#else + return 0; +#endif +} + +/****************************************************************************** + * * + * Purpose: Allocate shared memory for trend cache (part of database cache) * + * * + * Comments: Is optionally called from init_database_cache() * + * * + ******************************************************************************/ + +ZBX_SHMEM_FUNC_IMPL(__trend, trend_mem) + +static int init_trend_cache(char **error) +{ + size_t sz; + int ret; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + if (SUCCEED != (ret = zbx_mutex_create(&trends_lock, ZBX_MUTEX_TRENDS, error))) + goto out; + + sz = zbx_shmem_required_size(1, "trend cache", "TrendCacheSize"); + if (SUCCEED != (ret = zbx_shmem_create(&trend_mem, CONFIG_TRENDS_CACHE_SIZE, "trend cache", "TrendCacheSize", 0, + error))) + { + goto out; + } + + CONFIG_TRENDS_CACHE_SIZE -= sz; + + cache->trends_num = 0; + cache->trends_last_cleanup_hour = 0; + +#define INIT_HASHSET_SIZE 100 /* Should be calculated dynamically based on trends size? */ + /* Still does not make sense to have it more than initial */ + /* item hashset size in configuration cache. */ + + zbx_hashset_create_ext(&cache->trends, INIT_HASHSET_SIZE, + ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL, + __trend_shmem_malloc_func, __trend_shmem_realloc_func, __trend_shmem_free_func); + +#undef INIT_HASHSET_SIZE +out: + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); + + return ret; +} + +/****************************************************************************** + * * + * Purpose: Allocate shared memory for database cache * + * * + ******************************************************************************/ +int init_database_cache(char **error) +{ + int ret; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + if (NULL != cache) + { + ret = SUCCEED; + goto out; + } + + if (SUCCEED != (ret = zbx_mutex_create(&cache_lock, ZBX_MUTEX_CACHE, error))) + goto out; + + if (SUCCEED != (ret = zbx_mutex_create(&cache_ids_lock, ZBX_MUTEX_CACHE_IDS, error))) + goto out; + + if (SUCCEED != (ret = zbx_shmem_create(&hc_mem, CONFIG_HISTORY_CACHE_SIZE, "history cache", + "HistoryCacheSize", 1, error))) + { + goto out; + } + + if (SUCCEED != (ret = zbx_shmem_create(&hc_index_mem, CONFIG_HISTORY_INDEX_CACHE_SIZE, "history index cache", + "HistoryIndexCacheSize", 0, error))) + { + goto out; + } + + cache = (ZBX_DC_CACHE *)__hc_index_shmem_malloc_func(NULL, sizeof(ZBX_DC_CACHE)); + memset(cache, 0, sizeof(ZBX_DC_CACHE)); + + ids = (ZBX_DC_IDS *)__hc_index_shmem_malloc_func(NULL, sizeof(ZBX_DC_IDS)); + memset(ids, 0, sizeof(ZBX_DC_IDS)); + + zbx_hashset_create_ext(&cache->history_items, ZBX_HC_ITEMS_INIT_SIZE, + ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL, + __hc_index_shmem_malloc_func, __hc_index_shmem_realloc_func, __hc_index_shmem_free_func); + + zbx_binary_heap_create_ext(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY, + __hc_index_shmem_malloc_func, __hc_index_shmem_realloc_func, __hc_index_shmem_free_func); + + if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) + { + zbx_hashset_create_ext(&(cache->proxyqueue.index), ZBX_HC_SYNC_MAX, + ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL, + __hc_index_shmem_malloc_func, __hc_index_shmem_realloc_func, __hc_index_shmem_free_func); + + zbx_list_create_ext(&(cache->proxyqueue.list), __hc_index_shmem_malloc_func, + __hc_index_shmem_free_func); + + cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL; + + if (SUCCEED != (ret = init_trend_cache(error))) + goto out; + } + + cache->history_num_total = 0; + cache->history_progress_ts = 0; + + cache->db_trigger_queue_lock = 1; + + cache->proxy_history_count = 0; + + if (NULL == sql) + sql = (char *)zbx_malloc(sql, sql_alloc); +out: + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); + + return ret; +} + +/****************************************************************************** + * * + * Purpose: change proxy_history_count by count * + * * + ******************************************************************************/ +void change_proxy_history_count(int change_count) +{ + LOCK_CACHE; + + cache->proxy_history_count += change_count; + + UNLOCK_CACHE; +} + +/****************************************************************************** + * * + * Purpose: change proxy_history_count by count * + * * + ******************************************************************************/ +void reset_proxy_history_count(int reset) +{ + LOCK_CACHE; + + cache->proxy_history_count = reset; + + UNLOCK_CACHE; +} + +/****************************************************************************** + * * + * Purpose: get proxy_history_count value * + * * + ******************************************************************************/ +int get_proxy_history_count(void) +{ + int proxy_history_count; + + LOCK_CACHE; + proxy_history_count = cache->proxy_history_count; + UNLOCK_CACHE; + + return proxy_history_count; +} +/****************************************************************************** + * * + * Purpose: writes updates and new data from pool and cache data to database * + * * + ******************************************************************************/ +static void DCsync_all(void) +{ + zabbix_log(LOG_LEVEL_DEBUG, "In DCsync_all()"); + + sync_history_cache_full(); + if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) + DCsync_trends(); + + zabbix_log(LOG_LEVEL_DEBUG, "End of DCsync_all()"); +} + +/****************************************************************************** + * * + * Purpose: Free memory allocated for database cache * + * * + ******************************************************************************/ +void free_database_cache(int sync) +{ + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + if (ZBX_SYNC_ALL == sync) + DCsync_all(); + + cache = NULL; + + zbx_shmem_destroy(hc_mem); + hc_mem = NULL; + zbx_shmem_destroy(hc_index_mem); + hc_index_mem = NULL; + + zbx_mutex_destroy(&cache_lock); + zbx_mutex_destroy(&cache_ids_lock); + + if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER)) + { + zbx_shmem_destroy(trend_mem); + trend_mem = NULL; + zbx_mutex_destroy(&trends_lock); + } + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +/****************************************************************************** + * * + * Purpose: Return next id for requested table * + * * + ******************************************************************************/ +zbx_uint64_t DCget_nextid(const char *table_name, int num) +{ + int i; + DB_RESULT result; + DB_ROW row; + const ZBX_TABLE *table; + ZBX_DC_ID *id; + zbx_uint64_t min = 0, max = ZBX_DB_MAX_ID, nextid, lastid; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s() table:'%s' num:%d", __func__, table_name, num); + + LOCK_CACHE_IDS; + + for (i = 0; i < ZBX_IDS_SIZE; i++) + { + id = &ids->id[i]; + if ('\0' == *id->table_name) + break; + + if (0 == strcmp(id->table_name, table_name)) + { + nextid = id->lastid + 1; + id->lastid += num; + lastid = id->lastid; + + UNLOCK_CACHE_IDS; + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]", + __func__, table_name, nextid, lastid); + + return nextid; + } + } + + if (i == ZBX_IDS_SIZE) + { + zabbix_log(LOG_LEVEL_ERR, "insufficient shared memory for ids"); + exit(EXIT_FAILURE); + } + + table = DBget_table(table_name); + + result = DBselect("select max(%s) from %s where %s between " ZBX_FS_UI64 " and " ZBX_FS_UI64, + table->recid, table_name, table->recid, min, max); + + if (NULL != result) + { + zbx_strlcpy(id->table_name, table_name, sizeof(id->table_name)); + + if (NULL == (row = DBfetch(result)) || SUCCEED == DBis_null(row[0])) + id->lastid = min; + else + ZBX_STR2UINT64(id->lastid, row[0]); + + nextid = id->lastid + 1; + id->lastid += num; + lastid = id->lastid; + } + else + nextid = lastid = 0; + + UNLOCK_CACHE_IDS; + + DBfree_result(result); + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]", + __func__, table_name, nextid, lastid); + + return nextid; +} + +/****************************************************************************** + * * + * Purpose: performs interface availability reset for hosts with * + * availability set on interfaces without enabled items * + * * + ******************************************************************************/ +void DCupdate_interfaces_availability(void) +{ + zbx_vector_availability_ptr_t interfaces; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__); + + zbx_vector_availability_ptr_create(&interfaces); + + if (SUCCEED != DCreset_interfaces_availability(&interfaces)) + goto out; + + zbx_availabilities_flush(&interfaces); +out: + zbx_vector_availability_ptr_clear_ext(&interfaces, zbx_interface_availability_free); + zbx_vector_availability_ptr_destroy(&interfaces); + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__); +} + +/****************************************************************************** + * * + * Purpose: get history cache diagnostics statistics * + * * + ******************************************************************************/ +void zbx_hc_get_diag_stats(zbx_uint64_t *items_num, zbx_uint64_t *values_num) +{ + LOCK_CACHE; + + *values_num = cache->history_num; + *items_num = cache->history_items.num_data; + + UNLOCK_CACHE; +} + +/****************************************************************************** + * * + * Purpose: get shared memory allocator statistics * + * * + ******************************************************************************/ +void zbx_hc_get_mem_stats(zbx_shmem_stats_t *data, zbx_shmem_stats_t *index) +{ + LOCK_CACHE; + + if (NULL != data) + zbx_shmem_get_stats(hc_mem, data); + + if (NULL != index) + zbx_shmem_get_stats(hc_index_mem, index); + + UNLOCK_CACHE; +} + +/****************************************************************************** + * * + * Purpose: get statistics of cached items * + * * + ******************************************************************************/ +void zbx_hc_get_items(zbx_vector_uint64_pair_t *items) +{ + zbx_hashset_iter_t iter; + zbx_hc_item_t *item; + + LOCK_CACHE; + + zbx_vector_uint64_pair_reserve(items, cache->history_items.num_data); + + zbx_hashset_iter_reset(&cache->history_items, &iter); + while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter))) + { + zbx_uint64_pair_t pair = {item->itemid, item->values_num}; + zbx_vector_uint64_pair_append_ptr(items, &pair); + } + + UNLOCK_CACHE; +} + +/****************************************************************************** + * * + * Purpose: checks if database trigger queue table is locked * + * * + ******************************************************************************/ +int zbx_db_trigger_queue_locked(void) +{ + return 0 == cache->db_trigger_queue_lock ? FAIL : SUCCEED; +} + +/****************************************************************************** + * * + * Purpose: unlocks database trigger queue table * + * * + ******************************************************************************/ +void zbx_db_trigger_queue_unlock(void) +{ + cache->db_trigger_queue_lock = 0; +} + +/****************************************************************************** + * * + * Purpose: return first proxy in a queue, function assumes that a queue is * + * not empty * + * * + * Return value: proxyid at the top a queue * + * * + ******************************************************************************/ +static zbx_uint64_t zbx_hc_proxyqueue_peek(void) +{ + zbx_uint64_t *p_val; + + if (NULL == cache->proxyqueue.list.head) + return 0; + + p_val = (zbx_uint64_t *)(cache->proxyqueue.list.head->data); + + return *p_val; +} + +/****************************************************************************** + * * + * Purpose: add new proxyid to a queue * + * * + * Parameters: proxyid - [IN] the proxy id * + * * + ******************************************************************************/ +static void zbx_hc_proxyqueue_enqueue(zbx_uint64_t proxyid) +{ + if (NULL == zbx_hashset_search(&cache->proxyqueue.index, &proxyid)) + { + zbx_uint64_t *ptr; + + ptr = zbx_hashset_insert(&cache->proxyqueue.index, &proxyid, sizeof(proxyid)); + zbx_list_append(&cache->proxyqueue.list, ptr, NULL); + } +} + +/****************************************************************************** + * * + * Purpose: try to dequeue proxyid from a proxy queue * + * * + * Parameters: chk_proxyid - [IN] the proxyid * + * * + * Return value: SUCCEED - retrieval successful * + * FAIL - otherwise * + * * + ******************************************************************************/ +static int zbx_hc_proxyqueue_dequeue(zbx_uint64_t proxyid) +{ + zbx_uint64_t top_val; + void *rem_val = 0; + + top_val = zbx_hc_proxyqueue_peek(); + + if (proxyid != top_val) + return FAIL; + + if (FAIL == zbx_list_pop(&cache->proxyqueue.list, &rem_val)) + return FAIL; + + zbx_hashset_remove_direct(&cache->proxyqueue.index, rem_val); + + return SUCCEED; +} + +/****************************************************************************** + * * + * Purpose: remove all proxies from proxy priority queue * + * * + ******************************************************************************/ +static void zbx_hc_proxyqueue_clear(void) +{ + zbx_list_destroy(&cache->proxyqueue.list); + zbx_hashset_clear(&cache->proxyqueue.index); +} + +/****************************************************************************** + * * + * Purpose: check status of a history cache usage, enqueue/dequeue proxy * + * from priority list and accordingly enable or disable wait mode * + * * + * Parameters: proxyid - [IN] the proxyid * + * * + * Return value: SUCCEED - proxy can be processed now * + * FAIL - proxy cannot be processed now, it got enqueued * + * * + ******************************************************************************/ +int zbx_hc_check_proxy(zbx_uint64_t proxyid) +{ + double hc_pused; + int ret; + + zabbix_log(LOG_LEVEL_DEBUG, "In %s() proxyid:"ZBX_FS_UI64, __func__, proxyid); + + LOCK_CACHE; + + hc_pused = 100 * (double)(hc_mem->total_size - hc_mem->free_size) / hc_mem->total_size; + + if (20 >= hc_pused) + { + cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL; + + zbx_hc_proxyqueue_clear(); + + ret = SUCCEED; + goto out; + } + + if (ZBX_HC_PROXYQUEUE_STATE_WAIT == cache->proxyqueue.state) + { + zbx_hc_proxyqueue_enqueue(proxyid); + + if (60 < hc_pused) + { + ret = FAIL; + goto out; + } + + cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL; + } + else + { + if (80 <= hc_pused) + { + cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_WAIT; + zbx_hc_proxyqueue_enqueue(proxyid); + + ret = FAIL; + goto out; + } + } + + if (0 == zbx_hc_proxyqueue_peek()) + { + ret = SUCCEED; + goto out; + } + + ret = zbx_hc_proxyqueue_dequeue(proxyid); + +out: + UNLOCK_CACHE; + + zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret)); + + return ret; +} |