summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael DeHaan <mdehaan@redhat.com>2008-01-23 15:58:48 -0500
committerMichael DeHaan <mdehaan@redhat.com>2008-01-23 15:58:48 -0500
commit19e4d1ba808cbbe95568271a5b0075b8422e4fb6 (patch)
tree197d55cf717319c5ffe565f9aa8a639b962fc490
parentce0eaca23fb42f77f67408e509bbe091f4c27e56 (diff)
downloadthird_party-func-19e4d1ba808cbbe95568271a5b0075b8422e4fb6.tar.gz
third_party-func-19e4d1ba808cbbe95568271a5b0075b8422e4fb6.tar.xz
third_party-func-19e4d1ba808cbbe95568271a5b0075b8422e4fb6.zip
Double-barrel asynchronous calls. Async can now occur on both sides simultaneously and still appears as if there is only one
"global" job id to the API caller; the minion job ids are managed in the background, complete with partial result responses as things come in, which should be very nice for Ajax-style implementations. job_status API does still need to be modified to list active jobs as well as to store the job name.
-rw-r--r--func/forkbomb.py1
-rw-r--r--func/jobthing.py89
-rw-r--r--func/minion/modules/jobs.py36
-rwxr-xr-xfunc/minion/server.py17
-rwxr-xr-xfunc/overlord/client.py11
-rw-r--r--test/async_test.py3
6 files changed, 146 insertions, 11 deletions
diff --git a/func/forkbomb.py b/func/forkbomb.py
index c30cc9e..daad28a 100644
--- a/func/forkbomb.py
+++ b/func/forkbomb.py
@@ -133,6 +133,7 @@ def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR):
if nforks <= 1:
# modulus voodoo gets crazy otherwise and bad things happen
nforks = 2
+ # print "DEBUG: nforks=%s" % 2
shelf_file = __get_storage(cachedir)
__access_buckets(shelf_file,True,None)
buckets = __bucketize(pool, nforks)
diff --git a/func/jobthing.py b/func/jobthing.py
index 4923daa..486fe6b 100644
--- a/func/jobthing.py
+++ b/func/jobthing.py
@@ -27,6 +27,7 @@ import forkbomb
JOB_ID_RUNNING = 0
JOB_ID_FINISHED = 1
JOB_ID_LOST_IN_SPACE = 2
+JOB_ID_PARTIAL = 3
# how long to retain old job records in the job id database
RETAIN_INTERVAL = 60 * 60
@@ -40,6 +41,8 @@ def __update_status(jobid, status, results, clear=False):
def __get_status(jobid):
return __access_status(jobid=jobid, write=False)
+def purge_old_jobs():
+ return __access_status(purge=True)
def __purge_old_jobs(storage):
"""
@@ -50,11 +53,13 @@ def __purge_old_jobs(storage):
"""
nowtime = time.time()
for x in storage.keys():
- create_time = float(x)
+ # minion jobs have "-minion" in the job id for disambiguation, so we need to remove it before parsing the timestamp
+ jobkey = x.replace("-","").replace("minion","")
+ create_time = float(jobkey)
if nowtime - create_time > RETAIN_INTERVAL:
del storage[x]
-def __access_status(jobid=0, status=0, results=0, clear=False, write=False):
+def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purge=False):
dir = os.path.expanduser(CACHE_DIR)
if not os.path.exists(dir):
@@ -66,22 +71,28 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False):
fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
storage = shelve.BsdDbShelf(internal_db)
+
if clear:
storage.clear()
storage.close()
fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
return {}
+
+ if purge or write:
+ __purge_old_jobs(storage)
if write:
- __purge_old_jobs(storage)
storage[str(jobid)] = (status, results)
rc = jobid
- else:
+ elif not purge:
if storage.has_key(str(jobid)):
# tuple of (status, results)
+
rc = storage[str(jobid)]
else:
rc = (JOB_ID_LOST_IN_SPACE, 0)
+ else:
+ rc = 0
storage.close()
fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
@@ -91,7 +102,7 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False):
def batch_run(server, process_server, nforks):
"""
This is the method used by the overlord side usage of jobthing.
- It likely makes little sense for the minion async usage (yet).
+ Minion side usage will use minion_async_run instead.
Given an array of items (pool), call callback in each one, but divide
the workload over nfork forks. Temporary files used during the
@@ -105,15 +116,75 @@ def batch_run(server, process_server, nforks):
__update_status(job_id, JOB_ID_RUNNING, -1)
return job_id
else:
- #print "DEBUG: UPDATE STATUS: r2: %s" % job_id
+ # kick off the job
__update_status(job_id, JOB_ID_RUNNING, -1)
results = forkbomb.batch_run(server, process_server, nforks)
- #print "DEBUG: UPDATE STATUS: f1: %s" % job_id
+
+ # we now have a job id for each minion; record them as partial results, then exit the forked worker
+ __update_status(job_id, JOB_ID_PARTIAL, results)
+ sys.exit(0)
+
+def minion_async_run(function_ref, args):
+ """
+ This is a simpler invocation for minion side async usage.
+ """
+ # to avoid confusion with overlord job ids (the same job database is shared),
+ # minion job ids contain the string "minion".
+ job_id = "%s-minion" % time.time()
+ pid = os.fork()
+ if pid != 0:
+ __update_status(job_id, JOB_ID_RUNNING, -1)
+ return job_id
+ else:
+ __update_status(job_id, JOB_ID_RUNNING, -1)
+ results = function_ref(args)
__update_status(job_id, JOB_ID_FINISHED, results)
sys.exit(0)
-def job_status(jobid):
- return __get_status(jobid)
+def job_status(jobid, client_class=None):
+
+ # NOTE: client_class is here to get around some evil circular reference
+ # type stuff. This is intended to be called by minions (who can leave it None)
+ # or by the Client module code (which does not need to worry about it). API
+ # users should not be calling jobthing.py methods directly.
+
+ got_status = __get_status(jobid)
+
+ # if the status comes back as JOB_ID_PARTIAL what we have is actually a hash
+ # of hostname/minion-jobid pairs. Instantiate a client handle for each and poll them
+ # for their actual status, filling in only the ones that are actually done.
+
+ (interim_rc, interim_results) = got_status
+
+ if interim_rc == JOB_ID_PARTIAL:
+
+ partial_results = {}
+
+ for host in interim_results.keys():
+
+ minion_job = interim_results[host]
+ client = client_class(host, noglobs=True, async=False)
+ # print "DEBUG: client: %s" % client_class
+ minion_result = client.jobs.job_status(minion_job)
+ # print "DEBUG: minion: %s" % minion_result
+ (minion_interim_rc, minion_interim_result) = minion_result
+
+ some_missing = False
+ if minion_interim_rc == JOB_ID_FINISHED:
+ partial_results[host] = minion_interim_result
+ else:
+
+ some_missing = True
+
+ if some_missing:
+ return (JOB_ID_PARTIAL, partial_results)
+ else:
+ return (JOB_ID_FINISHED, partial_results)
+
+ else:
+ return got_status
+
+ # of job id's on the minion in results.
if __name__ == "__main__":
__test()
diff --git a/func/minion/modules/jobs.py b/func/minion/modules/jobs.py
new file mode 100644
index 0000000..69fb75f
--- /dev/null
+++ b/func/minion/modules/jobs.py
@@ -0,0 +1,36 @@
+## (Largely internal) module for access to asynchronously dispatched
+## module job ID's. The Func Client() module wraps most of this usage
+## so it's not entirely relevant to folks using the CLI or Func API
+## directly.
+##
+## Copyright 2008, Red Hat, Inc
+## Michael DeHaan <mdehaan@redhat.com>
+##
+## This software may be freely redistributed under the terms of the GNU
+## general public license.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, write to the Free Software
+## Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+##
+
+import codes
+from func import jobthing
+import func_module
+
+# =================================
+
+class JobsModule(func_module.FuncModule):
+
+ version = "0.0.1"
+ api_version = "0.0.1"
+ description = "Internal module for tracking background minion tasks."
+
+ def job_status(self, job_id):
+ """
+ Returns job status in the form of (status, datastruct).
+ Datastruct is undefined for unfinished jobs. See jobthing.py and
+ Wiki details on async invocation for more information.
+ """
+ return jobthing.job_status(job_id)
+
diff --git a/func/minion/server.py b/func/minion/server.py
index 6e55e70..1b2a556 100755
--- a/func/minion/server.py
+++ b/func/minion/server.py
@@ -28,6 +28,7 @@ from func.config import read_config
from func.commonconfig import FuncdConfig
from func import logger
from func import certs
+import func.jobthing as jobthing
# our modules
import AuthedXMLRPCServer
@@ -196,6 +197,16 @@ class FuncSSLXMLRPCServer(AuthedXMLRPCServer.AuthedSSLXMLRPCServer,
peer_cert = r.get_peer_certificate()
ip = a[0]
+
+ # generally calling conventions are: hardware.info
+ # async convention is async.hardware.info
+ # here we parse out the async to decide how to invoke it.
+ # see the async docs on the Wiki for further info.
+ async_dispatch = False
+ if method.startswith("async."):
+ async_dispatch = True
+ method = method.replace("async.","",1)
+
if not self._check_acl(peer_cert, ip, method, params):
raise codes.AccessToMethodDenied
@@ -207,7 +218,11 @@ class FuncSSLXMLRPCServer(AuthedXMLRPCServer.AuthedSSLXMLRPCServer,
sub_hash = peer_cert.subject_name_hash()
self.audit_logger.log_call(ip, cn, sub_hash, method, params)
- return self.get_dispatch_method(method)(*params)
+ if not async_dispatch:
+ return self.get_dispatch_method(method)(*params)
+ else:
+ meth = self.get_dispatch_method(method)
+ return jobthing.minion_async_run(meth, *params)
def auth_cb(self, request, client_address):
peer_cert = request.get_peer_certificate()
diff --git a/func/overlord/client.py b/func/overlord/client.py
index 98edaed..6e663ea 100755
--- a/func/overlord/client.py
+++ b/func/overlord/client.py
@@ -166,7 +166,7 @@ class Client(object):
"""
Use this to acquire status from jobs when using run with async client handles
"""
- return jobthing.job_status(jobid)
+ return jobthing.job_status(jobid, client_class=Client)
# -----------------------------------------------
@@ -200,7 +200,16 @@ class Client(object):
# we can't call "call" on s, since thats a rpc, so
# we call gettatr around it.
meth = "%s.%s" % (module, method)
+
+ # async calling signature has an "imaginary" prefix
+ # so async.abc.def does abc.def as a background task.
+ # see Wiki docs for details
+ if self.async:
+ meth = "async.%s" % meth
+
+ # this is the point at which we make the remote call.
retval = getattr(conn, meth)(*args[:])
+
if self.interactive:
print retval
except Exception, e:
diff --git a/test/async_test.py b/test/async_test.py
index 43904c4..cec512b 100644
--- a/test/async_test.py
+++ b/test/async_test.py
@@ -23,6 +23,9 @@ def __tester(async):
sys.exit(1)
if code == jobthing.JOB_ID_RUNNING:
print "task is still running, %s elapsed ..." % delta
+ if code == jobthing.JOB_ID_PARTIAL:
+ print "task reports partial status, %s elapsed, results = %s" % (delta, results)
+
elif code == jobthing.JOB_ID_FINISHED:
print "task complete, %s elapsed, results = %s" % (delta, results)
sys.exit(0)