summaryrefslogtreecommitdiffstats
path: root/src/software/openlmi/software/yumdb/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/software/openlmi/software/yumdb/__init__.py')
-rw-r--r--src/software/openlmi/software/yumdb/__init__.py307
1 files changed, 307 insertions, 0 deletions
diff --git a/src/software/openlmi/software/yumdb/__init__.py b/src/software/openlmi/software/yumdb/__init__.py
new file mode 100644
index 0000000..54eeae0
--- /dev/null
+++ b/src/software/openlmi/software/yumdb/__init__.py
@@ -0,0 +1,307 @@
+# Software Management Providers
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Since yum API functions should not be called with different thread_ids
+repeatedly in the same program. It's neccessary, to make these calls
+in single thread. But the provider needs to be able to clean up itself,
+when its not needed. That's why the yum API needs to be accessed from
+separated process, that is created and terminated when needed.
+
+This package contains all the bowels of this separate process together
+with its management and communication facilities.
+
+YumDB is a context manager supposed to be used by any provider as the
+only accessor to yum api.
+"""
+
+import errno
+import os
+import re
+import time
+from multiprocessing import Process, JoinableQueue, Queue
+import Queue as TQueue # T as threaded
+import threading
+import yum
+
+from openlmi.software.yumdb import jobs
+from openlmi.software.yumdb import errors
+from openlmi.software.yumdb.packageinfo import PackageInfo
+from openlmi.software.yumdb.packagecheck import PackageFile
+from openlmi.software.yumdb.packagecheck import PackageCheck
+from openlmi.software.yumdb.process import YumWorker
+from openlmi.software.util import cmpi_logging
+from openlmi.software.util import singletonmixin
+
+# this may be used as an argument to YumWorker
+YUM_WORKER_DEBUG_LOGGING_CONFIG = {
+ "version" : 1,
+ "formatters": {
+ "default": {
+ "format" : "%(asctime)s %(levelname)s:%(module)s:"
+ "%(funcName)s:%(lineno)d - %(message)s"
+ }
+ },
+ "handlers": {
+ "file" : {
+ "class" : "logging.handlers.RotatingFileHandler",
+ "filename" : "/var/tmp/YumWorker.log",
+ "level" : "DEBUG",
+ "formatter": "default",
+ }
+ },
+ "root": {
+ "level": "DEBUG",
+ "handlers" : ["file"]
+ }
+ }
+
+class YumDB(singletonmixin.Singleton):
+ """
+ Context manager for accessing yum/rpm database.
+ All requests are bundled into jobs -- instances of jobs.YumJob and
+ sent to YumWorker for processing.
+
+ YumWorker is a separate process handling all calls to yum api.
+ Communication is done via queues (uplink and downlink).
+ Uplink is used to send jobs to YumWorker and downlink for obtaining
+ results.
+
+ This is implemented in thread safe manner.
+
+ It should be used as a context manager in case, we want to process
+ multiple jobs in single transaction. The example of usage:
+ with YumDB.getInstance() as ydb:
+ pkgs = ydb.filter_packages(...)
+ for pkg in pkgs:
+ ydb.install_package(pkg)
+ ...
+ Yum database stays locked in whole block of code under with statement.
+ """
+
+ # this is to inform Singleton, that __init__ should be called only once
+ ignoreSubsequent = True
+
+ def __init__(self, *args, **kwargs): #pylint: disable=W0231
+ """
+ All arguments are passed to yum.YumBase constructor.
+ """
+ self._process = None
+ self._yum_args = (args, kwargs)
+
+ # used to access _replies dictionary and _expected list
+ self._reply_lock = threading.Lock()
+ # used to wait for job to be processed and received
+ self._reply_cond = threading.Condition(self._reply_lock)
+ # { job_id : reply, ... }
+ self._replies = {}
+ # ids of all expected jobs -- those to be processed by YumWorker
+ self._expected = []
+ cmpi_logging.logger.trace_info('YumDB: initialized')
+
+ # *************************************************************************
+ # Private methods
+ # *************************************************************************
+ @cmpi_logging.trace_method
+ def _wait_for_reply(self, job):
+ """
+ Blocks until job is processed by YumWorker and received.
+
+ Only one thread can block on downlink channel to obtain reply. If
+ it's reply for him, he takes it and leaves, otherwise he adds it to
+ _replies dictionary and notifies other threads. This thread is the
+ one, whose job appears as first in _expected list.
+
+ @return result of job
+ """
+ with self._reply_lock:
+ # until our job is not at the head of
+ self._expected.append(job.jobid)
+ while job.jobid != self._expected[0]:
+ if job.jobid in self._replies:
+ self._expected.remove(job.jobid)
+ return self._replies.pop(job.jobid)
+ else:
+ self._reply_cond.wait()
+ while True:
+ jobid, reply = self._worker.downlink.get()
+ with self._reply_lock:
+ if jobid != job.jobid:
+ self._replies[jobid] = reply
+ self._reply_cond.notifyAll()
+ else:
+ self._expected.remove(job.jobid)
+ if len(self._expected):
+ self._reply_cond.notify()
+ break
+ return reply
+
+ def _do_job(self, job):
+ """
+ Sends the job to YumWorker process and waits for reply.
+ If reply is a tuple, there was an error, while job processing.
+ Incoming exception is in format:
+ (exception_type, exception_value, formated_traceback_as_string)
+ @return reply
+ """
+ cmpi_logging.logger.trace_verbose("YumDB: doing %s(id=%s) job",
+ job.__class__.__name__, job.jobid)
+ self._worker.uplink.put(job)
+ reply = self._wait_for_reply(job)
+ if isinstance(reply, tuple):
+ cmpi_logging.logger.error(
+ "YumDB: job %s(id=%s) failed with error %s: %s",
+ job.__class__.__name__, job.jobid,
+ reply[0].__name__, str(reply[1]))
+ cmpi_logging.logger.trace_warn(
+ "YumDB: job %s(id=%s) exception traceback:\n%s%s: %s",
+ job.__class__.__name__, job.jobid, "".join(reply[2]),
+ reply[0].__name__, str(reply[1]))
+ reply[1].tb_printed = True
+ raise reply[1]
+ cmpi_logging.logger.trace_verbose("YumDB: job %s(id=%s) done",
+ job.__class__.__name__, job.jobid)
+ return reply
+
+ @property
+ def _worker(self):
+ """
+ YumWorker process accessor. It's created upon first need.
+ """
+ if self._process is None:
+ cmpi_logging.logger.trace_info("YumDB: starting YumWorker")
+ uplink = JoinableQueue()
+ downlink = Queue()
+ self._process = YumWorker(uplink, downlink,
+ yum_args=self._yum_args[0], yum_kwargs=self._yum_args[1])
+ #logging_config=YUM_WORKER_DEBUG_LOGGING_CONFIG)
+ self._process.start()
+ return self._process
+
+ # *************************************************************************
+ # Special methods
+ # *************************************************************************
+ def __del__(self):
+ """
+ Ensure, that YumWorker process is correctly shutted down.
+ """
+ self.clean_up()
+ YumDB.__del__(self)
+
+ @cmpi_logging.trace_method
+ def __enter__(self):
+ self._do_job(jobs.YumBeginSession())
+ cmpi_logging.logger.trace_info('YumDB: new session started')
+ return self
+
+ @cmpi_logging.trace_method
+ def __exit__(self, exc_type, exc_value, traceback):
+ self._do_job(jobs.YumEndSession())
+ cmpi_logging.logger.trace_info('YumDB: session ended')
+
+ # *************************************************************************
+ # Public methods
+ # *************************************************************************
+ @cmpi_logging.trace_method
+ def clean_up(self):
+ """
+ Shut down the YumWorker process.
+ """
+ cmpi_logging.logger.info('YumDB: cleanup called')
+ if self._process:
+ cmpi_logging.logger.info('YumDB: terminating YumWorker')
+ self._process.uplink.put(None) # terminating command
+ self._process.uplink.join()
+ self._process.join()
+ cmpi_logging.logger.info('YumDB: YumWorker terminated')
+ self._process = None
+
+ @cmpi_logging.trace_method
+ def get_package_list(self, kind,
+ allow_duplicates=False,
+ sort=False):
+ """
+ @param kind is one of: {"installed", "available", "all"}
+ @param allow_duplicates says, whether to list all found versions
+ of single package
+ @return [pkg1, pkg2, ...], pkgi is instance of yumdb.PackageInfo
+ """
+ return self._do_job(jobs.YumGetPackageList(
+ kind, allow_duplicates=allow_duplicates, sort=sort))
+
+ @cmpi_logging.trace_method
+ def filter_packages(self, kind,
+ allow_duplicates=False,
+ sort=False,
+ **filters):
+ """
+ Similar to get_package_list(), but applies filter on packages.
+ @see yumdb.jobs.YumFilterPackages job for supported filter keys
+ """
+ return self._do_job(jobs.YumFilterPackages(
+ kind, allow_duplicates=allow_duplicates, sort=sort,
+ **filters))
+
+ @cmpi_logging.trace_method
+ def install_package(self, pkg):
+ """
+ Install package.
+ @param pkg is an instance of PackageInfo obtained with
+ get_package_list() or filter_packages(), which must be not installed
+ """
+ return self._do_job(jobs.YumInstallPackage(pkg))
+
+ @cmpi_logging.trace_method
+ def remove_package(self, pkg):
+ """
+ @param pkg is an instance of PackageInfo obtained with
+ get_package_list() or filter_packages(), which must be installed
+ """
+ return self._do_job(jobs.YumRemovePackage(pkg))
+
+ @cmpi_logging.trace_method
+ def update_to_package(self, desired_pkg):
+ """
+ @param desired_pkg is an instance of PackageInfo,
+ which must be available
+ """
+ return self._do_job(jobs.YumUpdateToPackage(desired_pkg))
+
+ @cmpi_logging.trace_method
+ def update_package(self, pkg,
+ to_epoch=None,
+ to_version=None,
+ to_release=None):
+ """
+ @param pkg is an instance of PackageInfo, which must be installed
+
+ The other parameters filter candidate available packages for update.
+ """
+ return self._do_job(jobs.YumUpdatePackage(
+ pkg, to_epoch, to_version, to_release))
+
+ @cmpi_logging.trace_method
+ def check_package(self, pkg):
+ """
+ @param pkg is an instance of PackageInfo representing installed package
+ @return instance of yumdb.PackageCheck
+ """
+ return self._do_job(jobs.YumCheckPackage(pkg))
+