From 3f62e1d302fb63f1f3c5c72121388c8b0dd6e784 Mon Sep 17 00:00:00 2001 From: "Krzysztof A. Adamski" Date: Fri, 6 Jun 2008 18:08:26 -0400 Subject: Fixing "modulus voodoo". --- func/forkbomb.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/func/forkbomb.py b/func/forkbomb.py index b835d3d..7cc9df3 100644 --- a/func/forkbomb.py +++ b/func/forkbomb.py @@ -75,8 +75,8 @@ def __bucketize(pool, slots): buckets = {} count = 0 for key in pool: - count = count + 1 slot = count % slots + count = count + 1 if not buckets.has_key(slot): buckets[slot] = [] buckets[slot].append(key) @@ -130,9 +130,9 @@ def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR): the workload over nfork forks. Temporary files used during the operation will be created in cachedir and subsequently deleted. """ - if nforks <= 1: + if nforks < 1: # modulus voodoo gets crazy otherwise and bad things happen - nforks = 2 + nforks = 1 shelf_file = __get_storage(cachedir) __access_buckets(shelf_file,True,None) buckets = __bucketize(pool, nforks) -- cgit From 2d23caf2f9e96022dd49781bc00c351dcab7c10c Mon Sep 17 00:00:00 2001 From: "Krzysztof A. Adamski" Date: Fri, 6 Jun 2008 18:14:01 -0400 Subject: Jobthing status codes cleanup. --- func/jobthing.py | 13 ++++++------- func/overlord/cmd_modules/call.py | 4 ++-- test/async_test.py | 4 ++-- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/func/jobthing.py b/func/jobthing.py index 75a1d1a..bbc4f89 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -27,8 +27,7 @@ import utils JOB_ID_RUNNING = 0 JOB_ID_FINISHED = 1 JOB_ID_LOST_IN_SPACE = 2 -JOB_ID_ASYNC_PARTIAL = 3 -JOB_ID_ASYNC_FINISHED = 4 +JOB_ID_PARTIAL = 3 # how long to retain old job records in the job id database RETAIN_INTERVAL = 60 * 60 @@ -121,7 +120,7 @@ def batch_run(server, process_server, nforks): results = forkbomb.batch_run(server, process_server, nforks) # we now have a list of job id's for each minion, kill the task - __update_status(job_id, JOB_ID_ASYNC_PARTIAL, results) + __update_status(job_id, JOB_ID_PARTIAL, results) sys.exit(0) def minion_async_run(retriever, method, args): @@ -158,13 +157,13 @@ def job_status(jobid, client_class=None): got_status = __get_status(jobid) - # if the status comes back as JOB_ID_ASYNC_PARTIAL what we have is actually a hash + # if the status comes back as JOB_ID_PARTIAL what we have is actually a hash # of hostname/minion-jobid pairs. Instantiate a client handle for each and poll them # for their actual status, filling in only the ones that are actually done. (interim_rc, interim_results) = got_status - if interim_rc == JOB_ID_ASYNC_PARTIAL: + if interim_rc == JOB_ID_PARTIAL: partial_results = {} @@ -187,9 +186,9 @@ def job_status(jobid, client_class=None): some_missing = True if some_missing: - return (JOB_ID_ASYNC_PARTIAL, partial_results) + return (JOB_ID_PARTIAL, partial_results) else: - return (JOB_ID_ASYNC_FINISHED, partial_results) + return (JOB_ID_FINISHED, partial_results) else: return got_status diff --git a/func/overlord/cmd_modules/call.py b/func/overlord/cmd_modules/call.py index edf9f38..bb9a61e 100644 --- a/func/overlord/cmd_modules/call.py +++ b/func/overlord/cmd_modules/call.py @@ -146,11 +146,11 @@ class Call(base_command.BaseCommand): (return_code, async_results) = self.overlord_obj.job_status(results) if return_code == jobthing.JOB_ID_RUNNING: time.sleep(0.1) - elif return_code == jobthing.JOB_ID_ASYNC_FINISHED: + elif return_code == jobthing.JOB_ID_FINISHED: async_done = True partial = self.print_partial_results(partial, async_results, self.options.sort) return partial - elif return_code == jobthing.JOB_ID_ASYNC_PARTIAL: + elif return_code == jobthing.JOB_ID_PARTIAL: if not self.options.sort: partial = self.print_partial_results(partial, async_results) else: diff --git a/test/async_test.py b/test/async_test.py index 8e6961d..8e0495f 100644 --- a/test/async_test.py +++ b/test/async_test.py @@ -44,12 +44,12 @@ def __tester(async,test): return if code == jobthing.JOB_ID_RUNNING: print "task is still running, %s elapsed ..." % delta - elif code == jobthing.JOB_ID_ASYNC_PARTIAL: + elif code == jobthing.JOB_ID_PARTIAL: print "task reports partial status, %s elapsed, results = %s" % (delta, results) elif code == jobthing.JOB_ID_FINISHED: print "(non-async) task complete, %s elapsed, results = %s" % (delta, results) return - elif code == jobthing.JOB_ID_ASYNC_FINISHED: + elif code == jobthing.JOB_ID_FINISHED: print "(async) task complete, %s elapsed, results = %s" % (delta, results) return else: -- cgit From 58439ecd3433434b8fa31821619fec8e5bbeda27 Mon Sep 17 00:00:00 2001 From: "Krzysztof A. Adamski" Date: Fri, 6 Jun 2008 18:17:50 -0400 Subject: Cleanup: Change parameter names to match comments and similar funtion in forkbomb. --- func/jobthing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/func/jobthing.py b/func/jobthing.py index bbc4f89..c7eb179 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -99,7 +99,7 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purg return rc -def batch_run(server, process_server, nforks): +def batch_run(pool, callback, nforks): """ This is the method used by the overlord side usage of jobthing. Minion side usage will use minion_async_run instead. @@ -117,7 +117,7 @@ def batch_run(server, process_server, nforks): else: # kick off the job __update_status(job_id, JOB_ID_RUNNING, -1) - results = forkbomb.batch_run(server, process_server, nforks) + results = forkbomb.batch_run(pool, callback, nforks) # we now have a list of job id's for each minion, kill the task __update_status(job_id, JOB_ID_PARTIAL, results) -- cgit From 9acec083713038365e804464cfaf7c22a405d0a0 Mon Sep 17 00:00:00 2001 From: "Krzysztof A. Adamski" Date: Fri, 6 Jun 2008 18:18:32 -0400 Subject: Cleanup: Remove unneeded print statement. --- func/overlord/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/func/overlord/client.py b/func/overlord/client.py index 299fb5d..fb436ed 100755 --- a/func/overlord/client.py +++ b/func/overlord/client.py @@ -321,7 +321,7 @@ class Overlord(object): # expanded = expand_servers(self.server_spec, port=self.port, noglobs=True, verbose=self.verbose)[0] expanded_minions = Minions(self.server_spec, port=self.port, noglobs=True, verbose=self.verbose) minions = expanded_minions.get_urls()[0] - print minions +# print minions results = process_server(0, 0, minions) return results -- cgit From bb80b6e6f10bd97b2926f447e913d8249592c32d Mon Sep 17 00:00:00 2001 From: "Krzysztof A. Adamski" Date: Sat, 7 Jun 2008 15:03:27 -0400 Subject: Change bsddb which is leaking on new python versions. --- func/forkbomb.py | 17 ++++++++++++----- func/jobthing.py | 8 ++++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/func/forkbomb.py b/func/forkbomb.py index 7cc9df3..ef0817a 100644 --- a/func/forkbomb.py +++ b/func/forkbomb.py @@ -16,7 +16,7 @@ import os import random # for testing only import time # for testing only import shelve -import bsddb +import dbm import sys import tempfile import fcntl @@ -39,10 +39,10 @@ def __access_buckets(filename,clear,new_key=None,new_value=None): modifying it as required. """ - internal_db = bsddb.btopen(filename, 'c', 0644 ) - handle = open(filename,"r") + handle = open(filename,"w") fcntl.flock(handle.fileno(), fcntl.LOCK_EX) - storage = shelve.BsdDbShelf(internal_db) + internal_db = dbm.open(filename, 'c', 0644 ) + storage = shelve.Shelf(internal_db) if clear: storage.clear() @@ -138,7 +138,14 @@ def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR): buckets = __bucketize(pool, nforks) __forkbomb(0,buckets,callback,shelf_file) rc = __access_buckets(shelf_file,False,None) - os.remove(shelf_file) + + try: #it's only cleanup so don't care if the files disapeared + os.remove(shelf_file) + os.remove(shelf_file+".pag") + os.remove(shelf_file+".dir") + except OSError: + pass + return rc def __test(nforks=4,sample_size=20): diff --git a/func/jobthing.py b/func/jobthing.py index c7eb179..aa801bc 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -18,7 +18,7 @@ import os import random # for testing only import time # for testing only import shelve -import bsddb +import dbm import sys import fcntl import forkbomb @@ -66,10 +66,10 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purg os.makedirs(dir) filename = os.path.join(dir,"status-%s" % os.getuid()) - internal_db = bsddb.btopen(filename, 'c', 0644 ) - handle = open(filename,"r") + handle = open(filename,"w") fcntl.flock(handle.fileno(), fcntl.LOCK_EX) - storage = shelve.BsdDbShelf(internal_db) + internal_db = dbm.open(filename, 'c', 0644 ) + storage = shelve.Shelf(internal_db) if clear: -- cgit From dee316020aaf6e70c909ea177adf2742a33e2b7b Mon Sep 17 00:00:00 2001 From: "Krzysztof A. Adamski" Date: Sat, 7 Jun 2008 15:04:09 -0400 Subject: Daemonize async jobs on minion. --- func/jobthing.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/func/jobthing.py b/func/jobthing.py index aa801bc..033368c 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -135,9 +135,18 @@ def minion_async_run(retriever, method, args): pid = os.fork() if pid != 0: __update_status(job_id, JOB_ID_RUNNING, -1) + os.waitpid(pid, 0) return job_id else: __update_status(job_id, JOB_ID_RUNNING, -1) + + # daemonize! + os.umask(077) + os.chdir('/') + os.setsid() + if os.fork(): + os._exit(0) + try: function_ref = retriever(method) rc = function_ref(*args) @@ -146,7 +155,7 @@ def minion_async_run(retriever, method, args): rc = utils.nice_exception(t,v,tb) __update_status(job_id, JOB_ID_FINISHED, rc) - sys.exit(0) + os._exit(0) def job_status(jobid, client_class=None): -- cgit From 721fa16f64627fcd5b0d0b5c4f9d9e382729bfa0 Mon Sep 17 00:00:00 2001 From: "Krzysztof A. Adamski" Date: Sat, 7 Jun 2008 15:04:52 -0400 Subject: Eliminate concurency between parent and child which could override job status. --- func/jobthing.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/func/jobthing.py b/func/jobthing.py index 033368c..0e613f8 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -132,14 +132,12 @@ def minion_async_run(retriever, method, args): job_id = "%s-minion" % time.time() + __update_status(job_id, JOB_ID_RUNNING, -1) pid = os.fork() if pid != 0: - __update_status(job_id, JOB_ID_RUNNING, -1) os.waitpid(pid, 0) return job_id else: - __update_status(job_id, JOB_ID_RUNNING, -1) - # daemonize! os.umask(077) os.chdir('/') -- cgit From fdabc175f9ef5ee9d1e6887409be58dd44cf661d Mon Sep 17 00:00:00 2001 From: "Krzysztof A. Adamski" Date: Sat, 7 Jun 2008 16:12:03 -0400 Subject: Handle remote errors with async calls. --- func/jobthing.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/func/jobthing.py b/func/jobthing.py index 0e613f8..a190dff 100644 --- a/func/jobthing.py +++ b/func/jobthing.py @@ -28,6 +28,7 @@ JOB_ID_RUNNING = 0 JOB_ID_FINISHED = 1 JOB_ID_LOST_IN_SPACE = 2 JOB_ID_PARTIAL = 3 +JOB_ID_REMOTE_ERROR = 4 # how long to retain old job records in the job id database RETAIN_INTERVAL = 60 * 60 @@ -182,7 +183,11 @@ def job_status(jobid, client_class=None): client = client_class(host, noglobs=True, async=False) minion_result = client.jobs.job_status(minion_job) - (minion_interim_rc, minion_interim_result) = minion_result + if type(minion_result) != list or len(minion_result)!=2: + minion_interim_rc = JOB_ID_REMOTE_ERROR + minion_interim_result = minion_result[:3] + else: + (minion_interim_rc, minion_interim_result) = minion_result if minion_interim_rc not in [ JOB_ID_RUNNING ]: if minion_interim_rc in [ JOB_ID_LOST_IN_SPACE ]: -- cgit From 3750980ea547c73a63cb456a85be0b14cc36c30c Mon Sep 17 00:00:00 2001 From: "Krzysztof A. Adamski" Date: Sat, 7 Jun 2008 16:13:01 -0400 Subject: Make --forks command line parameter actually working now :) --- func/overlord/base_command.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/func/overlord/base_command.py b/func/overlord/base_command.py index bacb005..8f16eca 100644 --- a/func/overlord/base_command.py +++ b/func/overlord/base_command.py @@ -29,4 +29,5 @@ class BaseCommand(command.Command): interactive=self.interactive, verbose=self.verbose, config=self.config, - async=self.options.async) + async=self.options.async, + nforks=self.options.forks) -- cgit