summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael DeHaan <mdehaan@redhat.com>2008-06-10 14:17:38 -0400
committerMichael DeHaan <mdehaan@redhat.com>2008-06-10 14:17:38 -0400
commit76996c8ac3016389fdafb47e7791f2c636e2a4ed (patch)
treed244b1655283d498d5228565eea98e32d6d06355
parent59fb48805b4a9e1ebe088d0b8d5853dd3af8d5b1 (diff)
parent3750980ea547c73a63cb456a85be0b14cc36c30c (diff)
downloadthird_party-func-76996c8ac3016389fdafb47e7791f2c636e2a4ed.tar.gz
third_party-func-76996c8ac3016389fdafb47e7791f2c636e2a4ed.tar.xz
third_party-func-76996c8ac3016389fdafb47e7791f2c636e2a4ed.zip
Merge branch 'kadamski-async2'
Conflicts: func/jobthing.py
-rw-r--r--func/forkbomb.py17
-rw-r--r--func/jobthing.py30
-rw-r--r--func/overlord/base_command.py3
-rw-r--r--func/overlord/cmd_modules/call.py4
-rw-r--r--test/async_test.py4
5 files changed, 35 insertions, 23 deletions
diff --git a/func/forkbomb.py b/func/forkbomb.py
index 7cc9df3..ef0817a 100644
--- a/func/forkbomb.py
+++ b/func/forkbomb.py
@@ -16,7 +16,7 @@ import os
import random # for testing only
import time # for testing only
import shelve
-import bsddb
+import dbm
import sys
import tempfile
import fcntl
@@ -39,10 +39,10 @@ def __access_buckets(filename,clear,new_key=None,new_value=None):
modifying it as required.
"""
- internal_db = bsddb.btopen(filename, 'c', 0644 )
- handle = open(filename,"r")
+ handle = open(filename,"w")
fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
- storage = shelve.BsdDbShelf(internal_db)
+ internal_db = dbm.open(filename, 'c', 0644 )
+ storage = shelve.Shelf(internal_db)
if clear:
storage.clear()
@@ -138,7 +138,14 @@ def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR):
buckets = __bucketize(pool, nforks)
__forkbomb(0,buckets,callback,shelf_file)
rc = __access_buckets(shelf_file,False,None)
- os.remove(shelf_file)
+
+ try: #it's only cleanup so don't care if the files disapeared
+ os.remove(shelf_file)
+ os.remove(shelf_file+".pag")
+ os.remove(shelf_file+".dir")
+ except OSError:
+ pass
+
return rc
def __test(nforks=4,sample_size=20):
diff --git a/func/jobthing.py b/func/jobthing.py
index 6024274..93c26f0 100644
--- a/func/jobthing.py
+++ b/func/jobthing.py
@@ -18,7 +18,7 @@ import os
import random # for testing only
import time # for testing only
import shelve
-import bsddb
+import dbm
import sys
import fcntl
import forkbomb
@@ -68,10 +68,10 @@ def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purg
os.makedirs(dir)
filename = os.path.join(dir,"status-%s" % os.getuid())
- internal_db = bsddb.btopen(filename, 'c', 0644 )
- handle = open(filename,"r")
+ handle = open(filename,"w")
fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
- storage = shelve.BsdDbShelf(internal_db)
+ internal_db = dbm.open(filename, 'c', 0644 )
+ storage = shelve.Shelf(internal_db)
if clear:
@@ -118,8 +118,7 @@ def batch_run(pool, callback, nforks):
return job_id
else:
# kick off the job
- # I don't thing it's needed - kaa
- #__update_status(job_id, JOB_ID_RUNNING, -1)
+ __update_status(job_id, JOB_ID_RUNNING, -1)
results = forkbomb.batch_run(pool, callback, nforks)
# we now have a list of job id's for each minion, kill the task
@@ -135,14 +134,19 @@ def minion_async_run(retriever, method, args):
job_id = "%s-minion" % time.time()
- signal.signal(signal.SIGCHLD, 0)
+ __update_status(job_id, JOB_ID_RUNNING, -1)
pid = os.fork()
if pid != 0:
- __update_status(job_id, JOB_ID_RUNNING, -1)
+ os.waitpid(pid, 0)
return job_id
else:
- # I don't thing it's needed - kaa
- #__update_status(job_id, JOB_ID_RUNNING, -1)
+ # daemonize!
+ os.umask(077)
+ os.chdir('/')
+ os.setsid()
+ if os.fork():
+ os._exit(0)
+
try:
function_ref = retriever(method)
rc = function_ref(*args)
@@ -151,7 +155,7 @@ def minion_async_run(retriever, method, args):
rc = utils.nice_exception(t,v,tb)
__update_status(job_id, JOB_ID_FINISHED, rc)
- sys.exit(0)
+ os._exit(0)
def job_status(jobid, client_class=None):
@@ -180,9 +184,9 @@ def job_status(jobid, client_class=None):
client = client_class(host, noglobs=True, async=False)
minion_result = client.jobs.job_status(minion_job)
- if type(minion_result) != tuple:
+ if type(minion_result) != list or len(minion_result)!=2:
minion_interim_rc = JOB_ID_REMOTE_ERROR
- minion_interim_result = minion_result
+ minion_interim_result = minion_result[:3]
else:
(minion_interim_rc, minion_interim_result) = minion_result
diff --git a/func/overlord/base_command.py b/func/overlord/base_command.py
index bacb005..8f16eca 100644
--- a/func/overlord/base_command.py
+++ b/func/overlord/base_command.py
@@ -29,4 +29,5 @@ class BaseCommand(command.Command):
interactive=self.interactive,
verbose=self.verbose,
config=self.config,
- async=self.options.async)
+ async=self.options.async,
+ nforks=self.options.forks)
diff --git a/func/overlord/cmd_modules/call.py b/func/overlord/cmd_modules/call.py
index edf9f38..bb9a61e 100644
--- a/func/overlord/cmd_modules/call.py
+++ b/func/overlord/cmd_modules/call.py
@@ -146,11 +146,11 @@ class Call(base_command.BaseCommand):
(return_code, async_results) = self.overlord_obj.job_status(results)
if return_code == jobthing.JOB_ID_RUNNING:
time.sleep(0.1)
- elif return_code == jobthing.JOB_ID_ASYNC_FINISHED:
+ elif return_code == jobthing.JOB_ID_FINISHED:
async_done = True
partial = self.print_partial_results(partial, async_results, self.options.sort)
return partial
- elif return_code == jobthing.JOB_ID_ASYNC_PARTIAL:
+ elif return_code == jobthing.JOB_ID_PARTIAL:
if not self.options.sort:
partial = self.print_partial_results(partial, async_results)
else:
diff --git a/test/async_test.py b/test/async_test.py
index 8e6961d..8e0495f 100644
--- a/test/async_test.py
+++ b/test/async_test.py
@@ -44,12 +44,12 @@ def __tester(async,test):
return
if code == jobthing.JOB_ID_RUNNING:
print "task is still running, %s elapsed ..." % delta
- elif code == jobthing.JOB_ID_ASYNC_PARTIAL:
+ elif code == jobthing.JOB_ID_PARTIAL:
print "task reports partial status, %s elapsed, results = %s" % (delta, results)
elif code == jobthing.JOB_ID_FINISHED:
print "(non-async) task complete, %s elapsed, results = %s" % (delta, results)
return
- elif code == jobthing.JOB_ID_ASYNC_FINISHED:
+ elif code == jobthing.JOB_ID_FINISHED:
print "(async) task complete, %s elapsed, results = %s" % (delta, results)
return
else: