summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlasdair Kergon <agk@redhat.com>2007-02-02 17:08:51 +0000
committerAlasdair Kergon <agk@redhat.com>2007-02-02 17:08:51 +0000
commit112d09deb14c5e64a4104d65fbd750b73358505e (patch)
treeabed53972f2bfb7017fd2204e3a739e764f18074
parentaa350ef4265cbcf92a6562d9f90e6a460c7c3329 (diff)
downloadlvm2-112d09deb14c5e64a4104d65fbd750b73358505e.tar.gz
lvm2-112d09deb14c5e64a4104d65fbd750b73358505e.tar.xz
lvm2-112d09deb14c5e64a4104d65fbd750b73358505e.zip
Improve dmeventd messaging protocol: drain pipe and tag messages.
-rw-r--r--WHATS_NEW_DM1
-rw-r--r--daemons/dmeventd/dmeventd.c48
-rw-r--r--daemons/dmeventd/dmeventd.h1
-rw-r--r--daemons/dmeventd/libdevmapper-event.c100
4 files changed, 114 insertions, 36 deletions
diff --git a/WHATS_NEW_DM b/WHATS_NEW_DM
index 7359398c..7b64fa00 100644
--- a/WHATS_NEW_DM
+++ b/WHATS_NEW_DM
@@ -1,5 +1,6 @@
Version 1.02.18 -
===================================
+ Improve dmeventd messaging protocol: drain pipe and tag messages.
Version 1.02.17 - 29th January 2007
===================================
diff --git a/daemons/dmeventd/dmeventd.c b/daemons/dmeventd/dmeventd.c
index 536d6471..f1462b38 100644
--- a/daemons/dmeventd/dmeventd.c
+++ b/daemons/dmeventd/dmeventd.c
@@ -146,6 +146,7 @@ static LIST_INIT(_dso_registry);
/* Structure to keep parsed register variables from client message. */
struct message_data {
+ char *id;
char *dso_name; /* Name of DSO. */
char *device_uuid; /* Mapped device path. */
union {
@@ -320,6 +321,8 @@ static int _fetch_string(char **ptr, char **src, const int delimiter)
/* Free message memory. */
static void _free_message(struct message_data *message_data)
{
+ if (message_data->id)
+ dm_free(message_data->id);
if (message_data->dso_name)
dm_free(message_data->dso_name);
@@ -342,7 +345,8 @@ static int _parse_message(struct message_data *message_data)
* Retrieve application identifier, mapped device
* path and events # string from message.
*/
- if (_fetch_string(&message_data->dso_name, &p, ' ') &&
+ if (_fetch_string(&message_data->id, &p, ' ') &&
+ _fetch_string(&message_data->dso_name, &p, ' ') &&
_fetch_string(&message_data->device_uuid, &p, ' ') &&
_fetch_string(&message_data->events.str, &p, ' ') &&
_fetch_string(&message_data->timeout.str, &p, ' ')) {
@@ -875,8 +879,8 @@ static struct dso_data *_load_dso(struct message_data *data)
syslog(LOG_ERR, "dmeventd %s dlopen failed: %s", data->dso_name,
dlerr);
data->msg->size =
- dm_asprintf(&(data->msg->data), "%s dlopen failed: %s",
- data->dso_name, dlerr);
+ dm_asprintf(&(data->msg->data), "%s %s dlopen failed: %s",
+ data->id, data->dso_name, dlerr);
return NULL;
}
@@ -1056,7 +1060,8 @@ static int _registered_device(struct message_data *message_data,
{
struct dm_event_daemon_message *msg = message_data->msg;
- const char *fmt = "%s %s %u";
+ const char *fmt = "%s %s %s %u";
+ const char *id = message_data->id;
const char *dso = thread->dso_data->dso_name;
const char *dev = thread->device.uuid;
unsigned events = ((thread->status == DM_THREAD_RUNNING)
@@ -1066,7 +1071,7 @@ static int _registered_device(struct message_data *message_data,
if (msg->data)
dm_free(msg->data);
- msg->size = dm_asprintf(&(msg->data), fmt, dso, dev, events);
+ msg->size = dm_asprintf(&(msg->data), fmt, id, dso, dev, events);
_unlock_mutex();
@@ -1180,7 +1185,8 @@ static int _get_timeout(struct message_data *message_data)
_lock_mutex();
if ((thread = _lookup_thread_status(message_data))) {
msg->size =
- dm_asprintf(&(msg->data), "%" PRIu32, thread->timeout);
+ dm_asprintf(&(msg->data), "%s %" PRIu32, message_data->id,
+ thread->timeout);
} else {
msg->data = NULL;
msg->size = 0;
@@ -1375,17 +1381,32 @@ static int _handle_request(struct dm_event_daemon_message *msg,
static int _do_process_request(struct dm_event_daemon_message *msg)
{
int ret;
+ char *answer;
static struct message_data message_data;
/* Parse the message. */
memset(&message_data, 0, sizeof(message_data));
message_data.msg = msg;
- if (msg->cmd != DM_EVENT_CMD_ACTIVE && !_parse_message(&message_data)) {
+ if (msg->cmd == DM_EVENT_CMD_HELLO) {
+ ret = 0;
+ answer = dm_strdup(msg->data);
+ if (answer) {
+ msg->size = dm_asprintf(&(msg->data), "%s HELLO", answer);
+ dm_free(answer);
+ } else {
+ msg->size = 0;
+ msg->data = NULL;
+ }
+ } else if (msg->cmd != DM_EVENT_CMD_ACTIVE && !_parse_message(&message_data)) {
stack;
ret = -EINVAL;
} else
ret = _handle_request(msg, &message_data);
+ msg->cmd = ret;
+ if (!msg->data)
+ msg->size = dm_asprintf(&(msg->data), "%s %s", message_data.id, strerror(-ret));
+
_free_message(&message_data);
return ret;
@@ -1405,16 +1426,9 @@ static void _process_request(struct dm_event_fifos *fifos)
if (!_client_read(fifos, &msg))
return;
- msg.cmd = _do_process_request(&msg);
- if (!msg.data) {
- msg.data = dm_strdup(strerror(-msg.cmd));
- if (msg.data)
- msg.size = strlen(msg.data) + 1;
- else {
- msg.size = 0;
- stack;
- }
- }
+ /* _do_process_request fills in msg (if memory allows for
+ data, otherwise just cmd and size = 0) */
+ _do_process_request(&msg);
if (!_client_write(fifos, &msg))
stack;
diff --git a/daemons/dmeventd/dmeventd.h b/daemons/dmeventd/dmeventd.h
index 084d1362..d7474b83 100644
--- a/daemons/dmeventd/dmeventd.h
+++ b/daemons/dmeventd/dmeventd.h
@@ -20,6 +20,7 @@ enum dm_event_command {
DM_EVENT_CMD_GET_NEXT_REGISTERED_DEVICE,
DM_EVENT_CMD_SET_TIMEOUT,
DM_EVENT_CMD_GET_TIMEOUT,
+ DM_EVENT_CMD_HELLO,
};
/* Message passed between client and daemon. */
diff --git a/daemons/dmeventd/libdevmapper-event.c b/daemons/dmeventd/libdevmapper-event.c
index 6a1d31c1..d2f8ac50 100644
--- a/daemons/dmeventd/libdevmapper-event.c
+++ b/daemons/dmeventd/libdevmapper-event.c
@@ -30,6 +30,8 @@
#include <sys/wait.h>
#include <arpa/inet.h> /* for htonl, ntohl */
+static int _sequence_nr = 0;
+
struct dm_event_handler {
char *dso;
@@ -182,6 +184,21 @@ enum dm_event_mask dm_event_handler_get_event_mask(const struct dm_event_handler
return dmevh->mask;
}
+static int _check_message_id(struct dm_event_daemon_message *msg)
+{
+ int pid, seq_nr;
+
+ if ((sscanf(msg->data, "%d:%d", &pid, &seq_nr) != 2) ||
+ (pid != getpid()) || (seq_nr != _sequence_nr)) {
+ log_error("Ignoring out-of-sequence reply from dmeventd. "
+ "Expected %d:%d but received %s", getpid(),
+ _sequence_nr, msg->data);
+ return 0;
+ }
+
+ return 1;
+}
+
/*
* daemon_read
* @fifos
@@ -260,11 +277,28 @@ static int _daemon_write(struct dm_event_fifos *fifos,
size_t size = 2 * sizeof(uint32_t) + msg->size;
char *buf = alloca(size);
+ char drainbuf[128];
+ struct timeval tval = { 0, 0 };
*((uint32_t *)buf) = htonl(msg->cmd);
*((uint32_t *)buf + 1) = htonl(msg->size);
memcpy(buf + 2 * sizeof(uint32_t), msg->data, msg->size);
+ /* drain the answer fifo */
+ while (1) {
+ FD_ZERO(&fds);
+ FD_SET(fifos->server, &fds);
+ tval.tv_usec = 100;
+ ret = select(fifos->server + 1, &fds, NULL, NULL, &tval);
+ if ((ret < 0) && (errno != EINTR)) {
+ log_error("Unable to talk to event daemon");
+ return 0;
+ }
+ if (ret == 0)
+ break;
+ read(fifos->server, drainbuf, 127);
+ }
+
while (bytes < size) {
do {
/* Watch daemon write FIFO to be ready for output. */
@@ -301,7 +335,7 @@ static int _daemon_talk(struct dm_event_fifos *fifos,
{
const char *dso = dso_name ? dso_name : "";
const char *dev = dev_name ? dev_name : "";
- const char *fmt = "%s %s %u %" PRIu32;
+ const char *fmt = "%d:%d %s %s %u %" PRIu32;
int msg_size;
memset(msg, 0, sizeof(*msg));
@@ -310,8 +344,10 @@ static int _daemon_talk(struct dm_event_fifos *fifos,
* into ASCII message string.
*/
msg->cmd = cmd;
- if ((msg_size = dm_asprintf(&(msg->data), fmt, dso, dev, evmask,
- timeout)) < 0) {
+ if (cmd == DM_EVENT_CMD_HELLO)
+ fmt = "%d:%d HELLO";
+ if ((msg_size = dm_asprintf(&(msg->data), fmt, getpid(), _sequence_nr,
+ dso, dev, evmask, timeout)) < 0) {
log_error("_daemon_talk: message allocation failed");
return -ENOMEM;
}
@@ -326,10 +362,14 @@ static int _daemon_talk(struct dm_event_fifos *fifos,
return -EIO;
}
- if (!_daemon_read(fifos, msg)) {
- stack;
- return -EIO;
- }
+ do {
+ if (!_daemon_read(fifos, msg)) {
+ stack;
+ return -EIO;
+ }
+ } while (!_check_message_id(msg));
+
+ _sequence_nr++;
return (int32_t) msg->cmd;
}
@@ -507,7 +547,9 @@ static int _do_event(int cmd, struct dm_event_daemon_message *msg,
return -ESRCH;
}
- ret = _daemon_talk(&fifos, msg, cmd, dso_name, dev_name, evmask, timeout);
+ ret = _daemon_talk(&fifos, msg, DM_EVENT_CMD_HELLO, 0, 0, 0, 0);
+ if (!ret)
+ ret = _daemon_talk(&fifos, msg, cmd, dso_name, dev_name, evmask, timeout);
/* what is the opposite of init? */
_dtr_client(&fifos);
@@ -521,7 +563,7 @@ int dm_event_register_handler(const struct dm_event_handler *dmevh)
int ret = 1, err;
const char *uuid;
struct dm_task *dmt;
- struct dm_event_daemon_message msg;
+ struct dm_event_daemon_message msg = { 0, 0, NULL };
if (!(dmt = _get_device_info(dmevh))) {
stack;
@@ -551,7 +593,7 @@ int dm_event_unregister_handler(const struct dm_event_handler *dmevh)
int ret = 1, err;
const char *uuid;
struct dm_task *dmt;
- struct dm_event_daemon_message msg;
+ struct dm_event_daemon_message msg = { 0, 0, NULL };
if (!(dmt = _get_device_info(dmevh))) {
stack;
@@ -598,15 +640,20 @@ static char *_fetch_string(char **src, const int delimiter)
static int _parse_message(struct dm_event_daemon_message *msg, char **dso_name,
char **uuid, enum dm_event_mask *evmask)
{
+ char *id = NULL;
char *p = msg->data;
- if ((*dso_name = _fetch_string(&p, ' ')) &&
+ if ((id = _fetch_string(&p, ' ')) &&
+ (*dso_name = _fetch_string(&p, ' ')) &&
(*uuid = _fetch_string(&p, ' '))) {
*evmask = atoi(p);
+ dm_free(id);
return 0;
}
+ if (id)
+ dm_free(id);
return -ENOMEM;
}
@@ -621,12 +668,12 @@ static int _parse_message(struct dm_event_daemon_message *msg, char **dso_name,
*/
int dm_event_get_registered_device(struct dm_event_handler *dmevh, int next)
{
- int ret;
+ int ret = 0;
const char *uuid = NULL;
char *reply_dso = NULL, *reply_uuid = NULL;
- enum dm_event_mask reply_mask;
- struct dm_task *dmt;
- struct dm_event_daemon_message msg;
+ enum dm_event_mask reply_mask = 0;
+ struct dm_task *dmt = NULL;
+ struct dm_event_daemon_message msg = { 0, 0, NULL };
if (!(dmt = _get_device_info(dmevh))) {
stack;
@@ -696,9 +743,17 @@ int dm_event_get_registered_device(struct dm_event_handler *dmevh, int next)
#if 0 /* left out for now */
+static char *_skip_string(char *src, const int delimiter)
+{
+ src = srtchr(src, delimiter);
+ if (src && *(src + 1))
+ return src + 1;
+ return NULL;
+}
+
int dm_event_set_timeout(const char *device_path, uint32_t timeout)
{
- struct dm_event_daemon_message msg;
+ struct dm_event_daemon_message msg = { 0, 0, NULL };
if (!device_exists(device_path))
return -ENODEV;
@@ -710,13 +765,20 @@ int dm_event_set_timeout(const char *device_path, uint32_t timeout)
int dm_event_get_timeout(const char *device_path, uint32_t *timeout)
{
int ret;
- struct dm_event_daemon_message msg;
+ struct dm_event_daemon_message msg = { 0, 0, NULL };
if (!device_exists(device_path))
return -ENODEV;
if (!(ret = _do_event(DM_EVENT_CMD_GET_TIMEOUT, &msg, NULL, device_path,
- 0, 0)))
- *timeout = atoi(msg.data);
+ 0, 0))) {
+ char *p = _skip_string(msg.data, ' ');
+ if (!p) {
+ log_error("malformed reply from dmeventd '%s'\n",
+ msg.data);
+ return -EIO;
+ }
+ *timeout = atoi(p);
+ }
if (msg.data)
dm_free(msg.data);
return ret;