summaryrefslogtreecommitdiffstats
path: root/geo-replication/syncdaemon/syncdutils.py
diff options
context:
space:
mode:
Diffstat (limited to 'geo-replication/syncdaemon/syncdutils.py')
-rw-r--r--geo-replication/syncdaemon/syncdutils.py114
1 files changed, 83 insertions, 31 deletions
diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 1b5684c6d0..822d919ecb 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -1,3 +1,13 @@
+#
+# Copyright (c) 2011-2014 Red Hat, Inc. <http://www.redhat.com>
+# This file is part of GlusterFS.
+
+# This file is licensed to you under your choice of the GNU Lesser
+# General Public License, version 3 or any later version (LGPLv3 or
+# later), or the GNU General Public License, version 2 (GPLv2), in all
+# cases as published by the Free Software Foundation.
+#
+
import os
import sys
import pwd
@@ -7,9 +17,9 @@ import shutil
import logging
import socket
from threading import Lock, Thread as baseThread
-from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED, EINTR, ENOENT, EPERM, ESTALE, errorcode
-from signal import signal, SIGTERM, SIGKILL
-from time import sleep
+from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
+from errno import EINTR, ENOENT, EPERM, ESTALE, errorcode
+from signal import signal, SIGTERM
import select as oselect
from os import waitpid as owaitpid
@@ -37,25 +47,29 @@ except ImportError:
_CL_AUX_GFID_PFX = ".gfid/"
GF_OP_RETRIES = 20
+
def escape(s):
"""the chosen flavor of string escaping, used all over
to turn whatever data to creatable representation"""
return urllib.quote_plus(s)
+
def unescape(s):
"""inverse of .escape"""
return urllib.unquote_plus(s)
+
def norm(s):
if s:
return s.replace('-', '_')
-def update_file(path, updater, merger = lambda f: True):
+
+def update_file(path, updater, merger=lambda f: True):
"""update a file in a transaction-like manner"""
fr = fw = None
try:
- fd = os.open(path, os.O_CREAT|os.O_RDWR)
+ fd = os.open(path, os.O_CREAT | os.O_RDWR)
try:
fr = os.fdopen(fd, 'r+b')
except:
@@ -66,7 +80,7 @@ def update_file(path, updater, merger = lambda f: True):
return
tmpp = path + '.tmp.' + str(os.getpid())
- fd = os.open(tmpp, os.O_CREAT|os.O_EXCL|os.O_WRONLY)
+ fd = os.open(tmpp, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
try:
fw = os.fdopen(fd, 'wb', 0)
except:
@@ -80,29 +94,31 @@ def update_file(path, updater, merger = lambda f: True):
if fx:
fx.close()
+
def create_manifest(fname, content):
"""
Create manifest file for SSH Control Path
"""
fd = None
try:
- fd = os.open(fname, os.O_CREAT|os.O_RDWR)
+ fd = os.open(fname, os.O_CREAT | os.O_RDWR)
try:
os.write(fd, content)
except:
os.close(fd)
raise
finally:
- if fd != None:
+ if fd is not None:
os.close(fd)
+
def setup_ssh_ctl(ctld, remote_addr, resource_url):
"""
Setup GConf ssh control path parameters
"""
gconf.ssh_ctl_dir = ctld
content = "SLAVE_HOST=%s\nSLAVE_RESOURCE_URL=%s" % (remote_addr,
- resource_url)
+ resource_url)
content_md5 = md5hex(content)
fname = os.path.join(gconf.ssh_ctl_dir,
"%s.mft" % content_md5)
@@ -112,16 +128,17 @@ def setup_ssh_ctl(ctld, remote_addr, resource_url):
"%s.sock" % content_md5)
gconf.ssh_ctl_args = ["-oControlMaster=auto", "-S", ssh_ctl_path]
+
def grabfile(fname, content=None):
"""open @fname + contest for its fcntl lock
@content: if given, set the file content to it
"""
# damn those messy open() mode codes
- fd = os.open(fname, os.O_CREAT|os.O_RDWR)
+ fd = os.open(fname, os.O_CREAT | os.O_RDWR)
f = os.fdopen(fd, 'r+b', 0)
try:
- fcntl.lockf(f, fcntl.LOCK_EX|fcntl.LOCK_NB)
+ fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except:
ex = sys.exc_info()[1]
f.close()
@@ -139,6 +156,7 @@ def grabfile(fname, content=None):
gconf.permanent_handles.append(f)
return f
+
def grabpidfile(fname=None, setpid=True):
""".grabfile customization for pid files"""
if not fname:
@@ -150,6 +168,7 @@ def grabpidfile(fname=None, setpid=True):
final_lock = Lock()
+
def finalize(*a, **kw):
"""all those messy final steps we go trough upon termination
@@ -169,7 +188,7 @@ def finalize(*a, **kw):
if os.waitpid(gconf.cpid, os.WNOHANG)[0] == gconf.cpid:
# child has terminated
rm_pidf = True
- break;
+ break
time.sleep(0.1)
if rm_pidf:
try:
@@ -194,6 +213,7 @@ def finalize(*a, **kw):
sys.stderr.flush()
os._exit(kw.get('exval', 0))
+
def log_raise_exception(excont):
"""top-level exception handler
@@ -218,20 +238,27 @@ def log_raise_exception(excont):
logging.error(exc.args[0])
sys.stderr.write('failure: ' + exc.args[0] + '\n')
elif isinstance(exc, PickleError) or isinstance(exc, EOFError) or \
- ((isinstance(exc, OSError) or isinstance(exc, IOError)) and \
- exc.errno == EPIPE):
+ ((isinstance(exc, OSError) or isinstance(exc, IOError)) and
+ exc.errno == EPIPE):
logging.error('connection to peer is broken')
if hasattr(gconf, 'transport'):
gconf.transport.wait()
if gconf.transport.returncode == 127:
logging.warn("!!!!!!!!!!!!!")
- logging.warn('!!! getting "No such file or directory" errors '
- "is most likely due to MISCONFIGURATION, please consult "
- "https://access.redhat.com/site/documentation/en-US/Red_Hat_Storage/2.1/html/Administration_Guide/chap-User_Guide-Geo_Rep-Preparation-Settingup_Environment.html")
+ logging.warn('!!! getting "No such file or directory" '
+ "errors is most likely due to "
+ "MISCONFIGURATION"
+ ", please consult https://access.redhat.com"
+ "/site/documentation/en-US/Red_Hat_Storage"
+ "/2.1/html/Administration_Guide"
+ "/chap-User_Guide-Geo_Rep-Preparation-"
+ "Settingup_Environment.html")
logging.warn("!!!!!!!!!!!!!")
gconf.transport.terminate_geterr()
- elif isinstance(exc, OSError) and exc.errno in (ENOTCONN, ECONNABORTED):
- logging.error('glusterfs session went down [%s]', errorcode[exc.errno])
+ elif isinstance(exc, OSError) and exc.errno in (ENOTCONN,
+ ECONNABORTED):
+ logging.error('glusterfs session went down [%s]',
+ errorcode[exc.errno])
else:
logtag = "FAIL"
if not logtag and logging.getLogger().isEnabledFor(logging.DEBUG):
@@ -244,46 +271,54 @@ def log_raise_exception(excont):
class FreeObject(object):
+
"""wildcard class for which any attribute can be set"""
def __init__(self, **kw):
- for k,v in kw.items():
+ for k, v in kw.items():
setattr(self, k, v)
+
class Thread(baseThread):
+
"""thread class flavor for gsyncd
- always a daemon thread
- force exit for whole program if thread
function coughs up an exception
"""
+
def __init__(self, *a, **kw):
tf = kw.get('target')
if tf:
def twrap(*aa):
- excont = FreeObject(exval = 0)
+ excont = FreeObject(exval=0)
try:
tf(*aa)
except:
try:
log_raise_exception(excont)
finally:
- finalize(exval = excont.exval)
+ finalize(exval=excont.exval)
kw['target'] = twrap
baseThread.__init__(self, *a, **kw)
self.setDaemon(True)
+
class GsyncdError(Exception):
pass
-def getusername(uid = None):
- if uid == None:
+
+def getusername(uid=None):
+ if uid is None:
uid = os.geteuid()
return pwd.getpwuid(uid).pw_name
+
def privileged():
return os.geteuid() == 0
+
def boolify(s):
"""
Generic string to boolean converter
@@ -294,7 +329,7 @@ def boolify(s):
- False if it's in false_list
- Warn if it's not present in either and return False
"""
- true_list = ['true', 'yes', '1', 'on']
+ true_list = ['true', 'yes', '1', 'on']
false_list = ['false', 'no', '0', 'off']
if isinstance(s, bool):
@@ -305,10 +340,12 @@ def boolify(s):
if lstr in true_list:
rv = True
elif not lstr in false_list:
- logging.warn("Unknown string (%s) in string to boolean conversion defaulting to False\n" % (s))
+ logging.warn("Unknown string (%s) in string to boolean conversion "
+ "defaulting to False\n" % (s))
return rv
+
def eintr_wrap(func, exc, *a):
"""
wrapper around syscalls resilient to interrupt caused
@@ -322,19 +359,24 @@ def eintr_wrap(func, exc, *a):
if not ex.args[0] == EINTR:
raise
+
def select(*a):
return eintr_wrap(oselect.select, oselect.error, *a)
-def waitpid (*a):
+
+def waitpid(*a):
return eintr_wrap(owaitpid, OSError, *a)
+
def set_term_handler(hook=lambda *a: finalize(*a, **{'exval': 1})):
signal(SIGTERM, hook)
+
def is_host_local(host):
locaddr = False
for ai in socket.getaddrinfo(host, None):
- # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators/mgmt/glusterd/src/glusterd-utils.c#L125
+ # cf. http://github.com/gluster/glusterfs/blob/ce111f47/xlators
+ # /mgmt/glusterd/src/glusterd-utils.c#L125
if ai[0] == socket.AF_INET:
if ai[-1][0].split(".")[0] == "127":
locaddr = True
@@ -358,8 +400,8 @@ def is_host_local(host):
f = open("/proc/sys/net/ipv4/ip_nonlocal_bind")
if int(f.read()) != 0:
raise GsyncdError(
- "non-local bind is set and not allowed to create raw sockets, "
- "cannot determine if %s is local" % host)
+ "non-local bind is set and not allowed to create "
+ "raw sockets, cannot determine if %s is local" % host)
s = socket.socket(ai[0], socket.SOCK_DGRAM)
finally:
if f:
@@ -373,6 +415,7 @@ def is_host_local(host):
s.close()
return locaddr
+
def funcode(f):
fc = getattr(f, 'func_code', None)
if not fc:
@@ -380,32 +423,40 @@ def funcode(f):
fc = f.__code__
return fc
+
def memoize(f):
fc = funcode(f)
fn = fc.co_name
+
def ff(self, *a, **kw):
rv = getattr(self, '_' + fn, None)
- if rv == None:
+ if rv is None:
rv = f(self, *a, **kw)
setattr(self, '_' + fn, rv)
return rv
return ff
+
def umask():
return os.umask(0)
+
def entry2pb(e):
return e.rsplit('/', 1)
+
def gauxpfx():
return _CL_AUX_GFID_PFX
+
def md5hex(s):
return md5(s).hexdigest()
+
def selfkill(sig=SIGTERM):
os.kill(os.getpid(), sig)
+
def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
""" wrapper around calls resilient to errnos.
retry in case of ESTALE by default.
@@ -427,6 +478,7 @@ def errno_wrap(call, arg=[], errnos=[], retry_errnos=[ESTALE]):
return
time.sleep(0.250) # retry the call
+
def lstat(e):
try:
return os.lstat(e)