diff options
| author | sasha <sasha@97f52cf1-0a1b-0410-bd0e-c28be96e8082> | 2007-10-20 18:04:00 +0000 |
|---|---|---|
| committer | sasha <sasha@97f52cf1-0a1b-0410-bd0e-c28be96e8082> | 2007-10-20 18:04:00 +0000 |
| commit | 381c6742533757053481569ae11496f8528cd37c (patch) | |
| tree | 655a6b994d1affe5540ff34584f918b9fc6bc71f /src/zabbix_server/nodewatcher | |
| parent | 68285fb56347e83ce36fc611482a09c84754f488 (diff) | |
| download | zabbix-381c6742533757053481569ae11496f8528cd37c.tar.gz zabbix-381c6742533757053481569ae11496f8528cd37c.tar.xz zabbix-381c6742533757053481569ae11496f8528cd37c.zip | |
- [ZBX-102] Distributed monitoring: overwriting information
[svn merge svn://svn.zabbix.com/branches/1.4.j -r4872:4874]
git-svn-id: svn://svn.zabbix.com/trunk@4875 97f52cf1-0a1b-0410-bd0e-c28be96e8082
Diffstat (limited to 'src/zabbix_server/nodewatcher')
| -rw-r--r-- | src/zabbix_server/nodewatcher/nodesender.c | 599 | ||||
| -rw-r--r-- | src/zabbix_server/nodewatcher/nodesender.h | 2 | ||||
| -rw-r--r-- | src/zabbix_server/nodewatcher/nodewatcher.c | 460 |
3 files changed, 535 insertions, 526 deletions
diff --git a/src/zabbix_server/nodewatcher/nodesender.c b/src/zabbix_server/nodewatcher/nodesender.c index 6c474abf..af354a20 100644 --- a/src/zabbix_server/nodewatcher/nodesender.c +++ b/src/zabbix_server/nodewatcher/nodesender.c @@ -33,221 +33,9 @@ /****************************************************************************** * * - * Function: send_config_data * - * * - * Purpose: send configuration changes to required node * - * * - * Parameters: * - * * - * Return value: SUCCESS - processed succesfully * - * FAIL - an error occured * - * * - * Author: Alexei Vladishev * - * * - * Comments: * - * * - ******************************************************************************/ -static int send_config_data(int nodeid, int dest_nodeid, zbx_uint64_t maxlogid, int node_type) -{ - DB_RESULT result; - DB_RESULT result2; - DB_ROW row; - DB_ROW row2; - - char *xml = NULL, *hex = NULL; - char fields[MAX_STRING_LEN]; - int offset=0; - int allocated=1024; - - int found=0; - - int i, j, hex_allocated=1024, rowlen; - - xml=zbx_malloc(xml, allocated); - hex=zbx_malloc(hex, hex_allocated); - - memset(xml,0,allocated); - - - zabbix_log( LOG_LEVEL_DEBUG, "In send_config_data(nodeid:%d,dest_node:%d,maxlogid:" ZBX_FS_UI64 ",type:%d)", - nodeid, - dest_nodeid, - maxlogid, - node_type); - - /* Begin work */ - if(node_type == ZBX_NODE_MASTER) - { - result=DBselect("select tablename,recordid,operation from node_configlog where nodeid=%d and sync_master=0 and conflogid<=" ZBX_FS_UI64 " order by tablename,operation", - nodeid, - maxlogid); - } - else - { - result=DBselect("select tablename,recordid,operation from node_configlog where nodeid=%d and sync_slave=0 and conflogid<=" ZBX_FS_UI64 " order by tablename,operation", - nodeid, - maxlogid); - } - - zbx_snprintf_alloc(&xml, &allocated, &offset, 128, "Data%c%d%c%d", - ZBX_DM_DELIMITER, - CONFIG_NODEID, - ZBX_DM_DELIMITER, - nodeid); - - while((row=DBfetch(result))) - { - found = 1; - - zabbix_log( LOG_LEVEL_DEBUG, "Fetched [%s,%s,%s]",row[0],row[1],row[2]); - /* Special (simpler) processing for operation DELETE */ - if(atoi(row[2]) == NODE_CONFIGLOG_OP_DELETE) - { - zbx_snprintf_alloc(&xml, &allocated, &offset, 16*1024, "\n%s%c%s%c%s", - row[0], - ZBX_DM_DELIMITER, - row[1], - ZBX_DM_DELIMITER, - row[2]); - continue; - } - for(i=0;tables[i].table!=0;i++) - { - if(strcmp(tables[i].table, row[0])==0) break; - } - - /* Found table */ - if(tables[i].table!=0) - { - fields[0]=0; - /* for each field */ - for(j=0;tables[i].fields[j].name!=0;j++) - { - zbx_strlcat(fields,tables[i].fields[j].name,sizeof(fields)); - zbx_strlcat(fields,",",sizeof(fields)); - - zbx_strlcat(fields,"length(",sizeof(fields)); - zbx_strlcat(fields,tables[i].fields[j].name,sizeof(fields)); - zbx_strlcat(fields,"),",sizeof(fields)); - } - if(fields[0]!=0) fields[strlen(fields)-1]=0; - - result2=DBselect("select %s from %s where %s=%s", - fields, - row[0], - tables[i].recid, - row[1]); - - row2=DBfetch(result2); - - if(row2) - { - zbx_snprintf_alloc(&xml, &allocated, &offset, 16*1024, "\n%s%c%s%c%s", - row[0], - ZBX_DM_DELIMITER, - row[1], - ZBX_DM_DELIMITER, - row[2]); - /* for each field */ - for(j=0;tables[i].fields[j].name!=0;j++) - { - if( (tables[i].fields[j].flags & ZBX_SYNC) ==0) continue; - /* Fieldname, type, value */ - if(DBis_null(row2[j*2]) == SUCCEED) - { -/* zabbix_log( LOG_LEVEL_WARNING, "Field name [%s] [%s]",tables[i].fields[j].name,row2[j*2]);*/ - zbx_snprintf_alloc(&xml, &allocated, &offset, 16*1024, "%c%s%c%d%cNULL", - ZBX_DM_DELIMITER, - tables[i].fields[j].name, - ZBX_DM_DELIMITER, - tables[i].fields[j].type, - ZBX_DM_DELIMITER); - } - else - { - if(tables[i].fields[j].type == ZBX_TYPE_INT || - tables[i].fields[j].type == ZBX_TYPE_UINT || - tables[i].fields[j].type == ZBX_TYPE_ID || - tables[i].fields[j].type == ZBX_TYPE_FLOAT) - { - zbx_snprintf_alloc(&xml, &allocated, &offset, 16*1024, "%c%s%c%d%c%s", - ZBX_DM_DELIMITER, - tables[i].fields[j].name, - ZBX_DM_DELIMITER, - tables[i].fields[j].type, - ZBX_DM_DELIMITER, - row2[j*2]); - } - else - { - rowlen = atoi(row2[j*2+1]); - zbx_binary2hex((u_char *)row2[j*2], rowlen, &hex, &hex_allocated); - zbx_snprintf_alloc(&xml, &allocated, &offset, 16*1024, "%c%s%c%d%c%s", - ZBX_DM_DELIMITER, - tables[i].fields[j].name, - ZBX_DM_DELIMITER, - tables[i].fields[j].type, - ZBX_DM_DELIMITER, - hex); - } - } - } - } - else - { - /* We assume that the record was just deleted, so we change operation to DELETE */ - zabbix_log( LOG_LEVEL_DEBUG, "Cannot select %s from table %s where %s=%s", - fields, - row[0], - tables[i].recid, - row[1]); - - zbx_snprintf_alloc(&xml, &allocated, &offset, 16*1024, "\n%s%c%s%c%d", - row[0], - ZBX_DM_DELIMITER, - row[1], - ZBX_DM_DELIMITER, - NODE_CONFIGLOG_OP_DELETE); - } - DBfree_result(result2); - } - else - { - zabbix_log( LOG_LEVEL_WARNING, "Cannot find table [%s]", - row[0]); - } - } - zabbix_log( LOG_LEVEL_DEBUG, "DATA [%s]", - xml); - if( (found == 1) && send_to_node("configuration changes", dest_nodeid, nodeid, xml) == SUCCEED) - { - if(node_type == ZBX_NODE_MASTER) - { - DBexecute("update node_configlog set sync_master=1 where nodeid=%d and sync_master=0 and conflogid<=" ZBX_FS_UI64, - nodeid, - maxlogid); - } - else - { - DBexecute("update node_configlog set sync_slave=1 where nodeid=%d and sync_slave=0 and conflogid<=" ZBX_FS_UI64, - nodeid, - maxlogid); - } - } - - DBfree_result(result); - zbx_free(xml); - zbx_free(hex); - /* Commit */ - - return SUCCEED; -} - -/****************************************************************************** - * * * Function: get_slave_node * * * - * Purpose: send configuration changes to required node * + * Purpose: * * * * Parameters: * * * @@ -259,42 +47,35 @@ static int send_config_data(int nodeid, int dest_nodeid, zbx_uint64_t maxlogid, * Comments: * * * ******************************************************************************/ -static int get_slave_node(int nodeid) +static int get_slave_node(int nodeid, int synked_nodeid) { DB_RESULT result; DB_ROW row; - int ret = 0; - int m; + int master_nodeid; zabbix_log( LOG_LEVEL_DEBUG, "In get_slave_node(%d)", nodeid); result = DBselect("select masterid from nodes where nodeid=%d", - nodeid); - row = DBfetch(result); - if(row) - { - m = atoi(row[0]); - if(m == CONFIG_NODEID) - { - ret = nodeid; - } - else if(m ==0) - { - ret = m; - } - else ret = get_slave_node(m); - } + synked_nodeid); + if (NULL != (row = DBfetch(result))) + master_nodeid = atoi(row[0]); + else + master_nodeid = 0; DBfree_result(result); - return ret; + if (master_nodeid == 0) + return 0; + if (master_nodeid == nodeid) + return synked_nodeid; + return get_slave_node(nodeid, master_nodeid); } /****************************************************************************** * * * Function: get_master_node * * * - * Purpose: send configuration changes to required node * + * Purpose: * * * * Parameters: * * * @@ -316,10 +97,8 @@ int get_master_node(int nodeid) nodeid); result = DBselect("select masterid from nodes where nodeid=%d", - CONFIG_NODEID); - row = DBfetch(result); - if(row) - { + nodeid); + if (NULL != (row = DBfetch(result))) { ret = atoi(row[0]); } DBfree_result(result); @@ -343,118 +122,231 @@ int get_master_node(int nodeid) * Comments: * * * ******************************************************************************/ -static int send_to_master_and_slave(int nodeid) +char *get_config_data(int synked_nodeid, int dest_nodetype) { DB_RESULT result; + DB_RESULT result2; DB_ROW row; - int master_nodeid, - slave_nodeid, - master_result = FAIL, - slave_result = FAIL; - zbx_uint64_t maxlogid; - - zabbix_log( LOG_LEVEL_DEBUG, "In send_to_master_and_slave(node:%d)", - nodeid); - - result = DBselect("select max(conflogid) from node_configlog where nodeid=%d", - nodeid); - - row = DBfetch(result); - - if(row && DBis_null(row[0]) == SUCCEED) - { - zabbix_log( LOG_LEVEL_DEBUG, "No configuration changes of node %d", - nodeid); - DBfree_result(result); - return SUCCEED; - } - ZBX_STR2UINT64(maxlogid,row[0]); - DBfree_result(result); - - - master_nodeid=get_master_node(nodeid); - slave_nodeid=get_slave_node(nodeid); + DB_ROW row2; - if(master_nodeid != 0) - { - master_result = send_config_data(nodeid, master_nodeid, maxlogid, ZBX_NODE_MASTER); - } + char *data = NULL, *hex = NULL, *sql = NULL, c, sync[131], *s, *r[2], *d[2]; + int data_offset=0, sql_offset = 0; + int data_allocated=1024, hex_allocated=1024, sql_allocated=8*1024; + int t, f, j, rowlen, found = 0; + + zabbix_log( LOG_LEVEL_DEBUG, "In get_config_data(synked_node:%d,dest_nodetype:%s)", + synked_nodeid, + dest_nodetype == ZBX_NODE_MASTER ? "MASTER" : "SLAVE"); + + data = zbx_malloc(data, data_allocated); + hex = zbx_malloc(hex, hex_allocated); + sql = zbx_malloc(sql, sql_allocated); + c = '1'; + + /* Find updated records */ + result = DBselect("select curr.tablename,curr.recordid,prev.cksum,curr.cksum,prev.sync " + "from node_cksum curr, node_cksum prev " + "where curr.nodeid=%1$d and prev.nodeid=%1$d and " + "curr.tablename=prev.tablename and curr.recordid=prev.recordid and " + "curr.cksumtype=%3$d and prev.cksumtype=%2$d " + /*" and curr.tablename='hosts' "*/ + "union all " + /* Find new records */ + "select curr.tablename,curr.recordid,prev.cksum,curr.cksum,curr.sync " + "from node_cksum curr left join node_cksum prev " + "on prev.nodeid=%1$d and prev.tablename=curr.tablename and " + "prev.recordid=curr.recordid and prev.cksumtype=%2$d " + "where curr.nodeid=%1$d and curr.cksumtype=%3$d and prev.tablename is null " + /*" and curr.tablename='hosts' "*/ + "union all " + /* Find deleted records */ + "select prev.tablename,prev.recordid,prev.cksum,curr.cksum,prev.sync " + "from node_cksum prev left join node_cksum curr " + "on prev.nodeid=curr.nodeid and curr.nodeid=%1$d and curr.tablename=prev.tablename and " + "curr.recordid=prev.recordid and curr.cksumtype=%3$d " + "where prev.nodeid=%1$d and prev.cksumtype=%2$d and curr.tablename is null" + /*" and prev.tablename='hosts' "*/, + synked_nodeid, + NODE_CKSUM_TYPE_OLD, /* prev */ + NODE_CKSUM_TYPE_NEW); /* curr */ + + zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "Data%c%d%c%d\n", + ZBX_DM_DELIMITER, + CONFIG_NODEID, + ZBX_DM_DELIMITER, + synked_nodeid); - if(slave_nodeid != 0) - { - slave_result = send_config_data(nodeid, slave_nodeid, maxlogid, ZBX_NODE_SLAVE); - } + while (NULL != (row = DBfetch(result))) { + for (t = 0; tables[t].table != 0 && strcmp(tables[t].table, row[0]) != 0; t++) + ; - if( (master_nodeid!=0) && (slave_nodeid != 0)) - { - if((master_result == SUCCEED) && (slave_result == SUCCEED)) - { - DBexecute("delete from node_configlog where nodeid=%d and sync_slave=1 and sync_master=1 and conflogid<=" ZBX_FS_UI64, - nodeid, - maxlogid); + /* Found table */ + if (tables[t].table == 0) { + zabbix_log( LOG_LEVEL_WARNING, "Cannot find table [%s]", + row[0]); + continue; } - } - if(master_nodeid!=0) - { - if(master_result == SUCCEED) - { - DBexecute("delete from node_configlog where nodeid=%d and sync_master=1 and conflogid<=" ZBX_FS_UI64, - nodeid, - maxlogid); - } - } + if (DBis_null(row[4]) == FAIL) + strcpy(sync, row[4]); + else + memset(sync, ' ', sizeof(sync)); + s = sync; - if(slave_nodeid!=0) - { - if(slave_result == SUCCEED) - { - DBexecute("delete from node_configlog where nodeid=%d and sync_slave=1 and conflogid<=" ZBX_FS_UI64, - nodeid, - maxlogid); + /* Special (simpler) processing for operation DELETE */ + if (DBis_null(row[2]) == FAIL && DBis_null(row[3]) == SUCCEED && + ((dest_nodetype == ZBX_NODE_SLAVE && *s != c) || + (dest_nodetype == ZBX_NODE_MASTER && *(s+1) != c))) { + zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "%s%c%s%c%d\n", + row[0], + ZBX_DM_DELIMITER, + row[1], + ZBX_DM_DELIMITER, + NODE_CONFIGLOG_OP_DELETE); + found = 1; + continue; } - } - - return SUCCEED; -} - - -/****************************************************************************** - * * - * Function: process_node * - * * - * Purpose: select all related nodes and send config changes * - * * - * Parameters: * - * * - * Return value: SUCCESS - processed succesfully * - * FAIL - an error occured * - * * - * Author: Alexei Vladishev * - * * - * Comments: * - * * - ******************************************************************************/ -static int process_node(int nodeid) -{ - DB_RESULT result; - DB_ROW row; - zabbix_log( LOG_LEVEL_DEBUG, "In process_node(node:%d)", - nodeid); + r[0] = row[2]; + r[1] = row[3]; + f = 0; + sql_offset = 0; + s += 2; + + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "select "); + do { + if ((tables[t].fields[f].flags & ZBX_SYNC) == 0) + f++; + + if (strcmp(tables[t].recid, tables[t].fields[f].name) == 0) + f++; + + d[0] = NULL; + d[1] = NULL; + if (NULL != r[0] && NULL != (d[0] = strchr(r[0], ZBX_CKSUM_DELIMITER))) + *d[0] = '\0'; + if (NULL != r[1] && NULL != (d[1] = strchr(r[1], ZBX_CKSUM_DELIMITER))) + *d[1] = '\0'; + + if (r[0] == NULL || r[1] == NULL || (dest_nodetype == ZBX_NODE_SLAVE && *s != c) || + (dest_nodetype == ZBX_NODE_MASTER && *(s+1) != c) || strcmp(r[0], r[1]) != 0) { + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "%s,length(%1$s),", + tables[t].fields[f].name); + } + s += 2; + + if (d[0] != NULL) { + *d[0] = ZBX_CKSUM_DELIMITER; + r[0] = d[0] + 1; + } + if (d[1] != NULL) { + *d[1] = ZBX_CKSUM_DELIMITER; + r[1] = d[1] + 1; + } + + if (d[0] == NULL && d[1] == NULL) + break; + f++; + } while (1); + + if (sql[sql_offset-1] != ',') + continue; + + sql_offset--; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, " from %s where %s=%s", + row[0], + tables[t].recid, + row[1]); + + result2 = DBselect("%s", sql); + if (NULL == (row2=DBfetch(result2))) + goto out; + + zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "%s%c%s%c%d", + row[0], + ZBX_DM_DELIMITER, + row[1], + ZBX_DM_DELIMITER, + NODE_CONFIGLOG_OP_UPDATE); + + r[0] = row[2]; + r[1] = row[3]; + s = sync + 2; + f = 0; + j = 0; + + do { + if ((tables[t].fields[f].flags & ZBX_SYNC) == 0) + f++; + + if (strcmp(tables[t].recid, tables[t].fields[f].name) == 0) + f++; + + d[0] = NULL; + d[1] = NULL; + if (NULL != r[0] && NULL != (d[0] = strchr(r[0], ZBX_CKSUM_DELIMITER))) + *d[0] = '\0'; + if (NULL != r[1] && NULL != (d[1] = strchr(r[1], ZBX_CKSUM_DELIMITER))) + *d[1] = '\0'; + + if (r[0] == NULL || r[1] == NULL || (dest_nodetype == ZBX_NODE_SLAVE && *s != c) || + (dest_nodetype == ZBX_NODE_MASTER && *(s+1) != c) || strcmp(r[0], r[1]) != 0) { + + zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "%c%s%c%d%c", + ZBX_DM_DELIMITER, + tables[t].fields[f].name, + ZBX_DM_DELIMITER, + tables[t].fields[f].type, + ZBX_DM_DELIMITER); + + /* Fieldname, type, value */ + if (DBis_null(row2[j*2]) == SUCCEED) { + zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "NULL"); + } else if(tables[t].fields[f].type == ZBX_TYPE_INT || + tables[t].fields[f].type == ZBX_TYPE_UINT || + tables[t].fields[f].type == ZBX_TYPE_ID || + tables[t].fields[f].type == ZBX_TYPE_FLOAT) { + + zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "%s", row2[j*2]); + } else { + rowlen = atoi(row2[j*2+1]); + zbx_binary2hex((u_char *)row2[j*2], rowlen, &hex, &hex_allocated); + zbx_snprintf_alloc(&data, &data_allocated, &data_offset, strlen(hex)+128, "%s", hex); +/*zabbix_log(LOG_LEVEL_CRIT, "----- [field:%s][type:%d][row:%s][hex:%s]",tables[t].fields[f].name,tables[t].fields[f].type,row2[j*2],hex);*/ + } + found = 1; + j++; + } + s += 2; + + if (d[0] != NULL) { + *d[0] = ZBX_CKSUM_DELIMITER; + r[0] = d[0] + 1; + } + if (d[1] != NULL) { + *d[1] = ZBX_CKSUM_DELIMITER; + r[1] = d[1] + 1; + } + + if (d[0] == NULL && d[1] == NULL) + break; + f++; + } while (1); + zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "\n"); +out: + DBfree_result(result2); + } + DBfree_result(result); - send_to_master_and_slave(nodeid); + zbx_free(hex); + zbx_free(sql); - result = DBselect("select nodeid from nodes where masterid=%d and nodeid not in (%d)", - nodeid, - nodeid); - while((row=DBfetch(result))) - { - process_node(atoi(row[0])); + if (0 == found) { + zbx_free(data); + data = NULL; } - DBfree_result(result); - return SUCCEED; + return data; } /****************************************************************************** @@ -472,29 +364,40 @@ static int process_node(int nodeid) * Comments: never returns * * * ******************************************************************************/ -void main_nodesender() +void main_nodesender(int synked_nodeid, int *synked_slave, int *synked_master) { DB_RESULT result; DB_ROW row; + int nodeid, slave_nodeid, master_nodeid; + char *data; + + zabbix_log(LOG_LEVEL_DEBUG, "In main_nodesender()"); - zabbix_log( LOG_LEVEL_DEBUG, "In main_nodesender()"); + *synked_slave = FAIL; + *synked_master = FAIL; result = DBselect("select nodeid from nodes where nodetype=%d", ZBX_NODE_TYPE_LOCAL); - row = DBfetch(result); - - if(row) - { - if(CONFIG_NODEID != atoi(row[0])) - { - zabbix_log( LOG_LEVEL_WARNING, "NodeID does not match configuration settings. Processing of the node is disabled."); - } - else - { - process_node(atoi(row[0])); + if (NULL != (row = DBfetch(result))) { + nodeid = atoi(row[0]); + if (CONFIG_NODEID != nodeid) { + zabbix_log(LOG_LEVEL_WARNING, "NodeID does not match configuration settings." + " Processing of the node is disabled."); + } else { + slave_nodeid = get_slave_node(nodeid, synked_nodeid); + master_nodeid = CONFIG_MASTER_NODEID; + if (0 != slave_nodeid && NULL != (data = get_config_data(synked_nodeid, ZBX_NODE_SLAVE))) { +/*zabbix_log(LOG_LEVEL_CRIT, "-----> [%s]", data);*/ + *synked_slave = send_to_node("configuration changes", slave_nodeid, synked_nodeid, data); + zbx_free(data); + } + if (0 != master_nodeid && NULL != (data = get_config_data(synked_nodeid, ZBX_NODE_MASTER))) { +/*zabbix_log(LOG_LEVEL_CRIT, "-----> [%s]", data);*/ + *synked_master = send_to_node("configuration changes", master_nodeid, synked_nodeid, data); + zbx_free(data); + } } } - DBfree_result(result); } diff --git a/src/zabbix_server/nodewatcher/nodesender.h b/src/zabbix_server/nodewatcher/nodesender.h index ff9ba5c7..30c9f346 100644 --- a/src/zabbix_server/nodewatcher/nodesender.h +++ b/src/zabbix_server/nodewatcher/nodesender.h @@ -20,7 +20,7 @@ #ifndef ZABBIX_NODESENDER_H #define ZABBIX_NODESENDER_H -void main_nodesender(); +void main_nodesender(int nodeid, int *synked_slave, int *synked_master); int get_master_node(int nodeid); #endif diff --git a/src/zabbix_server/nodewatcher/nodewatcher.c b/src/zabbix_server/nodewatcher/nodewatcher.c index d4534ca3..486148ef 100644 --- a/src/zabbix_server/nodewatcher/nodewatcher.c +++ b/src/zabbix_server/nodewatcher/nodewatcher.c @@ -18,7 +18,6 @@ **/ #include "common.h" - #include "cfg.h" #include "db.h" #include "log.h" @@ -46,129 +45,109 @@ * Comments: * * * ******************************************************************************/ -static int calculate_checksums() +int calculate_checksums(int nodeid, const char *tablename, const zbx_uint64_t id) { + char *sql = NULL; + int sql_allocated = 16*1024, sql_offset = 0; + int t, f, res = SUCCEED; - char *sql = NULL; - int sql_allocated, sql_offset; - - int i = 0; - int j; - DB_RESULT result; - DB_RESULT result2; - DB_ROW row; - DB_ROW row2; - int nodeid; - - int now; - - zabbix_log( LOG_LEVEL_DEBUG, "In calculate_checksums"); + zabbix_log(LOG_LEVEL_DEBUG, "In calculate_checksums"); - DBexecute("delete from node_cksum where cksumtype=%d", - NODE_CKSUM_TYPE_NEW); + sql = zbx_malloc(sql, sql_allocated); - /* Select all nodes */ - result =DBselect("select nodeid from nodes"); - while((row=DBfetch(result))) - { - sql_allocated=64*1024; - sql_offset=0; - sql=zbx_malloc(sql, sql_allocated); - - now = time(NULL); - nodeid = atoi(row[0]); - - zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, - "select 'table ','field ',itemid, '012345678901234' from items where 1=0\n"); + for (t = 0; tables[t].table != 0; t++) { + /* Do not sync some of tables */ + if ((tables[t].flags & ZBX_SYNC) == 0) + continue; - for(i=0;tables[i].table!=0;i++) - { -/* zabbix_log( LOG_LEVEL_WARNING, "In calculate_checksums2 [%s]", tables[i].table ); */ - /* Do not sync some of tables */ - if( (tables[i].flags & ZBX_SYNC) ==0) continue; + if (NULL != tablename && 0 != strcmp(tablename, tables[t].table)) + continue; - zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4096, + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, #ifdef HAVE_MYSQL - "union all select '%s','%s',%s,md5(concat(", + "%s select %d,'%s',%s,%d,concat(", #else - "union all select '%s','%s',%s,md5(", + "%s select %d,'%s',%s,%d,", #endif - tables[i].table, - tables[i].recid, - tables[i].recid); - - j=0; - while(tables[i].fields[j].name != 0) - { - if( (tables[i].fields[j].flags & ZBX_SYNC) ==0) - { - j++; - continue; - } -#ifdef HAVE_MYSQL - zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "coalesce(%s,'1234567890'),", - tables[i].fields[j].name); -#else - if(tables[i].fields[j].type == ZBX_TYPE_BLOB) /* postgresql is not work: coalesce(blob,'1234567890')||coalesce(varchar,'1234567890') */ - { - zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "md5(coalesce(%s,'1234567890'))||", - tables[i].fields[j].name); + sql_offset > 0 ? "union all" : "insert into node_cksum (nodeid,tablename,recordid,cksumtype,cksum)", + nodeid, + tables[t].table, + tables[t].recid, + NODE_CKSUM_TYPE_NEW); + + for (f = 0; tables[t].fields[f].name != 0; f ++) { + if ((tables[t].fields[f].flags & ZBX_SYNC) == 0) + continue; + + if (strcmp(tables[t].recid, tables[t].fields[f].name) == 0) + continue; + + if (tables[t].fields[f].flags & ZBX_NOTNULL) { + switch ( tables[t].fields[f].type ) { + case ZBX_TYPE_ID : + case ZBX_TYPE_INT : + case ZBX_TYPE_UINT : + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, + "%s", + tables[t].fields[f].name); + break; + default : + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, + "md5(%s)", + tables[t].fields[f].name); + break; } - else - { - zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "coalesce(%s,'1234567890')||", - tables[i].fields[j].name); + } else { + switch ( tables[t].fields[f].type ) { + case ZBX_TYPE_ID : + case ZBX_TYPE_INT : + case ZBX_TYPE_UINT : + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, + "case when %s is null then 'NULL' else %1$s end", + tables[t].fields[f].name); + break; + default : + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, + "case when %s is null then 'NULL' else md5(%1$s) end", + tables[t].fields[f].name); + break; } -#endif - j++; } + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 16, #ifdef HAVE_MYSQL - if(j>0) sql_offset--; /* Remove last */ + ",'%c',", #else - if(j>0) sql_offset-=2; /* Remove last */ + "||'%c'||", #endif + ZBX_CKSUM_DELIMITER); + } - /* select table,recid,md5(fields) from table union all ... */ - zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4096, + /* remove last delimiter */ + if (f > 0) { #ifdef HAVE_MYSQL - ")) from %s where %s>=" ZBX_FS_UI64 " and %s<=" ZBX_FS_UI64 "\n", + sql_offset -= 5; + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 16, ")"); #else - ") from %s where %s>=" ZBX_FS_UI64 " and %s<=" ZBX_FS_UI64 "\n", + sql_offset -= 7; #endif - tables[i].table, - tables[i].recid, - (zbx_uint64_t)__UINT64_C(100000000000000)*(zbx_uint64_t)nodeid, - tables[i].recid, - (zbx_uint64_t)__UINT64_C(100000000000000)*(zbx_uint64_t)nodeid+__UINT64_C(99999999999999)); - } -/* zabbix_log( LOG_LEVEL_WARNING, "SQL DUMP [%s]", sql);*/ - result2 =DBselect("%s",sql); + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512, + " from %s where"ZBX_COND_NODEID, + tables[t].table, + ZBX_NODE(tables[t].recid,nodeid)); -/* zabbix_log( LOG_LEVEL_WARNING, "Selected records in %d seconds", time(NULL)-now);*/ - now = time(NULL); - i=0; - while((row2=DBfetch(result2))) - { - DBexecute("insert into node_cksum (cksumid,nodeid,tablename,fieldname,recordid,cksumtype,cksum) "\ - "values (" ZBX_FS_UI64 ",%d,'%s','%s',%s,%d,'%s')", -/* DBget_nextid("node_cksum","cksumid"),*/ - DBget_maxid("node_cksum","cksumid"), - nodeid, - row2[0], - row2[1], - row2[2], - NODE_CKSUM_TYPE_NEW, - row2[3]); - i++; + if (0 != id) { + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, + "and %s="ZBX_FS_UI64, + tables[t].recid, + id); } - DBfree_result(result2); - zbx_free(sql); + zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "\n"); } - DBfree_result(result); - - return SUCCEED; + if (DBexecute("%s", sql) < ZBX_DB_OK) + res = FAIL; + return res; } /****************************************************************************** @@ -187,102 +166,237 @@ static int calculate_checksums() * Comments: * * * ******************************************************************************/ -static int update_checksums() +int update_checksums(int nodeid, int synked_slave, int synked_master, const char *tablename, const zbx_uint64_t id, char *fields) { - DBexecute("delete from node_cksum where cksumtype=%d", - NODE_CKSUM_TYPE_OLD); - DBexecute("update node_cksum set cksumtype=%d where cksumtype=%d", - NODE_CKSUM_TYPE_OLD, - NODE_CKSUM_TYPE_NEW); + char *r[2], *d[2], sync[131], *s; + char cs, cm, sql[2][256]; + DB_RESULT result; + DB_ROW row; + int t, f; + + zabbix_log(LOG_LEVEL_DEBUG, "In update_checksums"); + + cs = synked_slave == SUCCEED ? '1' : ' '; + cm = synked_master == SUCCEED ? '1' : ' '; + + if (NULL != tablename) { + zbx_snprintf(sql[0], sizeof(sql[0]), " and curr.tablename='%s' and curr.recordid="ZBX_FS_UI64, + tablename, id); + zbx_snprintf(sql[1], sizeof(sql[1]), " and prev.tablename='%s' and prev.recordid="ZBX_FS_UI64, + tablename, id); + } else { + *sql[0] = '\0'; + *sql[1] = '\0'; + } + + /* Find updated records */ + result = DBselect("select curr.tablename,curr.recordid,prev.cksum,curr.cksum,prev.sync " + "from node_cksum curr, node_cksum prev " + "where curr.nodeid=%1$d and prev.nodeid=%1$d and " + "curr.tablename=prev.tablename and curr.recordid=prev.recordid and " + "curr.cksumtype=%3$d and prev.cksumtype=%2$d%4$s " + "union all " + /* Find new records */ + "select curr.tablename,curr.recordid,prev.cksum,curr.cksum,NULL " + "from node_cksum curr left join node_cksum prev " + "on prev.nodeid=%1$d and prev.tablename=curr.tablename and " + "prev.recordid=curr.recordid and prev.cksumtype=%2$d " + "where curr.nodeid=%1$d and curr.cksumtype=%3$d and prev.tablename is null%4$s " + "union all " + /* Find deleted records */ + "select prev.tablename,prev.recordid,prev.cksum,curr.cksum,prev.sync " + "from node_cksum prev left join node_cksum curr " + "on prev.nodeid=curr.nodeid and curr.nodeid=%1$d and curr.tablename=prev.tablename and " + "curr.recordid=prev.recordid and curr.cksumtype=%3$d " + "where prev.nodeid=%1$d and prev.cksumtype=%2$d and curr.tablename is null%5$s", + nodeid, + NODE_CKSUM_TYPE_OLD, /* prev */ + NODE_CKSUM_TYPE_NEW, /* curr */ + sql[0], + sql[1]); + + while (NULL != (row = DBfetch(result))) { + for (t = 0; tables[t].table != 0 && strcmp(tables[t].table, row[0]) != 0; t++) + ; + + /* Found table */ + if (tables[t].table == 0) { + zabbix_log(LOG_LEVEL_WARNING, "Cannot find table [%s]", + row[0]); + continue; + } + + if (DBis_null(row[4]) == FAIL) + strcpy(sync, row[4]); + else + memset(sync, ' ', sizeof(sync)); + s = sync; + + /* Special (simpler) processing for operation DELETE */ + if (DBis_null(row[3]) == SUCCEED) { + if (synked_slave == SUCCEED && *s != cs) + *s = cs; + if (synked_master == SUCCEED && *(s+1) != cm) + *(s+1) = cm; + s += 2; + } else { + r[0] = row[2]; + r[1] = row[3]; + s += 2; + f = 0; + + do { + if ((tables[t].fields[f].flags & ZBX_SYNC) == 0) + f++; + + if (strcmp(tables[t].recid, tables[t].fields[f].name) == 0) + f++; + + d[0] = NULL; + d[1] = NULL; + if (NULL != r[0] && NULL != (d[0] = strchr(r[0], ZBX_CKSUM_DELIMITER))) + *d[0] = '\0'; + if (NULL != r[1] && NULL != (d[1] = strchr(r[1], ZBX_CKSUM_DELIMITER))) + *d[1] = '\0'; + + if (NULL == tablename || SUCCEED == str_in_list(fields, tables[t].fields[f].name, ',')) { + if (r[0] == NULL || r[1] == NULL || strcmp(r[0], r[1]) != 0) { + *s = cs; + *(s+1) = cm; + } else { + if (synked_slave == SUCCEED && *s != cs) + *s = cs; + if (synked_master == SUCCEED && *(s+1) != cm) + *(s+1) = cm; + } + } + s += 2; + + if (d[0] != NULL) { + *d[0] = ZBX_CKSUM_DELIMITER; + r[0] = d[0] + 1; + } + if (d[1] != NULL) { + *d[1] = ZBX_CKSUM_DELIMITER; + r[1] = d[1] + 1; + } + + if (d[0] == NULL && d[1] == NULL) + break; + f++; + } while (1); + } + *s = '\0'; + + if (DBis_null(row[2]) == SUCCEED || DBis_null(row[3]) == SUCCEED || + strcmp(row[4], sync) != 0 || strcmp(row[2], row[3]) != 0) { + DBexecute("update node_cksum set cksumtype=%d,cksum=\'%s\',sync=\'%s\' " + "where nodeid=%d and tablename=\'%s\' and recordid=%s and cksumtype=%d", + NODE_CKSUM_TYPE_OLD, + DBis_null(row[3]) == SUCCEED ? row[2] : row[3], + sync, + nodeid, + row[0], + row[1], + DBis_null(row[2]) == SUCCEED ? NODE_CKSUM_TYPE_NEW : NODE_CKSUM_TYPE_OLD); + } + } + DBfree_result(result); return SUCCEED; } /****************************************************************************** * * - * Function: compare_checksums * + * Function: lock_node * * * - * Purpose: compare new checksums with old ones. Write difference to * - * table 'node_config' * + * Purpose: * * * * Parameters: * * * - * Return value: SUCCESS - calculated succesfully * - * FAIL - an error occured * + * Return value: * * * - * Author: Alexei Vladishev * + * Author: Aleksander Vladishev * * * * Comments: * * * ******************************************************************************/ -static int compare_checksums() +int lock_sync_node(int nodeid) { DB_RESULT result; DB_ROW row; - - /* Find updated records */ - result = DBselect("select curr.nodeid,curr.tablename,curr.recordid from node_cksum prev, node_cksum curr where curr.tablename=prev.tablename and curr.recordid=prev.recordid and curr.fieldname=prev.fieldname and curr.nodeid=prev.nodeid and curr.cksum<>prev.cksum and curr.cksumtype=%d and prev.cksumtype=%d", - NODE_CKSUM_TYPE_NEW, - NODE_CKSUM_TYPE_OLD); - while((row=DBfetch(result))) - { - zabbix_log( LOG_LEVEL_DEBUG, "Adding record to node_configlog NODE_CONFIGLOG_OP_UPDATE"); - DBexecute("insert into node_configlog (conflogid,nodeid,tablename,recordid,operation)" \ - "values (" ZBX_FS_UI64 ",%s,'%s',%s,%d)", -/* DBget_nextid("node_configlog","conflogid"),*/ - DBget_maxid("node_configlog","conflogid"), - row[0], - row[1], - row[2], - NODE_CONFIGLOG_OP_UPDATE); + int sync; + int res = FAIL; + + if (DBexecute("update nodes set sync=sync+1 where nodeid=%d", + nodeid) >= ZBX_DB_OK) { + + result = DBselect("select sync from nodes where nodeid=%d", + nodeid); + if (NULL != (row=DBfetch(result))) { + sync = atoi(row[0]); + if (sync == 1 || sync > 25) { + if (DBexecute("delete from node_cksum where nodeid=%d and cksumtype=%d", + nodeid, + NODE_CKSUM_TYPE_NEW) >= ZBX_DB_OK) { + res = SUCCEED; + } + } + } + DBfree_result(result); } - DBfree_result(result); + return res; +} - /* Find new records */ - result = DBselect("select curr.nodeid,curr.tablename,curr.recordid from node_cksum curr" \ - " left join node_cksum prev" \ - " on curr.tablename=prev.tablename and curr.recordid=prev.recordid and curr.fieldname=prev.fieldname and curr.nodeid=prev.nodeid and curr.cksumtype<>prev.cksumtype" \ - " where prev.cksumid is null and curr.cksumtype=%d", - NODE_CKSUM_TYPE_NEW); +/****************************************************************************** + * * + * Function: process_nodes * + * * + * Purpose: calculates checks sum of config data * + * * + * Parameters: * + * * + * Return value: * + * * + * Author: Aleksander Vladishev * + * * + * Comments: never returns * + * * + ******************************************************************************/ +void process_nodes() +{ + DB_RESULT result; + DB_ROW row; + int nodeid, synked_slave, synked_master; +/* int now = time(NULL);*/ - while((row=DBfetch(result))) - { - zabbix_log( LOG_LEVEL_DEBUG, "Adding record to node_configlog NODE_CONFIGLOG_OP_ADD"); - DBexecute("insert into node_configlog (conflogid,nodeid,tablename,recordid,operation)" \ - "values (" ZBX_FS_UI64 ",%s,'%s',%s,%d)", -/* DBget_nextid("node_configlog","conflogid"),*/ - DBget_maxid("node_configlog","conflogid"), - row[0], - row[1], - row[2], - NODE_CONFIGLOG_OP_ADD); - } - DBfree_result(result); +/* DBbegin();*/ - /* Find deleted records */ - result = DBselect("select curr.nodeid,curr.tablename,curr.recordid from node_cksum curr" \ - " left join node_cksum prev" \ - " on curr.tablename=prev.tablename and curr.recordid=prev.recordid and curr.fieldname=prev.fieldname and curr.nodeid=prev.nodeid and curr.cksumtype<>prev.cksumtype" \ - " where prev.cksumid is null and curr.cksumtype=%d", - NODE_CKSUM_TYPE_OLD); + /* Select all nodes */ + result = DBselect("select nodeid from nodes"); + while (NULL != (row=DBfetch(result))) { + nodeid = atoi(row[0]); - while((row=DBfetch(result))) - { - zabbix_log( LOG_LEVEL_DEBUG, "Adding record to node_configlog NODE_CONFIGLOG_OP_DELETE"); - DBexecute("insert into node_configlog (conflogid,nodeid,tablename,recordid,operation)" \ - "values (" ZBX_FS_UI64 ",%s,'%s',%s,%d)", -/* DBget_nextid("node_configlog","conflogid"),*/ - DBget_maxid("node_configlog","conflogid"), - row[0], - row[1], - row[2], - NODE_CONFIGLOG_OP_DELETE); + if (FAIL == lock_sync_node(nodeid)) + continue; + + if (FAIL == calculate_checksums(nodeid, NULL, 0)) + continue; + + /* Send configuration changes to required nodes */ + main_nodesender(nodeid, &synked_slave, &synked_master); + if (synked_slave == SUCCEED || synked_master == SUCCEED) + update_checksums(nodeid, synked_slave, synked_master, NULL, 0, NULL); + + DBexecute("update nodes set sync=0 where nodeid=%d", + nodeid); } DBfree_result(result); - return SUCCEED; -} +/* DBcommit();*/ +/* zabbix_log(LOG_LEVEL_CRIT, "----- process_nodes [Selected records in %d seconds]", time(NULL)-now);*/ +} /****************************************************************************** * * @@ -316,15 +430,7 @@ int main_nodewatcher_loop() if(lastrun + 120 < start) { - - DBbegin(); - calculate_checksums(); - compare_checksums(); - update_checksums(); - - /* Send configuration changes to required nodes */ - main_nodesender(); - DBcommit(); + process_nodes(); lastrun = start; } |
