diff options
author | Adrian Likins <alikins@grimlock.devel.redhat.com> | 2008-01-23 15:56:02 -0500 |
---|---|---|
committer | Adrian Likins <alikins@grimlock.devel.redhat.com> | 2008-01-23 15:56:02 -0500 |
commit | 461fe996412ee57f49ccc1a265ac813ba14f783d (patch) | |
tree | 0e25c10d1e26b29e42c3ef0ffe97907e38eda7f4 /func/overlord | |
parent | 86195b5602040be63acda253b7f83541dc38d0b6 (diff) | |
parent | 19e4d1ba808cbbe95568271a5b0075b8422e4fb6 (diff) | |
download | func-461fe996412ee57f49ccc1a265ac813ba14f783d.tar.gz func-461fe996412ee57f49ccc1a265ac813ba14f783d.tar.xz func-461fe996412ee57f49ccc1a265ac813ba14f783d.zip |
Merge branch 'master' of ssh://git.fedoraproject.org/git/hosted/func
Conflicts:
func/overlord/client.py
Diffstat (limited to 'func/overlord')
-rwxr-xr-x | func/overlord/client.py | 15 | ||||
-rw-r--r-- | func/overlord/forkbomb.py | 152 | ||||
-rw-r--r-- | func/overlord/jobthing.py | 118 |
3 files changed, 12 insertions, 273 deletions
diff --git a/func/overlord/client.py b/func/overlord/client.py index c5358bb..a848cb8 100755 --- a/func/overlord/client.py +++ b/func/overlord/client.py @@ -22,9 +22,9 @@ from func.config import read_config, CONFIG_FILE import sslclient import command -import forkbomb import groups -import jobthing +import func.forkbomb as forkbomb +import func.jobthing as jobthing # =================================== @@ -187,7 +187,7 @@ class Client(object): """ Use this to acquire status from jobs when using run with async client handles """ - return jobthing.job_status(jobid) + return jobthing.job_status(jobid, client_class=Client) # ----------------------------------------------- @@ -221,7 +221,16 @@ class Client(object): # we can't call "call" on s, since thats a rpc, so # we call gettatr around it. meth = "%s.%s" % (module, method) + + # async calling signature has an "imaginary" prefix + # so async.abc.def does abc.def as a background task. + # see Wiki docs for details + if self.async: + meth = "async.%s" % meth + + # this is the point at which we make the remote call. retval = getattr(conn, meth)(*args[:]) + if self.interactive: print retval except Exception, e: diff --git a/func/overlord/forkbomb.py b/func/overlord/forkbomb.py deleted file mode 100644 index c30cc9e..0000000 --- a/func/overlord/forkbomb.py +++ /dev/null @@ -1,152 +0,0 @@ -# forkbomb is a module that partitions arbitrary workloads -# among N seperate forks, for a configurable N, and -# collates results upon return, as if it never forked. -# -# Copyright 2007, Red Hat, Inc -# Michael DeHaan <mdehaan@redhat.com> -# -# This software may be freely redistributed under the terms of the GNU -# general public license. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - -import os -import random # for testing only -import time # for testing only -import shelve -import bsddb -import sys -import tempfile -import fcntl - -DEFAULT_FORKS = 4 -DEFAULT_CACHE_DIR = "/var/lib/func" - -def __get_storage(dir): - """ - Return a tempfile we can use for storing data. - """ - dir = os.path.expanduser(dir) - if not os.path.exists(dir): - os.makedirs(dir) - return tempfile.mktemp(suffix='', prefix='asynctmp', dir=dir) - -def __access_buckets(filename,clear,new_key=None,new_value=None): - """ - Access data in forkbomb cache, potentially clearing or - modifying it as required. - """ - - internal_db = bsddb.btopen(filename, 'c', 0644 ) - handle = open(filename,"r") - fcntl.flock(handle.fileno(), fcntl.LOCK_EX) - storage = shelve.BsdDbShelf(internal_db) - - if clear: - storage.clear() - storage.close() - fcntl.flock(handle.fileno(), fcntl.LOCK_UN) - return {} - - if not storage.has_key("data"): - storage["data"] = {} - - if new_key is not None: - # bsdb is a bit weird about this - newish = storage["data"].copy() - newish[new_key] = new_value - storage["data"] = newish - - rc = storage["data"].copy() - storage.close() - fcntl.flock(handle.fileno(), fcntl.LOCK_UN) - - return rc - -def __bucketize(pool, slots): - """ - Given a pre-existing list of X number of tasks, partition - them into a hash of Y number of slots. - """ - buckets = {} - count = 0 - # print "DEBUG: slots: %s" % slots - for key in pool: - count = count + 1 - slot = count % slots - if not buckets.has_key(slot): - buckets[slot] = [] - buckets[slot].append(key) - # print "DEBUG: buckets: %s" % buckets - return buckets - -def __with_my_bucket(bucket_number,buckets,what_to_do,filename): - """ - Process all tasks assigned to a given fork, and save - them in the shelf. - """ - things_in_my_bucket = buckets[bucket_number] - results = {} - for thing in things_in_my_bucket: - (nkey,nvalue) = what_to_do(bucket_number,buckets,thing) - __access_buckets(filename,False,nkey,nvalue) - -def __forkbomb(mybucket,buckets,what_to_do,filename): - """ - Recursive function to spawn of a lot of worker forks. - """ - nbuckets = len(buckets) - pid = os.fork() - if pid != 0: - if mybucket < (nbuckets-1): - __forkbomb(mybucket+1,buckets,what_to_do,filename) - try: - os.waitpid(pid,0) - except OSError, ose: - if ose.errno == 10: - pass - else: - raise ose - else: - __with_my_bucket(mybucket,buckets,what_to_do,filename) - sys.exit(0) - -def __demo(bucket_number, buckets, my_item): - """ - This is a demo handler for test purposes. - It just multiplies all numbers by 1000, but slowly. - """ - print ">> I am fork (%s) and I am processing item (%s)" % (bucket_number, my_item) - # just to verify forks are not sequential - sleep = random.randrange(0,4) - time.sleep(sleep) - return (my_item, my_item * 1000) - -def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR): - """ - Given an array of items (pool), call callback in each one, but divide - the workload over nfork forks. Temporary files used during the - operation will be created in cachedir and subsequently deleted. - """ - if nforks <= 1: - # modulus voodoo gets crazy otherwise and bad things happen - nforks = 2 - shelf_file = __get_storage(cachedir) - __access_buckets(shelf_file,True,None) - buckets = __bucketize(pool, nforks) - # print "DEBUG: buckets: %s" % buckets - __forkbomb(1,buckets,callback,shelf_file) - rc = __access_buckets(shelf_file,False,None) - os.remove(shelf_file) - return rc - -def __test(nforks=4,sample_size=20): - pool = xrange(0,sample_size) - print batch_run(pool,__demo,nforks=nforks) - -if __name__ == "__main__": - __test() - - diff --git a/func/overlord/jobthing.py b/func/overlord/jobthing.py deleted file mode 100644 index e405616..0000000 --- a/func/overlord/jobthing.py +++ /dev/null @@ -1,118 +0,0 @@ -# jobthing is a module that allows for background execution of a task, and -# getting status of that task. The ultimate goal is to allow ajaxyness -# of GUI apps using Func, and also for extremely long running tasks that -# we don't want to block on as called by scripts using the FunC API. The -# CLI should not use this. -# -# Copyright 2007, Red Hat, Inc -# Michael DeHaan <mdehaan@redhat.com> -# -# This software may be freely redistributed under the terms of the GNU -# general public license. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - -import os -import random # for testing only -import time # for testing only -import shelve -import bsddb -import sys -import tempfile -import fcntl -import forkbomb - -JOB_ID_RUNNING = 0 -JOB_ID_FINISHED = 1 -JOB_ID_LOST_IN_SPACE = 2 - -# how long to retain old job records in the job id database -RETAIN_INTERVAL = 60 * 60 - -# where to store the internal job id database -CACHE_DIR = "/var/lib/func" - -def __update_status(jobid, status, results, clear=False): - return __access_status(jobid=jobid, status=status, results=results, write=True) - -def __get_status(jobid): - return __access_status(jobid=jobid, write=False) - - -def __purge_old_jobs(storage): - """ - Deletes jobs older than RETAIN_INTERVAL seconds. - MINOR FIXME: this probably should be a more intelligent algorithm that only - deletes jobs if the database is too big and then only the oldest jobs - but this will work just as well. - """ - nowtime = time.time() - for x in storage.keys(): - create_time = float(x) - if nowtime - create_time > RETAIN_INTERVAL: - del storage[x] - -def __access_status(jobid=0, status=0, results=0, clear=False, write=False): - - dir = os.path.expanduser(CACHE_DIR) - if not os.path.exists(dir): - os.makedirs(dir) - filename = os.path.join(dir,"status-%s" % os.getuid()) - - internal_db = bsddb.btopen(filename, 'c', 0644 ) - handle = open(filename,"r") - fcntl.flock(handle.fileno(), fcntl.LOCK_EX) - storage = shelve.BsdDbShelf(internal_db) - - if clear: - storage.clear() - storage.close() - fcntl.flock(handle.fileno(), fcntl.LOCK_UN) - return {} - - if write: - __purge_old_jobs(storage) - storage[str(jobid)] = (status, results) - rc = jobid - else: - if storage.has_key(str(jobid)): - # tuple of (status, results) - rc = storage[str(jobid)] - else: - rc = (JOB_ID_LOST_IN_SPACE, 0) - - storage.close() - fcntl.flock(handle.fileno(), fcntl.LOCK_UN) - - return rc - -def batch_run(server, process_server, nforks): - """ - Given an array of items (pool), call callback in each one, but divide - the workload over nfork forks. Temporary files used during the - operation will be created in cachedir and subsequently deleted. - """ - - job_id = time.time() - pid = os.fork() - if pid != 0: - #print "DEBUG: UPDATE STATUS: r1: %s" % job_id - __update_status(job_id, JOB_ID_RUNNING, -1) - return job_id - else: - #print "DEBUG: UPDATE STATUS: r2: %s" % job_id - __update_status(job_id, JOB_ID_RUNNING, -1) - results = forkbomb.batch_run(server, process_server, nforks) - #print "DEBUG: UPDATE STATUS: f1: %s" % job_id - __update_status(job_id, JOB_ID_FINISHED, results) - sys.exit(0) - -def job_status(jobid): - return __get_status(jobid) - -if __name__ == "__main__": - __test() - - |