diff options
Diffstat (limited to 'func/jobthing.py')
-rw-r--r-- | func/jobthing.py | 204 |
1 files changed, 204 insertions, 0 deletions
diff --git a/func/jobthing.py b/func/jobthing.py new file mode 100644 index 0000000..67ad1a6 --- /dev/null +++ b/func/jobthing.py @@ -0,0 +1,204 @@ +# jobthing is a module that allows for background execution of a task, and +# getting status of that task. The ultimate goal is to allow ajaxyness +# of GUI apps using Func, and also for extremely long running tasks that +# we don't want to block on as called by scripts using the FunC API. The +# CLI should not use this. +# +# Copyright 2007, Red Hat, Inc +# Michael DeHaan <mdehaan@redhat.com> +# +# This software may be freely redistributed under the terms of the GNU +# general public license. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + +import os +import random # for testing only +import time # for testing only +import shelve +import bsddb +import sys +import tempfile +import fcntl +import forkbomb +import utils +import traceback + +JOB_ID_RUNNING = 0 +JOB_ID_FINISHED = 1 +JOB_ID_LOST_IN_SPACE = 2 +JOB_ID_ASYNC_PARTIAL = 3 +JOB_ID_ASYNC_FINISHED = 4 + +# how long to retain old job records in the job id database +RETAIN_INTERVAL = 60 * 60 + +# where to store the internal job id database +CACHE_DIR = "/var/lib/func" + +def __update_status(jobid, status, results, clear=False): + return __access_status(jobid=jobid, status=status, results=results, write=True) + +def __get_status(jobid): + return __access_status(jobid=jobid, write=False) + +def purge_old_jobs(): + return __access_status(purge=True) + +def __purge_old_jobs(storage): + """ + Deletes jobs older than RETAIN_INTERVAL seconds. + MINOR FIXME: this probably should be a more intelligent algorithm that only + deletes jobs if the database is too big and then only the oldest jobs + but this will work just as well. + """ + nowtime = time.time() + for x in storage.keys(): + # minion jobs have "-minion" in the job id so disambiguation so we need to remove that + jobkey = x.replace("-","").replace("minion","") + create_time = float(jobkey) + if nowtime - create_time > RETAIN_INTERVAL: + del storage[x] + +def __access_status(jobid=0, status=0, results=0, clear=False, write=False, purge=False): + + dir = os.path.expanduser(CACHE_DIR) + if not os.path.exists(dir): + os.makedirs(dir) + filename = os.path.join(dir,"status-%s" % os.getuid()) + + internal_db = bsddb.btopen(filename, 'c', 0644 ) + handle = open(filename,"r") + fcntl.flock(handle.fileno(), fcntl.LOCK_EX) + storage = shelve.BsdDbShelf(internal_db) + + + if clear: + storage.clear() + storage.close() + fcntl.flock(handle.fileno(), fcntl.LOCK_UN) + return {} + + if purge or write: + __purge_old_jobs(storage) + + if write: + storage[str(jobid)] = (status, results) + rc = jobid + elif not purge: + if storage.has_key(str(jobid)): + # tuple of (status, results) + + rc = storage[str(jobid)] + else: + rc = (JOB_ID_LOST_IN_SPACE, 0) + else: + rc = 0 + + storage.close() + fcntl.flock(handle.fileno(), fcntl.LOCK_UN) + + return rc + +def batch_run(server, process_server, nforks): + """ + This is the method used by the overlord side usage of jobthing. + Minion side usage will use minion_async_run instead. + + Given an array of items (pool), call callback in each one, but divide + the workload over nfork forks. Temporary files used during the + operation will be created in cachedir and subsequently deleted. + """ + + job_id = time.time() + pid = os.fork() + if pid != 0: + __update_status(job_id, JOB_ID_RUNNING, -1) + return job_id + else: + # kick off the job + __update_status(job_id, JOB_ID_RUNNING, -1) + results = forkbomb.batch_run(server, process_server, nforks) + + # we now have a list of job id's for each minion, kill the task + __update_status(job_id, JOB_ID_ASYNC_PARTIAL, results) + sys.exit(0) + +def minion_async_run(retriever, method, args): + """ + This is a simpler invocation for minion side async usage. + """ + # to avoid confusion of job id's (we use the same job database) + # minion jobs contain the string "minion". + + + job_id = "%s-minion" % time.time() + pid = os.fork() + if pid != 0: + __update_status(job_id, JOB_ID_RUNNING, -1) + return job_id + else: + __update_status(job_id, JOB_ID_RUNNING, -1) + try: + function_ref = retriever(method) + rc = function_ref(*args) + except Exception, e: + (t, v, tb) = sys.exc_info() + rc = utils.nice_exception(t,v,tb) + + __update_status(job_id, JOB_ID_FINISHED, rc) + sys.exit(0) + +def job_status(jobid, client_class=None): + + # NOTE: client_class is here to get around some evil circular reference + # type stuff. This is intended to be called by minions (who can leave it None) + # or by the Client module code (which does not need to be worried about it). API + # users should not be calling jobthing.py methods directly. + + got_status = __get_status(jobid) + + # if the status comes back as JOB_ID_ASYNC_PARTIAL what we have is actually a hash + # of hostname/minion-jobid pairs. Instantiate a client handle for each and poll them + # for their actual status, filling in only the ones that are actually done. + + (interim_rc, interim_results) = got_status + + if interim_rc == JOB_ID_ASYNC_PARTIAL: + + partial_results = {} + + + some_missing = False + for host in interim_results.keys(): + + minion_job = interim_results[host] + client = client_class(host, noglobs=True, async=False) + minion_result = client.jobs.job_status(minion_job) + + (minion_interim_rc, minion_interim_result) = minion_result + + if minion_interim_rc not in [ JOB_ID_RUNNING ]: + if minion_interim_rc in [ JOB_ID_LOST_IN_SPACE ]: + partial_results[host] = [ utils.REMOTE_ERROR, "lost job" ] + else: + partial_results[host] = minion_interim_result + else: + some_missing = True + + if some_missing: + return (JOB_ID_ASYNC_PARTIAL, partial_results) + else: + return (JOB_ID_ASYNC_FINISHED, partial_results) + + else: + return got_status + + # of job id's on the minion in results. + +if __name__ == "__main__": + __test() + + |