diff options
Diffstat (limited to 'geo-replication/syncdaemon/monitor.py')
-rw-r--r-- | geo-replication/syncdaemon/monitor.py | 68 |
1 files changed, 18 insertions, 50 deletions
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py index a193b57caf..257d34a743 100644 --- a/geo-replication/syncdaemon/monitor.py +++ b/geo-replication/syncdaemon/monitor.py @@ -14,7 +14,6 @@ import time import signal import logging import xml.etree.ElementTree as XET -from subprocess import PIPE from threading import Lock from errno import ECHILD, ESRCH import random @@ -23,9 +22,9 @@ from resource import SSH import gsyncdconfig as gconf from rconf import rconf from syncdutils import select, waitpid, errno_wrap, lf, grabpidfile -from syncdutils import set_term_handler, is_host_local, GsyncdError -from syncdutils import Thread, finalize, Popen, Volinfo -from syncdutils import gf_event, EVENT_GEOREP_FAULTY +from syncdutils import set_term_handler, GsyncdError +from syncdutils import Thread, finalize, Volinfo, VolinfoFromGconf +from syncdutils import gf_event, EVENT_GEOREP_FAULTY, get_up_nodes from gsyncdstatus import GeorepStatus, set_monitor_status @@ -54,43 +53,6 @@ def get_subvol_num(brick_idx, vol, hot): return str(cnt) -def get_slave_bricks_status(host, vol): - po = Popen(['gluster', '--xml', '--remote-host=' + host, - 'volume', 'status', vol, "detail"], - stdout=PIPE, stderr=PIPE) - vix = po.stdout.read() - po.wait() - po.terminate_geterr(fail_on_err=False) - if po.returncode != 0: - logging.info(lf("Volume status command failed, unable to get " - "list of up nodes, returning empty list", - volume=vol, - error=po.returncode)) - return [] - vi = XET.fromstring(vix) - if vi.find('opRet').text != '0': - logging.info(lf("Unable to get list of up nodes, " - "returning empty list", - volume=vol, - error=vi.find('opErrstr').text)) - return [] - - up_hosts = set() - - try: - for el in vi.findall('volStatus/volumes/volume/node'): - if el.find('status').text == '1': - up_hosts.add((el.find('hostname').text, - el.find('peerid').text)) - except (ParseError, AttributeError, ValueError) as e: - logging.info(lf("Parsing failed to get list of up nodes, " - "returning empty list", - volume=vol, - error=e)) - - return list(up_hosts) - - class Monitor(object): """class which spawns and manages gsyncd workers""" @@ -116,7 +78,7 @@ class Monitor(object): errno_wrap(os.kill, [-os.getpid(), signal.SIGTERM], [ESRCH]) def monitor(self, w, argv, cpids, agents, slave_vol, slave_host, master, - suuid): + suuid, slavenodes): """the monitor loop Basic logic is a blantantly simple blunt heuristics: @@ -180,8 +142,7 @@ class Monitor(object): # If the connected slave node is down then try to connect to # different up node. current_slave_host = remote_host - slave_up_hosts = get_slave_bricks_status( - slave_host, slave_vol) + slave_up_hosts = get_up_nodes(slavenodes, gconf.get("ssh-port")) if (current_slave_host, remote_id) not in slave_up_hosts: if len(slave_up_hosts) > 0: @@ -354,7 +315,7 @@ class Monitor(object): self.status[w[0]['dir']].set_worker_status(self.ST_INCON) return ret - def multiplex(self, wspx, suuid, slave_vol, slave_host, master): + def multiplex(self, wspx, suuid, slave_vol, slave_host, master, slavenodes): argv = [os.path.basename(sys.executable), sys.argv[0]] cpids = set() @@ -363,7 +324,7 @@ class Monitor(object): for wx in wspx: def wmon(w): cpid, _ = self.monitor(w, argv, cpids, agents, slave_vol, - slave_host, master, suuid) + slave_host, master, suuid, slavenodes) time.sleep(1) self.lock.acquire() for cpid in cpids: @@ -380,7 +341,10 @@ class Monitor(object): def distribute(master, slave): - mvol = Volinfo(master.volume, master.host) + if rconf.args.use_gconf_volinfo: + mvol = VolinfoFromGconf(master.volume, master=True) + else: + mvol = Volinfo(master.volume, master.host) logging.debug('master bricks: ' + repr(mvol.bricks)) prelude = [] slave_host = None @@ -393,7 +357,11 @@ def distribute(master, slave): logging.debug('slave SSH gateway: ' + slave.remote_addr) - svol = Volinfo(slave.volume, "localhost", prelude) + if rconf.args.use_gconf_volinfo: + svol = VolinfoFromGconf(slave.volume, master=False) + else: + svol = Volinfo(slave.volume, "localhost", prelude) + sbricks = svol.bricks suuid = svol.uuid slave_host = slave.remote_addr.split('@')[-1] @@ -415,14 +383,14 @@ def distribute(master, slave): workerspex = [] for idx, brick in enumerate(mvol.bricks): - if is_host_local(brick['uuid']): + if rconf.args.local_node_id == brick['uuid']: is_hot = mvol.is_hot(":".join([brick['host'], brick['dir']])) workerspex.append((brick, slaves[idx % len(slaves)], get_subvol_num(idx, mvol, is_hot), is_hot)) logging.debug('worker specs: ' + repr(workerspex)) - return workerspex, suuid, slave_vol, slave_host, master + return workerspex, suuid, slave_vol, slave_host, master, slavenodes def monitor(local, remote): |