summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src/dht-common.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/dht/src/dht-common.c')
-rw-r--r--xlators/cluster/dht/src/dht-common.c391
1 files changed, 374 insertions, 17 deletions
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c
index 72ca77ea5a..092ddebb8f 100644
--- a/xlators/cluster/dht/src/dht-common.c
+++ b/xlators/cluster/dht/src/dht-common.c
@@ -32,6 +32,13 @@
int dht_link2 (xlator_t *this, call_frame_t *frame, int op_ret);
int
+dht_removexattr2 (xlator_t *this, call_frame_t *frame, int op_ret);
+
+int
+dht_setxattr2 (xlator_t *this, call_frame_t *frame, int op_ret);
+
+
+int
dht_aggregate_quota_xattr (dict_t *dst, char *key, data_t *value)
{
int ret = -1;
@@ -96,6 +103,8 @@ out:
return ret;
}
+
+
int
dht_aggregate (dict_t *this, char *key, data_t *value, void *data)
{
@@ -3265,6 +3274,78 @@ err:
}
int
+dht_file_setxattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, dict_t *xdata)
+{
+ int ret = -1;
+ dht_local_t *local = NULL;
+ call_frame_t *prev = NULL;
+ struct iatt *stbuf = NULL;
+ inode_t *inode = NULL;
+ xlator_t *subvol = NULL;
+
+
+ local = frame->local;
+ prev = cookie;
+
+ local->op_errno = op_errno;
+
+ if ((op_ret == -1) && !dht_inode_missing (op_errno)) {
+ gf_msg_debug (this->name, op_errno,
+ "subvolume %s returned -1.",
+ prev->this->name);
+ goto out;
+ }
+
+ if (local->call_cnt != 1)
+ goto out;
+
+ ret = dict_get_bin (xdata, DHT_IATT_IN_XDATA_KEY, (void **) &stbuf);
+
+ if ((!op_ret) && !stbuf) {
+ goto out;
+ }
+
+ local->op_ret = 0;
+
+ local->rebalance.target_op_fn = dht_setxattr2;
+
+ /* Phase 2 of migration */
+ if ((op_ret == -1) || IS_DHT_MIGRATION_PHASE2 (stbuf)) {
+ ret = dht_rebalance_complete_check (this, frame);
+ if (!ret)
+ return 0;
+ }
+
+ /* Phase 1 of migration */
+ if (IS_DHT_MIGRATION_PHASE1 (stbuf)) {
+ inode = (local->fd) ? local->fd->inode : local->loc.inode;
+ dht_inode_ctx_get1 (this, inode, &subvol);
+ if (subvol) {
+ dht_setxattr2 (this, frame, 0);
+ return 0;
+ }
+ ret = dht_rebalance_in_progress_check (this, frame);
+ if (!ret)
+ return 0;
+ }
+
+out:
+ if (local->rebalance.xdata)
+ dict_unref (local->rebalance.xdata);
+
+ if (local->fop == GF_FOP_SETXATTR) {
+ DHT_STACK_UNWIND (setxattr, frame, op_ret, op_errno, NULL);
+ } else {
+ DHT_STACK_UNWIND (fsetxattr, frame, op_ret, op_errno, NULL);
+ }
+
+ return 0;
+}
+
+
+
+int
dht_fsetxattr (call_frame_t *frame, xlator_t *this,
fd_t *fd, dict_t *xattr, int flags, dict_t *xdata)
{
@@ -3272,6 +3353,10 @@ dht_fsetxattr (call_frame_t *frame, xlator_t *this,
dht_local_t *local = NULL;
int op_errno = EINVAL;
dht_conf_t *conf = NULL;
+ dht_layout_t *layout = NULL;
+ int ret = -1;
+ int call_cnt = 0;
+ int i = 0;
VALIDATE_OR_GOTO (frame, err);
VALIDATE_OR_GOTO (this, err);
@@ -3299,11 +3384,47 @@ dht_fsetxattr (call_frame_t *frame, xlator_t *this,
goto err;
}
- local->call_cnt = 1;
+ layout = local->layout;
+ if (!layout) {
+ gf_msg_debug (this->name, 0,
+ "no layout for fd=%p", fd);
+ op_errno = EINVAL;
+ goto err;
+ }
+
+ local->call_cnt = call_cnt = layout->cnt;
+
+ if (IA_ISDIR (fd->inode->ia_type)) {
+ for (i = 0; i < call_cnt; i++) {
+ STACK_WIND (frame, dht_err_cbk,
+ layout->list[i].xlator,
+ layout->list[i].xlator->fops->fsetxattr,
+ fd, xattr, flags, NULL);
+ }
+
+ } else {
+
+ local->call_cnt = 1;
+ local->rebalance.xdata = dict_ref (xattr);
+ local->rebalance.flags = flags;
+
+ xdata = xdata ? dict_ref (xdata) : dict_new ();
+ if (xdata)
+ ret = dict_set_dynstr_with_alloc (xdata,
+ DHT_IATT_IN_XDATA_KEY, "yes");
+ if (ret) {
+ gf_msg_debug (this->name, 0,
+ "Failed to set dictionary key %s for fd=%p",
+ DHT_IATT_IN_XDATA_KEY, fd);
+ }
- STACK_WIND (frame, dht_err_cbk, subvol, subvol->fops->fsetxattr,
- fd, xattr, flags, NULL);
+ STACK_WIND (frame, dht_file_setxattr_cbk, subvol,
+ subvol->fops->fsetxattr, fd, xattr, flags, xdata);
+
+ if (xdata)
+ dict_unref (xdata);
+ }
return 0;
err:
@@ -3324,6 +3445,7 @@ dht_common_setxattr_cbk (call_frame_t *frame, void *cookie,
return 0;
}
+
int
dht_checking_pathinfo_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int op_ret, int op_errno, dict_t *xattr,
@@ -3365,6 +3487,55 @@ out:
}
+
+int
+dht_setxattr2 (xlator_t *this, call_frame_t *frame, int op_ret)
+{
+ dht_local_t *local = NULL;
+ xlator_t *subvol = NULL;
+ int op_errno = EINVAL;
+ inode_t *inode = NULL;
+
+ local = frame->local;
+
+ inode = (local->fd) ? local->fd->inode : local->loc.inode;
+
+ dht_inode_ctx_get1 (this, inode, &subvol);
+
+ /* In phase2, dht_migration_complete_check_task will
+ * reset inode_ctx_reset1 and update local->cached_subvol
+ * with the dst subvol.
+ */
+ if (!subvol)
+ subvol = local->cached_subvol;
+
+ if (!subvol) {
+ op_errno = EINVAL;
+ goto err;
+ }
+
+ local->call_cnt = 2; /* This is the second attempt */
+
+ if (local->fop == GF_FOP_SETXATTR) {
+ STACK_WIND (frame, dht_file_setxattr_cbk, subvol,
+ subvol->fops->setxattr, &local->loc,
+ local->rebalance.xdata, local->rebalance.flags,
+ NULL);
+ } else {
+ STACK_WIND (frame, dht_file_setxattr_cbk, subvol,
+ subvol->fops->fsetxattr, local->fd,
+ local->rebalance.xdata, local->rebalance.flags,
+ NULL);
+ }
+
+ return 0;
+
+err:
+ DHT_STACK_UNWIND (setxattr, frame, local->op_ret, op_errno, NULL);
+ return 0;
+}
+
+
int
dht_setxattr (call_frame_t *frame, xlator_t *this,
loc_t *loc, dict_t *xattr, int flags, dict_t *xdata)
@@ -3588,11 +3759,32 @@ dht_setxattr (call_frame_t *frame, xlator_t *this,
goto err;
}
- for (i = 0; i < call_cnt; i++) {
- STACK_WIND (frame, dht_err_cbk,
- layout->list[i].xlator,
- layout->list[i].xlator->fops->setxattr,
+ if (IA_ISDIR (loc->inode->ia_type)) {
+
+ for (i = 0; i < call_cnt; i++) {
+ STACK_WIND (frame, dht_err_cbk,
+ layout->list[i].xlator,
+ layout->list[i].xlator->fops->setxattr,
+ loc, xattr, flags, xdata);
+ }
+
+ } else {
+
+ local->rebalance.xdata = dict_ref (xattr);
+ local->rebalance.flags = flags;
+ local->call_cnt = 1;
+
+ xdata = xdata ? dict_ref (xdata) : dict_new ();
+ if (xdata)
+ ret = dict_set_dynstr_with_alloc (xdata,
+ DHT_IATT_IN_XDATA_KEY, "yes");
+
+ STACK_WIND (frame, dht_file_setxattr_cbk,
+ subvol, subvol->fops->setxattr,
loc, xattr, flags, xdata);
+
+ if (xdata)
+ dict_unref (xdata);
}
return 0;
@@ -3605,6 +3797,123 @@ err:
}
+
+
+int
+dht_file_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, dict_t *xdata)
+{
+ int ret = -1;
+ dht_local_t *local = NULL;
+ call_frame_t *prev = NULL;
+ struct iatt *stbuf = NULL;
+ inode_t *inode = NULL;
+ xlator_t *subvol = NULL;
+
+
+ local = frame->local;
+ prev = cookie;
+
+ local->op_errno = op_errno;
+
+ if ((op_ret == -1) && !dht_inode_missing (op_errno)) {
+ gf_msg_debug (this->name, op_errno,
+ "subvolume %s returned -1",
+ prev->this->name);
+ goto out;
+ }
+
+ if (local->call_cnt != 1)
+ goto out;
+
+ ret = dict_get_bin (xdata, DHT_IATT_IN_XDATA_KEY, (void **) &stbuf);
+
+ if ((!op_ret) && !stbuf) {
+ goto out;
+ }
+
+ local->op_ret = 0;
+
+ local->rebalance.target_op_fn = dht_removexattr2;
+
+ /* Phase 2 of migration */
+ if ((op_ret == -1) || IS_DHT_MIGRATION_PHASE2 (stbuf)) {
+ ret = dht_rebalance_complete_check (this, frame);
+ if (!ret)
+ return 0;
+ }
+
+ /* Phase 1 of migration */
+ if (IS_DHT_MIGRATION_PHASE1 (stbuf)) {
+ inode = (local->fd) ? local->fd->inode : local->loc.inode;
+ dht_inode_ctx_get1 (this, inode, &subvol);
+ if (subvol) {
+ dht_removexattr2 (this, frame, 0);
+ return 0;
+ }
+ ret = dht_rebalance_in_progress_check (this, frame);
+ if (!ret)
+ return 0;
+ }
+
+out:
+ if (local->fop == GF_FOP_REMOVEXATTR) {
+ DHT_STACK_UNWIND (removexattr, frame, op_ret, op_errno, NULL);
+ } else {
+ DHT_STACK_UNWIND (fremovexattr, frame, op_ret, op_errno, NULL);
+ }
+ return 0;
+
+}
+
+int
+dht_removexattr2 (xlator_t *this, call_frame_t *frame, int op_ret)
+{
+ dht_local_t *local = NULL;
+ xlator_t *subvol = NULL;
+ int op_errno = EINVAL;
+ inode_t *inode = NULL;
+
+ local = frame->local;
+
+ inode = (local->fd) ? local->fd->inode : local->loc.inode;
+
+ dht_inode_ctx_get1 (this, inode, &subvol);
+
+ /* In phase2, dht_migration_complete_check_task will
+ * reset inode_ctx_reset1 and update local->cached_subvol
+ * with the dst subvol.
+ */
+ if (!subvol)
+ subvol = local->cached_subvol;
+
+ if (!subvol) {
+ op_errno = EINVAL;
+ goto err;
+
+ }
+
+
+ local->call_cnt = 2; /* This is the second attempt */
+
+ if (local->fop == GF_FOP_REMOVEXATTR) {
+ STACK_WIND (frame, dht_file_removexattr_cbk, subvol,
+ subvol->fops->removexattr, &local->loc,
+ local->key, NULL);
+ } else {
+ STACK_WIND (frame, dht_file_removexattr_cbk, subvol,
+ subvol->fops->fremovexattr, local->fd,
+ local->key, NULL);
+ }
+
+ return 0;
+
+err:
+ DHT_STACK_UNWIND (removexattr, frame, local->op_ret, op_errno, NULL);
+ return 0;
+}
+
+
int
dht_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int op_ret, int op_errno, dict_t *xdata)
@@ -3631,6 +3940,8 @@ dht_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
unlock:
UNLOCK (&frame->lock);
+
+
this_call_cnt = dht_frame_return (frame);
if (is_last_call (this_call_cnt)) {
DHT_STACK_UNWIND (removexattr, frame, local->op_ret,
@@ -3652,6 +3963,7 @@ dht_removexattr (call_frame_t *frame, xlator_t *this,
int call_cnt = 0;
dht_conf_t *conf = NULL;
int i;
+ int ret = 0;
VALIDATE_OR_GOTO (this, err);
VALIDATE_OR_GOTO (this->private, err);
@@ -3689,11 +4001,33 @@ dht_removexattr (call_frame_t *frame, xlator_t *this,
local->call_cnt = call_cnt = layout->cnt;
local->key = gf_strdup (key);
- for (i = 0; i < call_cnt; i++) {
- STACK_WIND (frame, dht_removexattr_cbk,
- layout->list[i].xlator,
- layout->list[i].xlator->fops->removexattr,
- loc, key, NULL);
+ if (IA_ISDIR (loc->inode->ia_type)) {
+ for (i = 0; i < call_cnt; i++) {
+ STACK_WIND (frame, dht_removexattr_cbk,
+ layout->list[i].xlator,
+ layout->list[i].xlator->fops->removexattr,
+ loc, key, NULL);
+ }
+
+ } else {
+
+ local->call_cnt = 1;
+ xdata = xdata ? dict_ref (xdata) : dict_new ();
+ if (xdata)
+ ret = dict_set_dynstr_with_alloc (xdata,
+ DHT_IATT_IN_XDATA_KEY, "yes");
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to "
+ "set dictionary key %s for %s",
+ DHT_IATT_IN_XDATA_KEY, loc->path);
+ }
+
+ STACK_WIND (frame, dht_file_removexattr_cbk,
+ subvol, subvol->fops->removexattr,
+ loc, key, xdata);
+
+ if (xdata)
+ dict_unref (xdata);
}
return 0;
@@ -3715,6 +4049,7 @@ dht_fremovexattr (call_frame_t *frame, xlator_t *this,
dht_layout_t *layout = NULL;
int call_cnt = 0;
dht_conf_t *conf = 0;
+ int ret = 0;
int i;
@@ -3754,11 +4089,33 @@ dht_fremovexattr (call_frame_t *frame, xlator_t *this,
local->call_cnt = call_cnt = layout->cnt;
local->key = gf_strdup (key);
- for (i = 0; i < call_cnt; i++) {
- STACK_WIND (frame, dht_removexattr_cbk,
- layout->list[i].xlator,
- layout->list[i].xlator->fops->fremovexattr,
- fd, key, NULL);
+ if (IA_ISDIR (fd->inode->ia_type)) {
+ for (i = 0; i < call_cnt; i++) {
+ STACK_WIND (frame, dht_removexattr_cbk,
+ layout->list[i].xlator,
+ layout->list[i].xlator->fops->fremovexattr,
+ fd, key, NULL);
+ }
+
+ } else {
+
+ local->call_cnt = 1;
+ xdata = xdata ? dict_ref (xdata) : dict_new ();
+ if (xdata)
+ ret = dict_set_dynstr_with_alloc (xdata,
+ DHT_IATT_IN_XDATA_KEY, "yes");
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to "
+ "set dictionary key %s for fd=%p",
+ DHT_IATT_IN_XDATA_KEY, fd);
+ }
+
+ STACK_WIND (frame, dht_file_removexattr_cbk,
+ subvol, subvol->fops->fremovexattr,
+ fd, key, xdata);
+
+ if (xdata)
+ dict_unref (xdata);
}
return 0;