summaryrefslogtreecommitdiffstats
path: root/func/jobthing.py
diff options
context:
space:
mode:
authorMichael DeHaan <mdehaan@redhat.com>2008-01-23 15:58:48 -0500
committerMichael DeHaan <mdehaan@redhat.com>2008-01-23 15:58:48 -0500
commit19e4d1ba808cbbe95568271a5b0075b8422e4fb6 (patch)
tree197d55cf717319c5ffe565f9aa8a639b962fc490 /func/jobthing.py
parentce0eaca23fb42f77f67408e509bbe091f4c27e56 (diff)
downloadthird_party-func-19e4d1ba808cbbe95568271a5b0075b8422e4fb6.tar.gz
third_party-func-19e4d1ba808cbbe95568271a5b0075b8422e4fb6.tar.xz
third_party-func-19e4d1ba808cbbe95568271a5b0075b8422e4fb6.zip
Double-barrel asynchronous calls. Async can now occur on both sides simultaneously and still appears as if there is only one
"global" job id to the API caller, the minion job id's are managed in the background, complete with partial result response as things come in which should be very nice for ajaxy implication. job_status API does still need to be modified to list active jobs as well as to store the job name.
Diffstat (limited to 'func/jobthing.py')
-rw-r--r--func/jobthing.py89
1 files changed, 80 insertions, 9 deletions
diff --git a/func/jobthing.py b/func/jobthing.py
index 4923daa..486fe6b 100644
--- a/func/jobthing.py
+++ b/func/jobthing.py
@@ -27,6 +27,7 @@ import forkbomb
JOB_ID_RUNNING = 0
JOB_ID_FINISHED = 1
JOB_ID_LOST_IN_SPACE = 2
+JOB_ID_PARTIAL = 3
# how long to retain old job records in the job id database
RETAIN_INTERVAL = 60 * 60
@@ -40,6 +41,8 @@ def __update_status(jobid, status, results, clear=False):
def __get_status(jobid):
return __access_status(jobid=jobid, write=False)
+def purge_old_jobs():
+ return __access_status(purge=True)
def __purge_old_jobs(storage):
"""
@@ -50,11 +53,13 @@ def __purge_old_jobs(storage):
"""
nowtime = time.time()
for x in storage.keys():
- create_time = float(x)
+ # minion jobs have "-minion" in the job id so disambiguation so we need to remove that
+ jobkey = x.replace("-","").replace("minion","")
+ create_time = float(jobkey)
if nowtime - create_time > RETAIN_INTERVAL:
del storage[x]
-def __access_status(jobid=0, status=0, results=0, clear=False, write=False):
+def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purge=False):
dir = os.path.expanduser(CACHE_DIR)
if not os.path.exists(dir):
@@ -66,22 +71,28 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False):
fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
storage = shelve.BsdDbShelf(internal_db)
+
if clear:
storage.clear()
storage.close()
fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
return {}
+
+ if purge or write:
+ __purge_old_jobs(storage)
if write:
- __purge_old_jobs(storage)
storage[str(jobid)] = (status, results)
rc = jobid
- else:
+ elif not purge:
if storage.has_key(str(jobid)):
# tuple of (status, results)
+
rc = storage[str(jobid)]
else:
rc = (JOB_ID_LOST_IN_SPACE, 0)
+ else:
+ rc = 0
storage.close()
fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
@@ -91,7 +102,7 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False):
def batch_run(server, process_server, nforks):
"""
This is the method used by the overlord side usage of jobthing.
- It likely makes little sense for the minion async usage (yet).
+ Minion side usage will use minion_async_run instead.
Given an array of items (pool), call callback in each one, but divide
the workload over nfork forks. Temporary files used during the
@@ -105,15 +116,75 @@ def batch_run(server, process_server, nforks):
__update_status(job_id, JOB_ID_RUNNING, -1)
return job_id
else:
- #print "DEBUG: UPDATE STATUS: r2: %s" % job_id
+ # kick off the job
__update_status(job_id, JOB_ID_RUNNING, -1)
results = forkbomb.batch_run(server, process_server, nforks)
- #print "DEBUG: UPDATE STATUS: f1: %s" % job_id
+
+ # we now have a list of job id's for each minion, kill the task
+ __update_status(job_id, JOB_ID_PARTIAL, results)
+ sys.exit(0)
+
+def minion_async_run(function_ref, args):
+ """
+ This is a simpler invocation for minion side async usage.
+ """
+ # to avoid confusion of job id's (we use the same job database)
+ # minion jobs contain the string "minion".
+ job_id = "%s-minion" % time.time()
+ pid = os.fork()
+ if pid != 0:
+ __update_status(job_id, JOB_ID_RUNNING, -1)
+ return job_id
+ else:
+ __update_status(job_id, JOB_ID_RUNNING, -1)
+ results = function_ref(args)
__update_status(job_id, JOB_ID_FINISHED, results)
sys.exit(0)
-def job_status(jobid):
- return __get_status(jobid)
+def job_status(jobid, client_class=None):
+
+ # NOTE: client_class is here to get around some evil circular reference
+ # type stuff. This is intended to be called by minions (who can leave it None)
+ # or by the Client module code (which does not need to be worried about it). API
+ # users should not be calling jobthing.py methods directly.
+
+ got_status = __get_status(jobid)
+
+ # if the status comes back as JOB_ID_PARTIAL what we have is actually a hash
+ # of hostname/minion-jobid pairs. Instantiate a client handle for each and poll them
+ # for their actual status, filling in only the ones that are actually done.
+
+ (interim_rc, interim_results) = got_status
+
+ if interim_rc == JOB_ID_PARTIAL:
+
+ partial_results = {}
+
+ for host in interim_results.keys():
+
+ minion_job = interim_results[host]
+ client = client_class(host, noglobs=True, async=False)
+ # print "DEBUG: client: %s" % client_class
+ minion_result = client.jobs.job_status(minion_job)
+ # print "DEBUG: minion: %s" % minion_result
+ (minion_interim_rc, minion_interim_result) = minion_result
+
+ some_missing = False
+ if minion_interim_rc == JOB_ID_FINISHED:
+ partial_results[host] = minion_interim_result
+ else:
+
+ some_missing = True
+
+ if some_missing:
+ return (JOB_ID_PARTIAL, partial_results)
+ else:
+ return (JOB_ID_FINISHED, partial_results)
+
+ else:
+ return got_status
+
+ # of job id's on the minion in results.
if __name__ == "__main__":
__test()