summaryrefslogtreecommitdiffstats
path: root/func
diff options
context:
space:
mode:
authormakkalot <makkalot@gmail.com>2008-08-03 00:29:49 +0300
committermakkalot <makkalot@gmail.com>2008-08-03 00:29:49 +0300
commit0ee2d63f870fcc9849df892b8e429faf5cbccec9 (patch)
tree0b58b440eb9d99546f050a2dacac598007900507 /func
parentb3d1bdf3ca79e181d82320c4305410b367ca99ca (diff)
parent8e8560dcb4d9e796156df8bfaad6564a555e2724 (diff)
downloadfunc-0ee2d63f870fcc9849df892b8e429faf5cbccec9.tar.gz
func-0ee2d63f870fcc9849df892b8e429faf5cbccec9.tar.xz
func-0ee2d63f870fcc9849df892b8e429faf5cbccec9.zip
merge from master async db stuff fixes
Diffstat (limited to 'func')
-rw-r--r--func/jobthing.py28
-rwxr-xr-xfunc/utils.py24
2 files changed, 44 insertions, 8 deletions
diff --git a/func/jobthing.py b/func/jobthing.py
index d56475a..cc6808b 100644
--- a/func/jobthing.py
+++ b/func/jobthing.py
@@ -47,6 +47,9 @@ def __get_status(jobid):
def purge_old_jobs():
return __access_status(purge=True)
+def clear_db():
+ return __access_status(clear=True)
+
def __purge_old_jobs(storage):
"""
Deletes jobs older than RETAIN_INTERVAL seconds.
@@ -56,12 +59,20 @@ def __purge_old_jobs(storage):
"""
nowtime = time.time()
for x in storage.keys():
- # minion jobs have "-minion" in the job id so disambiguation so we need to remove that
jobkey = x.strip().split('-')
- if len(jobkey) == 4: #the overrlord part for new format job_ids
- jobkey = jobkey[3]
- else: #the minion part job_ids
+ #if the jobkey's lenght is smaller than 4 it means that
+ #that id maybe a minion id that is in timestap-minion format
+ #or maybe a an old id which is in timestap format that handles
+ #both situations
+ if len(jobkey)<4: #the minion part job_ids
jobkey = jobkey[0]
+ #if the job is equal or bigger than 4 that means that it is a new type id
+ #which is in glob-module-method-timestamp format, in a perfect world the lenght
+ #of the jobkey should be exactly 4 but in some situations we have bigger lenghts
+ #anyway that control will hande all situation because only we need is the timestamp
+ #member which is the last one
+ else:
+ jobkey = jobkey[len(jobkey)-1]
create_time = float(jobkey)
if nowtime - create_time > RETAIN_INTERVAL:
@@ -81,7 +92,9 @@ def __get_open_ids(storage):
#TOBE REMOVED that control is for old job_ids
#some users who will upgrade to new version will have errors
#if we dont have that control here :)
- if len(job_id.split("-"))==4: #ignore the old job_ids
+ if len(job_id.split("-"))>=4: #ignore the old job_ids the overlord part
+ result_hash_pack[job_id]=result[0]
+ elif len(job_id.split("-"))==2: #it seems to be a minion side id and also ignores old ids
result_hash_pack[job_id]=result[0]
return result_hash_pack
@@ -147,7 +160,8 @@ def batch_run(pool, callback, nforks,**extra_args):
operation will be created in cachedir and subsequently deleted.
"""
- job_id = "".join([extra_args['spec'],"-",extra_args['module'],"-",extra_args['method'],"-",pprint.pformat(time.time())])
+ job_id = utils.get_formated_jobid(**extra_args)
+
__update_status(job_id, JOB_ID_RUNNING, -1)
pid = os.fork()
if pid != 0:
@@ -232,7 +246,7 @@ def job_status(jobid, client_class=None):
else:
some_missing = True
- if some_missing:
+ if some_missing or not interim_results:
if partial_results:
__update_status(jobid, JOB_ID_PARTIAL, partial_results)
return (JOB_ID_PARTIAL, partial_results)
diff --git a/func/utils.py b/func/utils.py
index 0e4b4d5..628694d 100755
--- a/func/utils.py
+++ b/func/utils.py
@@ -29,4 +29,26 @@ def is_error(result):
return False
-
+def remove_weird_chars(dirty_word):
+ """
+ That method will be used to clean some
+ glob adress expressions because async stuff
+ depends on that part
+
+ @param dirty_word : word to be cleaned
+ """
+ from copy import copy
+ copy_word = copy(dirty_word)
+ copy_word = copy_word.replace("-","_")
+ return copy_word
+
+def get_formated_jobid(**id_pack):
+ import time
+ import pprint
+
+ glob = remove_weird_chars(id_pack['spec'])
+ module = remove_weird_chars(id_pack['module'])
+ method = remove_weird_chars(id_pack['method'])
+ job_id = "".join([glob,"-",module,"-",method,"-",pprint.pformat(time.time())])
+ return job_id
+