Diffstat (limited to 'src/software/openlmi/software/yumdb/__init__.py')
-rw-r--r--  src/software/openlmi/software/yumdb/__init__.py  488
1 file changed, 387 insertions, 101 deletions
diff --git a/src/software/openlmi/software/yumdb/__init__.py b/src/software/openlmi/software/yumdb/__init__.py
index d52b469..b5160b4 100644
--- a/src/software/openlmi/software/yumdb/__init__.py
+++ b/src/software/openlmi/software/yumdb/__init__.py
@@ -33,10 +33,12 @@ only accessor to yum api.
"""
import errno
+import inspect
import os
import re
import time
-from multiprocessing import Process, JoinableQueue, Queue
+from multiprocessing import Process, Queue #pylint: disable=W0404
+from pywbem.cim_provider2 import CIMProvider2
import Queue as TQueue # T as threaded
import threading
import yum
@@ -48,16 +50,30 @@ from openlmi.software.yumdb.packageinfo import PackageInfo
from openlmi.software.yumdb.packagecheck import PackageFile
from openlmi.software.yumdb.packagecheck import PackageCheck
from openlmi.software.yumdb.process import YumWorker
-from openlmi.software.util import singletonmixin
+from openlmi.software.yumdb.repository import Repository
+from openlmi.software.yumdb.util import DispatchingFormatter
+from openlmi.software.util import get_signal_name, singletonmixin
+
+# Maximum time in seconds to wait for a job to complete.
+# If the timeout expires, the spawned process is checked (it may
+# have been killed) and is respawned in case it's dead.
+MAX_JOB_WAIT_TIME = 30
# this may be used as an argument to YumWorker
YUM_WORKER_DEBUG_LOGGING_CONFIG = {
"version" : 1,
"formatters": {
+ # this is a message format for logging function/method calls
+ # it's manually set up in YumWorker's init method
"default": {
- "format" : "%(asctime)s %(levelname)s:%(module)s:"
- "%(funcName)s:%(lineno)d - %(message)s"
- }
+ "()": DispatchingFormatter,
+ "formatters" : {
+ "openlmi.software.yumdb.util.trace_function":
+ "%(asctime)s %(levelname)s:%(message)s"
+ },
+ "default" : "%(asctime)s %(levelname)s:%(module)s:"
+ "%(funcName)s:%(lineno)d - %(message)s"
+ },
},
"handlers": {
"file" : {
@@ -65,20 +81,128 @@ YUM_WORKER_DEBUG_LOGGING_CONFIG = {
"filename" : "/var/tmp/YumWorker.log",
"level" : "DEBUG",
"formatter": "default",
- }
+ },
},
- "root": {
- "level": "DEBUG",
- "handlers" : ["file"]
- }
+ "loggers" : {
+ "root": {
+ "level": "ERROR",
+ "handlers" : ["file"]
+ },
+ "openlmi.software.yumdb": {
+ "level" : "DEBUG",
+ "handlers" : ["file"],
+ "propagate" : False,
+ },
}
+}
+
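The "()" key in the "default" formatter entry tells the standard logging.config
machinery to instantiate DispatchingFormatter (defined in
openlmi.software.yumdb.util, not shown in this diff) with the remaining keys as
constructor arguments: a map of logger names to format strings plus a fallback
format. A minimal sketch of what such a formatter could look like -- names and
behaviour are assumed here, not taken from the real implementation:

    import logging

    class DispatchingFormatter(logging.Formatter):
        """Chooses a format string according to the record's logger name."""

        def __init__(self, formatters=None, default=None):
            logging.Formatter.__init__(self, default)
            # logger name -> dedicated Formatter instance
            self._formatters = dict(
                    (name, logging.Formatter(fmt))
                    for name, fmt in (formatters or {}).items())

        def format(self, record):
            formatter = self._formatters.get(record.name)
            if formatter is not None:
                return formatter.format(record)
            # unknown logger -> fall back to the default format
            return logging.Formatter.format(self, record)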
+# *****************************************************************************
+# Utilities
+# *****************************************************************************
+def log_reply_error(job, reply):
+ """
+    Raises an exception in case an error occurred in the worker process
+    while processing the job.
+ """
+ if isinstance(reply, (int, long)):
+ # asynchronous job
+ return
+ if not isinstance(reply, jobs.YumJob):
+ raise TypeError('expected instance of jobs.YumJob for reply, not "%s"' %
+ reply.__class__.__name__)
+ if reply.result == jobs.YumJob.RESULT_ERROR:
+ cmpi_logging.logger.error(
+ "YumDB: %s failed with error %s: %s",
+ job, reply.result_data[0].__name__, str(reply.result_data[1]))
+ cmpi_logging.logger.trace_warn(
+ "YumDB: %s exception traceback:\n%s%s: %s",
+ job, "".join(reply.result_data[2]),
+ reply.result_data[0].__name__, str(reply.result_data[1]))
+ reply.result_data[1].tb_printed = True
+ raise reply.result_data[1]
+ elif reply.result == jobs.YumJob.RESULT_TERMINATED:
+ cmpi_logging.logger.warn('YumDB: %s terminated', job)
+ else:
+ cmpi_logging.logger.debug('YumDB: %s completed with success', job)
+
+def _make_async_job(jobcls, *args, **kwargs):
+ """Creates asynchronous job, filling it wih some metadata."""
+ if not issubclass(jobcls, jobs.YumAsyncJob):
+ raise TypeError("jobcls must be a subclass of YumAsyncJob")
+ job = jobcls(*args, **kwargs)
+ if job.metadata is None:
+ job.metadata = {}
+ job.metadata['name'] = \
+ type(job).__name__[len('Yum'):] + ('-%d' % job.jobid)
+ frm = inspect.currentframe()
+ method_name = None
+ while ( frm is not None
+ and ( not 'self' in frm.f_locals
+ or not isinstance(frm.f_locals['self'], CIMProvider2))):
+ frm = frm.f_back
+ if frm is not None:
+ prov = frm.f_locals['self']
+ method_name = frm.f_code.co_name.lower()
+ if method_name.startswith('cim_method_'):
+ method_name = method_name[len('cim_method_'):]
+ if hasattr(prov, 'values'):
+ lowertocorrectcase = {
+ k.lower(): k for k in prov.values.__dict__ }
+ try:
+ method_name = lowertocorrectcase[method_name]
+ except KeyError:
+ pass
+ if method_name is not None:
+ job.metadata['method_name'] = method_name
+ return job
+
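The stack walk above lets the job record the name of the CIM method that
triggered it without the provider passing it explicitly. The technique itself
is generic; a self-contained sketch of the same idea, with purely illustrative
names that do not come from the provider code:

    import inspect

    class Provider(object):
        def cim_method_installfromuri(self):
            return find_calling_provider_method()

    def find_calling_provider_method():
        """Walk up the stack until a frame whose 'self' is a Provider is found."""
        frm = inspect.currentframe()
        while (frm is not None
                and ('self' not in frm.f_locals
                    or not isinstance(frm.f_locals['self'], Provider))):
            frm = frm.f_back
        return frm.f_code.co_name if frm is not None else None

    print(Provider().cim_method_installfromuri())   # -> 'cim_method_installfromuri'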
+# *****************************************************************************
+# Decorators
+# *****************************************************************************
+def job_request(async=False):
+ """
+    Decorator factory for job entry points, which are YumDB methods.
+    Each of them must return either a job object or, for an asynchronous
+    call, a jobid. Job objects are processed by this decorator so that the
+    caller obtains only the information he needs.
+
+    It wraps the methods with a logging wrapper and, in case of asynchronous
+    jobs, returns just the jobid.
+ """
+ def _decorator(method):
+ """
+ Decorator that just logs the method's call and returns job's result.
+ """
+ logged = cmpi_logging.trace_method(method)
+ def _new_func(self, *args, **kwargs):
+ """Wrapper for YumDB's method."""
+ return logged(self, *args, **kwargs).result_data
+ return _new_func
+
+ def _decorator_async(method):
+ """
+        Decorator for methods accepting an async argument. When async=True,
+        the method returns the jobid; otherwise the job's result is returned.
+ """
+ logged = cmpi_logging.trace_method(method)
+ def _new_func(self, *args, **kwargs):
+ """Wrapper for YumDB's method."""
+ callargs = inspect.getcallargs(method, self, *args, **kwargs)
+ result = logged(self, *args, **kwargs)
+ if callargs.get('async', False):
+ return result
+ else:
+ return result.result_data
+ return _new_func
+
+ return _decorator_async if async else _decorator
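The two decorator variants are applied to the YumDB methods defined further
below: plain @job_request() for synchronous calls and @job_request(async=True)
for methods that accept an async flag. A hedged usage sketch from the caller's
point of view -- the get_instance() accessor is assumed from singletonmixin and
the filter key is illustrative, both may differ in the real code:

    ydb = YumDB.get_instance()

    # synchronous call: the decorator unwraps the job and returns
    # job.result_data directly
    pkgs = ydb.filter_packages('installed', name='bash')

    # asynchronous call: the decorator returns only the jobid
    jobid = ydb.update_package(pkgs[0], async=True)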
class YumDB(singletonmixin.Singleton):
"""
Context manager for accessing yum/rpm database.
All requests are bundled into jobs -- instances of jobs.YumJob and
sent to YumWorker for processing.
-
+
YumWorker is a separate process handling all calls to yum api.
Communication is done via queues (uplink and downlink).
Uplink is used to send jobs to YumWorker and downlink for obtaining
@@ -99,19 +223,23 @@ class YumDB(singletonmixin.Singleton):
# this is to inform Singleton, that __init__ should be called only once
ignoreSubsequent = True
- def __init__(self, *args, **kwargs): #pylint: disable=W0231
+ @cmpi_logging.trace_method
+ def __init__(self, **kwargs): #pylint: disable=W0231
"""
All arguments are passed to yum.YumBase constructor.
"""
self._process = None
- self._yum_args = (args, kwargs)
+ if kwargs is None:
+ kwargs = {}
+ self._yum_kwargs = kwargs
+
+ self._session_lock = threading.RLock()
+ self._session_level = 0
- # used to access _replies dictionary and _expected list
+ # used to guard access to _expected list and _process
self._reply_lock = threading.Lock()
# used to wait for job to be processed and received
self._reply_cond = threading.Condition(self._reply_lock)
- # { job_id : reply, ... }
- self._replies = {}
# ids of all expected jobs -- those to be processed by YumWorker
self._expected = []
cmpi_logging.logger.trace_info('YumDB: initialized')
@@ -120,49 +248,114 @@ class YumDB(singletonmixin.Singleton):
# Private methods
# *************************************************************************
@cmpi_logging.trace_method
- def _wait_for_reply(self, job):
+ def _handle_reply_timeout(self, job):
"""
- Blocks until job is processed by YumWorker and received.
+        This is called when a timeout occurs while waiting on the downlink
+        queue for a reply. The delay can be caused by the worker process
+        terminating prematurely (a bug). This handler tries to recover from
+        such a situation.
+ """
+ if not self._worker.is_alive():
+ if self._worker.exitcode < 0:
+ cmpi_logging.logger.error("[jobid=%d] worker"
+ " process(pid=%d) killed by signal %s", job.jobid,
+ self._worker.pid, get_signal_name(-self._process.exitcode))
+ else:
+ cmpi_logging.logger.error("[jobid=%d] worker"
+ " process(pid=%d) is dead - exit code: %d",
+ job.jobid, self._process.pid, self._worker.exitcode)
+ with self._reply_lock:
+ self._process = None
+ cmpi_logging.logger.error(
+ "[jobid=%d] starting new worker process", job.jobid)
+ self._expected = []
+ if not isinstance(job, jobs.YumBeginSession):
+ with self._session_lock:
+ if self._session_level > 0:
+ cmpi_logging.logger.info('restoring session '
+ 'level=%d', self._session_level)
+ new_session_job = jobs.YumBeginSession()
+ self._worker.uplink.put(new_session_job)
+ reply = self._worker.downlink.get()
+ log_reply_error(new_session_job, reply)
+ self._worker.uplink.put(job)
+ self._expected.append(job.jobid)
+ # other waiting processes need to resend their requests
+ self._reply_cond.notifyAll()
+ else:
+ cmpi_logging.logger.info("[jobid=%d] process is running,"
+ " waiting some more", job.jobid)
- Only one thread can block on downlink channel to obtain reply. If
- it's reply for him, he takes it and leaves, otherwise he adds it to
- _replies dictionary and notifies other threads. This thread is the
- one, whose job appears as first in _expected list.
+ @cmpi_logging.trace_method
+ def _receive_reply(self, job):
+ """
+ Block on downlink queue to receive expected replies from worker
+ process. Only one thread can be executing this code at any time.
+
+        If the worker process terminated due to some error, it is restarted
+        and all the pending job requests are resent.
+ """
+ while True:
+ cmpi_logging.logger.debug("[jobid=%d] blocking on downlink queue",
+ job.jobid)
+ try:
+ jobout = self._worker.downlink.get(
+ block=True, timeout=MAX_JOB_WAIT_TIME)
+ if jobout.jobid == job.jobid:
+ with self._reply_lock:
+ cmpi_logging.logger.debug(
+ "[jobid=%d] received desired reply", job.jobid)
+ self._expected.remove(job.jobid)
+ if len(self._expected):
+ self._reply_cond.notify()
+ return jobout
+ else:
+ # this should not happen
+ cmpi_logging.logger.error("[jobid=%d] received reply"
+ " for another thread (jobid=%d)",
+ job.jobid, jobout.jobid)
+ except TQueue.Empty:
+ cmpi_logging.logger.warn("[jobid=%d] wait for job reply timeout"
+ "(%d seconds) occured", job.jobid, MAX_JOB_WAIT_TIME)
+ self._handle_reply_timeout(job)
+
+ @cmpi_logging.trace_method
+ def _send_and_receive(self, job):
+ """
+        Sends a request to the server and blocks until the job is processed
+        by YumWorker and the reply is received.
+
+        Only one thread can block on the downlink channel to obtain a reply.
+        This thread is the one whose job appears first in the _expected list.
+        The server processes input jobs sequentially, which is why it's safe
+        to presume that replies arrive in the same order the jobs were sent.
+        Thanks to that we don't have to care about receiving replies for the
+        other waiting threads.
@return result of job
"""
with self._reply_lock:
- # until our job is not at the head of
+ self._worker.uplink.put(job)
+ if getattr(job, 'async', False) is True:
+ return job.jobid
self._expected.append(job.jobid)
- while job.jobid != self._expected[0]:
- if job.jobid in self._replies:
- self._expected.remove(job.jobid)
- return self._replies.pop(job.jobid)
- else:
+ while True:
+ if job.jobid not in self._expected:
+ # process terminated, resending job
+ cmpi_logging.logger.warn("[jobid=%d] job removed"
+ " from expected list, sending request again", job.jobid)
+ self._worker.uplink.put(job)
+ self._expected.append(job.jobid)
+ elif job.jobid == self._expected[0]:
+ # now it's our turn to block on downlink
+ break
+ else: # another thread blocks on downlink -> let's sleep
cmpi_logging.logger.debug(
- "[jobid=%d] another %s threads expecting reply,"
+ "[jobid=%d] another %d threads expecting reply,"
" suspending...", job.jobid, len(self._expected) - 1)
self._reply_cond.wait()
cmpi_logging.logger.debug(
"[jobid=%d] received reply, waking up", job.jobid)
- while True:
- cmpi_logging.logger.debug("[jobid=%d] blocking on downlink queue",
- job.jobid)
- jobid, reply = self._worker.downlink.get()
- with self._reply_lock:
- if jobid != job.jobid:
- cmpi_logging.logger.debug("[jobid=%d] received reply"
- " for another thread (jobid=%d)", job.jobid, jobid)
- self._replies[jobid] = reply
- self._reply_cond.notifyAll()
- else:
- cmpi_logging.logger.debug(
- "[jobid=%d] received desired reply", job.jobid)
- self._expected.remove(job.jobid)
- if len(self._expected):
- self._reply_cond.notify()
- break
- return reply
+ return self._receive_reply(job)
def _do_job(self, job):
"""
@@ -172,23 +365,10 @@ class YumDB(singletonmixin.Singleton):
        (exception_type, exception_value, formatted_traceback_as_string)
@return reply
"""
- cmpi_logging.logger.trace_verbose("YumDB: doing %s(id=%s) job",
- job.__class__.__name__, job.jobid)
- self._worker.uplink.put(job)
- reply = self._wait_for_reply(job)
- if isinstance(reply, tuple):
- cmpi_logging.logger.error(
- "YumDB: job %s(id=%s) failed with error %s: %s",
- job.__class__.__name__, job.jobid,
- reply[0].__name__, str(reply[1]))
- cmpi_logging.logger.trace_warn(
- "YumDB: job %s(id=%s) exception traceback:\n%s%s: %s",
- job.__class__.__name__, job.jobid, "".join(reply[2]),
- reply[0].__name__, str(reply[1]))
- reply[1].tb_printed = True
- raise reply[1]
- cmpi_logging.logger.trace_verbose("YumDB: job %s(id=%s) done",
- job.__class__.__name__, job.jobid)
+ cmpi_logging.logger.trace_verbose("YumDB: doing %s", job)
+ reply = self._send_and_receive(job)
+ log_reply_error(job, reply)
+ cmpi_logging.logger.trace_verbose("YumDB: job %s done", job.jobid)
return reply
@property
@@ -198,12 +378,14 @@ class YumDB(singletonmixin.Singleton):
"""
if self._process is None:
cmpi_logging.logger.trace_info("YumDB: starting YumWorker")
- uplink = JoinableQueue()
+ uplink = Queue()
downlink = Queue()
self._process = YumWorker(uplink, downlink,
- yum_args=self._yum_args[0], yum_kwargs=self._yum_args[1])
+ yum_kwargs=self._yum_kwargs)
#logging_config=YUM_WORKER_DEBUG_LOGGING_CONFIG)
self._process.start()
+ cmpi_logging.logger.trace_info(
+ "YumDB: YumWorker started with pid=%s", self._process.pid)
return self._process
# *************************************************************************
@@ -218,14 +400,24 @@ class YumDB(singletonmixin.Singleton):
@cmpi_logging.trace_method
def __enter__(self):
- self._do_job(jobs.YumBeginSession())
- cmpi_logging.logger.trace_info('YumDB: new session started')
- return self
+ with self._session_lock:
+ if self._session_level == 0:
+ self._do_job(jobs.YumBeginSession())
+ cmpi_logging.logger.trace_info('YumDB: new session started')
+ self._session_level += 1
+ cmpi_logging.logger.trace_info('YumDB: nested to session level=%d',
+ self._session_level)
+ return self
@cmpi_logging.trace_method
def __exit__(self, exc_type, exc_value, traceback):
- self._do_job(jobs.YumEndSession())
- cmpi_logging.logger.trace_info('YumDB: session ended')
+ with self._session_lock:
+ if self._session_level == 1:
+ self._do_job(jobs.YumEndSession())
+ cmpi_logging.logger.trace_info('YumDB: session ended')
+ cmpi_logging.logger.trace_info('YumDB: emerged from session'
+ ' level=%d', self._session_level)
+ self._session_level = max(self._session_level - 1, 0)
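Since the session level is reference-counted, nested with-blocks over the same
YumDB instance are cheap: only the outermost block actually begins and ends a
YumWorker session. An illustration (get_instance() assumed as above):

    ydb = YumDB.get_instance()
    with ydb:                   # level 0 -> 1: YumBeginSession sent to worker
        with ydb:               # level 1 -> 2: only the counter is bumped
            pkgs = ydb.get_package_list('installed')
        # level 2 -> 1: still inside the worker session
    # level 1 -> 0: YumEndSession sent to worker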
# *************************************************************************
# Public methods
@@ -235,32 +427,42 @@ class YumDB(singletonmixin.Singleton):
"""
Shut down the YumWorker process.
"""
- cmpi_logging.logger.info('YumDB: cleanup called')
- if self._process:
- cmpi_logging.logger.info('YumDB: terminating YumWorker')
- self._process.uplink.put(None) # terminating command
- self._process.uplink.join()
- self._process.join()
- cmpi_logging.logger.info('YumDB: YumWorker terminated')
- self._process = None
+ with self._reply_lock:
+ if self._process is not None:
+ cmpi_logging.logger.info('YumDB: terminating YumWorker')
+ self._process.uplink.put(None) # terminating command
+ self._process.join()
+ cmpi_logging.logger.info('YumDB: YumWorker terminated')
+ self._process = None
+ else:
+ cmpi_logging.logger.warn("YunDB: clean_up called, when process"
+ " not initialized!")
- @cmpi_logging.trace_method
+ # *************************************************************************
+ # Jobs with simple results
+ # *************************************************************************
+ @job_request()
def get_package_list(self, kind,
allow_duplicates=False,
- sort=False):
+ sort=False,
+ include_repos=None,
+ exclude_repos=None):
"""
- @param kind is one of: {"installed", "available", "all"}
+ @param kind is one of: jobs.YumGetPackageList.SUPPORTED_KINDS
@param allow_duplicates says, whether to list all found versions
of single package
@return [pkg1, pkg2, ...], pkgi is instance of yumdb.PackageInfo
"""
return self._do_job(jobs.YumGetPackageList(
- kind, allow_duplicates=allow_duplicates, sort=sort))
+ kind, allow_duplicates=allow_duplicates, sort=sort,
+ include_repos=include_repos, exclude_repos=exclude_repos))
- @cmpi_logging.trace_method
+ @job_request()
def filter_packages(self, kind,
allow_duplicates=False,
sort=False,
+ include_repos=None,
+ exclude_repos=None,
**filters):
"""
Similar to get_package_list(), but applies filter on packages.
@@ -268,51 +470,135 @@ class YumDB(singletonmixin.Singleton):
"""
return self._do_job(jobs.YumFilterPackages(
kind, allow_duplicates=allow_duplicates, sort=sort,
+ include_repos=include_repos, exclude_repos=exclude_repos,
**filters))
- @cmpi_logging.trace_method
- def install_package(self, pkg):
+ @job_request()
+ def get_repository_list(self, kind):
+ """
+ @param kind is one of: jobs.YumGetRepositoryList.SUPPORTED_KINDS
+        @return [repo1, repo2, ...], repoi is an instance of yumdb.Repository
+ """
+ return self._do_job(jobs.YumGetRepositoryList(kind))
+
+ @job_request()
+ def filter_repositories(self, kind, **filters):
+ """
+        Similar to get_repository_list(), but applies filters on repositories.
+ @see yumdb.jobs.YumFilterRepositories job for supported filter keys
+ """
+ return self._do_job(jobs.YumFilterRepositories(kind, **filters))
+
+ @job_request()
+ def set_repository_enabled(self, repoid, enable):
+ """
+ Enable or disable repository.
+ @param enable is a boolean
+ """
+ return self._do_job(jobs.YumSetRepositoryEnabled(repoid, enable))
+
+ # *************************************************************************
+ # Asynchronous jobs
+ # *************************************************************************
+ @job_request(async=True)
+ def install_package(self, pkg, async=False, force=False):
"""
Install package.
@param pkg is an instance of PackageInfo obtained with
- get_package_list() or filter_packages(), which must be not installed
+        get_package_list() or filter_packages(), or a valid nevra string.
+        The package must not already be installed unless force is True.
"""
- return self._do_job(jobs.YumInstallPackage(pkg))
+ return self._do_job(_make_async_job(jobs.YumInstallPackage,
+ pkg, force=force, async=async))
- @cmpi_logging.trace_method
- def remove_package(self, pkg):
+ @job_request(async=True)
+ def remove_package(self, pkg, async=False):
"""
@param pkg is an instance of PackageInfo obtained with
get_package_list() or filter_packages(), which must be installed
"""
- return self._do_job(jobs.YumRemovePackage(pkg))
+ return self._do_job(_make_async_job(jobs.YumRemovePackage,
+ pkg, async=async))
- @cmpi_logging.trace_method
- def update_to_package(self, desired_pkg):
+ @job_request(async=True)
+ def update_to_package(self, desired_pkg, async=False):
"""
@param desired_pkg is an instance of PackageInfo,
which must be available
"""
- return self._do_job(jobs.YumUpdateToPackage(desired_pkg))
+ return self._do_job(_make_async_job(jobs.YumUpdateToPackage,
+ desired_pkg, async=async))
- @cmpi_logging.trace_method
+ @job_request(async=True)
def update_package(self, pkg,
+ async=False,
to_epoch=None,
to_version=None,
- to_release=None):
+ to_release=None,
+ force=False):
"""
@param pkg is an instance of PackageInfo, which must be installed
The other parameters filter candidate available packages for update.
"""
- return self._do_job(jobs.YumUpdatePackage(
- pkg, to_epoch, to_version, to_release))
+ return self._do_job(_make_async_job(jobs.YumUpdatePackage,
+ pkg, async, to_epoch, to_version, to_release, force=force))
- @cmpi_logging.trace_method
- def check_package(self, pkg):
+ @job_request(async=True)
+ def check_package(self, pkg, async=False):
"""
@param pkg is an instance of PackageInfo representing installed package
@return instance of yumdb.PackageCheck
"""
- return self._do_job(jobs.YumCheckPackage(pkg))
+ return self._do_job(_make_async_job(jobs.YumCheckPackage,
+ pkg, async=async))
+
+ @job_request(async=True)
+ def install_package_from_uri(self, uri,
+ async=False, update_only=False, force=False):
+ """
+        Install a package from a URI.
+        @param uri is either a remote URL or a local path.
+ """
+ return self._do_job(jobs.YumInstallPackageFromURI(
+ uri, async, update_only, force=force))
+
+ # *************************************************************************
+ # Control of asynchronous jobs
+ # *************************************************************************
+ @job_request()
+ def get_job(self, jobid):
+ return self._do_job(jobs.YumJobGet(jobid))
+
+ @job_request()
+ def get_job_list(self):
+ return self._do_job(jobs.YumJobGetList())
+
+ @job_request()
+ def get_job_by_name(self, name):
+ return self._do_job(jobs.YumJobGetByName(name))
+
+ @job_request()
+ def set_job_priority(self, jobid, priority):
+ return self._do_job(jobs.YumJobSetPriority(jobid, priority))
+
+ @job_request()
+ def update_job(self, jobid, **kwargs):
+ return self._do_job(jobs.YumJobUpdate(jobid, **kwargs))
+
+ @job_request()
+ def reschedule_job(self, jobid,
+ delete_on_completion, time_before_removal):
+ return self._do_job(jobs.YumJobReschedule(jobid,
+ delete_on_completion, time_before_removal))
+
+ @job_request()
+ def delete_job(self, jobid):
+ return self._do_job(jobs.YumJobDelete(jobid))
+
+ @job_request()
+ def terminate_job(self, jobid):
+ return self._do_job(jobs.YumJobTerminate(jobid))
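Taken together, the asynchronous entry points and the job-control methods let
the provider run long operations without blocking a CIM request: the call
returns a jobid immediately and the job can be inspected or manipulated later.
A hedged end-to-end sketch -- the attribute used for polling the returned job
object is an assumption, not taken from this diff:

    import time

    ydb = YumDB.get_instance()
    jobid = ydb.install_package_from_uri(
            '/tmp/example.rpm', async=True, update_only=False, force=False)

    # poll until the worker reports the job as finished
    while True:
        job = ydb.get_job(jobid)
        if getattr(job, 'finished', False):   # attribute name assumed
            break
        time.sleep(1)

    # keep the finished job around for five minutes before removal
    ydb.reschedule_job(jobid, delete_on_completion=True,
            time_before_removal=300)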