summaryrefslogtreecommitdiffstats
path: root/src/software/openlmi/software/yumdb/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/software/openlmi/software/yumdb/__init__.py')
-rw-r--r--src/software/openlmi/software/yumdb/__init__.py311
1 files changed, 237 insertions, 74 deletions
diff --git a/src/software/openlmi/software/yumdb/__init__.py b/src/software/openlmi/software/yumdb/__init__.py
index 512602a..c9fc0c5 100644
--- a/src/software/openlmi/software/yumdb/__init__.py
+++ b/src/software/openlmi/software/yumdb/__init__.py
@@ -33,10 +33,12 @@ only accessor to yum api.
"""
import errno
+import inspect
import os
import re
import time
-from multiprocessing import Process, JoinableQueue, Queue #pylint: disable=W0404
+from multiprocessing import Process, Queue #pylint: disable=W0404
+from pywbem.cim_provider2 import CIMProvider2
import Queue as TQueue # T as threaded
import threading
import yum
@@ -49,21 +51,29 @@ from openlmi.software.yumdb.packagecheck import PackageFile
from openlmi.software.yumdb.packagecheck import PackageCheck
from openlmi.software.yumdb.process import YumWorker
from openlmi.software.yumdb.repository import Repository
+from openlmi.software.yumdb.util import DispatchingFormatter
from openlmi.software.util import get_signal_name, singletonmixin
# Maximum time in seconds to wait for a job to accomplish.
# If timeout expires, spawned process is checked (it might
# be possibly killed) and is respawned in case it's dead.
-MAX_JOB_WAIT_TIME = 120
+MAX_JOB_WAIT_TIME = 30
# this may be used as an argument to YumWorker
YUM_WORKER_DEBUG_LOGGING_CONFIG = {
"version" : 1,
"formatters": {
+ # this is a message format for logging function/method calls
+ # it's manually set up in YumWorker's init method
"default": {
- "format" : "%(asctime)s %(levelname)s:%(module)s:"
- "%(funcName)s:%(lineno)d - %(message)s"
- }
+ "()": DispatchingFormatter,
+ "formatters" : {
+ "openlmi.software.yumdb.util.trace_function":
+ "%(asctime)s %(levelname)s:%(message)s"
+ },
+ "default" : "%(asctime)s %(levelname)s:%(module)s:"
+ "%(funcName)s:%(lineno)d - %(message)s"
+ },
},
"handlers": {
"file" : {
@@ -71,30 +81,121 @@ YUM_WORKER_DEBUG_LOGGING_CONFIG = {
"filename" : "/var/tmp/YumWorker.log",
"level" : "DEBUG",
"formatter": "default",
- }
+ },
},
- "root": {
- "level": "DEBUG",
- "handlers" : ["file"]
- }
+ "loggers" : {
+ "root": {
+ "level": "ERROR",
+ "handlers" : ["file"]
+ },
+ "openlmi.software.yumdb": {
+ "level" : "DEBUG",
+ "handlers" : ["file"],
+ "propagate" : False,
+ },
}
+}
+# *****************************************************************************
+# Utilities
+# *****************************************************************************
def log_reply_error(job, reply):
"""
Raises an exception in case of error occured in worker process
while processing job.
"""
- if isinstance(reply, tuple):
+ if isinstance(reply, (int, long)):
+ # asynchronous job
+ return
+ if not isinstance(reply, jobs.YumJob):
+ raise TypeError('expected instance of jobs.YumJob for reply, not "%s"' %
+ reply.__class__.__name__)
+ if reply.result == jobs.YumJob.RESULT_ERROR:
cmpi_logging.logger.error(
- "YumDB: job %s(id=%s) failed with error %s: %s",
- job.__class__.__name__, job.jobid,
- reply[0].__name__, str(reply[1]))
+ "YumDB: %s failed with error %s: %s",
+ job, reply.result_data[0].__name__, str(reply.result_data[1]))
cmpi_logging.logger.trace_warn(
- "YumDB: job %s(id=%s) exception traceback:\n%s%s: %s",
- job.__class__.__name__, job.jobid, "".join(reply[2]),
- reply[0].__name__, str(reply[1]))
- reply[1].tb_printed = True
- raise reply[1]
+ "YumDB: %s exception traceback:\n%s%s: %s",
+ job, "".join(reply.result_data[2]),
+ reply.result_data[0].__name__, str(reply.result_data[1]))
+ reply.result_data[1].tb_printed = True
+ raise reply.result_data[1]
+ elif reply.result == jobs.YumJob.RESULT_TERMINATED:
+ cmpi_logging.logger.warn('YumDB: %s terminated', job)
+ else:
+ cmpi_logging.logger.debug('YumDB: %s completed with success', job)
+
+def _make_async_job(jobcls, *args, **kwargs):
+ """Creates asynchronous job, filling it wih some metadata."""
+ if not issubclass(jobcls, jobs.YumAsyncJob):
+ raise TypeError("jobcls must be a subclass of YumAsyncJob")
+ job = jobcls(*args, **kwargs)
+ if job.metadata is None:
+ job.metadata = {}
+ job.metadata['name'] = \
+ type(job).__name__[len('Yum'):] + ('-%d' % job.jobid)
+ frm = inspect.currentframe()
+ method_name = None
+ while ( frm is not None
+ and ( not 'self' in frm.f_locals
+ or not isinstance(frm.f_locals['self'], CIMProvider2))):
+ frm = frm.f_back
+ if frm is not None:
+ prov = frm.f_locals['self']
+ method_name = frm.f_code.co_name.lower()
+ if method_name.startswith('cim_method_'):
+ method_name = method_name[len('cim_method_'):]
+ if hasattr(prov, 'values'):
+ lowertocorrectcase = {
+ k.lower(): k for k in prov.values.__dict__ }
+ try:
+ method_name = lowertocorrectcase[method_name]
+ except KeyError:
+ pass
+ if method_name is not None:
+ job.metadata['method_name'] = method_name
+ return job
+
+# *****************************************************************************
+# Decorators
+# *****************************************************************************
+def job_request(async=False):
+ """
+ Decorator factory for job entry points. They are YumDB methods.
+ All of them must return either job objects or jobid for asynchronous calls.
+ Job objects are processed by this decorator for caller to obtain only the
+ information he needs.
+
+ It wrapps them with logger wrapper and in case of asynchronous jobs,
+ it returns just the jobid.
+ """
+ def _decorator(method):
+ """
+ Decorator that just logs the method's call and returns job's result.
+ """
+ logged = cmpi_logging.trace_method(method)
+ def _new_func(self, *args, **kwargs):
+ """Wrapper for YumDB's method."""
+ return logged(self, *args, **kwargs).result_data
+ return _new_func
+
+ def _decorator_async(method):
+ """
+ Decorator for methods accepting async argument. In case of async=True,
+ the method returns jobid. Job's result is returned otherwise.
+ """
+ logged = cmpi_logging.trace_method(method)
+ def _new_func(self, *args, **kwargs):
+ """Wrapper for YumDB's method."""
+ callargs = inspect.getcallargs(method, self, *args, **kwargs)
+ result = logged(self, *args, **kwargs)
+ if callargs.get('async', False):
+ return result
+ else:
+ return result.result_data
+ return _new_func
+
+ return _decorator_async if async else _decorator
class YumDB(singletonmixin.Singleton):
"""
@@ -122,12 +223,15 @@ class YumDB(singletonmixin.Singleton):
# this is to inform Singleton, that __init__ should be called only once
ignoreSubsequent = True
- def __init__(self, *args, **kwargs): #pylint: disable=W0231
+ @cmpi_logging.trace_method
+ def __init__(self, **kwargs): #pylint: disable=W0231
"""
All arguments are passed to yum.YumBase constructor.
"""
self._process = None
- self._yum_args = (args, kwargs)
+ if kwargs is None:
+ kwargs = {}
+ self._yum_kwargs = kwargs
self._session_lock = threading.RLock()
self._session_level = 0
@@ -194,21 +298,21 @@ class YumDB(singletonmixin.Singleton):
cmpi_logging.logger.debug("[jobid=%d] blocking on downlink queue",
job.jobid)
try:
- jobid, reply = self._worker.downlink.get(
+ jobout = self._worker.downlink.get(
block=True, timeout=MAX_JOB_WAIT_TIME)
- if jobid == job.jobid:
+ if jobout.jobid == job.jobid:
with self._reply_lock:
cmpi_logging.logger.debug(
"[jobid=%d] received desired reply", job.jobid)
self._expected.remove(job.jobid)
if len(self._expected):
self._reply_cond.notify()
- return reply
+ return jobout
else:
# this should not happen
cmpi_logging.logger.error("[jobid=%d] received reply"
" for another thread (jobid=%d)",
- job.jobid, jobid)
+ job.jobid, jobout.jobid)
except TQueue.Empty:
cmpi_logging.logger.warn("[jobid=%d] wait for job reply timeout"
"(%d seconds) occured", job.jobid, MAX_JOB_WAIT_TIME)
@@ -231,6 +335,8 @@ class YumDB(singletonmixin.Singleton):
"""
with self._reply_lock:
self._worker.uplink.put(job)
+ if getattr(job, 'async', False) is True:
+ return job.jobid
self._expected.append(job.jobid)
while True:
if job.jobid not in self._expected:
@@ -259,12 +365,10 @@ class YumDB(singletonmixin.Singleton):
(exception_type, exception_value, formated_traceback_as_string)
@return reply
"""
- cmpi_logging.logger.trace_verbose("YumDB: doing %s(id=%s) job",
- job.__class__.__name__, job.jobid)
+ cmpi_logging.logger.trace_verbose("YumDB: doing %s", job)
reply = self._send_and_receive(job)
log_reply_error(job, reply)
- cmpi_logging.logger.trace_verbose("YumDB: job %s(id=%s) done",
- job.__class__.__name__, job.jobid)
+ cmpi_logging.logger.trace_verbose("YumDB: job %s done", job.jobid)
return reply
@property
@@ -274,11 +378,11 @@ class YumDB(singletonmixin.Singleton):
"""
if self._process is None:
cmpi_logging.logger.trace_info("YumDB: starting YumWorker")
- uplink = JoinableQueue()
+ uplink = Queue()
downlink = Queue()
self._process = YumWorker(uplink, downlink,
- yum_args=self._yum_args[0], yum_kwargs=self._yum_args[1])
- #logging_config=YUM_WORKER_DEBUG_LOGGING_CONFIG)
+ yum_kwargs=self._yum_kwargs,
+ logging_config=YUM_WORKER_DEBUG_LOGGING_CONFIG)
self._process.start()
cmpi_logging.logger.trace_info(
"YumDB: YumWorker started with pid=%s", self._process.pid)
@@ -327,7 +431,6 @@ class YumDB(singletonmixin.Singleton):
if self._process is not None:
cmpi_logging.logger.info('YumDB: terminating YumWorker')
self._process.uplink.put(None) # terminating command
- self._process.uplink.join()
self._process.join()
cmpi_logging.logger.info('YumDB: YumWorker terminated')
self._process = None
@@ -335,7 +438,10 @@ class YumDB(singletonmixin.Singleton):
cmpi_logging.logger.warn("YunDB: clean_up called, when process"
" not initialized!")
- @cmpi_logging.trace_method
+ # *************************************************************************
+ # Jobs with simple results
+ # *************************************************************************
+ @job_request()
def get_package_list(self, kind,
allow_duplicates=False,
sort=False,
@@ -351,7 +457,7 @@ class YumDB(singletonmixin.Singleton):
kind, allow_duplicates=allow_duplicates, sort=sort,
include_repos=include_repos, exclude_repos=exclude_repos))
- @cmpi_logging.trace_method
+ @job_request()
def filter_packages(self, kind,
allow_duplicates=False,
sort=False,
@@ -367,75 +473,132 @@ class YumDB(singletonmixin.Singleton):
include_repos=include_repos, exclude_repos=exclude_repos,
**filters))
- @cmpi_logging.trace_method
- def install_package(self, pkg):
+ @job_request()
+ def get_repository_list(self, kind):
+ """
+ @param kind is one of: jobs.YumGetRepositoryList.SUPPORTED_KINDS
+ @param allow_duplicates says, whether to list all found versions
+ of single package
+ @return [pkg1, pkg2, ...], pkgi is instance of yumdb.Repository
+ """
+ return self._do_job(jobs.YumGetRepositoryList(kind))
+
+ @job_request()
+ def filter_repositories(self, kind, **filters):
+ """
+ Similar to get_repository_list(), but applies filter on packages.
+ @see yumdb.jobs.YumFilterRepositories job for supported filter keys
+ """
+ return self._do_job(jobs.YumFilterRepositories(kind, **filters))
+
+ @job_request()
+ def set_repository_enabled(self, repoid, enable):
+ """
+ Enable or disable repository.
+ @param enable is a boolean
+ """
+ return self._do_job(jobs.YumSetRepositoryEnabled(repoid, enable))
+
+ # *************************************************************************
+ # Asynchronous jobs
+ # *************************************************************************
+ @job_request(async=True)
+ def install_package(self, pkg, async=False, force=False):
"""
Install package.
@param pkg is an instance of PackageInfo obtained with
- get_package_list() or filter_packages(), which must be not installed
+ get_package_list() or filter_packages() or a valid nevra as string.
+ Package must not be installed if force is False.
"""
- return self._do_job(jobs.YumInstallPackage(pkg))
+ return self._do_job(_make_async_job(jobs.YumInstallPackage,
+ pkg, force=force, async=async))
- @cmpi_logging.trace_method
- def remove_package(self, pkg):
+ @job_request(async=True)
+ def remove_package(self, pkg, async=False):
"""
@param pkg is an instance of PackageInfo obtained with
get_package_list() or filter_packages(), which must be installed
"""
- return self._do_job(jobs.YumRemovePackage(pkg))
+ return self._do_job(_make_async_job(jobs.YumRemovePackage,
+ pkg, async=async))
- @cmpi_logging.trace_method
- def update_to_package(self, desired_pkg):
+ @job_request(async=True)
+ def update_to_package(self, desired_pkg, async=False):
"""
@param desired_pkg is an instance of PackageInfo,
which must be available
"""
- return self._do_job(jobs.YumUpdateToPackage(desired_pkg))
+ return self._do_job(_make_async_job(jobs.YumUpdateToPackage,
+ desired_pkg, async=async))
- @cmpi_logging.trace_method
+ @job_request(async=True)
def update_package(self, pkg,
+ async=False,
to_epoch=None,
to_version=None,
- to_release=None):
+ to_release=None,
+ force=False):
"""
@param pkg is an instance of PackageInfo, which must be installed
The other parameters filter candidate available packages for update.
"""
- return self._do_job(jobs.YumUpdatePackage(
- pkg, to_epoch, to_version, to_release))
+ return self._do_job(_make_async_job(jobs.YumUpdatePackage,
+ pkg, async, to_epoch, to_version, to_release, force=force))
- @cmpi_logging.trace_method
- def check_package(self, pkg):
+ @job_request(async=True)
+ def check_package(self, pkg, async=False):
"""
@param pkg is an instance of PackageInfo representing installed package
@return instance of yumdb.PackageCheck
"""
- return self._do_job(jobs.YumCheckPackage(pkg))
+ return self._do_job(_make_async_job(jobs.YumCheckPackage,
+ pkg, async=async))
- @cmpi_logging.trace_method
- def get_repository_list(self, kind):
+ @job_request(async=True)
+ def install_package_from_uri(self, uri,
+ async=False, update_only=False, force=False):
"""
- @param kind is one of: jobs.YumGetRepositoryList.SUPPORTED_KINDS
- @param allow_duplicates says, whether to list all found versions
- of single package
- @return [pkg1, pkg2, ...], pkgi is instance of yumdb.Repository
+ Install package from uri.
+ @param uri is either remote url or local path.
"""
- return self._do_job(jobs.YumGetRepositoryList(kind))
+ return self._do_job(jobs.YumInstallPackageFromURI(
+ uri, async, update_only, force=force))
- @cmpi_logging.trace_method
- def filter_repositories(self, kind, **filters):
- """
- Similar to get_repository_list(), but applies filter on packages.
- @see yumdb.jobs.YumFilterRepositories job for supported filter keys
- """
- return self._do_job(jobs.YumFilterRepositories(kind, **filters))
-
- @cmpi_logging.trace_method
- def set_repository_enabled(self, repoid, enable):
- """
- Enable or disable repository.
- @param enable is a boolean
- """
- return self._do_job(jobs.YumSetRepositoryEnabled(repoid, enable))
+ # *************************************************************************
+ # Control of asynchronous jobs
+ # *************************************************************************
+ @job_request()
+ def get_job(self, jobid):
+ return self._do_job(jobs.YumJobGet(jobid))
+
+ @job_request()
+ def get_job_list(self):
+ return self._do_job(jobs.YumJobGetList())
+
+ @job_request()
+ def get_job_by_name(self, name):
+ return self._do_job(jobs.YumJobGetByName(name))
+
+ @job_request()
+ def set_job_priority(self, jobid, priority):
+ return self._do_job(jobs.YumJobSetPriority(jobid, priority))
+
+ @job_request()
+ def update_job(self, jobid, **kwargs):
+ return self._do_job(jobs.YumJobUpdate(jobid, **kwargs))
+
+ @job_request()
+ def reschedule_job(self, jobid,
+ delete_on_completion, time_before_removal):
+ return self._do_job(jobs.YumJobReschedule(jobid,
+ delete_on_completion, time_before_removal))
+
+ @job_request()
+ def delete_job(self, jobid):
+ return self._do_job(jobs.YumJobDelete(jobid))
+
+ @job_request()
+ def terminate_job(self, jobid):
+ return self._do_job(jobs.YumJobTerminate(jobid))