summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--configure.ac2
-rw-r--r--glusterfs.spec.in1
-rw-r--r--libglusterfs/src/glfs-message-id.h1
-rw-r--r--xlators/features/Makefile.am7
-rw-r--r--xlators/features/sdfs/Makefile.am3
-rw-r--r--xlators/features/sdfs/src/Makefile.am17
-rw-r--r--xlators/features/sdfs/src/sdfs-messages.h69
-rw-r--r--xlators/features/sdfs/src/sdfs.c1351
-rw-r--r--xlators/features/sdfs/src/sdfs.h49
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-volgen.c26
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-volume-set.c10
11 files changed, 1532 insertions, 4 deletions
diff --git a/configure.ac b/configure.ac
index 81c56d4e68..ed84c66179 100644
--- a/configure.ac
+++ b/configure.ac
@@ -152,6 +152,8 @@ AC_CONFIG_FILES([Makefile
xlators/features/marker/src/Makefile
xlators/features/selinux/Makefile
xlators/features/selinux/src/Makefile
+ xlators/features/sdfs/Makefile
+ xlators/features/sdfs/src/Makefile
xlators/features/read-only/Makefile
xlators/features/read-only/src/Makefile
xlators/features/compress/Makefile
diff --git a/glusterfs.spec.in b/glusterfs.spec.in
index 2dbd013c1d..f7f8cbae6b 100644
--- a/glusterfs.spec.in
+++ b/glusterfs.spec.in
@@ -1256,6 +1256,7 @@ exit 0
%{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/arbiter.so
%{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/bit-rot.so
%{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/bitrot-stub.so
+ %{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/sdfs.so
%if ( 0%{!?_without_tiering:1} )
%{_libdir}/glusterfs/%{version}%{?prereltag}/xlator/features/changetimerecorder.so
%{_libdir}/libgfdb.so.*
diff --git a/libglusterfs/src/glfs-message-id.h b/libglusterfs/src/glfs-message-id.h
index 241913d5b4..6a7d07bef8 100644
--- a/libglusterfs/src/glfs-message-id.h
+++ b/libglusterfs/src/glfs-message-id.h
@@ -85,6 +85,7 @@ enum _msgid_comp {
GLFS_MSGID_COMP(NLC, 1),
GLFS_MSGID_COMP(SL, 1),
GLFS_MSGID_COMP(HAM, 1),
+ GLFS_MSGID_COMP(SDFS, 1),
/* --- new segments for messages goes above this line --- */
diff --git a/xlators/features/Makefile.am b/xlators/features/Makefile.am
index f7791b0cc3..1e24679903 100644
--- a/xlators/features/Makefile.am
+++ b/xlators/features/Makefile.am
@@ -1,6 +1,5 @@
SUBDIRS = locks quota read-only quiesce marker index barrier \
- arbiter compress changelog changetimerecorder \
- gfid-access $(GLUPY_SUBDIR) upcall snapview-client snapview-server \
- trash shard bit-rot leases selinux
-
+ arbiter compress changelog changetimerecorder \
+ gfid-access $(GLUPY_SUBDIR) upcall snapview-client snapview-server \
+ trash shard bit-rot leases selinux sdfs
CLEANFILES =
diff --git a/xlators/features/sdfs/Makefile.am b/xlators/features/sdfs/Makefile.am
new file mode 100644
index 0000000000..a985f42a87
--- /dev/null
+++ b/xlators/features/sdfs/Makefile.am
@@ -0,0 +1,3 @@
+SUBDIRS = src
+
+CLEANFILES =
diff --git a/xlators/features/sdfs/src/Makefile.am b/xlators/features/sdfs/src/Makefile.am
new file mode 100644
index 0000000000..ec9ed804b3
--- /dev/null
+++ b/xlators/features/sdfs/src/Makefile.am
@@ -0,0 +1,17 @@
+xlator_LTLIBRARIES = sdfs.la
+xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/features
+
+sdfs_la_LDFLAGS = -module $(GF_XLATOR_DEFAULT_LDFLAGS)
+
+sdfs_la_SOURCES = sdfs.c
+sdfs_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
+
+noinst_HEADERS = sdfs.h sdfs-messages.h $(top_builddir)/xlators/lib/src/libxlator.h
+
+AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
+ -I$(top_srcdir)/xlators/lib/src \
+ -I$(top_srcdir)/rpc/xdr/src/ -I$(top_builddir)/rpc/xdr/src/
+
+AM_CFLAGS = -Wall -fno-strict-aliasing $(GF_CFLAGS)
+
+CLEANFILES =
diff --git a/xlators/features/sdfs/src/sdfs-messages.h b/xlators/features/sdfs/src/sdfs-messages.h
new file mode 100644
index 0000000000..6c7a9d9066
--- /dev/null
+++ b/xlators/features/sdfs/src/sdfs-messages.h
@@ -0,0 +1,69 @@
+/*
+ Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+ */
+
+#ifndef _DFS_MESSAGES_H_
+#define _DFS_MESSAGES_H_
+
+#include "glfs-message-id.h"
+
+/* file bit-rot-bitd-messages.h
+ * brief SDFS log-message IDs and their descriptions
+ */
+
+/* NOTE: Rules for message additions
+ * 1) Each instance of a message is _better_ left with a unique message ID, even
+ * if the message format is the same. Reasoning is that, if the message
+ * format needs to change in one instance, the other instances are not
+ * impacted or the new change does not change the ID of the instance being
+ * modified.
+ * 2) Addition of a message,
+ * - Should increment the GLFS_NUM_MESSAGES
+ * - Append to the list of messages defined, towards the end
+ * - Retain macro naming as glfs_msg_X (for redability across developers)
+ * NOTE: Rules for message format modifications
+ * 3) Check acorss the code if the message ID macro in question is reused
+ * anywhere. If reused then then the modifications should ensure correctness
+ * everywhere, or needs a new message ID as (1) above was not adhered to. If
+ * not used anywhere, proceed with the required modification.
+ * NOTE: Rules for message deletion
+ * 4) Check (3) and if used anywhere else, then cannot be deleted. If not used
+ * anywhere, then can be deleted, but will leave a hole by design, as
+ * addition rules specify modification to the end of the list and not filling
+ * holes.
+ */
+
+#define GLFS_SDFS_BASE GLFS_MSGID_COMP_SDFS
+#define GLFS_SDFS_NUM_MESSAGES 2
+#define GLFS_MSGID_END (GLFS_SDFS_BASE + \
+ GLFS_SDFS_NUM_MESSAGES + 1)
+/* Messaged with message IDs */
+#define glfs_msg_start_x GLFS_DFS_BASE, "Invalid: Start of messages"
+/*------------*/
+
+
+#define SDFS_MSG_ENTRYLK_ERROR (GLFS_SDFS_BASE + 1)
+/*!
+ * @messageid
+ * @diagnosis
+ * @recommendedaction
+ *
+ */
+
+#define SDFS_MSG_MKDIR_ERROR (GLFS_SDFS_BASE + 2)
+/*!
+ * @messageid
+ * @diagnosis
+ * @recommendedaction
+ *
+ */
+/*------------*/
+
+#define glfs_msg_end_x GLFS_MSGID_END, "Invalid: End of messages"
+#endif /* !_SDFS_MESSAGES_H_ */
diff --git a/xlators/features/sdfs/src/sdfs.c b/xlators/features/sdfs/src/sdfs.c
new file mode 100644
index 0000000000..3b70dce5d2
--- /dev/null
+++ b/xlators/features/sdfs/src/sdfs.c
@@ -0,0 +1,1351 @@
+/*
+ Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+#include <libgen.h>
+#include "sdfs.h"
+
+static int
+sdfs_frame_return (call_frame_t *frame)
+{
+ sdfs_local_t *local = NULL;
+
+ if (!frame)
+ return -1;
+
+ local = frame->local;
+
+ return GF_ATOMIC_DEC (local->call_cnt);
+}
+
+static void
+sdfs_lock_free (sdfs_entry_lock_t *entrylk)
+{
+ if (entrylk == NULL)
+ goto out;
+
+ loc_wipe (&entrylk->parent_loc);
+ GF_FREE (entrylk->basename);
+
+out:
+ return;
+}
+
+static void
+sdfs_lock_array_free (sdfs_lock_t *lock)
+{
+ sdfs_entry_lock_t *entrylk = NULL;
+ int i = 0;
+
+ if (lock == NULL)
+ goto out;
+
+ for (i = 0; i < lock->lock_count; i++) {
+ entrylk = &lock->entrylk[i];
+ sdfs_lock_free (entrylk);
+ }
+
+out:
+ return;
+}
+
+static void
+sdfs_local_cleanup (sdfs_local_t *local)
+{
+ if (!local)
+ return;
+
+ loc_wipe (&local->loc);
+ loc_wipe (&local->parent_loc);
+
+ if (local->stub) {
+ call_stub_destroy (local->stub);
+ local->stub = NULL;
+ }
+
+ sdfs_lock_array_free (local->lock);
+ GF_FREE (local->lock);
+
+ mem_put (local);
+}
+
+static int
+sdfs_build_parent_loc (loc_t *parent, loc_t *child)
+{
+ int ret = -1;
+ char *path = NULL;
+
+ if (!child->parent) {
+ goto out;
+ }
+ parent->inode = inode_ref (child->parent);
+ path = gf_strdup (child->path);
+ if (!path) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ parent->path = dirname(path);
+ if (!parent->path) {
+ goto out;
+ }
+
+ gf_uuid_copy (parent->gfid, child->pargfid);
+ return 0;
+
+out:
+ GF_FREE (path);
+ return ret;
+}
+
+static sdfs_local_t *
+sdfs_local_init (call_frame_t *frame, xlator_t *this)
+{
+ sdfs_local_t *local = NULL;
+
+ local = mem_get0 (this->local_pool);
+ if (!local)
+ goto out;
+
+ frame->local = local;
+out:
+ return local;
+}
+
+static int
+sdfs_get_new_frame (call_frame_t *frame, loc_t *loc, call_frame_t **new_frame)
+{
+ int ret = -1;
+ sdfs_local_t *local = NULL;
+ client_t *client = NULL;
+
+ *new_frame = copy_frame (frame);
+ if (!*new_frame) {
+ goto err;
+ }
+
+ client = frame->root->client;
+ gf_client_ref (client);
+ (*new_frame)->root->client = client;
+ local = sdfs_local_init (*new_frame, THIS);
+ if (!local) {
+ goto err;
+ }
+
+ local->main_frame = frame;
+
+ ret = sdfs_build_parent_loc (&local->parent_loc, loc);
+ if (ret) {
+ goto err;
+ }
+
+ ret = loc_copy (&local->loc, loc);
+ if (ret == -1) {
+ goto err;
+ }
+
+ ret = 0;
+err:
+ if (ret == -1) {
+ SDFS_STACK_DESTROY (frame);
+ }
+ return ret;
+}
+
+int
+sdfs_entrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_stub_t *stub = NULL;
+
+ local = frame->local;
+
+ local->op_ret = op_ret;
+ local->op_errno = op_errno;
+
+ if (local->stub) {
+ stub = local->stub;
+ local->stub = NULL;
+ call_resume (stub);
+ } else {
+ if (op_ret < 0)
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Unlocking entry lock failed for %s",
+ local->loc.name);
+
+ SDFS_STACK_DESTROY (frame);
+ }
+
+ return 0;
+}
+
+int
+sdfs_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *stbuf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT (mkdir, local->main_frame, op_ret, op_errno, inode,
+ stbuf, preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_mkdir_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ mode_t mode, mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+ int op_errno = -1;
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ op_errno = local->op_errno;
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_mkdir_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->mkdir, loc,
+ mode, umask, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (mkdir, local->main_frame, -1, op_errno,
+ NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
+ mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_mkdir_stub (new_frame, sdfs_mkdir_helper, loc, mode,
+ umask, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (mkdir, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT (rmdir, local->main_frame, op_ret, op_errno,
+ preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_rmdir_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ int flags, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_rmdir_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->rmdir, loc,
+ flags, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (rmdir, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_rmdir (call_frame_t *frame, xlator_t *this, loc_t *loc, int flags,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_rmdir_stub (new_frame, sdfs_rmdir_helper, loc, flags, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (rmdir, frame, -1, op_errno, NULL, NULL,
+ NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ fd_t *fd, inode_t *inode, struct iatt *stbuf,
+ struct iatt *preparent, struct iatt *postparent,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT (create, local->main_frame, op_ret, op_errno, fd,
+ inode, stbuf, preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_create_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ int32_t flags, mode_t mode, mode_t umask, fd_t *fd,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_create_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->create, loc, flags,
+ mode, umask, fd, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (create, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_create (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ int32_t flags, mode_t mode, mode_t umask,
+ fd_t *fd, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_create_stub (new_frame, sdfs_create_helper, loc,
+ flags, mode, umask, fd, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (create, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT (unlink, local->main_frame, op_ret, op_errno,
+ preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_unlink_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ int flags, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_unlink_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->unlink, loc, flags, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (unlink, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ int flags, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_unlink_stub (new_frame, sdfs_unlink_helper, loc,
+ flags, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (unlink, frame, -1, op_errno, NULL, NULL,
+ NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *stbuf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT (link, local->main_frame, op_ret, op_errno, inode,
+ stbuf, preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_symlink_helper (call_frame_t *frame, xlator_t *this,
+ const char *linkname, loc_t *loc, mode_t umask,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_symlink_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->symlink, linkname, loc,
+ umask, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (link, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_symlink (call_frame_t *frame, xlator_t *this, const char *linkname,
+ loc_t *loc, mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_symlink_stub (new_frame, sdfs_symlink_helper, linkname, loc,
+ umask, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (link, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_common_entrylk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ int this_call_cnt = 0;
+ int lk_index = 0;
+ sdfs_lock_t *locks = NULL;
+ call_stub_t *stub = NULL;
+
+ local = frame->local;
+ locks = local->lock;
+ lk_index = (long) cookie;
+
+ if (op_ret < 0) {
+ local->op_ret = op_ret;
+ local->op_errno = op_errno;
+ } else {
+ locks->entrylk->locked[lk_index] = _gf_true;
+ }
+
+ this_call_cnt = sdfs_frame_return (frame);
+ if (this_call_cnt > 0) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "As there are more callcnt (%d) returning without WIND",
+ this_call_cnt);
+ return 0;
+ }
+
+ if (local->stub) {
+ stub = local->stub;
+ local->stub = NULL;
+ call_resume (stub);
+ } else {
+ if (local->op_ret < 0)
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "unlocking entry lock failed ");
+ SDFS_STACK_DESTROY (frame);
+ }
+
+ return 0;
+}
+
+int
+sdfs_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ inode_t *inode, struct iatt *stbuf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *lock = NULL;
+ int i = 0;
+ int lock_count = 0;
+
+ local = frame->local;
+ lock = local->lock;
+
+ STACK_UNWIND_STRICT (link, local->main_frame, op_ret, op_errno, inode,
+ stbuf, preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ lock_count = lock->lock_count;
+ for (i = 0; i < lock_count; i++) {
+ STACK_WIND_COOKIE (frame, sdfs_common_entrylk_cbk,
+ (void *)(long) i,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &lock->entrylk[i].parent_loc,
+ lock->entrylk[i].basename,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ }
+
+ return 0;
+}
+
+int
+sdfs_link_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *locks = NULL;
+ gf_boolean_t stack_destroy = _gf_true;
+ int lock_count = 0;
+ int i = 0;
+
+ local = frame->local;
+ locks = local->lock;
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed");
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_link_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->link, oldloc, newloc,
+ xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (link, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ for (i = 0; i < locks->lock_count && locks->entrylk->locked[i]; i++) {
+ lock_count++;
+ }
+ GF_ATOMIC_INIT (local->call_cnt, lock_count);
+
+ for (i = 0; i < lock_count; i++) {
+ if (!locks->entrylk->locked[i]) {
+ lock_count++;
+ continue;
+ }
+
+ stack_destroy = _gf_false;
+ STACK_WIND (frame, sdfs_common_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &locks->entrylk[i].parent_loc,
+ locks->entrylk[i].basename,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ }
+
+ if (stack_destroy)
+ SDFS_STACK_DESTROY (frame);
+
+ return 0;
+}
+
+static int
+sdfs_init_entry_lock (sdfs_entry_lock_t *lock, loc_t *loc)
+{
+ int ret = 0;
+
+ ret = sdfs_build_parent_loc (&lock->parent_loc, loc);
+ if (ret)
+ return -1;
+
+ lock->basename = gf_strdup (loc->name);
+ if (!lock->basename)
+ return -1;
+
+ return 0;
+}
+
+int
+sdfs_entry_lock_cmp (const void *l1, const void *l2)
+{
+ const sdfs_entry_lock_t *r1 = l1;
+ const sdfs_entry_lock_t *r2 = l2;
+ int ret = 0;
+ uuid_t gfid1 = {0};
+ uuid_t gfid2 = {0};
+
+ loc_gfid ((loc_t *)&r1->parent_loc, gfid1);
+ loc_gfid ((loc_t *)&r2->parent_loc, gfid2);
+ ret = gf_uuid_compare (gfid1, gfid2);
+ /*Entrylks with NULL basename are the 'smallest'*/
+ if (ret == 0) {
+ if (!r1->basename)
+ return -1;
+ if (!r2->basename)
+ return 1;
+ ret = strcmp (r1->basename, r2->basename);
+ }
+
+ if (ret <= 0)
+ return -1;
+ else
+ return 1;
+}
+
+int
+sdfs_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ sdfs_lock_t *lock = NULL;
+ client_t *client = NULL;
+ int ret = 0;
+ int op_errno = 0;
+
+ new_frame = copy_frame (frame);
+ if (!new_frame) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ gf_client_ref (client);
+ new_frame->root->client = client;
+ local = sdfs_local_init (new_frame, this);
+ if (!local) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local->main_frame = frame;
+
+
+ lock = GF_CALLOC (1, sizeof (*lock), gf_common_mt_char);
+ if (!lock)
+ goto err;
+
+ local->lock = lock;
+
+ ret = sdfs_init_entry_lock (&lock->entrylk[0], newloc);
+ if (ret)
+ goto err;
+
+ ++lock->lock_count;
+
+ local->lock = lock;
+ GF_ATOMIC_INIT (local->call_cnt, lock->lock_count);
+
+ loc_copy (&local->loc, newloc);
+ if (ret == -1) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_link_stub (new_frame, sdfs_link_helper, oldloc,
+ newloc, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local->stub = stub;
+
+ STACK_WIND_COOKIE (new_frame, sdfs_common_entrylk_cbk,
+ 0, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &lock->entrylk[0].parent_loc,
+ lock->entrylk[0].basename, ENTRYLK_LOCK,
+ ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+
+ STACK_UNWIND_STRICT (link, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *stbuf, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ STACK_UNWIND_STRICT (mknod, local->main_frame, op_ret, op_errno, inode,
+ stbuf, preparent, postparent, xdata);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_mknod_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ mode_t mode, dev_t rdev, mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_mknod_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->mknod, loc, mode, rdev,
+ umask, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (mknod, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
+ dev_t rdev, mode_t umask, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_mknod_stub (new_frame, sdfs_mknod_helper, loc, mode,
+ rdev, umask, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (mknod, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *stbuf,
+ struct iatt *preoldparent, struct iatt *postoldparent,
+ struct iatt *prenewparent, struct iatt *postnewparent,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *lock = NULL;
+ int i = 0;
+ int call_cnt = 0;
+
+ local = frame->local;
+ lock = local->lock;
+ GF_ATOMIC_INIT (local->call_cnt, lock->lock_count);
+
+ STACK_UNWIND_STRICT (rename, local->main_frame, op_ret, op_errno, stbuf,
+ preoldparent, postoldparent, prenewparent,
+ postnewparent, xdata);
+
+ local->main_frame = NULL;
+ call_cnt = GF_ATOMIC_GET (local->call_cnt);
+
+ for (i = 0; i < call_cnt; i++) {
+ STACK_WIND_COOKIE (frame, sdfs_common_entrylk_cbk,
+ (void *)(long) i,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &lock->entrylk[i].parent_loc,
+ lock->entrylk[i].basename,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ }
+
+ return 0;
+}
+
+int
+sdfs_rename_helper (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *lock = NULL;
+ gf_boolean_t stack_destroy = _gf_true;
+ int lock_count = 0;
+ int i = 0;
+
+ local = frame->local;
+ lock = local->lock;
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed ");
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_rename_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->rename, oldloc, newloc,
+ xdata);
+
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (rename, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL, NULL, NULL, NULL);
+
+ local->main_frame = NULL;
+ for (i = 0; i < lock->lock_count && lock->entrylk->locked[i]; i++) {
+ lock_count++;
+ }
+ GF_ATOMIC_INIT (local->call_cnt, lock_count);
+
+ for (i = 0; i < lock_count; i++) {
+ if (!lock->entrylk->locked[i]) {
+ lock_count++;
+ continue;
+ }
+ stack_destroy = _gf_false;
+ STACK_WIND (frame, sdfs_common_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &lock->entrylk[i].parent_loc,
+ lock->entrylk[i].basename,
+ ENTRYLK_UNLOCK, ENTRYLK_WRLCK, xdata);
+ }
+
+ if (stack_destroy)
+ SDFS_STACK_DESTROY (frame);
+
+ return 0;
+}
+
+int
+sdfs_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ sdfs_lock_t *lock = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ client_t *client = NULL;
+ int ret = 0;
+ int op_errno = -1;
+ int i = 0;
+ int call_cnt = 0;
+
+ new_frame = copy_frame (frame);
+ if (!new_frame) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ gf_client_ref (client);
+ new_frame->root->client = client;
+ local = sdfs_local_init (new_frame, this);
+ if (!local) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local->main_frame = frame;
+
+ lock = GF_CALLOC (1, sizeof (*lock), gf_common_mt_char);
+ if (!lock)
+ goto err;
+
+ local->lock = lock;
+
+ ret = sdfs_init_entry_lock (&lock->entrylk[0], oldloc);
+ if (ret)
+ goto err;
+ lock->entrylk->locked[0] = _gf_false;
+
+ ++lock->lock_count;
+
+ ret = sdfs_init_entry_lock (&lock->entrylk[1], newloc);
+ if (ret)
+ goto err;
+ lock->entrylk->locked[1] = _gf_false;
+
+ ++lock->lock_count;
+
+ qsort (lock->entrylk, lock->lock_count, sizeof (*lock->entrylk),
+ sdfs_entry_lock_cmp);
+
+ local->lock = lock;
+ GF_ATOMIC_INIT (local->call_cnt, lock->lock_count);
+
+ stub = fop_rename_stub (new_frame, sdfs_rename_helper, oldloc,
+ newloc, xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ local->stub = stub;
+ call_cnt = GF_ATOMIC_GET (local->call_cnt);
+ for (i = 0; i < call_cnt; i++) {
+ STACK_WIND_COOKIE (new_frame, sdfs_common_entrylk_cbk,
+ (void *)(long) i,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &lock->entrylk[i].parent_loc,
+ lock->entrylk[i].basename,
+ ENTRYLK_LOCK, ENTRYLK_WRLCK, xdata);
+ }
+
+ return 0;
+err:
+
+ STACK_UNWIND_STRICT (rename, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL, NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+sdfs_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, inode_t *inode,
+ struct iatt *stbuf, dict_t *xdata,
+ struct iatt *postparent)
+{
+ sdfs_local_t *local = NULL;
+
+ local = frame->local;
+
+ if (!local->loc.parent) {
+ sdfs_local_cleanup (local);
+ frame->local = NULL;
+ STACK_UNWIND_STRICT (lookup, frame, op_ret, op_errno,
+ inode, stbuf, xdata, postparent);
+ return 0;
+ }
+
+ STACK_UNWIND_STRICT (lookup, local->main_frame, op_ret, op_errno, inode,
+ stbuf, xdata, postparent);
+
+ local->main_frame = NULL;
+ STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_UNLOCK, ENTRYLK_RDLCK, xdata);
+ return 0;
+}
+
+int
+sdfs_lookup_helper (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ char gfid[GF_UUID_BUF_SIZE] = {0};
+
+ local = frame->local;
+
+ gf_uuid_unparse(loc->pargfid, gfid);
+
+ if (local->op_ret < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ SDFS_MSG_ENTRYLK_ERROR,
+ "Acquiring entry lock failed for directory %s "
+ "with parent gfid %s", local->loc.name, gfid);
+ goto err;
+ }
+
+ STACK_WIND (frame, sdfs_lookup_cbk, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->lookup, loc, xdata);
+
+ return 0;
+err:
+ STACK_UNWIND_STRICT (lookup, local->main_frame, -1, local->op_errno,
+ NULL, NULL, NULL, NULL);
+ local->main_frame = NULL;
+
+ SDFS_STACK_DESTROY (frame);
+ return 0;
+}
+
+int
+sdfs_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc,
+ dict_t *xdata)
+{
+ sdfs_local_t *local = NULL;
+ call_frame_t *new_frame = NULL;
+ call_stub_t *stub = NULL;
+ int op_errno = 0;
+
+ if (!loc->parent) {
+ local = sdfs_local_init (frame, this);
+ if (!local) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ STACK_WIND_TAIL(frame, FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->lookup,
+ loc, xdata);
+ return 0;
+ }
+
+ if (-1 == sdfs_get_new_frame (frame, loc, &new_frame)) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ stub = fop_lookup_stub (new_frame, sdfs_lookup_helper, loc,
+ xdata);
+ if (!stub) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ ((sdfs_local_t *)new_frame->local)->stub = stub;
+
+ STACK_WIND (new_frame, sdfs_entrylk_cbk,
+ FIRST_CHILD (this),
+ FIRST_CHILD(this)->fops->entrylk,
+ this->name, &local->parent_loc, local->loc.name,
+ ENTRYLK_LOCK, ENTRYLK_RDLCK, xdata);
+
+ return 0;
+
+err:
+ STACK_UNWIND_STRICT (lookup, frame, -1, op_errno, NULL, NULL,
+ NULL, NULL);
+
+ if (new_frame)
+ SDFS_STACK_DESTROY (new_frame);
+
+ return 0;
+}
+
+int
+init (xlator_t *this)
+{
+ int ret = -1;
+
+ if (!this->children || this->children->next) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "'dentry-fop-serializer' not configured with exactly one child");
+ goto out;
+ }
+
+ if (!this->parents) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "dangling volume. check volfile ");
+ }
+
+ this->local_pool = mem_pool_new (sdfs_local_t, 512);
+ if (!this->local_pool) {
+ goto out;
+ }
+
+ ret = 0;
+
+out:
+ return ret;
+}
+
+int
+fini (xlator_t *this)
+{
+ mem_pool_destroy (this->local_pool);
+
+ return 0;
+}
+
+
+struct xlator_fops fops = {
+ .mkdir = sdfs_mkdir,
+ .rmdir = sdfs_rmdir,
+ .create = sdfs_create,
+ .unlink = sdfs_unlink,
+ .symlink = sdfs_symlink,
+ .link = sdfs_link,
+ .mknod = sdfs_mknod,
+ .rename = sdfs_rename,
+ .lookup = sdfs_lookup,
+};
+
+struct xlator_cbks cbks;
+
diff --git a/xlators/features/sdfs/src/sdfs.h b/xlators/features/sdfs/src/sdfs.h
new file mode 100644
index 0000000000..d28257eda5
--- /dev/null
+++ b/xlators/features/sdfs/src/sdfs.h
@@ -0,0 +1,49 @@
+/*
+ Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include "xlator.h"
+#include "call-stub.h"
+#include "sdfs-messages.h"
+#include "atomic.h"
+
+#define SDFS_LOCK_COUNT_MAX 2
+
+typedef struct{
+ loc_t parent_loc;
+ char *basename;
+ int locked[SDFS_LOCK_COUNT_MAX];
+} sdfs_entry_lock_t;
+
+typedef struct {
+ sdfs_entry_lock_t entrylk[SDFS_LOCK_COUNT_MAX];
+ int lock_count;
+} sdfs_lock_t;
+
+struct sdfs_local {
+ call_frame_t *main_frame;
+ loc_t loc;
+ loc_t parent_loc;
+ call_stub_t *stub;
+ sdfs_lock_t *lock;
+ int op_ret;
+ int op_errno;
+ gf_atomic_t call_cnt;
+};
+typedef struct sdfs_local sdfs_local_t;
+
+#define SDFS_STACK_DESTROY(frame) do { \
+ sdfs_local_t *__local = NULL; \
+ __local = frame->local; \
+ frame->local = NULL; \
+ gf_client_unref (frame->root->client); \
+ STACK_DESTROY (frame->root); \
+ sdfs_local_cleanup (__local); \
+ } while (0)
+
diff --git a/xlators/mgmt/glusterd/src/glusterd-volgen.c b/xlators/mgmt/glusterd/src/glusterd-volgen.c
index 6a02da1791..235a4bd402 100644
--- a/xlators/mgmt/glusterd/src/glusterd-volgen.c
+++ b/xlators/mgmt/glusterd/src/glusterd-volgen.c
@@ -2024,6 +2024,31 @@ out:
return ret;
}
+static int
+brick_graph_add_sdfs (volgen_graph_t *graph, glusterd_volinfo_t *volinfo,
+ dict_t *set_dict, glusterd_brickinfo_t *brickinfo)
+{
+ xlator_t *xl = NULL;
+ int ret = -1;
+
+ if (!graph || !volinfo)
+ goto out;
+
+ if (!dict_get_str_boolean (set_dict, "features.sdfs", 0)) {
+ /* update only if option is enabled */
+ ret = 0;
+ goto out;
+ }
+
+ xl = volgen_graph_add (graph, "features/sdfs", volinfo->volname);
+ if (!xl)
+ goto out;
+
+ ret = 0;
+out:
+ return ret;
+}
+
xlator_t *
add_one_peer (volgen_graph_t *graph, glusterd_brickinfo_t *peer,
char *volname, uint16_t index)
@@ -2616,6 +2641,7 @@ static volgen_brick_xlator_t server_graph_table[] = {
{brick_graph_add_server, NULL},
{brick_graph_add_decompounder, "decompounder"},
{brick_graph_add_io_stats, "NULL"},
+ {brick_graph_add_sdfs, "sdfs"},
{brick_graph_add_cdc, NULL},
{brick_graph_add_quota, "quota"},
{brick_graph_add_index, "index"},
diff --git a/xlators/mgmt/glusterd/src/glusterd-volume-set.c b/xlators/mgmt/glusterd/src/glusterd-volume-set.c
index acb493540d..1d4797afbe 100644
--- a/xlators/mgmt/glusterd/src/glusterd-volume-set.c
+++ b/xlators/mgmt/glusterd/src/glusterd-volume-set.c
@@ -3718,6 +3718,16 @@ struct volopt_map_entry glusterd_volopt_map[] = {
.op_version = GD_OP_VERSION_3_13_0,
.flags = VOLOPT_FLAG_CLIENT_OPT
},
+ { .key = "features.sdfs",
+ .voltype = "features/sdfs",
+ .value = "off",
+ .option = "!features",
+ .op_version = GD_OP_VERSION_4_0_0,
+ .description = "enable/disable dentry serialization xlator in volume",
+ .flags = VOLOPT_FLAG_CLIENT_OPT | VOLOPT_FLAG_XLATOR_OPT,
+ .type = NO_DOC,
+ },
+
{ .key = NULL
}
};