Diffstat (limited to 'tools')
-rw-r--r--  tools/Makefile.am                        |   2
-rw-r--r--  tools/glusterfind/Makefile.am            |   7
-rw-r--r--  tools/glusterfind/glusterfind.in         |  17
-rw-r--r--  tools/glusterfind/src/Makefile.am        |  14
-rw-r--r--  tools/glusterfind/src/__init__.py        |   9
-rw-r--r--  tools/glusterfind/src/brickfind.py       |  97
-rw-r--r--  tools/glusterfind/src/changelog.py       | 309
-rw-r--r--  tools/glusterfind/src/conf.py            |  28
-rw-r--r--  tools/glusterfind/src/libgfchangelog.py  |  83
-rw-r--r--  tools/glusterfind/src/main.py            | 468
-rw-r--r--  tools/glusterfind/src/nodecleanup.py     |  51
-rw-r--r--  tools/glusterfind/src/tool.conf.in       |  11
-rw-r--r--  tools/glusterfind/src/utils.py           | 203
13 files changed, 1298 insertions(+), 1 deletion(-)
diff --git a/tools/Makefile.am b/tools/Makefile.am
index 74229ab41e..d689f60fa5 100644
--- a/tools/Makefile.am
+++ b/tools/Makefile.am
@@ -1,3 +1,3 @@
-SUBDIRS = gfind_missing_files
+SUBDIRS = gfind_missing_files glusterfind
CLEANFILES =
diff --git a/tools/glusterfind/Makefile.am b/tools/glusterfind/Makefile.am
new file mode 100644
index 0000000000..c99a3ddcb3
--- /dev/null
+++ b/tools/glusterfind/Makefile.am
@@ -0,0 +1,7 @@
+SUBDIRS = src
+
+EXTRA_DIST =
+
+bin_SCRIPTS = glusterfind
+
+CLEANFILES = $(bin_SCRIPTS)
diff --git a/tools/glusterfind/glusterfind.in b/tools/glusterfind/glusterfind.in
new file mode 100644
index 0000000000..cff8973980
--- /dev/null
+++ b/tools/glusterfind/glusterfind.in
@@ -0,0 +1,17 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import sys
+sys.path.insert(1, '@GLUSTERFS_LIBEXECDIR@/')
+
+from glusterfind.main import main
+
+if __name__ == "__main__":
+ main()
diff --git a/tools/glusterfind/src/Makefile.am b/tools/glusterfind/src/Makefile.am
new file mode 100644
index 0000000000..458b820fd1
--- /dev/null
+++ b/tools/glusterfind/src/Makefile.am
@@ -0,0 +1,14 @@
+glusterfinddir = $(libexecdir)/glusterfs/glusterfind
+
+glusterfind_PYTHON = conf.py utils.py __init__.py \
+ main.py libgfchangelog.py
+
+glusterfind_SCRIPTS = changelog.py nodecleanup.py \
+ brickfind.py
+
+glusterfind_DATA = tool.conf
+
+EXTRA_DIST = changelog.py nodecleanup.py brickfind.py \
+ tool.conf
+
+CLEANFILES =
diff --git a/tools/glusterfind/src/__init__.py b/tools/glusterfind/src/__init__.py
new file mode 100644
index 0000000000..eb941c6d67
--- /dev/null
+++ b/tools/glusterfind/src/__init__.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
diff --git a/tools/glusterfind/src/brickfind.py b/tools/glusterfind/src/brickfind.py
new file mode 100644
index 0000000000..4aee225d22
--- /dev/null
+++ b/tools/glusterfind/src/brickfind.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import os
+import sys
+import logging
+from argparse import ArgumentParser, RawDescriptionHelpFormatter
+from errno import ENOENT
+
+from utils import mkdirp, setup_logger, create_file, output_write, find
+import conf
+
+
+PROG_DESCRIPTION = """
+Brickfind Crawler
+"""
+
+logger = logging.getLogger()
+
+
+def brickfind_crawl(brick, args):
+ if brick.endswith("/"):
+ brick = brick[0:len(brick)-1]
+
+ working_dir = os.path.dirname(args.outfile)
+ mkdirp(working_dir, exit_on_err=True, logger=logger)
+ create_file(args.outfile, exit_on_err=True, logger=logger)
+
+ with open(args.outfile, "a+") as fout:
+ brick_path_len = len(brick)
+
+ def mtime_filter(path):
+ try:
+ st = os.lstat(path)
+ except (OSError, IOError) as e:
+ if e.errno == ENOENT:
+ st = None
+ else:
+ raise
+
+ if st and (st.st_mtime > args.start or st.st_ctime > args.start):
+ return True
+
+ return False
+
+ def output_callback(path):
+ path = path.strip()
+ path = path[brick_path_len+1:]
+ output_write(fout, path, args.output_prefix)
+
+ if args.full:
+ find(brick, callback_func=output_callback,
+ ignore_dirs=[".glusterfs"])
+ else:
+ find(brick, callback_func=output_callback,
+ filter_func=mtime_filter,
+ ignore_dirs=[".glusterfs"])
+
+ fout.flush()
+ os.fsync(fout.fileno())
+
+
+def _get_args():
+ parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
+ description=PROG_DESCRIPTION)
+
+ parser.add_argument("session", help="Session Name")
+ parser.add_argument("volume", help="Volume Name")
+ parser.add_argument("brick", help="Brick Name")
+ parser.add_argument("outfile", help="Output File")
+ parser.add_argument("start", help="Start Time", type=float)
+ parser.add_argument("--debug", help="Debug", action="store_true")
+ parser.add_argument("--full", help="Full Find", action="store_true")
+ parser.add_argument("--output-prefix", help="File prefix in output",
+ default=".")
+
+ return parser.parse_args()
+
+
+if __name__ == "__main__":
+ args = _get_args()
+ mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
+ exit_on_err=True)
+ log_file = os.path.join(conf.get_opt("log_dir"),
+ args.session,
+ args.volume,
+ "brickfind.log")
+ setup_logger(logger, log_file, args.debug)
+ brickfind_crawl(args.brick, args)
+ sys.exit(0)
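
brickfind.py above is the full-crawl change detector: it walks the brick, keeps every path whose mtime or ctime is newer than the session start time, skips the .glusterfs housekeeping directory and writes paths relative to the brick root. A minimal standalone sketch of that same filter, outside the glusterfind harness and with purely hypothetical paths and timestamps, could look like this (Python 2 style, matching the sources):

import os
from errno import ENOENT

def newer_than(path, start):
    # Keep entries modified or changed after the session start time,
    # silently skipping paths that vanish mid-crawl (ENOENT).
    try:
        st = os.lstat(path)
    except OSError as e:
        if e.errno == ENOENT:
            return False
        raise
    return st.st_mtime > start or st.st_ctime > start

def crawl(brick, start, out):
    # Walk the brick, skip the .glusterfs housekeeping directory and
    # emit matching paths relative to the brick root.
    for root, dirs, files in os.walk(brick):
        dirs[:] = [d for d in dirs if d != ".glusterfs"]
        for name in dirs + files:
            full = os.path.join(root, name)
            if newer_than(full, start):
                out.write("%s\n" % full[len(brick) + 1:])

# Hypothetical usage:
# with open("/tmp/outfile.txt", "w") as f:
#     crawl("/bricks/b1", 1420000000, f)
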
diff --git a/tools/glusterfind/src/changelog.py b/tools/glusterfind/src/changelog.py
new file mode 100644
index 0000000000..b7697ea503
--- /dev/null
+++ b/tools/glusterfind/src/changelog.py
@@ -0,0 +1,309 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import os
+import sys
+import time
+import xattr
+from errno import ENOENT
+import logging
+from argparse import ArgumentParser, RawDescriptionHelpFormatter
+import hashlib
+
+import libgfchangelog
+from utils import create_file, mkdirp, execute, symlink_gfid_to_path
+from utils import fail, setup_logger, output_write, find
+import conf
+
+
+CHANGELOG_LOG_LEVEL = 9
+CHANGELOG_CONN_RETRIES = 5
+CHANGELOGAPI_NUM_WORKERS = 3
+PROG_DESCRIPTION = """
+Changelog Crawler
+"""
+history_turns = 0
+history_turn_time = 0
+
+logger = logging.getLogger()
+
+
+def gfid_to_path_using_batchfind(brick, gfids_file, output_file):
+ """
+    find -samefile gets the inode number and crawls the entire namespace
+    to get the list of files/dirs having the same inode number.
+    Instead, do one find with only the ignore-directory option and
+    match each crawled path's inode number against an in-memory
+    dictionary of inode numbers built from the list of GFIDs, writing
+    the matching paths to the output file.
+ """
+ with open(output_file, "a+") as fout:
+ inode_dict = {}
+ with open(gfids_file) as f:
+ for gfid in f:
+ gfid = gfid.strip()
+ backend_path = os.path.join(brick, ".glusterfs",
+ gfid[0:2], gfid[2:4], gfid)
+
+ try:
+ inode_dict[str(os.stat(backend_path).st_ino)] = 1
+ except (IOError, OSError) as e:
+ if e.errno == ENOENT:
+ continue
+ else:
+ fail("%s Failed to convert to path from "
+ "GFID %s: %s" % (brick, gfid, e), logger=logger)
+
+ if not inode_dict:
+ return
+
+ def inode_filter(path):
+ try:
+ st = os.lstat(path)
+ except (OSError, IOError) as e:
+ if e.errno == ENOENT:
+ st = None
+ else:
+ raise
+
+ if st and inode_dict.get(str(st.st_ino), None):
+ return True
+
+ return False
+
+ brick_path_len = len(brick)
+
+ def output_callback(path):
+ path = path.strip()
+ path = path[brick_path_len+1:]
+ output_write(fout, path, args.output_prefix)
+
+ # Length of brick path, to remove from output path
+ find(brick, callback_func=output_callback,
+ filter_func=inode_filter,
+ ignore_dirs=[".glusterfs"])
+
+ fout.flush()
+ os.fsync(fout.fileno())
+
+
+def gfid_to_path_using_pgfid(brick, gfids_file, output_file, outfile_failures):
+ """
+    The parent GFID is saved as an xattr; collect parent GFIDs from all
+    the files listed in gfids_file. Convert each parent GFID to a path
+    and crawl that directory one level deep (no recursion), matching
+    each entry's inode number against an in-memory dictionary of inode
+    numbers built from the list of GFIDs, and write the matching paths
+    to the output file.
+ """
+ with open(output_file, "a+") as fout:
+ pgfids = set()
+ inode_dict = {}
+ with open(gfids_file) as f:
+ for gfid in f:
+ gfid = gfid.strip()
+ p = os.path.join(brick,
+ ".glusterfs",
+ gfid[0:2],
+ gfid[2:4],
+ gfid)
+ if os.path.islink(p):
+ path = symlink_gfid_to_path(brick, gfid)
+ output_write(fout, path, args.output_prefix)
+ else:
+ try:
+ inode_dict[str(os.stat(p).st_ino)] = 1
+ file_xattrs = xattr.list(p)
+ num_parent_gfid = 0
+ for x in file_xattrs:
+ if x.startswith("trusted.pgfid."):
+ num_parent_gfid += 1
+ pgfids.add(x.split(".")[-1])
+
+ if num_parent_gfid == 0:
+ with open(outfile_failures, "a") as f:
+ f.write("%s\n" % gfid)
+ f.flush()
+ os.fsync(f.fileno())
+
+ except (IOError, OSError) as e:
+ if e.errno == ENOENT:
+ continue
+ else:
+ fail("%s Failed to convert to path from "
+ "GFID %s: %s" % (brick, gfid, e),
+ logger=logger)
+
+ if not inode_dict:
+ return
+
+ def inode_filter(path):
+ try:
+ st = os.lstat(path)
+ except (OSError, IOError) as e:
+ if e.errno == ENOENT:
+ st = None
+ else:
+ raise
+
+ if st and inode_dict.get(str(st.st_ino), None):
+ return True
+
+ return False
+
+ # Length of brick path, to remove from output path
+ brick_path_len = len(brick)
+
+ def output_callback(path):
+ path = path.strip()
+ path = path[brick_path_len+1:]
+ output_write(fout, path, args.output_prefix)
+
+ for pgfid in pgfids:
+ path = symlink_gfid_to_path(brick, pgfid)
+ find(os.path.join(brick, path),
+ callback_func=output_callback,
+ filter_func=inode_filter,
+ ignore_dirs=[".glusterfs"],
+ subdirs_crawl=False)
+
+ fout.flush()
+ os.fsync(fout.fileno())
+
+
+def sort_unique(filename):
+ execute(["sort", "-u", "-o", filename, filename],
+ exit_msg="Sort failed", logger=logger)
+
+
+def get_changes(brick, hash_dir, log_file, end, args):
+ """
+    Makes use of libgfchangelog's history API to get the changelogs
+    containing changes between the start and end times, then collects
+    the modified GFIDs from those changelogs and writes the list
+    of GFIDs to the 'gfid_list' file.
+ """
+ try:
+ libgfchangelog.cl_register(brick, hash_dir, log_file,
+ CHANGELOG_LOG_LEVEL, CHANGELOG_CONN_RETRIES)
+ except libgfchangelog.ChangelogException as e:
+ fail("%s Changelog register failed: %s" % (brick, e), logger=logger)
+
+ # Output files to record GFIDs and GFID to Path failure GFIDs
+ gfid_list_path = args.outfile + ".gfids"
+ gfid_list_failures_file = gfid_list_path + ".failures"
+ create_file(gfid_list_path, exit_on_err=True, logger=logger)
+ create_file(gfid_list_failures_file, exit_on_err=True, logger=logger)
+
+    # Changelogs path (hard-coded to BRICK/.glusterfs/changelogs)
+ cl_path = os.path.join(brick, ".glusterfs/changelogs")
+
+ # Fail if History fails for requested Start and End
+ try:
+ actual_end = libgfchangelog.cl_history_changelog(
+ cl_path, args.start, end, CHANGELOGAPI_NUM_WORKERS)
+ except libgfchangelog.ChangelogException as e:
+ fail("%s Historical Changelogs not available: %s" % (brick, e),
+ logger=logger)
+
+ try:
+        # scan followed by getchanges till scan returns zero.
+        # history_scan() is a blocking call; it waits until it knows the
+        # number of changelogs to process. It returns zero when there are
+        # no changelogs to be processed and a positive value for the
+        # number of changelogs to be processed, which are then fetched
+        # using history_getchanges()
+ changes = []
+ while libgfchangelog.cl_history_scan() > 0:
+ changes += libgfchangelog.cl_history_getchanges()
+
+ if changes:
+ with open(gfid_list_path, 'a+') as fgfid:
+ for change in changes:
+ with open(change) as f:
+ for line in f:
+ # Space delimited list, collect GFID
+ details = line.split()
+ fgfid.write("%s\n" % details[1])
+
+ libgfchangelog.cl_history_done(change)
+ fgfid.flush()
+ os.fsync(fgfid.fileno())
+ except libgfchangelog.ChangelogException as e:
+ fail("%s Error during Changelog Crawl: %s" % (brick, e),
+ logger=logger)
+
+    # If the TS returned from history_changelog is < the end time,
+    # an FS crawl may be required, since history is available only
+    # till the TS returned from history_changelog
+ if actual_end < end:
+ fail("Partial History available with Changelog", 2, logger=logger)
+
+ sort_unique(gfid_list_path)
+ gfid_to_path_using_pgfid(brick, gfid_list_path,
+ args.outfile, gfid_list_failures_file)
+ gfid_to_path_using_batchfind(brick, gfid_list_failures_file, args.outfile)
+
+
+def changelog_crawl(brick, end, args):
+ """
+ Init function, prepares working dir and calls Changelog query
+ """
+ if brick.endswith("/"):
+ brick = brick[0:len(brick)-1]
+
+ # WORKING_DIR/BRICKHASH/OUTFILE
+ working_dir = os.path.dirname(args.outfile)
+ brickhash = hashlib.sha1(brick)
+ brickhash = str(brickhash.hexdigest())
+ working_dir = os.path.join(working_dir, brickhash)
+
+ mkdirp(working_dir, exit_on_err=True, logger=logger)
+ create_file(args.outfile, exit_on_err=True, logger=logger)
+ create_file(args.outfile + ".gfids", exit_on_err=True, logger=logger)
+
+ log_file = os.path.join(conf.get_opt("log_dir"),
+ args.session,
+ args.volume,
+ "changelog.%s.log" % brickhash)
+
+ logger.info("%s Started Changelog Crawl. Start: %s, End: %s"
+ % (brick, args.start, end))
+ get_changes(brick, working_dir, log_file, end, args)
+
+
+def _get_args():
+ parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
+ description=PROG_DESCRIPTION)
+
+ parser.add_argument("session", help="Session Name")
+ parser.add_argument("volume", help="Volume Name")
+ parser.add_argument("brick", help="Brick Name")
+ parser.add_argument("outfile", help="Output File")
+ parser.add_argument("start", help="Start Time", type=int)
+ parser.add_argument("--debug", help="Debug", action="store_true")
+ parser.add_argument("--output-prefix", help="File prefix in output",
+ default=".")
+
+ return parser.parse_args()
+
+
+if __name__ == "__main__":
+ args = _get_args()
+ mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
+ exit_on_err=True)
+ log_file = os.path.join(conf.get_opt("log_dir"),
+ args.session,
+ args.volume,
+ "changelog.log")
+ setup_logger(logger, log_file, args.debug)
+ end = int(time.time()) - int(conf.get_opt("changelog_rollover_time"))
+ changelog_crawl(args.brick, end, args)
+ sys.exit(0)
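
Both gfid_to_path_using_pgfid() and gfid_to_path_using_batchfind() above lean on the same backend layout: every GFID has a handle at BRICK/.glusterfs/<g1g2>/<g3g4>/<GFID>, a symlink for directories and a hard link for regular files. A short sketch of that mapping and the islink test, using a hypothetical brick path and GFID:

import os

def gfid_backend_path(brick, gfid):
    # BRICK/.glusterfs/<first two chars>/<next two chars>/<full GFID>
    return os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)

# Hypothetical values, for illustration only
brick = "/bricks/b1"
gfid = "f4f18c02-0360-4cbf-8e47-c2dca1741b4e"

p = gfid_backend_path(brick, gfid)
if os.path.islink(p):
    # Directory: the symlink target encodes PARGFID/basename and is
    # resolved by utils.symlink_gfid_to_path()
    print "directory handle:", os.readlink(p)
elif os.path.exists(p):
    # Regular file: hard link to the real file, so its st_ino is the
    # key used in the inode dictionary while crawling
    print "file handle inode:", os.stat(p).st_ino
else:
    print "no backend handle for", gfid
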
diff --git a/tools/glusterfind/src/conf.py b/tools/glusterfind/src/conf.py
new file mode 100644
index 0000000000..2c6eac2bb1
--- /dev/null
+++ b/tools/glusterfind/src/conf.py
@@ -0,0 +1,28 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import os
+import ConfigParser
+
+config = ConfigParser.ConfigParser()
+config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)),
+ "tool.conf"))
+
+
+def list_change_detectors():
+ return dict(config.items("change_detectors")).keys()
+
+
+def get_opt(opt):
+ return config.get("vars", opt)
+
+
+def get_change_detector(opt):
+ return config.get("change_detectors", opt)
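
conf.py is a thin ConfigParser wrapper around the tool.conf installed alongside it (see tool.conf.in below for the [vars] and [change_detectors] sections). A self-contained sketch of the same lookups against an in-memory config, with hypothetical installed paths standing in for the @...@ placeholders:

import ConfigParser
import StringIO

SAMPLE = """\
[vars]
log_dir=/var/log/glusterfs/glusterfind/
changelog_rollover_time=15

[change_detectors]
changelog=/usr/libexec/glusterfs/glusterfind/changelog.py
brickfind=/usr/libexec/glusterfs/glusterfind/brickfind.py
"""

config = ConfigParser.ConfigParser()
config.readfp(StringIO.StringIO(SAMPLE))

# Same accessors as conf.py
print config.get("vars", "changelog_rollover_time")       # get_opt
print config.get("change_detectors", "brickfind")         # get_change_detector
print dict(config.items("change_detectors")).keys()       # list_change_detectors
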
diff --git a/tools/glusterfind/src/libgfchangelog.py b/tools/glusterfind/src/libgfchangelog.py
new file mode 100644
index 0000000000..e54a16a474
--- /dev/null
+++ b/tools/glusterfind/src/libgfchangelog.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import os
+from ctypes import CDLL, get_errno, create_string_buffer, c_ulong, byref
+from ctypes import RTLD_GLOBAL
+from ctypes.util import find_library
+
+
+class ChangelogException(OSError):
+ pass
+
+
+libgfc = CDLL(find_library("gfchangelog"), use_errno=True, mode=RTLD_GLOBAL)
+
+
+def raise_oserr():
+ errn = get_errno()
+ raise ChangelogException(errn, os.strerror(errn))
+
+
+def cl_register(brick, path, log_file, log_level, retries=0):
+ ret = libgfc.gf_changelog_register(brick, path, log_file,
+ log_level, retries)
+ if ret == -1:
+ raise_oserr()
+
+
+def cl_history_scan():
+ ret = libgfc.gf_history_changelog_scan()
+ if ret == -1:
+ raise_oserr()
+
+ return ret
+
+
+def cl_history_changelog(changelog_path, start, end, num_parallel):
+ actual_end = c_ulong()
+ ret = libgfc.gf_history_changelog(changelog_path, start, end,
+ num_parallel,
+ byref(actual_end))
+ if ret == -1:
+ raise_oserr()
+
+ return actual_end.value
+
+
+def cl_history_startfresh():
+ ret = libgfc.gf_history_changelog_start_fresh()
+ if ret == -1:
+ raise_oserr()
+
+
+def cl_history_getchanges():
+    """ TODO: remove hard-coded path name length (4096) """
+ def clsort(f):
+ return f.split('.')[-1]
+
+ changes = []
+ buf = create_string_buffer('\0', 4096)
+
+ while True:
+ ret = libgfc.gf_history_changelog_next_change(buf, 4096)
+ if ret in (0, -1):
+ break
+ changes.append(buf.raw[:ret - 1])
+ if ret == -1:
+ raise_oserr()
+
+ return sorted(changes, key=clsort)
+
+
+def cl_history_done(clfile):
+ ret = libgfc.gf_history_changelog_done(clfile)
+ if ret == -1:
+ raise_oserr()
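
changelog.py's get_changes() drives this wrapper in a fixed sequence: register, request the history window, drain it with scan/getchanges, and acknowledge each changelog with done. A hedged sketch of that sequence with hypothetical brick, scratch and log paths; it is only meaningful on a node that actually hosts the brick:

import libgfchangelog

brick = "/bricks/b1"                  # hypothetical brick path
scratch_dir = "/tmp/scratch"          # hypothetical working directory
log_file = "/tmp/changelog-api.log"   # hypothetical log file
start, end = 1420000000, 1420003600   # hypothetical time window

libgfchangelog.cl_register(brick, scratch_dir, log_file, 9, 5)

# Request changelogs covering [start, end]; the library reports the
# timestamp up to which history is actually available.
actual_end = libgfchangelog.cl_history_changelog(
    brick + "/.glusterfs/changelogs", start, end, 3)

changes = []
while libgfchangelog.cl_history_scan() > 0:
    changes += libgfchangelog.cl_history_getchanges()

for change in changes:
    # `change` is the path of a changelog file to parse; acknowledge it
    # once consumed so the library can clean up.
    libgfchangelog.cl_history_done(change)
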
diff --git a/tools/glusterfind/src/main.py b/tools/glusterfind/src/main.py
new file mode 100644
index 0000000000..d6b9a24dec
--- /dev/null
+++ b/tools/glusterfind/src/main.py
@@ -0,0 +1,468 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import sys
+from errno import ENOENT
+import time
+from multiprocessing import Process
+import os
+import xml.etree.cElementTree as etree
+from argparse import ArgumentParser, RawDescriptionHelpFormatter
+import logging
+import shutil
+
+from utils import execute, is_host_local, mkdirp, fail
+from utils import setup_logger, human_time
+import conf
+
+
+PROG_DESCRIPTION = """
+GlusterFS Incremental API
+"""
+ParseError = etree.ParseError if hasattr(etree, 'ParseError') else SyntaxError
+
+logger = logging.getLogger()
+
+
+def node_run(volume, host, path, start, outfile, args, fallback=False):
+ """
+    If the host is the local node, execute the command locally. If not,
+    execute the CHANGE_DETECTOR command via ssh and copy the output
+    file from the remote node using scp.
+ """
+ localdir = is_host_local(host)
+
+ # If Full backup is requested or start time is zero, use brickfind
+ change_detector = conf.get_change_detector(args.change_detector)
+ if ((start == 0 or args.full) and args.change_detector == "changelog") or \
+ fallback:
+ change_detector = conf.get_change_detector("brickfind")
+
+ # CHANGE_DETECTOR <SESSION> <VOLUME> <BRICK> <OUTFILE> <START> --debug
+ # --gfidpath <TYPE>
+ cmd = [change_detector,
+ args.session,
+ volume,
+ path,
+ outfile,
+ str(start),
+ "--output-prefix",
+ args.output_prefix] + \
+ (["--debug"] if args.debug else []) + \
+ (["--full"] if args.full else [])
+
+ if not localdir:
+ # prefix with ssh command if not local node
+ cmd = ["ssh",
+ "-i", conf.get_opt("secret_pem"),
+ "root@%s" % host] + cmd
+
+ rc, out, err = execute(cmd, logger=logger)
+ if rc == 2:
+ # Partial History Fallback
+ logger.info("%s %s Fallback to brickfind" % (host, err.strip()))
+ # Exit only from process, handled in main.
+ sys.exit(rc)
+ elif rc != 0:
+ fail("%s - Change detection failed" % host, logger=logger)
+
+ if not localdir:
+ cmd_copy = ["scp",
+ "-i", conf.get_opt("secret_pem"),
+ "root@%s:/%s" % (host, outfile),
+ os.path.dirname(outfile)]
+ execute(cmd_copy, exit_msg="%s - Copy command failed" % host,
+ logger=logger)
+
+
+def node_cleanup(host, args):
+ localdir = is_host_local(host)
+
+    # Cleanup command run on each node:
+    # nodecleanup.py <SESSION> <VOLUME> [--debug]
+ cmd = [conf.get_opt("nodecleanup"),
+ args.session,
+ args.volume] + (["--debug"] if args.debug else [])
+
+ if not localdir:
+ # prefix with ssh command if not local node
+ cmd = ["ssh",
+ "-i", conf.get_opt("secret_pem"),
+ "root@%s" % host] + cmd
+
+ execute(cmd, exit_msg="%s - Cleanup failed" % host, logger=logger)
+
+
+def cleanup(nodes, args):
+ pool = []
+ for num, node in enumerate(nodes):
+ host, brick = node[1].split(":")
+ # temp output file
+ node_outfile = os.path.join(conf.get_opt("working_dir"),
+ args.session,
+ args.volume,
+ "tmp_output_%s.txt" % num)
+
+ try:
+ os.remove(node_outfile)
+ except (OSError, IOError):
+ # TODO: Cleanup Failure, Handle
+ pass
+
+ p = Process(target=node_cleanup,
+ args=(host, args))
+ p.start()
+ pool.append(p)
+
+ exit_codes = 0
+ for p in pool:
+ p.join()
+ exit_codes += (0 if p.exitcode == 0 else 1)
+
+ if exit_codes != 0:
+ sys.exit(1)
+
+
+def failback_node_run(brick_details, idx, volume, start, outfile, args):
+ host, brick = brick_details.split(":")
+ p = Process(target=node_run,
+ args=(volume, host, brick, start, outfile, args, True))
+ p.start()
+ p.join()
+ return p.exitcode
+
+
+def run_in_nodes(volume, start, args):
+ """
+ Get nodes of volume using gluster volume info, spawn a process
+ each for a Node. Merge the output files once all the process
+ complete their tasks.
+ """
+ nodes = get_nodes(volume)
+ pool = []
+ node_outfiles = []
+ for num, node in enumerate(nodes):
+ host, brick = node[1].split(":")
+ # temp output file
+ node_outfile = os.path.join(conf.get_opt("working_dir"),
+ args.session,
+ volume,
+ "tmp_output_%s.txt" % num)
+ node_outfiles.append(node_outfile)
+ p = Process(target=node_run, args=(volume, host, brick, start,
+ node_outfile, args))
+ p.start()
+ pool.append(p)
+
+ exit_codes = 0
+ for idx, p in enumerate(pool):
+ p.join()
+ # Handle the Changelog failure, fallback to Brickfind
+ if p.exitcode == 2:
+ rc = failback_node_run(nodes[idx][1], idx, volume, start,
+ node_outfiles[idx], args)
+ exit_codes += (0 if rc == 0 else 1)
+ elif p.exitcode != 0:
+            exit_codes += 1
+
+ if exit_codes != 0:
+ sys.exit(1)
+
+ # Merge all output files
+ cmd = ["sort", "-u"] + node_outfiles + ["-o", args.outfile]
+ execute(cmd,
+ exit_msg="Failed to merge output files "
+ "collected from nodes", logger=logger)
+
+ cleanup(nodes, args)
+
+
+def get_nodes(volume):
+ """
+ Get the gluster volume info xml output and parse to get
+ the brick details.
+ """
+ cmd = ["gluster", 'volume', 'info', volume, "--xml"]
+ _, data, _ = execute(cmd,
+ exit_msg="Failed to Run Gluster Volume Info",
+ logger=logger)
+ tree = etree.fromstring(data)
+
+ nodes = []
+ volume_el = tree.find('volInfo/volumes/volume')
+ try:
+ for b in volume_el.findall('bricks/brick'):
+ nodes.append((b.find('hostUuid').text,
+ b.find('name').text))
+ except (ParseError, AttributeError, ValueError) as e:
+ fail("Failed to parse Volume Info: %s" % e, logger=logger)
+
+ return nodes
+
+
+def _get_args():
+ parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
+ description=PROG_DESCRIPTION)
+ subparsers = parser.add_subparsers(dest="mode")
+
+ # create <SESSION> <VOLUME> [--debug] [--force]
+ parser_create = subparsers.add_parser('create')
+ parser_create.add_argument("session", help="Session Name")
+ parser_create.add_argument("volume", help="Volume Name")
+ parser_create.add_argument("--debug", help="Debug", action="store_true")
+ parser_create.add_argument("--force", help="Force option to recreate "
+ "the session", action="store_true")
+
+ # delete <SESSION> <VOLUME> [--debug]
+ parser_delete = subparsers.add_parser('delete')
+ parser_delete.add_argument("session", help="Session Name")
+ parser_delete.add_argument("volume", help="Volume Name")
+ parser_delete.add_argument("--debug", help="Debug", action="store_true")
+
+ # list [--session <SESSION>] [--volume <VOLUME>]
+ parser_list = subparsers.add_parser('list')
+ parser_list.add_argument("--session", help="Session Name", default="")
+ parser_list.add_argument("--volume", help="Volume Name", default="")
+ parser_list.add_argument("--debug", help="Debug", action="store_true")
+
+ # pre <SESSION> <VOLUME> <OUTFILE> [--change-detector <CHANGE_DETECTOR>]
+ # [--output-prefix <OUTPUT_PREFIX>] [--full]
+ parser_pre = subparsers.add_parser('pre')
+ parser_pre.add_argument("session", help="Session Name")
+ parser_pre.add_argument("volume", help="Volume Name")
+ parser_pre.add_argument("outfile", help="Output File")
+ parser_pre.add_argument("--debug", help="Debug", action="store_true")
+ parser_pre.add_argument("--full", help="Full find", action="store_true")
+ parser_pre.add_argument("--change-detector", dest="change_detector",
+ help="Change detection",
+ choices=conf.list_change_detectors(),
+ type=str, default='changelog')
+ parser_pre.add_argument("--output-prefix", help="File prefix in output",
+ default=".")
+
+ # post <SESSION> <VOLUME>
+ parser_post = subparsers.add_parser('post')
+ parser_post.add_argument("session", help="Session Name")
+ parser_post.add_argument("volume", help="Volume Name")
+ parser_post.add_argument("--debug", help="Debug", action="store_true")
+
+ return parser.parse_args()
+
+
+def ssh_setup():
+ if not os.path.exists(conf.get_opt("secret_pem")):
+ # Generate ssh-key
+ cmd = ["ssh-keygen",
+ "-N",
+ "",
+ "-f",
+ conf.get_opt("secret_pem")]
+ execute(cmd,
+ exit_msg="Unable to generate ssh key %s"
+ % conf.get_opt("secret_pem"),
+ logger=logger)
+
+ logger.info("Ssh key generated %s" % conf.get_opt("secret_pem"))
+
+ # Copy pub file to all nodes
+ cmd = ["gluster",
+ "system::",
+ "copy",
+ "file",
+ "/" + os.path.basename(conf.get_opt("secret_pem")) + ".pub"]
+ execute(cmd, exit_msg="Failed to distribute ssh keys", logger=logger)
+
+ logger.info("Distributed ssh key to all nodes of Volume")
+
+ # Add to authorized_keys file in each node
+ cmd = ["gluster",
+ "system::",
+ "execute",
+ "add_secret_pub",
+ "root",
+ os.path.basename(conf.get_opt("secret_pem")) + ".pub"]
+ execute(cmd,
+ exit_msg="Failed to add ssh keys to authorized_keys file",
+ logger=logger)
+
+ logger.info("Ssh key added to authorized_keys of Volume nodes")
+
+
+def mode_create(session_dir, args):
+ logger.debug("Init is called - Session: %s, Volume: %s"
+ % (args.session, args.volume))
+
+ mkdirp(session_dir, exit_on_err=True, logger=logger)
+ mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+ logger=logger)
+ status_file = os.path.join(session_dir, args.volume, "status")
+
+ if os.path.exists(status_file) and not args.force:
+ fail("Session %s already created" % args.session, logger=logger)
+
+ if not os.path.exists(status_file) or args.force:
+ ssh_setup()
+
+ execute(["gluster", "volume", "set",
+ args.volume, "build-pgfid", "on"],
+ exit_msg="Failed to set volume option build-pgfid on",
+ logger=logger)
+ logger.info("Volume option set %s, build-pgfid on" % args.volume)
+
+ execute(["gluster", "volume", "set",
+ args.volume, "changelog.changelog", "on"],
+ exit_msg="Failed to set volume option "
+ "changelog.changelog on", logger=logger)
+ logger.info("Volume option set %s, changelog.changelog on"
+ % args.volume)
+
+ if not os.path.exists(status_file):
+ with open(status_file, "w", buffering=0) as f:
+ # Add Rollover time to current time to make sure changelogs
+ # will be available if we use this time as start time
+ time_to_update = int(time.time()) + int(
+ conf.get_opt("changelog_rollover_time"))
+ f.write(str(time_to_update))
+
+ sys.exit(0)
+
+
+def mode_pre(session_dir, args):
+ """
+ Read from Session file and write to session.pre file
+ """
+ endtime_to_update = int(time.time()) - int(
+ conf.get_opt("changelog_rollover_time"))
+ status_file = os.path.join(session_dir, args.volume, "status")
+ status_file_pre = status_file + ".pre"
+
+ mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger)
+
+ start = 0
+ try:
+ with open(status_file) as f:
+ start = int(f.read().strip())
+ except ValueError:
+ pass
+ except (OSError, IOError) as e:
+ fail("Error Opening Session file %s: %s"
+ % (status_file, e), logger=logger)
+
+ logger.debug("Pre is called - Session: %s, Volume: %s, "
+ "Start time: %s, End time: %s"
+ % (args.session, args.volume, start, endtime_to_update))
+
+ run_in_nodes(args.volume, start, args)
+
+ with open(status_file_pre, "w", buffering=0) as f:
+ f.write(str(endtime_to_update))
+
+ sys.stdout.write("Generated output file %s\n" % args.outfile)
+
+
+def mode_post(session_dir, args):
+ """
+    If the pre session file exists, overwrite the session file.
+    If the pre session file does not exist, return an ERROR.
+ """
+ status_file = os.path.join(session_dir, args.volume, "status")
+ logger.debug("Post is called - Session: %s, Volume: %s"
+ % (args.session, args.volume))
+ status_file_pre = status_file + ".pre"
+
+ if os.path.exists(status_file_pre):
+ os.rename(status_file_pre, status_file)
+ sys.exit(0)
+ else:
+ fail("Pre script is not run", logger=logger)
+
+
+def mode_delete(session_dir, args):
+ def handle_rm_error(func, path, exc_info):
+ if exc_info[1].errno == ENOENT:
+ return
+
+ raise exc_info[1]
+
+ shutil.rmtree(os.path.join(session_dir, args.volume),
+ onerror=handle_rm_error)
+
+
+def mode_list(session_dir, args):
+ """
+    List available sessions to stdout; if a session name is set,
+    list only that session.
+ """
+ if args.session:
+ if not os.path.exists(os.path.join(session_dir, args.session)):
+ fail("Invalid Session", logger=logger)
+ sessions = [args.session]
+ else:
+ sessions = []
+ for d in os.listdir(session_dir):
+ sessions.append(d)
+
+ output = []
+ for session in sessions:
+ # Session Volume Last Processed
+ volnames = os.listdir(os.path.join(session_dir, session))
+
+ for volname in volnames:
+ if args.volume and args.volume != volname:
+ continue
+
+ status_file = os.path.join(session_dir, session, volname, "status")
+ last_processed = None
+ try:
+ with open(status_file) as f:
+ last_processed = f.read().strip()
+ except (OSError, IOError) as e:
+ if e.errno == ENOENT:
+ pass
+ else:
+ raise
+ output.append((session, volname, last_processed))
+
+ if output:
+ sys.stdout.write("%s %s %s\n" % ("SESSION".ljust(25),
+ "VOLUME".ljust(25),
+ "SESSION TIME".ljust(25)))
+ sys.stdout.write("-"*75)
+ sys.stdout.write("\n")
+ for session, volname, last_processed in output:
+ sys.stdout.write("%s %s %s\n" % (session.ljust(25),
+ volname.ljust(25),
+ human_time(last_processed).ljust(25)))
+
+
+def main():
+ args = _get_args()
+ mkdirp(conf.get_opt("session_dir"), exit_on_err=True)
+
+ if args.mode == "list":
+ session_dir = conf.get_opt("session_dir")
+ else:
+ session_dir = os.path.join(conf.get_opt("session_dir"),
+ args.session)
+
+ if not os.path.exists(session_dir) and args.mode not in ["create", "list"]:
+ fail("Invalid session %s" % args.session)
+
+ mkdirp(os.path.join(conf.get_opt("log_dir"), args.session, args.volume),
+ exit_on_err=True)
+ log_file = os.path.join(conf.get_opt("log_dir"),
+ args.session,
+ args.volume,
+ "cli.log")
+ setup_logger(logger, log_file, args.debug)
+
+ # globals() will have all the functions already defined.
+ # mode_<args.mode> will be the function name to be called
+ globals()["mode_" + args.mode](session_dir, args)
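
get_nodes() above shells out to `gluster volume info <VOLUME> --xml` and picks (hostUuid, name) off every brick element. A small sketch of just the parsing step, against a trimmed-down, illustrative sample of that XML (hosts, paths and UUIDs are made up):

import xml.etree.cElementTree as etree

SAMPLE_XML = """\
<cliOutput>
  <volInfo>
    <volumes>
      <volume>
        <name>gv0</name>
        <bricks>
          <brick>
            <name>node1:/bricks/b1</name>
            <hostUuid>00000000-0000-0000-0000-0000000000a1</hostUuid>
          </brick>
          <brick>
            <name>node2:/bricks/b2</name>
            <hostUuid>00000000-0000-0000-0000-0000000000a2</hostUuid>
          </brick>
        </bricks>
      </volume>
    </volumes>
  </volInfo>
</cliOutput>
"""

tree = etree.fromstring(SAMPLE_XML)
volume_el = tree.find('volInfo/volumes/volume')

nodes = []
for b in volume_el.findall('bricks/brick'):
    nodes.append((b.find('hostUuid').text, b.find('name').text))

# Each entry is (hostUuid, "host:/brick/path"); node[1].split(":") then
# yields the host and brick path handed to the per-brick worker process.
print nodes
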
diff --git a/tools/glusterfind/src/nodecleanup.py b/tools/glusterfind/src/nodecleanup.py
new file mode 100644
index 0000000000..a31d4d83ac
--- /dev/null
+++ b/tools/glusterfind/src/nodecleanup.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import shutil
+import sys
+import os
+import logging
+from errno import ENOENT
+
+from utils import setup_logger, mkdirp
+import conf
+
+logger = logging.getLogger()
+
+
+if __name__ == "__main__":
+ # Args: <SESSION> <VOLUME>
+ session = sys.argv[1]
+ volume = sys.argv[2]
+
+ working_dir = os.path.join(conf.get_opt("working_dir"),
+ session,
+ volume)
+
+ mkdirp(os.path.join(conf.get_opt("log_dir"), session, volume),
+ exit_on_err=True)
+ log_file = os.path.join(conf.get_opt("log_dir"),
+ session,
+ volume,
+ "changelog.log")
+
+ setup_logger(logger, log_file)
+
+ try:
+ def handle_rm_error(func, path, exc_info):
+ if exc_info[1].errno == ENOENT:
+ return
+
+ raise exc_info[1]
+
+ shutil.rmtree(working_dir, onerror=handle_rm_error)
+ except (OSError, IOError) as e:
+ logger.error("Failed to delete working directory: %s" % e)
+ sys.exit(1)
diff --git a/tools/glusterfind/src/tool.conf.in b/tools/glusterfind/src/tool.conf.in
new file mode 100644
index 0000000000..bae46499aa
--- /dev/null
+++ b/tools/glusterfind/src/tool.conf.in
@@ -0,0 +1,11 @@
+[vars]
+session_dir=@GLUSTERD_WORKDIR@/glusterfind/
+secret_pem=@GLUSTERD_WORKDIR@/glusterfind.secret.pem
+working_dir=@GLUSTERFSD_MISCDIR@/glusterfind/
+log_dir=/var/log/glusterfs/glusterfind/
+nodecleanup=@GLUSTERFS_LIBEXECDIR@/glusterfind/nodecleanup.py
+changelog_rollover_time=15
+
+[change_detectors]
+changelog=@GLUSTERFS_LIBEXECDIR@/glusterfind/changelog.py
+brickfind=@GLUSTERFS_LIBEXECDIR@/glusterfind/brickfind.py
\ No newline at end of file
diff --git a/tools/glusterfind/src/utils.py b/tools/glusterfind/src/utils.py
new file mode 100644
index 0000000000..c503a2b9f5
--- /dev/null
+++ b/tools/glusterfind/src/utils.py
@@ -0,0 +1,203 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com/>
+# This file is part of GlusterFS.
+#
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+
+import sys
+import socket
+from subprocess import PIPE, Popen
+from errno import EPERM, EEXIST
+import logging
+import os
+from datetime import datetime
+
+ROOT_GFID = "00000000-0000-0000-0000-000000000001"
+
+
+def find(path, callback_func=lambda x: True, filter_func=lambda x: True,
+ ignore_dirs=[], subdirs_crawl=True):
+ if os.path.basename(path) in ignore_dirs:
+ return
+
+ if filter_func(path):
+ callback_func(path)
+
+ for p in os.listdir(path):
+ full_path = os.path.join(path, p)
+
+ if os.path.isdir(full_path):
+ if subdirs_crawl:
+ find(full_path, callback_func, filter_func, ignore_dirs)
+ else:
+ if filter_func(full_path):
+ callback_func(full_path)
+ else:
+ if filter_func(full_path):
+ callback_func(full_path)
+
+
+def output_write(f, path, prefix="."):
+ if path == "":
+ return
+
+ if prefix != ".":
+ path = os.path.join(prefix, path)
+ f.write("%s\n" % path)
+
+
+def human_time(ts):
+ return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S")
+
+
+def setup_logger(logger, path, debug=False):
+ if debug:
+ logger.setLevel(logging.DEBUG)
+ else:
+ logger.setLevel(logging.INFO)
+
+ # create the logging file handler
+ fh = logging.FileHandler(path)
+
+ formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
+ "[%(module)s - %(lineno)s:%(funcName)s] "
+ "- %(message)s")
+
+ fh.setFormatter(formatter)
+
+ # add handler to logger object
+ logger.addHandler(fh)
+
+
+def create_file(path, exit_on_err=False, logger=None):
+ """
+    If the file exists, overwrite it. On error, print to stderr and
+    exit if exit_on_err is set, else raise the exception for the
+    consumer to handle.
+ """
+ try:
+ open(path, 'w').close()
+ except (OSError, IOError) as e:
+ if exit_on_err:
+ fail("Failed to create file %s: %s" % (path, e), logger=logger)
+ else:
+ raise
+
+
+def mkdirp(path, exit_on_err=False, logger=None):
+ """
+    Try creating the required directory structure;
+    ignore EEXIST and raise an exception for all other errors.
+    Print the error to stderr and exit if exit_on_err is set, else
+    raise the exception.
+ """
+ try:
+ os.makedirs(path)
+ except (OSError, IOError) as e:
+ if e.errno == EEXIST and os.path.isdir(path):
+ pass
+ else:
+ if exit_on_err:
+ fail("Fail to create dir %s: %s" % (path, e), logger=logger)
+ else:
+ raise
+
+
+def fail(msg, code=1, logger=None):
+ """
+ Write error to stderr and exit
+ """
+ if logger:
+ logger.error(msg)
+ sys.stderr.write("%s\n" % msg)
+ sys.exit(code)
+
+
+def execute(cmd, exit_msg=None, logger=None):
+ """
+    Run the command. If it fails and exit_msg is set, write the error
+    to stderr and exit; otherwise return (returncode, out, err).
+ """
+ p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True)
+
+ (out, err) = p.communicate()
+ if p.returncode != 0 and exit_msg is not None:
+ fail("%s: %s" % (exit_msg, err), p.returncode, logger=logger)
+
+ return (p.returncode, out, err)
+
+
+def symlink_gfid_to_path(brick, gfid):
+ """
+    Each directory is symlinked to a file named by its GFID
+    in the .glusterfs directory of the brick backend. Using readlink
+    we get PARGFID/basename of the dir; readlink recursively till
+    we get PARGFID as ROOT_GFID.
+ """
+ if gfid == ROOT_GFID:
+ return ""
+
+ out_path = ""
+ while True:
+ path = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)
+ path_readlink = os.readlink(path)
+ pgfid = os.path.dirname(path_readlink)
+ out_path = os.path.join(os.path.basename(path_readlink), out_path)
+ if pgfid == "../../00/00/%s" % ROOT_GFID:
+ break
+ gfid = os.path.basename(pgfid)
+ return out_path
+
+
+def is_host_local(host):
+ """
+ Find if a host is local or not.
+ Code copied from $GLUSTERFS/geo-replication/syncdaemon/syncdutils.py
+ """
+ locaddr = False
+ for ai in socket.getaddrinfo(host, None):
+ # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators
+ # /mgmt/glusterd/src/glusterd-utils.c#L125
+ if ai[0] == socket.AF_INET:
+ if ai[-1][0].split(".")[0] == "127":
+ locaddr = True
+ break
+ elif ai[0] == socket.AF_INET6:
+ if ai[-1][0] == "::1":
+ locaddr = True
+ break
+ else:
+ continue
+ try:
+ # use ICMP socket to avoid net.ipv4.ip_nonlocal_bind issue,
+ # cf. https://bugzilla.redhat.com/show_bug.cgi?id=890587
+ s = socket.socket(ai[0], socket.SOCK_RAW, socket.IPPROTO_ICMP)
+ except socket.error:
+ ex = sys.exc_info()[1]
+ if ex.errno != EPERM:
+ raise
+ f = None
+ try:
+ f = open("/proc/sys/net/ipv4/ip_nonlocal_bind")
+ if int(f.read()) != 0:
+                    logging.warning("non-local bind is set and not "
+                                    "allowed to create "
+                                    "raw sockets, cannot determine "
+                                    "if %s is local" % host)
+ return False
+ s = socket.socket(ai[0], socket.SOCK_DGRAM)
+ finally:
+ if f:
+ f.close()
+ try:
+ s.bind(ai[-1])
+ locaddr = True
+ break
+ except:
+ pass
+ s.close()
+ return locaddr
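
symlink_gfid_to_path() depends only on the shape of the .glusterfs handles, so the walk can be exercised against a throwaway directory that mimics that layout. A sketch with a fabricated brick and a single hypothetical directory GFID:

import os
import tempfile

ROOT_GFID = "00000000-0000-0000-0000-000000000001"
DIR_GFID = "a1b2c3d4-0000-0000-0000-000000000002"   # hypothetical directory GFID

# Fabricate BRICK/.glusterfs/a1/b2/<DIR_GFID> -> ../../00/00/<ROOT_GFID>/mydir,
# i.e. a directory "mydir" sitting directly under the brick root.
brick = tempfile.mkdtemp()
handle_dir = os.path.join(brick, ".glusterfs", DIR_GFID[0:2], DIR_GFID[2:4])
os.makedirs(handle_dir)
os.symlink("../../00/00/%s/mydir" % ROOT_GFID,
           os.path.join(handle_dir, DIR_GFID))

# Same walk as utils.symlink_gfid_to_path(): follow readlink until the
# parent handle is the root GFID, accumulating basenames along the way.
gfid, out_path = DIR_GFID, ""
while True:
    handle = os.path.join(brick, ".glusterfs", gfid[0:2], gfid[2:4], gfid)
    target = os.readlink(handle)              # "../../00/00/<ROOT>/mydir"
    pgfid = os.path.dirname(target)
    out_path = os.path.join(os.path.basename(target), out_path)
    if pgfid == "../../00/00/%s" % ROOT_GFID:
        break
    gfid = os.path.basename(pgfid)

print out_path   # -> "mydir/"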