/*
Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include <libgen.h>
#include <unistd.h>
#include <fnmatch.h>
#include <sys/time.h>
#include <stdlib.h>
#include <signal.h>
#include "glusterfs.h"
#include "afr.h"
#include "dict.h"
#include "xlator.h"
#include "hashfn.h"
#include "logging.h"
#include "stack.h"
#include "list.h"
#include "call-stub.h"
#include "defaults.h"
#include "common-utils.h"
#include "compat-errno.h"
#include "compat.h"
#include "protocol-common.h"
#include "byte-order.h"
#include "afr-transaction.h"
#include "afr-self-heal.h"
#include "afr-messages.h"
/*
 * Settle the outcome of an inode-write fop from the per-child replies.
 *
 * local->op_ret starts out as -1 and local->op_errno as whatever
 * afr_final_errno() reports. Every valid, successful reply may then
 * replace them, together with the pre/post iatts and the xdata/xattr
 * response dicts. Once the winner is chosen the frame is passed on to
 * afr_txn_arbitrate_fop_cbk().
 */
static void
__afr_inode_write_finalize (call_frame_t *frame, xlator_t *this)
{
        afr_local_t            *local       = frame->local;
        afr_private_t          *priv        = this->private;
        afr_read_subvol_args_t  args        = {0,};
        struct iatt            *xdata_iatt  = NULL;
        int                     read_subvol = 0;
        int                     child       = 0;

        /*
         * This code needs to stay until DHT sends fops on linked inodes.
         * For an unlinked inode, take gfid and file type from the first
         * successful reply that can provide them: either its poststat
         * or, failing that, the iatt stored in its xdata.
         */
        if (local->inode && !inode_is_linked (local->inode)) {
                for (child = 0; child < priv->child_count; child++) {
                        if (!local->replies[child].valid ||
                            local->replies[child].op_ret == -1)
                                continue;

                        if (!gf_uuid_is_null
                                  (local->replies[child].poststat.ia_gfid)) {
                                gf_uuid_copy (args.gfid,
                                      local->replies[child].poststat.ia_gfid);
                                args.ia_type =
                                      local->replies[child].poststat.ia_type;
                                break;
                        }

                        /* poststat carries no gfid: fall back to xdata */
                        if (dict_get_bin (local->replies[child].xdata,
                                          DHT_IATT_IN_XDATA_KEY,
                                          (void **) &xdata_iatt) != 0)
                                continue;

                        gf_uuid_copy (args.gfid, xdata_iatt->ia_gfid);
                        args.ia_type = xdata_iatt->ia_type;
                        break;
                }
        }

        /* Without an inode read_subvol keeps its initial value of 0. */
        if (local->inode) {
                if (local->transaction.type == AFR_METADATA_TRANSACTION) {
                        read_subvol = afr_metadata_subvol_get (local->inode,
                                                               this, NULL,
                                                               NULL, &args);
                } else {
                        read_subvol = afr_data_subvol_get (local->inode, this,
                                                           NULL, NULL, NULL,
                                                           &args);
                }
        }

        local->op_ret = -1;
        local->op_errno = afr_final_errno (local, priv);

        for (child = 0; child < priv->child_count; child++) {
                if (!local->replies[child].valid)
                        continue;

                if (local->replies[child].op_ret < 0) {
                        afr_inode_read_subvol_reset (local->inode, this);
                        continue;
                }

                /*
                 * The order of the checks below is important.
                 *  - Highest precedence: largest op_ret
                 *  - Next precedence: if all op_rets are equal, read subvol
                 *  - Least precedence: any succeeded subvol
                 */
                if (local->replies[child].op_ret < local->op_ret)
                        continue;

                if (local->replies[child].op_ret == local->op_ret &&
                    child != read_subvol)
                        continue;

                local->op_ret = local->replies[child].op_ret;
                local->op_errno = local->replies[child].op_errno;
                local->cont.inode_wfop.prebuf =
                        local->replies[child].prestat;
                local->cont.inode_wfop.postbuf =
                        local->replies[child].poststat;

                /* Swap in the winner's dicts, dropping any earlier ref. */
                if (local->replies[child].xdata) {
                        if (local->xdata_rsp)
                                dict_unref (local->xdata_rsp);
                        local->xdata_rsp =
                                dict_ref (local->replies[child].xdata);
                }

                if (local->replies[child].xattr) {
                        if (local->xattr_rsp)
                                dict_unref (local->xattr_rsp);
                        local->xattr_rsp =
                                dict_ref (local->replies[child].xattr);
                }
        }

        afr_txn_arbitrate_fop_cbk (frame, this);
}
/*
 * Record one child's reply to an inode-write fop in
 * local->replies[child_index].
 *
 * The slot is marked valid and op_ret/op_errno are stored. On success
 * (op_ret >= 0) the pre/post iatts are copied and references are taken
 * on xattr and xdata, each only if supplied. On failure the child is
 * reported through afr_transaction_fop_failed().
 *
 * The callers visible in this file invoke this with frame->lock held.
 */
static void
__afr_inode_write_fill (call_frame_t *frame, xlator_t *this, int child_index,
                        int op_ret, int op_errno,
                        struct iatt *prebuf, struct iatt *postbuf,
                        dict_t *xattr, dict_t *xdata)
{
        afr_local_t   *local = frame->local;
        afr_private_t *priv  = this->private;

        local->replies[child_index].valid = 1;

        /*
         * An arbiter brick answering op_ret == 1 is recorded as having
         * written the full length of the writev vector.
         * NOTE(review): presumably because the arbiter is only ever sent
         * a single byte -- confirm against afr_arbiter_writev_wind().
         */
        if (AFR_IS_ARBITER_BRICK(priv, child_index) && op_ret == 1)
                op_ret = iov_length (local->cont.writev.vector,
                                     local->cont.writev.count);

        local->replies[child_index].op_ret = op_ret;
        local->replies[child_index].op_errno = op_errno;

        if (op_ret < 0) {
                afr_transaction_fop_failed (frame, this, child_index);
                return;
        }

        if (prebuf)
                local->replies[child_index].prestat = *prebuf;

        if (postbuf)
                local->replies[child_index].poststat = *postbuf;

        if (xattr)
                local->replies[child_index].xattr = dict_ref (xattr);

        if (xdata)
                local->replies[child_index].xdata = dict_ref (xdata);
}
/*
 * Common per-child callback for inode-write fops.
 *
 * The child index travels in `cookie`. The reply is stored under
 * frame->lock. When afr_frame_return() reports that no calls are
 * outstanding, the replies are finalized, the fop is unwound if nothing
 * failed, and the transaction is resumed.
 *
 * Always returns 0.
 */
static int
__afr_inode_write_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                       int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                       struct iatt *postbuf, dict_t *xattr, dict_t *xdata)
{
        afr_private_t *priv        = this->private;
        afr_local_t   *local       = frame->local;
        int            child_index = (long) cookie;

        LOCK (&frame->lock);
        {
                __afr_inode_write_fill (frame, this, child_index, op_ret,
                                        op_errno, prebuf, postbuf, xattr,
                                        xdata);
        }
        UNLOCK (&frame->lock);

        /* Other children still have to answer: nothing more to do yet. */
        if (afr_frame_return (frame) != 0)
                return 0;

        __afr_inode_write_finalize (frame, this);

        if (afr_txn_nothing_failed (frame, this)) {
                /* if it did pre-op, it will do post-op changing ctime */
                if (priv->consistent_metadata &&
                    afr_needs_changelog_update (local)) {
                        afr_zero_fill_stat (local);
                }

                local->transaction.unwind (frame, this);
        }

        local->transaction.resume (frame, this);

        return 0;
}
/* {{{ writev */
/*
 * Copy the fop result kept in src_frame's local into dst_frame's local:
 * op_ret, op_errno, the pre/post iatts and, when one is present, a new
 * reference to the xdata response dict.
 */
void
afr_writev_copy_outvars (call_frame_t *src_frame, call_frame_t *dst_frame)
{
        afr_local_t *from = src_frame->local;
        afr_local_t *to   = dst_frame->local;

        to->op_ret   = from->op_ret;
        to->op_errno = from->op_errno;

        to->cont.inode_wfop.prebuf  = from->cont.inode_wfop.prebuf;
        to->cont.inode_wfop.postbuf = from->cont.inode_wfop.postbuf;

        if (from->xdata_rsp != NULL)
                to->xdata_rsp = dict_ref (from->xdata_rsp);
}
/*
 * Unwind a writev on `frame` with the result stored in its local
 * (op_ret, op_errno, pre/post iatts, xdata response).
 *
 * When priv->consistent_metadata is set, afr_zero_fill_stat() is run on
 * the local before unwinding.
 */
void
afr_writev_unwind (call_frame_t *frame, xlator_t *this)
{
        afr_private_t *priv  = this->private;
        afr_local_t   *local = frame->local;

        if (priv->consistent_metadata) {
                afr_zero_fill_stat (local);
        }

        AFR_STACK_UNWIND (writev, frame,
                          local->op_ret, local->op_errno,
                          &local->cont.inode_wfop.prebuf,
                          &local->cont.inode_wfop.postbuf,
                          local->xdata_rsp);
}
/*
 * Detach the fop frame from the transaction frame and, if one was
 * still attached, copy the result into it and unwind the writev on it.
 *
 * Always returns 0.
 */
int
afr_transaction_writev_unwind (call_frame_t *frame, xlator_t *this)
{
        call_frame_t *fop_frame = afr_transaction_detach_fop_frame (frame);

        /* No fop frame attached: nothing to unwind. */
        if (fop_frame == NULL)
                return 0;

        afr_writev_copy_outvars (frame, fop_frame);
        afr_writev_unwind (fop_frame, this);

        return 0;
}
/*
 * Mark as failed every child whose writev succeeded but wrote less
 * than the result already staged in local->op_ret.
 */
static void
afr_writev_handle_short_writes (call_frame_t *frame, xlator_t *this)
{
        afr_local_t   *local = frame->local;
        afr_private_t *priv  = this->private;
        int            child = 0;

        /*
         * The best-case result of the writev calls is already staged as
         * the return value. A child that returned less than that is now
         * out of sync, so mark the fop as failed on it. Children that
         * returned an error (op_ret == -1) have already been marked as
         * failed and are left alone here, as are children with no valid
         * reply.
         */
        for (child = 0; child < priv->child_count; child++) {
                if (local->replies[child].valid &&
                    local->replies[child].op_ret != -1 &&
                    local->replies[child].op_ret < local->op_ret) {
                        afr_transaction_fop_failed (frame, this, child);
                }
        }
}
/*
 * Per-child callback for a wound writev.
 *
 * Under frame->lock the reply is stored via __afr_inode_write_fill()
 * and, for a successful reply carrying xdata, two hints are read from
 * it:
 *   - GLUSTERFS_WRITE_IS_APPEND: if the key cannot be read or its value
 *     is 0, local->append_write is cleared.
 *   - GLUSTERFS_OPEN_FD_COUNT: if larger than local->open_fd_count, the
 *     new count is stored and local->update_open_fd_count is set.
 *
 * When afr_frame_return() reports that no calls are outstanding, the
 * replies are finalized, short writes are marked as failures and the
 * transaction is resumed. The fop itself is unwound here only if
 * nothing failed.
 *
 * The child index travels in `cookie`. Always returns 0.
 */
int
afr_writev_wind_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                     int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
                     struct iatt *postbuf, dict_t *xdata)
{
        afr_local_t  *local           = NULL;
        call_frame_t *fop_frame       = NULL;
        int           child_index     = (long) cookie;
        int           call_count      = -1;
        int           ret             = 0;
        uint32_t      open_fd_count   = 0;
        uint32_t      write_is_append = 0;

        local = frame->local;

        LOCK (&frame->lock);
        {
                __afr_inode_write_fill (frame, this, child_index, op_ret,
                                        op_errno, prebuf, postbuf, NULL,
                                        xdata);
                if (op_ret == -1 || !xdata)
                        goto unlock;

                ret = dict_get_uint32 (xdata, GLUSTERFS_WRITE_IS_APPEND,
                                       &write_is_append);
                if (ret || !write_is_append)
                        local->append_write = _gf_false;

                ret = dict_get_uint32 (xdata, GLUSTERFS_OPEN_FD_COUNT,
                                       &open_fd_count);
                /*
                 * Treat every negative return as "no usable count", in
                 * line with the lookup above which rejects any non-zero
                 * return; testing for exactly -1 would let other error
                 * codes fall through.
                 */
                if (ret < 0)
                        goto unlock;
                if (open_fd_count > local->open_fd_count) {
                        local->open_fd_count = open_fd_count;
                        local->update_open_fd_count = _gf_true;
                }
        }
unlock:
        UNLOCK (&frame->lock);

        call_count = afr_frame_return (frame);

        if (call_count == 0) {
                if (!local->stable_write && !local->append_write) {
                        /* An appended write removes the necessity to
                           fsync() the file. This is because self-heal
                           has the logic to check for larger file when
                           the xattrs are not reliably pointing at
                           a stale file.
                        */
                        afr_fd_report_unstable_write (this, local->fd);
                }

                __afr_inode_write_finalize (frame, this);

                afr_writev_handle_short_writes (frame, this);

                if (local->update_open_fd_count)
                        afr_handle_open_fd_count (frame, this);

                if (!afr_txn_nothing_failed (frame, this)) {
                        /* Don't unwind until post-op is complete */
                        local->transaction.resume (frame, this);
                } else {
                        /*
                         * Generally inode-write fops do transaction.unwind
                         * then transaction.resume, but writev needs to make
                         * sure that delayed post-op frame is placed in fdctx
                         * before unwind happens. This prevents the race of
                         * flush doing the changelog wakeup first in fuse
                         * thread and then this writev placing its delayed
                         * post-op frame in fdctx. This helps flush make sure
                         * all the delayed post-ops are completed.
                         */
                        fop_frame = afr_transaction_detach_fop_frame (frame);

                        /*
                         * The detach can come back empty (see the check in
                         * afr_transaction_writev_unwind()). In that case
                         * there is nothing to copy into or unwind, but the
                         * transaction must still be resumed.
                         */
                        if (fop_frame)
                                afr_writev_copy_outvars (frame, fop_frame);

                        local->transaction.resume (frame, this);

                        if (fop_frame)
                                afr_writev_unwind (fop_frame, this);
                }
        }

        return 0;
}
static int
afr_arbiter_writev_wind (call_frame_t *frame, xlator_t *this, int subvol)
{
afr_local_t *local = frame->local;
afr_private_t *priv = this->private;
static char byte = 0xFF;
static struct iovec vector = {&byte, 1};
int32_t count = 1;
STACK_WIND_COOKIE (frame, afr_writev_wind_cbk, (void *) (long) subvol,
priv->children[subvol],
|