author     Michael DeHaan <mdehaan@redhat.com>  2008-01-13 15:10:48 -0500
committer  Michael DeHaan <mdehaan@redhat.com>  2008-01-13 15:10:48 -0500
commit     e87e62d301dbd8fe99ac74be7a38009b2e3748cb (patch)
tree       bd125c5daac096a7887ab0a12a6a9ea4633a40ca
parent     8c4a154f196383f6d0969934e10641e83ac51af4 (diff)
download   func-e87e62d301dbd8fe99ac74be7a38009b2e3748cb.tar.gz
           func-e87e62d301dbd8fe99ac74be7a38009b2e3748cb.tar.xz
           func-e87e62d301dbd8fe99ac74be7a38009b2e3748cb.zip
Add jobthing, which is our async job engine. It's still in progress
and very much a prototype that isn't expected to work yet, but you get the idea.
-rwxr-xr-x  func/minion/utils.py        5
-rwxr-xr-x  func/overlord/client.py    23
-rw-r--r--  func/overlord/forkbomb.py   3
-rw-r--r--  func/overlord/jobthing.py  99
4 files changed, 121 insertions(+), 9 deletions(-)
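
For orientation, here is a sketch of how the pieces below are meant to fit together once the prototype works (module paths and the "some_module"/"some_method" names are placeholders, not part of this commit):

    import time
    import func.overlord.client as fc
    import func.overlord.jobthing as jobthing

    client = fc.Client("*.example.org", async=True, nforks=4)
    job_id = client.run("some_module", "some_method", [])   # returns at once

    while True:
        (code, results) = client.job_status(job_id)
        if code == jobthing.JOB_ID_FINISHED:
            print results    # hash of server name -> return value
            break
        time.sleep(1)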
diff --git a/func/minion/utils.py b/func/minion/utils.py
index 7599657..723bd85 100755
--- a/func/minion/utils.py
+++ b/func/minion/utils.py
@@ -37,7 +37,10 @@ def get_hostname():
     # for the certmaster for now
     hostname = None
     hostname = socket.gethostname()
-    ip = socket.gethostbyname(hostname)
+    try:
+        ip = socket.gethostbyname(hostname)
+    except socket.error:
+        return hostname
     if ip != "127.0.0.1":
         return hostname
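
The try/except above matters because socket.gethostbyname() raises socket.gaierror (a subclass of socket.error in Python 2) when the local hostname does not resolve, rather than returning a sentinel value. A quick illustration:

    import socket
    try:
        socket.gethostbyname("no-such-host.invalid")   # .invalid never resolves
    except socket.error, e:                            # Python 2 syntax, as in the codebase
        print "resolution failed: %s" % e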
diff --git a/func/overlord/client.py b/func/overlord/client.py
index c6d8ab6..5eb6ef0 100755
--- a/func/overlord/client.py
+++ b/func/overlord/client.py
@@ -27,6 +27,7 @@ import sslclient
 import command
 import forkbomb
+import jobthing
 
 # ===================================
 # defaults
@@ -114,7 +115,7 @@ def isServer(server_string):
 class Client(object):
 
     def __init__(self, server_spec, port=DEFAULT_PORT, interactive=False,
-                 verbose=False, noglobs=False, nforks=1, config=None):
+                 verbose=False, noglobs=False, nforks=1, async=False, config=None):
         """
         Constructor.
         @server_spec -- something like "*.example.org" or "foosball"
@@ -134,6 +135,7 @@ class Client(object):
         self.interactive = interactive
         self.noglobs = noglobs
         self.nforks = nforks
+        self.async = async
 
         self.servers = expand_servers(self.server_spec, port=self.port, noglobs=self.noglobs, verbose=self.verbose)
@@ -163,6 +165,14 @@ class Client(object):
 
     # -----------------------------------------------
 
+    def job_status(self, jobid):
+        """
+        Check the status of a job started by run() with async=True.
+        """
+        return jobthing.job_status(jobid)
+
+    # -----------------------------------------------
+
     def run(self, module, method, args, nforks=1):
         """
         Invoke a remote method on one or more servers.
@@ -174,8 +184,6 @@ class Client(object):
         just a single value, not a hash.
         """
-
-
         results = {}
         def process_server(bucketnumber, buckets, server):
@@ -210,14 +218,15 @@ class Client(object):
             left  = server.rfind("/")+1
             right = server.rfind(":")
             server_name = server[left:right]
-            # TEST (changed for integration with forkbomb)
-            # results[server_name] = retval
             return (server_name, retval)
 
         if not self.noglobs:
-            if self.nforks > 1:
+            if self.nforks > 1 or self.async:
                 # using the forkbomb module to distribute the job over multiple forks
-                results = forkbomb.batch_run(self.servers, process_server,nforks)
+                if not self.async:
+                    results = forkbomb.batch_run(self.servers, process_server, nforks)
+                else:
+                    results = jobthing.batch_run(self.servers, process_server, nforks)
             else:
                 # no need to go through the fork code, we can do this directly
                 results = {}
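
The net effect of this dispatch, summarized (commentary, not code from the commit): with async=False, run() blocks and returns the results hash exactly as before; with async=True it returns immediately with a jobthing job id (a timestamp float) that is later fed back to job_status():

    (code, results) = client.job_status(job_id)
    # code:    jobthing.JOB_ID_RUNNING (0), JOB_ID_FINISHED (1), or
    #          JOB_ID_LOST_IN_SPACE (2) if the id is unknown
    # results: -1 while the job is still running; the usual hash of
    #          server name -> return value once it has finished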
diff --git a/func/overlord/forkbomb.py b/func/overlord/forkbomb.py
index 1d1f5ac..8d7fb31 100644
--- a/func/overlord/forkbomb.py
+++ b/func/overlord/forkbomb.py
@@ -78,6 +78,7 @@ def __bucketize(pool, slots):
         if not buckets.has_key(slot):
             buckets[slot] = []
         buckets[slot].append(key)
+        # print "DEBUG: buckets: %s" % buckets
     return buckets
 
 def __with_my_bucket(bucket_number,buckets,what_to_do,filename):
@@ -131,7 +132,7 @@ def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR):
     shelf_file = __get_storage("~/.func")
     __access_buckets(shelf_file,True,None)
     buckets = __bucketize(pool, nforks)
-    __forkbomb(0,buckets,callback,shelf_file)
+    __forkbomb(1,buckets,callback,shelf_file)
     rc = __access_buckets(shelf_file,False,None)
     os.remove(shelf_file)
     return rc
diff --git a/func/overlord/jobthing.py b/func/overlord/jobthing.py
new file mode 100644
index 0000000..57703e6
--- /dev/null
+++ b/func/overlord/jobthing.py
@@ -0,0 +1,99 @@
+# jobthing is a module for running commands asynchronously: it forks a
+# background process, hands the actual work to forkbomb (which partitions
+# workloads among N separate forks), and records job status in a
+# persistent store so that callers can poll for results later.
+#
+# Copyright 2007, Red Hat, Inc
+# Michael DeHaan <mdehaan@redhat.com>
+#
+# This software may be freely redistributed under the terms of the GNU
+# general public license.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+import os
+import random # for testing only
+import time # for testing only
+import shelve
+import bsddb
+import sys
+import tempfile
+import fcntl
+import forkbomb
+
+JOB_ID_RUNNING = 0
+JOB_ID_FINISHED = 1
+JOB_ID_LOST_IN_SPACE = 2
+
+DEFAULT_CACHE_DIR = "~/.func"
+
+def __update_status(jobid, status, results, clear=False):
+    return __access_status(jobid=jobid, status=status, results=results, clear=clear, write=True)
+
+def __get_status(jobid):
+    return __access_status(jobid=jobid, write=False)
+
+def __access_status(jobid=0, status=0, results=0, clear=False, write=False):
+    dir = os.path.expanduser(DEFAULT_CACHE_DIR)
+    if not os.path.exists(dir):
+        os.makedirs(dir)
+    filename = os.path.join(dir, "status")
+
+    internal_db = bsddb.btopen(filename, 'c', 0644)
+    handle = open(filename, "r")
+    fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
+    storage = shelve.BsdDbShelf(internal_db)
+
+    if clear:
+        storage.clear()
+        storage.close()
+        fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
+        return {}
+
+    if not storage.has_key("data"):
+        storage["data"] = {}
+
+    # FIXME: the jobid is the time of the job, so deleting jobs
+    # that are older than a set time would be a very good idea.
+
+    if write:
+        # mutate a copy and reassign; shelve without writeback does not
+        # persist in-place changes to nested objects
+        data = storage["data"]
+        data[jobid] = (status, results)
+        storage["data"] = data
+        rc = jobid
+    else:
+        if storage["data"].has_key(jobid):
+            # tuple of (status, results)
+            rc = storage["data"][jobid]
+        else:
+            rc = (JOB_ID_LOST_IN_SPACE, 0)
+
+    storage.close()
+    fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
+
+    return rc
+
+def batch_run(server, process_server, nforks):
+    """
+    Fork into the background, run forkbomb.batch_run(server,
+    process_server, nforks) there, and return a job id immediately.
+    Poll job_status() with that id to retrieve status and results.
+    """
+
+    job_id = time.time()
+    pid = os.fork()
+    if pid != 0:
+        # parent: mark the job as running and hand the id back at once
+        __update_status(job_id, JOB_ID_RUNNING, -1)
+        return job_id
+    else:
+        # child: do the actual work, record the results, and exit
+        __update_status(job_id, JOB_ID_RUNNING, -1)
+        results = forkbomb.batch_run(server, process_server, nforks)
+        __update_status(job_id, JOB_ID_FINISHED, results)
+        sys.exit(0)
+
+def job_status(jobid):
+    return __get_status(jobid)
+
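+# A minimal smoke test, sketched in since __main__ below expects a
+# __test to exist: run a trivial callback over two placeholder
+# "servers" in the background and poll until the job finishes.
+def __test():
+    jobid = batch_run(["s1", "s2"], lambda num, buckets, server: (server, 0), 2)
+    while True:
+        (code, results) = job_status(jobid)
+        if code == JOB_ID_FINISHED:
+            print "results: %s" % results
+            return
+        time.sleep(1)
+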
+if __name__ == "__main__":
+    __test()