author    Michael DeHaan <mdehaan@redhat.com>  2008-01-23 12:38:13 -0500
committer Michael DeHaan <mdehaan@redhat.com>  2008-01-23 12:38:13 -0500
commit    504041eb26aba41092aa528e1a724fa5554063d7 (patch)
tree      f27263d205c0304e898028c5993cc04a62f1c939 /func/overlord
parent    6e901789f539a25b50512c0504a4338fa638a743 (diff)
Moving the async and multiprocess stuff to the top level so we can use them
on the minion for minion-side async funness.
Diffstat (limited to 'func/overlord')
-rwxr-xr-x  func/overlord/client.py   |   4
-rw-r--r--  func/overlord/forkbomb.py | 152
-rw-r--r--  func/overlord/jobthing.py | 118
3 files changed, 2 insertions(+), 272 deletions(-)
diff --git a/func/overlord/client.py b/func/overlord/client.py
index 60b5c24..98edaed 100755
--- a/func/overlord/client.py
+++ b/func/overlord/client.py
@@ -22,8 +22,8 @@ from func.config import read_config, CONFIG_FILE
import sslclient
import command
-import forkbomb
-import jobthing
+import func.forkbomb as forkbomb
+import func.jobthing as jobthing
# ===================================
# defaults
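The hunk above is the whole consumer-side change: once the modules live at the top of the package, callers import func.forkbomb and func.jobthing instead of the overlord-local copies. A minimal sketch of what minion-side code could now do with them (the helper name and arguments are illustrative, not part of this commit):

import func.jobthing as jobthing

def run_async(items, handler, nforks=4):
    # hypothetical minion-side helper: run handler over items in a
    # background job and hand back a job id the caller can poll later
    return jobthing.batch_run(items, handler, nforks)
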
diff --git a/func/overlord/forkbomb.py b/func/overlord/forkbomb.py
deleted file mode 100644
index c30cc9e..0000000
--- a/func/overlord/forkbomb.py
+++ /dev/null
@@ -1,152 +0,0 @@
-# forkbomb is a module that partitions arbitrary workloads
-# among N separate forks, for a configurable N, and
-# collates results upon return, as if it never forked.
-#
-# Copyright 2007, Red Hat, Inc
-# Michael DeHaan <mdehaan@redhat.com>
-#
-# This software may be freely redistributed under the terms of the GNU
-# general public license.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-import os
-import random # for testing only
-import time # for testing only
-import shelve
-import bsddb
-import sys
-import tempfile
-import fcntl
-
-DEFAULT_FORKS = 4
-DEFAULT_CACHE_DIR = "/var/lib/func"
-
-def __get_storage(dir):
- """
- Return a tempfile we can use for storing data.
- """
- dir = os.path.expanduser(dir)
- if not os.path.exists(dir):
- os.makedirs(dir)
-    # mktemp only generates a pathname; bsddb.btopen() later creates the file
-    return tempfile.mktemp(suffix='', prefix='asynctmp', dir=dir)
-
-def __access_buckets(filename,clear,new_key=None,new_value=None):
- """
- Access data in forkbomb cache, potentially clearing or
- modifying it as required.
- """
-
- internal_db = bsddb.btopen(filename, 'c', 0644 )
- handle = open(filename,"r")
- fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
- storage = shelve.BsdDbShelf(internal_db)
-
- if clear:
- storage.clear()
- storage.close()
- fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
- return {}
-
- if not storage.has_key("data"):
- storage["data"] = {}
-
- if new_key is not None:
-        # bsddb is a bit weird about this
- newish = storage["data"].copy()
- newish[new_key] = new_value
- storage["data"] = newish
-
- rc = storage["data"].copy()
- storage.close()
- fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
-
- return rc
-
-def __bucketize(pool, slots):
- """
-    Given a pre-existing list of tasks, deal them out round-robin
-    into a hash keyed by slot number (0..slots-1).
- """
- buckets = {}
- count = 0
- # print "DEBUG: slots: %s" % slots
- for key in pool:
- count = count + 1
- slot = count % slots
- if not buckets.has_key(slot):
- buckets[slot] = []
- buckets[slot].append(key)
- # print "DEBUG: buckets: %s" % buckets
- return buckets
-
-def __with_my_bucket(bucket_number,buckets,what_to_do,filename):
- """
- Process all tasks assigned to a given fork, and save
- them in the shelf.
- """
- things_in_my_bucket = buckets[bucket_number]
- for thing in things_in_my_bucket:
- (nkey,nvalue) = what_to_do(bucket_number,buckets,thing)
- __access_buckets(filename,False,nkey,nvalue)
-
-def __forkbomb(mybucket,buckets,what_to_do,filename):
- """
-    Recursive function to spawn off a tree of worker forks.
- """
- nbuckets = len(buckets)
- pid = os.fork()
- if pid != 0:
- if mybucket < (nbuckets-1):
- __forkbomb(mybucket+1,buckets,what_to_do,filename)
- try:
- os.waitpid(pid,0)
- except OSError, ose:
-            if ose.errno == 10:   # ECHILD: child already reaped
- pass
- else:
- raise ose
- else:
- __with_my_bucket(mybucket,buckets,what_to_do,filename)
- sys.exit(0)
-
-def __demo(bucket_number, buckets, my_item):
- """
- This is a demo handler for test purposes.
- It just multiplies all numbers by 1000, but slowly.
- """
- print ">> I am fork (%s) and I am processing item (%s)" % (bucket_number, my_item)
- # just to verify forks are not sequential
- sleep = random.randrange(0,4)
- time.sleep(sleep)
- return (my_item, my_item * 1000)
-
-def batch_run(pool,callback,nforks=DEFAULT_FORKS,cachedir=DEFAULT_CACHE_DIR):
- """
-    Given an array of items (pool), call callback on each one, dividing
-    the workload over nforks forks. Temporary files used during the
-    operation are created in cachedir and deleted afterwards.
- """
- if nforks <= 1:
- # modulus voodoo gets crazy otherwise and bad things happen
- nforks = 2
- shelf_file = __get_storage(cachedir)
- __access_buckets(shelf_file,True,None)
- buckets = __bucketize(pool, nforks)
- # print "DEBUG: buckets: %s" % buckets
-    # start at bucket 0 so every bucket (0..nforks-1) gets a worker fork
-    __forkbomb(0,buckets,callback,shelf_file)
- rc = __access_buckets(shelf_file,False,None)
- os.remove(shelf_file)
- return rc
-
-def __test(nforks=4,sample_size=20):
- pool = xrange(0,sample_size)
- print batch_run(pool,__demo,nforks=nforks)
-
-if __name__ == "__main__":
- __test()
-
-
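The trickiest part of the module above is __access_buckets: it serializes shelf writes across concurrent forks by taking an exclusive flock on the database file around every operation. In isolation, a sketch of that pattern using the same bsddb/shelve/fcntl calls, with a try/finally added so the lock is released even on error (filename and key are made up):

import bsddb
import fcntl
import shelve

def locked_update(filename, key, value):
    # open (or create) the btree db, then lock the underlying file so
    # only one fork at a time touches the shelf
    internal_db = bsddb.btopen(filename, 'c', 0644)
    handle = open(filename, "r")
    fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
    try:
        storage = shelve.BsdDbShelf(internal_db)
        storage[key] = value
        storage.close()
    finally:
        fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
        handle.close()

And a minimal sketch of driving the module from its new top-level home, assuming it lands unchanged at func/forkbomb.py (the callback receives the fork number, the full bucket map, and one item, and returns a (key, value) pair to collate):

import func.forkbomb as forkbomb

def double(bucket_number, buckets, item):
    # contract expected by __with_my_bucket: return a (key, value) pair
    return (item, item * 2)

if __name__ == "__main__":
    # __bucketize(range(5), 2) deals items out as {1: [0, 2, 4], 0: [1, 3]}
    # (round-robin on count % slots); each fork then drains one bucket.
    # cachedir defaults to /var/lib/func; point it somewhere writable
    # if running unprivileged.
    print forkbomb.batch_run(range(5), double, nforks=2)
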
diff --git a/func/overlord/jobthing.py b/func/overlord/jobthing.py
deleted file mode 100644
index e405616..0000000
--- a/func/overlord/jobthing.py
+++ /dev/null
@@ -1,118 +0,0 @@
-# jobthing is a module that allows for background execution of a task, and
-# getting status of that task. The ultimate goal is to allow ajaxyness
-# of GUI apps using Func, and also to support extremely long-running tasks
-# that we don't want to block on when called by scripts using the Func API.
-# The CLI should not use this.
-#
-# Copyright 2007, Red Hat, Inc
-# Michael DeHaan <mdehaan@redhat.com>
-#
-# This software may be freely redistributed under the terms of the GNU
-# general public license.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-import os
-import random # for testing only
-import time # for testing only
-import shelve
-import bsddb
-import sys
-import tempfile
-import fcntl
-import forkbomb
-
-JOB_ID_RUNNING = 0
-JOB_ID_FINISHED = 1
-JOB_ID_LOST_IN_SPACE = 2
-
-# how long to retain old job records in the job id database
-RETAIN_INTERVAL = 60 * 60
-
-# where to store the internal job id database
-CACHE_DIR = "/var/lib/func"
-
-def __update_status(jobid, status, results, clear=False):
-    return __access_status(jobid=jobid, status=status, results=results, clear=clear, write=True)
-
-def __get_status(jobid):
- return __access_status(jobid=jobid, write=False)
-
-
-def __purge_old_jobs(storage):
- """
- Deletes jobs older than RETAIN_INTERVAL seconds.
-    MINOR FIXME: a smarter algorithm would delete jobs only when the
-    database grows too big, and then only the oldest ones, but this
-    works well enough.
- """
- nowtime = time.time()
- for x in storage.keys():
- create_time = float(x)
- if nowtime - create_time > RETAIN_INTERVAL:
- del storage[x]
-
-def __access_status(jobid=0, status=0, results=0, clear=False, write=False):
-
- dir = os.path.expanduser(CACHE_DIR)
- if not os.path.exists(dir):
- os.makedirs(dir)
- filename = os.path.join(dir,"status-%s" % os.getuid())
-
- internal_db = bsddb.btopen(filename, 'c', 0644 )
- handle = open(filename,"r")
- fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
- storage = shelve.BsdDbShelf(internal_db)
-
- if clear:
- storage.clear()
- storage.close()
- fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
- return {}
-
- if write:
- __purge_old_jobs(storage)
- storage[str(jobid)] = (status, results)
- rc = jobid
- else:
- if storage.has_key(str(jobid)):
- # tuple of (status, results)
- rc = storage[str(jobid)]
- else:
- rc = (JOB_ID_LOST_IN_SPACE, 0)
-
- storage.close()
- fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
-
- return rc
-
-def batch_run(server, process_server, nforks):
- """
-    Fork into the background, run forkbomb.batch_run(server, process_server,
-    nforks) there, and record status and results in the job database. Returns
-    a job id that the caller can poll with job_status().
- """
-
-    # the job id doubles as the creation timestamp (see __purge_old_jobs)
-    job_id = time.time()
- pid = os.fork()
- if pid != 0:
- #print "DEBUG: UPDATE STATUS: r1: %s" % job_id
- __update_status(job_id, JOB_ID_RUNNING, -1)
- return job_id
- else:
- #print "DEBUG: UPDATE STATUS: r2: %s" % job_id
- __update_status(job_id, JOB_ID_RUNNING, -1)
- results = forkbomb.batch_run(server, process_server, nforks)
- #print "DEBUG: UPDATE STATUS: f1: %s" % job_id
- __update_status(job_id, JOB_ID_FINISHED, results)
- sys.exit(0)
-
-def job_status(jobid):
- return __get_status(jobid)
-
-def __test():
-    # minimal smoke test: double a few numbers in a background job
-    jid = batch_run(range(4), lambda num, buckets, item: (item, item * 2), 2)
-    print job_status(jid)
-
-if __name__ == "__main__":
-    __test()
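Finally, a hedged sketch of the intended submit-then-poll flow against the relocated module, assuming it lands unchanged at func/jobthing.py and the process can write CACHE_DIR (/var/lib/func); the pool contents and callback are illustrative:

import time
import func.jobthing as jobthing

def process_server(bucket_number, buckets, server):
    # illustrative callback: pretend to contact the server, report success
    return (server, 0)

# the parent returns immediately with a job id; the work runs in a forked child
jid = jobthing.batch_run(["box1", "box2", "box3"], process_server, 2)
(status, results) = jobthing.job_status(jid)
while status == jobthing.JOB_ID_RUNNING:
    time.sleep(1)
    (status, results) = jobthing.job_status(jid)
print results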