diff options
-rw-r--r-- | func/jobthing.py | 27 |
1 files changed, 23 insertions, 4 deletions
diff --git a/func/jobthing.py b/func/jobthing.py index 051480d..df5f195 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -57,12 +57,29 @@ def __purge_old_jobs(storage): nowtime = time.time() for x in storage.keys(): # minion jobs have "-minion" in the job id so disambiguation so we need to remove that - jobkey = x.replace("-","").replace("minion","") + jobkey = x.strip().split('-')[3] create_time = float(jobkey) if nowtime - create_time > RETAIN_INTERVAL: del storage[x] -def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purge=False): +def get_open_ids(): + return __access_status(write=False,get_all=True) + +def __get_open_ids(storage): + """ + That method is needes from other language/API/UI/GUI parts that uses + func's async methods to know the status of the results. + """ + result_hash_pack = {} + #print storage + for job_id,result in storage.iteritems(): + result_hash_pack[job_id]=result[0] + + return result_hash_pack + + + +def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purge=False,get_all=False): dir = os.path.expanduser(CACHE_DIR) if not os.path.exists(dir): @@ -94,6 +111,8 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purg if write: storage[str(jobid)] = (status, results) rc = jobid + elif get_all: + rc=__get_open_ids(storage) elif not purge: if storage.has_key(str(jobid)): # tuple of (status, results) @@ -109,7 +128,7 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purg return rc -def batch_run(pool, callback, nforks): +def batch_run(pool, callback, nforks,**extra_args): """ This is the method used by the overlord side usage of jobthing. Minion side usage will use minion_async_run instead. @@ -119,7 +138,7 @@ def batch_run(pool, callback, nforks): operation will be created in cachedir and subsequently deleted. """ - job_id = pprint.pformat(time.time()) + job_id = "".join([extra_args['spec'],"-",extra_args['module'],"-",extra_args['method'],"-",pprint.pformat(time.time())]) __update_status(job_id, JOB_ID_RUNNING, -1) pid = os.fork() if pid != 0: |