diff options
author | sasha <sasha@97f52cf1-0a1b-0410-bd0e-c28be96e8082> | 2008-07-29 09:43:53 +0000 |
---|---|---|
committer | sasha <sasha@97f52cf1-0a1b-0410-bd0e-c28be96e8082> | 2008-07-29 09:43:53 +0000 |
commit | f7cd0be8cdcedcc34656ced6982dd7abadca01fc (patch) | |
tree | a6aa538b83fd84824e8a8431bd9c2ecc694dafc3 /src | |
parent | e3248ce4bc1b34d8623359d5333cfa9712b5eb73 (diff) | |
download | zabbix-f7cd0be8cdcedcc34656ced6982dd7abadca01fc.tar.gz zabbix-f7cd0be8cdcedcc34656ced6982dd7abadca01fc.tar.xz zabbix-f7cd0be8cdcedcc34656ced6982dd7abadca01fc.zip |
- [DEV-196] improved performance of server module
git-svn-id: svn://svn.zabbix.com/trunk@5848 97f52cf1-0a1b-0410-bd0e-c28be96e8082
Diffstat (limited to 'src')
-rw-r--r-- | src/libs/zbxdb/db.c | 34 | ||||
-rw-r--r-- | src/libs/zbxdbcache/Makefile.am | 3 | ||||
-rw-r--r-- | src/libs/zbxdbcache/dbcache.c | 2028 | ||||
-rw-r--r-- | src/libs/zbxdbcache/nextchecks.c | 213 | ||||
-rw-r--r-- | src/libs/zbxdbhigh/db.c | 649 | ||||
-rw-r--r-- | src/libs/zbxserver/expression.c | 10 | ||||
-rw-r--r-- | src/libs/zbxserver/expression.h | 1 | ||||
-rw-r--r-- | src/libs/zbxserver/functions.c | 28 | ||||
-rw-r--r-- | src/zabbix_proxy/datasender/datasender.c | 85 | ||||
-rw-r--r-- | src/zabbix_proxy/proxy.c | 7 | ||||
-rw-r--r-- | src/zabbix_proxy/zlog.c | 25 | ||||
-rw-r--r-- | src/zabbix_server/dbsyncer/dbsyncer.c | 44 | ||||
-rw-r--r-- | src/zabbix_server/httppoller/httptest.c | 25 | ||||
-rw-r--r-- | src/zabbix_server/pinger/pinger.c | 33 | ||||
-rw-r--r-- | src/zabbix_server/poller/checks_agent.c | 5 | ||||
-rw-r--r-- | src/zabbix_server/poller/poller.c | 303 | ||||
-rw-r--r-- | src/zabbix_server/server.c | 29 | ||||
-rw-r--r-- | src/zabbix_server/trapper/trapper.c | 342 | ||||
-rw-r--r-- | src/zabbix_server/trapper/trapper.h | 14 | ||||
-rw-r--r-- | src/zabbix_server/zlog.c | 25 |
20 files changed, 2847 insertions, 1056 deletions
diff --git a/src/libs/zbxdb/db.c b/src/libs/zbxdb/db.c index 1de8ac7d..61dad9fd 100644 --- a/src/libs/zbxdb/db.c +++ b/src/libs/zbxdb/db.c @@ -84,7 +84,7 @@ int zbx_db_connect(char *host, char *user, char *password, char *dbname, char *d conn = mysql_init(NULL); - if( ! mysql_real_connect( conn, host, user, password, dbname, port, dbsocket,0 ) ) + if( ! mysql_real_connect( conn, host, user, password, dbname, port, dbsocket, CLIENT_MULTI_STATEMENTS) ) { zabbix_log(LOG_LEVEL_ERR, "Failed to connect to database: Error: %s [%d]", mysql_error(conn), mysql_errno(conn)); @@ -191,7 +191,7 @@ int zbx_db_connect(char *host, char *user, char *password, char *dbname, char *d } if (ZBX_DB_OK == ret) - sqlo_autocommit_on(oracle); + sqlo_autocommit_off(oracle); return ret; #endif @@ -342,6 +342,9 @@ void zbx_db_commit(void) #ifdef HAVE_POSTGRESQL zbx_db_execute("%s","commit;"); #endif +#ifdef HAVE_ORACLE + zbx_db_execute("%s","commit"); +#endif #ifdef HAVE_SQLITE3 if(sqlite_transaction_started > 1) @@ -384,6 +387,9 @@ void zbx_db_rollback(void) #ifdef HAVE_POSTGRESQL zbx_db_execute("rollback;"); #endif +#ifdef HAVE_ORACLE + zbx_db_execute("rollback"); +#endif #ifdef HAVE_SQLITE3 if(sqlite_transaction_started > 1) @@ -420,6 +426,9 @@ int zbx_db_vexecute(const char *fmt, va_list args) #ifdef HAVE_SQLITE3 char *error=0; #endif +#ifdef HAVE_MYSQL + int status; +#endif /* sec = zbx_time();*/ @@ -434,7 +443,7 @@ int zbx_db_vexecute(const char *fmt, va_list args) } else { - if(mysql_query(conn,sql) != 0) + if ((status = mysql_query(conn,sql)) != 0) { zabbix_log(LOG_LEVEL_ERR, "Query failed: [%s] %s [%d]", sql, mysql_error(conn), mysql_errno(conn) ); @@ -453,7 +462,24 @@ int zbx_db_vexecute(const char *fmt, va_list args) } else { - ret = (int)mysql_affected_rows(conn); + do { + if (mysql_field_count(conn) == 0) + { +/* zabbix_log(LOG_LEVEL_DEBUG, ZBX_FS_UI64 " rows affected", + (zbx_uint64_t)mysql_affected_rows(conn));*/ + ret += (int)mysql_affected_rows(conn); + } + else /* some error occurred */ + { + zabbix_log(LOG_LEVEL_DEBUG, "Could not retrieve result set"); + break; + } + + /* more results? -1 = no, >0 = error, 0 = yes (keep looping) */ + if ((status = mysql_next_result(conn)) > 0) + zabbix_log(LOG_LEVEL_ERR, "Error: %s [%d]", + mysql_error(conn), mysql_errno(conn)); + } while (status == 0); } } #endif diff --git a/src/libs/zbxdbcache/Makefile.am b/src/libs/zbxdbcache/Makefile.am index 4b1fb42f..b1955b03 100644 --- a/src/libs/zbxdbcache/Makefile.am +++ b/src/libs/zbxdbcache/Makefile.am @@ -3,4 +3,5 @@ noinst_LIBRARIES = libzbxdbcache.a libzbxdbcache_a_SOURCES = \ - dbcache.c + dbcache.c \ + nextchecks.c diff --git a/src/libs/zbxdbcache/dbcache.c b/src/libs/zbxdbcache/dbcache.c index dee47851..82866151 100644 --- a/src/libs/zbxdbcache/dbcache.c +++ b/src/libs/zbxdbcache/dbcache.c @@ -20,345 +20,1860 @@ #include "common.h" #include "log.h" #include "zlog.h" +#include "threads.h" #include "db.h" #include "dbcache.h" #include "mutexs.h" +#include "zbxserver.h" #define LOCK_CACHE zbx_mutex_lock(&cache_lock) #define UNLOCK_CACHE zbx_mutex_unlock(&cache_lock) -#define ZBX_GET_SHM_DBCACHE_KEY(smk_key) \ - {if( -1 == (shm_key = ftok(CONFIG_FILE, (int)'c') )) \ - { \ - zbx_error("Can not create IPC key for path '%s', try to create for path '.' [%s]", CONFIG_FILE, strerror(errno)); \ - if( -1 == (shm_key = ftok(".", (int)'c') )) \ - { \ - zbx_error("Can not create IPC key for path '.' [%s]", strerror(errno)); \ - exit(1); \ - } \ +#define ZBX_GET_SHM_DBCACHE_KEY(smk_key) \ + {if( -1 == (shm_key = ftok(CONFIG_FILE, (int)'c') )) \ + { \ + zbx_error("Can not create IPC key for path '%s', try to create for path '.' [%s]", \ + CONFIG_FILE, strerror(errno)); \ + if( -1 == (shm_key = ftok(".", (int)'c') )) \ + { \ + zbx_error("Can not create IPC key for path '.' [%s]", strerror(errno)); \ + exit(1); \ + } \ }} ZBX_DC_CACHE *cache = NULL; static ZBX_MUTEX cache_lock; +static char *sql = NULL; +static int sql_allocated = 65536; + +zbx_process_t zbx_process; + /****************************************************************************** * * - * Function: DCsync * + * Function: DCmass_update_triggers * * * - * Purpose: writes updates and new data from pool to database * + * Purpose: re-calculate and updates values of triggers related to the items * * * - * Parameters: * + * Parameters: history - array of history data * + * history_num - number of history structures * * * * Return value: * * * - * Author: Alexei Vladishev * + * Author: Alexei Vladishev, Aleksander Vladishev * * * * Comments: * * * ******************************************************************************/ -void DCsync() +void DCmass_update_triggers(ZBX_DC_HISTORY *history, int history_num) { - int i; + char *exp; + char error[MAX_STRING_LEN]; + int exp_value; + DB_TRIGGER trigger; + DB_RESULT result; + DB_ROW row; + int sql_offset = 0, i; + ZBX_DC_HISTORY *h; + zbx_uint64_t itemid; - ZBX_DC_HISTORY *history; - ZBX_DC_TREND *trend; - char value_esc[MAX_STRING_LEN]; + zabbix_log(LOG_LEVEL_DEBUG, "In DCmass_update_triggers()"); - zabbix_log(LOG_LEVEL_DEBUG,"In DCsync(items %d pool:trends %d pool:history:%d)", - cache->items_count, - cache->pool.trends_count, - cache->pool.history_count); + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 1024, + "select distinct t.triggerid,t.expression,t.description,t.url,t.comments,t.status,t.value,t.priority" + ",t.type,f.itemid from triggers t,functions f,items i where i.status not in (%d) and i.itemid=f.itemid" + " and t.status=%d and f.triggerid=t.triggerid and f.itemid in (", + ITEM_STATUS_NOTSUPPORTED, + TRIGGER_STATUS_ENABLED); - LOCK_CACHE; - DBbegin(); - + for (i = 0; i < history_num; i++) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 32, ZBX_FS_UI64 ",", + history[i].itemid); + + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 32, ")"); + } + + result = DBselect("%s", sql); - for(i=0;i<cache->pool.history_count;i++) + sql_offset = 0; + + while (NULL != (row = DBfetch(result))) { - history = &cache->pool.history[i]; - zabbix_log(LOG_LEVEL_DEBUG,"History " ZBX_FS_UI64, - history->itemid); - switch(history->value_type) + trigger.triggerid = zbx_atoui64(row[0]); + strscpy(trigger.expression, row[1]); + strscpy(trigger.description, row[2]); + trigger.url = row[3]; + trigger.comments = row[4]; + trigger.status = atoi(row[5]); + trigger.value = atoi(row[6]); + trigger.priority = atoi(row[7]); + trigger.type = atoi(row[8]); + itemid = zbx_atoui64(row[9]); + + h = NULL; + + for (i = 0; i < history_num; i++) { - case ITEM_VALUE_TYPE_UINT64: - DBexecute("insert into history_uint (clock,itemid,value) values (%d," ZBX_FS_UI64 "," ZBX_FS_UI64 ");\n", - history->clock, - history->itemid, - history->value.value_uint64); + if (itemid == history[i].itemid) + { + h = &history[i]; break; - case ITEM_VALUE_TYPE_FLOAT: - DBexecute("insert into history (clock,itemid,value) values (%d," ZBX_FS_UI64 "," ZBX_FS_DBL ");\n", - history->clock, - history->itemid, - history->value.value_float); + } + } + + if (NULL == h) + continue; + + exp = strdup(trigger.expression); + + if (evaluate_expression(&exp_value, &exp, &trigger, error, sizeof(error)) != 0) + { + zabbix_log(LOG_LEVEL_WARNING, "Expression [%s] cannot be evaluated [%s]", + trigger.expression, + error); + zabbix_syslog("Expression [%s] cannot be evaluated [%s]", + trigger.expression, + error); +/* We shouldn't update triggervalue if expressions failed */ +/* DBupdate_trigger_value(&trigger, exp_value, time(NULL), error);*/ + } + else + DBupdate_trigger_value(&trigger, exp_value, h->clock, NULL); + + zbx_free(exp); + } + DBfree_result(result); +} + +/****************************************************************************** + * * + * Function: DCmass_update_item * + * * + * Purpose: update items info after new values is received * + * * + * Parameters: history - array of history data * + * history_num - number of history structures * + * * + * Author: Alexei Vladishev, Eugene Grigorjev, Aleksander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +static void DCmass_update_item(ZBX_DC_HISTORY *history, int history_num) +{ + DB_RESULT result; + DB_ROW row; + DB_ITEM item; + char value_esc[ITEM_LASTVALUE_LEN_MAX]; + int sql_offset = 0, i; + ZBX_DC_HISTORY *h; + double value_float; + zbx_uint64_t value_uint64; + + zabbix_log( LOG_LEVEL_DEBUG, "In DCmass_update_item()"); + + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 1024, + "select %s where h.hostid = i.hostid and i.itemid in (", + ZBX_SQL_ITEM_SELECT, + TRIGGER_STATUS_ENABLED); + + for (i = 0; i < history_num; i++) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 32, ZBX_FS_UI64 ",", + history[i].itemid); + + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 32, ")"); + } + + result = DBselect("%s", sql); + + sql_offset = 0; + +#ifdef HAVE_ORACLE + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 8, "begin\n"); +#endif + + while (NULL != (row = DBfetch(result))) + { + DBget_item_from_db(&item, row); + + h = NULL; + + for (i = 0; i < history_num; i++) + { + if (item.itemid == history[i].itemid) + { + h = &history[i]; break; - case ITEM_VALUE_TYPE_STR: - DBescape_string(history->value.value_str,value_esc,MAX_STRING_LEN); - DBexecute("insert into history_str (clock,itemid,value) values (%d," ZBX_FS_UI64 ",'%s');\n", - history->clock, - history->itemid, + } + } + + if (NULL == h) + continue; + +/* if (item.type == ITEM_TYPE_ZABBIX_ACTIVE || item.type == ITEM_TYPE_TRAPPER || item.type == ITEM_TYPE_HTTPTEST)*/ + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "update items set lastclock=%d", + h->clock); +/* else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "update items set nextcheck=%d,lastclock=%d", + calculate_item_nextcheck(item.itemid, item.type, item.delay, item.delay_flex, h->clock), + h->clock);*/ + + if (item.delta == ITEM_STORE_AS_IS) + { + if (h->value_type == ITEM_VALUE_TYPE_FLOAT) + zbx_snprintf(value_esc, sizeof(value_esc), ZBX_FS_DBL, h->value.value_float); + else if (h->value_type == ITEM_VALUE_TYPE_UINT64) + zbx_snprintf(value_esc, sizeof(value_esc), ZBX_FS_UI64, h->value.value_uint64); + else if (h->value_type == ITEM_VALUE_TYPE_STR + || h->value_type == ITEM_VALUE_TYPE_TEXT + || h->value_type == ITEM_VALUE_TYPE_LOG) + DBescape_string(h->value.value_str, value_esc, sizeof(value_esc)); + else + *value_esc = '\0'; + + if (h->value_type == ITEM_VALUE_TYPE_LOG) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "prevvalue=lastvalue,lastvalue='%s',lastlogsize=%d", + value_esc, + h->lastlogsize); + else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, ",prevvalue=lastvalue,lastvalue='%s'", value_esc); - break; - default: - zabbix_log(LOG_LEVEL_CRIT,"Unsupported history value type %d. Database cache corrupted?", - history->value_type); - exit(-1); - break; } + else if (item.delta == ITEM_STORE_SPEED_PER_SECOND) /* Logic for delta as speed of change */ + { + if (h->value_type == ITEM_VALUE_TYPE_FLOAT) + { + if (item.prevorgvalue_null == 0 && item.prevorgvalue_dbl <= h->value.value_float) + { + /* In order to continue normal processing, we assume difference 1 second + Otherwise function update_functions and update_triggers won't work correctly*/ + if (h->clock != item.lastclock) + value_float = (h->value.value_float - item.prevorgvalue_dbl) + / (h->clock - item.lastclock); + else + value_float = h->value.value_float - item.prevorgvalue_dbl; + + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + ",prevvalue=lastvalue,prevorgvalue='" ZBX_FS_DBL "'" + ",lastvalue='" ZBX_FS_DBL "'", + h->value.value_float, + value_float); + } + else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + ",prevorgvalue='" ZBX_FS_DBL "'", + h->value.value_float); + } + else if (h->value_type == ITEM_VALUE_TYPE_UINT64) + { + if (item.prevorgvalue_null == 0 && item.prevorgvalue_uint64 <= h->value.value_uint64) + { + if (h->clock != item.lastclock) + value_uint64 = (h->value.value_uint64 - item.prevorgvalue_uint64) + / (h->clock - item.lastclock); + else + value_uint64 = h->value.value_uint64 - item.prevorgvalue_uint64; + + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + ",prevvalue=lastvalue,prevorgvalue='" ZBX_FS_UI64 "'" + ",lastvalue='" ZBX_FS_UI64 "'", + h->value.value_uint64, + value_uint64); + } + else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + ",prevorgvalue='" ZBX_FS_DBL "'", + h->value.value_uint64); + } + } + else if (item.delta == ITEM_STORE_SIMPLE_CHANGE) /* Real delta: simple difference between values */ + { + if (h->value_type == ITEM_VALUE_TYPE_FLOAT) + { + if (item.prevorgvalue_null == 0 && item.prevorgvalue_dbl <= h->value.value_float) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + ",prevvalue=lastvalue,prevorgvalue='" ZBX_FS_DBL "'" + ",lastvalue='" ZBX_FS_DBL "'", + h->value.value_float, + h->value.value_float - item.prevorgvalue_dbl); + else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + ",prevorgvalue='" ZBX_FS_DBL "'", + h->value.value_float); + } + else if(h->value_type == ITEM_VALUE_TYPE_UINT64) + { + if (item.prevorgvalue_null == 0 && item.prevorgvalue_uint64 <= h->value.value_uint64) + { + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + ",prevvalue=lastvalue,prevorgvalue='" ZBX_FS_UI64 "'" + ",lastvalue='" ZBX_FS_UI64 "'", + h->value.value_uint64, + h->value.value_uint64 - item.prevorgvalue_uint64); + } + else + { + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + ",prevorgvalue='" ZBX_FS_UI64 "'", + h->value.value_uint64); + } + } + } + + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, " where itemid=" ZBX_FS_UI64 ";\n", + item.itemid); + +/* Update item status if required */ + if (item.status == ITEM_STATUS_NOTSUPPORTED) + { + zabbix_log(LOG_LEVEL_WARNING, "Parameter [%s] became supported by agent on host [%s]", + item.key, + item.host_name); + zabbix_syslog("Parameter [%s] became supported by agent on host [%s]", + item.key, + item.host_name); + + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, + "update items set status=%d,error='' where itemid=" ZBX_FS_UI64 ";\n", + ITEM_STATUS_ACTIVE, + item.itemid); + } + } + DBfree_result(result); + +#ifdef HAVE_ORACLE + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 8, "end;\n"); +#endif + + if (sql_offset > 16) /* In ORACLE always present begin..end; */ + DBexecute("%s", sql); +} + +/****************************************************************************** + * * + * Function: DCmass_proxy_update_item * + * * + * Purpose: update items info after new values is received * + * * + * Parameters: history - array of history data * + * history_num - number of history structures * + * * + * Author: Alexei Vladishev, Eugene Grigorjev, Aleksander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +static void DCmass_proxy_update_item(ZBX_DC_HISTORY *history, int history_num) +{ + int sql_offset = 0, i; + + zabbix_log( LOG_LEVEL_DEBUG, "In DCmass_proxy_update_item()"); + +#ifdef HAVE_ORACLE + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 8, "begin\n"); +#endif + + for (i = 0; i < history_num; i++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_LOG) + { + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, + "update items set lastlogsize=%d where itemid=" ZBX_FS_UI64 ";\n", + history[i].lastlogsize, + history[i].itemid); + } + } + +#ifdef HAVE_ORACLE + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 8, "end;\n"); +#endif + + if (sql_offset > 16) /* In ORACLE always present begin..end; */ + DBexecute("%s", sql); +} + +/****************************************************************************** + * * + * Function: DCmass_function_update * + * * + * Purpose: update functions lastvalue after new values is received * + * * + * Parameters: history - array of history data * + * history_num - number of history structures * + * * + * Author: Alexei Vladishev, Aleksander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +static void DCmass_function_update(ZBX_DC_HISTORY *history, int history_num) +{ + DB_RESULT result; + DB_ROW row; + DB_FUNCTION function; + DB_ITEM item; + char *lastvalue; + char value[MAX_STRING_LEN], value_esc[MAX_STRING_LEN]; + int sql_offset = 0, i; + + zabbix_log(LOG_LEVEL_DEBUG, "In DCmass_function_update()"); + + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 1024, + "select distinct %s,f.function,f.parameter,f.itemid,f.lastvalue from %s,functions f,triggers t" + " where f.itemid=i.itemid and h.hostid=i.hostid and f.triggerid=t.triggerid and t.status in (%d)" + " and f.itemid in (", + ZBX_SQL_ITEM_FIELDS, + ZBX_SQL_ITEM_TABLES, + TRIGGER_STATUS_ENABLED); + + for (i = 0; i < history_num; i++) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 32, ZBX_FS_UI64 ",", + history[i].itemid); + + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 32, ")"); } - for(i=0;i<cache->pool.trends_count;i++) + result = DBselect("%s", sql); + + sql_offset = 0; + +#ifdef HAVE_ORACLE + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 8, "begin\n"); +#endif + + while (NULL != (row = DBfetch(result))) { - trend = &cache->pool.trends[i]; - zabbix_log(LOG_LEVEL_DEBUG,"Trend " ZBX_FS_UI64, - trend->itemid); - if(trend->operation == ZBX_TREND_OP_INSERT) + DBget_item_from_db(&item, row); + + function.function = row[ZBX_SQL_ITEM_FIELDS_NUM]; + function.parameter = row[ZBX_SQL_ITEM_FIELDS_NUM + 1]; + function.itemid = zbx_atoui64(row[ZBX_SQL_ITEM_FIELDS_NUM + 2]); +/* It is not required to check lastvalue for NULL here */ + lastvalue = row[ZBX_SQL_ITEM_FIELDS_NUM + 3]; + + if (FAIL == evaluate_function(value, &item, function.function, function.parameter)) { - DBexecute("insert into trends (clock,itemid,num,value_min,value_avg,value_max) values (%d," ZBX_FS_UI64 ",%d," ZBX_FS_DBL "," ZBX_FS_DBL "," ZBX_FS_DBL ")", - trend->clock, - trend->itemid, - trend->num, - trend->value_min, - trend->value_avg, - trend->value_max); + zabbix_log(LOG_LEVEL_DEBUG, "Evaluation failed for function:%s", + function.function); + continue; } - else if(trend->operation == ZBX_TREND_OP_UPDATE) + + /* Update only if lastvalue differs from new one */ + if (DBis_null(lastvalue) == SUCCEED || strcmp(lastvalue, value) != 0) { - DBexecute("update trends set num=%d, value_min=" ZBX_FS_DBL ", value_avg=" ZBX_FS_DBL ", value_max=" ZBX_FS_DBL " where itemid=" ZBX_FS_UI64 " and clock=%d", - trend->num, - trend->value_min, - trend->value_avg, - trend->value_max, - trend->itemid, - trend->clock); + DBescape_string(value, value_esc, MAX_STRING_LEN); + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 1024, + "update functions set lastvalue='%s' where itemid=" ZBX_FS_UI64 + " and function='%s' and parameter='%s';\n", + value_esc, + function.itemid, + function.function, + function.parameter); } } + DBfree_result(result); - DBcommit(); +#ifdef HAVE_ORACLE + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 8, "end;\n"); +#endif - cache->pool.history_count=0; - cache->pool.trends_count=0; - UNLOCK_CACHE; + if (sql_offset > 16) /* In ORACLE always present begin..end; */ + DBexecute("%s", sql); +} - zabbix_log(LOG_LEVEL_DEBUG,"End of DCsync()"); +/****************************************************************************** + * * + * Function: DCmass_add_history * + * * + * Purpose: inserting new history data after new values is received * + * * + * Parameters: history - array of history data * + * history_num - number of history structures * + * * + * Author: Aleksander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +static void DCmass_add_history(ZBX_DC_HISTORY *history, int history_num) +{ + int sql_offset = 0, i; + char value_esc[MAX_STRING_LEN], *value_esc_dyn; + int history_text_num, history_log_num; + zbx_uint64_t id; +#if defined(HAVE_MYSQL) + int tmp_offset; +#endif + + zabbix_log(LOG_LEVEL_DEBUG, "In DCmass_add_history()"); + +#ifdef HAVE_ORACLE + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 8, "begin\n"); +#endif + +/* + * history + */ +#if defined(HAVE_MYSQL) + tmp_offset = sql_offset; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + "insert into history values "); +#endif + + for (i = 0; i < history_num; i++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_FLOAT) + { +#if defined(HAVE_MYSQL) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "(" ZBX_FS_UI64 ",%d," ZBX_FS_DBL "),", + history[i].itemid, + history[i].clock, + history[i].value.value_float); +#else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "insert into history values " + "(" ZBX_FS_UI64 ",%d," ZBX_FS_DBL ");\n", + history[i].itemid, + history[i].clock, + history[i].value.value_float); +#endif + } + } + +#if defined(HAVE_MYSQL) + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ";\n"); + } + else + sql_offset = tmp_offset; +#endif + + if (CONFIG_NODE_NOHISTORY == 0 && CONFIG_MASTER_NODEID > 0) + { +#if defined(HAVE_MYSQL) + tmp_offset = sql_offset; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + "insert into history_sync (nodeid,itemid,clock,value) values "); +#endif + + for (i = 0; i < history_num; i++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_FLOAT) + { +#if defined(HAVE_MYSQL) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "(%d," ZBX_FS_UI64 ",%d," ZBX_FS_DBL "),", + get_nodeid_by_id(history[i].itemid), + history[i].itemid, + history[i].clock, + history[i].value.value_float); +#else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "insert into history_sync (nodeid,itemid,clock,value) values " + "(%d," ZBX_FS_UI64 ",%d," ZBX_FS_DBL ");\n", + get_nodeid_by_id(history[i].itemid), + history[i].itemid, + history[i].clock, + history[i].value.value_float); +#endif + } + } + +#if defined(HAVE_MYSQL) + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ";\n"); + } + else + sql_offset = tmp_offset; +#endif + } + +/* + * history_uint + */ +#if defined(HAVE_MYSQL) + tmp_offset = sql_offset; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + "insert into history_uint values "); +#endif + + for (i = 0; i < history_num; i++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_UINT64) + { +#if defined(HAVE_MYSQL) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "(" ZBX_FS_UI64 ",%d," ZBX_FS_UI64 "),", + history[i].itemid, + history[i].clock, + history[i].value.value_uint64); +#else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "insert into history_uint values " + "(" ZBX_FS_UI64 ",%d," ZBX_FS_UI64 ");\n", + history[i].itemid, + history[i].clock, + history[i].value.value_uint64); +#endif + } + } + +#if defined(HAVE_MYSQL) + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ";\n"); + } + else + sql_offset = tmp_offset; +#endif + + if (CONFIG_NODE_NOHISTORY == 0 && CONFIG_MASTER_NODEID > 0) + { +#if defined(HAVE_MYSQL) + tmp_offset = sql_offset; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + "insert into history_uint_sync (nodeid,itemid,clock,value) values "); +#endif + + for (i = 0; i < history_num; i++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_UINT64) + { +#if defined(HAVE_MYSQL) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "(%d," ZBX_FS_UI64 ",%d," ZBX_FS_UI64 "),", + get_nodeid_by_id(history[i].itemid), + history[i].itemid, + history[i].clock, + history[i].value.value_uint64); +#else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "insert into history_uint_sync (nodeid,itemid,clock,value) values " + "(%d," ZBX_FS_UI64 ",%d," ZBX_FS_UI64 ");\n", + get_nodeid_by_id(history[i].itemid), + history[i].itemid, + history[i].clock, + history[i].value.value_uint64); +#endif + } + } + +#if defined(HAVE_MYSQL) + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ";\n"); + } + else + sql_offset = tmp_offset; +#endif + } + +/* + * history_str + */ +#if defined(HAVE_MYSQL) + tmp_offset = sql_offset; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + "insert into history_str values "); +#endif + + for (i = 0; i < history_num; i++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_STR) + { + DBescape_string(history[i].value.value_str, value_esc, sizeof(value_esc)); +#if defined(HAVE_MYSQL) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "(" ZBX_FS_UI64 ",%d,'%s'),", + history[i].itemid, + history[i].clock, + value_esc); +#else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "insert into history_str values " + "(" ZBX_FS_UI64 ",%d,'%s');\n", + history[i].itemid, + history[i].clock, + value_esc); +#endif + } + } + +#if defined(HAVE_MYSQL) + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ";\n"); + } + else + sql_offset = tmp_offset; +#endif + + if (CONFIG_NODE_NOHISTORY == 0 && CONFIG_MASTER_NODEID > 0) + { +#if defined(HAVE_MYSQL) + tmp_offset = sql_offset; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + "insert into history_str_sync (nodeid,itemid,clock,value) values "); +#endif + + for (i = 0; i < history_num; i++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_STR) + { + DBescape_string(history[i].value.value_str, value_esc, sizeof(value_esc)); +#if defined(HAVE_MYSQL) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "(%d," ZBX_FS_UI64 ",%d,'%s'),", + get_nodeid_by_id(history[i].itemid), + history[i].itemid, + history[i].clock, + value_esc); +#else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "insert into history_str_sync (nodeid,itemid,clock,value) values " + "(%d," ZBX_FS_UI64 ",%d,'%s');\n", + get_nodeid_by_id(history[i].itemid), + history[i].itemid, + history[i].clock, + value_esc); +#endif + } + } + +#if defined(HAVE_MYSQL) + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ";\n"); + } + else + sql_offset = tmp_offset; +#endif + } + + history_text_num = 0; + history_log_num = 0; + + for (i = 0; i < history_num; i++) + if (history[i].value_type == ITEM_VALUE_TYPE_TEXT) + history_text_num++; + else if (history[i].value_type == ITEM_VALUE_TYPE_LOG) + history_log_num++; + +/* + * history_text + */ + if (history_text_num > 0) + { + id = DBget_maxid_num("history_text", "id", history_text_num); + +#if defined(HAVE_MYSQL) + tmp_offset = sql_offset; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + "insert into history_text values "); +#endif + + for (i = 0; i < history_num; i++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_TEXT) + { + value_esc_dyn = DBdyn_escape_string(history[i].value.value_str); +#if defined(HAVE_MYSQL) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4096, + "(" ZBX_FS_UI64 "," ZBX_FS_UI64 ",%d,'%s'),", + id, + history[i].itemid, + history[i].clock, + value_esc_dyn); +#else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4096, + "insert into history_text values " + "(" ZBX_FS_UI64 "," ZBX_FS_UI64 ",%d,'%s');\n", + id, + history[i].itemid, + history[i].clock, + value_esc_dyn); +#endif + zbx_free(value_esc_dyn); + id++; + } + } + +#if defined(HAVE_MYSQL) + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ";\n"); + } + else + sql_offset = tmp_offset; +#endif + } + +/* + * history_log + */ + if (history_log_num > 0) + { + id = DBget_maxid_num("history_log", "id", history_log_num); + +#if defined(HAVE_MYSQL) + tmp_offset = sql_offset; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + "insert into history_log values "); +#endif + + for (i = 0; i < history_num; i++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_LOG) + { + value_esc_dyn = DBdyn_escape_string(history[i].value.value_str); +#if defined(HAVE_MYSQL) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4096, + "(" ZBX_FS_UI64 "," ZBX_FS_UI64 ",%d,%d,'%s',%d,'%s'),", + id, + history[i].itemid, + history[i].clock, + history[i].timestamp, + history[i].source, + history[i].severity, + value_esc_dyn); +#else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4096, + "insert into history_log values " + "(" ZBX_FS_UI64 "," ZBX_FS_UI64 ",%d,%d,'%s',%d,'%s');\n", + id, + history[i].itemid, + history[i].clock, + history[i].timestamp, + history[i].source, + history[i].severity, + value_esc_dyn); +#endif + zbx_free(value_esc_dyn); + id++; + } + } + +#if defined(HAVE_MYSQL) + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ";\n"); + } + else + sql_offset = tmp_offset; +#endif + } + +#ifdef HAVE_ORACLE + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 8, "end;\n"); +#endif + + if (sql_offset > 16) /* In ORACLE always present begin..end; */ + DBexecute("%s", sql); } -void DCshow() +/****************************************************************************** + * * + * Function: DCmass_proxy_add_history * + * * + * Purpose: inserting new history data after new values is received * + * * + * Parameters: history - array of history data * + * history_num - number of history structures * + * * + * Author: Aleksander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +static void DCmass_proxy_add_history(ZBX_DC_HISTORY *history, int history_num) { - int i; + int sql_offset = 0, i; + char value_esc[MAX_STRING_LEN], *value_esc_dyn; +#if defined(HAVE_MYSQL) + int tmp_offset; +#endif - ZBX_DC_HISTORY *history; - ZBX_DC_TREND *trend; + zabbix_log(LOG_LEVEL_DEBUG, "In DCmass_proxy_add_history()"); - zabbix_log(LOG_LEVEL_WARNING,"In DCshow(items %d pool:trends %d pool:history:%d)", - cache->items_count, - cache->pool.trends_count, - cache->pool.history_count); +#ifdef HAVE_ORACLE + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 8, "begin\n"); +#endif - LOCK_CACHE; - for(i=0;i<cache->pool.history_count;i++) +#if defined(HAVE_MYSQL) + tmp_offset = sql_offset; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + "insert into proxy_history (itemid,clock,value) values "); +#endif + + for (i = 0; i < history_num; i++) { - history = &cache->pool.history[i]; - zabbix_log(LOG_LEVEL_DEBUG,"History " ZBX_FS_UI64, - history->itemid); + if (history[i].value_type == ITEM_VALUE_TYPE_FLOAT) + { +#if defined(HAVE_MYSQL) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "(" ZBX_FS_UI64 ",%d,'" ZBX_FS_DBL "'),", + history[i].itemid, + history[i].clock, + history[i].value.value_float); +#else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "insert into proxy_history (itemid,clock,value) values " + "(" ZBX_FS_UI64 ",%d,'" ZBX_FS_DBL "');\n", + history[i].itemid, + history[i].clock, + history[i].value.value_float); +#endif + } } - for(i=0;i<cache->pool.trends_count;i++) +#if defined(HAVE_MYSQL) + if (sql[sql_offset - 1] == ',') { - trend = &cache->pool.trends[i]; - zabbix_log(LOG_LEVEL_DEBUG,"History " ZBX_FS_UI64, - trend->itemid); + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ";\n"); } - UNLOCK_CACHE; + else + sql_offset = tmp_offset; +#endif + +#if defined(HAVE_MYSQL) + tmp_offset = sql_offset; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + "insert into proxy_history (itemid,clock,value) values "); +#endif + + for (i = 0; i < history_num; i++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_UINT64) + { +#if defined(HAVE_MYSQL) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "(" ZBX_FS_UI64 ",%d,'" ZBX_FS_UI64 "'),", + history[i].itemid, + history[i].clock, + history[i].value.value_uint64); +#else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "insert into proxy_history (itemid,clock,value) values " + "(" ZBX_FS_UI64 ",%d,'" ZBX_FS_UI64 "');\n", + history[i].itemid, + history[i].clock, + history[i].value.value_uint64); +#endif + } + } + +#if defined(HAVE_MYSQL) + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ";\n"); + } + else + sql_offset = tmp_offset; +#endif + +#if defined(HAVE_MYSQL) + tmp_offset = sql_offset; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + "insert into proxy_history (itemid,clock,value) values "); +#endif + + for (i = 0; i < history_num; i++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_STR) + { + DBescape_string(history[i].value.value_str, value_esc, sizeof(value_esc)); +#if defined(HAVE_MYSQL) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "(" ZBX_FS_UI64 ",%d,'%s'),", + history[i].itemid, + history[i].clock, + value_esc); +#else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "insert into proxy_history (itemid,clock,value) values " + "(" ZBX_FS_UI64 ",%d,'%s');\n", + history[i].itemid, + history[i].clock, + value_esc); +#endif + } + } + +#if defined(HAVE_MYSQL) + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ";\n"); + } + else + sql_offset = tmp_offset; +#endif + +#if defined(HAVE_MYSQL) + tmp_offset = sql_offset; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + "insert into proxy_history (itemid,clock,value) values "); +#endif + + for (i = 0; i < history_num; i++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_TEXT) + { + value_esc_dyn = DBdyn_escape_string(history[i].value.value_str); +#if defined(HAVE_MYSQL) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4096, + "(" ZBX_FS_UI64 ",%d,'%s'),", + history[i].itemid, + history[i].clock, + value_esc_dyn); +#else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4096, + "insert into proxy_history (itemid,clock,value) values " + "(" ZBX_FS_UI64 ",%d,'%s');\n", + history[i].itemid, + history[i].clock, + value_esc_dyn); +#endif + zbx_free(value_esc_dyn); + } + } + +#if defined(HAVE_MYSQL) + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ";\n"); + } + else + sql_offset = tmp_offset; +#endif + +#if defined(HAVE_MYSQL) + tmp_offset = sql_offset; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + "insert into proxy_history (itemid,clock,timestamp,source,severity,value) values "); +#endif + + for (i = 0; i < history_num; i++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_LOG) + { + value_esc_dyn = DBdyn_escape_string(history[i].value.value_str); +#if defined(HAVE_MYSQL) + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4096, + "(" ZBX_FS_UI64 "," ZBX_FS_UI64 ",%d,%d,'%s',%d,'%s'),", + history[i].itemid, + history[i].clock, + history[i].timestamp, + history[i].source, + history[i].severity, + value_esc_dyn); +#else + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4096, + "insert into proxy_history (itemid,cloc,timestamp,source,severityk,value) values " + "(" ZBX_FS_UI64 "," ZBX_FS_UI64 ",%d,%d,'%s',%d,'%s');\n", + history[i].itemid, + history[i].clock, + history[i].timestamp, + history[i].source, + history[i].severity, + value_esc_dyn); +#endif + zbx_free(value_esc_dyn); + } + } + +#if defined(HAVE_MYSQL) + if (sql[sql_offset - 1] == ',') + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ";\n"); + } + else + sql_offset = tmp_offset; +#endif + +#ifdef HAVE_ORACLE + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 8, "end;\n"); +#endif - zabbix_log(LOG_LEVEL_DEBUG,"End of DCshow()"); + if (sql_offset > 16) /* In ORACLE always present begin..end; */ + DBexecute("%s", sql); } -static ZBX_DC_ITEM *get_item(zbx_uint64_t itemid) +static int DCitem_already_exists(ZBX_DC_HISTORY *history, int history_num, zbx_uint64_t itemid) { int i; - ZBX_DC_ITEM *item = NULL; - int found = 0; - for(i=0;i<cache->items_count;i++) + for (i = 0; i < history_num; i++) + if (itemid == history[i].itemid) + return SUCCEED; + + return FAIL; +} + +/****************************************************************************** + * * + * Function: DCsync * + * * + * Purpose: writes updates and new data from pool to database * + * * + * Parameters: * + * * + * Return value: number of synced values * + * * + * Author: Alexei Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +int DCsync_history(int sync_type) +{ + static ZBX_DC_HISTORY history[ZBX_SYNC_MAX]; + int i, j, history_num, n, f; + int syncs; + int total_num = 0; +/* double sec; + sec = zbx_time();*/ + + zabbix_log(LOG_LEVEL_DEBUG, "In DCsync_history(history_first:%d history_num:%d)", + cache->history_first, + cache->history_num); + + if (0 == cache->history_num) + return 0; + + syncs = cache->history_num / ZBX_SYNC_MAX; + + do { - item = &cache->items[i]; - if(item->itemid == itemid) + LOCK_CACHE; + + history_num = 0; + n = cache->history_num; + f = cache->history_first; + + while (n > 0 && history_num < ZBX_SYNC_MAX) { - found = 1; + if (zbx_process == ZBX_PROCESS_PROXY || FAIL == DCitem_already_exists(history, history_num, cache->history[f].itemid)) + { + memcpy(&history[history_num], &cache->history[f], sizeof(ZBX_DC_HISTORY)); + if (history[history_num].value_type == ITEM_VALUE_TYPE_STR + || history[history_num].value_type == ITEM_VALUE_TYPE_TEXT + || history[history_num].value_type == ITEM_VALUE_TYPE_LOG) + { + history[history_num].value.value_str = strdup(cache->history[f].value.value_str); + + if (history[history_num].value_type == ITEM_VALUE_TYPE_LOG) + history[history_num].source = strdup(cache->history[f].source); + } + + for (j = f; j != cache->history_first; j = (j == 0 ? ZBX_HISTORY_SIZE : j) - 1) + { + i = (j == 0 ? ZBX_HISTORY_SIZE : j) - 1; + memcpy(&cache->history[j], &cache->history[i], sizeof(ZBX_DC_HISTORY)); + } + + cache->history_num--; + cache->history_first++; + cache->history_first = cache->history_first % ZBX_HISTORY_SIZE; + + history_num++; + } + + n--; + f++; + f = f % ZBX_HISTORY_SIZE; + } + + UNLOCK_CACHE; + + if (0 == history_num) break; + + if (NULL == sql) + sql = zbx_malloc(sql, sql_allocated); + + DBbegin(); + + if (zbx_process == ZBX_PROCESS_SERVER) + { + DCmass_add_history(history, history_num); + DCmass_update_item(history, history_num); + DCmass_function_update(history, history_num); + DCmass_update_triggers(history, history_num); } - } + else + { + DCmass_proxy_add_history(history, history_num); + DCmass_proxy_update_item(history, history_num); + } + + DBcommit(); - if(found == 0) + for (i = 0; i < history_num; i ++) + { + if (history[i].value_type == ITEM_VALUE_TYPE_STR + || history[i].value_type == ITEM_VALUE_TYPE_TEXT + || history[i].value_type == ITEM_VALUE_TYPE_LOG) + { + zbx_free(history[i].value.value_str); + + if (history[i].value_type == ITEM_VALUE_TYPE_LOG) + zbx_free(history[i].source); + } + } + total_num += history_num; + } while (--syncs > 0 || sync_type == ZBX_SYNC_FULL); + +/* zabbix_log(LOG_LEVEL_CRIT, "DCsync_history first:%6d; cache:%6d; synced:%4d; spent " ZBX_FS_DBL " seconds", + cache->history_first, + cache->history_num, + total_num, + zbx_time() - sec);*/ + return total_num; +} + +/****************************************************************************** + * * + * Function: DCvacuum_text * + * * + * Purpose: * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Alekasander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +static void DCvacuum_text() +{ + char *first_text; + int i, index; + size_t offset; + + zabbix_log(LOG_LEVEL_DEBUG, "In DCvacuum_text()"); + + /* vacuumng text buffer */ + first_text = NULL; + for (i = 0; i < cache->history_num; i++) { - item=&cache->items[cache->items_count]; - item->itemid=itemid; - cache->items_count++; + index = (cache->history_first + i) % ZBX_HISTORY_SIZE; + if (cache->history[index].value_type == ITEM_VALUE_TYPE_STR + || cache->history[index].value_type == ITEM_VALUE_TYPE_TEXT + || cache->history[index].value_type == ITEM_VALUE_TYPE_LOG) + { + first_text = cache->history[index].value.value_str; + break; + } } + if (NULL != first_text) + { + offset = first_text - cache->text; + for (i = 0; i < cache->history_num; i++) + { + index = (cache->history_first + i) % ZBX_HISTORY_SIZE; + if (cache->history[index].value_type == ITEM_VALUE_TYPE_STR + || cache->history[index].value_type == ITEM_VALUE_TYPE_TEXT + || cache->history[index].value_type == ITEM_VALUE_TYPE_LOG) + { + cache->history[index].value.value_str -= offset; - return item; + if (cache->history[index].value_type == ITEM_VALUE_TYPE_LOG) + cache->history[index].source -= offset; + } + } + cache->last_text -= offset; + } else + cache->last_text = cache->text; } -int DCadd_trend(zbx_uint64_t itemid, double value, int clock) +/****************************************************************************** + * * + * Function: DCget_history_ptr * + * * + * Purpose: * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Alekasander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +static ZBX_DC_HISTORY *DCget_history_ptr(zbx_uint64_t itemid, size_t text_len) { - int hour; - ZBX_DC_TREND *trend = NULL, - *trend_tmp = NULL; - ZBX_DC_ITEM *item = NULL; - DB_RESULT result; - DB_ROW row; - int trend_found=0; + ZBX_DC_HISTORY *history; + int index; + size_t free_len; - zabbix_log(LOG_LEVEL_DEBUG,"In DCadd_trend()"); +retry: + if (cache->history_num >= ZBX_HISTORY_SIZE) + { + UNLOCK_CACHE; - LOCK_CACHE; - hour=clock-clock%3600; + zabbix_log(LOG_LEVEL_DEBUG, "History buffer is full. Sleeping for 1 second."); + sleep(1); - item=get_item(itemid); + LOCK_CACHE; - trend=&item->trend; + goto retry; + } - if(hour == trend->clock) + if (text_len > sizeof(cache->text)) { - trend_found=1; + zabbix_log(LOG_LEVEL_ERR, "Insufficient shared memory"); + exit(-1); } - else if(trend->clock !=0) + + free_len = sizeof(cache->text) - (cache->last_text - cache->text); + + if (text_len > free_len) { -// add_trend2pool(trend); - trend_tmp=&cache->pool.trends[cache->pool.trends_count]; - cache->pool.trends_count++; + DCvacuum_text(); + + free_len = sizeof(cache->text) - (cache->last_text - cache->text); + + if (text_len > free_len) + { + UNLOCK_CACHE; - trend_tmp->operation = trend->operation; - trend_tmp->itemid = trend->itemid; - trend_tmp->clock = trend->clock; - trend_tmp->num = trend->num; - trend_tmp->value_min = trend->value_min; - trend_tmp->value_max = trend->value_max; - trend_tmp->value_avg = trend->value_avg; + zabbix_log(LOG_LEVEL_DEBUG, "History text buffer is full. Sleeping for 1 second."); + sleep(1); - trend->clock = 0; + LOCK_CACHE; + + goto retry; + } } - /* Not found with the same clock */ - if(0 == trend_found) + index = (cache->history_first + cache->history_num) % ZBX_HISTORY_SIZE; + history = &cache->history[index]; + + cache->history_num++; + + return history; +} + +/****************************************************************************** + * * + * Function: DCget_trend_nearestindex * + * * + * Purpose: find nearest index by itemid in array of ZBX_DC_TREND * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Alekasander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +static int DCget_trend_nearestindex(zbx_uint64_t itemid) +{ + int first_index, last_index, index; + + if (cache->trends_num == 0) + return 0; + + first_index = 0; + last_index = cache->trends_num - 1; + while (1) { - zabbix_log(LOG_LEVEL_DEBUG,"Not found"); - /* Add new, do not look at the database */ - trend->operation = ZBX_TREND_OP_INSERT; - trend->itemid = itemid; - trend->clock = hour; - trend->num = 1; - trend->value_min = value; - trend->value_max = value; - trend->value_avg = value; + index = first_index + (last_index - first_index) / 2; - /* Try to find in the database */ - result = DBselect("select num,value_min,value_avg,value_max from trends where itemid=" ZBX_FS_UI64 " and clock=%d", - itemid, - hour); + if (cache->trends[index].itemid == itemid) + return index; + else if (last_index == first_index) + { + if (cache->trends[index].itemid < itemid) + index++; + return index; + } + else if (cache->trends[index].itemid < itemid) + first_index = index + 1; + else + last_index = index; + } +} - row=DBfetch(result); +/****************************************************************************** + * * + * Function: DCget_trend * + * * + * Purpose: find existing or add new structure and return pointer * + * * + * Parameters: * + * * + * Return value: pointer to a new structure or NULL if array is full * + * * + * Author: Alekasander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +static ZBX_DC_TREND *DCget_trend(zbx_uint64_t itemid) +{ + int index; + + index = DCget_trend_nearestindex(itemid); + if (index < cache->trends_num && cache->trends[index].itemid == itemid) + return &cache->trends[index]; + + if (cache->trends_num == ZBX_TREND_SIZE) + return NULL; + + memmove(&cache->trends[index + 1], &cache->trends[index], sizeof(ZBX_DC_TREND) * (cache->trends_num - index)); + memset(&cache->trends[index], 0, sizeof(ZBX_DC_TREND)); + cache->trends[index].itemid = itemid; + cache->trends_num++; - if(row) + return &cache->trends[index]; +} + +/****************************************************************************** + * * + * Function: DCflush_trend * + * * + * Purpose: flush trend to the database * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Alekasander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +static void DCflush_trend(ZBX_DC_TREND *trend) +{ + DB_RESULT result; + DB_ROW row; + int num; + trend_value_t value_min, value_avg, value_max; + + switch (trend->value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + result = DBselect("select num,value_min,value_avg,value_max from trends" + " where itemid=" ZBX_FS_UI64 " and clock=%d", + trend->itemid, + trend->clock); + break; + case ITEM_VALUE_TYPE_UINT64: + result = DBselect("select num,value_min,value_avg,value_max from trends_uint" + " where itemid=" ZBX_FS_UI64 " and clock=%d", + trend->itemid, + trend->clock); + break; + default: + zabbix_log(LOG_LEVEL_CRIT, "Invalid value type for trends."); + exit(-1); + } + + if (NULL != (row = DBfetch(result))) + { + num = atoi(row[0]); + switch (trend->value_type) { - trend->operation = ZBX_TREND_OP_UPDATE; - trend->itemid = itemid; - trend->clock = hour; - trend->num = atoi(row[0]); - trend->value_min = atof(row[1]); - trend->value_avg = atof(row[2]); - trend->value_max = atof(row[3]); - if(value<trend->value_min) trend->value_min=value; - if(value>trend->value_max) trend->value_max=value; - trend->value_avg = (trend->num*trend->value_avg+value)/(trend->num+1); - trend->num++; + case ITEM_VALUE_TYPE_FLOAT: + value_min.value_float = atof(row[1]); + value_avg.value_float = atof(row[2]); + value_max.value_float = atof(row[3]); + + if (value_min.value_float < trend->value_min.value_float) + trend->value_min.value_float = value_min.value_float; + if (value_max.value_float > trend->value_max.value_float) + trend->value_max.value_float = value_max.value_float; + trend->value_avg.value_float = (trend->num * trend->value_avg.value_float + + num * value_avg.value_float) / (trend->num + num); + trend->num += num; + + DBexecute("update trends set num=%d,value_min=" ZBX_FS_DBL ",value_avg=" ZBX_FS_DBL + ",value_max=" ZBX_FS_DBL " where itemid=" ZBX_FS_UI64 " and clock=%d", + trend->num, + trend->value_min.value_float, + trend->value_avg.value_float, + trend->value_max.value_float, + trend->itemid, + trend->clock); + break; + case ITEM_VALUE_TYPE_UINT64: + value_min.value_uint64 = zbx_atoui64(row[1]); + value_avg.value_uint64 = zbx_atoui64(row[2]); + value_max.value_uint64 = zbx_atoui64(row[3]); + + if (value_min.value_uint64 < trend->value_min.value_uint64) + trend->value_min.value_uint64 = value_min.value_uint64; + if (value_max.value_uint64 > trend->value_max.value_uint64) + trend->value_max.value_uint64 = value_max.value_uint64; + trend->value_avg.value_uint64 = (trend->num * trend->value_avg.value_uint64 + + num * value_avg.value_uint64) / (trend->num + num); + trend->num += num; + + DBexecute("update trends_uint set num=%d,value_min=" ZBX_FS_UI64 ",value_avg=" ZBX_FS_UI64 + ",value_max=" ZBX_FS_UI64 " where itemid=" ZBX_FS_UI64 " and clock=%d", + trend->num, + trend->value_min.value_uint64, + trend->value_avg.value_uint64, + trend->value_max.value_uint64, + trend->itemid, + trend->clock); + break; } - DBfree_result(result); } else { - zabbix_log(LOG_LEVEL_DEBUG,"Found"); - if(value<trend->value_min) trend->value_min=value; - if(value>trend->value_max) trend->value_max=value; - trend->value_avg=(trend->num*trend->value_avg+value)/(trend->num+1); - trend->num++; + switch (trend->value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + DBexecute("insert into trends (itemid,clock,num,value_min,value_avg,value_max)" + " values (" ZBX_FS_UI64 ",%d,%d," ZBX_FS_DBL "," ZBX_FS_DBL "," ZBX_FS_DBL ")", + trend->itemid, + trend->clock, + trend->num, + trend->value_min.value_float, + trend->value_avg.value_float, + trend->value_max.value_float); + break; + case ITEM_VALUE_TYPE_UINT64: + DBexecute("insert into trends_uint (itemid,clock,num,value_min,value_avg,value_max)" + " values (" ZBX_FS_UI64 ",%d,%d," ZBX_FS_UI64 "," ZBX_FS_UI64 "," ZBX_FS_UI64 ")", + trend->itemid, + trend->clock, + trend->num, + trend->value_min.value_uint64, + trend->value_avg.value_uint64, + trend->value_max.value_uint64); + break; + } } - UNLOCK_CACHE; + DBfree_result(result); - zabbix_log(LOG_LEVEL_DEBUG,"End of add_trend()"); + trend->clock = 0; + trend->num = 0; + memset(&trend->value_min, 0, sizeof(trend_value_t)); + memset(&trend->value_avg, 0, sizeof(trend_value_t)); + memset(&trend->value_max, 0, sizeof(trend_value_t)); +} - return SUCCEED; +/****************************************************************************** + * * + * Function: DCsync_trends * + * * + * Purpose: flush all trends to the database * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Alekasander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +void DCsync_trends() +{ + int i; + + zabbix_log(LOG_LEVEL_DEBUG, "In DCsync_trends(trends_num: %d)", + cache->trends_num); + + for (i = 0; i < cache->trends_num; i ++) + DCflush_trend(&cache->trends[i]); + + cache->trends_num = 0; + + zabbix_log(LOG_LEVEL_DEBUG, "End of DCsync_trends()"); } -int DCadd_history(zbx_uint64_t itemid, double value, int clock) +/****************************************************************************** + * * + * Function: DCadd_trend * + * * + * Purpose: add new value to the trends * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Alekasander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +void DCadd_trend(ZBX_DC_HISTORY *history) { - ZBX_DC_HISTORY *history = NULL; + ZBX_DC_TREND *trend = NULL, trend_static; + int clock; + + clock = history->clock - history->clock % 3600; + + if (NULL != (trend = DCget_trend(history->itemid))) + { + if (trend->num > 0 && (trend->clock != clock || trend->value_type != history->value_type)) + DCflush_trend(trend); + + trend->value_type = history->value_type; + trend->clock = clock; + + switch (trend->value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + if (trend->num == 0 || history->value.value_float < trend->value_min.value_float) + trend->value_min.value_float = history->value.value_float; + if (trend->num == 0 || history->value.value_float > trend->value_max.value_float) + trend->value_max.value_float = history->value.value_float; + trend->value_avg.value_float = (trend->num * trend->value_avg.value_float + + history->value.value_float) / (trend->num + 1); + trend->num++; + break; + case ITEM_VALUE_TYPE_UINT64: + if (trend->num == 0 || history->value.value_uint64 < trend->value_min.value_uint64) + trend->value_min.value_uint64 = history->value.value_uint64; + if (trend->num == 0 || history->value.value_uint64 > trend->value_max.value_uint64) + trend->value_max.value_uint64 = history->value.value_uint64; + trend->value_avg.value_uint64 = (trend->num * trend->value_avg.value_uint64 + + history->value.value_uint64) / (trend->num + 1); + trend->num++; + break; + } + } + else + { + zabbix_log(LOG_LEVEL_WARNING, "Insufficient space for trends. Flushing to disk."); + + trend_static.itemid = history->itemid; + trend_static.clock = clock; + trend_static.value_type = history->value_type; + trend_static.num = 1; + switch (trend_static.value_type) + { + case ITEM_VALUE_TYPE_FLOAT: + trend_static.value_min.value_float = history->value.value_float; + trend_static.value_avg.value_float = history->value.value_float; + trend_static.value_max.value_float = history->value.value_float; + break; + case ITEM_VALUE_TYPE_UINT64: + trend_static.value_min.value_uint64 = history->value.value_uint64; + trend_static.value_avg.value_uint64 = history->value.value_uint64; + trend_static.value_max.value_uint64 = history->value.value_uint64; + break; + } + + DCflush_trend(trend); + } +} - zabbix_log(LOG_LEVEL_DEBUG,"In DCadd_history(itemid:" ZBX_FS_UI64 ")", - itemid); +/****************************************************************************** + * * + * Function: DCadd_history * + * * + * Purpose: * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Alekasander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +void DCadd_history(zbx_uint64_t itemid, double value, int clock) +{ + ZBX_DC_HISTORY *history; LOCK_CACHE; - history=&cache->pool.history[cache->pool.history_count]; - cache->pool.history_count++; + history = DCget_history_ptr(itemid, 0); history->itemid = itemid; history->clock = clock; history->value_type = ITEM_VALUE_TYPE_FLOAT; history->value.value_float = value; - UNLOCK_CACHE; + DCadd_trend(history); - return SUCCEED; + UNLOCK_CACHE; } -int DCadd_history_uint(zbx_uint64_t itemid, zbx_uint64_t value, int clock) +/****************************************************************************** + * * + * Function: DCadd_history_uint * + * * + * Purpose: * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Alekasander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +void DCadd_history_uint(zbx_uint64_t itemid, zbx_uint64_t value, int clock) { - ZBX_DC_HISTORY *history = NULL; - - zabbix_log(LOG_LEVEL_DEBUG,"In DCadd_history_uint(itemid:" ZBX_FS_UI64 ")", - itemid); + ZBX_DC_HISTORY *history; LOCK_CACHE; - history=&cache->pool.history[cache->pool.history_count]; - cache->pool.history_count++; + + history = DCget_history_ptr(itemid, 0); history->itemid = itemid; history->clock = clock; history->value_type = ITEM_VALUE_TYPE_UINT64; history->value.value_uint64 = value; - UNLOCK_CACHE; - - zabbix_log(LOG_LEVEL_DEBUG,"End of DCadd_history_uint()"); + DCadd_trend(history); - return SUCCEED; + UNLOCK_CACHE; } -int DCadd_history_str(zbx_uint64_t itemid, char *value, int clock) +/****************************************************************************** + * * + * Function: DCadd_history_str * + * * + * Purpose: * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Alekasander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +void DCadd_history_str(zbx_uint64_t itemid, char *value, int clock) { - ZBX_DC_HISTORY *history = NULL; - - zabbix_log(LOG_LEVEL_DEBUG,"In DCadd_history_uint(itemid:" ZBX_FS_UI64 ")", - itemid); + ZBX_DC_HISTORY *history; + size_t len; LOCK_CACHE; - history=&cache->pool.history[cache->pool.history_count]; - cache->pool.history_count++; + + len = strlen(value) + 1; + history = DCget_history_ptr(itemid, len); history->itemid = itemid; history->clock = clock; history->value_type = ITEM_VALUE_TYPE_STR; - strscpy(history->value.value_str,value); + history->value.value_str = cache->last_text; + zbx_strlcpy(cache->last_text, value, len); + cache->last_text += len; UNLOCK_CACHE; +} - return SUCCEED; +/****************************************************************************** + * * + * Function: DCadd_history_text * + * * + * Purpose: * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Alekasander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +void DCadd_history_text(zbx_uint64_t itemid, char *value, int clock) +{ + ZBX_DC_HISTORY *history; + size_t len; + + LOCK_CACHE; + + len = strlen(value) + 1; + history = DCget_history_ptr(itemid, len); + + history->itemid = itemid; + history->clock = clock; + history->value_type = ITEM_VALUE_TYPE_TEXT; + history->value.value_str = cache->last_text; + zbx_strlcpy(cache->last_text, value, len); + cache->last_text += len; + + UNLOCK_CACHE; } +/****************************************************************************** + * * + * Function: DCadd_history_log * + * * + * Purpose: * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Alekasander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +void DCadd_history_log(zbx_uint64_t itemid, char *value, int clock, int timestamp, char *source, int severity, int lastlogsize) +{ + ZBX_DC_HISTORY *history; + size_t len1, len2; + + LOCK_CACHE; + + len1 = strlen(value) + 1; + len2 = strlen(source) + 1; + history = DCget_history_ptr(itemid, len1 + len2); + + history->itemid = itemid; + history->clock = clock; + history->value_type = ITEM_VALUE_TYPE_LOG; + history->value.value_str = cache->last_text; + zbx_strlcpy(cache->last_text, value, len1); + cache->last_text += len1; + history->timestamp = timestamp; + history->source = cache->last_text; + zbx_strlcpy(cache->last_text, source, len2); + cache->last_text += len2; + history->severity = severity; + history->lastlogsize = lastlogsize; + + UNLOCK_CACHE; +} /****************************************************************************** * * @@ -375,7 +1890,7 @@ int DCadd_history_str(zbx_uint64_t itemid, char *value, int clock) * Comments: * * * ******************************************************************************/ -void init_database_cache(void) +void init_database_cache(zbx_process_t p) { #define ZBX_MAX_ATTEMPTS 10 int attempts = 0; @@ -383,6 +1898,8 @@ void init_database_cache(void) key_t shm_key; int shm_id; + zbx_process = p; + ZBX_GET_SHM_DBCACHE_KEY(shm_key); lbl_create: @@ -427,13 +1944,15 @@ lbl_create: zbx_error("Unable to create mutex for database cache"); exit(FAIL); } + + cache->last_text = cache->text; } /****************************************************************************** * * - * Function: free_database_cache * + * Function: DCsync_all * * * - * Purpose: Free memory aloccated for database cache * + * Purpose: writes updates and new data from pool and cache data to database * * * * Parameters: * * * @@ -444,46 +1963,22 @@ lbl_create: * Comments: * * * ******************************************************************************/ -void free_database_cache(void) +static void DCsync_all() { + zabbix_log(LOG_LEVEL_DEBUG,"In DCsync_all()"); - key_t shm_key; - int shm_id; - - zabbix_log(LOG_LEVEL_WARNING,"In free_database_cache()"); - - if(NULL == cache) return; - - DCsync_all(); - - LOCK_CACHE; - - ZBX_GET_SHM_DBCACHE_KEY(shm_key); - - shm_id = shmget(shm_key, sizeof(ZBX_DC_CACHE), 0); - - if (-1 == shm_id) - { - zabbix_log(LOG_LEVEL_ERR, "Can't find shared memory for database cache. [%s]",strerror(errno)); - exit(1); - } + DCsync_history(ZBX_SYNC_FULL); + if (zbx_process == ZBX_PROCESS_SERVER) + DCsync_trends(); - shmctl(shm_id, IPC_RMID, 0); - - cache = NULL; - - UNLOCK_CACHE; - - zbx_mutex_destroy(&cache_lock); - - zabbix_log(LOG_LEVEL_WARNING,"End of free_database_cache()"); + zabbix_log(LOG_LEVEL_DEBUG,"End of DCsync_all()"); } /****************************************************************************** * * - * Function: DCsync_all * + * Function: free_database_cache * * * - * Purpose: writes updates and new data from pool and cache data to database * + * Purpose: Free memory aloccated for database cache * * * * Parameters: * * * @@ -494,61 +1989,38 @@ void free_database_cache(void) * Comments: * * * ******************************************************************************/ -void DCsync_all() +void free_database_cache() { - int i; - ZBX_DC_ITEM *item; - ZBX_DC_TREND *trend; + key_t shm_key; + int shm_id; - zabbix_log(LOG_LEVEL_WARNING,"In DCsync_all(items %d pool:trends %d pool:history:%d)", - cache->items_count, - cache->pool.trends_count, - cache->pool.history_count); + zabbix_log(LOG_LEVEL_DEBUG, "In free_database_cache()"); - DCsync(); + if (NULL == cache) + return; + + DCsync_all(); LOCK_CACHE; - DBbegin(); + + ZBX_GET_SHM_DBCACHE_KEY(shm_key); - zabbix_log(LOG_LEVEL_WARNING,"In items_count %d", - cache->items_count); + shm_id = shmget(shm_key, sizeof(ZBX_DC_CACHE), 0); - for(i=0;i<cache->items_count;i++) + if (-1 == shm_id) { - item = &cache->items[i]; - trend = &item->trend; - - zabbix_log(LOG_LEVEL_DEBUG,"Trend " ZBX_FS_UI64, - trend->itemid); - - if(trend->clock == 0) continue; - - if(trend->operation == ZBX_TREND_OP_INSERT) - { - DBexecute("insert into trends (clock,itemid,num,value_min,value_avg,value_max) values (%d," ZBX_FS_UI64 ",%d," ZBX_FS_DBL "," ZBX_FS_DBL "," ZBX_FS_DBL ")", - trend->clock, - trend->itemid, - trend->num, - trend->value_min, - trend->value_avg, - trend->value_max); - } - else if(trend->operation == ZBX_TREND_OP_UPDATE) - { - DBexecute("update trends set num=%d, value_min=" ZBX_FS_DBL ", value_avg=" ZBX_FS_DBL ", value_max=" ZBX_FS_DBL " where itemid=" ZBX_FS_UI64 " and clock=%d", - trend->num, - trend->value_min, - trend->value_avg, - trend->value_max, - trend->itemid, - trend->clock); - } + zabbix_log(LOG_LEVEL_ERR, "Can't find shared memory for database cache. [%s]",strerror(errno)); + exit(1); } - DBcommit(); + shmctl(shm_id, IPC_RMID, 0); + + cache = NULL; UNLOCK_CACHE; - zabbix_log(LOG_LEVEL_WARNING,"End of DCsync_all()"); + zbx_mutex_destroy(&cache_lock); + + zabbix_log(LOG_LEVEL_DEBUG,"End of free_database_cache()"); } diff --git a/src/libs/zbxdbcache/nextchecks.c b/src/libs/zbxdbcache/nextchecks.c new file mode 100644 index 00000000..935f2cd9 --- /dev/null +++ b/src/libs/zbxdbcache/nextchecks.c @@ -0,0 +1,213 @@ +/* +** ZABBIX +** Copyright (C) 2000-2007 SIA Zabbix +** +** This program is free software; you can redistribute it and/or modify +** it under the terms of the GNU General Public License as published by +** the Free Software Foundation; either version 2 of the License, or +** (at your option) any later version. +** +** This program is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with this program; if not, write to the Free Software +** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +**/ + +#include "common.h" +#include "log.h" + +#include "db.h" +#include "dbcache.h" + +static ZBX_DC_NEXTCHECK *nextchecks = NULL; +static int nextcheck_allocated = 64; +static int nextcheck_num; + +/****************************************************************************** + * * + * Function: DCinit_nextchecks * + * * + * Purpose: initialize nextchecks array * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Aleksander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +void DCinit_nextchecks() +{ + zabbix_log(LOG_LEVEL_DEBUG, "In DCinit_nextchecks()"); + + if (NULL == nextchecks) + nextchecks = zbx_malloc(nextchecks, nextcheck_allocated * sizeof(ZBX_DC_NEXTCHECK)); + + nextcheck_num = 0; +} + +static int DCget_nextcheck_nearestindex(time_t clock) +{ + int first_index, last_index, index; + + if (nextcheck_num == 0) + return 0; + + first_index = 0; + last_index = nextcheck_num - 1; + while (1) + { + index = first_index + (last_index - first_index) / 2; + + if (nextchecks[index].clock == clock) + return index; + else if (last_index == first_index) + { + if (nextchecks[index].clock < clock) + index++; + return index; + } + else if (nextchecks[index].clock < clock) + first_index = index + 1; + else + last_index = index; + } +} + +/****************************************************************************** + * * + * Function: DCadd_nextcheck * + * * + * Purpose: add item nextcheck to the array * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Aleksander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +void DCadd_nextcheck(DB_ITEM *item, time_t now, const char *error_msg) +{ + int index, nextcheck; + + zabbix_log(LOG_LEVEL_DEBUG, "In DCadd_nextcheck()"); + + if (nextcheck_allocated == nextcheck_num) + { + nextcheck_allocated *= 2; + nextchecks = zbx_realloc(nextchecks, nextcheck_allocated * sizeof(ZBX_DC_NEXTCHECK)); + } + + nextcheck = calculate_item_nextcheck(item->itemid, item->type, item->delay, item->delay_flex, now); + index = DCget_nextcheck_nearestindex(nextcheck); + + memmove(&nextchecks[index + 1], &nextchecks[index], sizeof(ZBX_DC_NEXTCHECK) * (nextcheck_num - index)); + + nextchecks[index].itemid = item->itemid; + nextchecks[index].clock = nextcheck; + nextchecks[index].error_msg = (NULL != error_msg) ? strdup(error_msg) : NULL; + + nextcheck_num ++; +} + +/****************************************************************************** + * * + * Function: DCflush_nextchecks * + * * + * Purpose: add item nextcheck to the array * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Aleksander Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +void DCflush_nextchecks() +{ + int i, sql_offset = 0; + static char *sql = NULL; + static int sql_allocated = 4096; + time_t last_clock = -1; + char error_esc[ITEM_ERROR_LEN_MAX * 2]; + + zabbix_log(LOG_LEVEL_DEBUG, "In DCflush_nextchecks()"); + + if (nextcheck_num == 0) + return; + + if (sql == NULL) + sql = zbx_malloc(sql, sql_allocated); + +#ifdef HAVE_ORACLE + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 8, "begin\n"); +#endif + + for (i = 0; i < nextcheck_num; i++ ) + { + if (NULL == nextchecks[i].error_msg) + { + if (last_clock != nextchecks[i].clock) { + if (last_clock != -1) + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ");\n"); + } + + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, + "update items set nextcheck=%d where itemid in (", + (int)nextchecks[i].clock); + + last_clock = nextchecks[i].clock; + } + + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 32, ZBX_FS_UI64 ",", + nextchecks[i].itemid); + } + } + + if (0 != sql_offset) + { + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4, ");\n"); + } + + for (i = 0; i < nextcheck_num; i++ ) + { + if (NULL != nextchecks[i].error_msg) /* not supported items */ + { + DBescape_string(nextchecks[i].error_msg, error_esc, sizeof(error_esc)); + zbx_free(nextchecks[i].error_msg); + + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 1024, + "update items set status=%d,lastclock=%d,nextcheck=%d,error='%s' where itemid=" ZBX_FS_UI64 ";\n", + ITEM_STATUS_NOTSUPPORTED, + (int)nextchecks[i].clock, + (int)(nextchecks[i].clock + CONFIG_REFRESH_UNSUPPORTED), + error_esc, + nextchecks[i].itemid); + } + } + +#ifdef HAVE_ORACLE + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 8, "end;\n"); +#endif + + if (0 != sql_offset) + { + DBbegin(); + DBexecute("%s", sql); + DBcommit(); + } +} diff --git a/src/libs/zbxdbhigh/db.c b/src/libs/zbxdbhigh/db.c index 5eeb1a38..d188d873 100644 --- a/src/libs/zbxdbhigh/db.c +++ b/src/libs/zbxdbhigh/db.c @@ -807,191 +807,149 @@ void DBupdate_triggers_status_after_restart(void) return; } -void DBupdate_host_availability(zbx_uint64_t hostid,int available,int clock, char *error) +void DBupdate_host_availability(DB_ITEM *item, int available, int clock, const char *error) { - DB_RESULT result; - DB_ROW row; - char error_esc[MAX_STRING_LEN]; - int disable_until; - - zabbix_log(LOG_LEVEL_DEBUG,"In update_host_availability()"); - - if(error!=NULL) - { - DBescape_string(error,error_esc,MAX_STRING_LEN); - } - else - { - strscpy(error_esc,""); - } - - result = DBselect("select available,disable_until from hosts where hostid=" ZBX_FS_UI64, - hostid); - row=DBfetch(result); - - if(!row) - { - zabbix_log(LOG_LEVEL_ERR, "Cannot select host with hostid [" ZBX_FS_UI64 "]", - hostid); - zabbix_syslog("Cannot select host with hostid [" ZBX_FS_UI64 "]", - hostid); - DBfree_result(result); - return; - } + char error_esc[MAX_STRING_LEN], error_msg[MAX_STRING_LEN]; + int log_level = LOG_LEVEL_WARNING; - disable_until = atoi(row[1]); + zabbix_log(LOG_LEVEL_DEBUG, "In update_host_availability()"); - if(available == atoi(row[0])) + if (item->host_available == available) { -/* if((available==HOST_AVAILABLE_FALSE) - &&(clock+CONFIG_UNREACHABLE_PERIOD>disable_until) ) + if (available == HOST_AVAILABLE_FALSE) { + DBescape_string(error, error_esc, sizeof(error_esc)); + DBexecute("update hosts set error='%s',disable_until=%d where hostid=" ZBX_FS_UI64, + error_esc, + clock + CONFIG_UNAVAILABLE_DELAY, + item->hostid); } - else - {*/ - zabbix_log(LOG_LEVEL_DEBUG, "Host already has availability [%d]", - available); - DBfree_result(result); - return; -/* }*/ + return; } - DBfree_result(result); + item->host_available = available; - if(available==HOST_AVAILABLE_TRUE) + if (available == HOST_AVAILABLE_TRUE) { - DBexecute("update hosts set available=%d,error=' ',errors_from=0 where hostid=" ZBX_FS_UI64, - HOST_AVAILABLE_TRUE, - hostid); + zbx_snprintf(error_msg, sizeof(error_msg), "Enabling host [%s]", + item->host_name); + + DBexecute("update hosts set available=%d,error='',errors_from=0 where hostid=" ZBX_FS_UI64, + available, + item->hostid); + item->host_errors_from = 0; } - else if(available==HOST_AVAILABLE_FALSE) + else if (available == HOST_AVAILABLE_FALSE) { -/* if(disable_until+CONFIG_UNREACHABLE_PERIOD>clock) - { - zbx_snprintf(sql,sizeof(sql),"update hosts set available=%d,disable_until=disable_until+%d,error='%s' where hostid=%d",HOST_AVAILABLE_FALSE,CONFIG_UNREACHABLE_DELAY,error_esc,hostid); - } - else - { - zbx_snprintf(sql,sizeof(sql),"update hosts set available=%d,disable_until=%d,error='%s' where hostid=%d",HOST_AVAILABLE_FALSE,clock+CONFIG_UNREACHABLE_DELAY,error_esc,hostid); - }*/ - /* '%s ' - space to make Oracle happy */ - DBexecute("update hosts set available=%d,error='%s ' where hostid=" ZBX_FS_UI64, - HOST_AVAILABLE_FALSE, - error_esc, - hostid); + zbx_snprintf(error_msg, sizeof(error_msg), "Host [%s] will be checked after %d seconds", + item->host_name, + CONFIG_UNAVAILABLE_DELAY); + + DBescape_string(error, error_esc, sizeof(error_esc)); + DBexecute("update hosts set available=%d,error='%s',disable_until=%d where hostid=" ZBX_FS_UI64, + available, + error_esc, + clock + CONFIG_UNAVAILABLE_DELAY, + item->hostid); + + update_triggers_status_to_unknown(item->hostid, clock, "Host is unavailable."); } else { - zabbix_log( LOG_LEVEL_ERR, "Unknown host availability [%d] for hostid [" ZBX_FS_UI64 "]", - available, - hostid); - zabbix_syslog("Unknown host availability [%d] for hostid [" ZBX_FS_UI64 "]", - available, - hostid); - return; + log_level = LOG_LEVEL_ERR; + zbx_snprintf(error_msg, sizeof(error_msg), "Unknown host availability [%d] for host [%s]", + available, + item->host_name); } - update_triggers_status_to_unknown(hostid,clock,"Host is unavailable."); - zabbix_log(LOG_LEVEL_DEBUG,"End of update_host_availability()"); - - return; + zabbix_log(log_level, "%s", error_msg); + zabbix_syslog("%s", error_msg); } -void DBproxy_update_host_availability(zbx_uint64_t hostid, int available, int clock) +void DBproxy_update_host_availability(DB_ITEM *item, int available, int clock) { - DB_RESULT result; - DB_ROW row; - int disable_until; + char error_msg[MAX_STRING_LEN]; + int log_level = LOG_LEVEL_WARNING; - zabbix_log(LOG_LEVEL_DEBUG,"In DBproxy_update_host_availability()"); + zabbix_log(LOG_LEVEL_DEBUG, "In DBproxy_update_host_availability()"); - result = DBselect("select available,disable_until from hosts where hostid=" ZBX_FS_UI64, - hostid); - - if (NULL == (row = DBfetch(result))) { - zabbix_log(LOG_LEVEL_ERR, "Cannot select host with hostid [" ZBX_FS_UI64 "]", - hostid); - zabbix_syslog("Cannot select host with hostid [" ZBX_FS_UI64 "]", - hostid); - goto out; + if (item->host_available == available) + { + if (available == HOST_AVAILABLE_FALSE) + { + DBexecute("update hosts set disable_until=%d where hostid=" ZBX_FS_UI64, + clock + CONFIG_UNAVAILABLE_DELAY, + item->hostid); + } + return; } - disable_until = atoi(row[1]); + item->host_available = available; - if (available == atoi(row[0])) { - zabbix_log(LOG_LEVEL_DEBUG, "Host already has availability [%d]", - available); - goto out; - } + if (available == HOST_AVAILABLE_TRUE) + { + zbx_snprintf(error_msg, sizeof(error_msg), "Enabling host [%s]", + item->host_name); - switch (available) { - case HOST_AVAILABLE_TRUE: DBexecute("update hosts set available=%d,errors_from=0 where hostid=" ZBX_FS_UI64, - HOST_AVAILABLE_TRUE, - hostid); - break; - case HOST_AVAILABLE_FALSE: - DBexecute("update hosts set available=%d where hostid=" ZBX_FS_UI64, - HOST_AVAILABLE_FALSE, - hostid); - break; - default: - zabbix_log( LOG_LEVEL_ERR, "Unknown host availability [%d] for hostid [" ZBX_FS_UI64 "]", available, - hostid); + item->hostid); + item->host_errors_from = 0; } + else if (available == HOST_AVAILABLE_FALSE) + { + zbx_snprintf(error_msg, sizeof(error_msg), "Host [%s] will be checked after %d seconds", + item->host_name, + CONFIG_UNAVAILABLE_DELAY); -out: - DBfree_result(result); - - zabbix_log(LOG_LEVEL_DEBUG,"End of DBproxy_update_host_availability()"); + DBexecute("update hosts set available=%d,disable_until=%d where hostid=" ZBX_FS_UI64, + available, + clock + CONFIG_UNAVAILABLE_DELAY, + item->hostid); + } + else + { + log_level = LOG_LEVEL_ERR; + zbx_snprintf(error_msg, sizeof(error_msg), "Unknown host availability [%d] for host [%s]", + available, + item->host_name); + } - return; + zabbix_log(log_level, "%s", error_msg); + zabbix_syslog("%s", error_msg); } -int DBupdate_item_status_to_notsupported(zbx_uint64_t itemid, const char *error) +int DBupdate_item_status_to_notsupported(DB_ITEM *item, int clock, const char *error) { char error_esc[MAX_STRING_LEN]; - int now; - zabbix_log(LOG_LEVEL_DEBUG,"In DBupdate_item_status_to_notsupported()"); + zabbix_log(LOG_LEVEL_DEBUG, "In DBupdate_item_status_to_notsupported()"); - if(error!=NULL) - { - DBescape_string(error,error_esc,MAX_STRING_LEN); - } - else - { - strscpy(error_esc,""); - } + DBescape_string(error, error_esc, sizeof(error_esc)); - now = time(NULL); + DBexecute("update items set status=%d,lastclock=%d,nextcheck=%d,error='%s' where itemid=" ZBX_FS_UI64, + ITEM_STATUS_NOTSUPPORTED, + clock, + clock + CONFIG_REFRESH_UNSUPPORTED, + error_esc, + item->itemid); - /* '%s ' to make Oracle happy */ - DBexecute("update items set status=%d,nextcheck=%d,error='%s ' where itemid=" ZBX_FS_UI64, - ITEM_STATUS_NOTSUPPORTED, - CONFIG_REFRESH_UNSUPPORTED+now, - error_esc, - itemid); + item->status = ITEM_STATUS_NOTSUPPORTED; return SUCCEED; } - -int DBproxy_update_item_status_to_notsupported(zbx_uint64_t itemid) +/* +int DBproxy_update_item_status_to_notsupported(DB_ITEM *item, int clock) { - int now; - - zabbix_log(LOG_LEVEL_DEBUG,"In DBproxy_update_item_status_to_notsupported()"); - - now = time(NULL); + zabbix_log(LOG_LEVEL_DEBUG, "In DBproxy_update_item_status_to_notsupported()"); DBexecute("update items set status=%d,nextcheck=%d where itemid=" ZBX_FS_UI64, - ITEM_STATUS_NOTSUPPORTED, - CONFIG_REFRESH_UNSUPPORTED+now, - itemid); + ITEM_STATUS_NOTSUPPORTED, + clock + CONFIG_REFRESH_UNSUPPORTED, + itemid); return SUCCEED; -} +}*/ static int DBadd_trend(zbx_uint64_t itemid, double value, int clock) { @@ -1003,11 +961,6 @@ static int DBadd_trend(zbx_uint64_t itemid, double value, int clock) zabbix_log(LOG_LEVEL_DEBUG,"In add_trend()"); - if(CONFIG_DBSYNCER_FORKS >0) - { - return DCadd_trend(itemid, value, clock); - } - hour=clock-clock%3600; result = DBselect("select num,value_min,value_avg,value_max from trends where itemid=" ZBX_FS_UI64 " and clock=%d", @@ -1118,17 +1071,17 @@ int DBadd_history(zbx_uint64_t itemid, double value, int clock) clock, itemid, value); - } - DBadd_trend(itemid, value, clock); + DBadd_trend(itemid, value, clock); - if((CONFIG_NODE_NOHISTORY == 0) && (CONFIG_MASTER_NODEID>0)) - { - DBexecute("insert into history_sync (nodeid,clock,itemid,value) values (%d,%d," ZBX_FS_UI64 "," ZBX_FS_DBL ")", - get_nodeid_by_id(itemid), - clock, - itemid, - value); + if((CONFIG_NODE_NOHISTORY == 0) && (CONFIG_MASTER_NODEID>0)) + { + DBexecute("insert into history_sync (nodeid,clock,itemid,value) values (%d,%d," ZBX_FS_UI64 "," ZBX_FS_DBL ")", + get_nodeid_by_id(itemid), + clock, + itemid, + value); + } } return SUCCEED; @@ -1148,17 +1101,17 @@ int DBadd_history_uint(zbx_uint64_t itemid, zbx_uint64_t value, int clock) clock, itemid, value); - } - DBadd_trend_uint(itemid, value, clock); + DBadd_trend_uint(itemid, value, clock); - if((CONFIG_NODE_NOHISTORY == 0) && (CONFIG_MASTER_NODEID>0)) - { - DBexecute("insert into history_uint_sync (nodeid,clock,itemid,value) values (%d,%d," ZBX_FS_UI64 "," ZBX_FS_UI64 ")", - get_nodeid_by_id(itemid), - clock, - itemid, - value); + if((CONFIG_NODE_NOHISTORY == 0) && (CONFIG_MASTER_NODEID>0)) + { + DBexecute("insert into history_uint_sync (nodeid,clock,itemid,value) values (%d,%d," ZBX_FS_UI64 "," ZBX_FS_UI64 ")", + get_nodeid_by_id(itemid), + clock, + itemid, + value); + } } return SUCCEED; @@ -1174,23 +1127,23 @@ int DBadd_history_str(zbx_uint64_t itemid, char *value, int clock) { DCadd_history_str(itemid, value, clock); } + else + { + DBescape_string(value,value_esc,MAX_STRING_LEN); - DBescape_string(value,value_esc,MAX_STRING_LEN); - - if(CONFIG_DBSYNCER_FORKS == 0) { DBexecute("insert into history_str (clock,itemid,value) values (%d," ZBX_FS_UI64 ",'%s')", clock, itemid, value_esc); - } - if((CONFIG_NODE_NOHISTORY == 0) && (CONFIG_MASTER_NODEID>0)) - { - DBexecute("insert into history_str_sync (nodeid,clock,itemid,value) values (%d,%d," ZBX_FS_UI64 ",'%s')", - get_nodeid_by_id(itemid), - clock, - itemid, - value_esc); + if((CONFIG_NODE_NOHISTORY == 0) && (CONFIG_MASTER_NODEID>0)) + { + DBexecute("insert into history_str_sync (nodeid,clock,itemid,value) values (%d,%d," ZBX_FS_UI64 ",'%s')", + get_nodeid_by_id(itemid), + clock, + itemid, + value_esc); + } } return SUCCEED; @@ -1204,133 +1157,141 @@ int DBadd_history_text(zbx_uint64_t itemid, char *value, int clock) int value_esc_max_len = 0; int ret = FAIL; zbx_uint64_t id; +#else + char *value_esc = NULL; + zbx_uint64_t id; +#endif - sqlo_lob_desc_t loblp; /* the lob locator */ - sqlo_stmt_handle_t sth = 0; + zabbix_log(LOG_LEVEL_DEBUG, "In add_history_text()"); - sqlo_autocommit_off(oracle); + if(CONFIG_DBSYNCER_FORKS > 0) + { + DCadd_history_text(itemid, value, clock); + } + else + { +#ifdef HAVE_ORACLE + sqlo_lob_desc_t loblp; /* the lob locator */ + sqlo_stmt_handle_t sth = 0; - zabbix_log(LOG_LEVEL_DEBUG,"In add_history_text()"); + sqlo_autocommit_off(oracle); - value_esc_max_len = strlen(value)+1024; - value_esc = zbx_malloc(value_esc, value_esc_max_len); + value_esc_max_len = strlen(value)+1024; + value_esc = zbx_malloc(value_esc, value_esc_max_len); - DBescape_string(value, value_esc, value_esc_max_len-1); - value_esc_max_len = strlen(value_esc); + DBescape_string(value, value_esc, value_esc_max_len-1); + value_esc_max_len = strlen(value_esc); - /* alloate the lob descriptor */ - if(sqlo_alloc_lob_desc(oracle, &loblp) < 0) - { - zabbix_log(LOG_LEVEL_DEBUG,"CLOB allocating failed:%s", sqlo_geterror(oracle)); - goto lbl_exit; - } + /* alloate the lob descriptor */ + if(sqlo_alloc_lob_desc(oracle, &loblp) < 0) + { + zabbix_log(LOG_LEVEL_DEBUG,"CLOB allocating failed:%s", sqlo_geterror(oracle)); + goto lbl_exit; + } - id = DBget_maxid("history_text", "id"); - zbx_snprintf(sql, sizeof(sql), "insert into history_text (id,clock,itemid,value)" - " values (" ZBX_FS_UI64 ",%d," ZBX_FS_UI64 ", EMPTY_CLOB()) returning value into :1", - id, - clock, - itemid); + id = DBget_maxid("history_text", "id"); + zbx_snprintf(sql, sizeof(sql), "insert into history_text (id,clock,itemid,value)" + " values (" ZBX_FS_UI64 ",%d," ZBX_FS_UI64 ", EMPTY_CLOB()) returning value into :1", + id, + clock, + itemid); - zabbix_log(LOG_LEVEL_DEBUG,"Query:%s", sql); + zabbix_log(LOG_LEVEL_DEBUG,"Query:%s", sql); - /* parse the statement */ - sth = sqlo_prepare(oracle, sql); - if(sth < 0) - { - zabbix_log(LOG_LEVEL_DEBUG,"Query prepearing failed:%s", sqlo_geterror(oracle)); - goto lbl_exit; - } + /* parse the statement */ + sth = sqlo_prepare(oracle, sql); + if(sth < 0) + { + zabbix_log(LOG_LEVEL_DEBUG,"Query prepearing failed:%s", sqlo_geterror(oracle)); + goto lbl_exit; + } - /* bind input variables. Note: we bind the lob descriptor here */ - if(SQLO_SUCCESS != sqlo_bind_by_pos(sth, 1, SQLOT_CLOB, &loblp, 0, NULL, 0)) - { - zabbix_log(LOG_LEVEL_DEBUG,"CLOB binding failed:%s", sqlo_geterror(oracle)); - goto lbl_exit_loblp; - } + /* bind input variables. Note: we bind the lob descriptor here */ + if(SQLO_SUCCESS != sqlo_bind_by_pos(sth, 1, SQLOT_CLOB, &loblp, 0, NULL, 0)) + { + zabbix_log(LOG_LEVEL_DEBUG,"CLOB binding failed:%s", sqlo_geterror(oracle)); + goto lbl_exit_loblp; + } - /* execute the statement */ - if(sqlo_execute(sth, 1) != SQLO_SUCCESS) - { - zabbix_log(LOG_LEVEL_DEBUG,"Query failed:%s", sqlo_geterror(oracle)); - goto lbl_exit_loblp; - } + /* execute the statement */ + if(sqlo_execute(sth, 1) != SQLO_SUCCESS) + { + zabbix_log(LOG_LEVEL_DEBUG,"Query failed:%s", sqlo_geterror(oracle)); + goto lbl_exit_loblp; + } - /* write the lob */ - ret = sqlo_lob_write_buffer(oracle, loblp, value_esc_max_len, value_esc, value_esc_max_len, SQLO_ONE_PIECE); - if(ret < 0) - { - zabbix_log(LOG_LEVEL_DEBUG,"CLOB writing failed:%s", sqlo_geterror(oracle) ); - goto lbl_exit_loblp; - } + /* write the lob */ + ret = sqlo_lob_write_buffer(oracle, loblp, value_esc_max_len, value_esc, value_esc_max_len, SQLO_ONE_PIECE); + if(ret < 0) + { + zabbix_log(LOG_LEVEL_DEBUG,"CLOB writing failed:%s", sqlo_geterror(oracle) ); + goto lbl_exit_loblp; + } - /* commiting */ - if(sqlo_commit(oracle) < 0) - { - zabbix_log(LOG_LEVEL_DEBUG,"Commiting failed:%s", sqlo_geterror(oracle) ); - } + /* commiting */ + if(sqlo_commit(oracle) < 0) + { + zabbix_log(LOG_LEVEL_DEBUG,"Commiting failed:%s", sqlo_geterror(oracle) ); + } - ret = SUCCEED; + ret = SUCCEED; lbl_exit_loblp: - sqlo_free_lob_desc(oracle, &loblp); + sqlo_free_lob_desc(oracle, &loblp); lbl_exit: - if(sth >= 0) sqlo_close(sth); - zbx_free(value_esc); + if(sth >= 0) sqlo_close(sth); + zbx_free(value_esc); - sqlo_autocommit_on(oracle); + sqlo_autocommit_on(oracle); - return ret; + return ret; #else /* HAVE_ORACLE */ + value_esc = DBdyn_escape_string(value); - char *value_esc = NULL; - zbx_uint64_t id; - - zabbix_log(LOG_LEVEL_DEBUG, "In add_history_str()"); - - value_esc = DBdyn_escape_string(value); - - id = DBget_maxid("history_text", "id"); + id = DBget_maxid("history_text", "id"); - DBexecute("insert into history_text (id,clock,itemid,value)" - " values (" ZBX_FS_UI64 ",%d," ZBX_FS_UI64 ",'%s')", - id, - clock, - itemid, - value_esc); - - zbx_free(value_esc); - - return SUCCEED; + DBexecute("insert into history_text (id,clock,itemid,value)" + " values (" ZBX_FS_UI64 ",%d," ZBX_FS_UI64 ",'%s')", + id, + clock, + itemid, + value_esc); + zbx_free(value_esc); #endif + } + return SUCCEED; } -int DBadd_history_log(zbx_uint64_t id, zbx_uint64_t itemid, char *value, int clock, int timestamp, char *source, int severity) +int DBadd_history_log(zbx_uint64_t itemid, char *value, int clock, int timestamp, char *source, int severity, int lastlogsize) { - char *value_esc = NULL, source_esc[MAX_STRING_LEN]; + char *value_esc = NULL, source_esc[MAX_STRING_LEN]; zabbix_log(LOG_LEVEL_DEBUG, "In add_history_log()"); - value_esc = DBdyn_escape_string(value); - DBescape_string(source, source_esc, sizeof(source_esc)); - - if (id == 0) - id = DBget_maxid("history_log", "id"); + if(CONFIG_DBSYNCER_FORKS > 0) + { + DCadd_history_log(itemid, value, clock, timestamp, source, severity, lastlogsize); + } + else + { + value_esc = DBdyn_escape_string(value); + DBescape_string(source, source_esc, sizeof(source_esc)); - DBexecute("insert into history_log (id,clock,itemid,timestamp,value,source,severity)" - " values (" ZBX_FS_UI64 ",%d," ZBX_FS_UI64 ",%d,'%s','%s',%d)", - id, - clock, - itemid, - timestamp, - value_esc, - source_esc, - severity); + DBexecute("insert into history_log (id,clock,itemid,timestamp,value,source,severity)" + " values (" ZBX_FS_UI64 ",%d," ZBX_FS_UI64 ",%d,'%s','%s',%d)", + DBget_maxid("history_log", "id"), + clock, + itemid, + timestamp, + value_esc, + source_esc, + severity); - zbx_free(value_esc); + zbx_free(value_esc); + } return SUCCEED; } @@ -1917,7 +1878,7 @@ const ZBX_FIELD *DBget_field(const ZBX_TABLE *table, const char *fieldname) return NULL; } -zbx_uint64_t DBget_maxid(char *tablename, char *fieldname) +zbx_uint64_t DBget_maxid_num(char *tablename, char *fieldname, int num) { DB_RESULT result; DB_ROW row; @@ -2001,10 +1962,11 @@ zbx_uint64_t DBget_maxid(char *tablename, char *fieldname) continue; } - DBexecute("update ids set nextid=nextid+1 where nodeid=%d and table_name='%s' and field_name='%s'", - nodeid, - tablename, - fieldname); + DBexecute("update ids set nextid=nextid+%d where nodeid=%d and table_name='%s' and field_name='%s'", + num, + nodeid, + tablename, + fieldname); result = DBselect("select nextid from ids where nodeid=%d and table_name='%s' and field_name='%s'", nodeid, @@ -2018,7 +1980,7 @@ zbx_uint64_t DBget_maxid(char *tablename, char *fieldname) } else { ZBX_STR2UINT64(ret2, row[0]); DBfree_result(result); - if (ret1 + 1 == ret2) + if (ret1 + num == ret2) found = SUCCEED; } } @@ -2030,7 +1992,7 @@ zbx_uint64_t DBget_maxid(char *tablename, char *fieldname) fieldname, ret2); - return ret2; + return ret2 - num + 1; /* if(CONFIG_NODEID == 0) { @@ -2071,106 +2033,109 @@ zbx_uint64_t DBget_maxid(char *tablename, char *fieldname) return ret;*/ } -int DBproxy_add_history(const char *host, const char *key, int clock, double value) +void DBproxy_add_history(zbx_uint64_t itemid, double value, int clock) { - char host_esc[MAX_STRING_LEN], key_esc[MAX_STRING_LEN]; - zabbix_log(LOG_LEVEL_DEBUG, "In proxy_add_history()"); - DBescape_string(host, host_esc, sizeof(host_esc)); - DBescape_string(key, key_esc, sizeof(key_esc)); - - DBexecute("insert into proxy_history (host,key_,clock,value) values ('%s','%s',%d,'" ZBX_FS_DBL "')", - host_esc, - key_esc, - clock, - value); - - return SUCCEED; + if(CONFIG_DBSYNCER_FORKS > 0) + { + DCadd_history(itemid, value, clock); + } + else + { + DBexecute("insert into proxy_history (itemid,clock,value) values (" ZBX_FS_UI64 ",%d,'" ZBX_FS_DBL "')", + itemid, + clock, + value); + } } -int DBproxy_add_history_uint(const char *host, const char *key, int clock, zbx_uint64_t value) +void DBproxy_add_history_uint(zbx_uint64_t itemid, zbx_uint64_t value, int clock) { - char host_esc[MAX_STRING_LEN], key_esc[MAX_STRING_LEN]; - zabbix_log(LOG_LEVEL_DEBUG, "In proxy_add_history_uint()"); - DBescape_string(host, host_esc, sizeof(host_esc)); - DBescape_string(key, key_esc, sizeof(key_esc)); - - DBexecute("insert into proxy_history (host,key_,clock,value) values ('%s','%s',%d,'" ZBX_FS_UI64 "')", - host_esc, - key_esc, - clock, - value); - - return SUCCEED; + if(CONFIG_DBSYNCER_FORKS > 0) + { + DCadd_history_uint(itemid, value, clock); + } + else + { + DBexecute("insert into proxy_history (itemid,clock,value) values (" ZBX_FS_UI64 ",%d,'" ZBX_FS_UI64 "')", + itemid, + clock, + value); + } } -int DBproxy_add_history_str(const char *host, const char *key, int clock, char *value) +void DBproxy_add_history_str(zbx_uint64_t itemid, char *value, int clock) { - char host_esc[MAX_STRING_LEN], key_esc[MAX_STRING_LEN], value_esc[MAX_STRING_LEN]; + char value_esc[MAX_STRING_LEN]; zabbix_log(LOG_LEVEL_DEBUG, "In proxy_add_history_str()"); - DBescape_string(host, host_esc, sizeof(host_esc)); - DBescape_string(key, key_esc, sizeof(key_esc)); - DBescape_string(value, value_esc, sizeof(value_esc)); - - DBexecute("insert into proxy_history (host,key_,clock,value) values ('%s','%s',%d,'%s')", - host_esc, - key_esc, - clock, - value_esc); + if(CONFIG_DBSYNCER_FORKS > 0) + { + DCadd_history_str(itemid, value, clock); + } + else + { + DBescape_string(value, value_esc, sizeof(value_esc)); - return SUCCEED; + DBexecute("insert into proxy_history (itemid,clock,value) values (" ZBX_FS_UI64 ",%d,'%s')", + itemid, + clock, + value_esc); + } } -int DBproxy_add_history_text(const char *host, const char *key, int clock, char *value) +void DBproxy_add_history_text(zbx_uint64_t itemid, char *value, int clock) { - char host_esc[MAX_STRING_LEN], key_esc[MAX_STRING_LEN], *value_esc; + char *value_esc; zabbix_log(LOG_LEVEL_DEBUG, "In proxy_add_history_text()"); - DBescape_string(host, host_esc, sizeof(host_esc)); - DBescape_string(key, key_esc, sizeof(key_esc)); - value_esc = DBdyn_escape_string(value); - - DBexecute("insert into proxy_history (host,key_,clock,value) values ('%s','%s',%d,'%s')", - host_esc, - key_esc, - clock, - value_esc); + if(CONFIG_DBSYNCER_FORKS > 0) + { + DCadd_history_text(itemid, value, clock); + } + else + { + value_esc = DBdyn_escape_string(value); - zbx_free(value_esc); + DBexecute("insert into proxy_history (itemid,clock,value) values (" ZBX_FS_UI64 ",%d,'%s')", + itemid, + clock, + value_esc); - return SUCCEED; + zbx_free(value_esc); + } } -int DBproxy_add_history_log(const char *host, const char *key, int clock, int timestamp, char *source, int severity, char *value) +void DBproxy_add_history_log(zbx_uint64_t itemid, char *value, int clock, int timestamp, char *source, int severity, int lastlogsize) { - char host_esc[MAX_STRING_LEN], key_esc[MAX_STRING_LEN], - source_esc[MAX_STRING_LEN], *value_esc; + char source_esc[MAX_STRING_LEN], *value_esc; zabbix_log(LOG_LEVEL_DEBUG, "In proxy_add_history_log()"); - DBescape_string(host, host_esc, sizeof(host_esc)); - DBescape_string(key, key_esc, sizeof(key_esc)); - DBescape_string(source, source_esc, sizeof(source_esc)); - value_esc = DBdyn_escape_string(value); - - DBexecute("insert into proxy_history (host,key_,clock,timestamp,source,severity,value)" - " values ('%s','%s',%d,%d,'%s',%d,'%s')", - host_esc, - key_esc, - clock, - timestamp, - source_esc, - severity, - value_esc); + if(CONFIG_DBSYNCER_FORKS > 0) + { + DCadd_history_log(itemid, value, clock, timestamp, source, severity, lastlogsize); + } + else + { + DBescape_string(source, source_esc, sizeof(source_esc)); + value_esc = DBdyn_escape_string(value); - zbx_free(value_esc); + DBexecute("insert into proxy_history (itemid,clock,timestamp,source,severity,value)" + " values (" ZBX_FS_UI64 ",%d,%d,'%s',%d,'%s')", + itemid, + clock, + timestamp, + source_esc, + severity, + value_esc); - return SUCCEED; + zbx_free(value_esc); + } } diff --git a/src/libs/zbxserver/expression.c b/src/libs/zbxserver/expression.c index fdc23363..3141d35d 100644 --- a/src/libs/zbxserver/expression.c +++ b/src/libs/zbxserver/expression.c @@ -1503,9 +1503,9 @@ void substitute_macros(DB_EVENT *event, DB_ACTION *action, char **data) if('\0' == *data[0]) return; - zabbix_log(LOG_LEVEL_DEBUG, "Before substitute_simple_macros(%s)", *data); +/* zabbix_log(LOG_LEVEL_DEBUG, "Before substitute_simple_macros(%s)", *data);*/ substitute_simple_macros(event, action, data, MACRO_TYPE_MESSAGE_SUBJECT | MACRO_TYPE_MESSAGE_BODY); - zabbix_log(LOG_LEVEL_DEBUG, "After substitute_simple_macros(%s)", *data); +/* zabbix_log(LOG_LEVEL_DEBUG, "After substitute_simple_macros(%s)", *data);*/ pl = *data; while((pr = strchr(pl, '{'))) @@ -1599,7 +1599,7 @@ void substitute_macros(DB_EVENT *event, DB_ACTION *action, char **data) * Comments: example: "({15}>10)|({123}=0)" => "(6.456>10)|(0=0) * * * ******************************************************************************/ -int substitute_functions(char **exp, char *error, int maxerrlen) +static int substitute_functions(char **exp, char *error, int maxerrlen) { char *value; char functionid[MAX_STRING_LEN]; @@ -1674,7 +1674,7 @@ int substitute_functions(char **exp, char *error, int maxerrlen) * ({a0:system[procload].max(300)}>3) * * * ******************************************************************************/ -int evaluate_expression(int *result,char **expression, int trigger_value, char *error, int maxerrlen) +int evaluate_expression(int *result,char **expression, DB_TRIGGER *trigger, char *error, int maxerrlen) { /* Required for substitution of macros */ DB_EVENT event; @@ -1686,7 +1686,7 @@ int evaluate_expression(int *result,char **expression, int trigger_value, char * /* Substitute macros first */ memset(&event,0,sizeof(DB_EVENT)); memset(&action,0,sizeof(DB_ACTION)); - event.value = trigger_value; + event.value = trigger->value; substitute_simple_macros(&event, &action, expression, MACRO_TYPE_TRIGGER_EXPRESSION); diff --git a/src/libs/zbxserver/expression.h b/src/libs/zbxserver/expression.h index 9876dc08..15678c81 100644 --- a/src/libs/zbxserver/expression.h +++ b/src/libs/zbxserver/expression.h @@ -26,7 +26,6 @@ int cmp_double(double a,double b); int find_char(char *str,char c); -int evaluate_expression(int *result,char **expression, int triggger_value, char *error, int maxerrlen); void delete_reol(char *c); #endif diff --git a/src/libs/zbxserver/functions.c b/src/libs/zbxserver/functions.c index f453a9ca..e63e4121 100644 --- a/src/libs/zbxserver/functions.c +++ b/src/libs/zbxserver/functions.c @@ -152,7 +152,7 @@ void update_triggers(zbx_uint64_t itemid) trigger.type = atoi(row[8]); exp = strdup(trigger.expression); - if( evaluate_expression(&exp_value, &exp, trigger.value, error, sizeof(error)) != 0 ) + if( evaluate_expression(&exp_value, &exp, &trigger, error, sizeof(error)) != 0 ) { zabbix_log( LOG_LEVEL_WARNING, "Expression [%s] cannot be evaluated [%s]", trigger.expression, @@ -295,7 +295,8 @@ static int add_history(DB_ITEM *item, AGENT_RESULT *value, int now) else if(item->value_type==ITEM_VALUE_TYPE_LOG) { if(GET_STR_RESULT(value)) - DBadd_history_log(0, item->itemid,value->str,now,item->timestamp,item->eventlog_source,item->eventlog_severity); + DBadd_history_log(item->itemid, value->str, now, item->timestamp, item->eventlog_source, + item->eventlog_severity, item->lastlogsize); } else if(item->value_type==ITEM_VALUE_TYPE_TEXT) { @@ -575,10 +576,16 @@ void process_new_value(DB_ITEM *item, AGENT_RESULT *value, time_t now) } } } +/* +zabbix_log(LOG_LEVEL_CRIT, "I"); +*/ add_history(item, value, now); - update_item(item, value, now); - update_functions(item); + if (0 == CONFIG_DBSYNCER_FORKS) + { + update_item(item, value, now); + update_functions(item); + } } /****************************************************************************** @@ -625,27 +632,27 @@ static int proxy_add_history(DB_ITEM *item, AGENT_RESULT *value, int now) if(item->value_type==ITEM_VALUE_TYPE_UINT64) { if(GET_UI64_RESULT(value)) - DBproxy_add_history_uint(item->host_name, item->key, now, value->ui64); + DBproxy_add_history_uint(item->itemid, value->ui64, now); } else if(item->value_type==ITEM_VALUE_TYPE_FLOAT) { if(GET_DBL_RESULT(value)) - DBproxy_add_history(item->host_name, item->key, now, value->dbl); + DBproxy_add_history(item->itemid, value->dbl, now); } else if(item->value_type==ITEM_VALUE_TYPE_STR) { if(GET_STR_RESULT(value)) - DBproxy_add_history_str(item->host_name, item->key, now, value->str); + DBproxy_add_history_str(item->itemid, value->str, now); } else if(item->value_type==ITEM_VALUE_TYPE_LOG) { if(GET_STR_RESULT(value)) - DBproxy_add_history_log(item->host_name, item->key, now, item->timestamp, item->eventlog_source, item->eventlog_severity, value->str); + DBproxy_add_history_log(item->itemid, value->str, now, item->timestamp, item->eventlog_source, item->eventlog_severity, item->lastlogsize); } else if(item->value_type==ITEM_VALUE_TYPE_TEXT) { if(GET_TEXT_RESULT(value)) - DBproxy_add_history_text(item->host_name, item->key, now, value->text); + DBproxy_add_history_text(item->itemid, value->str, now); } else { @@ -738,5 +745,6 @@ void proxy_process_new_value(DB_ITEM *item, AGENT_RESULT *value, time_t now) item->key); proxy_add_history(item, value, now); - proxy_update_item(item, value, now); + if (0 == CONFIG_DBSYNCER_FORKS) + proxy_update_item(item, value, now); } diff --git a/src/zabbix_proxy/datasender/datasender.c b/src/zabbix_proxy/datasender/datasender.c index b1f1898c..0e836259 100644 --- a/src/zabbix_proxy/datasender/datasender.c +++ b/src/zabbix_proxy/datasender/datasender.c @@ -33,6 +33,7 @@ struct history_field_t { const char *field; const char *tag; + zbx_json_type_t jt; }; struct history_table_t { @@ -48,13 +49,11 @@ struct last_ids { static ZBX_HISTORY_TABLE ht[]={ {"proxy_history", "history_lastid", { - {"host", ZBX_PROTO_TAG_HOST}, - {"key_", ZBX_PROTO_TAG_KEY}, - {"clock", ZBX_PROTO_TAG_CLOCK}, - {"timestamp", ZBX_PROTO_TAG_LOGTIMESTAMP}, - {"source", ZBX_PROTO_TAG_LOGSOURCE}, - {"severity", ZBX_PROTO_TAG_LOGSEVERITY}, - {"value", ZBX_PROTO_TAG_VALUE}, + {"clock", ZBX_PROTO_TAG_CLOCK, ZBX_JSON_TYPE_INT}, + {"timestamp", ZBX_PROTO_TAG_LOGTIMESTAMP, ZBX_JSON_TYPE_INT}, + {"source", ZBX_PROTO_TAG_LOGSOURCE, ZBX_JSON_TYPE_STRING}, + {"severity", ZBX_PROTO_TAG_LOGSEVERITY, ZBX_JSON_TYPE_INT}, + {"value", ZBX_PROTO_TAG_VALUE, ZBX_JSON_TYPE_STRING}, {NULL} } }, @@ -64,14 +63,14 @@ static ZBX_HISTORY_TABLE ht[]={ static ZBX_HISTORY_TABLE dht[]={ {"proxy_dhistory", "dhistory_lastid", { - {"clock", ZBX_PROTO_TAG_CLOCK}, - {"druleid", ZBX_PROTO_TAG_DRULE}, - {"type", ZBX_PROTO_TAG_TYPE}, - {"ip", ZBX_PROTO_TAG_IP}, - {"port", ZBX_PROTO_TAG_PORT}, - {"key_", ZBX_PROTO_TAG_KEY}, - {"value", ZBX_PROTO_TAG_VALUE}, - {"status", ZBX_PROTO_TAG_STATUS}, + {"clock", ZBX_PROTO_TAG_CLOCK, ZBX_JSON_TYPE_INT}, + {"druleid", ZBX_PROTO_TAG_DRULE, ZBX_JSON_TYPE_INT}, + {"type", ZBX_PROTO_TAG_TYPE, ZBX_JSON_TYPE_INT}, + {"ip", ZBX_PROTO_TAG_IP, ZBX_JSON_TYPE_STRING}, + {"port", ZBX_PROTO_TAG_PORT, ZBX_JSON_TYPE_INT}, + {"key_", ZBX_PROTO_TAG_KEY, ZBX_JSON_TYPE_STRING}, + {"value", ZBX_PROTO_TAG_VALUE, ZBX_JSON_TYPE_STRING}, + {"status", ZBX_PROTO_TAG_STATUS, ZBX_JSON_TYPE_INT}, {NULL} } }, @@ -187,9 +186,6 @@ static int get_history_data(struct zbx_json *j, const ZBX_HISTORY_TABLE *ht, zbx char sql[MAX_STRING_LEN]; DB_RESULT result; DB_ROW row; - const ZBX_FIELD *field; - const ZBX_TABLE *table; - zbx_json_type_t jt; zbx_uint64_t id; zabbix_log(LOG_LEVEL_DEBUG, "In get_history_data() [table:%s]", @@ -199,42 +195,28 @@ static int get_history_data(struct zbx_json *j, const ZBX_HISTORY_TABLE *ht, zbx get_lastid(ht, &id); - offset += zbx_snprintf(sql + offset, sizeof(sql) - offset, "select id"); + offset += zbx_snprintf(sql + offset, sizeof(sql) - offset, "select p.id,h.host,i.key_"); for (f = 0; ht->fields[f].field != NULL; f ++) - offset += zbx_snprintf(sql + offset, sizeof(sql) - offset, ",%s", + offset += zbx_snprintf(sql + offset, sizeof(sql) - offset, ",p.%s", ht->fields[f].field); - offset += zbx_snprintf(sql + offset, sizeof(sql) - offset, " from %s" - " where id>" ZBX_FS_UI64 " order by id", + offset += zbx_snprintf(sql + offset, sizeof(sql) - offset, " from hosts h,items i,%s p" + " where h.hostid=i.hostid and i.itemid=p.itemid and p.id>" ZBX_FS_UI64 " order by p.id", ht->table, id); result = DBselectN(sql, 1000); - table = DBget_table(ht->table); - while (NULL != (row = DBfetch(result))) { zbx_json_addobject(j, NULL); *lastid = zbx_atoui64(row[0]); + zbx_json_addstring(j, ZBX_PROTO_TAG_HOST, row[1], ZBX_JSON_TYPE_STRING); + zbx_json_addstring(j, ZBX_PROTO_TAG_KEY, row[2], ZBX_JSON_TYPE_STRING); - for (f = 0; ht->fields[f].field != NULL; f ++) { - field = DBget_field(table, ht->fields[f].field); - - switch (field->type) { - case ZBX_TYPE_ID: - case ZBX_TYPE_INT: - case ZBX_TYPE_UINT: - jt = ZBX_JSON_TYPE_INT; - break; - default : - jt = ZBX_JSON_TYPE_STRING; - break; - } - - zbx_json_addstring(j, ht->fields[f].tag, row[f + 1], jt); - } + for (f = 0; ht->fields[f].field != NULL; f ++) + zbx_json_addstring(j, ht->fields[f].tag, row[f + 3], ht->fields[f].jt); records++; @@ -267,9 +249,6 @@ static int get_dhistory_data(struct zbx_json *j, const ZBX_HISTORY_TABLE *ht, zb char sql[MAX_STRING_LEN]; DB_RESULT result; DB_ROW row; - const ZBX_FIELD *field; - const ZBX_TABLE *table; - zbx_json_type_t jt; zbx_uint64_t id; zabbix_log(LOG_LEVEL_DEBUG, "In get_dhistory_data() [table:%s]", @@ -292,29 +271,13 @@ static int get_dhistory_data(struct zbx_json *j, const ZBX_HISTORY_TABLE *ht, zb result = DBselectN(sql, 1000); - table = DBget_table(ht->table); - while (NULL != (row = DBfetch(result))) { zbx_json_addobject(j, NULL); *lastid = zbx_atoui64(row[0]); - for (f = 0; ht->fields[f].field != NULL; f ++) { - field = DBget_field(table, ht->fields[f].field); - - switch (field->type) { - case ZBX_TYPE_ID: - case ZBX_TYPE_INT: - case ZBX_TYPE_UINT: - jt = ZBX_JSON_TYPE_INT; - break; - default : - jt = ZBX_JSON_TYPE_STRING; - break; - } - - zbx_json_addstring(j, ht->fields[f].tag, row[f + 1], jt); - } + for (f = 0; ht->fields[f].field != NULL; f ++) + zbx_json_addstring(j, ht->fields[f].tag, row[f + 1], ht->fields[f].jt); records++; diff --git a/src/zabbix_proxy/proxy.c b/src/zabbix_proxy/proxy.c index df199b3c..eb7bc161 100644 --- a/src/zabbix_proxy/proxy.c +++ b/src/zabbix_proxy/proxy.c @@ -107,7 +107,7 @@ pid_t *threads=NULL; int CONFIG_CONFSYNCER_FORKS = 1; int CONFIG_DATASENDER_FORKS = 1; -int CONFIG_DBSYNCER_FORKS = 0;//1; +int CONFIG_DBSYNCER_FORKS = 1; int CONFIG_DISCOVERER_FORKS = 1; int CONFIG_HOUSEKEEPER_FORKS = 1; int CONFIG_PINGER_FORKS = 1; @@ -130,7 +130,7 @@ int CONFIG_HEARTBEAT_FREQUENCY = 60; int CONFIG_PROXYCONFIG_FREQUENCY = 3600*24; -int CONFIG_DATASENDER_FREQUENCY = 10; +int CONFIG_DATASENDER_FREQUENCY = 1; int CONFIG_SENDER_FREQUENCY = 30; int CONFIG_DBSYNCER_FREQUENCY = 5; @@ -199,6 +199,7 @@ void init_config(void) {"ServerPort",&CONFIG_SERVER_PORT,0,TYPE_INT,PARM_OPT,1024,32768}, {"Hostname",&CONFIG_HOSTNAME,0,TYPE_STRING,PARM_OPT,0,0}, + {"StartDBSyncers",&CONFIG_DBSYNCER_FORKS,0,TYPE_INT,PARM_OPT,0,16}, {"StartDiscoverers",&CONFIG_DISCOVERER_FORKS,0,TYPE_INT,PARM_OPT,0,255}, {"StartHTTPPollers",&CONFIG_HTTPPOLLER_FORKS,0,TYPE_INT,PARM_OPT,0,255}, {"StartPingers",&CONFIG_PINGER_FORKS,0,TYPE_INT,PARM_OPT,0,255}, @@ -350,7 +351,7 @@ int main(int argc, char **argv) if(CONFIG_DBSYNCER_FORKS!=0) { - init_database_cache(); + init_database_cache(ZBX_PROCESS_PROXY); } return daemon_start(CONFIG_ALLOW_ROOT); diff --git a/src/zabbix_proxy/zlog.c b/src/zabbix_proxy/zlog.c index 51570a02..6104e790 100644 --- a/src/zabbix_proxy/zlog.c +++ b/src/zabbix_proxy/zlog.c @@ -65,11 +65,14 @@ void __zbx_zabbix_syslog(const char *fmt, ...) /* This is made to disable writing to database for watchdog */ if(CONFIG_ENABLE_LOG == 0) return; - result = DBselect("select %s where h.hostid=i.hostid and i.key_='%s' and i.value_type=%d" DB_NODE, - ZBX_SQL_ITEM_SELECT, - SERVER_ZABBIXLOG_KEY, - ITEM_VALUE_TYPE_STR, - DBnode_local("h.hostid")); + result = DBselect("select %s where h.hostid=i.hostid and h.status=%d and i.status=%d" + " and h.proxy_hostid=0 and i.key_='%s' and i.value_type=%d" DB_NODE, + ZBX_SQL_ITEM_SELECT, + ITEM_STATUS_ACTIVE, + HOST_STATUS_MONITORED, + SERVER_ZABBIXLOG_KEY, + ITEM_VALUE_TYPE_STR, + DBnode_local("h.hostid")); now = time(NULL); @@ -84,10 +87,16 @@ void __zbx_zabbix_syslog(const char *fmt, ...) init_result(&agent); SET_STR_RESULT(&agent, strdup(value_str)); - process_new_value(&item, &agent, now); - free_result(&agent); - update_triggers(item.itemid); + if (0 == CONFIG_DBSYNCER_FORKS) + { + process_new_value(&item, &agent, now); + update_triggers(item.itemid); + } + else + process_new_value(&item, &agent, now); + + free_result(&agent); } DBfree_result(result); diff --git a/src/zabbix_server/dbsyncer/dbsyncer.c b/src/zabbix_server/dbsyncer/dbsyncer.c index cf28a7db..c341668b 100644 --- a/src/zabbix_server/dbsyncer/dbsyncer.c +++ b/src/zabbix_server/dbsyncer/dbsyncer.c @@ -22,6 +22,7 @@ #include "db.h" #include "log.h" #include "zlog.h" +#include "threads.h" #include "dbcache.h" #include "dbsyncer.h" @@ -43,9 +44,11 @@ ******************************************************************************/ int main_dbsyncer_loop() { - int now; + int now, sleeptime, last_sleeptime = -1, num; double sec; + zabbix_log(LOG_LEVEL_DEBUG, "In main_dbsyncer_loop()"); + zbx_setproctitle("db syncer [connecting to the database]"); DBconnect(ZBX_DB_CONNECT_NORMAL); @@ -55,18 +58,41 @@ int main_dbsyncer_loop() now = time(NULL); sec = zbx_time(); + num = DCsync_history(ZBX_SYNC_PARTIAL); + sec = zbx_time() - sec; + + if (last_sleeptime == -1) + { + sleeptime = now - time(NULL) + CONFIG_DBSYNCER_FREQUENCY; + } + else + { + sleeptime = last_sleeptime; + if (num >= ZBX_SYNC_MAX) + sleeptime--; + else if (num < ZBX_SYNC_MAX / 2) + sleeptime++; + } + + if (sleeptime < 0) + sleeptime = 0; + else if (sleeptime > CONFIG_DBSYNCER_FREQUENCY) + sleeptime = CONFIG_DBSYNCER_FREQUENCY; - DCsync(); + last_sleeptime = sleeptime; - zabbix_log(LOG_LEVEL_DEBUG, "Spent " ZBX_FS_DBL " sec", - zbx_time() - sec); + zabbix_log(LOG_LEVEL_DEBUG, "DB syncer spent " ZBX_FS_DBL " second while processing %d items. " + "Nextsync after %d sec.", + sec, + num, + sleeptime); - zbx_setproctitle("db syncer [sleeping for %d seconds]", - CONFIG_DBSYNCER_FREQUENCY); - zabbix_log(LOG_LEVEL_DEBUG, "Sleeping for %d seconds", - CONFIG_DBSYNCER_FREQUENCY); + if (sleeptime > 0) { + zbx_setproctitle("db syncer [sleeping for %d seconds]", + sleeptime); - sleep(CONFIG_DBSYNCER_FREQUENCY); + sleep(sleeptime); + } } DBclose(); } diff --git a/src/zabbix_server/httppoller/httptest.c b/src/zabbix_server/httppoller/httptest.c index 04bd5713..6b527477 100644 --- a/src/zabbix_server/httppoller/httptest.c +++ b/src/zabbix_server/httppoller/httptest.c @@ -84,17 +84,22 @@ static int process_value(zbx_uint64_t itemid, AGENT_RESULT *value) now = time(NULL); - DBbegin(); - switch (zbx_process) { - case ZBX_PROCESS_SERVER: - process_new_value(&item, value, now); - update_triggers(item.itemid); - break; - case ZBX_PROCESS_PROXY: - proxy_process_new_value(&item, value, now); - break; + if (0 == CONFIG_DBSYNCER_FORKS) + { + DBbegin(); + switch (zbx_process) { + case ZBX_PROCESS_SERVER: + process_new_value(&item, value, now); + update_triggers(item.itemid); + break; + case ZBX_PROCESS_PROXY: + proxy_process_new_value(&item, value, now); + break; + } + DBcommit(); } - DBcommit(); + else + process_new_value(&item, value, now); DBfree_result(result); diff --git a/src/zabbix_server/pinger/pinger.c b/src/zabbix_server/pinger/pinger.c index 2b98cb48..8796347d 100644 --- a/src/zabbix_server/pinger/pinger.c +++ b/src/zabbix_server/pinger/pinger.c @@ -28,6 +28,7 @@ #include "zbxicmpping.h" #include "pinger.h" +#include "dbcache.h" static zbx_process_t zbx_process; static int pinger_num; @@ -79,17 +80,25 @@ static void process_value(char *key, ZBX_FPING_HOST *host, AGENT_RESULT *value, while (NULL != (row = DBfetch(result))) { DBget_item_from_db(&item, row); - DBbegin(); - switch (zbx_process) { - case ZBX_PROCESS_SERVER: + if (0 == CONFIG_DBSYNCER_FORKS) + { + DBbegin(); + switch (zbx_process) { + case ZBX_PROCESS_SERVER: + process_new_value(&item, value, now); + update_triggers(item.itemid); + break; + case ZBX_PROCESS_PROXY: + proxy_process_new_value(&item, value, now); + break; + } + DBcommit(); + } + else + { process_new_value(&item, value, now); - update_triggers(item.itemid); - break; - case ZBX_PROCESS_PROXY: - proxy_process_new_value(&item, value, now); - break; + DCadd_nextcheck(&item, now, NULL); } - DBcommit(); (*items)++; } @@ -118,6 +127,9 @@ static int process_values(ZBX_FPING_HOST *hosts, int hosts_count, int now) zabbix_log(LOG_LEVEL_DEBUG, "In process_values()"); + if (0 != CONFIG_DBSYNCER_FORKS) + DCinit_nextchecks(); + for (i = 0; i < hosts_count; i++) { zabbix_log(LOG_LEVEL_DEBUG, "Host [%s] alive [%d] " ZBX_FS_DBL " sec.", hosts[i].addr, @@ -135,6 +147,9 @@ static int process_values(ZBX_FPING_HOST *hosts, int hosts_count, int now) free_result(&value); } + if (0 != CONFIG_DBSYNCER_FORKS) + DCflush_nextchecks(); + return items; } diff --git a/src/zabbix_server/poller/checks_agent.c b/src/zabbix_server/poller/checks_agent.c index 1ae90662..b6238336 100644 --- a/src/zabbix_server/poller/checks_agent.c +++ b/src/zabbix_server/poller/checks_agent.c @@ -61,21 +61,16 @@ int get_value_agent(DB_ITEM *item, AGENT_RESULT *result) addr, item->key); - zabbix_log(LOG_LEVEL_DEBUG, "Before zbx_tcp_connect"); if (SUCCEED == (ret = zbx_tcp_connect(&s, addr, item->port, 0))) { - zabbix_log(LOG_LEVEL_DEBUG, "After1 zbx_tcp_connect"); zbx_snprintf(packet, sizeof(packet), "%s\n",item->key); zabbix_log(LOG_LEVEL_DEBUG, "Sending [%s]", packet); /* Send requests using old protocol */ if( SUCCEED == (ret = zbx_tcp_send_raw(&s, packet)) ) { - zabbix_log(LOG_LEVEL_DEBUG, "Before read"); - ret = zbx_tcp_recv_ext(&s, &buf, ZBX_TCP_READ_UNTIL_CLOSE); } } - zabbix_log(LOG_LEVEL_DEBUG, "After2 zbx_tcp_connect"); if( SUCCEED == ret ) { diff --git a/src/zabbix_server/poller/poller.c b/src/zabbix_server/poller/poller.c index b7ec5b57..666014b5 100644 --- a/src/zabbix_server/poller/poller.c +++ b/src/zabbix_server/poller/poller.c @@ -24,6 +24,7 @@ #include "sysinfo.h" #include "daemon.h" #include "zbxserver.h" +#include "dbcache.h" #include "poller.h" @@ -100,7 +101,7 @@ int get_value(DB_ITEM *item, AGENT_RESULT *result) zabbix_log(LOG_LEVEL_DEBUG, "End get_value()"); return res; } - +/* static int get_minnextcheck(int now) { DB_RESULT result; @@ -108,11 +109,11 @@ static int get_minnextcheck(int now) int res; char istatus[16]; - +*/ /* Host status 0 == MONITORED 1 == NOT MONITORED 2 == UNREACHABLE */ - if(poller_type == ZBX_POLLER_TYPE_UNREACHABLE) +/* if(poller_type == ZBX_POLLER_TYPE_UNREACHABLE) { result = DBselect("select count(*),min(nextcheck) as nextcheck from items i,hosts h" " where " ZBX_SQL_MOD(h.hostid,%d) "=%d and i.nextcheck<=%d and i.status in (%d)" @@ -177,49 +178,54 @@ static int get_minnextcheck(int now) return res; } - +*/ /* Update special host's item - "status" */ static void update_key_status(zbx_uint64_t hostid, int host_status, time_t now) { -/* char value_str[MAX_STRING_LEN];*/ AGENT_RESULT agent; - DB_ITEM item; DB_RESULT result; DB_ROW row; - int update; zabbix_log(LOG_LEVEL_DEBUG, "In update_key_status(" ZBX_FS_UI64 ",%d)", - hostid, - host_status); + hostid, + host_status); - result = DBselect("select %s where h.hostid=i.hostid and h.proxy_hostid=0 and h.hostid=" ZBX_FS_UI64 " and i.key_='%s'", - ZBX_SQL_ITEM_SELECT, - hostid, - SERVER_STATUS_KEY); + result = DBselect("select %s where h.hostid=i.hostid and i.status=%d" + " and h.proxy_hostid=0 and i.key_='%s' and h.hostid=" ZBX_FS_UI64, + ZBX_SQL_ITEM_SELECT, + ITEM_STATUS_ACTIVE, + SERVER_STATUS_KEY, + hostid); - while (NULL != (row = DBfetch(result))) { - DBget_item_from_db(&item,row); + while (NULL != (row = DBfetch(result))) + { + DBget_item_from_db(&item, row); /* Do not process new value for status, if previous status is the same */ - update = (item.lastvalue_null==1); - update = update || ((item.value_type == ITEM_VALUE_TYPE_FLOAT) &&(cmp_double(item.lastvalue_dbl, (double)host_status) == 1)); - update = update || ((item.value_type == ITEM_VALUE_TYPE_UINT64) &&(item.lastvalue_uint64 != host_status)); + update = (item.lastvalue_null == 1); + update = update || (item.value_type == ITEM_VALUE_TYPE_FLOAT && cmp_double(item.lastvalue_dbl, (double)host_status) == 1); + update = update || (item.value_type == ITEM_VALUE_TYPE_UINT64 && item.lastvalue_uint64 != host_status); if (update) { init_result(&agent); SET_UI64_RESULT(&agent, host_status); - switch (zbx_process) { - case ZBX_PROCESS_SERVER: - process_new_value(&item, &agent, now); - update_triggers(item.itemid); - break; - case ZBX_PROCESS_PROXY: - proxy_process_new_value(&item, &agent, now); - break; + if (0 == CONFIG_DBSYNCER_FORKS) + { + switch (zbx_process) { + case ZBX_PROCESS_SERVER: + process_new_value(&item, &agent, now); + update_triggers(item.itemid); + break; + case ZBX_PROCESS_PROXY: + proxy_process_new_value(&item, &agent, now); + break; + } } + else + process_new_value(&item, &agent, now); free_result(&agent); } @@ -228,54 +234,34 @@ static void update_key_status(zbx_uint64_t hostid, int host_status, time_t now) DBfree_result(result); } -static void enable_host(DB_ITEM *item, time_t now, char *error) +static void enable_host(DB_ITEM *item, time_t now) { assert(item); - zabbix_log(LOG_LEVEL_WARNING, "Enabling host [%s]", - item->host_name); - zabbix_syslog("Enabling host [%s]", - item->host_name); - switch (zbx_process) { case ZBX_PROCESS_SERVER: - DBupdate_host_availability(item->hostid, HOST_AVAILABLE_TRUE, now, error); + DBupdate_host_availability(item, HOST_AVAILABLE_TRUE, now, NULL); update_key_status(item->hostid, HOST_STATUS_MONITORED, now); /* 0 */ break; case ZBX_PROCESS_PROXY: - DBproxy_update_host_availability(item->hostid, HOST_AVAILABLE_TRUE, now); + DBproxy_update_host_availability(item, HOST_AVAILABLE_TRUE, now); break; } - - item->host_available = HOST_AVAILABLE_TRUE; } static void disable_host(DB_ITEM *item, time_t now, char *error) { assert(item); - zabbix_log(LOG_LEVEL_WARNING, "Host [%s] will be checked after %d seconds", - item->host_name, - CONFIG_UNAVAILABLE_DELAY); - zabbix_syslog("Host [%s] will be checked after %d seconds", - item->host_name, - CONFIG_UNAVAILABLE_DELAY); - switch (zbx_process) { case ZBX_PROCESS_SERVER: - DBupdate_host_availability(item->hostid, HOST_AVAILABLE_FALSE, now, error); + DBupdate_host_availability(item, HOST_AVAILABLE_FALSE, now, error); update_key_status(item->hostid, HOST_AVAILABLE_FALSE, now); /* 2 */ break; case ZBX_PROCESS_PROXY: - DBproxy_update_host_availability(item->hostid, HOST_AVAILABLE_FALSE, now); + DBproxy_update_host_availability(item, HOST_AVAILABLE_FALSE, now); break; } - - item->host_available = HOST_AVAILABLE_FALSE; - - DBexecute("update hosts set disable_until=%d where hostid=" ZBX_FS_UI64, - now + CONFIG_UNAVAILABLE_DELAY, - item->hostid); } /****************************************************************************** @@ -293,14 +279,13 @@ static void disable_host(DB_ITEM *item, time_t now, char *error) * Comments: always SUCCEED * * * ******************************************************************************/ -int get_values(void) +static int get_values(int now) { DB_RESULT result; DB_RESULT result2; DB_ROW row; DB_ROW row2; - time_t now; int delay; int res; DB_ITEM item; @@ -308,14 +293,20 @@ int get_values(void) int stop = 0, items = 0; char *unreachable_hosts = NULL; - char tmp[MAX_STRING_LEN], istatus[16]; + int unreachable_hosts_alloc = 128, + unreachable_hosts_offset = 0; + + char istatus[16]; zabbix_log( LOG_LEVEL_DEBUG, "In get_values()"); + if (0 != CONFIG_DBSYNCER_FORKS) + DCinit_nextchecks(); + now = time(NULL); - zbx_snprintf(tmp,sizeof(tmp)-1,ZBX_FS_UI64,0); - unreachable_hosts=zbx_strdcat(unreachable_hosts,tmp); + unreachable_hosts = zbx_malloc(unreachable_hosts, unreachable_hosts_alloc); + *unreachable_hosts = '\0'; /* Poller for unreachable hosts */ if(poller_type == ZBX_POLLER_TYPE_UNREACHABLE) @@ -327,7 +318,7 @@ int get_values(void) " and i.key_ not in ('%s','%s','%s','%s')" DB_NODE " group by h.hostid", CONFIG_UNREACHABLE_POLLER_FORKS, poller_num-1, - now, + now + POLLER_DELAY, ITEM_STATUS_ACTIVE, ITEM_TYPE_TRAPPER, ITEM_TYPE_ZABBIX_ACTIVE, ITEM_TYPE_HTTPTEST, HOST_STATUS_MONITORED, @@ -352,7 +343,7 @@ int get_values(void) " and " ZBX_SQL_MOD(i.itemid,%d) "=%d and i.key_ not in ('%s','%s','%s','%s')" DB_NODE " order by i.nextcheck", ZBX_SQL_ITEM_SELECT, - now, + now + POLLER_DELAY, istatus, ITEM_TYPE_TRAPPER, ITEM_TYPE_ZABBIX_ACTIVE, ITEM_TYPE_HTTPTEST, HOST_STATUS_MONITORED, @@ -406,51 +397,55 @@ int get_values(void) now = time(NULL); - DBbegin(); - - if(res == SUCCEED ) + if (res == SUCCEED) { - switch (zbx_process) { - case ZBX_PROCESS_SERVER: - process_new_value(&item, &agent, now); - break; - case ZBX_PROCESS_PROXY: - proxy_process_new_value(&item, &agent, now); - break; - } - - if (HOST_AVAILABLE_TRUE != item.host_available) { - enable_host(&item, now, agent.msg); + if (HOST_AVAILABLE_TRUE != item.host_available) + { + DBbegin(); + + enable_host(&item, now); stop = 1; + + DBcommit(); } - if (item.host_errors_from != 0) { + + if (item.host_errors_from != 0) + { + DBbegin(); + DBexecute("update hosts set errors_from=0 where hostid=" ZBX_FS_UI64, item.hostid); stop = 1; - } - switch (zbx_process) { - case ZBX_PROCESS_SERVER: - update_triggers(item.itemid); - break; - default: - /* nothing */; + DBcommit(); } - } - else if(res == NOTSUPPORTED || res == AGENT_ERROR) - { - if(item.status == ITEM_STATUS_NOTSUPPORTED) + if (0 == CONFIG_DBSYNCER_FORKS) { - /* It is not correct */ -/* snprintf(sql,sizeof(sql)-1,"update items set nextcheck=%d, lastclock=%d where itemid=%d",calculate_item_nextcheck(item.itemid, CONFIG_REFRESH_UNSUPPORTED,now), now, item.itemid);*/ - DBexecute("update items set nextcheck=%d, lastclock=%d where itemid=" ZBX_FS_UI64, - CONFIG_REFRESH_UNSUPPORTED+now, - now, - item.itemid); + DBbegin(); + + switch (zbx_process) { + case ZBX_PROCESS_SERVER: + process_new_value(&item, &agent, now); + update_triggers(item.itemid); + break; + case ZBX_PROCESS_PROXY: + proxy_process_new_value(&item, &agent, now); + break; + } + + DBcommit(); } else { + process_new_value(&item, &agent, now); + DCadd_nextcheck(&item, now, NULL); + } + } + else if (res == NOTSUPPORTED || res == AGENT_ERROR) + { + if (item.status != ITEM_STATUS_NOTSUPPORTED) + { zabbix_log(LOG_LEVEL_WARNING, "Parameter [%s] is not supported by agent on host [%s] Old status [%d]", item.key, item.host_name, @@ -458,89 +453,101 @@ int get_values(void) zabbix_syslog("Parameter [%s] is not supported by agent on host [%s]", item.key, item.host_name); + } - switch (zbx_process) { - case ZBX_PROCESS_SERVER: - DBupdate_item_status_to_notsupported(item.itemid, agent.msg); - break; - case ZBX_PROCESS_PROXY: - DBproxy_update_item_status_to_notsupported(item.itemid); - break; - } + if (0 == CONFIG_DBSYNCER_FORKS) + { + DBbegin(); + + DBupdate_item_status_to_notsupported(&item, now, agent.msg); - /* if(HOST_STATUS_UNREACHABLE == item.host_status)*/ - if (HOST_AVAILABLE_TRUE != item.host_available) { - enable_host(&item, now, agent.msg); - stop = 1; - } + DBcommit(); + } + else + DCadd_nextcheck(&item, now, agent.msg); + + if (HOST_AVAILABLE_TRUE != item.host_available) { + DBbegin(); + + enable_host(&item, now); + stop = 1; + + DBcommit(); } } - else if(res == NETWORK_ERROR) + else if (res == NETWORK_ERROR) { + DBbegin(); + /* First error */ - if(item.host_errors_from==0) + if (item.host_errors_from == 0) { zabbix_log( LOG_LEVEL_WARNING, "Host [%s]: first network error, wait for %d seconds", - item.host_name, - CONFIG_UNREACHABLE_DELAY); + item.host_name, + CONFIG_UNREACHABLE_DELAY); zabbix_syslog("Host [%s]: first network error, wait for %d seconds", - item.host_name, - CONFIG_UNREACHABLE_DELAY); + item.host_name, + CONFIG_UNREACHABLE_DELAY); - item.host_errors_from=now; DBexecute("update hosts set errors_from=%d,disable_until=%d where hostid=" ZBX_FS_UI64, - now, - now+CONFIG_UNREACHABLE_DELAY, - item.hostid); + now, + now + CONFIG_UNREACHABLE_DELAY, + item.hostid); + + item.host_errors_from = now; delay = MIN(4*item.delay, 300); - zabbix_log( LOG_LEVEL_WARNING, "Parameter [%s] will be checked after %d seconds on host [%s]", - item.key, - delay, - item.host_name); + + zabbix_log(LOG_LEVEL_WARNING, "Parameter [%s] will be checked after %d seconds on host [%s]", + item.key, + delay, + item.host_name); + DBexecute("update items set nextcheck=%d where itemid=" ZBX_FS_UI64, - now + delay, - item.itemid); + now + delay, + item.itemid); } else { - if (now - item.host_errors_from > CONFIG_UNREACHABLE_PERIOD) { + if (now - item.host_errors_from > CONFIG_UNREACHABLE_PERIOD) + { disable_host(&item, now, agent.msg); } - /* Still unavailable, but won't change status to UNAVAILABLE yet */ else { - zabbix_log( LOG_LEVEL_WARNING, "Host [%s]: another network error, wait for %d seconds", - item.host_name, - CONFIG_UNREACHABLE_DELAY); + /* Still unavailable, but won't change status to UNAVAILABLE yet */ + zabbix_log(LOG_LEVEL_WARNING, "Host [%s]: another network error, wait for %d seconds", + item.host_name, + CONFIG_UNREACHABLE_DELAY); zabbix_syslog("Host [%s]: another network error, wait for %d seconds", - item.host_name, - CONFIG_UNREACHABLE_DELAY); + item.host_name, + CONFIG_UNREACHABLE_DELAY); DBexecute("update hosts set disable_until=%d where hostid=" ZBX_FS_UI64, - now+CONFIG_UNREACHABLE_DELAY, - item.hostid); + now + CONFIG_UNREACHABLE_DELAY, + item.hostid); } } - zbx_snprintf(tmp,sizeof(tmp)-1,"," ZBX_FS_UI64,item.hostid); - unreachable_hosts=zbx_strdcat(unreachable_hosts,tmp); + DBcommit(); -/* stop=1;*/ + zbx_snprintf_alloc(&unreachable_hosts, &unreachable_hosts_alloc, &unreachable_hosts_offset, 32, + "%s" ZBX_FS_UI64, + 0 == unreachable_hosts_offset ? "" : ",", + item.hostid); } else { - zabbix_log( LOG_LEVEL_CRIT, "Unknown response code returned."); + zabbix_log(LOG_LEVEL_CRIT, "Unknown response code returned."); assert(0==1); } /* Poller for unreachable hosts */ - if(poller_type == ZBX_POLLER_TYPE_UNREACHABLE) + if (poller_type == ZBX_POLLER_TYPE_UNREACHABLE) { /* We cannot freeit earlier because items has references to the structure */ DBfree_result(result2); } free_result(&agent); - DBcommit(); items++; } @@ -548,7 +555,12 @@ int get_values(void) zbx_free(unreachable_hosts); DBfree_result(result); - zabbix_log( LOG_LEVEL_DEBUG, "End get_values()"); + + if (0 != CONFIG_DBSYNCER_FORKS) + DCflush_nextchecks(); + + zabbix_log(LOG_LEVEL_DEBUG, "End get_values()"); + return items; } @@ -556,7 +568,7 @@ void main_poller_loop(zbx_process_t p, int type, int num) { struct sigaction phan; int now; - int nextcheck, sleeptime; + int /*nextcheck, */sleeptime; int items; double sec; @@ -579,31 +591,32 @@ void main_poller_loop(zbx_process_t p, int type, int num) zbx_setproctitle("poller [getting values]"); now = time(NULL); - sec = zbx_time(); - items = get_values(); + items = get_values(now); sec = zbx_time() - sec; - nextcheck = get_minnextcheck(now); +/* nextcheck = get_minnextcheck(now);*/ - zabbix_log(LOG_LEVEL_DEBUG, "Poller spent " ZBX_FS_DBL " seconds while updating %3d values. Nextcheck: %d Time: %d", + zabbix_log(LOG_LEVEL_DEBUG, "Poller spent " ZBX_FS_DBL " seconds while updating %3d values." + /*" Nextcheck: %d Time: %d"*/, sec, - items, + items/*, nextcheck, - (int)time(NULL)); + (int)time(NULL)*/); - if( FAIL == nextcheck) +/* if( FAIL == nextcheck) { sleeptime=POLLER_DELAY; } else { - sleeptime=nextcheck-time(NULL); + sleeptime=nextcheck-time(NULL);*/ + sleeptime = now - time(NULL) + 1; if(sleeptime<0) { sleeptime=0; } - } +/* }*/ if(sleeptime>0) { if(sleeptime > POLLER_DELAY) diff --git a/src/zabbix_server/server.c b/src/zabbix_server/server.c index 272bdf8d..89982cf7 100644 --- a/src/zabbix_server/server.c +++ b/src/zabbix_server/server.c @@ -117,7 +117,7 @@ static char shortopts[] = pid_t *threads=NULL; int CONFIG_ALERTER_FORKS = 1; -int CONFIG_DBSYNCER_FORKS = 0; +int CONFIG_DBSYNCER_FORKS = 1; int CONFIG_DISCOVERER_FORKS = 1; int CONFIG_HOUSEKEEPER_FORKS = 1; int CONFIG_NODEWATCHER_FORKS = 1; @@ -195,7 +195,7 @@ void init_config(void) static struct cfg_line cfg[]= { /* PARAMETER ,VAR ,FUNC, TYPE(0i,1s),MANDATORY,MIN,MAX */ -/* {"StartDBSyncers",&CONFIG_DBSYNCER_FORKS,0,TYPE_INT,PARM_OPT,0,1},*/ + {"StartDBSyncers",&CONFIG_DBSYNCER_FORKS,0,TYPE_INT,PARM_OPT,0,16}, {"StartDiscoverers",&CONFIG_DISCOVERER_FORKS,0,TYPE_INT,PARM_OPT,0,255}, {"StartHTTPPollers",&CONFIG_HTTPPOLLER_FORKS,0,TYPE_INT,PARM_OPT,0,255}, {"StartPingers",&CONFIG_PINGER_FORKS,0,TYPE_INT,PARM_OPT,0,255}, @@ -929,7 +929,7 @@ int main(int argc, char **argv) if(CONFIG_DBSYNCER_FORKS!=0) { - init_database_cache(); + init_database_cache(ZBX_PROCESS_SERVER); } switch (task) { @@ -1073,10 +1073,10 @@ int MAIN_ZABBIX_ENTRY(void) trend(); return 0; #endif - threads = calloc(1+CONFIG_POLLER_FORKS+CONFIG_TRAPPERD_FORKS+CONFIG_PINGER_FORKS+CONFIG_ALERTER_FORKS - +CONFIG_HOUSEKEEPER_FORKS+CONFIG_TIMER_FORKS+CONFIG_UNREACHABLE_POLLER_FORKS - +CONFIG_NODEWATCHER_FORKS+CONFIG_HTTPPOLLER_FORKS+CONFIG_DISCOVERER_FORKS+CONFIG_ESCALATOR_FORKS, - sizeof(pid_t)); + threads = calloc(1 + CONFIG_POLLER_FORKS + CONFIG_TRAPPERD_FORKS + CONFIG_PINGER_FORKS + CONFIG_ALERTER_FORKS + + CONFIG_HOUSEKEEPER_FORKS + CONFIG_TIMER_FORKS + CONFIG_UNREACHABLE_POLLER_FORKS + + CONFIG_NODEWATCHER_FORKS + CONFIG_HTTPPOLLER_FORKS + CONFIG_DISCOVERER_FORKS + CONFIG_DBSYNCER_FORKS + + CONFIG_ESCALATOR_FORKS, sizeof(pid_t)); if(CONFIG_TRAPPERD_FORKS > 0) { @@ -1087,8 +1087,10 @@ int MAIN_ZABBIX_ENTRY(void) } } - for( i=1; - i<=CONFIG_POLLER_FORKS+CONFIG_TRAPPERD_FORKS+CONFIG_PINGER_FORKS+CONFIG_ALERTER_FORKS+CONFIG_HOUSEKEEPER_FORKS+CONFIG_TIMER_FORKS+CONFIG_UNREACHABLE_POLLER_FORKS+CONFIG_NODEWATCHER_FORKS+CONFIG_HTTPPOLLER_FORKS+CONFIG_DISCOVERER_FORKS+CONFIG_DBSYNCER_FORKS+CONFIG_ESCALATOR_FORKS; + for ( i = 1; i <= CONFIG_POLLER_FORKS + CONFIG_TRAPPERD_FORKS + CONFIG_PINGER_FORKS + CONFIG_ALERTER_FORKS + + CONFIG_HOUSEKEEPER_FORKS + CONFIG_TIMER_FORKS + CONFIG_UNREACHABLE_POLLER_FORKS + + CONFIG_NODEWATCHER_FORKS + CONFIG_HTTPPOLLER_FORKS + CONFIG_DISCOVERER_FORKS + CONFIG_DBSYNCER_FORKS + + CONFIG_ESCALATOR_FORKS; i++) { if((pid = zbx_fork()) == 0) @@ -1247,10 +1249,13 @@ void zbx_on_exit() if(threads != NULL) { - for(i = 1; i <= CONFIG_POLLER_FORKS+CONFIG_TRAPPERD_FORKS+CONFIG_PINGER_FORKS+CONFIG_ALERTER_FORKS+CONFIG_HOUSEKEEPER_FORKS+CONFIG_TIMER_FORKS+CONFIG_UNREACHABLE_POLLER_FORKS+CONFIG_NODEWATCHER_FORKS+CONFIG_HTTPPOLLER_FORKS+CONFIG_DISCOVERER_FORKS+CONFIG_DBSYNCER_FORKS+CONFIG_ESCALATOR_FORKS; i++) + for (i = 1; i <= CONFIG_POLLER_FORKS + CONFIG_TRAPPERD_FORKS + CONFIG_PINGER_FORKS + CONFIG_ALERTER_FORKS + + CONFIG_HOUSEKEEPER_FORKS + CONFIG_TIMER_FORKS + CONFIG_UNREACHABLE_POLLER_FORKS + + CONFIG_NODEWATCHER_FORKS + CONFIG_HTTPPOLLER_FORKS + CONFIG_DISCOVERER_FORKS + CONFIG_DBSYNCER_FORKS + + CONFIG_ESCALATOR_FORKS; i++) { - if(threads[i]) { - kill(threads[i],SIGTERM); + if (threads[i]) { + kill(threads[i], SIGTERM); threads[i] = (ZBX_THREAD_HANDLE)NULL; } } diff --git a/src/zabbix_server/trapper/trapper.c b/src/zabbix_server/trapper/trapper.c index 621e9f78..22ba5a9b 100644 --- a/src/zabbix_server/trapper/trapper.c +++ b/src/zabbix_server/trapper/trapper.c @@ -27,6 +27,7 @@ #include "zlog.h" #include "zbxjson.h" #include "zbxserver.h" +#include "dbcache.h" #include "../nodewatcher/nodecomms.h" #include "../nodewatcher/nodesender.h" @@ -136,31 +137,41 @@ static void calc_timestamp(char *line,int *timestamp, char *format) * Comments: for trapper server process * * * ******************************************************************************/ -static int process_data(zbx_sock_t *sock, zbx_uint64_t proxy_hostid, time_t now, char *server, char *key, char *value, - char *lastlogsize, char *timestamp, char *source, char *severity) +static void process_mass_data(zbx_sock_t *sock, zbx_uint64_t proxy_hostid, AGENT_VALUE *values, int value_num, int *processed) { AGENT_RESULT agent; DB_RESULT result; DB_ROW row; DB_ITEM item; - char server_esc[MAX_STRING_LEN], key_esc[MAX_STRING_LEN]; - char item_types[32]; + char host_esc[MAX_STRING_LEN], key_esc[MAX_STRING_LEN]; + static char *sql = NULL; + static int sql_allocated = 65536; + int sql_offset = 0, i; - zabbix_log(LOG_LEVEL_DEBUG, "In process_data([%s],[%s],[%s],[%s])", - server, - key, - value, - lastlogsize); + zabbix_log(LOG_LEVEL_DEBUG, "In process_mass_data()"); - DBescape_string(server, server_esc, MAX_STRING_LEN); - DBescape_string(key, key_esc, MAX_STRING_LEN); + if (NULL == sql) + sql = zbx_malloc(sql, sql_allocated); - if (proxy_hostid == 0) { - zbx_snprintf(item_types, sizeof(item_types), "%d,%d", + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 2048, + "select %s where h.hostid=i.hostid and h.proxy_hostid=" ZBX_FS_UI64 + " and h.status=%d and i.status in (%d,%d)", + ZBX_SQL_ITEM_SELECT, + proxy_hostid, + HOST_STATUS_MONITORED, + ITEM_STATUS_ACTIVE, ITEM_STATUS_NOTSUPPORTED); + + if (proxy_hostid == 0) + { + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + " and i.type in (%d,%d)", ITEM_TYPE_TRAPPER, ITEM_TYPE_ZABBIX_ACTIVE); - } else { - zbx_snprintf(item_types, sizeof(item_types), "%d,%d,%d,%d,%d,%d,%d,%d,%d", + } + else + { + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 64, + " and i.type in (%d,%d,%d,%d,%d,%d,%d,%d,%d)", ITEM_TYPE_ZABBIX, ITEM_TYPE_SNMPv1, ITEM_TYPE_TRAPPER, @@ -172,112 +183,116 @@ static int process_data(zbx_sock_t *sock, zbx_uint64_t proxy_hostid, time_t now, ITEM_TYPE_EXTERNAL); } - result = DBselect("select %s where h.status=%d and h.hostid=i.hostid and h.host='%s' and h.proxy_hostid=" ZBX_FS_UI64 - " and i.key_='%s' and i.status in (%d,%d) and i.type in (%s)" DB_NODE, - ZBX_SQL_ITEM_SELECT, - HOST_STATUS_MONITORED, - server_esc, - proxy_hostid, - key_esc, - ITEM_STATUS_ACTIVE, ITEM_STATUS_NOTSUPPORTED, - item_types, + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 8, " and ("); + + for (i = 0; i < value_num; i++) + { + DBescape_string(values[i].host_name, host_esc, sizeof(host_esc)); + DBescape_string(values[i].key, key_esc, sizeof(key_esc)); + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + "(h.host='%s' and i.key_='%s') or ", + host_esc, + key_esc); + } + + sql_offset -= 4; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, ")" DB_NODE, DBnode_local("h.hostid")); - if (NULL == (row = DBfetch(result))) { - DBfree_result(result); - return FAIL; -/* - zabbix_log( LOG_LEVEL_DEBUG, "Before checking autoregistration for [%s]", - server); + result = DBselect("%s", sql); - if(autoregister(server) == SUCCEED) - { - DBfree_result(result); - - result = DBselect("select %s where h.status=%d and h.hostid=i.hostid and h.host='%s' and i.key_='%s' and i.status=%d and i.type in (%d,%d) and" ZBX_COND_NODEID, - ZBX_SQL_ITEM_SELECT, - HOST_STATUS_MONITORED, - server_esc, - key_esc, - ITEM_STATUS_ACTIVE, - ITEM_TYPE_TRAPPER, - ITEM_TYPE_ZABBIX_ACTIVE, - LOCAL_NODE("h.hostid")); - row = DBfetch(result); - if(!row) - { - DBfree_result(result); - return FAIL; - } - } - else - { - DBfree_result(result); - return FAIL; - } -*/ - } + if (0 != CONFIG_DBSYNCER_FORKS) + DCinit_nextchecks(); - DBget_item_from_db(&item, row); + while (NULL != (row = DBfetch(result))) { + DBget_item_from_db(&item, row); - if (item.type == ITEM_TYPE_ZABBIX_ACTIVE && FAIL == zbx_tcp_check_security(sock, item.trapper_hosts, 1)) { - DBfree_result(result); - return FAIL; - } + if (item.type == ITEM_TYPE_ZABBIX_ACTIVE && FAIL == zbx_tcp_check_security(sock, item.trapper_hosts, 1)) + continue; - zabbix_log(LOG_LEVEL_DEBUG, "Processing [%s]", - value); - - if (0 == strcmp(value, "ZBX_NOTSUPPORTED")) { - zabbix_log( LOG_LEVEL_WARNING, "Active parameter [%s] is not supported by agent on host [%s]", - item.key, - item.host_name); - zabbix_syslog("Active parameter [%s] is not supported by agent on host [%s]", - item.key, - item.host_name); - DBupdate_item_status_to_notsupported(item.itemid, "Not supported by ZABBIX agent"); - } else { - if (0 == strncmp(item.key, "log[", 4) || 0 == strncmp(item.key, "eventlog[", 9)) { - item.lastlogsize = atoi(lastlogsize); - item.timestamp = atoi(timestamp); - - calc_timestamp(value, &item.timestamp, item.logtimefmt); - - item.eventlog_severity = atoi(severity); - item.eventlog_source = source; - zabbix_log(LOG_LEVEL_DEBUG, "Value [%s] Lastlogsize [%s] Timestamp [%s]", - value, - lastlogsize, - timestamp); - } + for (i = 0; i < value_num; i++) + { + if (0 == strcmp(item.host_name, values[i].host_name) && 0 == strcmp(item.key, values[i].key)) { +/* zabbix_log(LOG_LEVEL_DEBUG, "Processing [%s@%s: \"%s\"]", + item.key, + item.host_name, + values[i].value);*/ - init_result(&agent); + if (0 == strcmp(values[i].value, "ZBX_NOTSUPPORTED")) + { + zabbix_log(LOG_LEVEL_WARNING, "Active parameter [%s] is not supported by agent on host [%s]", + item.key, + item.host_name); + zabbix_syslog("Active parameter [%s] is not supported by agent on host [%s]", + item.key, + item.host_name); - if( SUCCEED == set_result_type(&agent, item.value_type, value)) { - switch (zbx_process) { - case ZBX_PROCESS_SERVER: - process_new_value(&item, &agent, now); - update_triggers(item.itemid); - break; - case ZBX_PROCESS_PROXY: - proxy_process_new_value(&item, &agent, now); - break; + if (0 == CONFIG_DBSYNCER_FORKS) + DBupdate_item_status_to_notsupported(&item, values[i].clock, "Not supported by ZABBIX agent"); + else + DCadd_nextcheck(&item, values[i].clock, "Not supported by ZABBIX agent"); + + (*processed)++; + + } + else + { + if (0 == strncmp(item.key, "log[", 4) || 0 == strncmp(item.key, "eventlog[", 9)) + { + item.lastlogsize = values[i].lastlogsize; + item.timestamp = values[i].timestamp; + + calc_timestamp(values[i].value, &item.timestamp, item.logtimefmt); + + item.eventlog_severity = values[i].severity; + item.eventlog_source = values[i].source; + +/* zabbix_log(LOG_LEVEL_DEBUG, "Value [%s] Lastlogsize [%s] Timestamp [%s]", + values[i].value, + item.lastlogsize, + item.timestamp);*/ + } + + init_result(&agent); + + if (SUCCEED == set_result_type(&agent, item.value_type, values[i].value)) + { + if (0 == CONFIG_DBSYNCER_FORKS) + { + switch (zbx_process) { + case ZBX_PROCESS_SERVER: + process_new_value(&item, &agent, values[i].clock); + update_triggers(item.itemid); + break; + case ZBX_PROCESS_PROXY: + proxy_process_new_value(&item, &agent, values[i].clock); + break; + } + } + else + process_new_value(&item, &agent, values[i].clock); + (*processed)++; + } + else + { + zabbix_log( LOG_LEVEL_WARNING, "Type of received value [%s] is not suitable for [%s@%s]", + values[i].value, + item.key, + item.host_name); + zabbix_syslog("Type of received value [%s] is not suitable for [%s@%s]", + values[i].value, + item.key, + item.host_name); + } + free_result(&agent); + } } - } else { - zabbix_log( LOG_LEVEL_WARNING, "Type of received value [%s] is not suitable for [%s@%s]", - value, - item.key, - item.host_name); - zabbix_syslog("Type of received value [%s] is not suitable for [%s@%s]", - value, - item.key, - item.host_name); } - free_result(&agent); - } + } DBfree_result(result); - return SUCCEED; + if (0 != CONFIG_DBSYNCER_FORKS) + DCflush_nextchecks(); } /****************************************************************************** @@ -315,6 +330,17 @@ int send_result(zbx_sock_t *sock, int result, char *info) return ret; } +static void clean_agent_values(AGENT_VALUE *values, int value_num) +{ + int i; + + for (i = 0; i < value_num; i ++) { + zbx_free(values[i].value); + if (NULL != values[i].source) + zbx_free(values[i].source); + } +} + /****************************************************************************** * * * Function: process_new_values * @@ -335,22 +361,29 @@ static int process_new_values(zbx_sock_t *sock, struct zbx_json_parse *jp, const { struct zbx_json_parse jp_data, jp_row; const char *p; - char host[HOST_HOST_LEN_MAX], key[ITEM_KEY_LEN_MAX], - value[MAX_STRING_LEN], info[MAX_STRING_LEN], lastlogsize[MAX_STRING_LEN], + char /*host[HOST_HOST_LEN_MAX], key[ITEM_KEY_LEN_MAX], + value[MAX_STRING_LEN], */info[MAX_STRING_LEN], /*lastlogsize[MAX_STRING_LEN], timestamp[MAX_STRING_LEN], source[MAX_STRING_LEN], severity[MAX_STRING_LEN], - clock[MAX_STRING_LEN]; + clock[MAX_STRING_LEN], */tmp[MAX_STRING_LEN]; int ret = SUCCEED; - int processed_ok = 0, processed_fail = 0; + int processed = 0, processed_fail = 0; double sec; - time_t now, hosttime = 0, itemtime; + time_t now, hosttime = 0/*, itemtime*/; + +#define VALUES_MAX 256 + static AGENT_VALUE *values = NULL; + int value_num = 0; zabbix_log(LOG_LEVEL_DEBUG, "In process_new_values()"); now = time(NULL); sec = zbx_time(); - if (SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_CLOCK, clock, sizeof(clock))) - hosttime = atoi(clock); + if (NULL == values) + values = zbx_malloc(values, VALUES_MAX * sizeof(AGENT_VALUE)); + + if (SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_CLOCK, tmp, sizeof(tmp))) + hosttime = atoi(tmp); /* {"request":"ZBX_SENDER_DATA","data":[{"key":"system.cpu.num",...,...},{...},...]} * ^ @@ -379,41 +412,52 @@ static int process_new_values(zbx_sock_t *sock, struct zbx_json_parse *jp, const */ if (FAIL == (ret = zbx_json_brackets_open(p, &jp_row))) break; - zabbix_log(LOG_LEVEL_DEBUG, "Next \"%.*s\"", +/* zabbix_log(LOG_LEVEL_DEBUG, "Next \"%.*s\"", jp_row.end - jp_row.start + 1, - jp_row.start); - - *host = '\0'; - *key = '\0'; - *value = '\0'; - *lastlogsize = '\0'; - *timestamp = '\0'; - *source = '\0'; - *severity = '\0'; - itemtime = now; - - if (hosttime) - if (SUCCEED == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_CLOCK, clock, sizeof(clock))) - itemtime -= hosttime - atoi(clock); + jp_row.start);*/ + + memset(&values[value_num], 0, sizeof(AGENT_VALUE)); - if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_HOST, host, sizeof(host))) + values[value_num].clock = now; + + if (hosttime && SUCCEED == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_CLOCK, tmp, sizeof(tmp))) + values[value_num].clock -= hosttime - atoi(tmp); + + if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_HOST, values[value_num].host_name, sizeof(values[value_num].host_name))) continue; - if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_KEY, key, sizeof(key))) + if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_KEY, values[value_num].key, sizeof(values[value_num].key))) continue; - if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_VALUE, value, sizeof(value))) + if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_VALUE, tmp, sizeof(tmp))) continue; - zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_LOGLASTSIZE, lastlogsize, sizeof(lastlogsize)); + values[value_num].value = strdup(tmp); - zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_LOGTIMESTAMP, timestamp, sizeof(timestamp)); + if (SUCCEED == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_LOGLASTSIZE, tmp, sizeof(tmp))) + values[value_num].lastlogsize = atoi(tmp); - zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_LOGSOURCE, source, sizeof(source)); + if (SUCCEED == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_LOGTIMESTAMP, tmp, sizeof(tmp))) + values[value_num].timestamp = atoi(tmp); - zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_LOGSEVERITY, severity, sizeof(severity)); + if (SUCCEED == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_LOGSOURCE, tmp, sizeof(tmp))) + values[value_num].source = strdup(tmp); - DBbegin(); + if (SUCCEED == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_LOGSEVERITY, tmp, sizeof(tmp))) + values[value_num].severity = atoi(tmp); + + value_num ++; + + if (value_num == VALUES_MAX) { + DBbegin(); + process_mass_data(sock, proxy_hostid, values, value_num, &processed); + DBcommit(); + + clean_agent_values(values, value_num); + value_num = 0; + } + +/* DBbegin(); if(SUCCEED == process_data(sock, proxy_hostid, itemtime, host, key, value, lastlogsize, timestamp, source, severity)) { processed_ok ++; @@ -422,16 +466,24 @@ static int process_new_values(zbx_sock_t *sock, struct zbx_json_parse *jp, const { processed_fail ++; } + DBcommit();*/ + } + + if (value_num > 0) { + DBbegin(); + process_mass_data(sock, proxy_hostid, values, value_num, &processed); DBcommit(); } - zbx_snprintf(info,sizeof(info),"Processed %d Failed %d Total %d Seconds spent " ZBX_FS_DBL, - processed_ok, + clean_agent_values(values, value_num); + + zbx_snprintf(info, sizeof(info), "Processed %d Failed %d Total %d Seconds spent " ZBX_FS_DBL, + processed, processed_fail, - processed_ok+processed_fail, - (double)(zbx_time()-sec)); + processed + processed_fail, + zbx_time() - sec); - if(send_result(sock, ret, info) != SUCCEED) + if (send_result(sock, ret, info) != SUCCEED) { zabbix_log( LOG_LEVEL_WARNING, "Error sending result back"); zabbix_syslog("Trapper: error sending result back"); @@ -676,7 +728,7 @@ static int process_trap(zbx_sock_t *sock, char *s, int max_len) zabbix_log( LOG_LEVEL_DEBUG, "Value [%s]", value_string); DBbegin(); - ret=process_data(sock, 0, time(NULL), server, key, value_string, lastlogsize, timestamp, source, severity); +/* ret=process_data(sock, 0, time(NULL), server, key, value_string, lastlogsize, timestamp, source, severity);*/ DBcommit(); if( zbx_tcp_send_raw(sock, SUCCEED == ret ? "OK" : "NOT OK") != SUCCEED) diff --git a/src/zabbix_server/trapper/trapper.h b/src/zabbix_server/trapper/trapper.h index 01a3e91b..5cdc66b5 100644 --- a/src/zabbix_server/trapper/trapper.h +++ b/src/zabbix_server/trapper/trapper.h @@ -23,6 +23,20 @@ #include "common.h" #include "comms.h" +#define AGENT_VALUE struct zbx_agent_value_t + +AGENT_VALUE +{ + int clock; + char host_name[HOST_HOST_LEN_MAX]; + char key[ITEM_KEY_LEN_MAX]; + char *value; + int lastlogsize; + int timestamp; + char *source; + int severity; +}; + int send_result(zbx_sock_t *sock, int result, char *info); void child_trapper_main(zbx_process_t p, zbx_sock_t *s); diff --git a/src/zabbix_server/zlog.c b/src/zabbix_server/zlog.c index e44d10ce..dac1e760 100644 --- a/src/zabbix_server/zlog.c +++ b/src/zabbix_server/zlog.c @@ -66,11 +66,14 @@ void __zbx_zabbix_syslog(const char *fmt, ...) /* This is made to disable writing to database for watchdog */ if(CONFIG_ENABLE_LOG == 0) return; - result = DBselect("select %s where h.hostid=i.hostid and i.key_='%s' and i.value_type=%d" DB_NODE, - ZBX_SQL_ITEM_SELECT, - SERVER_ZABBIXLOG_KEY, - ITEM_VALUE_TYPE_STR, - DBnode_local("h.hostid")); + result = DBselect("select %s where h.hostid=i.hostid and h.status=%d and i.status=%d" + " and h.proxy_hostid=0 and i.key_='%s' and i.value_type=%d" DB_NODE, + ZBX_SQL_ITEM_SELECT, + ITEM_STATUS_ACTIVE, + HOST_STATUS_MONITORED, + SERVER_ZABBIXLOG_KEY, + ITEM_VALUE_TYPE_STR, + DBnode_local("h.hostid")); now = time(NULL); @@ -85,10 +88,16 @@ void __zbx_zabbix_syslog(const char *fmt, ...) init_result(&agent); SET_STR_RESULT(&agent, strdup(value_str)); - process_new_value(&item, &agent, now); - free_result(&agent); - update_triggers(item.itemid); + if (0 == CONFIG_DBSYNCER_FORKS) + { + process_new_value(&item, &agent, now); + update_triggers(item.itemid); + } + else + process_new_value(&item, &agent, now); + + free_result(&agent); } DBfree_result(result); |