summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/changelogagent.py
diff options
context:
space:
mode:
authorAravinda VK <avishwan@redhat.com>2014-04-29 12:14:24 +0530
committerVenky Shankar <vshankar@redhat.com>2014-05-09 00:27:40 -0700
commitc7b0396f680863528248e6f5a162de47184b6c88 (patch)
treeead2f295d041df1e258db4bf09fe10944b15f4d7 /geo-replication/syncdaemon/changelogagent.py
parent65757e0f57f93103d87fdf9534c5ca25b66d14b7 (diff)
downloadglusterfs-c7b0396f680863528248e6f5a162de47184b6c88.tar.gz
glusterfs-c7b0396f680863528248e6f5a162de47184b6c88.tar.xz
glusterfs-c7b0396f680863528248e6f5a162de47184b6c88.zip
geo-rep: Pause and Resume feature for geo-replication
Changelog consumption/processing now happens in seperate process group than monitor. When monitor process group gets SIGSTOP all worker process, ssh, rsync will be paused except the changelog processing. When it gets SIGCONT it resumes its operation. Changelog agent runs as RepceServer, geo-rep worker communicates with changelog agent using RepceClient. Change-Id: I35c333e4d8b13d03a7808aed601960eef23cfa04 BUG: 1093602 Signed-off-by: Venky Shankar <vshankar@redhat.com> Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/7322
Diffstat (limited to 'geo-replication/syncdaemon/changelogagent.py')
-rw-r--r--geo-replication/syncdaemon/changelogagent.py78
1 files changed, 78 insertions, 0 deletions
diff --git a/geo-replication/syncdaemon/changelogagent.py b/geo-replication/syncdaemon/changelogagent.py
new file mode 100644
index 0000000000..54d82cefcd
--- /dev/null
+++ b/geo-replication/syncdaemon/changelogagent.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
+import os
+import logging
+import syncdutils
+from syncdutils import select, CHANGELOG_AGENT_SERVER_VERSION
+from repce import RepceServer
+
+
+class _MetaChangelog(object):
+
+ def __getattr__(self, meth):
+ from libgfchangelog import Changes as LChanges
+ xmeth = [m for m in dir(LChanges) if m[0] != '_']
+ if meth not in xmeth:
+ return
+ for m in xmeth:
+ setattr(self, m, getattr(LChanges, m))
+ return getattr(self, meth)
+
+Changes = _MetaChangelog()
+
+
+class Changelog(object):
+ def version(self):
+ return CHANGELOG_AGENT_SERVER_VERSION
+
+ def register(self, cl_brick, cl_dir, cl_log, cl_level, retries=0):
+ return Changes.cl_register(cl_brick, cl_dir, cl_log, cl_level, retries)
+
+ def scan(self):
+ return Changes.cl_scan()
+
+ def getchanges(self):
+ return Changes.cl_getchanges()
+
+ def done(self, clfile):
+ return Changes.cl_done(clfile)
+
+ def history(self, changelog_path, start, end, num_parallel):
+ return Changes.cl_history_changelog(changelog_path, start, end,
+ num_parallel)
+
+ def history_scan(self):
+ return Changes.cl_history_scan()
+
+ def history_getchanges(self):
+ return Changes.cl_history_getchanges()
+
+ def history_done(self, clfile):
+ return Changes.cl_history_done(clfile)
+
+
+class ChangelogAgent(object):
+ def __init__(self, obj, fd_tup):
+ (inf, ouf, rw, ww) = fd_tup.split(',')
+ os.close(int(rw))
+ os.close(int(ww))
+ repce = RepceServer(obj, int(inf), int(ouf), 1)
+ t = syncdutils.Thread(target=lambda: (repce.service_loop(),
+ syncdutils.finalize()))
+ t.start()
+ logging.info('Agent listining...')
+
+ select((), (), ())
+
+
+def agent(obj, fd_tup):
+ return ChangelogAgent(obj, fd_tup)