author    Aravinda VK <avishwan@redhat.com>  2016-08-31 11:53:06 +0530
committer Aravinda VK <avishwan@redhat.com>  2016-09-07 23:11:18 -0700
commit    5de500cd0116796ff797099c60d33258bd48ce3c (patch)
tree      789417fc6c528986117a40c19543543bc0a9704e /geo-replication/syncdaemon
parent    0fd7d0e1c78fdbedfcdb085445c4b0be3c1a97a9 (diff)
geo-rep: Fix History post process
This patch removes the changelogsdb part of post processing, since it did not
give the expected performance advantage. The entry stime and other logging
improvements are retained.

BUG: 1364420
Change-Id: Ib99d23f09d96c14bc28225b47d9134260f5551bf
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/15371
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Kotresh HR <khiremat@redhat.com>
Smoke: Gluster Build System <jenkins@build.gluster.org>
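At a glance, the patch replaces the temporary changelogsdb (sqlite) staging of
data/meta GFIDs with plain per-batch in-memory sets, dropping GFIDs that get
unlinked on the master before they are handed to rsync/tar. A minimal
illustrative sketch, with made-up names rather than gsyncd APIs:

def collect_datas(gfids_from_changelogs, unlinked_gfids):
    """Gather GFIDs that still need a data sync, skipping purged files."""
    datas = set()
    for gfid in gfids_from_changelogs:
        if gfid in unlinked_gfids:
            # file was deleted on the master after the changelog was recorded
            continue
        datas.add(gfid)
    return datas

print(collect_datas(["gfid-1", "gfid-2", "gfid-3"], {"gfid-2"}))
# {'gfid-1', 'gfid-3'}  (set ordering may vary)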
Diffstat (limited to 'geo-replication/syncdaemon')
-rw-r--r--  geo-replication/syncdaemon/Makefile.am     |   2
-rw-r--r--  geo-replication/syncdaemon/changelogsdb.py  | 111
-rw-r--r--  geo-replication/syncdaemon/gsyncd.py        |  21
-rw-r--r--  geo-replication/syncdaemon/master.py        | 307
-rw-r--r--  geo-replication/syncdaemon/resource.py      |   2
5 files changed, 123 insertions, 320 deletions
diff --git a/geo-replication/syncdaemon/Makefile.am b/geo-replication/syncdaemon/Makefile.am
index 88c9e64e52..7cdaf45dde 100644
--- a/geo-replication/syncdaemon/Makefile.am
+++ b/geo-replication/syncdaemon/Makefile.am
@@ -3,6 +3,6 @@ syncdaemondir = $(libexecdir)/glusterfs/python/syncdaemon
syncdaemon_PYTHON = gconf.py gsyncd.py __init__.py master.py README.md repce.py \
resource.py configinterface.py syncdutils.py monitor.py libcxattr.py \
$(top_builddir)/contrib/ipaddr-py/ipaddr.py libgfchangelog.py changelogagent.py \
- gsyncdstatus.py changelogsdb.py conf.py
+ gsyncdstatus.py conf.py
CLEANFILES =
diff --git a/geo-replication/syncdaemon/changelogsdb.py b/geo-replication/syncdaemon/changelogsdb.py
deleted file mode 100644
index 7e64158e7a..0000000000
--- a/geo-replication/syncdaemon/changelogsdb.py
+++ /dev/null
@@ -1,111 +0,0 @@
-#
-# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
-# This file is part of GlusterFS.
-
-# This file is licensed to you under your choice of the GNU Lesser
-# General Public License, version 3 or any later version (LGPLv3 or
-# later), or the GNU General Public License, version 2 (GPLv2), in all
-# cases as published by the Free Software Foundation.
-#
-
-import os
-import sqlite3
-from errno import ENOENT
-
-conn = None
-cursor = None
-
-
-def db_commit():
- conn.commit()
-
-
-def db_init(db_path):
- global conn, cursor
- # Remove Temp Db
- try:
- os.unlink(db_path)
- os.unlink(db_path + "-journal")
- except OSError as e:
- if e.errno != ENOENT:
- raise
-
- conn = sqlite3.connect(db_path)
- cursor = conn.cursor()
- cursor.execute("DROP TABLE IF EXISTS data")
- cursor.execute("DROP TABLE IF EXISTS meta")
- query = """CREATE TABLE IF NOT EXISTS data(
- gfid VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE,
- changelog_time VARCHAR(100)
- )"""
- cursor.execute(query)
-
- query = """CREATE TABLE IF NOT EXISTS meta(
- gfid VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE,
- changelog_time VARCHAR(100)
- )"""
- cursor.execute(query)
-
-
-def db_record_data(gfid, changelog_time):
- query = "INSERT INTO data(gfid, changelog_time) VALUES(?, ?)"
- cursor.execute(query, (gfid, changelog_time))
-
-
-def db_record_meta(gfid, changelog_time):
- query = "INSERT INTO meta(gfid, changelog_time) VALUES(?, ?)"
- cursor.execute(query, (gfid, changelog_time))
-
-
-def db_remove_meta(gfid):
- query = "DELETE FROM meta WHERE gfid = ?"
- cursor.execute(query, (gfid, ))
-
-
-def db_remove_data(gfid):
- query = "DELETE FROM data WHERE gfid = ?"
- cursor.execute(query, (gfid, ))
-
-
-def db_get_data(start, end, limit, offset):
- query = """SELECT gfid FROM data WHERE changelog_time
- BETWEEN ? AND ? LIMIT ? OFFSET ?"""
- cursor.execute(query, (start, end, limit, offset))
- out = []
- for row in cursor:
- out.append(row[0])
-
- return out
-
-
-def db_get_meta(start, end, limit, offset):
- query = """SELECT gfid FROM meta WHERE changelog_time
- BETWEEN ? AND ? LIMIT ? OFFSET ?"""
- cursor.execute(query, (start, end, limit, offset))
- out = []
- for row in cursor:
- out.append(row[0])
-
- return out
-
-
-def db_delete_meta_if_exists_in_data():
- query = """
- DELETE FROM meta WHERE gfid in
- (SELECT M.gfid
- FROM meta M INNER JOIN data D
- ON M.gfid = D.gfid)
- """
- cursor.execute(query)
-
-
-def db_get_data_count():
- query = "SELECT COUNT(gfid) FROM data"
- cursor.execute(query)
- return cursor.fetchone()[0]
-
-
-def db_get_meta_count():
- query = "SELECT COUNT(gfid) FROM meta"
- cursor.execute(query)
- return cursor.fetchone()[0]
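For context, what the deleted module implemented was plain LIMIT/OFFSET
pagination over a throwaway sqlite table. A self-contained sketch of that
pattern (in-memory database and made-up GFIDs; the page size matches the
removed DB_PAGINATION_SIZE_DATA constant in the old master.py):

import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("""CREATE TABLE data(
                   gfid VARCHAR(100) PRIMARY KEY ON CONFLICT IGNORE,
                   changelog_time VARCHAR(100))""")
cur.executemany("INSERT INTO data VALUES(?, ?)",
                [("gfid-%d" % i, str(1472600000 + i)) for i in range(2500)])
conn.commit()

PAGE = 1000          # DB_PAGINATION_SIZE_DATA in the removed code
offset = 0
while True:
    cur.execute("""SELECT gfid FROM data WHERE changelog_time
                   BETWEEN ? AND ? LIMIT ? OFFSET ?""",
                ("1472600000", "1472609999", PAGE, offset))
    rows = cur.fetchall()
    if not rows:
        break
    offset += PAGE
    # in the old code each page of GFIDs was handed to the rsync queue here
    print("page of %d gfids" % len(rows))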
diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 918bee0ce1..84b26a4114 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -275,27 +275,6 @@ def main_i():
op.add_option('--log-rsync-performance', default=False,
action='store_true')
op.add_option('--max-rsync-retries', type=int, default=10)
-
- # This is for stime granularity, Bigger batch will be split into
- # multiple data batches, On failure it will start from this point
- # Default value is 1 day changelogs
- # (4 * 60 * 24 = 5760)
- # 4 changelogs per minute
- # 60 min per hr
- # 24 hrs per day
- op.add_option('--max-data-changelogs-in-batch', type=int, default=5760)
-
- # While processing Historical Changelogs above BATCH SIZE is not considered
- # since all Changelogs to be post processed once, Batching it makes more
- # rsync retries. (4 * 60 * 24 * 15 = 86400)
- # 4 changelogs per minute
- # 60 min per hr
- # 24 hrs per day
- # 15 days
- # This means 15 days changelogs can be processed at once in case of
- # History scan
- op.add_option('--max-history-changelogs-in-batch', type=int, default=86400)
-
op.add_option('--pause-on-start', default=False, action='store_true')
op.add_option('-L', '--log-level', metavar='LVL')
op.add_option('-r', '--remote-gsyncd', metavar='CMD',
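For reference, the removed defaults follow directly from the 15-second
changelog rollover interval described in the deleted comments (4 changelogs
per minute); a quick check of that arithmetic, with hypothetical names:

CHANGELOGS_PER_MINUTE = 4                                 # one changelog every 15 seconds
data_batch = CHANGELOGS_PER_MINUTE * 60 * 24              # 5760, one day of changelogs
history_batch = CHANGELOGS_PER_MINUTE * 60 * 24 * 15      # 86400, fifteen days
print(data_batch, history_batch)                          # 5760 86400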
diff --git a/geo-replication/syncdaemon/master.py b/geo-replication/syncdaemon/master.py
index 3df08e41a1..7d015aee71 100644
--- a/geo-replication/syncdaemon/master.py
+++ b/geo-replication/syncdaemon/master.py
@@ -26,12 +26,6 @@ from syncdutils import Thread, GsyncdError, boolify, escape
from syncdutils import unescape, gauxpfx, md5hex, selfkill
from syncdutils import lstat, errno_wrap, FreeObject
from syncdutils import NoStimeAvailable, PartialHistoryAvailable
-from changelogsdb import db_init, db_record_data, db_record_meta
-from changelogsdb import db_remove_data, db_remove_meta
-from changelogsdb import db_get_data, db_get_meta, db_commit
-from changelogsdb import db_get_data_count, db_get_meta_count
-from changelogsdb import db_delete_meta_if_exists_in_data
-
URXTIME = (-1, 0)
@@ -51,10 +45,6 @@ CHANGELOG_ROLLOVER_TIME = 15
# that batch since stime will get updated after each batch.
MAX_CHANGELOG_BATCH_SIZE = 727040
-# Number of record to query once
-DB_PAGINATION_SIZE_META = 100
-DB_PAGINATION_SIZE_DATA = 1000
-
# Utility functions to help us to get to closer proximity
# of the DRY principle (no, don't look for elevated or
# perspectivistic things here)
@@ -287,7 +277,7 @@ class TarSSHEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
- db_remove_data(se)
+ self.unlinked_gfids.add(se)
return True
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -322,7 +312,7 @@ class RsyncEngine(object):
st = lstat(se)
if isinstance(st, int):
# file got unlinked in the interim
- db_remove_data(se)
+ self.unlinked_gfids.add(se)
return True
self.add_job(self.FLAT_DIR_HIERARCHY, 'reg', regjob, f, None, pb)
@@ -427,6 +417,7 @@ class GMasterCommon(object):
self.volinfo = None
self.terminate = False
self.sleep_interval = 1
+ self.unlinked_gfids = set()
def init_keep_alive(cls):
"""start the keep-alive thread """
@@ -666,11 +657,6 @@ class GMasterCommon(object):
ret = j[-1]()
if not ret:
succeed = False
-
- # All the unlinked GFIDs removed from Data and Meta list
- # Commit the Transaction
- db_commit()
-
if succeed and not args[0] is None:
self.sendmark(path, *args)
return succeed
@@ -783,12 +769,10 @@ class GMasterChangelogMixin(GMasterCommon):
pfx = gauxpfx()
clist = []
entries = []
- change_ts = change.split(".")[-1]
+ meta_gfid = set()
+ datas = set()
- # self.data_batch_start is None only in beginning and during
- # new batch start
- if self.data_batch_start is None:
- self.data_batch_start = change_ts
+ change_ts = change.split(".")[-1]
# Ignore entry ops which are already processed in Changelog modes
ignore_entry_ops = False
@@ -860,10 +844,9 @@ class GMasterChangelogMixin(GMasterCommon):
# not fail
pt = os.path.join(pfx, ec[0])
st = lstat(pt)
- if isinstance(st, int):
+ if pt in datas and isinstance(st, int):
# file got unlinked, May be historical Changelog
- db_remove_data(pt)
- db_remove_meta(pt)
+ datas.remove(pt)
if not boolify(gconf.ignore_deletes):
if not ignore_entry_ops:
@@ -886,10 +869,10 @@ class GMasterChangelogMixin(GMasterCommon):
# CREATED if source not exists.
entries.append(edct('LINK', stat=st, entry=en,
gfid=gfid))
+
# Here, we have the assumption that only
# tier-gfid.linkto causes this mknod. Add data
- db_record_data(os.path.join(pfx, ec[0]),
- change_ts)
+ datas.add(os.path.join(pfx, ec[0]))
continue
# stat info. present in the changelog itself
@@ -931,25 +914,48 @@ class GMasterChangelogMixin(GMasterCommon):
else:
logging.warn('ignoring %s [op %s]' % (gfid, ty))
elif et == self.TYPE_GFID:
- db_record_data(os.path.join(pfx, ec[0]), change_ts)
+ # If self.unlinked_gfids is available, then that means it is
+ # retrying the changelog second time. Do not add the GFID's
+ # to rsync job if failed previously but unlinked in master
+ if self.unlinked_gfids and \
+ os.path.join(pfx, ec[0]) in self.unlinked_gfids:
+ logging.debug("ignoring data, since file purged interim")
+ else:
+ datas.add(os.path.join(pfx, ec[0]))
elif et == self.TYPE_META:
if ec[1] == 'SETATTR': # only setattr's for now...
- db_record_meta(os.path.join(pfx, ec[0]), change_ts)
+ if len(ec) == 5:
+ # In xsync crawl, we already have stat data
+ # avoid doing stat again
+ meta_gfid.add((os.path.join(pfx, ec[0]),
+ XCrawlMetadata(st_uid=ec[2],
+ st_gid=ec[3],
+ st_mode=ec[4],
+ st_atime=ec[5],
+ st_mtime=ec[6])))
+ else:
+ meta_gfid.add((os.path.join(pfx, ec[0]), ))
elif ec[1] == 'SETXATTR' or ec[1] == 'XATTROP' or \
ec[1] == 'FXATTROP':
# To sync xattr/acls use rsync/tar, --xattrs and --acls
# switch to rsync and tar
if not boolify(gconf.use_tarssh) and \
(boolify(gconf.sync_xattrs) or boolify(gconf.sync_acls)):
- db_record_data(os.path.join(pfx, ec[0]), change_ts)
+ datas.add(os.path.join(pfx, ec[0]))
else:
logging.warn('got invalid changelog type: %s' % (et))
logging.debug('entries: %s' % repr(entries))
+ # Increment counters for Status
+ self.status.inc_value("entry", len(entries))
+ self.files_in_batch += len(datas)
+ self.status.inc_value("data", len(datas))
+
# sync namespace
if entries and not ignore_entry_ops:
# Increment counters for Status
self.status.inc_value("entry", len(entries))
+
failures = self.slave.server.entry_ops(entries)
self.log_failures(failures, 'gfid', gauxpfx(), 'ENTRY')
self.status.dec_value("entry", len(entries))
@@ -969,53 +975,70 @@ class GMasterChangelogMixin(GMasterCommon):
self.skipped_entry_changelogs_last = change_ts
- # Batch data based on number of changelogs as configured as
- # gconf.max_data_changelogs_in_batch(Default is 24 hrs)
- # stime will be set after completion of these batch, so on failure
- # Geo-rep will progress day by day
- if (self.num_changelogs > gconf.max_data_changelogs_in_batch):
- # (Start Changelog TS, End Changelog TS, [Changes])
- self.data_batches.append([self.data_batch_start, change_ts,
- [change]])
- self.data_batch_start = None
- self.num_changelogs = 0
- else:
- self.data_batches[-1][1] = change_ts
- self.data_batches[-1][2].append(change)
-
- def datas_to_queue(self, start, end):
- # Paginate db entries and add it to Rsync PostBox
- offset = 0
- total_datas = 0
- while True:
- # Db Pagination
- datas = db_get_data(start=start, end=end,
- limit=DB_PAGINATION_SIZE_DATA,
- offset=offset)
- if len(datas) == 0:
- break
- offset += DB_PAGINATION_SIZE_DATA
- total_datas += len(datas)
+ # sync metadata
+ if meta_gfid:
+ meta_entries = []
+ for go in meta_gfid:
+ if len(go) > 1:
+ st = go[1]
+ else:
+ st = lstat(go[0])
+ if isinstance(st, int):
+ logging.debug('file %s got purged in the interim' % go[0])
+ continue
+ meta_entries.append(edct('META', go=go[0], stat=st))
+ if meta_entries:
+ self.status.inc_value("meta", len(entries))
+ failures = self.slave.server.meta_ops(meta_entries)
+ self.log_failures(failures, 'go', '', 'META')
+ self.status.dec_value("meta", len(entries))
+
+ # sync data
+ if datas:
self.a_syncdata(datas)
- return total_datas
+ self.datas_in_batch.update(datas)
- def handle_data_sync(self, start, end, changes, done, total_datas):
- """
- Wait till all rsync jobs are complete, also handle the retries
- Update data stime Once Rsync jobs are complete.
- """
- retry = False
+ def process(self, changes, done=1):
tries = 0
+ retry = False
+ self.unlinked_gfids = set()
+ self.files_in_batch = 0
+ self.datas_in_batch = set()
# Error log disabled till the last round
self.syncer.disable_errorlog()
+ self.skipped_entry_changelogs_first = None
+ self.skipped_entry_changelogs_last = None
+ self.num_skipped_entry_changelogs = 0
+ self.batch_start_time = time.time()
while True:
- if retry:
- self.datas_to_queue(start, end)
+ # first, fire all changelog transfers in parallel. entry and
+ # metadata are performed synchronously, therefore in serial.
+ # However at the end of each changelog, data is synchronized
+ # with syncdata_async() - which means it is serial w.r.t
+ # entries/metadata of that changelog but happens in parallel
+ # with data of other changelogs.
- if retry and tries == (gconf.max_rsync_retries - 1):
- # Enable Error logging if it is last retry
- self.syncer.enable_errorlog()
+ if retry:
+ if tries == (int(gconf.max_rsync_retries) - 1):
+ # Enable Error logging if it is last retry
+ self.syncer.enable_errorlog()
+
+ # Remove Unlinked GFIDs from Queue
+ for unlinked_gfid in self.unlinked_gfids:
+ if unlinked_gfid in self.datas_in_batch:
+ self.datas_in_batch.remove(unlinked_gfid)
+
+ # Retry only Sync. Do not retry entry ops
+ if self.datas_in_batch:
+ self.a_syncdata(self.datas_in_batch)
+ else:
+ for change in changes:
+ logging.debug('processing change %s' % change)
+ self.process_change(change, done, retry)
+ if not retry:
+ # number of changelogs processed in the batch
+ self.turns += 1
# Now we wait for all the data transfers fired off in the above
# step to complete. Note that this is not ideal either. Ideally
@@ -1038,34 +1061,38 @@ class GMasterChangelogMixin(GMasterCommon):
# @change is the last changelog (therefore max time for this batch)
if self.syncdata_wait():
+ self.unlinked_gfids = set()
if done:
- xtl = (int(changes[-1].split('.')[-1]) - 1, 0)
+ xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
# Reset Data counter after sync
- self.status.dec_value("data", total_datas)
+ self.status.dec_value("data", self.files_in_batch)
+ self.files_in_batch = 0
+ self.datas_in_batch = set()
break
# We do not know which changelog transfer failed, retry everything.
retry = True
tries += 1
- if tries >= gconf.max_rsync_retries:
+ if tries == int(gconf.max_rsync_retries):
logging.error('changelogs %s could not be processed '
'completely - moving on...' %
' '.join(map(os.path.basename, changes)))
# Reset data counter on failure
- self.status.dec_value("data", total_datas)
+ self.status.dec_value("data", self.files_in_batch)
+ self.files_in_batch = 0
+ self.datas_in_batch = set()
if done:
- xtl = (int(changes[-1].split('.')[-1]) - 1, 0)
+ xtl = (int(change.split('.')[-1]) - 1, 0)
self.upd_stime(xtl)
map(self.changelog_done_func, changes)
self.archive_and_purge_changelogs(changes)
break
-
# it's either entry_ops() or Rsync that failed to do it's
# job. Mostly it's entry_ops() [which currently has a problem
# of failing to create an entry but failing to return an errno]
@@ -1076,8 +1103,21 @@ class GMasterChangelogMixin(GMasterCommon):
logging.warn('incomplete sync, retrying changelogs: %s' %
' '.join(map(os.path.basename, changes)))
+ # Reset the Data counter before Retry
+ self.status.dec_value("data", self.files_in_batch)
+ self.files_in_batch = 0
time.sleep(0.5)
+ # Log the Skipped Entry ops range if any
+ if self.skipped_entry_changelogs_first is not None and \
+ self.skipped_entry_changelogs_last is not None:
+ logging.info("Skipping already processed entry "
+ "ops from CHANGELOG.{0} to CHANGELOG.{1} "
+ "Num: {2}".format(
+ self.skipped_entry_changelogs_first,
+ self.skipped_entry_changelogs_last,
+ self.num_skipped_entry_changelogs))
+
# Log Current batch details
if changes:
logging.info(
@@ -1091,94 +1131,6 @@ class GMasterChangelogMixin(GMasterCommon):
repr(self.get_data_stime()),
repr(self.get_entry_stime())))
- def process(self, changes, done=1):
- retry = False
- first_changelog_ts = changes[0].split(".")[-1]
-
- db_init(os.path.join(self.tempdir, "temp_changelogs.db"))
-
- self.skipped_entry_changelogs_first = None
- self.skipped_entry_changelogs_last = None
- self.num_skipped_entry_changelogs = 0
- self.batch_start_time = time.time()
- # (Start Changelog TS, End Changelog TS, [Changes])
- self.data_batches = [[first_changelog_ts, first_changelog_ts, []]]
- self.data_batch_start = None
- self.num_changelogs = 0
-
- for change in changes:
- logging.debug('processing change %s' % change)
- self.process_change(change, done, retry)
- # number of changelogs processed in the batch
- self.turns += 1
-
- # Rsync/Tar will preserve permissions, so if a GFID exists
- # in data queue then it syncs meta details too. Remove
- # all meta from meta table if exists in data table
- db_delete_meta_if_exists_in_data()
-
- # All the Data/Meta populated, Commit the Changes in Db
- db_commit()
-
- # Log the Skipped Entry ops range if any
- if self.skipped_entry_changelogs_first is not None and \
- self.skipped_entry_changelogs_last is not None:
- logging.info("Skipping already processed entry "
- "ops from CHANGELOG.{0} to CHANGELOG.{1} "
- "Num: {2}".format(
- self.skipped_entry_changelogs_first,
- self.skipped_entry_changelogs_last,
- self.num_skipped_entry_changelogs))
-
- # Entry Changelogs syncing finished
- logging.info("Syncing Entries completed in {0:.4f} seconds "
- "CHANGELOG.{1} - CHANGELOG.{2} "
- "Num: {3}".format(
- time.time() - self.batch_start_time,
- changes[0].split(".")[-1],
- changes[-1].split(".")[-1],
- len(changes)))
-
- # Update Status Data and Meta Count
- self.status.inc_value("data", db_get_data_count())
- self.status.inc_value("meta", db_get_meta_count())
-
- for b in self.data_batches:
- # Add to data Queue, so that Rsync will start parallelly
- # while syncing Meta ops
- total_datas = self.datas_to_queue(b[0], b[1])
-
- # Sync Meta
- offset = 0
- while True:
- # Db Pagination
- meta_gfids = db_get_meta(start=b[0], end=b[1],
- limit=DB_PAGINATION_SIZE_META,
- offset=offset)
- if len(meta_gfids) == 0:
- break
- offset += DB_PAGINATION_SIZE_META
-
- # Collect required information for GFIDs which
- # exists in Master
- meta_entries = []
- for go in meta_gfids:
- st = lstat(go)
- if isinstance(st, int):
- logging.debug('file %s got purged in the '
- 'interim' % go)
- continue
- meta_entries.append(edct('META', go=go, stat=st))
-
- if meta_entries:
- failures = self.slave.server.meta_ops(meta_entries)
- self.log_failures(failures, 'go', '', 'META')
- self.status.dec_value("meta", len(meta_entries))
-
- # Sync Data, Rsync already started syncing the files
- # wait for the completion and retry if required.
- self.handle_data_sync(b[0], b[1], b[2], done, total_datas)
-
def upd_entry_stime(self, stime):
self.slave.server.set_entry_stime(self.FLAT_DIR_HIERARCHY,
self.uuid,
@@ -1206,12 +1158,7 @@ class GMasterChangelogMixin(GMasterCommon):
remote_node_ip = node.split(":")[0]
self.status.set_slave_node(remote_node_ip)
- def changelogs_batch_process(self, changes, single_batch=False):
- if single_batch and changes:
- logging.debug('processing changes %s' % repr(changes))
- self.process(changes)
- return
-
+ def changelogs_batch_process(self, changes):
changelogs_batches = []
current_size = 0
for c in changes:
@@ -1245,6 +1192,7 @@ class GMasterChangelogMixin(GMasterCommon):
changes = self.changelog_agent.getchanges()
if changes:
if data_stime:
+ logging.info("slave's time: %s" % repr(data_stime))
processed = [x for x in changes
if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
@@ -1313,15 +1261,13 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
# to be processed. returns positive value as number of changelogs
# to be processed, which will be fetched using
# history_getchanges()
- num_scanned_changelogs = self.changelog_agent.history_scan()
- num_changelogs = num_scanned_changelogs
- changes = []
- while num_scanned_changelogs > 0:
+ while self.changelog_agent.history_scan() > 0:
self.crawls += 1
- changes += self.changelog_agent.history_getchanges()
+ changes = self.changelog_agent.history_getchanges()
if changes:
if data_stime:
+ logging.info("slave's time: %s" % repr(data_stime))
processed = [x for x in changes
if int(x.split('.')[-1]) < data_stime[0]]
for pr in processed:
@@ -1330,18 +1276,7 @@ class GMasterChangeloghistoryMixin(GMasterChangelogMixin):
self.changelog_done_func(pr)
changes.remove(pr)
- if num_changelogs > gconf.max_history_changelogs_in_batch:
- self.changelogs_batch_process(changes, single_batch=True)
- num_changelogs = 0
- changes = []
-
- num_scanned_changelogs = self.changelog_agent.history_scan()
- num_changelogs += num_scanned_changelogs
-
- # If Last batch is not processed with MAX_NUM_CHANGELOGS_IN_BATCH
- # condition above
- if changes:
- self.changelogs_batch_process(changes, single_batch=True)
+ self.changelogs_batch_process(changes)
history_turn_time = int(time.time()) - self.history_crawl_start_time
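Condensed into a standalone sketch, the retry behaviour the reworked process()
implements is: entry and meta ops run once per batch, data syncs are retried
up to max-rsync-retries, and GFIDs reported as unlinked on the master are
removed from the pending set before each retry. Helper names below are
stand-ins, not gsyncd APIs:

MAX_RETRIES = 10     # corresponds to the --max-rsync-retries default


def sync_data(gfids):
    """Stand-in for a_syncdata()/syncdata_wait(); returns True on success."""
    return True


def process_batch(datas_in_batch, unlinked_gfids):
    tries = 0
    while True:
        # drop files purged on the master before (re)queueing the data sync
        datas_in_batch -= unlinked_gfids
        if sync_data(datas_in_batch):
            return True          # stime can be updated, changelogs archived
        tries += 1
        if tries == MAX_RETRIES:
            return False         # give up and move on, as the code logs


print(process_batch({"gfid-1", "gfid-2"}, {"gfid-2"}))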
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index 91ca1916f6..00bf4cb9d5 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -31,6 +31,7 @@ import shutil
from gconf import gconf
import repce
from repce import RepceServer, RepceClient
+from master import gmaster_builder
import syncdutils
from syncdutils import GsyncdError, select, privileged, boolify, funcode
from syncdutils import umask, entry2pb, gauxpfx, errno_wrap, lstat
@@ -1408,7 +1409,6 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def gmaster_instantiate_tuple(self, slave):
"""return a tuple of the 'one shot' and the 'main crawl'
class instance"""
- from master import gmaster_builder
return (gmaster_builder('xsync')(self, slave),
gmaster_builder()(self, slave),
gmaster_builder('changeloghistory')(self, slave))