summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael DeHaan <mdehaan@redhat.com>2008-01-15 15:26:13 -0500
committerMichael DeHaan <mdehaan@redhat.com>2008-01-15 15:26:13 -0500
commitb51e8c0837354eba4e234a1f9bcf0ecd630d2ae0 (patch)
treebb8ac131fe733e7b96b9394343ddd374e829d767
parent820cb35f0ac38dfc49b2369e330056d95eaca7ec (diff)
downloadthird_party-func-b51e8c0837354eba4e234a1f9bcf0ecd630d2ae0.tar.gz
third_party-func-b51e8c0837354eba4e234a1f9bcf0ecd630d2ae0.tar.xz
third_party-func-b51e8c0837354eba4e234a1f9bcf0ecd630d2ae0.zip
Jobthing is now functional (see async_test.py for example usage), we still need
to delete jobs that have expired after a certain amount of time to avoid keeping too many results around in storage.
-rw-r--r--func/overlord/jobthing.py13
-rw-r--r--test/async_test.py38
2 files changed, 45 insertions, 6 deletions
diff --git a/func/overlord/jobthing.py b/func/overlord/jobthing.py
index cd13253..d28e966 100644
--- a/func/overlord/jobthing.py
+++ b/func/overlord/jobthing.py
@@ -37,6 +37,7 @@ def __get_status(jobid):
return __access_status(jobid=jobid, write=False)
def __access_status(jobid=0, status=0, results=0, clear=False, write=False):
+
dir = os.path.expanduser("~/.func")
if not os.path.exists(dir):
os.makedirs(dir)
@@ -53,19 +54,16 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False):
fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
return {}
- if not storage.has_key("data"):
- storage["data"] = {}
-
# FIXME: the jobid is the time of the job, so deleting jobs
# that are older than a set time would be a very good idea.
if write:
- storage["data"][jobid] = (status, results)
+ storage[str(jobid)] = (status, results)
rc = jobid
else:
- if storage["data"].has_key(jobid):
+ if storage.has_key(str(jobid)):
# tuple of (status, results)
- rc = storage["data"][jobid]
+ rc = storage[str(jobid)]
else:
rc = (JOB_ID_LOST_IN_SPACE, 0)
@@ -84,11 +82,14 @@ def batch_run(server, process_server, nforks):
job_id = time.time()
pid = os.fork()
if pid != 0:
+ #print "DEBUG: UPDATE STATUS: r1: %s" % job_id
__update_status(job_id, JOB_ID_RUNNING, -1)
return job_id
else:
+ #print "DEBUG: UPDATE STATUS: r2: %s" % job_id
__update_status(job_id, JOB_ID_RUNNING, -1)
results = forkbomb.batch_run(server, process_server, nforks)
+ #print "DEBUG: UPDATE STATUS: f1: %s" % job_id
__update_status(job_id, JOB_ID_FINISHED, results)
sys.exit(0)
diff --git a/test/async_test.py b/test/async_test.py
new file mode 100644
index 0000000..4c1acf5
--- /dev/null
+++ b/test/async_test.py
@@ -0,0 +1,38 @@
+from func.overlord.client import Client
+import func.overlord.jobthing as jobthing
+import time
+import sys
+
+TEST_SLEEP = 5
+EXTRA_SLEEP = 5
+
+def __tester(async):
+ if async:
+ client = Client("*",nforks=10,async=True)
+ oldtime = time.time()
+ print "asking minion to sleep for %s seconds" % TEST_SLEEP
+ job_id = client.test.sleep(TEST_SLEEP)
+ print "job_id = %s" % job_id
+ while True:
+ status = client.job_status(job_id)
+ (code, results) = status
+ nowtime = time.time()
+ delta = int(nowtime - oldtime)
+ if nowtime > oldtime + TEST_SLEEP + EXTRA_SLEEP:
+ print "time expired, test failed"
+ sys.exit(1)
+ if code == jobthing.JOB_ID_RUNNING:
+ print "task is still running, %s elapsed ..." % delta
+ elif code == jobthing.JOB_ID_FINISHED:
+ print "task complete, %s elapsed, results = %s" % (delta, results)
+ sys.exit(0)
+ else:
+ print "job not found: %s, %s elapased" % (code, delta)
+ time.sleep(1)
+ else:
+ print Client("*",nforks=10,async=False).test.sleep(5)
+
+# __tester(False)
+__tester(True)
+
+