diff options
author | Michael DeHaan <mdehaan@redhat.com> | 2008-01-23 15:58:48 -0500 |
---|---|---|
committer | Michael DeHaan <mdehaan@redhat.com> | 2008-01-23 15:58:48 -0500 |
commit | 19e4d1ba808cbbe95568271a5b0075b8422e4fb6 (patch) | |
tree | 197d55cf717319c5ffe565f9aa8a639b962fc490 /func/jobthing.py | |
parent | ce0eaca23fb42f77f67408e509bbe091f4c27e56 (diff) | |
download | third_party-func-19e4d1ba808cbbe95568271a5b0075b8422e4fb6.tar.gz third_party-func-19e4d1ba808cbbe95568271a5b0075b8422e4fb6.tar.xz third_party-func-19e4d1ba808cbbe95568271a5b0075b8422e4fb6.zip |
Double-barrel asynchronous calls. Async can now occur on both sides simultaneously and still appears as if there is only one
"global" job id to the API caller, the minion job id's are managed in the background, complete with partial result response
as things come in which should be very nice for ajaxy implication. job_status API does still need to be modified to list
active jobs as well as to store the job name.
Diffstat (limited to 'func/jobthing.py')
-rw-r--r-- | func/jobthing.py | 89 |
1 files changed, 80 insertions, 9 deletions
diff --git a/func/jobthing.py b/func/jobthing.py index 4923daa..486fe6b 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -27,6 +27,7 @@ import forkbomb JOB_ID_RUNNING = 0 JOB_ID_FINISHED = 1 JOB_ID_LOST_IN_SPACE = 2 +JOB_ID_PARTIAL = 3 # how long to retain old job records in the job id database RETAIN_INTERVAL = 60 * 60 @@ -40,6 +41,8 @@ def __update_status(jobid, status, results, clear=False): def __get_status(jobid): return __access_status(jobid=jobid, write=False) +def purge_old_jobs(): + return __access_status(purge=True) def __purge_old_jobs(storage): """ @@ -50,11 +53,13 @@ def __purge_old_jobs(storage): """ nowtime = time.time() for x in storage.keys(): - create_time = float(x) + # minion jobs have "-minion" in the job id so disambiguation so we need to remove that + jobkey = x.replace("-","").replace("minion","") + create_time = float(jobkey) if nowtime - create_time > RETAIN_INTERVAL: del storage[x] -def __access_status(jobid=0, status=0, results=0, clear=False, write=False): +def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purge=False): dir = os.path.expanduser(CACHE_DIR) if not os.path.exists(dir): @@ -66,22 +71,28 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False): fcntl.flock(handle.fileno(), fcntl.LOCK_EX) storage = shelve.BsdDbShelf(internal_db) + if clear: storage.clear() storage.close() fcntl.flock(handle.fileno(), fcntl.LOCK_UN) return {} + + if purge or write: + __purge_old_jobs(storage) if write: - __purge_old_jobs(storage) storage[str(jobid)] = (status, results) rc = jobid - else: + elif not purge: if storage.has_key(str(jobid)): # tuple of (status, results) + rc = storage[str(jobid)] else: rc = (JOB_ID_LOST_IN_SPACE, 0) + else: + rc = 0 storage.close() fcntl.flock(handle.fileno(), fcntl.LOCK_UN) @@ -91,7 +102,7 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False): def batch_run(server, process_server, nforks): """ This is the method used by the overlord side usage of jobthing. - It likely makes little sense for the minion async usage (yet). + Minion side usage will use minion_async_run instead. Given an array of items (pool), call callback in each one, but divide the workload over nfork forks. Temporary files used during the @@ -105,15 +116,75 @@ def batch_run(server, process_server, nforks): __update_status(job_id, JOB_ID_RUNNING, -1) return job_id else: - #print "DEBUG: UPDATE STATUS: r2: %s" % job_id + # kick off the job __update_status(job_id, JOB_ID_RUNNING, -1) results = forkbomb.batch_run(server, process_server, nforks) - #print "DEBUG: UPDATE STATUS: f1: %s" % job_id + + # we now have a list of job id's for each minion, kill the task + __update_status(job_id, JOB_ID_PARTIAL, results) + sys.exit(0) + +def minion_async_run(function_ref, args): + """ + This is a simpler invocation for minion side async usage. + """ + # to avoid confusion of job id's (we use the same job database) + # minion jobs contain the string "minion". + job_id = "%s-minion" % time.time() + pid = os.fork() + if pid != 0: + __update_status(job_id, JOB_ID_RUNNING, -1) + return job_id + else: + __update_status(job_id, JOB_ID_RUNNING, -1) + results = function_ref(args) __update_status(job_id, JOB_ID_FINISHED, results) sys.exit(0) -def job_status(jobid): - return __get_status(jobid) +def job_status(jobid, client_class=None): + + # NOTE: client_class is here to get around some evil circular reference + # type stuff. This is intended to be called by minions (who can leave it None) + # or by the Client module code (which does not need to be worried about it). API + # users should not be calling jobthing.py methods directly. + + got_status = __get_status(jobid) + + # if the status comes back as JOB_ID_PARTIAL what we have is actually a hash + # of hostname/minion-jobid pairs. Instantiate a client handle for each and poll them + # for their actual status, filling in only the ones that are actually done. + + (interim_rc, interim_results) = got_status + + if interim_rc == JOB_ID_PARTIAL: + + partial_results = {} + + for host in interim_results.keys(): + + minion_job = interim_results[host] + client = client_class(host, noglobs=True, async=False) + # print "DEBUG: client: %s" % client_class + minion_result = client.jobs.job_status(minion_job) + # print "DEBUG: minion: %s" % minion_result + (minion_interim_rc, minion_interim_result) = minion_result + + some_missing = False + if minion_interim_rc == JOB_ID_FINISHED: + partial_results[host] = minion_interim_result + else: + + some_missing = True + + if some_missing: + return (JOB_ID_PARTIAL, partial_results) + else: + return (JOB_ID_FINISHED, partial_results) + + else: + return got_status + + # of job id's on the minion in results. if __name__ == "__main__": __test() |