diff options
author | Michael DeHaan <mdehaan@redhat.com> | 2008-01-13 15:10:48 -0500 |
---|---|---|
committer | Michael DeHaan <mdehaan@redhat.com> | 2008-01-13 15:10:48 -0500 |
commit | e87e62d301dbd8fe99ac74be7a38009b2e3748cb (patch) | |
tree | bd125c5daac096a7887ab0a12a6a9ea4633a40ca | |
parent | 8c4a154f196383f6d0969934e10641e83ac51af4 (diff) | |
download | func-e87e62d301dbd8fe99ac74be7a38009b2e3748cb.tar.gz func-e87e62d301dbd8fe99ac74be7a38009b2e3748cb.tar.xz func-e87e62d301dbd8fe99ac74be7a38009b2e3748cb.zip |
Add jobthing, which is our async job engine. It's still in progress
and very much a prototype that isn't expected to work yet, but you get
the idea.
-rwxr-xr-x | func/minion/utils.py | 5 | ||||
-rwxr-xr-x | func/overlord/client.py | 23 | ||||
-rw-r--r-- | func/overlord/forkbomb.py | 3 | ||||
-rw-r--r-- | func/overlord/jobthing.py | 99 |
4 files changed, 121 insertions, 9 deletions
diff --git a/func/minion/utils.py b/func/minion/utils.py index 7599657..723bd85 100755 --- a/func/minion/utils.py +++ b/func/minion/utils.py @@ -37,7 +37,10 @@ def get_hostname(): # for the certmaster for now hostname = None hostname = socket.gethostname() - ip = socket.gethostbyname(hostname) + try: + ip = socket.gethostbyname(hostname) + except: + return hostname if ip != "127.0.0.1": return hostname diff --git a/func/overlord/client.py b/func/overlord/client.py index c6d8ab6..5eb6ef0 100755 --- a/func/overlord/client.py +++ b/func/overlord/client.py @@ -27,6 +27,7 @@ import sslclient import command import forkbomb +import jobthing # =================================== # defaults @@ -114,7 +115,7 @@ def isServer(server_string): class Client(object): def __init__(self, server_spec, port=DEFAULT_PORT, interactive=False, - verbose=False, noglobs=False, nforks=1, config=None): + verbose=False, noglobs=False, nforks=1, async=False, config=None): """ Constructor. @server_spec -- something like "*.example.org" or "foosball" @@ -134,6 +135,7 @@ class Client(object): self.interactive = interactive self.noglobs = noglobs self.nforks = nforks + self.async = async self.servers = expand_servers(self.server_spec, port=self.port, noglobs=self.noglobs,verbose=self.verbose) @@ -163,6 +165,14 @@ class Client(object): # ----------------------------------------------- + def job_status(self, jobid): + """ + Use this to acquire status from jobs when using run with async client handles + """ + return jobthing.job_status(jobid) + + # ----------------------------------------------- + def run(self, module, method, args, nforks=1): """ Invoke a remote method on one or more servers. @@ -174,8 +184,6 @@ class Client(object): just a single value, not a hash. """ - - results = {} def process_server(bucketnumber, buckets, server): @@ -210,14 +218,15 @@ class Client(object): left = server.rfind("/")+1 right = server.rfind(":") server_name = server[left:right] - # TEST (changed for integration with forkbomb) - # results[server_name] = retval return (server_name, retval) if not self.noglobs: - if self.nforks > 1: + if self.nforks > 1 or self.async: # using forkbomb module to distribute job over multiple threads - results = forkbomb.batch_run(self.servers, process_server,nforks) + if not self.async: + results = forkbomb.batch_run(self.servers, process_server, nforks) + else: + results = jobthing.batch_run(self.servers, process_server, nforks) else: # no need to go through the fork code, we can do this directly results = {} diff --git a/func/overlord/forkbomb.py b/func/overlord/forkbomb.py index 1d1f5ac..8d7fb31 100644 --- a/func/overlord/forkbomb.py +++ b/func/overlord/forkbomb.py @@ -78,6 +78,7 @@ def __bucketize(pool, slots): if not buckets.has_key(slot): buckets[slot] = [] buckets[slot].append(key) + # print "DEBUG: buckets: %s" % buckets return buckets def __with_my_bucket(bucket_number,buckets,what_to_do,filename): @@ -131,7 +132,7 @@ def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR): shelf_file = __get_storage("~/.func") __access_buckets(shelf_file,True,None) buckets = __bucketize(pool, nforks) - __forkbomb(0,buckets,callback,shelf_file) + __forkbomb(1,buckets,callback,shelf_file) rc = __access_buckets(shelf_file,False,None) os.remove(shelf_file) return rc diff --git a/func/overlord/jobthing.py b/func/overlord/jobthing.py new file mode 100644 index 0000000..57703e6 --- /dev/null +++ b/func/overlord/jobthing.py @@ -0,0 +1,99 @@ +# forkbomb is a module that partitions arbitrary workloads +# among N seperate forks, for a configurable N, and +# collates results upon return, as if it never forked. +# +# Copyright 2007, Red Hat, Inc +# Michael DeHaan <mdehaan@redhat.com> +# +# This software may be freely redistributed under the terms of the GNU +# general public license. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + +import os +import random # for testing only +import time # for testing only +import shelve +import bsddb +import sys +import tempfile +import fcntl +import forkbomb + +JOB_ID_RUNNING = 0 +JOB_ID_FINISHED = 1 +JOB_ID_LOST_IN_SPACE = 2 + +DEFAULT_CACHE_DIR = "~/.func" + +def __update_status(jobid, status, results, clear=False): + return __access_status(jobid=jobid, status=status, results=results, write=True) + +def __get_status(jobid): + return __access_status(jobid=jobid, write=False) + +def __access_status(jobid=0, status=0, results=0, clear=False, write=False): + dir = os.path.expanduser("~/.func") + if not os.path.exists(dir): + os.makedirs(dir) + filename = os.path.join(dir,"status") + + internal_db = bsddb.btopen(filename, 'c', 0644 ) + handle = open(filename,"r") + fcntl.flock(handle.fileno(), fcntl.LOCK_EX) + storage = shelve.BsdDbShelf(internal_db) + + if clear: + storage.clear() + storage.close() + fcntl.flock(handle.fileno(), fcntl.LOCK_UN) + return {} + + if not storage.has_key("data"): + storage["data"] = {} + + # FIXME: the jobid is the time of the job, so deleting jobs + # that are older than a set time would be a very good idea. + + if write: + storage["data"][jobid] = (status, results) + rc = jobid + else: + if storage["data"].has_key(jobid): + # tuple of (status, results) + rc = storage["data"][jobid] + else: + rc = (JOB_ID_LOST_IN_SPACE, 0) + + storage.close() + fcntl.flock(handle.fileno(), fcntl.LOCK_UN) + + return rc + +def batch_run(server, process_server, nforks): + """ + Given an array of items (pool), call callback in each one, but divide + the workload over nfork forks. Temporary files used during the + operation will be created in cachedir and subsequently deleted. + """ + + job_id = time.time() + pid = os.fork() + if pid != 0: + __update_status(job_id, JOB_ID_RUNNING, -1) + return job_id + else: + __update_status(job_id, JOB_ID_RUNNING, -1) + results = forkbomb.batch_run(server, process_server, nforks) + __update_status(job_id, JOB_ID_FINISHED, results) + sys.exit(0) + +def job_status(jobid): + return __get_status(jobid) + +if __name__ == "__main__": + __test() + + |