diff options
author | makkalot <makkalot@gmail.com> | 2008-08-03 00:12:11 +0300 |
---|---|---|
committer | makkalot <makkalot@gmail.com> | 2008-08-03 00:12:11 +0300 |
commit | 8e8560dcb4d9e796156df8bfaad6564a555e2724 (patch) | |
tree | cb395135d194c9228203ee337a1fee097e41c5b0 | |
parent | 085dc1cf3a1387cf6bc361b0a0a93e0d63df1ecf (diff) | |
download | func-8e8560dcb4d9e796156df8bfaad6564a555e2724.tar.gz func-8e8560dcb4d9e796156df8bfaad6564a555e2724.tar.xz func-8e8560dcb4d9e796156df8bfaad6564a555e2724.zip |
Fix jobthing to tolerate the unusual and long glob names that are stored in the database as part of async job_ids
-rw-r--r-- | func/jobthing.py | 28 | ||||
-rwxr-xr-x | func/utils.py | 24 |
2 files changed, 44 insertions, 8 deletions
diff --git a/func/jobthing.py b/func/jobthing.py index d56475a..cc6808b 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -47,6 +47,9 @@ def __get_status(jobid): def purge_old_jobs(): return __access_status(purge=True) +def clear_db(): + return __access_status(clear=True) + def __purge_old_jobs(storage): """ Deletes jobs older than RETAIN_INTERVAL seconds. @@ -56,12 +59,20 @@ def __purge_old_jobs(storage): """ nowtime = time.time() for x in storage.keys(): - # minion jobs have "-minion" in the job id so disambiguation so we need to remove that jobkey = x.strip().split('-') - if len(jobkey) == 4: #the overrlord part for new format job_ids - jobkey = jobkey[3] - else: #the minion part job_ids + #if the jobkey's lenght is smaller than 4 it means that + #that id maybe a minion id that is in timestap-minion format + #or maybe a an old id which is in timestap format that handles + #both situations + if len(jobkey)<4: #the minion part job_ids jobkey = jobkey[0] + #if the job is equal or bigger than 4 that means that it is a new type id + #which is in glob-module-method-timestamp format, in a perfect world the lenght + #of the jobkey should be exactly 4 but in some situations we have bigger lenghts + #anyway that control will hande all situation because only we need is the timestamp + #member which is the last one + else: + jobkey = jobkey[len(jobkey)-1] create_time = float(jobkey) if nowtime - create_time > RETAIN_INTERVAL: @@ -81,7 +92,9 @@ def __get_open_ids(storage): #TOBE REMOVED that control is for old job_ids #some users who will upgrade to new version will have errors #if we dont have that control here :) - if len(job_id.split("-"))==4: #ignore the old job_ids + if len(job_id.split("-"))>=4: #ignore the old job_ids the overlord part + result_hash_pack[job_id]=result[0] + elif len(job_id.split("-"))==2: #it seems to be a minion side id and also ignores old ids result_hash_pack[job_id]=result[0] return result_hash_pack @@ -147,7 +160,8 @@ def batch_run(pool, 
callback, nforks,**extra_args): operation will be created in cachedir and subsequently deleted. """ - job_id = "".join([extra_args['spec'],"-",extra_args['module'],"-",extra_args['method'],"-",pprint.pformat(time.time())]) + job_id = utils.get_formated_jobid(**extra_args) + __update_status(job_id, JOB_ID_RUNNING, -1) pid = os.fork() if pid != 0: @@ -232,7 +246,7 @@ def job_status(jobid, client_class=None): else: some_missing = True - if some_missing: + if some_missing or not interim_results: if partial_results: __update_status(jobid, JOB_ID_PARTIAL, partial_results) return (JOB_ID_PARTIAL, partial_results) diff --git a/func/utils.py b/func/utils.py index 0e4b4d5..628694d 100755 --- a/func/utils.py +++ b/func/utils.py @@ -29,4 +29,26 @@ def is_error(result): return False - +def remove_weird_chars(dirty_word): + """ + That method will be used to clean some + glob adress expressions because async stuff + depends on that part + + @param dirty_word : word to be cleaned + """ + from copy import copy + copy_word = copy(dirty_word) + copy_word = copy_word.replace("-","_") + return copy_word + +def get_formated_jobid(**id_pack): + import time + import pprint + + glob = remove_weird_chars(id_pack['spec']) + module = remove_weird_chars(id_pack['module']) + method = remove_weird_chars(id_pack['method']) + job_id = "".join([glob,"-",module,"-",method,"-",pprint.pformat(time.time())]) + return job_id + |