From 504041eb26aba41092aa528e1a724fa5554063d7 Mon Sep 17 00:00:00 2001
From: Michael DeHaan
Date: Wed, 23 Jan 2008 12:38:13 -0500
Subject: Moving the async and multiprocess stuff to top level so we can use
 them on the minion for minion side async funness.

---
 func/forkbomb.py             | 152 +++++++++++++++++++++++++++++++++++++++++++
 func/jobthing.py             | 121 ++++++++++++++++++++++++++++++++++
 func/overlord/client.py      |   4 +-
 func/overlord/forkbomb.py    | 152 -------------------------------------------
 func/overlord/jobthing.py    | 118 ---------------------------------
 test/async_test.py           |   2 +-
 test/unittest/test_client.py |   3 +-
 7 files changed, 278 insertions(+), 274 deletions(-)
 create mode 100644 func/forkbomb.py
 create mode 100644 func/jobthing.py
 delete mode 100644 func/overlord/forkbomb.py
 delete mode 100644 func/overlord/jobthing.py

diff --git a/func/forkbomb.py b/func/forkbomb.py
new file mode 100644
index 0000000..c30cc9e
--- /dev/null
+++ b/func/forkbomb.py
@@ -0,0 +1,152 @@
+# forkbomb is a module that partitions arbitrary workloads
+# among N separate forks, for a configurable N, and
+# collates results upon return, as if it never forked.
+#
+# Copyright 2007, Red Hat, Inc
+# Michael DeHaan
+#
+# This software may be freely redistributed under the terms of the GNU
+# general public license.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+
+import os
+import random      # for testing only
+import time        # for testing only
+import shelve
+import bsddb
+import sys
+import tempfile
+import fcntl
+
+DEFAULT_FORKS = 4
+DEFAULT_CACHE_DIR = "/var/lib/func"
+
+def __get_storage(dir):
+    """
+    Return a tempfile we can use for storing data.
+    """
+    dir = os.path.expanduser(dir)
+    if not os.path.exists(dir):
+        os.makedirs(dir)
+    return tempfile.mktemp(suffix='', prefix='asynctmp', dir=dir)
+
+def __access_buckets(filename,clear,new_key=None,new_value=None):
+    """
+    Access data in forkbomb cache, potentially clearing or
+    modifying it as required.
+    """
+
+    internal_db = bsddb.btopen(filename, 'c', 0644 )
+    handle = open(filename,"r")
+    fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
+    storage = shelve.BsdDbShelf(internal_db)
+
+    if clear:
+        storage.clear()
+        storage.close()
+        fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
+        return {}
+
+    if not storage.has_key("data"):
+        storage["data"] = {}
+
+    if new_key is not None:
+        # bsddb is a bit weird about this
+        newish = storage["data"].copy()
+        newish[new_key] = new_value
+        storage["data"] = newish
+
+    rc = storage["data"].copy()
+    storage.close()
+    fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
+
+    return rc
+
+def __bucketize(pool, slots):
+    """
+    Given a pre-existing list of X number of tasks, partition
+    them into a hash of Y number of slots.
+    """
+    buckets = {}
+    count = 0
+    # print "DEBUG: slots: %s" % slots
+    for key in pool:
+        count = count + 1
+        slot = count % slots
+        if not buckets.has_key(slot):
+            buckets[slot] = []
+        buckets[slot].append(key)
+    # print "DEBUG: buckets: %s" % buckets
+    return buckets
+
+def __with_my_bucket(bucket_number,buckets,what_to_do,filename):
+    """
+    Process all tasks assigned to a given fork, and save
+    them in the shelf.
+    """
+    things_in_my_bucket = buckets[bucket_number]
+    results = {}
+    for thing in things_in_my_bucket:
+        (nkey,nvalue) = what_to_do(bucket_number,buckets,thing)
+        __access_buckets(filename,False,nkey,nvalue)
+
+def __forkbomb(mybucket,buckets,what_to_do,filename):
+    """
+    Recursive function to spawn off a lot of worker forks.
+    """
+    nbuckets = len(buckets)
+    pid = os.fork()
+    if pid != 0:
+        if mybucket < (nbuckets-1):
+            __forkbomb(mybucket+1,buckets,what_to_do,filename)
+        try:
+            os.waitpid(pid,0)
+        except OSError, ose:
+            if ose.errno == 10:
+                pass
+            else:
+                raise ose
+    else:
+        __with_my_bucket(mybucket,buckets,what_to_do,filename)
+        sys.exit(0)
+
+def __demo(bucket_number, buckets, my_item):
+    """
+    This is a demo handler for test purposes.
+    It just multiplies all numbers by 1000, but slowly.
+    """
+    print ">> I am fork (%s) and I am processing item (%s)" % (bucket_number, my_item)
+    # just to verify forks are not sequential
+    sleep = random.randrange(0,4)
+    time.sleep(sleep)
+    return (my_item, my_item * 1000)
+
+def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR):
+    """
+    Given an array of items (pool), call callback on each one, but divide
+    the workload over nforks forks.  Temporary files used during the
+    operation will be created in cachedir and subsequently deleted.
+    """
+    if nforks <= 1:
+        # modulus voodoo gets crazy otherwise and bad things happen
+        nforks = 2
+    shelf_file = __get_storage(cachedir)
+    __access_buckets(shelf_file,True,None)
+    buckets = __bucketize(pool, nforks)
+    # print "DEBUG: buckets: %s" % buckets
+    __forkbomb(1,buckets,callback,shelf_file)
+    rc = __access_buckets(shelf_file,False,None)
+    os.remove(shelf_file)
+    return rc
+
+def __test(nforks=4,sample_size=20):
+    pool = xrange(0,sample_size)
+    print batch_run(pool,__demo,nforks=nforks)
+
+if __name__ == "__main__":
+    __test()
+
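forkbomb.batch_run() is the whole public surface of the module above: it partitions the pool with a simple count % nforks bucketing, forks one worker per bucket, lets each worker write its callback's (key, value) result into a flock-guarded bsddb shelf file, and returns the collated dictionary to the caller. A minimal usage sketch follows; it is illustrative only and not part of the patch. The square() callback is a made-up stand-in that follows the same (bucket_number, buckets, item) -> (key, value) contract as __demo() above, and /tmp is passed as cachedir so the demo does not need write access to /var/lib/func.

    import func.forkbomb as forkbomb     # assumes the func package is importable

    def square(bucket_number, buckets, item):
        # runs inside one of the worker forks; must return a (key, value)
        # pair, which batch_run() merges into the final dictionary
        return (item, item * item)

    results = forkbomb.batch_run(range(0, 10), square, nforks=4, cachedir="/tmp")
    print results      # collated {item: item * item} pairs from the workers
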
+ """ + things_in_my_bucket = buckets[bucket_number] + results = {} + for thing in things_in_my_bucket: + (nkey,nvalue) = what_to_do(bucket_number,buckets,thing) + __access_buckets(filename,False,nkey,nvalue) + +def __forkbomb(mybucket,buckets,what_to_do,filename): + """ + Recursive function to spawn of a lot of worker forks. + """ + nbuckets = len(buckets) + pid = os.fork() + if pid != 0: + if mybucket < (nbuckets-1): + __forkbomb(mybucket+1,buckets,what_to_do,filename) + try: + os.waitpid(pid,0) + except OSError, ose: + if ose.errno == 10: + pass + else: + raise ose + else: + __with_my_bucket(mybucket,buckets,what_to_do,filename) + sys.exit(0) + +def __demo(bucket_number, buckets, my_item): + """ + This is a demo handler for test purposes. + It just multiplies all numbers by 1000, but slowly. + """ + print ">> I am fork (%s) and I am processing item (%s)" % (bucket_number, my_item) + # just to verify forks are not sequential + sleep = random.randrange(0,4) + time.sleep(sleep) + return (my_item, my_item * 1000) + +def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR): + """ + Given an array of items (pool), call callback in each one, but divide + the workload over nfork forks. Temporary files used during the + operation will be created in cachedir and subsequently deleted. + """ + if nforks <= 1: + # modulus voodoo gets crazy otherwise and bad things happen + nforks = 2 + shelf_file = __get_storage(cachedir) + __access_buckets(shelf_file,True,None) + buckets = __bucketize(pool, nforks) + # print "DEBUG: buckets: %s" % buckets + __forkbomb(1,buckets,callback,shelf_file) + rc = __access_buckets(shelf_file,False,None) + os.remove(shelf_file) + return rc + +def __test(nforks=4,sample_size=20): + pool = xrange(0,sample_size) + print batch_run(pool,__demo,nforks=nforks) + +if __name__ == "__main__": + __test() + + diff --git a/func/jobthing.py b/func/jobthing.py new file mode 100644 index 0000000..4923daa --- /dev/null +++ b/func/jobthing.py @@ -0,0 +1,121 @@ +# jobthing is a module that allows for background execution of a task, and +# getting status of that task. The ultimate goal is to allow ajaxyness +# of GUI apps using Func, and also for extremely long running tasks that +# we don't want to block on as called by scripts using the FunC API. The +# CLI should not use this. +# +# Copyright 2007, Red Hat, Inc +# Michael DeHaan +# +# This software may be freely redistributed under the terms of the GNU +# general public license. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + +import os +import random # for testing only +import time # for testing only +import shelve +import bsddb +import sys +import tempfile +import fcntl +import forkbomb + +JOB_ID_RUNNING = 0 +JOB_ID_FINISHED = 1 +JOB_ID_LOST_IN_SPACE = 2 + +# how long to retain old job records in the job id database +RETAIN_INTERVAL = 60 * 60 + +# where to store the internal job id database +CACHE_DIR = "/var/lib/func" + +def __update_status(jobid, status, results, clear=False): + return __access_status(jobid=jobid, status=status, results=results, write=True) + +def __get_status(jobid): + return __access_status(jobid=jobid, write=False) + + +def __purge_old_jobs(storage): + """ + Deletes jobs older than RETAIN_INTERVAL seconds. 
diff --git a/func/overlord/client.py b/func/overlord/client.py
index 60b5c24..98edaed 100755
--- a/func/overlord/client.py
+++ b/func/overlord/client.py
@@ -22,8 +22,8 @@ from func.config import read_config, CONFIG_FILE
 import sslclient
 import command
-import forkbomb
-import jobthing
+import func.forkbomb as forkbomb
+import func.jobthing as jobthing
 
 # ===================================
 # defaults
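The client.py hunk above is why the move matters: the old "import forkbomb" only resolved because Python 2's implicit relative imports search func/overlord/ first, so nothing outside the overlord package could reuse the code. With the modules at the package root, overlord code, and later minion-side code, can reach them through the same absolute import:

    # works from any module that can import the func package
    import func.forkbomb as forkbomb
    import func.jobthing as jobthing
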
- """ - dir = os.path.expanduser(dir) - if not os.path.exists(dir): - os.makedirs(dir) - return tempfile.mktemp(suffix='', prefix='asynctmp', dir=dir) - -def __access_buckets(filename,clear,new_key=None,new_value=None): - """ - Access data in forkbomb cache, potentially clearing or - modifying it as required. - """ - - internal_db = bsddb.btopen(filename, 'c', 0644 ) - handle = open(filename,"r") - fcntl.flock(handle.fileno(), fcntl.LOCK_EX) - storage = shelve.BsdDbShelf(internal_db) - - if clear: - storage.clear() - storage.close() - fcntl.flock(handle.fileno(), fcntl.LOCK_UN) - return {} - - if not storage.has_key("data"): - storage["data"] = {} - - if new_key is not None: - # bsdb is a bit weird about this - newish = storage["data"].copy() - newish[new_key] = new_value - storage["data"] = newish - - rc = storage["data"].copy() - storage.close() - fcntl.flock(handle.fileno(), fcntl.LOCK_UN) - - return rc - -def __bucketize(pool, slots): - """ - Given a pre-existing list of X number of tasks, partition - them into a hash of Y number of slots. - """ - buckets = {} - count = 0 - # print "DEBUG: slots: %s" % slots - for key in pool: - count = count + 1 - slot = count % slots - if not buckets.has_key(slot): - buckets[slot] = [] - buckets[slot].append(key) - # print "DEBUG: buckets: %s" % buckets - return buckets - -def __with_my_bucket(bucket_number,buckets,what_to_do,filename): - """ - Process all tasks assigned to a given fork, and save - them in the shelf. - """ - things_in_my_bucket = buckets[bucket_number] - results = {} - for thing in things_in_my_bucket: - (nkey,nvalue) = what_to_do(bucket_number,buckets,thing) - __access_buckets(filename,False,nkey,nvalue) - -def __forkbomb(mybucket,buckets,what_to_do,filename): - """ - Recursive function to spawn of a lot of worker forks. - """ - nbuckets = len(buckets) - pid = os.fork() - if pid != 0: - if mybucket < (nbuckets-1): - __forkbomb(mybucket+1,buckets,what_to_do,filename) - try: - os.waitpid(pid,0) - except OSError, ose: - if ose.errno == 10: - pass - else: - raise ose - else: - __with_my_bucket(mybucket,buckets,what_to_do,filename) - sys.exit(0) - -def __demo(bucket_number, buckets, my_item): - """ - This is a demo handler for test purposes. - It just multiplies all numbers by 1000, but slowly. - """ - print ">> I am fork (%s) and I am processing item (%s)" % (bucket_number, my_item) - # just to verify forks are not sequential - sleep = random.randrange(0,4) - time.sleep(sleep) - return (my_item, my_item * 1000) - -def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR): - """ - Given an array of items (pool), call callback in each one, but divide - the workload over nfork forks. Temporary files used during the - operation will be created in cachedir and subsequently deleted. 
- """ - if nforks <= 1: - # modulus voodoo gets crazy otherwise and bad things happen - nforks = 2 - shelf_file = __get_storage(cachedir) - __access_buckets(shelf_file,True,None) - buckets = __bucketize(pool, nforks) - # print "DEBUG: buckets: %s" % buckets - __forkbomb(1,buckets,callback,shelf_file) - rc = __access_buckets(shelf_file,False,None) - os.remove(shelf_file) - return rc - -def __test(nforks=4,sample_size=20): - pool = xrange(0,sample_size) - print batch_run(pool,__demo,nforks=nforks) - -if __name__ == "__main__": - __test() - - diff --git a/func/overlord/jobthing.py b/func/overlord/jobthing.py deleted file mode 100644 index e405616..0000000 --- a/func/overlord/jobthing.py +++ /dev/null @@ -1,118 +0,0 @@ -# jobthing is a module that allows for background execution of a task, and -# getting status of that task. The ultimate goal is to allow ajaxyness -# of GUI apps using Func, and also for extremely long running tasks that -# we don't want to block on as called by scripts using the FunC API. The -# CLI should not use this. -# -# Copyright 2007, Red Hat, Inc -# Michael DeHaan -# -# This software may be freely redistributed under the terms of the GNU -# general public license. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - -import os -import random # for testing only -import time # for testing only -import shelve -import bsddb -import sys -import tempfile -import fcntl -import forkbomb - -JOB_ID_RUNNING = 0 -JOB_ID_FINISHED = 1 -JOB_ID_LOST_IN_SPACE = 2 - -# how long to retain old job records in the job id database -RETAIN_INTERVAL = 60 * 60 - -# where to store the internal job id database -CACHE_DIR = "/var/lib/func" - -def __update_status(jobid, status, results, clear=False): - return __access_status(jobid=jobid, status=status, results=results, write=True) - -def __get_status(jobid): - return __access_status(jobid=jobid, write=False) - - -def __purge_old_jobs(storage): - """ - Deletes jobs older than RETAIN_INTERVAL seconds. - MINOR FIXME: this probably should be a more intelligent algorithm that only - deletes jobs if the database is too big and then only the oldest jobs - but this will work just as well. - """ - nowtime = time.time() - for x in storage.keys(): - create_time = float(x) - if nowtime - create_time > RETAIN_INTERVAL: - del storage[x] - -def __access_status(jobid=0, status=0, results=0, clear=False, write=False): - - dir = os.path.expanduser(CACHE_DIR) - if not os.path.exists(dir): - os.makedirs(dir) - filename = os.path.join(dir,"status-%s" % os.getuid()) - - internal_db = bsddb.btopen(filename, 'c', 0644 ) - handle = open(filename,"r") - fcntl.flock(handle.fileno(), fcntl.LOCK_EX) - storage = shelve.BsdDbShelf(internal_db) - - if clear: - storage.clear() - storage.close() - fcntl.flock(handle.fileno(), fcntl.LOCK_UN) - return {} - - if write: - __purge_old_jobs(storage) - storage[str(jobid)] = (status, results) - rc = jobid - else: - if storage.has_key(str(jobid)): - # tuple of (status, results) - rc = storage[str(jobid)] - else: - rc = (JOB_ID_LOST_IN_SPACE, 0) - - storage.close() - fcntl.flock(handle.fileno(), fcntl.LOCK_UN) - - return rc - -def batch_run(server, process_server, nforks): - """ - Given an array of items (pool), call callback in each one, but divide - the workload over nfork forks. Temporary files used during the - operation will be created in cachedir and subsequently deleted. 
- """ - - job_id = time.time() - pid = os.fork() - if pid != 0: - #print "DEBUG: UPDATE STATUS: r1: %s" % job_id - __update_status(job_id, JOB_ID_RUNNING, -1) - return job_id - else: - #print "DEBUG: UPDATE STATUS: r2: %s" % job_id - __update_status(job_id, JOB_ID_RUNNING, -1) - results = forkbomb.batch_run(server, process_server, nforks) - #print "DEBUG: UPDATE STATUS: f1: %s" % job_id - __update_status(job_id, JOB_ID_FINISHED, results) - sys.exit(0) - -def job_status(jobid): - return __get_status(jobid) - -if __name__ == "__main__": - __test() - - diff --git a/test/async_test.py b/test/async_test.py index 4c1acf5..43904c4 100644 --- a/test/async_test.py +++ b/test/async_test.py @@ -1,5 +1,5 @@ from func.overlord.client import Client -import func.overlord.jobthing as jobthing +import func.jobthing as jobthing import time import sys diff --git a/test/unittest/test_client.py b/test/unittest/test_client.py index 12cb40b..f4f2029 100644 --- a/test/unittest/test_client.py +++ b/test/unittest/test_client.py @@ -5,11 +5,12 @@ import unittest import xmlrpclib import func.overlord.client as fc +import socket class BaseTest: - th = "grimlock.devel.redhat.com" + th = "mdehaan.rdu.redhat.com" def __init__(self): pass -- cgit