diff options
-rw-r--r-- | func/overlord/jobthing.py | 13 | ||||
-rw-r--r-- | test/async_test.py | 38 |
2 files changed, 45 insertions, 6 deletions
diff --git a/func/overlord/jobthing.py b/func/overlord/jobthing.py index cd13253..d28e966 100644 --- a/func/overlord/jobthing.py +++ b/func/overlord/jobthing.py @@ -37,6 +37,7 @@ def __get_status(jobid): return __access_status(jobid=jobid, write=False) def __access_status(jobid=0, status=0, results=0, clear=False, write=False): + dir = os.path.expanduser("~/.func") if not os.path.exists(dir): os.makedirs(dir) @@ -53,19 +54,16 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False): fcntl.flock(handle.fileno(), fcntl.LOCK_UN) return {} - if not storage.has_key("data"): - storage["data"] = {} - # FIXME: the jobid is the time of the job, so deleting jobs # that are older than a set time would be a very good idea. if write: - storage["data"][jobid] = (status, results) + storage[str(jobid)] = (status, results) rc = jobid else: - if storage["data"].has_key(jobid): + if storage.has_key(str(jobid)): # tuple of (status, results) - rc = storage["data"][jobid] + rc = storage[str(jobid)] else: rc = (JOB_ID_LOST_IN_SPACE, 0) @@ -84,11 +82,14 @@ def batch_run(server, process_server, nforks): job_id = time.time() pid = os.fork() if pid != 0: + #print "DEBUG: UPDATE STATUS: r1: %s" % job_id __update_status(job_id, JOB_ID_RUNNING, -1) return job_id else: + #print "DEBUG: UPDATE STATUS: r2: %s" % job_id __update_status(job_id, JOB_ID_RUNNING, -1) results = forkbomb.batch_run(server, process_server, nforks) + #print "DEBUG: UPDATE STATUS: f1: %s" % job_id __update_status(job_id, JOB_ID_FINISHED, results) sys.exit(0) diff --git a/test/async_test.py b/test/async_test.py new file mode 100644 index 0000000..4c1acf5 --- /dev/null +++ b/test/async_test.py @@ -0,0 +1,38 @@ +from func.overlord.client import Client +import func.overlord.jobthing as jobthing +import time +import sys + +TEST_SLEEP = 5 +EXTRA_SLEEP = 5 + +def __tester(async): + if async: + client = Client("*",nforks=10,async=True) + oldtime = time.time() + print "asking minion to sleep for %s seconds" % TEST_SLEEP + job_id = client.test.sleep(TEST_SLEEP) + print "job_id = %s" % job_id + while True: + status = client.job_status(job_id) + (code, results) = status + nowtime = time.time() + delta = int(nowtime - oldtime) + if nowtime > oldtime + TEST_SLEEP + EXTRA_SLEEP: + print "time expired, test failed" + sys.exit(1) + if code == jobthing.JOB_ID_RUNNING: + print "task is still running, %s elapsed ..." % delta + elif code == jobthing.JOB_ID_FINISHED: + print "task complete, %s elapsed, results = %s" % (delta, results) + sys.exit(0) + else: + print "job not found: %s, %s elapased" % (code, delta) + time.sleep(1) + else: + print Client("*",nforks=10,async=False).test.sleep(5) + +# __tester(False) +__tester(True) + + |