diff options
author | Michael DeHaan <mdehaan@redhat.com> | 2008-01-24 17:30:09 -0500 |
---|---|---|
committer | Michael DeHaan <mdehaan@redhat.com> | 2008-01-24 17:30:09 -0500 |
commit | 8106c1d88407371505115e7938dc99bcaf6fb1e9 (patch) | |
tree | 7533cf08de4cf4f038a638c27dac8d438d6d6f65 | |
parent | f3e03a6e1cf1696a5c194c662142ea0354726d9d (diff) | |
download | func-8106c1d88407371505115e7938dc99bcaf6fb1e9.tar.gz func-8106c1d88407371505115e7938dc99bcaf6fb1e9.tar.xz func-8106c1d88407371505115e7938dc99bcaf6fb1e9.zip |
Still working on async (pardon the debug output still), there's a long ways to go with the partial status
reporting but it is getting better at actually doing the task, just a few kinks to work out in getting
results reported correctly.
-rw-r--r-- | func/jobthing.py | 24 | ||||
-rwxr-xr-x | func/utils.py | 6 | ||||
-rw-r--r-- | test/async_test.py | 47 |
3 files changed, 47 insertions, 30 deletions
diff --git a/func/jobthing.py b/func/jobthing.py index 5b89599..a5b0820 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -28,7 +28,8 @@ import utils JOB_ID_RUNNING = 0 JOB_ID_FINISHED = 1 JOB_ID_LOST_IN_SPACE = 2 -JOB_ID_PARTIAL = 3 +JOB_ID_ASYNC_STATUS = 3 +JOB_ID_ASYNC_COMPLETE = 4 # how long to retain old job records in the job id database RETAIN_INTERVAL = 60 * 60 @@ -125,7 +126,7 @@ def batch_run(server, process_server, nforks): results = forkbomb.batch_run(server, process_server, nforks) # we now have a list of job id's for each minion, kill the task - __update_status(job_id, JOB_ID_PARTIAL, results) + __update_status(job_id, JOB_ID_ASYNC_STATUS, results) sys.exit(0) def minion_async_run(function_ref, args): @@ -154,36 +155,37 @@ def job_status(jobid, client_class=None): got_status = __get_status(jobid) - # if the status comes back as JOB_ID_PARTIAL what we have is actually a hash + # if the status comes back as JOB_ID_ASYNC_STATUS what we have is actually a hash # of hostname/minion-jobid pairs. Instantiate a client handle for each and poll them # for their actual status, filling in only the ones that are actually done. (interim_rc, interim_results) = got_status - if interim_rc == JOB_ID_PARTIAL: + if interim_rc == JOB_ID_ASYNC_STATUS: partial_results = {} + # print "DEBUG: partial results for batch task: %s" % interim_results + for host in interim_results.keys(): minion_job = interim_results[host] client = client_class(host, noglobs=True, async=False) - # print "DEBUG: client: %s" % client_class minion_result = client.jobs.job_status(minion_job) - # print "DEBUG: minion: %s" % minion_result + print "DEBUG: background task on minion (%s) has status %s" % (minion_job, minion_result) + (minion_interim_rc, minion_interim_result) = minion_result some_missing = False - if minion_interim_rc == JOB_ID_FINISHED: - partial_results[host] = minion_interim_result + if minion_interim_rc not in [ JOB_ID_RUNNING ]: + partial_results[host] = minion_interim_result else: - some_missing = True if some_missing: - return (JOB_ID_PARTIAL, partial_results) + return (JOB_ID_ASYNC_STATUS, partial_results) else: - return (JOB_ID_FINISHED, partial_results) + return (JOB_ID_ASYNC_COMPLETE, partial_results) else: return got_status diff --git a/func/utils.py b/func/utils.py index 140b761..c2fbb9f 100755 --- a/func/utils.py +++ b/func/utils.py @@ -55,32 +55,26 @@ def remove_exceptions(results): """ if results is None: - print "DEBUG: A" return REMOTE_CANARY if str(results).startswith("<Fault"): - print "DEBUG: B" return REMOTE_CANARY if type(results) == xmlrpclib.Fault: - print "DEBUG: C" return REMOTE_CANARY if type(results) == dict: new_results = {} for x in results.keys(): value = results[x] - # print "DEBUG: checking against: %s" % str(value) if str(value).find("<Fault") == -1: # there are interesting issues with the way it is imported and type() # so that is why this hack is here. type(x) != xmlrpclib.Fault appears to miss some things new_results[x] = value else: new_results[x] = REMOTE_CANARY - # print "DEBUG: removed exceptions = %s" % new_results return new_results - print "DEBUG: removed exceptions = %s" % results return results diff --git a/test/async_test.py b/test/async_test.py index 9f70598..af5e55a 100644 --- a/test/async_test.py +++ b/test/async_test.py @@ -6,16 +6,32 @@ import sys TEST_SLEEP = 5 EXTRA_SLEEP = 5 -def __tester(async): +SLOW_COMMAND = 1 +QUICK_COMMAND = 2 +RAISES_EXCEPTION_COMMAND = 3 +FAKE_COMMAND = 4 +TESTS = [ SLOW_COMMAND, QUICK_COMMAND, RAISES_EXCEPTION_COMMAND, FAKE_COMMAND ] + +def __tester(async,test): if async: client = Client("*",nforks=10,async=True) oldtime = time.time() - print "asking minion to sleep for %s seconds" % TEST_SLEEP - # job_id = client.test.sleep(TEST_SLEEP) # ok - # job_id = client.hardware.info() # ok - # job_id = client.test.explode() # doesn't work yet - job_id = client.test.does_not_exist(1,2) # ditto + job_id = -411 + print "======================================================" + if test == SLOW_COMMAND: + print "TESTING command that sleeps %s seconds" % TEST_SLEEP + job_id = client.test.sleep(TEST_SLEEP) + elif test == QUICK_COMMAND: + print "TESTING a quick command" + job_id = client.test.add(1,2) + elif test == RAISES_EXCEPTION_COMMAND: + print "TESTING a command that deliberately raises an exception" + job_id = client.test.explode() # doesn't work yet + elif test == FAKE_COMMAND: + print "TESTING a command that does not exist" + job_id = client.test.does_not_exist(1,2) # ditto + print "======================================================" print "job_id = %s" % job_id while True: @@ -25,22 +41,27 @@ def __tester(async): delta = int(nowtime - oldtime) if nowtime > oldtime + TEST_SLEEP + EXTRA_SLEEP: print "time expired, test failed" - sys.exit(1) + return if code == jobthing.JOB_ID_RUNNING: print "task is still running, %s elapsed ..." % delta - if code == jobthing.JOB_ID_PARTIAL: + elif code == jobthing.JOB_ID_ASYNC_STATUS: print "task reports partial status, %s elapsed, results = %s" % (delta, results) - elif code == jobthing.JOB_ID_FINISHED: - print "task complete, %s elapsed, results = %s" % (delta, results) - sys.exit(0) + print "(non-async) task complete, %s elapsed, results = %s" % (delta, results) + return + elif code == jobthing.JOB_ID_ASYNC_COMPLETE: + print "(async) task complete, %s elapsed, results = %s" % (delta, results) + return else: print "job not found: %s, %s elapased" % (code, delta) time.sleep(1) else: print Client("*",nforks=10,async=False).test.sleep(5) -# __tester(False) -__tester(True) +for t in TESTS: + __tester(True,t) +print "=======================================================" +print "Testing non-async call" +print __tester(False,-1) |