summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorStefan Metzmacher <metze@samba.org>2010-03-05 15:22:10 +0100
committerStefan Metzmacher <metze@samba.org>2010-03-07 16:52:36 +0100
commit2450fc1c271b9f944455370510062164e68a8d59 (patch)
treeae0030b2fa07948601f720ba3d411c2bbb51385b
parentcb4f2699453b8092be0c0ff42fa0f31582d38da5 (diff)
downloadsamba-2450fc1c271b9f944455370510062164e68a8d59.tar.gz
samba-2450fc1c271b9f944455370510062164e68a8d59.tar.xz
samba-2450fc1c271b9f944455370510062164e68a8d59.zip
s4:libcli/wrepl: implement wrepl_request_send as a tevent_req based wrapper
metze
-rw-r--r--source4/libcli/wrepl/winsrepl.c157
-rw-r--r--source4/torture/nbt/winsreplication.c12
-rw-r--r--source4/wrepl_server/wrepl_out_helpers.c28
3 files changed, 144 insertions, 53 deletions
diff --git a/source4/libcli/wrepl/winsrepl.c b/source4/libcli/wrepl/winsrepl.c
index 2b14de30f4b..39d801d606e 100644
--- a/source4/libcli/wrepl/winsrepl.c
+++ b/source4/libcli/wrepl/winsrepl.c
@@ -500,8 +500,8 @@ static int wrepl_send_ctrl_destructor(struct wrepl_send_ctrl_state *s)
send a generic wins replication request
*/
static struct wrepl_request *wrepl_request_internal_send(struct wrepl_socket *wrepl_socket,
- struct wrepl_packet *packet,
- struct wrepl_send_ctrl *ctrl)
+ const struct wrepl_packet *packet,
+ const struct wrepl_send_ctrl *ctrl)
{
struct wrepl_request *req;
struct wrepl_wrap wrap;
@@ -576,18 +576,80 @@ static NTSTATUS wrepl_request_internal_recv(struct wrepl_request *req,
return status;
}
-struct wrepl_request *wrepl_request_send(struct wrepl_socket *wrepl_socket,
- struct wrepl_packet *packet,
- struct wrepl_send_ctrl *ctrl)
+struct wrepl_request_state {
+ struct wrepl_packet *packet;
+};
+
+static void wrepl_request_done(struct wrepl_request *subreq);
+
+struct tevent_req *wrepl_request_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct wrepl_socket *wrepl_socket,
+ const struct wrepl_packet *packet,
+ const struct wrepl_send_ctrl *ctrl)
{
- return wrepl_request_internal_send(wrepl_socket, packet, ctrl);
+ struct tevent_req *req;
+ struct wrepl_request_state *state;
+ struct wrepl_request *subreq;
+
+ if (wrepl_socket->event.ctx != ev) {
+ /* TODO: remove wrepl_socket->event.ctx !!! */
+ smb_panic("wrepl_associate_stop_send event context mismatch!");
+ return NULL;
+ }
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct wrepl_request_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ subreq = wrepl_request_internal_send(wrepl_socket, packet, ctrl);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ subreq->async.fn = wrepl_request_done;
+ subreq->async.private_data = req;
+
+ return req;
}
-NTSTATUS wrepl_request_recv(struct wrepl_request *req,
+static void wrepl_request_done(struct wrepl_request *subreq)
+{
+ struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data,
+ struct tevent_req);
+ struct wrepl_request_state *state = tevent_req_data(req,
+ struct wrepl_request_state);
+ NTSTATUS status;
+
+ status = wrepl_request_internal_recv(subreq, state, &state->packet);
+ if (!NT_STATUS_IS_OK(status)) {
+ tevent_req_nterror(req, status);
+ return;
+ }
+
+ tevent_req_done(req);
+}
+
+NTSTATUS wrepl_request_recv(struct tevent_req *req,
TALLOC_CTX *mem_ctx,
struct wrepl_packet **packet)
{
- return wrepl_request_internal_recv(req, mem_ctx, packet);
+ struct wrepl_request_state *state = tevent_req_data(req,
+ struct wrepl_request_state);
+ NTSTATUS status;
+
+ if (tevent_req_is_nterror(req, &status)) {
+ tevent_req_received(req);
+ return status;
+ }
+
+ if (packet) {
+ *packet = talloc_move(mem_ctx, &state->packet);
+ }
+
+ tevent_req_received(req);
+ return NT_STATUS_OK;
}
/*
@@ -595,11 +657,28 @@ NTSTATUS wrepl_request_recv(struct wrepl_request *req,
*/
NTSTATUS wrepl_request(struct wrepl_socket *wrepl_socket,
TALLOC_CTX *mem_ctx,
- struct wrepl_packet *req_packet,
+ const struct wrepl_packet *req_packet,
struct wrepl_packet **reply_packet)
{
- struct wrepl_request *req = wrepl_request_send(wrepl_socket, req_packet, NULL);
- return wrepl_request_recv(req, mem_ctx, reply_packet);
+ struct tevent_req *subreq;
+ bool ok;
+ NTSTATUS status;
+
+ subreq = wrepl_request_send(mem_ctx, wrepl_socket->event.ctx,
+ wrepl_socket, req_packet, NULL);
+ NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+ ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+ if (!ok) {
+ TALLOC_FREE(subreq);
+ return NT_STATUS_INTERNAL_ERROR;
+ }
+
+ status = wrepl_request_recv(subreq, mem_ctx, reply_packet);
+ TALLOC_FREE(subreq);
+ NT_STATUS_NOT_OK_RETURN(status);
+
+ return NT_STATUS_OK;
}
@@ -609,7 +688,7 @@ struct wrepl_associate_state {
uint16_t major_version;
};
-static void wrepl_associate_done(struct wrepl_request *subreq);
+static void wrepl_associate_done(struct tevent_req *subreq);
struct tevent_req *wrepl_associate_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
@@ -618,7 +697,7 @@ struct tevent_req *wrepl_associate_send(TALLOC_CTX *mem_ctx,
{
struct tevent_req *req;
struct wrepl_associate_state *state;
- struct wrepl_request *subreq;
+ struct tevent_req *subreq;
if (wrepl_socket->event.ctx != ev) {
/* TODO: remove wrepl_socket->event.ctx !!! */
@@ -651,19 +730,18 @@ struct tevent_req *wrepl_associate_send(TALLOC_CTX *mem_ctx,
}
memset(state->packet.padding.data, 0, state->packet.padding.length);
- subreq = wrepl_request_send(wrepl_socket, &state->packet, NULL);
+ subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
- subreq->async.fn = wrepl_associate_done;
- subreq->async.private_data = req;
+ tevent_req_set_callback(subreq, wrepl_associate_done, req);
return req;
}
-static void wrepl_associate_done(struct wrepl_request *subreq)
+static void wrepl_associate_done(struct tevent_req *subreq)
{
- struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data,
+ struct tevent_req *req = tevent_req_callback_data(subreq,
struct tevent_req);
struct wrepl_associate_state *state = tevent_req_data(req,
struct wrepl_associate_state);
@@ -671,6 +749,7 @@ static void wrepl_associate_done(struct wrepl_request *subreq)
struct wrepl_packet *packet;
status = wrepl_request_recv(subreq, state, &packet);
+ TALLOC_FREE(subreq);
if (!NT_STATUS_IS_OK(status)) {
tevent_req_nterror(req, status);
return;
@@ -741,7 +820,7 @@ struct wrepl_associate_stop_state {
struct wrepl_send_ctrl ctrl;
};
-static void wrepl_associate_stop_done(struct wrepl_request *subreq);
+static void wrepl_associate_stop_done(struct tevent_req *subreq);
struct tevent_req *wrepl_associate_stop_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
@@ -750,7 +829,7 @@ struct tevent_req *wrepl_associate_stop_send(TALLOC_CTX *mem_ctx,
{
struct tevent_req *req;
struct wrepl_associate_stop_state *state;
- struct wrepl_request *subreq;
+ struct tevent_req *subreq;
if (wrepl_socket->event.ctx != ev) {
/* TODO: remove wrepl_socket->event.ctx !!! */
@@ -774,19 +853,18 @@ struct tevent_req *wrepl_associate_stop_send(TALLOC_CTX *mem_ctx,
state->ctrl.disconnect_after_send = true;
}
- subreq = wrepl_request_send(wrepl_socket, &state->packet, &state->ctrl);
+ subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, &state->ctrl);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
- subreq->async.fn = wrepl_associate_stop_done;
- subreq->async.private_data = req;
+ tevent_req_set_callback(subreq, wrepl_associate_stop_done, req);
return req;
}
-static void wrepl_associate_stop_done(struct wrepl_request *subreq)
+static void wrepl_associate_stop_done(struct tevent_req *subreq)
{
- struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data,
+ struct tevent_req *req = tevent_req_callback_data(subreq,
struct tevent_req);
struct wrepl_associate_stop_state *state = tevent_req_data(req,
struct wrepl_associate_stop_state);
@@ -794,6 +872,7 @@ static void wrepl_associate_stop_done(struct wrepl_request *subreq)
/* currently we don't care about a possible response */
status = wrepl_request_recv(subreq, state, NULL);
+ TALLOC_FREE(subreq);
if (!NT_STATUS_IS_OK(status)) {
tevent_req_nterror(req, status);
return;
@@ -852,7 +931,7 @@ struct wrepl_pull_table_state {
struct wrepl_wins_owner *partners;
};
-static void wrepl_pull_table_done(struct wrepl_request *subreq);
+static void wrepl_pull_table_done(struct tevent_req *subreq);
struct tevent_req *wrepl_pull_table_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
@@ -861,7 +940,7 @@ struct tevent_req *wrepl_pull_table_send(TALLOC_CTX *mem_ctx,
{
struct tevent_req *req;
struct wrepl_pull_table_state *state;
- struct wrepl_request *subreq;
+ struct tevent_req *subreq;
if (wrepl_socket->event.ctx != ev) {
/* TODO: remove wrepl_socket->event.ctx !!! */
@@ -880,19 +959,18 @@ struct tevent_req *wrepl_pull_table_send(TALLOC_CTX *mem_ctx,
state->packet.mess_type = WREPL_REPLICATION;
state->packet.message.replication.command = WREPL_REPL_TABLE_QUERY;
- subreq = wrepl_request_send(wrepl_socket, &state->packet, NULL);
+ subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
- subreq->async.fn = wrepl_pull_table_done;
- subreq->async.private_data = req;
+ tevent_req_set_callback(subreq, wrepl_pull_table_done, req);
return req;
}
-static void wrepl_pull_table_done(struct wrepl_request *subreq)
+static void wrepl_pull_table_done(struct tevent_req *subreq)
{
- struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data,
+ struct tevent_req *req = tevent_req_callback_data(subreq,
struct tevent_req);
struct wrepl_pull_table_state *state = tevent_req_data(req,
struct wrepl_pull_table_state);
@@ -901,6 +979,7 @@ static void wrepl_pull_table_done(struct wrepl_request *subreq)
struct wrepl_table *table;
status = wrepl_request_recv(subreq, state, &packet);
+ TALLOC_FREE(subreq);
if (!NT_STATUS_IS_OK(status)) {
tevent_req_nterror(req, status);
return;
@@ -985,7 +1064,7 @@ struct wrepl_pull_names_state {
struct wrepl_name *names;
};
-static void wrepl_pull_names_done(struct wrepl_request *subreq);
+static void wrepl_pull_names_done(struct tevent_req *subreq);
struct tevent_req *wrepl_pull_names_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
@@ -994,7 +1073,7 @@ struct tevent_req *wrepl_pull_names_send(TALLOC_CTX *mem_ctx,
{
struct tevent_req *req;
struct wrepl_pull_names_state *state;
- struct wrepl_request *subreq;
+ struct tevent_req *subreq;
if (wrepl_socket->event.ctx != ev) {
/* TODO: remove wrepl_socket->event.ctx !!! */
@@ -1015,19 +1094,18 @@ struct tevent_req *wrepl_pull_names_send(TALLOC_CTX *mem_ctx,
state->packet.message.replication.command = WREPL_REPL_SEND_REQUEST;
state->packet.message.replication.info.owner = io->in.partner;
- subreq = wrepl_request_send(wrepl_socket, &state->packet, NULL);
+ subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
- subreq->async.fn = wrepl_pull_names_done;
- subreq->async.private_data = req;
+ tevent_req_set_callback(subreq, wrepl_pull_names_done, req);
return req;
}
-static void wrepl_pull_names_done(struct wrepl_request *subreq)
+static void wrepl_pull_names_done(struct tevent_req *subreq)
{
- struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data,
+ struct tevent_req *req = tevent_req_callback_data(subreq,
struct tevent_req);
struct wrepl_pull_names_state *state = tevent_req_data(req,
struct wrepl_pull_names_state);
@@ -1036,6 +1114,7 @@ static void wrepl_pull_names_done(struct wrepl_request *subreq)
uint32_t i;
status = wrepl_request_recv(subreq, state, &packet);
+ TALLOC_FREE(subreq);
if (!NT_STATUS_IS_OK(status)) {
tevent_req_nterror(req, status);
return;
diff --git a/source4/torture/nbt/winsreplication.c b/source4/torture/nbt/winsreplication.c
index 1655135a334..9a7be031995 100644
--- a/source4/torture/nbt/winsreplication.c
+++ b/source4/torture/nbt/winsreplication.c
@@ -83,7 +83,7 @@ static const char *wrepl_name_state_string(enum wrepl_name_state state)
static bool test_assoc_ctx1(struct torture_context *tctx)
{
bool ret = true;
- struct wrepl_request *req;
+ struct tevent_req *subreq;
struct wrepl_socket *wrepl_socket1;
struct wrepl_associate associate1;
struct wrepl_socket *wrepl_socket2;
@@ -95,6 +95,7 @@ static bool test_assoc_ctx1(struct torture_context *tctx)
NTSTATUS status;
struct nbt_name name;
const char *address;
+ bool ok;
if (!torture_nbt_get_name(tctx, &name, &address))
return false;
@@ -131,8 +132,13 @@ static bool test_assoc_ctx1(struct torture_context *tctx)
packet.message.replication.command = WREPL_REPL_TABLE_QUERY;
ZERO_STRUCT(ctrl);
ctrl.send_only = true;
- req = wrepl_request_send(wrepl_socket2, &packet, &ctrl);
- status = wrepl_request_recv(req, tctx, &rep_packet);
+ subreq = wrepl_request_send(tctx, tctx->ev, wrepl_socket2, &packet, &ctrl);
+ ok = tevent_req_poll(subreq, tctx->ev);
+ if (!ok) {
+ CHECK_STATUS(tctx, NT_STATUS_INTERNAL_ERROR, NT_STATUS_OK);
+ }
+ status = wrepl_request_recv(subreq, tctx, &rep_packet);
+ TALLOC_FREE(subreq);
CHECK_STATUS(tctx, status, NT_STATUS_OK);
torture_comment(tctx, "Send a association request (conn2), to make sure the last request was ignored\n");
diff --git a/source4/wrepl_server/wrepl_out_helpers.c b/source4/wrepl_server/wrepl_out_helpers.c
index de3fb72318d..19cabd1e12b 100644
--- a/source4/wrepl_server/wrepl_out_helpers.c
+++ b/source4/wrepl_server/wrepl_out_helpers.c
@@ -848,15 +848,15 @@ struct wreplsrv_push_notify_state {
enum wrepl_replication_cmd command;
bool full_table;
struct wrepl_send_ctrl ctrl;
- struct wrepl_request *req;
struct wrepl_packet req_packet;
struct wrepl_packet *rep_packet;
struct composite_context *creq;
struct wreplsrv_out_connection *wreplconn;
+ struct tevent_req *subreq;
};
static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
-static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
+static void wreplsrv_push_notify_handler_treq(struct tevent_req *subreq);
static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
{
@@ -880,8 +880,10 @@ static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *s
NT_STATUS_NOT_OK_RETURN(status);
/* queue the request */
- state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
- NT_STATUS_HAVE_NO_MEMORY(state->req);
+ state->subreq = wrepl_request_send(state,
+ state->wreplconn->service->task->event_ctx,
+ state->wreplconn->sock, req, NULL);
+ NT_STATUS_HAVE_NO_MEMORY(state->subreq);
/*
* now we need to convert the wrepl_socket (client connection)
@@ -951,11 +953,14 @@ static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *s
/* we won't get a reply to a inform message */
state->ctrl.send_only = true;
- state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
- NT_STATUS_HAVE_NO_MEMORY(state->req);
+ state->subreq = wrepl_request_send(state,
+ state->wreplconn->service->task->event_ctx,
+ state->wreplconn->sock, req, &state->ctrl);
+ NT_STATUS_HAVE_NO_MEMORY(state->subreq);
- state->req->async.fn = wreplsrv_push_notify_handler_req;
- state->req->async.private_data = state;
+ tevent_req_set_callback(state->subreq,
+ wreplsrv_push_notify_handler_treq,
+ state);
state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
@@ -1009,7 +1014,8 @@ static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_sta
{
NTSTATUS status;
- status = wrepl_request_recv(state->req, state, NULL);
+ status = wrepl_request_recv(state->subreq, state, NULL);
+ TALLOC_FREE(state->subreq);
NT_STATUS_NOT_OK_RETURN(status);
state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
@@ -1052,9 +1058,9 @@ static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
return;
}
-static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
+static void wreplsrv_push_notify_handler_treq(struct tevent_req *subreq)
{
- struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private_data,
+ struct wreplsrv_push_notify_state *state = tevent_req_callback_data(subreq,
struct wreplsrv_push_notify_state);
wreplsrv_push_notify_handler(state);
return;