diff options
author | alex <alex@97f52cf1-0a1b-0410-bd0e-c28be96e8082> | 2008-02-22 14:21:56 +0000 |
---|---|---|
committer | alex <alex@97f52cf1-0a1b-0410-bd0e-c28be96e8082> | 2008-02-22 14:21:56 +0000 |
commit | 2a35ecbf642a185e3b53f66f783272e03fe99e93 (patch) | |
tree | 64ebe097b211b7aa3f64d69103e3f4169973146e | |
parent | 7db4f3b57bdf9c1d32c28abd98e4e7ab713718e9 (diff) | |
download | zabbix-2a35ecbf642a185e3b53f66f783272e03fe99e93.tar.gz zabbix-2a35ecbf642a185e3b53f66f783272e03fe99e93.tar.xz zabbix-2a35ecbf642a185e3b53f66f783272e03fe99e93.zip |
- [DEV-117] support of data buffering on agent side (Alexei)
git-svn-id: svn://svn.zabbix.com/trunk@5386 97f52cf1-0a1b-0410-bd0e-c28be96e8082
-rw-r--r-- | ChangeLog | 1 | ||||
-rwxr-xr-x | go | 2 | ||||
-rw-r--r-- | include/zbxjson.h | 6 | ||||
-rw-r--r-- | src/zabbix_agent/Makefile.am | 3 | ||||
-rw-r--r-- | src/zabbix_agent/active.c | 233 | ||||
-rw-r--r-- | src/zabbix_sender/zabbix_sender.c | 10 | ||||
-rw-r--r-- | src/zabbix_server/trapper/active.c | 88 | ||||
-rw-r--r-- | src/zabbix_server/trapper/active.h | 2 | ||||
-rw-r--r-- | src/zabbix_server/trapper/trapper.c | 11 |
9 files changed, 286 insertions, 70 deletions
@@ -1,5 +1,6 @@ Changes for 1.5: + - [DEV-117] support of data buffering on agent side (Alexei) - [DEV-115] enhanced sender to use only one TCP connection (Alexei) - [DEV-112] added themes option in frontend (Artem) - [DEV-114] special processing of simple SNMP OIDs like ifDescr, ifInOctets, etc (Alexei) @@ -33,7 +33,7 @@ cd - #export CFLAGS="-Wall -pedantic" #for db in sqlite3 pgsql mysql; do for db in mysql; do - ./configure --enable-agent --enable-server --enable-proxy --with-jabber --with-ldap --with-libcurl --with-$db --with-net-snmp --prefix=`pwd` --enable-ipv6 2>>WARNINGS >/dev/null + ./configure --enable-agent --enable-server --with-jabber --with-ldap --with-libcurl --with-$db --with-net-snmp --prefix=`pwd` --enable-ipv6 2>>WARNINGS >/dev/null echo Cleaning... make clean 2>>WARNINGS >/dev/null echo Making... diff --git a/include/zbxjson.h b/include/zbxjson.h index 8fe235bc..a9bb33a9 100644 --- a/include/zbxjson.h +++ b/include/zbxjson.h @@ -24,6 +24,7 @@ #define ZBX_PROTO_TAG_CLOCK "clock" #define ZBX_PROTO_TAG_DATA "data" +#define ZBX_PROTO_TAG_DELAY "delay" #define ZBX_PROTO_TAG_HOST "host" #define ZBX_PROTO_TAG_INFO "info" #define ZBX_PROTO_TAG_KEY "key" @@ -39,8 +40,9 @@ #define ZBX_PROTO_VALUE_FAILED "failed" #define ZBX_PROTO_VALUE_SUCCESS "success" -#define ZBX_PROTO_VALUE_PROXY_CONFIG "proxy config" -#define ZBX_PROTO_VALUE_SENDER_DATA "sender data" +#define ZBX_PROTO_VALUE_GET_ACTIVE_CHECKS "active checks" +#define ZBX_PROTO_VALUE_PROXY_CONFIG "proxy config" +#define ZBX_PROTO_VALUE_SENDER_DATA "sender data" typedef enum { diff --git a/src/zabbix_agent/Makefile.am b/src/zabbix_agent/Makefile.am index 34cd0b90..7b4004b8 100644 --- a/src/zabbix_agent/Makefile.am +++ b/src/zabbix_agent/Makefile.am @@ -11,7 +11,8 @@ agents_ldadd = \ $(top_srcdir)/src/libs/zbxcomms/libzbxcomms.a \ $(top_srcdir)/src/libs/zbxconf/libzbxconf.a \ $(top_srcdir)/src/libs/zbxcommon/libzbxcommon.a \ - $(top_srcdir)/src/libs/zbxcrypto/libzbxcrypto.a + $(top_srcdir)/src/libs/zbxcrypto/libzbxcrypto.a \ + $(top_srcdir)/src/libs/zbxjson/libzbxjson.a sbin_PROGRAMS = \ zabbix_agent \ diff --git a/src/zabbix_agent/active.c b/src/zabbix_agent/active.c index 25c3ac36..e611cd32 100644 --- a/src/zabbix_agent/active.c +++ b/src/zabbix_agent/active.c @@ -27,6 +27,7 @@ #include "eventlog.h" #include "comms.h" #include "threads.h" +#include "zbxjson.h" #if defined(ZABBIX_SERVICE) # include "service.h" @@ -117,7 +118,7 @@ static void add_check(char *key, int refresh, long lastlogsize) { int i; - zabbix_log( LOG_LEVEL_DEBUG, "In add_check('%s', %i, %li)", key, refresh, lastlogsize); + zabbix_log( LOG_LEVEL_WARNING, "In add_check('%s', %i, %li)", key, refresh, lastlogsize); for(i=0; NULL != active_metrics[i].key; i++) { @@ -164,7 +165,7 @@ static void add_check(char *key, int refresh, long lastlogsize) * Return value: returns SUCCEED on succesfull parsing, * * FAIL on an incoorrect format of string * * * - * Author: Eugene Grigorjev * + * Author: Eugene Grigorjev, Alexei Vladishev (new json protocol) * * * * Comments: * * String reprents as "ZBX_EOF" termination list * @@ -175,58 +176,106 @@ static void add_check(char *key, int refresh, long lastlogsize) ******************************************************************************/ static int parse_list_of_checks(char *str) { - char *p, *pstrend, *refresh, *lastlogsize; + const char *p; + char key[MAX_STRING_LEN], delay[MAX_STRING_LEN], lastlogsize[MAX_STRING_LEN]; + char result[MAX_STRING_LEN]; + int ret = SUCCEED; + + struct zbx_json_parse jp; + struct zbx_json_parse jp_data, jp_row; - zabbix_log(LOG_LEVEL_DEBUG, "In parse_list_of_checks() [%s]", + zabbix_log(LOG_LEVEL_WARNING, "In parse_list_of_checks() [%s]", str); disable_all_metrics(); - while (NULL != str) { - if (NULL != (pstrend = strchr(str,'\n'))) - *pstrend = '\0'; /* prepare line */ - - zabbix_log(LOG_LEVEL_DEBUG, "Parsed [%s]", str); - - if (0 == strcmp(str, "ZBX_EOF")) - break; + if(SUCCEED == ret && SUCCEED != zbx_json_open(str, &jp)) + { + zabbix_log(LOG_LEVEL_WARNING, "Can't open jason object"); + ret = FAIL; + } + zabbix_log(LOG_LEVEL_WARNING, "A"); - refresh = NULL; - lastlogsize = NULL; + if(SUCCEED == ret && SUCCEED != zbx_json_value_by_name(&jp, ZBX_PROTO_TAG_RESPONSE, result, sizeof(result))) + { + zabbix_log(LOG_LEVEL_WARNING, "%s", + zbx_json_strerror()); + ret = FAIL; + } + zabbix_log(LOG_LEVEL_WARNING, "B"); - /* parse string from end of line */ - /* line format "key:refresh:lastlogsize" */ + if(SUCCEED == ret && 0 != strcmp(result,ZBX_PROTO_VALUE_SUCCESS)) + { + zabbix_log(LOG_LEVEL_WARNING, "Unsucesfull response received from server"); + ret = FAIL; + } + zabbix_log(LOG_LEVEL_WARNING, "C"); - p = (NULL != pstrend) ? pstrend : str + strlen(str); + if(SUCCEED == ret && NULL == (p = zbx_json_pair_by_name(&jp, ZBX_PROTO_TAG_DATA))) + { + zabbix_log(LOG_LEVEL_WARNING, "Can't find \"%s\" tag", + ZBX_PROTO_TAG_DATA); + ret = FAIL; + } + zabbix_log(LOG_LEVEL_WARNING, "D"); - /* Lastlogsize */ - for (; p != str; p--) { - if (*p == ':') { - *p = '\0'; + if(SUCCEED == ret && FAIL == zbx_json_brackets_open(p, &jp_data)) + { + zabbix_log(LOG_LEVEL_WARNING, "Can't proceed jason request. %s", + zbx_json_strerror()); + ret = FAIL; + } + zabbix_log(LOG_LEVEL_WARNING, "E"); - lastlogsize = p + 1; + if(SUCCEED == ret) + { + zabbix_log(LOG_LEVEL_WARNING, "F"); + p = NULL; + while (SUCCEED == ret && NULL != (p = zbx_json_next(&jp_data, p))) + { + zabbix_log(LOG_LEVEL_WARNING, "G1"); +/* {"request":"ZBX_SENDER_DATA","data":[{"key":"system.cpu.num",...,...},{...},...]} + * ^------------------------------^ + */ if (FAIL == (ret = zbx_json_brackets_open(p, &jp_row))) + { + zabbix_log(LOG_LEVEL_WARNING, "%s", + zbx_json_strerror()); + ret = FAIL; break; } - } + zabbix_log(LOG_LEVEL_WARNING, "G2"); - /* Refresh */ - for (; p != str; p--) { - if (*p == ':') { - *p = '\0'; + *delay = '\0'; + *key = '\0'; + *lastlogsize = '\0'; - refresh = p + 1; - break; + if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_KEY, key, sizeof(key))) + { + zabbix_log(LOG_LEVEL_WARNING, "Unable to retrieve value of tag \"%s\"", + ZBX_PROTO_TAG_KEY); + continue; } - } - if (str && refresh && lastlogsize) - add_check(str, atoi(refresh), atoi(lastlogsize)); + if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_DELAY, delay, sizeof(delay))) + { + zabbix_log(LOG_LEVEL_WARNING, "Unable to retrieve value of tag \"%s\"", + ZBX_PROTO_TAG_DELAY); + continue; + } - if (pstrend == NULL) - break; + if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_LOGLASTSIZE, lastlogsize, sizeof(lastlogsize))) + { + zabbix_log(LOG_LEVEL_WARNING, "Unable to retrieve value of tag \"%s\"", + ZBX_PROTO_TAG_LOGLASTSIZE); + continue; + } + zabbix_log(LOG_LEVEL_WARNING, "H"); - str = pstrend + 1; + if (*key && *delay && *lastlogsize) + add_check(key, atoi(delay), atoi(lastlogsize)); + } } + return SUCCEED; } @@ -242,7 +291,7 @@ static int parse_list_of_checks(char *str) * Return value: returns SUCCEED on succesfull parsing, * * FAIL on other cases * * * - * Author: Eugene Grigorjev * + * Author: Eugene Grigorjev, Alexei Vladishev (new json protocol) * * * * Comments: * * * @@ -255,24 +304,29 @@ static int get_active_checks( zbx_sock_t s; - char - *buf, - packet[MAX_BUF_LEN]; + char *buf; int ret; - zabbix_log( LOG_LEVEL_DEBUG, "get_active_checks('%s',%u)", host, port); + struct zbx_json json; + + zabbix_log( LOG_LEVEL_WARNING, "get_active_checks('%s',%u)", host, port); + + zbx_json_init(&json, 8*1024); + + zbx_json_addstring(&json, ZBX_PROTO_TAG_REQUEST, ZBX_PROTO_VALUE_GET_ACTIVE_CHECKS, ZBX_JSON_TYPE_STRING); + zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, CONFIG_HOSTNAME, ZBX_JSON_TYPE_STRING); if (SUCCEED == (ret = zbx_tcp_connect(&s, host, port, CONFIG_TIMEOUT))) { - zbx_snprintf(packet, sizeof(packet), "%s\n%s\n","ZBX_GET_ACTIVE_CHECKS", CONFIG_HOSTNAME); - zabbix_log(LOG_LEVEL_DEBUG, "Sending [%s]", packet); + zabbix_log(LOG_LEVEL_WARNING, "Sending [%s]", json.buffer); - if( SUCCEED == (ret = zbx_tcp_send(&s, packet)) ) + if( SUCCEED == (ret = zbx_tcp_send(&s, json.buffer)) ) { zabbix_log(LOG_LEVEL_DEBUG, "Before read"); if( SUCCEED == (ret = zbx_tcp_recv_ext(&s, &buf, ZBX_TCP_READ_UNTIL_CLOSE)) ) { + zabbix_log(LOG_LEVEL_WARNING, "Got [%s]", buf); parse_list_of_checks(buf); } } @@ -282,10 +336,66 @@ static int get_active_checks( if( FAIL == ret ) { zabbix_log(LOG_LEVEL_DEBUG, "Get active checks error: %s", zbx_tcp_strerror()); - } + } + + zbx_json_free(&json); + return ret; } +/****************************************************************************** + * * + * Function: check_response * + * * + * Purpose: Check if json response is SUCCEED * + * * + * Parameters: result SUCCEED or FAIL * + * * + * Return value: SUCCEED - processed successfully * + * FAIL - an error occured * + * * + * Author: Alexei Vladishev * + * * + * Comments: zabbix_sender has exactly the same function! * + * * + ******************************************************************************/ +static int check_response(char *response) +{ + struct zbx_json_parse jp; + const char *p; + char value[MAX_STRING_LEN]; + char info[MAX_STRING_LEN]; + + int ret = SUCCEED; + + ret = zbx_json_open(response, &jp); + + if(SUCCEED == ret) + { + if (NULL == (p = zbx_json_pair_by_name(&jp, ZBX_PROTO_TAG_RESPONSE)) + || NULL == zbx_json_decodevalue(p, value, sizeof(value))) + { + ret = FAIL; + } + } + + if(SUCCEED == ret) + { + if(strcmp(value, ZBX_PROTO_VALUE_SUCCESS) != 0) + { + ret = FAIL; + } + } + + if (NULL != (p = zbx_json_pair_by_name(&jp, ZBX_PROTO_TAG_INFO)) + && NULL != zbx_json_decodevalue(p, info, sizeof(info))) + { + zabbix_log(LOG_LEVEL_WARNING, "Info from server: %s", + info); + } + + return ret; +} /****************************************************************************** * * @@ -324,41 +434,54 @@ static int send_value( ) { zbx_sock_t s; - char *buf = NULL, - *request = NULL; + char *buf = NULL; int ret; + struct zbx_json json; if (SUCCEED == (ret = zbx_tcp_connect(&s, host, port, CONFIG_TIMEOUT))) { - request = comms_create_request(hostname, key, value, lastlogsize, timestamp, source, severity); + zbx_json_init(&json, 8*1024); + + zbx_json_addstring(&json, ZBX_PROTO_TAG_REQUEST, ZBX_PROTO_VALUE_SENDER_DATA, ZBX_JSON_TYPE_STRING); + zbx_json_addarray(&json, ZBX_PROTO_TAG_DATA); - zabbix_log(LOG_LEVEL_DEBUG, "XML before sending [%s]",request); + zbx_json_addobject(&json, NULL); + zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, hostname, ZBX_JSON_TYPE_STRING); + zbx_json_addstring(&json, ZBX_PROTO_TAG_KEY, key, ZBX_JSON_TYPE_STRING); + zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, value, ZBX_JSON_TYPE_STRING); + zbx_json_close(&json); - ret = zbx_tcp_send(&s, request); + zabbix_log(LOG_LEVEL_WARNING, "JSON before sending [%s]", + json.buffer); - zbx_free(request); + ret = zbx_tcp_send(&s, json.buffer); if( SUCCEED == ret ) { if( SUCCEED == (ret = zbx_tcp_recv(&s, &buf)) ) { - /* !!! REMOVE '\n' AT THE AND (always must be present) !!! */ - zbx_rtrim(buf, "\r\n\0"); - if(strcmp(buf,"OK") == 0) + if( !buf || check_response(buf) != SUCCEED ) { - zabbix_log( LOG_LEVEL_DEBUG, "OK"); + zabbix_log( LOG_LEVEL_WARNING, "NOT OK [%s:%s]", + hostname, + key); } else { - zabbix_log( LOG_LEVEL_DEBUG, "NOT OK [%s:%s] [%s]", host, key, buf); + zabbix_log( LOG_LEVEL_WARNING, "OK [%s:%s]", + hostname, + key); } } else zabbix_log(LOG_LEVEL_DEBUG, "Send value error: [recv] %s", zbx_tcp_strerror()); } else zabbix_log(LOG_LEVEL_DEBUG, "Send value error: [send] %s", zbx_tcp_strerror()); + + zbx_json_free(&json); zbx_tcp_close(&s); } else zabbix_log(LOG_LEVEL_DEBUG, "Send value error: [connect] %s", zbx_tcp_strerror()); + return ret; } diff --git a/src/zabbix_sender/zabbix_sender.c b/src/zabbix_sender/zabbix_sender.c index d7735c6c..a76ccfcc 100644 --- a/src/zabbix_sender/zabbix_sender.c +++ b/src/zabbix_sender/zabbix_sender.c @@ -197,8 +197,6 @@ static ZBX_THREAD_ENTRY(send_value, args) { ZBX_THREAD_SENDVAL_ARGS *sentdval_args; - char *tosend = NULL; - zbx_sock_t sock; char *answer = NULL; @@ -217,16 +215,8 @@ static ZBX_THREAD_ENTRY(send_value, args) #endif /* NOT _WINDOWS */ if (SUCCEED == (tcp_ret = zbx_tcp_connect(&sock, sentdval_args->server, sentdval_args->port, SENDER_TIMEOUT))) { -/* tosend = comms_create_request(sentdval_args->server, sentdval_args->server, sentdval_args->server, - NULL, NULL, NULL, NULL); - - zabbix_log( LOG_LEVEL_DEBUG, "Send data: '%s'", tosend);*/ - -/* tcp_ret = zbx_tcp_send(&sock, tosend); */ tcp_ret = zbx_tcp_send(&sock, sentdval_args->json.buffer); -/* zbx_free(tosend);*/ - if( SUCCEED == tcp_ret ) { if( SUCCEED == (tcp_ret = zbx_tcp_recv(&sock, &answer)) ) diff --git a/src/zabbix_server/trapper/active.c b/src/zabbix_server/trapper/active.c index 0e8e4c68..af92413a 100644 --- a/src/zabbix_server/trapper/active.c +++ b/src/zabbix_server/trapper/active.c @@ -90,6 +90,7 @@ int send_list_of_active_checks(zbx_sock_t *sock, const char *host) while((row=DBfetch(result))) { + zbx_snprintf(s,sizeof(s),"%s:%s:%s\n", row[0], row[1], @@ -118,3 +119,90 @@ int send_list_of_active_checks(zbx_sock_t *sock, const char *host) return SUCCEED; } + +/****************************************************************************** + * * + * Function: send_list_of_active_checks_json * + * * + * Purpose: send list of active checks to the host * + * * + * Parameters: sockfd - open socket of server-agent connection * + * json - request buffer * + * * + * Return value: SUCCEED - list of active checks sent succesfully * + * FAIL - an error occured * + * * + * Author: Alexei Vladishev * + * * + * Comments: * + * * + ******************************************************************************/ +int send_list_of_active_checks_json(zbx_sock_t *sock, struct zbx_json_parse *json) +{ + char host[MAX_STRING_LEN]; + char s[MAX_STRING_LEN]; + const char *pf; + DB_RESULT result; + DB_ROW row; + + struct zbx_json response; + + zabbix_log( LOG_LEVEL_WARNING, "In send_list_of_active_checks()"); + + if (NULL == (pf = zbx_json_pair_by_name(json, ZBX_PROTO_TAG_HOST)) || + NULL == (pf = zbx_json_decodevalue(pf, host, sizeof(host)))) + { + zabbix_log( LOG_LEVEL_WARNING, "No tag \"%s\" in JSON request", + ZBX_PROTO_TAG_HOST); + return FAIL; + } + + zabbix_log( LOG_LEVEL_WARNING, "Host:%s", host); + + if (0 != CONFIG_REFRESH_UNSUPPORTED) { + result = DBselect("select i.key_,i.delay,i.lastlogsize from items i,hosts h" + " where i.hostid=h.hostid and h.status=%d and i.type=%d and h.host='%s'" + " and h.proxyid=0 and (i.status=%d or (i.status=%d and i.nextcheck<=%d))" DB_NODE, + HOST_STATUS_MONITORED, + ITEM_TYPE_ZABBIX_ACTIVE, + host, + ITEM_STATUS_ACTIVE, ITEM_STATUS_NOTSUPPORTED, time(NULL), + DBnode_local("h.hostid")); + } else { + result = DBselect("select i.key_,i.delay,i.lastlogsize from items i,hosts h" + " where i.hostid=h.hostid and h.status=%d and i.type=%d and h.host='%s'" + " and h.proxyid=0 and i.status=%d" DB_NODE, + HOST_STATUS_MONITORED, + ITEM_TYPE_ZABBIX_ACTIVE, + host, + ITEM_STATUS_ACTIVE, + DBnode_local("h.hostid")); + } + + zbx_json_init(&response, 8*1024); + zbx_json_addstring(&response, ZBX_PROTO_TAG_RESPONSE, ZBX_PROTO_VALUE_SUCCESS, ZBX_JSON_TYPE_STRING); + zbx_json_addarray(&response, ZBX_PROTO_TAG_DATA); + + while((row=DBfetch(result))) + { + zbx_json_addobject(&response, NULL); + zbx_json_addstring(&response, ZBX_PROTO_TAG_KEY, row[0], ZBX_JSON_TYPE_STRING); + zbx_json_addstring(&response, ZBX_PROTO_TAG_DELAY, row[1], ZBX_JSON_TYPE_STRING); + zbx_json_addstring(&response, ZBX_PROTO_TAG_LOGLASTSIZE, row[2], ZBX_JSON_TYPE_STRING); + zbx_json_close(&response); + } + DBfree_result(result); + + zabbix_log( LOG_LEVEL_DEBUG, "Sending [%s]", + response.buffer); + + if( zbx_tcp_send_raw(sock, response.buffer) != SUCCEED ) + { + zabbix_log( LOG_LEVEL_WARNING, "Error while sending list of active checks"); + return FAIL; + } + + zbx_json_free(&response); + + return SUCCEED; +} diff --git a/src/zabbix_server/trapper/active.h b/src/zabbix_server/trapper/active.h index 3b31ea74..7acb259f 100644 --- a/src/zabbix_server/trapper/active.h +++ b/src/zabbix_server/trapper/active.h @@ -24,7 +24,9 @@ #include "common.h" #include "db.h" #include "comms.h" +#include "zbxjson.h" int send_list_of_active_checks(zbx_sock_t *sock, const char *host); +int send_list_of_active_checks_json(zbx_sock_t *sock, struct zbx_json_parse *json); #endif diff --git a/src/zabbix_server/trapper/trapper.c b/src/zabbix_server/trapper/trapper.c index 2db2f55c..66ac3905 100644 --- a/src/zabbix_server/trapper/trapper.c +++ b/src/zabbix_server/trapper/trapper.c @@ -544,10 +544,19 @@ static int process_trap(zbx_sock_t *sock,char *s, int max_len) { send_proxyconfig(sock, &jp); } - else if (0 == strcmp(value, ZBX_PROTO_VALUE_SENDER_DATA) && zbx_process == ZBX_PROCESS_SERVER) + else if (0 == strcmp(value, ZBX_PROTO_VALUE_SENDER_DATA)) { ret = process_new_values(sock, &jp); } + else if (0 == strcmp(value, ZBX_PROTO_VALUE_GET_ACTIVE_CHECKS)) + { + ret = send_list_of_active_checks_json(sock, &jp); + } + else + { + zabbix_log( LOG_LEVEL_WARNING, "Unknow request received [%s]", + value); + } } return ret; } |