diff options
-rw-r--r-- | func/forkbomb.py | 17 | ||||
-rw-r--r-- | func/jobthing.py | 30 | ||||
-rw-r--r-- | func/overlord/base_command.py | 3 | ||||
-rw-r--r-- | func/overlord/cmd_modules/call.py | 4 | ||||
-rw-r--r-- | test/async_test.py | 4 |
5 files changed, 35 insertions, 23 deletions
diff --git a/func/forkbomb.py b/func/forkbomb.py index 7cc9df3..ef0817a 100644 --- a/func/forkbomb.py +++ b/func/forkbomb.py @@ -16,7 +16,7 @@ import os import random # for testing only import time # for testing only import shelve -import bsddb +import dbm import sys import tempfile import fcntl @@ -39,10 +39,10 @@ def __access_buckets(filename,clear,new_key=None,new_value=None): modifying it as required. """ - internal_db = bsddb.btopen(filename, 'c', 0644 ) - handle = open(filename,"r") + handle = open(filename,"w") fcntl.flock(handle.fileno(), fcntl.LOCK_EX) - storage = shelve.BsdDbShelf(internal_db) + internal_db = dbm.open(filename, 'c', 0644 ) + storage = shelve.Shelf(internal_db) if clear: storage.clear() @@ -138,7 +138,14 @@ def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR): buckets = __bucketize(pool, nforks) __forkbomb(0,buckets,callback,shelf_file) rc = __access_buckets(shelf_file,False,None) - os.remove(shelf_file) + + try: #it's only cleanup so don't care if the files disapeared + os.remove(shelf_file) + os.remove(shelf_file+".pag") + os.remove(shelf_file+".dir") + except OSError: + pass + return rc def __test(nforks=4,sample_size=20): diff --git a/func/jobthing.py b/func/jobthing.py index 6024274..93c26f0 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -18,7 +18,7 @@ import os import random # for testing only import time # for testing only import shelve -import bsddb +import dbm import sys import fcntl import forkbomb @@ -68,10 +68,10 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purg os.makedirs(dir) filename = os.path.join(dir,"status-%s" % os.getuid()) - internal_db = bsddb.btopen(filename, 'c', 0644 ) - handle = open(filename,"r") + handle = open(filename,"w") fcntl.flock(handle.fileno(), fcntl.LOCK_EX) - storage = shelve.BsdDbShelf(internal_db) + internal_db = dbm.open(filename, 'c', 0644 ) + storage = shelve.Shelf(internal_db) if clear: @@ -118,8 +118,7 @@ def batch_run(pool, callback, nforks): return job_id else: # kick off the job - # I don't thing it's needed - kaa - #__update_status(job_id, JOB_ID_RUNNING, -1) + __update_status(job_id, JOB_ID_RUNNING, -1) results = forkbomb.batch_run(pool, callback, nforks) # we now have a list of job id's for each minion, kill the task @@ -135,14 +134,19 @@ def minion_async_run(retriever, method, args): job_id = "%s-minion" % time.time() - signal.signal(signal.SIGCHLD, 0) + __update_status(job_id, JOB_ID_RUNNING, -1) pid = os.fork() if pid != 0: - __update_status(job_id, JOB_ID_RUNNING, -1) + os.waitpid(pid, 0) return job_id else: - # I don't thing it's needed - kaa - #__update_status(job_id, JOB_ID_RUNNING, -1) + # daemonize! + os.umask(077) + os.chdir('/') + os.setsid() + if os.fork(): + os._exit(0) + try: function_ref = retriever(method) rc = function_ref(*args) @@ -151,7 +155,7 @@ def minion_async_run(retriever, method, args): rc = utils.nice_exception(t,v,tb) __update_status(job_id, JOB_ID_FINISHED, rc) - sys.exit(0) + os._exit(0) def job_status(jobid, client_class=None): @@ -180,9 +184,9 @@ def job_status(jobid, client_class=None): client = client_class(host, noglobs=True, async=False) minion_result = client.jobs.job_status(minion_job) - if type(minion_result) != tuple: + if type(minion_result) != list or len(minion_result)!=2: minion_interim_rc = JOB_ID_REMOTE_ERROR - minion_interim_result = minion_result + minion_interim_result = minion_result[:3] else: (minion_interim_rc, minion_interim_result) = minion_result diff --git a/func/overlord/base_command.py b/func/overlord/base_command.py index bacb005..8f16eca 100644 --- a/func/overlord/base_command.py +++ b/func/overlord/base_command.py @@ -29,4 +29,5 @@ class BaseCommand(command.Command): interactive=self.interactive, verbose=self.verbose, config=self.config, - async=self.options.async) + async=self.options.async, + nforks=self.options.forks) diff --git a/func/overlord/cmd_modules/call.py b/func/overlord/cmd_modules/call.py index edf9f38..bb9a61e 100644 --- a/func/overlord/cmd_modules/call.py +++ b/func/overlord/cmd_modules/call.py @@ -146,11 +146,11 @@ class Call(base_command.BaseCommand): (return_code, async_results) = self.overlord_obj.job_status(results) if return_code == jobthing.JOB_ID_RUNNING: time.sleep(0.1) - elif return_code == jobthing.JOB_ID_ASYNC_FINISHED: + elif return_code == jobthing.JOB_ID_FINISHED: async_done = True partial = self.print_partial_results(partial, async_results, self.options.sort) return partial - elif return_code == jobthing.JOB_ID_ASYNC_PARTIAL: + elif return_code == jobthing.JOB_ID_PARTIAL: if not self.options.sort: partial = self.print_partial_results(partial, async_results) else: diff --git a/test/async_test.py b/test/async_test.py index 8e6961d..8e0495f 100644 --- a/test/async_test.py +++ b/test/async_test.py @@ -44,12 +44,12 @@ def __tester(async,test): return if code == jobthing.JOB_ID_RUNNING: print "task is still running, %s elapsed ..." % delta - elif code == jobthing.JOB_ID_ASYNC_PARTIAL: + elif code == jobthing.JOB_ID_PARTIAL: print "task reports partial status, %s elapsed, results = %s" % (delta, results) elif code == jobthing.JOB_ID_FINISHED: print "(non-async) task complete, %s elapsed, results = %s" % (delta, results) return - elif code == jobthing.JOB_ID_ASYNC_FINISHED: + elif code == jobthing.JOB_ID_FINISHED: print "(async) task complete, %s elapsed, results = %s" % (delta, results) return else: |