summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormakkalot <makkalot@gmail.com>2008-07-17 11:49:34 +0300
committermakkalot <makkalot@gmail.com>2008-07-17 11:49:34 +0300
commit241ae0393defac1c28b58ef79c0d585bc8d7b349 (patch)
tree53c79d4a6693947668e4b1db8622afd8f3338cbd
parentf7744eef540cd0c741c471f3f54e98a1ba3d1473 (diff)
downloadfunc-241ae0393defac1c28b58ef79c0d585bc8d7b349.tar.gz
func-241ae0393defac1c28b58ef79c0d585bc8d7b349.tar.xz
func-241ae0393defac1c28b58ef79c0d585bc8d7b349.zip
adding a new method that gets back the open_ids in func database
-rw-r--r--func/jobthing.py27
1 files changed, 23 insertions, 4 deletions
diff --git a/func/jobthing.py b/func/jobthing.py
index 051480d..df5f195 100644
--- a/func/jobthing.py
+++ b/func/jobthing.py
@@ -57,12 +57,29 @@ def __purge_old_jobs(storage):
nowtime = time.time()
for x in storage.keys():
# minion jobs have "-minion" in the job id so disambiguation so we need to remove that
- jobkey = x.replace("-","").replace("minion","")
+ jobkey = x.strip().split('-')[3]
create_time = float(jobkey)
if nowtime - create_time > RETAIN_INTERVAL:
del storage[x]
-def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purge=False):
+def get_open_ids():
+ return __access_status(write=False,get_all=True)
+
+def __get_open_ids(storage):
+ """
+ That method is needes from other language/API/UI/GUI parts that uses
+ func's async methods to know the status of the results.
+ """
+ result_hash_pack = {}
+ #print storage
+ for job_id,result in storage.iteritems():
+ result_hash_pack[job_id]=result[0]
+
+ return result_hash_pack
+
+
+
+def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purge=False,get_all=False):
dir = os.path.expanduser(CACHE_DIR)
if not os.path.exists(dir):
@@ -94,6 +111,8 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purg
if write:
storage[str(jobid)] = (status, results)
rc = jobid
+ elif get_all:
+ rc=__get_open_ids(storage)
elif not purge:
if storage.has_key(str(jobid)):
# tuple of (status, results)
@@ -109,7 +128,7 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purg
return rc
-def batch_run(pool, callback, nforks):
+def batch_run(pool, callback, nforks,**extra_args):
"""
This is the method used by the overlord side usage of jobthing.
Minion side usage will use minion_async_run instead.
@@ -119,7 +138,7 @@ def batch_run(pool, callback, nforks):
operation will be created in cachedir and subsequently deleted.
"""
- job_id = pprint.pformat(time.time())
+ job_id = "".join([extra_args['spec'],"-",extra_args['module'],"-",extra_args['method'],"-",pprint.pformat(time.time())])
__update_status(job_id, JOB_ID_RUNNING, -1)
pid = os.fork()
if pid != 0: