summaryrefslogtreecommitdiffstats
path: root/src/zabbix_server/nodewatcher
diff options
context:
space:
mode:
authorsasha <sasha@97f52cf1-0a1b-0410-bd0e-c28be96e8082>2007-10-20 18:04:00 +0000
committersasha <sasha@97f52cf1-0a1b-0410-bd0e-c28be96e8082>2007-10-20 18:04:00 +0000
commit381c6742533757053481569ae11496f8528cd37c (patch)
tree655a6b994d1affe5540ff34584f918b9fc6bc71f /src/zabbix_server/nodewatcher
parent68285fb56347e83ce36fc611482a09c84754f488 (diff)
downloadzabbix-381c6742533757053481569ae11496f8528cd37c.tar.gz
zabbix-381c6742533757053481569ae11496f8528cd37c.tar.xz
zabbix-381c6742533757053481569ae11496f8528cd37c.zip
- [ZBX-102] Distributed monitoring: overwriting information
[svn merge svn://svn.zabbix.com/branches/1.4.j -r4872:4874] git-svn-id: svn://svn.zabbix.com/trunk@4875 97f52cf1-0a1b-0410-bd0e-c28be96e8082
Diffstat (limited to 'src/zabbix_server/nodewatcher')
-rw-r--r--src/zabbix_server/nodewatcher/nodesender.c599
-rw-r--r--src/zabbix_server/nodewatcher/nodesender.h2
-rw-r--r--src/zabbix_server/nodewatcher/nodewatcher.c460
3 files changed, 535 insertions, 526 deletions
diff --git a/src/zabbix_server/nodewatcher/nodesender.c b/src/zabbix_server/nodewatcher/nodesender.c
index 6c474abf..af354a20 100644
--- a/src/zabbix_server/nodewatcher/nodesender.c
+++ b/src/zabbix_server/nodewatcher/nodesender.c
@@ -33,221 +33,9 @@
/******************************************************************************
* *
- * Function: send_config_data *
- * *
- * Purpose: send configuration changes to required node *
- * *
- * Parameters: *
- * *
- * Return value: SUCCESS - processed succesfully *
- * FAIL - an error occured *
- * *
- * Author: Alexei Vladishev *
- * *
- * Comments: *
- * *
- ******************************************************************************/
-static int send_config_data(int nodeid, int dest_nodeid, zbx_uint64_t maxlogid, int node_type)
-{
- DB_RESULT result;
- DB_RESULT result2;
- DB_ROW row;
- DB_ROW row2;
-
- char *xml = NULL, *hex = NULL;
- char fields[MAX_STRING_LEN];
- int offset=0;
- int allocated=1024;
-
- int found=0;
-
- int i, j, hex_allocated=1024, rowlen;
-
- xml=zbx_malloc(xml, allocated);
- hex=zbx_malloc(hex, hex_allocated);
-
- memset(xml,0,allocated);
-
-
- zabbix_log( LOG_LEVEL_DEBUG, "In send_config_data(nodeid:%d,dest_node:%d,maxlogid:" ZBX_FS_UI64 ",type:%d)",
- nodeid,
- dest_nodeid,
- maxlogid,
- node_type);
-
- /* Begin work */
- if(node_type == ZBX_NODE_MASTER)
- {
- result=DBselect("select tablename,recordid,operation from node_configlog where nodeid=%d and sync_master=0 and conflogid<=" ZBX_FS_UI64 " order by tablename,operation",
- nodeid,
- maxlogid);
- }
- else
- {
- result=DBselect("select tablename,recordid,operation from node_configlog where nodeid=%d and sync_slave=0 and conflogid<=" ZBX_FS_UI64 " order by tablename,operation",
- nodeid,
- maxlogid);
- }
-
- zbx_snprintf_alloc(&xml, &allocated, &offset, 128, "Data%c%d%c%d",
- ZBX_DM_DELIMITER,
- CONFIG_NODEID,
- ZBX_DM_DELIMITER,
- nodeid);
-
- while((row=DBfetch(result)))
- {
- found = 1;
-
- zabbix_log( LOG_LEVEL_DEBUG, "Fetched [%s,%s,%s]",row[0],row[1],row[2]);
- /* Special (simpler) processing for operation DELETE */
- if(atoi(row[2]) == NODE_CONFIGLOG_OP_DELETE)
- {
- zbx_snprintf_alloc(&xml, &allocated, &offset, 16*1024, "\n%s%c%s%c%s",
- row[0],
- ZBX_DM_DELIMITER,
- row[1],
- ZBX_DM_DELIMITER,
- row[2]);
- continue;
- }
- for(i=0;tables[i].table!=0;i++)
- {
- if(strcmp(tables[i].table, row[0])==0) break;
- }
-
- /* Found table */
- if(tables[i].table!=0)
- {
- fields[0]=0;
- /* for each field */
- for(j=0;tables[i].fields[j].name!=0;j++)
- {
- zbx_strlcat(fields,tables[i].fields[j].name,sizeof(fields));
- zbx_strlcat(fields,",",sizeof(fields));
-
- zbx_strlcat(fields,"length(",sizeof(fields));
- zbx_strlcat(fields,tables[i].fields[j].name,sizeof(fields));
- zbx_strlcat(fields,"),",sizeof(fields));
- }
- if(fields[0]!=0) fields[strlen(fields)-1]=0;
-
- result2=DBselect("select %s from %s where %s=%s",
- fields,
- row[0],
- tables[i].recid,
- row[1]);
-
- row2=DBfetch(result2);
-
- if(row2)
- {
- zbx_snprintf_alloc(&xml, &allocated, &offset, 16*1024, "\n%s%c%s%c%s",
- row[0],
- ZBX_DM_DELIMITER,
- row[1],
- ZBX_DM_DELIMITER,
- row[2]);
- /* for each field */
- for(j=0;tables[i].fields[j].name!=0;j++)
- {
- if( (tables[i].fields[j].flags & ZBX_SYNC) ==0) continue;
- /* Fieldname, type, value */
- if(DBis_null(row2[j*2]) == SUCCEED)
- {
-/* zabbix_log( LOG_LEVEL_WARNING, "Field name [%s] [%s]",tables[i].fields[j].name,row2[j*2]);*/
- zbx_snprintf_alloc(&xml, &allocated, &offset, 16*1024, "%c%s%c%d%cNULL",
- ZBX_DM_DELIMITER,
- tables[i].fields[j].name,
- ZBX_DM_DELIMITER,
- tables[i].fields[j].type,
- ZBX_DM_DELIMITER);
- }
- else
- {
- if(tables[i].fields[j].type == ZBX_TYPE_INT ||
- tables[i].fields[j].type == ZBX_TYPE_UINT ||
- tables[i].fields[j].type == ZBX_TYPE_ID ||
- tables[i].fields[j].type == ZBX_TYPE_FLOAT)
- {
- zbx_snprintf_alloc(&xml, &allocated, &offset, 16*1024, "%c%s%c%d%c%s",
- ZBX_DM_DELIMITER,
- tables[i].fields[j].name,
- ZBX_DM_DELIMITER,
- tables[i].fields[j].type,
- ZBX_DM_DELIMITER,
- row2[j*2]);
- }
- else
- {
- rowlen = atoi(row2[j*2+1]);
- zbx_binary2hex((u_char *)row2[j*2], rowlen, &hex, &hex_allocated);
- zbx_snprintf_alloc(&xml, &allocated, &offset, 16*1024, "%c%s%c%d%c%s",
- ZBX_DM_DELIMITER,
- tables[i].fields[j].name,
- ZBX_DM_DELIMITER,
- tables[i].fields[j].type,
- ZBX_DM_DELIMITER,
- hex);
- }
- }
- }
- }
- else
- {
- /* We assume that the record was just deleted, so we change operation to DELETE */
- zabbix_log( LOG_LEVEL_DEBUG, "Cannot select %s from table %s where %s=%s",
- fields,
- row[0],
- tables[i].recid,
- row[1]);
-
- zbx_snprintf_alloc(&xml, &allocated, &offset, 16*1024, "\n%s%c%s%c%d",
- row[0],
- ZBX_DM_DELIMITER,
- row[1],
- ZBX_DM_DELIMITER,
- NODE_CONFIGLOG_OP_DELETE);
- }
- DBfree_result(result2);
- }
- else
- {
- zabbix_log( LOG_LEVEL_WARNING, "Cannot find table [%s]",
- row[0]);
- }
- }
- zabbix_log( LOG_LEVEL_DEBUG, "DATA [%s]",
- xml);
- if( (found == 1) && send_to_node("configuration changes", dest_nodeid, nodeid, xml) == SUCCEED)
- {
- if(node_type == ZBX_NODE_MASTER)
- {
- DBexecute("update node_configlog set sync_master=1 where nodeid=%d and sync_master=0 and conflogid<=" ZBX_FS_UI64,
- nodeid,
- maxlogid);
- }
- else
- {
- DBexecute("update node_configlog set sync_slave=1 where nodeid=%d and sync_slave=0 and conflogid<=" ZBX_FS_UI64,
- nodeid,
- maxlogid);
- }
- }
-
- DBfree_result(result);
- zbx_free(xml);
- zbx_free(hex);
- /* Commit */
-
- return SUCCEED;
-}
-
-/******************************************************************************
- * *
* Function: get_slave_node *
* *
- * Purpose: send configuration changes to required node *
+ * Purpose: *
* *
* Parameters: *
* *
@@ -259,42 +47,35 @@ static int send_config_data(int nodeid, int dest_nodeid, zbx_uint64_t maxlogid,
* Comments: *
* *
******************************************************************************/
-static int get_slave_node(int nodeid)
+static int get_slave_node(int nodeid, int synked_nodeid)
{
DB_RESULT result;
DB_ROW row;
- int ret = 0;
- int m;
+ int master_nodeid;
zabbix_log( LOG_LEVEL_DEBUG, "In get_slave_node(%d)",
nodeid);
result = DBselect("select masterid from nodes where nodeid=%d",
- nodeid);
- row = DBfetch(result);
- if(row)
- {
- m = atoi(row[0]);
- if(m == CONFIG_NODEID)
- {
- ret = nodeid;
- }
- else if(m ==0)
- {
- ret = m;
- }
- else ret = get_slave_node(m);
- }
+ synked_nodeid);
+ if (NULL != (row = DBfetch(result)))
+ master_nodeid = atoi(row[0]);
+ else
+ master_nodeid = 0;
DBfree_result(result);
- return ret;
+ if (master_nodeid == 0)
+ return 0;
+ if (master_nodeid == nodeid)
+ return synked_nodeid;
+ return get_slave_node(nodeid, master_nodeid);
}
/******************************************************************************
* *
* Function: get_master_node *
* *
- * Purpose: send configuration changes to required node *
+ * Purpose: *
* *
* Parameters: *
* *
@@ -316,10 +97,8 @@ int get_master_node(int nodeid)
nodeid);
result = DBselect("select masterid from nodes where nodeid=%d",
- CONFIG_NODEID);
- row = DBfetch(result);
- if(row)
- {
+ nodeid);
+ if (NULL != (row = DBfetch(result))) {
ret = atoi(row[0]);
}
DBfree_result(result);
@@ -343,118 +122,231 @@ int get_master_node(int nodeid)
* Comments: *
* *
******************************************************************************/
-static int send_to_master_and_slave(int nodeid)
+char *get_config_data(int synked_nodeid, int dest_nodetype)
{
DB_RESULT result;
+ DB_RESULT result2;
DB_ROW row;
- int master_nodeid,
- slave_nodeid,
- master_result = FAIL,
- slave_result = FAIL;
- zbx_uint64_t maxlogid;
-
- zabbix_log( LOG_LEVEL_DEBUG, "In send_to_master_and_slave(node:%d)",
- nodeid);
-
- result = DBselect("select max(conflogid) from node_configlog where nodeid=%d",
- nodeid);
-
- row = DBfetch(result);
-
- if(row && DBis_null(row[0]) == SUCCEED)
- {
- zabbix_log( LOG_LEVEL_DEBUG, "No configuration changes of node %d",
- nodeid);
- DBfree_result(result);
- return SUCCEED;
- }
- ZBX_STR2UINT64(maxlogid,row[0]);
- DBfree_result(result);
-
-
- master_nodeid=get_master_node(nodeid);
- slave_nodeid=get_slave_node(nodeid);
+ DB_ROW row2;
- if(master_nodeid != 0)
- {
- master_result = send_config_data(nodeid, master_nodeid, maxlogid, ZBX_NODE_MASTER);
- }
+ char *data = NULL, *hex = NULL, *sql = NULL, c, sync[131], *s, *r[2], *d[2];
+ int data_offset=0, sql_offset = 0;
+ int data_allocated=1024, hex_allocated=1024, sql_allocated=8*1024;
+ int t, f, j, rowlen, found = 0;
+
+ zabbix_log( LOG_LEVEL_DEBUG, "In get_config_data(synked_node:%d,dest_nodetype:%s)",
+ synked_nodeid,
+ dest_nodetype == ZBX_NODE_MASTER ? "MASTER" : "SLAVE");
+
+ data = zbx_malloc(data, data_allocated);
+ hex = zbx_malloc(hex, hex_allocated);
+ sql = zbx_malloc(sql, sql_allocated);
+ c = '1';
+
+ /* Find updated records */
+ result = DBselect("select curr.tablename,curr.recordid,prev.cksum,curr.cksum,prev.sync "
+ "from node_cksum curr, node_cksum prev "
+ "where curr.nodeid=%1$d and prev.nodeid=%1$d and "
+ "curr.tablename=prev.tablename and curr.recordid=prev.recordid and "
+ "curr.cksumtype=%3$d and prev.cksumtype=%2$d "
+ /*" and curr.tablename='hosts' "*/
+ "union all "
+ /* Find new records */
+ "select curr.tablename,curr.recordid,prev.cksum,curr.cksum,curr.sync "
+ "from node_cksum curr left join node_cksum prev "
+ "on prev.nodeid=%1$d and prev.tablename=curr.tablename and "
+ "prev.recordid=curr.recordid and prev.cksumtype=%2$d "
+ "where curr.nodeid=%1$d and curr.cksumtype=%3$d and prev.tablename is null "
+ /*" and curr.tablename='hosts' "*/
+ "union all "
+ /* Find deleted records */
+ "select prev.tablename,prev.recordid,prev.cksum,curr.cksum,prev.sync "
+ "from node_cksum prev left join node_cksum curr "
+ "on prev.nodeid=curr.nodeid and curr.nodeid=%1$d and curr.tablename=prev.tablename and "
+ "curr.recordid=prev.recordid and curr.cksumtype=%3$d "
+ "where prev.nodeid=%1$d and prev.cksumtype=%2$d and curr.tablename is null"
+ /*" and prev.tablename='hosts' "*/,
+ synked_nodeid,
+ NODE_CKSUM_TYPE_OLD, /* prev */
+ NODE_CKSUM_TYPE_NEW); /* curr */
+
+ zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "Data%c%d%c%d\n",
+ ZBX_DM_DELIMITER,
+ CONFIG_NODEID,
+ ZBX_DM_DELIMITER,
+ synked_nodeid);
- if(slave_nodeid != 0)
- {
- slave_result = send_config_data(nodeid, slave_nodeid, maxlogid, ZBX_NODE_SLAVE);
- }
+ while (NULL != (row = DBfetch(result))) {
+ for (t = 0; tables[t].table != 0 && strcmp(tables[t].table, row[0]) != 0; t++)
+ ;
- if( (master_nodeid!=0) && (slave_nodeid != 0))
- {
- if((master_result == SUCCEED) && (slave_result == SUCCEED))
- {
- DBexecute("delete from node_configlog where nodeid=%d and sync_slave=1 and sync_master=1 and conflogid<=" ZBX_FS_UI64,
- nodeid,
- maxlogid);
+ /* Found table */
+ if (tables[t].table == 0) {
+ zabbix_log( LOG_LEVEL_WARNING, "Cannot find table [%s]",
+ row[0]);
+ continue;
}
- }
- if(master_nodeid!=0)
- {
- if(master_result == SUCCEED)
- {
- DBexecute("delete from node_configlog where nodeid=%d and sync_master=1 and conflogid<=" ZBX_FS_UI64,
- nodeid,
- maxlogid);
- }
- }
+ if (DBis_null(row[4]) == FAIL)
+ strcpy(sync, row[4]);
+ else
+ memset(sync, ' ', sizeof(sync));
+ s = sync;
- if(slave_nodeid!=0)
- {
- if(slave_result == SUCCEED)
- {
- DBexecute("delete from node_configlog where nodeid=%d and sync_slave=1 and conflogid<=" ZBX_FS_UI64,
- nodeid,
- maxlogid);
+ /* Special (simpler) processing for operation DELETE */
+ if (DBis_null(row[2]) == FAIL && DBis_null(row[3]) == SUCCEED &&
+ ((dest_nodetype == ZBX_NODE_SLAVE && *s != c) ||
+ (dest_nodetype == ZBX_NODE_MASTER && *(s+1) != c))) {
+ zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "%s%c%s%c%d\n",
+ row[0],
+ ZBX_DM_DELIMITER,
+ row[1],
+ ZBX_DM_DELIMITER,
+ NODE_CONFIGLOG_OP_DELETE);
+ found = 1;
+ continue;
}
- }
-
- return SUCCEED;
-}
-
-
-/******************************************************************************
- * *
- * Function: process_node *
- * *
- * Purpose: select all related nodes and send config changes *
- * *
- * Parameters: *
- * *
- * Return value: SUCCESS - processed succesfully *
- * FAIL - an error occured *
- * *
- * Author: Alexei Vladishev *
- * *
- * Comments: *
- * *
- ******************************************************************************/
-static int process_node(int nodeid)
-{
- DB_RESULT result;
- DB_ROW row;
- zabbix_log( LOG_LEVEL_DEBUG, "In process_node(node:%d)",
- nodeid);
+ r[0] = row[2];
+ r[1] = row[3];
+ f = 0;
+ sql_offset = 0;
+ s += 2;
+
+ zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "select ");
+ do {
+ if ((tables[t].fields[f].flags & ZBX_SYNC) == 0)
+ f++;
+
+ if (strcmp(tables[t].recid, tables[t].fields[f].name) == 0)
+ f++;
+
+ d[0] = NULL;
+ d[1] = NULL;
+ if (NULL != r[0] && NULL != (d[0] = strchr(r[0], ZBX_CKSUM_DELIMITER)))
+ *d[0] = '\0';
+ if (NULL != r[1] && NULL != (d[1] = strchr(r[1], ZBX_CKSUM_DELIMITER)))
+ *d[1] = '\0';
+
+ if (r[0] == NULL || r[1] == NULL || (dest_nodetype == ZBX_NODE_SLAVE && *s != c) ||
+ (dest_nodetype == ZBX_NODE_MASTER && *(s+1) != c) || strcmp(r[0], r[1]) != 0) {
+ zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "%s,length(%1$s),",
+ tables[t].fields[f].name);
+ }
+ s += 2;
+
+ if (d[0] != NULL) {
+ *d[0] = ZBX_CKSUM_DELIMITER;
+ r[0] = d[0] + 1;
+ }
+ if (d[1] != NULL) {
+ *d[1] = ZBX_CKSUM_DELIMITER;
+ r[1] = d[1] + 1;
+ }
+
+ if (d[0] == NULL && d[1] == NULL)
+ break;
+ f++;
+ } while (1);
+
+ if (sql[sql_offset-1] != ',')
+ continue;
+
+ sql_offset--;
+ zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, " from %s where %s=%s",
+ row[0],
+ tables[t].recid,
+ row[1]);
+
+ result2 = DBselect("%s", sql);
+ if (NULL == (row2=DBfetch(result2)))
+ goto out;
+
+ zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "%s%c%s%c%d",
+ row[0],
+ ZBX_DM_DELIMITER,
+ row[1],
+ ZBX_DM_DELIMITER,
+ NODE_CONFIGLOG_OP_UPDATE);
+
+ r[0] = row[2];
+ r[1] = row[3];
+ s = sync + 2;
+ f = 0;
+ j = 0;
+
+ do {
+ if ((tables[t].fields[f].flags & ZBX_SYNC) == 0)
+ f++;
+
+ if (strcmp(tables[t].recid, tables[t].fields[f].name) == 0)
+ f++;
+
+ d[0] = NULL;
+ d[1] = NULL;
+ if (NULL != r[0] && NULL != (d[0] = strchr(r[0], ZBX_CKSUM_DELIMITER)))
+ *d[0] = '\0';
+ if (NULL != r[1] && NULL != (d[1] = strchr(r[1], ZBX_CKSUM_DELIMITER)))
+ *d[1] = '\0';
+
+ if (r[0] == NULL || r[1] == NULL || (dest_nodetype == ZBX_NODE_SLAVE && *s != c) ||
+ (dest_nodetype == ZBX_NODE_MASTER && *(s+1) != c) || strcmp(r[0], r[1]) != 0) {
+
+ zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "%c%s%c%d%c",
+ ZBX_DM_DELIMITER,
+ tables[t].fields[f].name,
+ ZBX_DM_DELIMITER,
+ tables[t].fields[f].type,
+ ZBX_DM_DELIMITER);
+
+ /* Fieldname, type, value */
+ if (DBis_null(row2[j*2]) == SUCCEED) {
+ zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "NULL");
+ } else if(tables[t].fields[f].type == ZBX_TYPE_INT ||
+ tables[t].fields[f].type == ZBX_TYPE_UINT ||
+ tables[t].fields[f].type == ZBX_TYPE_ID ||
+ tables[t].fields[f].type == ZBX_TYPE_FLOAT) {
+
+ zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "%s", row2[j*2]);
+ } else {
+ rowlen = atoi(row2[j*2+1]);
+ zbx_binary2hex((u_char *)row2[j*2], rowlen, &hex, &hex_allocated);
+ zbx_snprintf_alloc(&data, &data_allocated, &data_offset, strlen(hex)+128, "%s", hex);
+/*zabbix_log(LOG_LEVEL_CRIT, "----- [field:%s][type:%d][row:%s][hex:%s]",tables[t].fields[f].name,tables[t].fields[f].type,row2[j*2],hex);*/
+ }
+ found = 1;
+ j++;
+ }
+ s += 2;
+
+ if (d[0] != NULL) {
+ *d[0] = ZBX_CKSUM_DELIMITER;
+ r[0] = d[0] + 1;
+ }
+ if (d[1] != NULL) {
+ *d[1] = ZBX_CKSUM_DELIMITER;
+ r[1] = d[1] + 1;
+ }
+
+ if (d[0] == NULL && d[1] == NULL)
+ break;
+ f++;
+ } while (1);
+ zbx_snprintf_alloc(&data, &data_allocated, &data_offset, 128, "\n");
+out:
+ DBfree_result(result2);
+ }
+ DBfree_result(result);
- send_to_master_and_slave(nodeid);
+ zbx_free(hex);
+ zbx_free(sql);
- result = DBselect("select nodeid from nodes where masterid=%d and nodeid not in (%d)",
- nodeid,
- nodeid);
- while((row=DBfetch(result)))
- {
- process_node(atoi(row[0]));
+ if (0 == found) {
+ zbx_free(data);
+ data = NULL;
}
- DBfree_result(result);
- return SUCCEED;
+ return data;
}
/******************************************************************************
@@ -472,29 +364,40 @@ static int process_node(int nodeid)
* Comments: never returns *
* *
******************************************************************************/
-void main_nodesender()
+void main_nodesender(int synked_nodeid, int *synked_slave, int *synked_master)
{
DB_RESULT result;
DB_ROW row;
+ int nodeid, slave_nodeid, master_nodeid;
+ char *data;
+
+ zabbix_log(LOG_LEVEL_DEBUG, "In main_nodesender()");
- zabbix_log( LOG_LEVEL_DEBUG, "In main_nodesender()");
+ *synked_slave = FAIL;
+ *synked_master = FAIL;
result = DBselect("select nodeid from nodes where nodetype=%d",
ZBX_NODE_TYPE_LOCAL);
- row = DBfetch(result);
-
- if(row)
- {
- if(CONFIG_NODEID != atoi(row[0]))
- {
- zabbix_log( LOG_LEVEL_WARNING, "NodeID does not match configuration settings. Processing of the node is disabled.");
- }
- else
- {
- process_node(atoi(row[0]));
+ if (NULL != (row = DBfetch(result))) {
+ nodeid = atoi(row[0]);
+ if (CONFIG_NODEID != nodeid) {
+ zabbix_log(LOG_LEVEL_WARNING, "NodeID does not match configuration settings."
+ " Processing of the node is disabled.");
+ } else {
+ slave_nodeid = get_slave_node(nodeid, synked_nodeid);
+ master_nodeid = CONFIG_MASTER_NODEID;
+ if (0 != slave_nodeid && NULL != (data = get_config_data(synked_nodeid, ZBX_NODE_SLAVE))) {
+/*zabbix_log(LOG_LEVEL_CRIT, "-----> [%s]", data);*/
+ *synked_slave = send_to_node("configuration changes", slave_nodeid, synked_nodeid, data);
+ zbx_free(data);
+ }
+ if (0 != master_nodeid && NULL != (data = get_config_data(synked_nodeid, ZBX_NODE_MASTER))) {
+/*zabbix_log(LOG_LEVEL_CRIT, "-----> [%s]", data);*/
+ *synked_master = send_to_node("configuration changes", master_nodeid, synked_nodeid, data);
+ zbx_free(data);
+ }
}
}
-
DBfree_result(result);
}
diff --git a/src/zabbix_server/nodewatcher/nodesender.h b/src/zabbix_server/nodewatcher/nodesender.h
index ff9ba5c7..30c9f346 100644
--- a/src/zabbix_server/nodewatcher/nodesender.h
+++ b/src/zabbix_server/nodewatcher/nodesender.h
@@ -20,7 +20,7 @@
#ifndef ZABBIX_NODESENDER_H
#define ZABBIX_NODESENDER_H
-void main_nodesender();
+void main_nodesender(int nodeid, int *synked_slave, int *synked_master);
int get_master_node(int nodeid);
#endif
diff --git a/src/zabbix_server/nodewatcher/nodewatcher.c b/src/zabbix_server/nodewatcher/nodewatcher.c
index d4534ca3..486148ef 100644
--- a/src/zabbix_server/nodewatcher/nodewatcher.c
+++ b/src/zabbix_server/nodewatcher/nodewatcher.c
@@ -18,7 +18,6 @@
**/
#include "common.h"
-
#include "cfg.h"
#include "db.h"
#include "log.h"
@@ -46,129 +45,109 @@
* Comments: *
* *
******************************************************************************/
-static int calculate_checksums()
+int calculate_checksums(int nodeid, const char *tablename, const zbx_uint64_t id)
{
+ char *sql = NULL;
+ int sql_allocated = 16*1024, sql_offset = 0;
+ int t, f, res = SUCCEED;
- char *sql = NULL;
- int sql_allocated, sql_offset;
-
- int i = 0;
- int j;
- DB_RESULT result;
- DB_RESULT result2;
- DB_ROW row;
- DB_ROW row2;
- int nodeid;
-
- int now;
-
- zabbix_log( LOG_LEVEL_DEBUG, "In calculate_checksums");
+ zabbix_log(LOG_LEVEL_DEBUG, "In calculate_checksums");
- DBexecute("delete from node_cksum where cksumtype=%d",
- NODE_CKSUM_TYPE_NEW);
+ sql = zbx_malloc(sql, sql_allocated);
- /* Select all nodes */
- result =DBselect("select nodeid from nodes");
- while((row=DBfetch(result)))
- {
- sql_allocated=64*1024;
- sql_offset=0;
- sql=zbx_malloc(sql, sql_allocated);
-
- now = time(NULL);
- nodeid = atoi(row[0]);
-
- zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128,
- "select 'table ','field ',itemid, '012345678901234' from items where 1=0\n");
+ for (t = 0; tables[t].table != 0; t++) {
+ /* Do not sync some of tables */
+ if ((tables[t].flags & ZBX_SYNC) == 0)
+ continue;
- for(i=0;tables[i].table!=0;i++)
- {
-/* zabbix_log( LOG_LEVEL_WARNING, "In calculate_checksums2 [%s]", tables[i].table ); */
- /* Do not sync some of tables */
- if( (tables[i].flags & ZBX_SYNC) ==0) continue;
+ if (NULL != tablename && 0 != strcmp(tablename, tables[t].table))
+ continue;
- zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4096,
+ zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512,
#ifdef HAVE_MYSQL
- "union all select '%s','%s',%s,md5(concat(",
+ "%s select %d,'%s',%s,%d,concat(",
#else
- "union all select '%s','%s',%s,md5(",
+ "%s select %d,'%s',%s,%d,",
#endif
- tables[i].table,
- tables[i].recid,
- tables[i].recid);
-
- j=0;
- while(tables[i].fields[j].name != 0)
- {
- if( (tables[i].fields[j].flags & ZBX_SYNC) ==0)
- {
- j++;
- continue;
- }
-#ifdef HAVE_MYSQL
- zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "coalesce(%s,'1234567890'),",
- tables[i].fields[j].name);
-#else
- if(tables[i].fields[j].type == ZBX_TYPE_BLOB) /* postgresql is not work: coalesce(blob,'1234567890')||coalesce(varchar,'1234567890') */
- {
- zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "md5(coalesce(%s,'1234567890'))||",
- tables[i].fields[j].name);
+ sql_offset > 0 ? "union all" : "insert into node_cksum (nodeid,tablename,recordid,cksumtype,cksum)",
+ nodeid,
+ tables[t].table,
+ tables[t].recid,
+ NODE_CKSUM_TYPE_NEW);
+
+ for (f = 0; tables[t].fields[f].name != 0; f ++) {
+ if ((tables[t].fields[f].flags & ZBX_SYNC) == 0)
+ continue;
+
+ if (strcmp(tables[t].recid, tables[t].fields[f].name) == 0)
+ continue;
+
+ if (tables[t].fields[f].flags & ZBX_NOTNULL) {
+ switch ( tables[t].fields[f].type ) {
+ case ZBX_TYPE_ID :
+ case ZBX_TYPE_INT :
+ case ZBX_TYPE_UINT :
+ zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128,
+ "%s",
+ tables[t].fields[f].name);
+ break;
+ default :
+ zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128,
+ "md5(%s)",
+ tables[t].fields[f].name);
+ break;
}
- else
- {
- zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "coalesce(%s,'1234567890')||",
- tables[i].fields[j].name);
+ } else {
+ switch ( tables[t].fields[f].type ) {
+ case ZBX_TYPE_ID :
+ case ZBX_TYPE_INT :
+ case ZBX_TYPE_UINT :
+ zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128,
+ "case when %s is null then 'NULL' else %1$s end",
+ tables[t].fields[f].name);
+ break;
+ default :
+ zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128,
+ "case when %s is null then 'NULL' else md5(%1$s) end",
+ tables[t].fields[f].name);
+ break;
}
-#endif
- j++;
}
+ zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 16,
#ifdef HAVE_MYSQL
- if(j>0) sql_offset--; /* Remove last */
+ ",'%c',",
#else
- if(j>0) sql_offset-=2; /* Remove last */
+ "||'%c'||",
#endif
+ ZBX_CKSUM_DELIMITER);
+ }
- /* select table,recid,md5(fields) from table union all ... */
- zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 4096,
+ /* remove last delimiter */
+ if (f > 0) {
#ifdef HAVE_MYSQL
- ")) from %s where %s>=" ZBX_FS_UI64 " and %s<=" ZBX_FS_UI64 "\n",
+ sql_offset -= 5;
+ zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 16, ")");
#else
- ") from %s where %s>=" ZBX_FS_UI64 " and %s<=" ZBX_FS_UI64 "\n",
+ sql_offset -= 7;
#endif
- tables[i].table,
- tables[i].recid,
- (zbx_uint64_t)__UINT64_C(100000000000000)*(zbx_uint64_t)nodeid,
- tables[i].recid,
- (zbx_uint64_t)__UINT64_C(100000000000000)*(zbx_uint64_t)nodeid+__UINT64_C(99999999999999));
-
}
-/* zabbix_log( LOG_LEVEL_WARNING, "SQL DUMP [%s]", sql);*/
- result2 =DBselect("%s",sql);
+ zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 512,
+ " from %s where"ZBX_COND_NODEID,
+ tables[t].table,
+ ZBX_NODE(tables[t].recid,nodeid));
-/* zabbix_log( LOG_LEVEL_WARNING, "Selected records in %d seconds", time(NULL)-now);*/
- now = time(NULL);
- i=0;
- while((row2=DBfetch(result2)))
- {
- DBexecute("insert into node_cksum (cksumid,nodeid,tablename,fieldname,recordid,cksumtype,cksum) "\
- "values (" ZBX_FS_UI64 ",%d,'%s','%s',%s,%d,'%s')",
-/* DBget_nextid("node_cksum","cksumid"),*/
- DBget_maxid("node_cksum","cksumid"),
- nodeid,
- row2[0],
- row2[1],
- row2[2],
- NODE_CKSUM_TYPE_NEW,
- row2[3]);
- i++;
+ if (0 != id) {
+ zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128,
+ "and %s="ZBX_FS_UI64,
+ tables[t].recid,
+ id);
}
- DBfree_result(result2);
- zbx_free(sql);
+ zbx_snprintf_alloc(&sql, &sql_allocated, &sql_offset, 128, "\n");
}
- DBfree_result(result);
-
- return SUCCEED;
+ if (DBexecute("%s", sql) < ZBX_DB_OK)
+ res = FAIL;
+ return res;
}
/******************************************************************************
@@ -187,102 +166,237 @@ static int calculate_checksums()
* Comments: *
* *
******************************************************************************/
-static int update_checksums()
+int update_checksums(int nodeid, int synked_slave, int synked_master, const char *tablename, const zbx_uint64_t id, char *fields)
{
- DBexecute("delete from node_cksum where cksumtype=%d",
- NODE_CKSUM_TYPE_OLD);
- DBexecute("update node_cksum set cksumtype=%d where cksumtype=%d",
- NODE_CKSUM_TYPE_OLD,
- NODE_CKSUM_TYPE_NEW);
+ char *r[2], *d[2], sync[131], *s;
+ char cs, cm, sql[2][256];
+ DB_RESULT result;
+ DB_ROW row;
+ int t, f;
+
+ zabbix_log(LOG_LEVEL_DEBUG, "In update_checksums");
+
+ cs = synked_slave == SUCCEED ? '1' : ' ';
+ cm = synked_master == SUCCEED ? '1' : ' ';
+
+ if (NULL != tablename) {
+ zbx_snprintf(sql[0], sizeof(sql[0]), " and curr.tablename='%s' and curr.recordid="ZBX_FS_UI64,
+ tablename, id);
+ zbx_snprintf(sql[1], sizeof(sql[1]), " and prev.tablename='%s' and prev.recordid="ZBX_FS_UI64,
+ tablename, id);
+ } else {
+ *sql[0] = '\0';
+ *sql[1] = '\0';
+ }
+
+ /* Find updated records */
+ result = DBselect("select curr.tablename,curr.recordid,prev.cksum,curr.cksum,prev.sync "
+ "from node_cksum curr, node_cksum prev "
+ "where curr.nodeid=%1$d and prev.nodeid=%1$d and "
+ "curr.tablename=prev.tablename and curr.recordid=prev.recordid and "
+ "curr.cksumtype=%3$d and prev.cksumtype=%2$d%4$s "
+ "union all "
+ /* Find new records */
+ "select curr.tablename,curr.recordid,prev.cksum,curr.cksum,NULL "
+ "from node_cksum curr left join node_cksum prev "
+ "on prev.nodeid=%1$d and prev.tablename=curr.tablename and "
+ "prev.recordid=curr.recordid and prev.cksumtype=%2$d "
+ "where curr.nodeid=%1$d and curr.cksumtype=%3$d and prev.tablename is null%4$s "
+ "union all "
+ /* Find deleted records */
+ "select prev.tablename,prev.recordid,prev.cksum,curr.cksum,prev.sync "
+ "from node_cksum prev left join node_cksum curr "
+ "on prev.nodeid=curr.nodeid and curr.nodeid=%1$d and curr.tablename=prev.tablename and "
+ "curr.recordid=prev.recordid and curr.cksumtype=%3$d "
+ "where prev.nodeid=%1$d and prev.cksumtype=%2$d and curr.tablename is null%5$s",
+ nodeid,
+ NODE_CKSUM_TYPE_OLD, /* prev */
+ NODE_CKSUM_TYPE_NEW, /* curr */
+ sql[0],
+ sql[1]);
+
+ while (NULL != (row = DBfetch(result))) {
+ for (t = 0; tables[t].table != 0 && strcmp(tables[t].table, row[0]) != 0; t++)
+ ;
+
+ /* Found table */
+ if (tables[t].table == 0) {
+ zabbix_log(LOG_LEVEL_WARNING, "Cannot find table [%s]",
+ row[0]);
+ continue;
+ }
+
+ if (DBis_null(row[4]) == FAIL)
+ strcpy(sync, row[4]);
+ else
+ memset(sync, ' ', sizeof(sync));
+ s = sync;
+
+ /* Special (simpler) processing for operation DELETE */
+ if (DBis_null(row[3]) == SUCCEED) {
+ if (synked_slave == SUCCEED && *s != cs)
+ *s = cs;
+ if (synked_master == SUCCEED && *(s+1) != cm)
+ *(s+1) = cm;
+ s += 2;
+ } else {
+ r[0] = row[2];
+ r[1] = row[3];
+ s += 2;
+ f = 0;
+
+ do {
+ if ((tables[t].fields[f].flags & ZBX_SYNC) == 0)
+ f++;
+
+ if (strcmp(tables[t].recid, tables[t].fields[f].name) == 0)
+ f++;
+
+ d[0] = NULL;
+ d[1] = NULL;
+ if (NULL != r[0] && NULL != (d[0] = strchr(r[0], ZBX_CKSUM_DELIMITER)))
+ *d[0] = '\0';
+ if (NULL != r[1] && NULL != (d[1] = strchr(r[1], ZBX_CKSUM_DELIMITER)))
+ *d[1] = '\0';
+
+ if (NULL == tablename || SUCCEED == str_in_list(fields, tables[t].fields[f].name, ',')) {
+ if (r[0] == NULL || r[1] == NULL || strcmp(r[0], r[1]) != 0) {
+ *s = cs;
+ *(s+1) = cm;
+ } else {
+ if (synked_slave == SUCCEED && *s != cs)
+ *s = cs;
+ if (synked_master == SUCCEED && *(s+1) != cm)
+ *(s+1) = cm;
+ }
+ }
+ s += 2;
+
+ if (d[0] != NULL) {
+ *d[0] = ZBX_CKSUM_DELIMITER;
+ r[0] = d[0] + 1;
+ }
+ if (d[1] != NULL) {
+ *d[1] = ZBX_CKSUM_DELIMITER;
+ r[1] = d[1] + 1;
+ }
+
+ if (d[0] == NULL && d[1] == NULL)
+ break;
+ f++;
+ } while (1);
+ }
+ *s = '\0';
+
+ if (DBis_null(row[2]) == SUCCEED || DBis_null(row[3]) == SUCCEED ||
+ strcmp(row[4], sync) != 0 || strcmp(row[2], row[3]) != 0) {
+ DBexecute("update node_cksum set cksumtype=%d,cksum=\'%s\',sync=\'%s\' "
+ "where nodeid=%d and tablename=\'%s\' and recordid=%s and cksumtype=%d",
+ NODE_CKSUM_TYPE_OLD,
+ DBis_null(row[3]) == SUCCEED ? row[2] : row[3],
+ sync,
+ nodeid,
+ row[0],
+ row[1],
+ DBis_null(row[2]) == SUCCEED ? NODE_CKSUM_TYPE_NEW : NODE_CKSUM_TYPE_OLD);
+ }
+ }
+ DBfree_result(result);
return SUCCEED;
}
/******************************************************************************
* *
- * Function: compare_checksums *
+ * Function: lock_node *
* *
- * Purpose: compare new checksums with old ones. Write difference to *
- * table 'node_config' *
+ * Purpose: *
* *
* Parameters: *
* *
- * Return value: SUCCESS - calculated succesfully *
- * FAIL - an error occured *
+ * Return value: *
* *
- * Author: Alexei Vladishev *
+ * Author: Aleksander Vladishev *
* *
* Comments: *
* *
******************************************************************************/
-static int compare_checksums()
+int lock_sync_node(int nodeid)
{
DB_RESULT result;
DB_ROW row;
-
- /* Find updated records */
- result = DBselect("select curr.nodeid,curr.tablename,curr.recordid from node_cksum prev, node_cksum curr where curr.tablename=prev.tablename and curr.recordid=prev.recordid and curr.fieldname=prev.fieldname and curr.nodeid=prev.nodeid and curr.cksum<>prev.cksum and curr.cksumtype=%d and prev.cksumtype=%d",
- NODE_CKSUM_TYPE_NEW,
- NODE_CKSUM_TYPE_OLD);
- while((row=DBfetch(result)))
- {
- zabbix_log( LOG_LEVEL_DEBUG, "Adding record to node_configlog NODE_CONFIGLOG_OP_UPDATE");
- DBexecute("insert into node_configlog (conflogid,nodeid,tablename,recordid,operation)" \
- "values (" ZBX_FS_UI64 ",%s,'%s',%s,%d)",
-/* DBget_nextid("node_configlog","conflogid"),*/
- DBget_maxid("node_configlog","conflogid"),
- row[0],
- row[1],
- row[2],
- NODE_CONFIGLOG_OP_UPDATE);
+ int sync;
+ int res = FAIL;
+
+ if (DBexecute("update nodes set sync=sync+1 where nodeid=%d",
+ nodeid) >= ZBX_DB_OK) {
+
+ result = DBselect("select sync from nodes where nodeid=%d",
+ nodeid);
+ if (NULL != (row=DBfetch(result))) {
+ sync = atoi(row[0]);
+ if (sync == 1 || sync > 25) {
+ if (DBexecute("delete from node_cksum where nodeid=%d and cksumtype=%d",
+ nodeid,
+ NODE_CKSUM_TYPE_NEW) >= ZBX_DB_OK) {
+ res = SUCCEED;
+ }
+ }
+ }
+ DBfree_result(result);
}
- DBfree_result(result);
+ return res;
+}
- /* Find new records */
- result = DBselect("select curr.nodeid,curr.tablename,curr.recordid from node_cksum curr" \
- " left join node_cksum prev" \
- " on curr.tablename=prev.tablename and curr.recordid=prev.recordid and curr.fieldname=prev.fieldname and curr.nodeid=prev.nodeid and curr.cksumtype<>prev.cksumtype" \
- " where prev.cksumid is null and curr.cksumtype=%d",
- NODE_CKSUM_TYPE_NEW);
+/******************************************************************************
+ * *
+ * Function: process_nodes *
+ * *
+ * Purpose: calculates checks sum of config data *
+ * *
+ * Parameters: *
+ * *
+ * Return value: *
+ * *
+ * Author: Aleksander Vladishev *
+ * *
+ * Comments: never returns *
+ * *
+ ******************************************************************************/
+void process_nodes()
+{
+ DB_RESULT result;
+ DB_ROW row;
+ int nodeid, synked_slave, synked_master;
+/* int now = time(NULL);*/
- while((row=DBfetch(result)))
- {
- zabbix_log( LOG_LEVEL_DEBUG, "Adding record to node_configlog NODE_CONFIGLOG_OP_ADD");
- DBexecute("insert into node_configlog (conflogid,nodeid,tablename,recordid,operation)" \
- "values (" ZBX_FS_UI64 ",%s,'%s',%s,%d)",
-/* DBget_nextid("node_configlog","conflogid"),*/
- DBget_maxid("node_configlog","conflogid"),
- row[0],
- row[1],
- row[2],
- NODE_CONFIGLOG_OP_ADD);
- }
- DBfree_result(result);
+/* DBbegin();*/
- /* Find deleted records */
- result = DBselect("select curr.nodeid,curr.tablename,curr.recordid from node_cksum curr" \
- " left join node_cksum prev" \
- " on curr.tablename=prev.tablename and curr.recordid=prev.recordid and curr.fieldname=prev.fieldname and curr.nodeid=prev.nodeid and curr.cksumtype<>prev.cksumtype" \
- " where prev.cksumid is null and curr.cksumtype=%d",
- NODE_CKSUM_TYPE_OLD);
+ /* Select all nodes */
+ result = DBselect("select nodeid from nodes");
+ while (NULL != (row=DBfetch(result))) {
+ nodeid = atoi(row[0]);
- while((row=DBfetch(result)))
- {
- zabbix_log( LOG_LEVEL_DEBUG, "Adding record to node_configlog NODE_CONFIGLOG_OP_DELETE");
- DBexecute("insert into node_configlog (conflogid,nodeid,tablename,recordid,operation)" \
- "values (" ZBX_FS_UI64 ",%s,'%s',%s,%d)",
-/* DBget_nextid("node_configlog","conflogid"),*/
- DBget_maxid("node_configlog","conflogid"),
- row[0],
- row[1],
- row[2],
- NODE_CONFIGLOG_OP_DELETE);
+ if (FAIL == lock_sync_node(nodeid))
+ continue;
+
+ if (FAIL == calculate_checksums(nodeid, NULL, 0))
+ continue;
+
+ /* Send configuration changes to required nodes */
+ main_nodesender(nodeid, &synked_slave, &synked_master);
+ if (synked_slave == SUCCEED || synked_master == SUCCEED)
+ update_checksums(nodeid, synked_slave, synked_master, NULL, 0, NULL);
+
+ DBexecute("update nodes set sync=0 where nodeid=%d",
+ nodeid);
}
DBfree_result(result);
- return SUCCEED;
-}
+/* DBcommit();*/
+/* zabbix_log(LOG_LEVEL_CRIT, "----- process_nodes [Selected records in %d seconds]", time(NULL)-now);*/
+}
/******************************************************************************
* *
@@ -316,15 +430,7 @@ int main_nodewatcher_loop()
if(lastrun + 120 < start)
{
-
- DBbegin();
- calculate_checksums();
- compare_checksums();
- update_checksums();
-
- /* Send configuration changes to required nodes */
- main_nodesender();
- DBcommit();
+ process_nodes();
lastrun = start;
}