summaryrefslogtreecommitdiffstats
path: root/src/software/lmi/software/yumdb
diff options
context:
space:
mode:
Diffstat (limited to 'src/software/lmi/software/yumdb')
-rw-r--r--src/software/lmi/software/yumdb/__init__.py677
-rw-r--r--src/software/lmi/software/yumdb/errors.py111
-rw-r--r--src/software/lmi/software/yumdb/jobmanager.py574
-rw-r--r--src/software/lmi/software/yumdb/jobs.py668
-rw-r--r--src/software/lmi/software/yumdb/packagecheck.py235
-rw-r--r--src/software/lmi/software/yumdb/packageinfo.py181
-rw-r--r--src/software/lmi/software/yumdb/process.py914
-rw-r--r--src/software/lmi/software/yumdb/repository.py202
-rw-r--r--src/software/lmi/software/yumdb/util.py163
9 files changed, 3725 insertions, 0 deletions
diff --git a/src/software/lmi/software/yumdb/__init__.py b/src/software/lmi/software/yumdb/__init__.py
new file mode 100644
index 0000000..656b090
--- /dev/null
+++ b/src/software/lmi/software/yumdb/__init__.py
@@ -0,0 +1,677 @@
+# Software Management Providers
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Since yum API functions should not be called with different thread_ids
+repeatedly in the same program. It's neccessary, to make these calls
+in single thread. But the provider needs to be able to clean up itself,
+when its not needed. That's why the yum API needs to be accessed from
+separated process, that is created and terminated when needed.
+
+This package contains all the bowels of this separate process together
+with its management and communication facilities.
+
+YumDB is a context manager supposed to be used by any provider as the
+only accessor to yum api.
+"""
+
+import errno
+import inspect
+import os
+import re
+import time
+from multiprocessing import Process, Queue #pylint: disable=W0404
+from pywbem.cim_provider2 import CIMProvider2
+import Queue as TQueue # T as threaded
+import threading
+import yum
+
+from lmi.common import cmpi_logging, singletonmixin
+from lmi.common.IndicationManager import IndicationManager
+from lmi.software.yumdb import jobs
+from lmi.software.yumdb import errors
+from lmi.software.yumdb.packageinfo import PackageInfo
+from lmi.software.yumdb.packagecheck import PackageFile
+from lmi.software.yumdb.packagecheck import PackageCheck
+from lmi.software.yumdb.process import YumWorker
+from lmi.software.yumdb.repository import Repository
+from lmi.software.yumdb.util import DispatchingFormatter
+from lmi.software.util import get_signal_name
+
+# Maximum time in seconds to wait for a job to accomplish.
+# If timeout expires, spawned process is checked (it might
+# be possibly killed) and is respawned in case it's dead.
+MAX_JOB_WAIT_TIME = 30
+
+# this may be used as an argument to YumWorker to setup logging
+YUM_WORKER_DEBUG_LOGGING_CONFIG = {
+ "version" : 1,
+ 'disable_existing_loggers' : True,
+ "formatters": {
+ # this is a message format for logging function/method calls
+ # it's manually set up in YumWorker's init method
+ "default": {
+ "()": DispatchingFormatter,
+ "formatters" : {
+ "lmi.software.yumdb.util.trace_function":
+ "%(asctime)s %(levelname)s:%(message)s"
+ },
+ "default" : "%(asctime)s %(levelname)s:%(module)s:"
+ "%(funcName)s:%(lineno)d - %(message)s"
+ },
+ },
+ "handlers": {
+ "file" : {
+ "class" : "logging.handlers.RotatingFileHandler",
+ "filename" : "/var/tmp/YumWorker.log",
+ "level" : "DEBUG",
+ "formatter": "default",
+ },
+ },
+ "loggers" : {
+ "root": {
+ "level": "DEBUG",
+ "handlers" : ["file"]
+ },
+ "lmi.software.yumdb": {
+ "level" : "DEBUG",
+ "handlers" : ["file"],
+ "propagate" : False,
+ },
+ }
+}
+
+# *****************************************************************************
+# Utilities
+# *****************************************************************************
+def log_reply_error(job, reply):
+ """
+ Raises an exception in case of error occured in worker process
+ while processing job.
+ """
+ if isinstance(reply, (int, long)):
+ # asynchronous job
+ return
+ if not isinstance(reply, jobs.YumJob):
+ raise TypeError('expected instance of jobs.YumJob for reply, not "%s"' %
+ reply.__class__.__name__)
+ if reply.result == jobs.YumJob.RESULT_ERROR:
+ cmpi_logging.logger.error(
+ "YumDB: %s failed with error %s: %s",
+ job, reply.result_data[0].__name__, str(reply.result_data[1]))
+ cmpi_logging.logger.trace_warn(
+ "YumDB: %s exception traceback:\n%s%s: %s",
+ job, "".join(reply.result_data[2]),
+ reply.result_data[0].__name__, str(reply.result_data[1]))
+ reply.result_data[1].tb_printed = True
+ raise reply.result_data[1]
+ elif reply.result == jobs.YumJob.RESULT_TERMINATED:
+ cmpi_logging.logger.warn('YumDB: %s terminated', job)
+ else:
+ cmpi_logging.logger.debug('YumDB: %s completed with success', job)
+
+def _make_async_job(jobcls, *args, **kwargs):
+ """Creates asynchronous job, filling it wih some metadata."""
+ if not issubclass(jobcls, jobs.YumAsyncJob):
+ raise TypeError("jobcls must be a subclass of YumAsyncJob")
+ job = jobcls(*args, **kwargs)
+ if job.metadata is None:
+ job.metadata = {}
+ job.metadata['name'] = \
+ type(job).__name__[len('Yum'):] + ('-%d' % job.jobid)
+ return job
+
+# *****************************************************************************
+# Decorators
+# *****************************************************************************
+def job_request(async=False):
+ """
+ Decorator factory for job entry points. They are YumDB methods.
+ All of them must return either job objects or jobid for asynchronous calls.
+ Job objects are processed by this decorator for caller to obtain only the
+ information he needs.
+
+ It wrapps them with logger wrapper and in case of asynchronous jobs,
+ it returns just the jobid.
+ """
+ def _decorator(method):
+ """
+ Decorator that just logs the method's call and returns job's result.
+ """
+ logged = cmpi_logging.trace_method(method)
+ def _new_func(self, *args, **kwargs):
+ """Wrapper for YumDB's method."""
+ return logged(self, *args, **kwargs).result_data
+ return _new_func
+
+ def _decorator_async(method):
+ """
+ Decorator for methods accepting async argument. In case of async=True,
+ the method returns jobid. Job's result is returned otherwise.
+ """
+ logged = cmpi_logging.trace_method(method)
+ def _new_func(self, *args, **kwargs):
+ """Wrapper for YumDB's method."""
+ callargs = inspect.getcallargs(method, self, *args, **kwargs)
+ result = logged(self, *args, **kwargs)
+ if callargs.get('async', False):
+ return result
+ else:
+ return result.result_data
+ return _new_func
+
+ return _decorator_async if async else _decorator
+
+class YumDB(singletonmixin.Singleton):
+ """
+ Context manager for accessing yum/rpm database.
+ All requests are bundled into jobs -- instances of jobs.YumJob and
+ sent to YumWorker for processing.
+
+ YumWorker is a separate process handling all calls to yum api.
+ Communication is done via queues (uplink and downlink).
+ Uplink is used to send jobs to YumWorker and downlink for obtaining
+ results.
+
+ This is implemented in thread safe manner.
+
+ It should be used as a context manager in case, we want to process
+ multiple jobs in single transaction. The example of usage:
+ with YumDB.getInstance() as ydb:
+ pkgs = ydb.filter_packages(...)
+ for pkg in pkgs:
+ ydb.install_package(pkg)
+ ...
+ Yum database stays locked in whole block of code under with statement.
+ """
+
+ # this is to inform Singleton, that __init__ should be called only once
+ ignoreSubsequent = True
+
+ # This serves to all code base as a global variable used to check,
+ # whether YumDB instance is running under cimom broker or under worker
+ # process. This is important for code used in callback functions passed
+ # to worker responsible for creating instances of ConcreteJob. This code
+ # must avoid using calls to YumDB while running under worker. This
+ #
+ # Worker process must set this to False before starting its event handling
+ # loop.
+ RUNNING_UNDER_CIMOM_PROCESS = True
+
+ @cmpi_logging.trace_method
+ def __init__(self, **kwargs): #pylint: disable=W0231
+ """
+ All arguments are passed to yum.YumBase constructor.
+ """
+ self._process = None
+ if kwargs is None:
+ kwargs = {}
+ self._yum_kwargs = kwargs
+
+ self._session_lock = threading.RLock()
+ self._session_level = 0
+
+ # used to guard access to _expected list and _process
+ self._reply_lock = threading.Lock()
+ # used to wait for job to be processed and received
+ self._reply_cond = threading.Condition(self._reply_lock)
+ # ids of all expected jobs -- those to be processed by YumWorker
+ self._expected = []
+ # {job_id : reply, ... }
+ self._replies = {}
+ cmpi_logging.logger.trace_info('YumDB: initialized')
+
+ # *************************************************************************
+ # Private methods
+ # *************************************************************************
+ @cmpi_logging.trace_method
+ def _handle_reply_timeout(self, job):
+ """
+ This is called when timeout occurs while waiting on downlink queue for
+ reply. Delay can be caused by worker process's early termination (bug).
+ This handler tries to recover from such an situation.
+ """
+ if not self._worker.is_alive():
+ if self._worker.exitcode < 0:
+ cmpi_logging.logger.error("[jobid=%d] worker"
+ " process(pid=%d) killed by signal %s", job.jobid,
+ self._worker.pid, get_signal_name(-self._process.exitcode))
+ else:
+ cmpi_logging.logger.error("[jobid=%d] worker"
+ " process(pid=%d) is dead - exit code: %d",
+ job.jobid, self._process.pid, self._worker.exitcode)
+ with self._reply_lock:
+ self._process = None
+ cmpi_logging.logger.error(
+ "[jobid=%d] starting new worker process", job.jobid)
+ self._expected = []
+ if not isinstance(job, jobs.YumBeginSession):
+ with self._session_lock:
+ if self._session_level > 0:
+ cmpi_logging.logger.info('restoring session '
+ 'level=%d', self._session_level)
+ new_session_job = jobs.YumBeginSession()
+ self._worker.uplink.put(new_session_job)
+ reply = self._worker.downlink.get()
+ log_reply_error(new_session_job, reply)
+ self._worker.uplink.put(job)
+ self._expected.append(job.jobid)
+ # other waiting processes need to resend their requests
+ self._reply_cond.notifyAll()
+ else:
+ cmpi_logging.logger.info("[jobid=%d] process is running,"
+ " waiting some more", job.jobid)
+
+ @cmpi_logging.trace_method
+ def _receive_reply(self, job):
+ """
+ Block on downlink queue to receive expected replies from worker
+ process. Only one thread can be executing this code at any time.
+
+ Only one thread can block on downlink channel to obtain reply. If
+ it's reply for him, he takes it and leaves, otherwise he adds it to
+ _replies dictionary and notifies other threads. This thread is the
+ one, whose job appears as first in _expected list.
+
+ In case, that worker process terminated due to some error. Restart it
+ and resend all the job requests again.
+ """
+ while True:
+ cmpi_logging.logger.debug("[jobid=%d] blocking on downlink queue",
+ job.jobid)
+ try:
+ jobout = self._worker.downlink.get(
+ block=True, timeout=MAX_JOB_WAIT_TIME)
+ if jobout.jobid == job.jobid:
+ cmpi_logging.logger.debug(
+ "[jobid=%d] received desired reply", job.jobid)
+ with self._reply_lock:
+ self._expected.remove(job.jobid)
+ if len(self._expected):
+ self._reply_cond.notify()
+ return jobout
+ else:
+ cmpi_logging.logger.info("[jobid=%d] received reply"
+ " for another thread (jobid=%d)",
+ job.jobid, jobout.jobid)
+ with self._reply_lock:
+ self._replies[jobout.jobid] = jobout
+ self._reply_cond.notifyAll()
+ except TQueue.Empty:
+ cmpi_logging.logger.warn("[jobid=%d] wait for job reply timeout"
+ "(%d seconds) occured", job.jobid, MAX_JOB_WAIT_TIME)
+ self._handle_reply_timeout(job)
+
+ @cmpi_logging.trace_method
+ def _send_and_receive(self, job):
+ """
+ Sends a request to server and blocks until job is processed by
+ YumWorker and reply is received.
+
+ Only one thread can block on downlink channel to obtain reply. This
+ thread is the one, whose job appears as first in _expected list. Server
+ processes input jobs sequentially. That's why it's safe to presume,
+ that jobs are received in the same order as they were send. Thanks to
+ that we don't have to care about receiving replies for the other
+ waiting threads.
+
+ @return result of job
+ """
+ with self._reply_lock:
+ self._worker.uplink.put(job)
+ if getattr(job, 'async', False) is True:
+ return job.jobid
+ self._expected.append(job.jobid)
+ while True:
+ if job.jobid in self._replies:
+ cmpi_logging.logger.debug(
+ "[jobid=%d] desired reply already received",
+ job.jobid)
+ try:
+ self._expected.remove(job.jobid)
+ except ValueError:
+ cmpi_logging.logger.warn(
+ "[jobid=%d] reply not in expected list",
+ job.jobid)
+ return self._replies.pop(job.jobid)
+ elif job.jobid not in self._expected:
+ # process terminated, resending job
+ cmpi_logging.logger.warn("[jobid=%d] job removed"
+ " from expected list, sending request again", job.jobid)
+ self._worker.uplink.put(job)
+ self._expected.append(job.jobid)
+ elif job.jobid == self._expected[0]:
+ # now it's our turn to block on downlink
+ break
+ else: # another thread blocks on downlink -> let's sleep
+ cmpi_logging.logger.debug(
+ "[jobid=%d] another %d threads expecting reply,"
+ " suspending...", job.jobid, len(self._expected) - 1)
+ self._reply_cond.wait()
+ cmpi_logging.logger.debug(
+ "[jobid=%d] received reply, waking up", job.jobid)
+ return self._receive_reply(job)
+
+ def _do_job(self, job):
+ """
+ Sends the job to YumWorker process and waits for reply.
+ If reply is a tuple, there was an error, while job processing.
+ Incoming exception is in format:
+ (exception_type, exception_value, formated_traceback_as_string)
+ @return reply
+ """
+ cmpi_logging.logger.trace_verbose("YumDB: doing %s", job)
+ reply = self._send_and_receive(job)
+ log_reply_error(job, reply)
+ cmpi_logging.logger.trace_verbose("YumDB: job %s done", job.jobid)
+ return reply
+
+ @property
+ def _worker(self):
+ """
+ YumWorker process accessor. It's created upon first need.
+ """
+ if self._process is None:
+ cmpi_logging.logger.trace_info("YumDB: starting YumWorker")
+ uplink = Queue()
+ downlink = Queue()
+ self._process = YumWorker(uplink, downlink,
+ indication_manager=IndicationManager.get_instance(),
+ yum_kwargs=self._yum_kwargs)
+ #logging_config=YUM_WORKER_DEBUG_LOGGING_CONFIG)
+ self._process.start()
+ cmpi_logging.logger.trace_info(
+ "YumDB: YumWorker started with pid=%s", self._process.pid)
+ return self._process
+
+ # *************************************************************************
+ # Special methods
+ # *************************************************************************
+ def __del__(self):
+ """
+ Ensure, that YumWorker process is correctly shutted down.
+ """
+ self.clean_up()
+ singletonmixing.Singleton.__del__(self)
+
+ @cmpi_logging.trace_method
+ def __enter__(self):
+ with self._session_lock:
+ if self._session_level == 0:
+ self._do_job(jobs.YumBeginSession())
+ cmpi_logging.logger.trace_info('YumDB: new session started')
+ self._session_level += 1
+ cmpi_logging.logger.trace_info('YumDB: nested to session level=%d',
+ self._session_level)
+ return self
+
+ @cmpi_logging.trace_method
+ def __exit__(self, exc_type, exc_value, traceback):
+ with self._session_lock:
+ if self._session_level == 1:
+ self._do_job(jobs.YumEndSession())
+ cmpi_logging.logger.trace_info('YumDB: session ended')
+ cmpi_logging.logger.trace_info('YumDB: emerged from session'
+ ' level=%d', self._session_level)
+ self._session_level = max(self._session_level - 1, 0)
+
+ # *************************************************************************
+ # Public methods
+ # *************************************************************************
+ @cmpi_logging.trace_method
+ def clean_up(self):
+ """
+ Shut down the YumWorker process.
+ """
+ with self._reply_lock:
+ if self._process is not None:
+ cmpi_logging.logger.info('YumDB: terminating YumWorker')
+ self._process.uplink.put(None) # terminating command
+ self._process.join()
+ cmpi_logging.logger.info('YumDB: YumWorker terminated')
+ self._process = None
+ else:
+ cmpi_logging.logger.warn("YumDB: clean_up called, when process"
+ " not initialized!")
+
+ # *************************************************************************
+ # Jobs with simple results
+ # *************************************************************************
+ @job_request()
+ def get_package_list(self, kind,
+ allow_duplicates=False,
+ sort=False,
+ include_repos=None,
+ exclude_repos=None):
+ """
+ @param kind is one of: jobs.YumGetPackageList.SUPPORTED_KINDS
+ @param allow_duplicates says, whether to list all found versions
+ of single package
+ @return [pkg1, pkg2, ...], pkgi is instance of yumdb.PackageInfo
+ """
+ return self._do_job(jobs.YumGetPackageList(
+ kind, allow_duplicates=allow_duplicates, sort=sort,
+ include_repos=include_repos, exclude_repos=exclude_repos))
+
+ @job_request()
+ def filter_packages(self, kind,
+ allow_duplicates=False,
+ sort=False,
+ include_repos=None,
+ exclude_repos=None,
+ **filters):
+ """
+ Similar to get_package_list(), but applies filter on packages.
+ @see yumdb.jobs.YumFilterPackages job for supported filter keys
+ """
+ return self._do_job(jobs.YumFilterPackages(
+ kind, allow_duplicates=allow_duplicates, sort=sort,
+ include_repos=include_repos, exclude_repos=exclude_repos,
+ **filters))
+
+ @job_request()
+ def get_repository_list(self, kind):
+ """
+ @param kind is one of: jobs.YumGetRepositoryList.SUPPORTED_KINDS
+ @param allow_duplicates says, whether to list all found versions
+ of single package
+ @return [pkg1, pkg2, ...], pkgi is instance of yumdb.Repository
+ """
+ return self._do_job(jobs.YumGetRepositoryList(kind))
+
+ @job_request()
+ def filter_repositories(self, kind, **filters):
+ """
+ Similar to get_repository_list(), but applies filter on packages.
+ @see yumdb.jobs.YumFilterRepositories job for supported filter keys
+ """
+ return self._do_job(jobs.YumFilterRepositories(kind, **filters))
+
+ @job_request()
+ def set_repository_enabled(self, repoid, enable):
+ """
+ Enable or disable repository.
+ @param enable is a boolean
+ """
+ return self._do_job(jobs.YumSetRepositoryEnabled(repoid, enable))
+
+ # *************************************************************************
+ # Asynchronous jobs
+ # *************************************************************************
+ @job_request(async=True)
+ def install_package(self, pkg, async=False, force=False, **metadata):
+ """
+ Install package.
+ @param pkg is an instance of PackageInfo obtained with
+ get_package_list() or filter_packages() or a valid nevra as string.
+ Package must not be installed if force is False.
+ """
+ return self._do_job(_make_async_job(jobs.YumInstallPackage,
+ pkg, force=force, async=async, metadata=metadata))
+
+ @job_request(async=True)
+ def remove_package(self, pkg, async=False, **metadata):
+ """
+ @param pkg is an instance of PackageInfo obtained with
+ get_package_list() or filter_packages(), which must be installed
+ """
+ return self._do_job(_make_async_job(jobs.YumRemovePackage,
+ pkg, async=async, metadata=metadata))
+
+ @job_request(async=True)
+ def update_to_package(self, desired_pkg, async=False, **metadata):
+ """
+ @param desired_pkg is an instance of PackageInfo,
+ which must be available
+ """
+ return self._do_job(_make_async_job(jobs.YumUpdateToPackage,
+ desired_pkg, async=async, metadata=metadata))
+
+ @job_request(async=True)
+ def update_package(self, pkg,
+ async=False,
+ to_epoch=None,
+ to_version=None,
+ to_release=None,
+ force=False,
+ **metadata):
+ """
+ @param pkg is an instance of PackageInfo, which must be installed
+
+ The other parameters filter candidate available packages for update.
+ """
+ return self._do_job(_make_async_job(jobs.YumUpdatePackage,
+ pkg, async, to_epoch, to_version, to_release, force=force,
+ metadata=metadata))
+
+ @job_request(async=True)
+ def check_package(self, pkg, async=False, **metadata):
+ """
+ Return all necessary information from package database for package
+ verification.
+
+ :param pkg: (``PackageInfo``) An instance of PackageInfo
+ representing installed package or its nevra string.
+ :rtype: (``PackageCheck``)
+ """
+ return self._do_job(_make_async_job(jobs.YumCheckPackage,
+ pkg, async=async, metadata=metadata))
+
+ @job_request(async=True)
+ def check_package_file(self, pkg, file_name, async=False):
+ """
+ Return all necessary information from package database concerning
+ on particular file of package. If ``pkg`` does not contain
+ ``file_name``, ``FileNotFound`` error is raised.
+
+ :param pkg: (``PackageInfo``) An instance of PackageInfo
+ representing installed package or its nevra string.
+ :rtype: (``PackageFile``)
+ """
+ return self._do_job(_make_async_job(jobs.YumCheckPackageFile,
+ pkg, file_name, async=async))
+
+ @job_request(async=True)
+ def install_package_from_uri(self, uri,
+ async=False, update_only=False, force=False, **metadata):
+ """
+ Install package from uri.
+ @param uri is either remote url or local path.
+ """
+ return self._do_job(_make_async_job(jobs.YumInstallPackageFromURI,
+ uri, async, update_only, force=force, metadata=metadata))
+
+ # *************************************************************************
+ # Control of asynchronous jobs
+ # *************************************************************************
+ @job_request()
+ def get_job(self, jobid):
+ """
+ Return instance of ``YumJob`` with given ``jobid``.
+ """
+ return self._do_job(jobs.YumJobGet(jobid))
+
+ @job_request()
+ def get_job_list(self):
+ """
+ Return list of all asynchronous jobs.
+ """
+ return self._do_job(jobs.YumJobGetList())
+
+ @job_request()
+ def get_job_by_name(self, name):
+ """
+ Return asynchronous job filtered by its name.
+ """
+ return self._do_job(jobs.YumJobGetByName(name))
+
+ @job_request()
+ def set_job_priority(self, jobid, priority):
+ """
+ Change priority of asynchronous job. This will change
+ its order in queue, if it is still enqeueued.
+
+ Return object of job.
+ """
+ return self._do_job(jobs.YumJobSetPriority(jobid, priority))
+
+ @job_request()
+ def update_job(self, jobid, **kwargs):
+ """
+ Update metadata of job.
+
+ :param kwargs: (``dict``) Is a dictionary of job's property names
+ with mapped new values. Only keys given will be changed in
+ desired job.
+
+ **Note** that only keys, that do not affect job's priority or its
+ scheduling for deletion can be changed. See :ref:`YumJobUpdate`.
+ """
+ return self._do_job(jobs.YumJobUpdate(jobid, **kwargs))
+
+ @job_request()
+ def reschedule_job(self, jobid,
+ delete_on_completion, time_before_removal):
+ """
+ Change the scheduling of job for deletion.
+
+ :param delete_on_completion: (``bool``) Says, whether the job will
+ be scheduled for deletion at ``finished + time_before_removal``
+ time.
+ :param time_before_removal: (``int``) Number of seconds, after the job
+ is finished, it will be kept alive.
+ """
+ return self._do_job(jobs.YumJobReschedule(jobid,
+ delete_on_completion, time_before_removal))
+
+ @job_request()
+ def delete_job(self, jobid):
+ """
+ Delete job object. This can be called only on finished job.
+ """
+ return self._do_job(jobs.YumJobDelete(jobid))
+
+ @job_request()
+ def terminate_job(self, jobid):
+ """
+ Terminate job. This can be called only on *NEW* job.
+ """
+ return self._do_job(jobs.YumJobTerminate(jobid))
+
diff --git a/src/software/lmi/software/yumdb/errors.py b/src/software/lmi/software/yumdb/errors.py
new file mode 100644
index 0000000..44eb1c7
--- /dev/null
+++ b/src/software/lmi/software/yumdb/errors.py
@@ -0,0 +1,111 @@
+# -*- encoding: utf-8 -*-
+# Software Management Providers
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Exceptions raisable by YumWorker.
+"""
+
+class YumDBError(Exception):
+ """Base class for all errors under yumdb package."""
+
+class DatabaseLockError(YumDBError):
+ """Raised, when the yum database can not be locked."""
+ pass
+
+class TransactionError(YumDBError):
+ """Base exception representing yum transaction processing error."""
+ pass
+class TransactionBuildFailed(TransactionError):
+ """Raised, when transaction building fails."""
+ pass
+class PackageAlreadyInstalled(TransactionError):
+ """Raised, when trying to install already installed package."""
+ def __init__(self, pkg):
+ TransactionError.__init__(self,
+ 'Package "%s" is already installed.' % pkg)
+class PackageOpenError(TransactionError):
+ """Raised, when trying to open package obtained from URI."""
+ def __init__(self, pkg, msg):
+ TransactionError.__init__(self,
+ 'Failed to open package "%s": %s' % (pkg, msg))
+class TransactionExecutionFailed(TransactionError):
+ """Raised, when YumBase.doTransaction() method fails."""
+ pass
+
+class PackageError(YumDBError):
+ """Generic exception for error concerning package handling."""
+ pass
+class PackageNotFound(PackageError):
+ """Raised, when requested package could not be found."""
+ pass
+class PackageNotInstalled(PackageError):
+ """Raised, when requested package is not installed for desired action."""
+ def __init__(self, pkg):
+ PackageError.__init__(self, 'Package "%s" is not installed.' % pkg)
+class FileNotFound(PackageError):
+ """
+ Raised, when requesting check on file that does not belong to
+ particular package.
+ """
+ pass
+
+class RepositoryError(YumDBError):
+ """Generic exception for error concerning repository handling."""
+ pass
+class RepositoryNotFound(RepositoryError):
+ """Raised, when requested repository cound not be found."""
+ def __init__(self, repoid):
+ RepositoryError.__init__(self, "No such repository: %s" % repoid)
+class RepositoryChangeError(RepositoryError):
+ """Raised, when modification of repository failed."""
+ pass
+
+class JobError(YumDBError):
+ """Generic exception for job handling."""
+ pass
+class UnknownJob(JobError):
+ """Raised, when no handler is available for given job on worker."""
+ pass
+class InvalidURI(JobError):
+ """Raised, when passed uri is not a valid one."""
+ def __init__(self, uri):
+ JobError.__init__(self, "Invalid uri: \"%s\"" % uri)
+class InvalidNevra(JobError):
+ """Raised when trying to instantiate job with invalid nevra string."""
+ pass
+class JobControlError(JobError):
+ """Generic exception for management of asynchronous jobs."""
+ pass
+class JobNotFound(JobControlError):
+ """Raised upon request for not existing asynchronous job."""
+ def __init__(self, target):
+ JobControlError.__init__(self, "job %s could not be found" % target)
+class InvalidJobState(JobControlError):
+ """
+ Raised when requested operation can not be executed on job in
+ its current state.
+ """
+ pass
+
+class IndicationError(YumDBError):
+ """Generic error for indication handling."""
+ pass
+
diff --git a/src/software/lmi/software/yumdb/jobmanager.py b/src/software/lmi/software/yumdb/jobmanager.py
new file mode 100644
index 0000000..a224905
--- /dev/null
+++ b/src/software/lmi/software/yumdb/jobmanager.py
@@ -0,0 +1,574 @@
+# Software Management Providers
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+This is a module for ``JobManager`` which is a separate thread of
+``YumWorker`` process. It keeps a cache of asynchronous jobs and handles
+input and output queues.
+
+This module uses its own logging facilities because it runs in separeted
+process not having access to broker logging features.
+
+Before using ``JobManager``, module's variable ``JOB_TO_MODEL`` should
+be set to callable taking ``YumJob`` instance and returning its matching
+CIM abstraction instance.
+"""
+import heapq
+import inspect
+import logging
+import Queue
+import sys
+import threading
+import time
+import traceback
+
+from lmi.common.IndicationManager import IndicationManager
+from lmi.common.JobManager import JobManager as JM
+from lmi.software.yumdb import errors, jobs
+from lmi.software.yumdb.util import trace_function
+
+# This is a callable, which must be initialized before JobManager is used.
+# It should be a pointer to function, which takes a job and returns
+# corresponding CIM instance. It's used for sending indications.
+JOB_TO_MODEL = lambda job: None
+
+# Minimum time to keep asynchronous job in cache after completion. In seconds.
+MINIMUM_TIME_BEFORE_REMOVAL = 10
+
+# replacement for cmpi_logging.logger
+LOG = None
+
+# *****************************************************************************
+# Decorators
+# *****************************************************************************
+def job_handler(job_from_target=True):
+ """
+ Decorator for JobManager methods serving as handlers for control jobs.
+
+ Decorator locks the job_lock of manager's instance.
+ """
+ def _wrapper_jft(method):
+ """
+ It consumes "target" keyword argument (which is job's id) and makes
+ it an instance of YumJob. The method is then called with "job" argument
+ instead of "target".
+ """
+ logged = trace_function(method)
+
+ def _new_func(self, *args, **kwargs):
+ """Wrapper around method."""
+ if 'target' in kwargs:
+ kwargs['job'] = kwargs.pop('target')
+ callargs = inspect.getcallargs(method, self, *args, **kwargs)
+ target = callargs.pop('job')
+ with self._job_lock: #pylint: disable=W0212
+ if not target in self._async_jobs: #pylint: disable=W0212
+ raise errors.JobNotFound(target)
+ job = self._async_jobs[target] #pylint: disable=W0212
+ callargs['job'] = job
+ return logged(**callargs)
+ return _new_func
+
+ def _simple_wrapper(method):
+ """Just locks the job lock."""
+ def _new_func(self, *args, **kwargs):
+ """Wrapper around method."""
+ with self._job_lock: #pylint: disable=W0212
+ return method(self, *args, **kwargs)
+ return _new_func
+
+ if job_from_target:
+ return _wrapper_jft
+ else:
+ return _simple_wrapper
+
+class JobIndicationSender(object):
+ """
+ Makes creation and sending of indications easy. It keeps a reference
+ to job, which can be *snapshotted* for making CIM instance out of it.
+ These instances are then used to send indications via IndicationManager.
+
+ Typical usage::
+
+ sender = JobIndicationSender(im, job, [fltr_id1, fltr_id2])
+ ... # modify job
+ sender.snapshot()
+ sender.send()
+
+ **Note** that number of kept CIM instances won't exceed 2. First one
+ is created upon instantiation and the second one be calling
+ ``snapshot()``. Any successive call to ``snapshot()`` will overwrite
+ the second instance.
+ """
+
+ def __init__(self, indication_manager, job,
+ indications=JM.IND_JOB_CHANGED, new=None):
+ """
+ :param job (``YumJob``) Is job instance, which will be immediately
+ snapshoted as old instance and later as a new one.
+ :param indications (``list``) Can either be a list of indication ids
+ or a single indication id.
+ :param new (``YumJob``) A job instance stored as new.
+ """
+ if not isinstance(indication_manager, IndicationManager):
+ raise TypeError("indication_manager must be a subclass of"
+ " IndicationManager")
+ if not isinstance(job, jobs.YumJob):
+ raise TypeError("job must be an instance of YumJob")
+ if not new is None and not isinstance(new, jobs.YumJob):
+ raise TypeError("new must be an instance of YumJob")
+ self._indication_manager = indication_manager
+ self._job = job
+ self._old_instance = JOB_TO_MODEL(job)
+ if new is not None:
+ new = JOB_TO_MODEL(job)
+ self._new_instance = new
+ self._indications = set()
+ self.indication_ids = indications
+
+ @property
+ def job(self):
+ """
+ Return instance of ``YumJob``.
+ """
+ return self._job
+
+ @property
+ def indication_ids(self):
+ """
+ Return set of indication filter IDs.
+ """
+ return self._indications.copy()
+
+ @indication_ids.setter
+ def indication_ids(self, indication_ids):
+ """
+ Set the indication filter IDs.
+
+ :param indication_ids (``list``) Can be even single id.
+ """
+ if isinstance(indication_ids, basestring):
+ indication_ids = set([indication_ids])
+ self._indications = set(indication_ids)
+
+ @trace_function
+ def add_indication_ids(self, indication_ids):
+ """
+ Add filter IDs.
+ """
+ if isinstance(indication_ids, basestring):
+ indication_ids = set([indication_ids])
+ self._indications.update(indication_ids)
+
+ @trace_function
+ def snapshot(self):
+ """
+ Make a second CIM instance, overwriting previous one (not the first).
+ """
+ self._new_instance = JOB_TO_MODEL(self._job)
+
+ @trace_function
+ def send(self, make_snapshot=False):
+ """
+ Send all requested indications for given job.
+ """
+ if not self._indications:
+ raise errors.IndicationError(
+ "can not send any indication without id")
+ if make_snapshot:
+ self.snapshot()
+ if ( JM.IND_JOB_CHANGED in self._indications
+ and self._new_instance is None):
+ raise errors.IndicationError("no snapshot made for modified job")
+ for fltr_id in self._indications:
+ if fltr_id == JM.IND_JOB_CREATED:
+ LOG.debug("sending instance creation indication for job %s",
+ self._job)
+ self._indication_manager.send_instcreation(
+ self._new_instance if self._new_instance is not None
+ else self._old_instance,
+ fltr_id)
+ else:
+ LOG.debug("sending instance modification indication for job %s"
+ " with ID: %s", self._job, fltr_id)
+ self._indication_manager.send_instmodification(
+ self._old_instance, self._new_instance,
+ fltr_id)
+
+class JobManager(threading.Thread):
+ """
+ Separate thread for managing queue of jobs requested by client.
+ There are three kinds of jobs, that are handled differently:
+ * asynchronous - kept in _async_jobs dictionary until job is
+ deleted by request or it expires;
+ no reply is sent to client upon job's completion
+ * synchronous - reply is sent to client after job's completion;
+ no reference to the job is kept afterwards
+ * job control - they are not enqueued in _job_queue for YumWorker
+ to process, but are handled directly and in the FIFO order
+
+ Both asynchronous and synchronous jobs are enqueued in _job_queue
+ for YumWorker to obtain them. It's a priority queue sorting jobs by their
+ priority.
+ """
+ # enumeration of actions, that may be enqueued in calendar
+ ACTION_REMOVE = 0
+
+ ACTION_NAMES = ['remove']
+
+ def __init__(self, queue_in, queue_out, indication_manager):
+ threading.Thread.__init__(self, name="JobManager")
+ self._queue_in = queue_in
+ self._queue_out = queue_out
+ self._indication_manager = indication_manager
+ self._terminate = False
+
+ # (time, jobid, action)
+ self._calendar = []
+ # {jobid : job}
+ self._async_jobs = {}
+
+ # lock for critical access to _calendar, _async_jobs and _job_queue
+ self._job_lock = threading.RLock()
+ # priority queue of jobs that are processed by YumWorker
+ self._job_queue = []
+ # condition for YumWorker waiting on empty _job_queue
+ self._job_enqueued = threading.Condition(self._job_lock)
+
+ # *************************************************************************
+ # Private methods
+ # *************************************************************************
+ @trace_function
+ def _control_job(self, job):
+ """
+ Function dispatching job to handler for particular YumJob subclass.
+ """
+ try:
+ handler = {
+ # these are from YumDB client
+ jobs.YumJobGetList : self._handle_get_list,
+ jobs.YumJobGet : self._handle_get,
+ jobs.YumJobGetByName : self._handle_get_by_name,
+ jobs.YumJobSetPriority : self._handle_set_priority,
+ jobs.YumJobReschedule : self._handle_reschedule,
+ jobs.YumJobUpdate : self._handle_update,
+ jobs.YumJobDelete : self._handle_delete,
+ jobs.YumJobTerminate : self._handle_terminate,
+ }[job.__class__]
+ LOG.info("processing control job %s", str(job))
+ except KeyError:
+ raise errors.UnknownJob("No handler for job \"%s\"." %
+ job.__class__.__name__)
+ return handler(**job.job_kwargs)
+
+ @trace_function
+ def _enqueue_job(self, job):
+ """
+ Insert incoming job into _job_queue.
+ """
+ if isinstance(job, jobs.YumJobControl):
+ result = job.RESULT_SUCCESS
+ job.start()
+ try:
+ data = self._control_job(job)
+ except Exception: #pylint: disable=W0703
+ result = job.RESULT_ERROR
+ data = sys.exc_info()
+ data = (data[0], data[1], traceback.format_tb(data[2]))
+ LOG.exception("control job %s failed", job)
+ job.finish(result, data)
+ LOG.debug("sending reply for %s: (%s, %s)", job,
+ job.ResultNames[job.result], job.result_data)
+ self._queue_out.put(job)
+ else:
+ if job is None:
+ LOG.debug('received terminating command')
+ self._terminate = True
+ LOG.debug('job %s enqued for YumWorker to handle', job)
+ heapq.heappush(self._job_queue, job)
+ if getattr(job, 'async', False) is True:
+ ind = self._prepare_indication_for(job, JM.IND_JOB_CREATED)
+ self._async_jobs[job.jobid] = job
+ ind.send()
+ self._job_enqueued.notify()
+
+ @trace_function
+ def _schedule_event(self, after, jobid, action):
+ """
+ Enqueue event into calendar. Event consists of time, jobid and
+ action.
+ """
+ schedule_at = time.time() + after
+ for (sched, jid, act) in self._calendar:
+ if jid == jobid and act == action:
+ if sched <= schedule_at: # same event already scheduled
+ return
+ # schedule it for early time
+ LOG.debug('rescheduling action %s on job %d to take place'
+ ' after %d seconds (instead of %d)',
+ self.ACTION_NAMES[action], jid, after,
+ sched - schedule_at + after)
+ self._calendar.remove((sched, jid, act))
+ self._calendar.append((schedule_at, jid, act))
+ heapq.heapify(self._calendar)
+ return
+ LOG.debug('scheduling action %s on job %d to take place after '
+ ' %d seconds', self.ACTION_NAMES[action], jobid, after)
+ heapq.heappush(self._calendar, (schedule_at, jobid, action))
+
+ @trace_function
+ def _run_event(self, jobid, action):
+ """
+ Process event from calendar.
+ """
+ if action == self.ACTION_REMOVE:
+ with self._job_lock:
+ del self._async_jobs[jobid]
+ else:
+ msg = "unsupported action: %s" % action
+ raise ValueError(msg)
+
+ @trace_function
+ def _prepare_indication_for(self, job, *args, **kwargs):
+ """
+ Return instance of ``JobIndicationSender``.
+ """
+ return JobIndicationSender(self._indication_manager, job,
+ *args, **kwargs)
+
+ # *************************************************************************
+ # Job handlers
+ # *************************************************************************
+ @job_handler()
+ def _handle_get(self, job): #pylint: disable=R0201
+ """@return job object"""
+ return job
+
+ @job_handler(False)
+ def _handle_get_list(self):
+ """@return list of all asynchronous jobs"""
+ with self._job_lock:
+ return sorted(self._async_jobs.values())
+
+ @job_handler(False)
+ def _handle_get_by_name(self, target):
+ """@return job object filtered by name"""
+ for job in self._async_jobs.values():
+ if 'name' in job.metadata and target == job.metadata['name']:
+ return job
+ raise errors.JobNotFound(target)
+
+ @job_handler()
+ def _handle_set_priority(self, job, new_priority):
+ """
+ Modify job's priority and updates its position in queue.
+ @return modified job object
+ """
+ if not isinstance(new_priority, (int, long)):
+ raise TypeError('priority must be an integer')
+ if job.priority != new_priority:
+ ind = self._prepare_indication_for(job)
+ job.update(priority=new_priority)
+ if job in self._job_queue:
+ heapq.heapify(self._job_queue)
+ ind.send(True)
+ return job
+
+ @job_handler()
+ def _handle_reschedule(self, job,
+ delete_on_completion,
+ time_before_removal):
+ """
+ Changes job's schedule for its deletion.
+ """
+ if ( job.delete_on_completion == delete_on_completion
+ and job.time_before_removal == time_before_removal):
+ return
+ if job.finished and job.delete_on_completion:
+ for i, event in enumerate(self._calendar):
+ if event[1] == job.jobid and event[2] == self.ACTION_REMOVE:
+ del self._calendar[i]
+ heapq.heapify(self._calendar)
+ break
+ ind = self._prepare_indication_for(job)
+ if delete_on_completion:
+ schedule_at = time_before_removal
+ if job.finished:
+ schedule_at = job.finished + schedule_at - time.time()
+ self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE)
+ job.delete_on_completion = delete_on_completion
+ job.time_before_removal = time_before_removal
+ ind.send(True)
+ return job
+
+ @job_handler()
+ def _handle_update(self, job, data): #pylint: disable=R0201
+ """
+ Updates any job metadata.
+ """
+ ind = self._prepare_indication_for(job)
+ job.update(**data)
+ ind.send(True)
+ return job
+
+ @job_handler()
+ def _handle_delete(self, job):
+ """
+ Deletes finished asynchronous job.
+ """
+ if not job.finished:
+ raise errors.InvalidJobState(
+ 'can not delete unfinished job "%s"' % job)
+ try:
+ self._job_queue.remove(job)
+ heapq.heapify(self._job_queue)
+ LOG.debug('job "%s" removed from queue', job)
+ except ValueError:
+ LOG.debug('job "%s" not started and not enqueued', job)
+ del self._async_jobs[job.jobid]
+ return job
+
+ @job_handler()
+ def _handle_terminate(self, job):
+ """
+ Terminates not started job.
+ """
+ if job.started and not job.finished:
+ raise errors.InvalidJobState('can not kill running job "%s"' % job)
+ if job.finished:
+ raise errors.InvalidJobState('job "%s" already finished' % job)
+ self._job_queue.remove(job)
+ heapq.heapify(self._job_queue)
+ ind = self._prepare_indication_for(job)
+ job.finish(result=job.RESULT_TERMINATED)
+ ind.send(True)
+ LOG.info('terminated not started job "%s"', job)
+ return job
+
+ # *************************************************************************
+ # Public properties
+ # *************************************************************************
+ @property
+ def queue_in(self):
+ """Incoming queue for YumJob instances."""
+ return self._queue_in
+
+ @property
+ def queue_out(self):
+ """Output queue for results."""
+ return self._queue_out
+
+ # *************************************************************************
+ # Public methods
+ # *************************************************************************
+ @trace_function
+ def finish_job(self, job, result, result_data):
+ """
+ This should be called for any job by YumWorker after the job is
+ processed.
+
+ If the job is synchronous, reply is send at once. Otherwise the result
+ is stored for later client's query in the job itself.
+ """
+ with self._job_lock:
+ if job.state != job.RUNNING:
+ raise errors.InvalidJobState(
+ 'can not finish not started job "%s"' % job)
+ if getattr(job, 'async', False):
+ ind = self._prepare_indication_for(job,
+ (JM.IND_JOB_CHANGED, JM.IND_JOB_PERCENT_UPDATED))
+ job.finish(result, result_data)
+ if getattr(job, 'async', False):
+ if job.delete_on_completion:
+ schedule_at = max( job.time_before_removal
+ , MINIMUM_TIME_BEFORE_REMOVAL)
+ self._schedule_event(schedule_at, job.jobid,
+ self.ACTION_REMOVE)
+ if result == job.RESULT_SUCCESS:
+ ind.add_indication_ids(JM.IND_JOB_SUCCEEDED)
+ elif result == job.RESULT_ERROR:
+ ind.add_indication_ids(JM.IND_JOB_FAILED)
+ ind.send(True)
+ else:
+ LOG.debug("sending reply for %s: (%s, %s)", job,
+ job.ResultNames[job.result], job.result_data)
+ self._queue_out.put(job)
+ return job
+
+ @trace_function
+ def get_job(self, block=True, timeout=None):
+ """
+ Method supposed to be used only by YumWorker. It pops the first job
+ from _job_queue, starts it and returns it.
+ """
+ start = time.time()
+ with self._job_lock:
+ if len(self._job_queue) == 0 and not block:
+ raise Queue.Empty
+ while len(self._job_queue) == 0:
+ if timeout:
+ LOG.debug('waiting for job for %s seconds' % timeout)
+ self._job_enqueued.wait(timeout)
+ if len(self._job_queue) == 0:
+ now = time.time()
+ if timeout > now - start:
+ raise Queue.Empty
+ job = heapq.heappop(self._job_queue)
+ if job is not None:
+ if getattr(job, "async", False):
+ ind = self._prepare_indication_for(job,
+ (JM.IND_JOB_CHANGED, JM.IND_JOB_PERCENT_UPDATED))
+ job.start()
+ ind.send(True)
+ else:
+ job.start()
+ return job
+
+ def run(self):
+ """The entry point of thread."""
+ global LOG #pylint: disable=W0603
+ LOG = logging.getLogger(__name__)
+ LOG.info("%s thread started", self.name)
+
+ while self._terminate is False:
+ try:
+ timeout = None
+ with self._job_lock:
+ if len(self._calendar) > 0:
+ timeout = self._calendar[0][0] - time.time()
+ LOG.debug('waiting on input queue for job%s',
+ (' with timeout %s' % timeout) if timeout else '')
+ job = self._queue_in.get(timeout=timeout)
+ with self._job_lock:
+ self._enqueue_job(job)
+ while not self._queue_in.empty():
+ # this won't throw
+ self._enqueue_job(self._queue_in.get_nowait())
+
+ except Queue.Empty:
+ with self._job_lock:
+ while ( len(self._calendar)
+ and self._calendar[0][0] < time.time()):
+ _, jobid, action = heapq.heappop(self._calendar)
+ LOG.info('running action %s on job(id=%d)',
+ self.ACTION_NAMES[action], jobid)
+ self._run_event(jobid, action)
+ LOG.info('%s thread terminating', self.name)
+
diff --git a/src/software/lmi/software/yumdb/jobs.py b/src/software/lmi/software/yumdb/jobs.py
new file mode 100644
index 0000000..346ff17
--- /dev/null
+++ b/src/software/lmi/software/yumdb/jobs.py
@@ -0,0 +1,668 @@
+# -*- encoding: utf-8 -*-
+# Software Management Providers
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Define job classes representing kinds of jobs of worker process.
+"""
+
+import os
+import threading
+import time
+import yum
+
+from lmi.software import util
+from lmi.software.yumdb import errors
+from lmi.software.yumdb.packageinfo import PackageInfo
+from lmi.software.yumdb.repository import Repository
+
+DEFAULT_JOB_PRIORITY = 10
+# in seconds
+DEFAULT_TIME_BEFORE_REMOVAL = 60 * 5
+
+class YumJob(object): #pylint: disable=R0903
+ """
+ Base class for any job, that is processable by YumWorker process.
+ It contains jobid attribute, that must be unique for
+ each job, it's counted from zero a incremented after each creation.
+
+ metadata attribute typically contain:
+ name - name of job, that is modifiable by user
+ method - identificator of method, that lead to creation of job
+ """
+ __slots__ = ( 'jobid', 'created', 'started', 'finished', 'last_change'
+ , 'priority', 'result', 'result_data')
+
+ # jobs can be created concurrently from multiple threads, that's
+ # why we need to make its creation thread safe
+ _JOB_ID_LOCK = threading.Lock()
+ _JOB_ID = 0
+
+ # job state enumeration
+ NEW, RUNNING, COMPLETED, TERMINATED, EXCEPTION = range(5)
+ # job result enumeration
+ RESULT_SUCCESS, RESULT_TERMINATED, RESULT_ERROR = range(3)
+
+ ResultNames = ("success", "terminated", "error")
+
+ @staticmethod
+ def _get_job_id():
+ """
+ Generates new job ids. It should be called only from constructor
+ of YumJob. Ensures, that each job has a unique number.
+ @return number of jobs created since program start -1
+ """
+ with YumJob._JOB_ID_LOCK:
+ val = YumJob._JOB_ID
+ YumJob._JOB_ID += 1
+ return val
+
+ @classmethod
+ def handle_ignore_job_props(cls):
+ """
+ @return set of job properties, that does not count as job's handler
+ arguments - job handler does not care fore metadata, jobid, priority,
+ etc...
+ """
+ return set(YumJob.__slots__)
+
+ def __init__(self, priority=10):
+ if not isinstance(priority, (int, long)):
+ raise TypeError("priority must be integer")
+ self.jobid = self._get_job_id()
+ self.started = None
+ self.finished = None
+ self.priority = priority
+ self.created = time.time()
+ self.last_change = self.created
+ self.result = None
+ self.result_data = None
+
+ @property
+ def state(self):
+ """
+ @return integer representing job's state
+ """
+ if not self.started:
+ return self.NEW
+ if not self.finished:
+ return self.RUNNING
+ if self.result == self.RESULT_ERROR:
+ return self.EXCEPTION
+ if self.result == self.RESULT_TERMINATED:
+ return self.TERMINATED
+ return self.COMPLETED
+
+ @property
+ def job_kwargs(self):
+ """
+ Jobs are in worker handled in handlers specific for each subclass.
+ These handlers are methods of worker. They accepts concrete arguments
+ that can be obtained from job by invoking this property.
+ @return dictionary of keyword arguments of job
+ """
+ kwargs = {}
+ cls = self.__class__
+ while not cls in (YumJob, object):
+ for slot in cls.__slots__:
+ if ( not slot in kwargs
+ and not slot in cls.handle_ignore_job_props()):
+ kwargs[slot] = getattr(self, slot)
+ cls = cls.__bases__[0]
+ for prop in YumJob.__slots__:
+ kwargs.pop(prop, None)
+ return kwargs
+
+ def start(self):
+ """Modify the state of job to RUNNING."""
+ if self.started:
+ raise errors.InvalidJobState("can not start already started job")
+ self.started = time.time()
+ self.last_change = self.started
+
+ def finish(self, result, data=None):
+ """
+ Modify the state of job to one of {COMPLETED, EXCEPTION, TERMINATED}.
+ Depending on result parameter.
+ """
+ if not self.started and result != self.RESULT_TERMINATED:
+ raise errors.InvalidJobState("can not finish not started job")
+ self.finished = time.time()
+ if result == self.RESULT_TERMINATED:
+ self.started = self.finished
+ self.result = result
+ self.result_data = data
+ self.last_change = self.finished
+
+ def update(self, **kwargs):
+ """Change job's properties."""
+ change = False
+ for key, value in kwargs.items():
+ if getattr(self, key) != value:
+ setattr(self, key, value)
+ change = True
+ if change is True:
+ self.last_change = time.time()
+
+ def __eq__(self, other):
+ return self.__class__ is other.__class__ and self.jobid == other.jobid
+
+ def __ne__(self, other):
+ return ( self.__class__ is not other.__class__
+ or self.jobid != other.jobid)
+
+ def __lt__(self, other):
+ """
+ JobControl jobs have the highest priority.
+ """
+ return ( ( isinstance(self, YumJobControl)
+ and not isinstance(other, YumJobControl))
+ or ( self.priority < other.priority
+ or ( self.priority == other.priority
+ and ( self.jobid < other.jobid
+ or ( self.jobid == other.jobid
+ and (self.created < other.created))))))
+
+ def __cmp__(self, other):
+ if ( isinstance(self, YumJobControl)
+ and not isinstance(other, YumJobControl)):
+ return -1
+ if ( not isinstance(self, YumJobControl)
+ and isinstance(other, YumJobControl)):
+ return 1
+ if self.priority < other.priority:
+ return -1
+ if self.priority > other.priority:
+ return 1
+ if self.jobid < other.jobid:
+ return -1
+ if self.jobid > other.jobid:
+ return 1
+ if self.created < other.created:
+ return -1
+ if self.created > other.created:
+ return 1
+ return 0
+
+ def __str__(self):
+ return "%s(id=%d,p=%d)" % (
+ self.__class__.__name__, self.jobid, self.priority)
+
+ def __getstate__(self):
+ ret = self.job_kwargs
+ for prop in self.handle_ignore_job_props():
+ ret[prop] = getattr(self, prop)
+ return ret
+
+ def __setstate__(self, state):
+ for k, value in state.items():
+ setattr(self, k, value)
+
+class YumAsyncJob(YumJob): #pylint: disable=R0903
+ """
+ Base class for jobs, that support asynchronnous execution.
+ No reply is sent upon job completition or error. The results are
+ kept on server.
+ """
+ __slots__ = ( 'async'
+ , 'delete_on_completion'
+ , 'time_before_removal'
+ , 'metadata')
+
+ @classmethod
+ def handle_ignore_job_props(cls):
+ return YumJob.handle_ignore_job_props().union(YumAsyncJob.__slots__)
+
+ def __init__(self, priority=10, async=False, metadata=None):
+ YumJob.__init__(self, priority)
+ self.async = bool(async)
+ self.delete_on_completion = True
+ self.time_before_removal = DEFAULT_TIME_BEFORE_REMOVAL
+ if metadata is None and self.async is True:
+ metadata = {}
+ self.metadata = metadata
+
+ def __str__(self):
+ return "%s(id=%d,p=%d%s%s)" % (
+ self.__class__.__name__, self.jobid,
+ self.priority,
+ ',async' if self.async else '',
+ (',name="%s"'%self.metadata['name'])
+ if self.metadata and 'name' in self.metadata else '')
+
+ def update(self, **kwargs):
+ if 'metadata' in kwargs:
+ self.metadata.update(kwargs.pop('metadata'))
+ return YumJob.update(self, **kwargs)
+
+# *****************************************************************************
+# Job control funtions
+# *****************************************************************************
+class YumJobControl(YumJob): #pylint: disable=R0903
+ """Base class for any job used for asynchronous jobs management."""
+ pass
+
+class YumJobGetList(YumJobControl): #pylint: disable=R0903
+ """Request for obtaining list of all asynchronous jobs."""
+ pass
+
+class YumJobOnJob(YumJobControl):
+ """
+ Base class for any control job acting upon particular asynchronous job.
+ """
+ __slots__ = ('target', )
+ def __init__(self, target):
+ YumJobControl.__init__(self)
+ if not isinstance(target, (int, long)):
+ raise TypeError("target must be an integer")
+ self.target = target
+
+class YumJobGet(YumJobOnJob): #pylint: disable=R0903
+ """Get job object by its id."""
+ pass
+
+class YumJobGetByName(YumJobOnJob): #pylint: disable=R0903
+ """Get job object by its name property."""
+ def __init__(self, name):
+ YumJobOnJob.__init__(self, -1)
+ self.target = name
+
+class YumJobSetPriority(YumJobOnJob): #pylint: disable=R0903
+ """Change priority of job."""
+ __slots__ = ('new_priority', )
+
+ def __init__(self, target, priority):
+ YumJobOnJob.__init__(self, target)
+ self.new_priority = priority
+
+class YumJobUpdate(YumJobOnJob): #pylint: disable=R0903
+ """
+ .. _YumJobUpdate:
+
+ Update job's metadata. There are some forbidden properties, that
+ can not be changed in this way. Those are all affecting job's priority
+ and its scheduling for deletion. Plus any that store job's state.
+ All forbidden properties are listed in ``FORBIDDEN_PROPERTIES``.
+ """
+ __slots__ = ('data', )
+ FORBIDDEN_PROPERTIES = (
+ 'async', 'jobid', 'created', 'started', 'priority', 'finished',
+ 'delete_on_completion', 'time_before_removal', 'last_change')
+
+ def __init__(self, target, **kwargs):
+ YumJobOnJob.__init__(self, target)
+ assert not set.intersection(
+ set(YumJobUpdate.FORBIDDEN_PROPERTIES), set(kwargs))
+ self.data = kwargs
+
+class YumJobReschedule(YumJobOnJob): #pylint: disable=R0903
+ """Change the schedule of job's deletion."""
+ __slots__ = ('delete_on_completion', 'time_before_removal')
+ def __init__(self, target, delete_on_completion, time_before_removal):
+ YumJobOnJob.__init__(self, target)
+ if not isinstance(time_before_removal, (int, long, float)):
+ raise TypeError("time_before_removal must be float")
+ self.delete_on_completion = bool(delete_on_completion)
+ self.time_before_removal = time_before_removal
+
+class YumJobDelete(YumJobOnJob): #pylint: disable=R0903
+ """Delete job - can only be called on finished job."""
+ pass
+
+class YumJobTerminate(YumJobOnJob): #pylint: disable=R0903
+ """
+ Can only be called on not yet started job.
+ Running job can not be terminated.
+ """
+ pass
+
+# *****************************************************************************
+# Yum API functions
+# *****************************************************************************
+class YumBeginSession(YumJob): #pylint: disable=R0903
+ """
+ Begin session on YumWorker which ensures that yum database is locked
+ during its lifetime. Sessions can be nested, but the number of
+ YumEndSession jobs must be processed to make the database unlocked.
+ """
+ pass
+class YumEndSession(YumJob): #pylint: disable=R0903
+ """
+ End the session started with YumBeginSession. If the last active session
+ is ended, database will be unlocked.
+ """
+ pass
+
+class YumGetPackageList(YumJob): #pylint: disable=R0903
+ """
+ Job requesing a list of packages.
+ Arguments:
+ kind - supported values are in SUPPORTED_KINDS tuple
+ * installed lists all installed packages; more packages with
+ the same name can be installed varying in their architecture
+ * avail_notinst lists all available, not installed packages;
+ allow_duplicates must be True to include older packages (but still
+ available)
+ * avail_reinst lists all installed packages, that are available;
+ package can be installed, but not available anymore due to updates
+ of repository, where only the newest packages are kept
+ * available lists a union of avail_notinst and avail_reinst
+ * all lists union of installed and avail_notinst
+
+ allow_duplicates - whether multiple packages can be present
+ in result for single (name, arch) of package differing
+ in their version
+
+ sort - whether to sort packages by nevra
+
+ include_repos - either a string passable to RepoStorage.enableRepo()
+ or a list of repository names, that will be temporared enabled before
+ listing packages; this is applied after disabling of repositories
+
+ exclude_repos - either a string passable to RepoStorage.disableRepo()
+ or a list of repository names, that will be temporared disabled before
+ listing packages; this is applied before enabling of repositories
+
+ Worker replies with [pkg1, pkg2, ...].
+ """
+ __slots__ = ('kind', 'allow_duplicates', 'sort', 'include_repos',
+ 'exclude_repos')
+
+ SUPPORTED_KINDS = ( 'installed', 'available', 'avail_reinst'
+ , 'avail_notinst', 'all')
+
+ def __init__(self, kind, allow_duplicates, sort=False,
+ include_repos=None, exclude_repos=None):
+ YumJob.__init__(self)
+ if not isinstance(kind, basestring):
+ raise TypeError("kind must be a string")
+ if not kind in self.SUPPORTED_KINDS:
+ raise ValueError("kind must be one of {%s}" %
+ ", ".join(self.SUPPORTED_KINDS))
+ for arg in ('include_repos', 'exclude_repos'):
+ val = locals()[arg]
+ if ( not val is None
+ and not isinstance(arg, (tuple, list, basestring))):
+ raise TypeError("expected list or string for %s" % arg)
+ self.kind = kind
+ self.allow_duplicates = bool(allow_duplicates)
+ self.sort = bool(sort)
+ self.include_repos = include_repos
+ self.exclude_repos = exclude_repos
+
+class YumFilterPackages(YumGetPackageList): #pylint: disable=R0903
+ """
+ Job similar to YumGetPackageList, but allowing to specify
+ filter on packages.
+ Arguments (plus those in YumGetPackageList):
+ name, epoch, version, release, arch, nevra, envra, evra
+
+ Some of those are redundant, but filtering is optimized for
+ speed, so supplying all of them won't affect performance.
+
+ Worker replies with [pkg1, pkg2, ...].
+ """
+ __slots__ = (
+ 'name', 'epoch', 'version', 'release', 'arch',
+ 'nevra', 'envra', 'evra', 'repoid')
+
+ def __init__(self, kind, allow_duplicates,
+ sort=False, include_repos=None, exclude_repos=None,
+ name=None, epoch=None, version=None,
+ release=None, arch=None,
+ nevra=None, evra=None,
+ envra=None,
+ repoid=None):
+ if nevra is not None and not util.RE_NEVRA.match(nevra):
+ raise ValueError("Invalid nevra: %s" % nevra)
+ if evra is not None and not util.RE_EVRA.match(evra):
+ raise ValueError("Invalid evra: %s" % evra)
+ if envra is not None and not util.RE_ENVRA.match(evra):
+ raise ValueError("Invalid envra: %s" % envra)
+ YumGetPackageList.__init__(self, kind, allow_duplicates, sort,
+ include_repos=include_repos, exclude_repos=exclude_repos)
+ self.name = name
+ self.epoch = None if epoch is None else str(epoch)
+ self.version = version
+ self.release = release
+ self.arch = arch
+ self.nevra = nevra
+ self.evra = evra
+ self.envra = envra
+ self.repoid = repoid
+
+class YumSpecificPackageJob(YumAsyncJob): #pylint: disable=R0903
+ """
+ Abstract job taking instance of yumdb.PackageInfo as argument or
+ package's nevra.
+ Arguments:
+ pkg - plays different role depending on job subclass;
+ can also be a nevra
+ """
+ __slots__ = ('pkg', )
+ def __init__(self, pkg, async=False, metadata=None):
+ if isinstance(pkg, basestring):
+ if not util.RE_NEVRA_OPT_EPOCH.match(pkg):
+ raise errors.InvalidNevra('not a valid nevra "%s"' % pkg)
+ elif not isinstance(pkg, PackageInfo):
+ raise TypeError("pkg must be either string or instance"
+ " of PackageInfo")
+ YumAsyncJob.__init__(self, async=async, metadata=metadata)
+ self.pkg = pkg
+
+class YumInstallPackage(YumSpecificPackageJob): #pylint: disable=R0903
+ """
+ Job requesting installation of specific package.
+ pkg argument should be available.
+ Arguments:
+ pkg - same as in YumSpecificPackageJob
+ force is a boolean saying:
+ True -> reinstall the package if it's already installed
+ False -> fail if the package is already installed
+
+ Worker replies with new instance of package.
+ """
+ __slots__ = ('force', )
+ def __init__(self, pkg, async=False, force=False, metadata=None):
+ YumSpecificPackageJob.__init__(
+ self, pkg, async=async, metadata=metadata)
+ self.force = bool(force)
+
+class YumRemovePackage(YumSpecificPackageJob): #pylint: disable=R0903
+ """
+ Job requesting removal of specific package.
+ pkg argument should be installed.
+ """
+ pass
+
+class YumUpdateToPackage(YumSpecificPackageJob): #pylint: disable=R0903
+ """
+ Job requesting update to provided specific package.
+ Package is updated to epoch, version and release of this
+ provided available package.
+
+ Worker replies with new instance of package.
+ """
+ pass
+
+class YumUpdatePackage(YumSpecificPackageJob): #pylint: disable=R0903
+ """
+ Job requesting update of package, optionally reducing possible
+ candidate packages to ones with specific evr.
+ Arguments:
+ to_epoch, to_version, to_release
+ force is a boolean, that has meaning only when update_only is False:
+ True -> reinstall the package if it's already installed
+ False -> fail if the package is already installed
+
+ The arguments more given, the more complete filter of candidates.
+
+ Worker replies with new instance of package.
+ """
+ __slots__ = ('to_epoch', 'to_version', 'to_release', 'force')
+
+ def __init__(self, pkg, async=False,
+ to_epoch=None, to_version=None, to_release=None, force=False,
+ metadata=None):
+ if not isinstance(pkg, PackageInfo):
+ raise TypeError("pkg must be instance of yumdb.PackageInfo")
+ YumSpecificPackageJob.__init__(
+ self, pkg, async=async, metadata=metadata)
+ self.to_epoch = to_epoch
+ self.to_version = to_version
+ self.to_release = to_release
+ self.force = bool(force)
+
+class YumCheckPackage(YumSpecificPackageJob): #pylint: disable=R0903
+ """
+ Request verification information for instaled package and its files.
+
+ Arguments:
+ pkg - either instance of PackageInfo or nevra string.
+ In latter case it will be replaced for YumWorker with instance
+ of PackageInfo.
+
+ Worker replies with ``(pkg_info, pkg_check)``.
+ where:
+ ``pkg_info`` - is instance of PackageInfo
+ ``pkg_check`` - new instance of yumdb.PackageCheck
+ """
+ def __init__(self, pkg, async=False, metadata=None):
+ YumSpecificPackageJob.__init__(self, pkg, async=async,
+ metadata=metadata)
+ if isinstance(pkg, PackageInfo) and not pkg.installed:
+ raise ValueError("package must be installed to check it")
+
+class YumCheckPackageFile(YumCheckPackage): #pylint: disable=R0903
+ """
+ Request verification information for particular file of installed
+ package.
+
+ Worker replies with ``(pkg_info, pkg_check)``.
+ where:
+ ``pkg_info`` - is instance of PackageInfo
+ ``pkg_check`` - new instance of yumdb.PackageCheck containing only
+ requested file.
+ """
+ __slots__ = ('file_name', )
+ def __init__(self, pkg, file_name, *args, **kwargs):
+ YumCheckPackage.__init__(self, pkg, *args, **kwargs)
+ if not isinstance(file_name, basestring):
+ raise TypeError("file_name must be string")
+ self.file_name = file_name
+
+class YumInstallPackageFromURI(YumAsyncJob): #pylint: disable=R0903
+ """
+ Job requesting installation of specific package from URI.
+ Arguments:
+ uri is either a path to rpm package on local filesystem or url
+ of rpm stored on remote host
+ update_only is a boolean:
+ True -> install the package only if the older version is installed
+ False -> install the package if it's not already installed
+ force is a boolean, that has meaning only when update_only is False:
+ True -> reinstall the package if it's already installed
+ False -> fail if the package is already installed
+
+ Worker replies with new instance of package.
+ """
+ __slots__ = ('uri', 'update_only', "force")
+ def __init__(self, uri, async=False, update_only=False, force=False,
+ metadata=None):
+ if not isinstance(uri, basestring):
+ raise TypeError("uri must be a string")
+ if uri.startswith('file://'):
+ uri = uri[len('file://'):]
+ if not yum.misc.re_remote_url(uri) and not os.path.exists(uri):
+ raise errors.InvalidURI(uri)
+ YumAsyncJob.__init__(self, async=async, metadata=metadata)
+ self.uri = uri
+ self.update_only = bool(update_only)
+ self.force = bool(force)
+
+class YumGetRepositoryList(YumJob): #pylint: disable=R0903
+ """
+ Job requesing a list of repositories.
+ Arguments:
+ kind - supported values are in SUPPORTED_KINDS tuple
+
+ Worker replies with [repo1, repo2, ...].
+ """
+ __slots__ = ('kind', )
+
+ SUPPORTED_KINDS = ('all', 'enabled', 'disabled')
+
+ def __init__(self, kind):
+ YumJob.__init__(self)
+ if not isinstance(kind, basestring):
+ raise TypeError("kind must be a string")
+ if not kind in self.SUPPORTED_KINDS:
+ raise ValueError("kind must be one of {%s}" %
+ ", ".join(self.SUPPORTED_KINDS))
+ self.kind = kind
+
+class YumFilterRepositories(YumGetRepositoryList): #pylint: disable=R0903
+ """
+ Job similar to YumGetRepositoryList, but allowing to specify
+ filter on packages.
+ Arguments (plus those in YumGetRepositoryList):
+ name, gpg_check, repo_gpg_check
+
+ Some of those are redundant, but filtering is optimized for
+ speed, so supplying all of them won't affect performance.
+
+ Worker replies with [repo1, repo2, ...].
+ """
+ __slots__ = ('repoid', 'gpg_check', 'repo_gpg_check')
+
+ def __init__(self, kind,
+ repoid=None, gpg_check=None, repo_gpg_check=None):
+ YumGetRepositoryList.__init__(self, kind)
+ self.repoid = repoid
+ self.gpg_check = None if gpg_check is None else bool(gpg_check)
+ self.repo_gpg_check = (
+ None if repo_gpg_check is None else bool(repo_gpg_check))
+
+class YumSpecificRepositoryJob(YumJob): #pylint: disable=R0903
+ """
+ Abstract job taking instance of yumdb.Repository as argument.
+ Arguments:
+ repo - (``Repository`` or ``str``) plays different role depending
+ on job subclass
+ """
+ __slots__ = ('repo', )
+ def __init__(self, repo):
+ if not isinstance(repo, (Repository, basestring)):
+ raise TypeError("repoid must be either instance of"
+ " yumdb.Repository or string")
+ YumJob.__init__(self)
+ self.repo = repo
+
+class YumSetRepositoryEnabled(YumSpecificRepositoryJob):#pylint: disable=R0903
+ """
+ Job allowing to enable or disable repository.
+ Arguments:
+ enable - (``boolean``) representing next state
+ """
+ __slots__ = ('enable', )
+ def __init__(self, repo, enable):
+ YumSpecificRepositoryJob.__init__(self, repo)
+ self.enable = bool(enable)
+
diff --git a/src/software/lmi/software/yumdb/packagecheck.py b/src/software/lmi/software/yumdb/packagecheck.py
new file mode 100644
index 0000000..e66078c
--- /dev/null
+++ b/src/software/lmi/software/yumdb/packagecheck.py
@@ -0,0 +1,235 @@
+# -*- encoding: utf-8 -*-
+# Software Management Providers
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Module with definition of RPM package check class.
+"""
+
+from collections import OrderedDict
+from datetime import datetime
+import grp
+import logging
+import os
+import pwd
+import rpm
+import yum
+
+from lmi.software.yumdb import errors
+
+CHECKSUMTYPE_STR2NUM = dict((val.lower(), k) for (k, val) in
+ yum.constants.RPM_CHECKSUM_TYPES.items())
+
+( FILE_TYPE_UNKNOWN
+, FILE_TYPE_FILE
+, FILE_TYPE_DIRECTORY
+, FILE_TYPE_SYMLINK
+, FILE_TYPE_FIFO
+, FILE_TYPE_CHARACTER_DEVICE
+, FILE_TYPE_BLOCK_DEVICE
+) = range(7)
+
+FILE_TYPE_NAMES = ( 'unknown', 'file', 'directory', 'symlink', 'fifo'
+ , 'character device', 'block device')
+
+class PackageFile(object):
+ """
+ Metadata related to particular file on filesystem belonging to RPM package.
+ Data contained here are from RPM database.
+
+ Attributes:
+ ``path`` - (``str``) Absolute path of file.
+ ``file_type`` - (``int``) One of ``FILE_TYPE_*`` identifiers above.
+ ``uid`` - (``int``) User ID.
+ ``gid`` - (``int``) Group ID.
+ ``mode`` - (``int``) Raw file mode.
+ ``device`` - (``int``) Device number.
+ ``mtime`` - (``int``) Last modification time in seconds.
+ ``size`` - (``long``) File size as a number of bytes.
+ ``link_target`` - (``str``) Link target of symlink. None if ``file_type``
+ is not symlink.
+ ``checksum`` - (``str``) Checksum as string in hexadecimal format.
+ None if file is not a regular file.
+ """
+ __slots__ = ("path", "file_type", "uid", "gid", "mode", "device", "mtime",
+ "size", "link_target", "checksum")
+
+ def __init__(self, path, file_type, uid, gid, mode, device, mtime, size,
+ link_target, checksum):
+ if not isinstance(file_type, basestring):
+ raise TypeError("file_type must be a string")
+ for arg in ('uid', 'gid', 'mode', 'mtime', 'size'):
+ if not isinstance(locals()[arg], (int, long)):
+ raise TypeError("%s must be integer" % arg)
+ if not os.path.isabs(path):
+ raise ValueError("path must be an absolute path")
+ self.path = path
+ try:
+ self.file_type = FILE_TYPE_NAMES.index(file_type.lower())
+ except ValueError:
+ logging.getLogger(__name__).error('unrecognized file type "%s" for'
+ ' file "%s"', file_type, path)
+ self.file_type = FILE_TYPE_NAMES[FILE_TYPE_UNKNOWN]
+ self.uid = uid
+ self.gid = gid
+ self.mode = mode
+ self.device = device
+ self.mtime = mtime
+ self.size = size
+ self.link_target = (link_target
+ if self.file_type == FILE_TYPE_SYMLINK else None)
+ self.checksum = checksum if self.file_type == FILE_TYPE_FILE else None
+
+ @property
+ def last_modification_datetime(self):
+ """
+ @return instance datetime for last modification time of file
+ """
+ return datetime.fromtimestamp(self.mtime)
+
+ def __getstate__(self):
+ """
+ Used for serialization with pickle.
+ @return container content that will be serialized
+ """
+ return dict((k, getattr(self, k)) for k in self.__slots__)
+
+ def __setstate__(self, state):
+ """
+ Used for deserialization with pickle.
+ Restores the object from serialized form.
+ @param state is an object created by __setstate__() method
+ """
+ for k, value in state.items():
+ setattr(self, k, value)
+
+class PackageCheck(object):
+ """
+ Metadata for package concerning verification.
+ It contains metadata for each file installed in "files" attribute.
+ """
+ __slots__ = ("objid", "file_checksum_type", "files")
+
+ def __init__(self, objid, file_checksum_type, files=None):
+ """
+ @param objid is an in of original yum package object, which is used
+ by server for subsequent operations on this package requested by client
+ """
+ if files is not None and not isinstance(
+ files, (list, tuple, set, dict)):
+ raise TypeError("files must be an iterable container")
+ self.objid = objid
+ self.file_checksum_type = file_checksum_type
+ if not isinstance(files, dict):
+ self.files = OrderedDict()
+ if files is not None:
+ for file_check in sorted(files, key=lambda f: f.path):
+ self.files[file_check.path] = file_check
+ else:
+ for path in sorted(files):
+ self.files[path] = files[path]
+
+ def __iter__(self):
+ return iter(self.files)
+
+ def __len__(self):
+ return len(self.files)
+
+ def __getitem__(self, filepath):
+ return self.files[filepath]
+
+ def __setitem__(self, filepath, package_file):
+ if not isinstance(package_file, PackageFile):
+ raise TypeError("package_file must be a PackageFile instance")
+ self.files[filepath] = package_file
+
+ def __contains__(self, fileobj):
+ if isinstance(fileobj, basestring):
+ return fileobj in self.files
+ elif isinstance(fileobj, PackageFile):
+ return fileobj.path in self.files
+ else:
+ raise TypeError("expected file path for argument")
+
+ def __getstate__(self):
+ """
+ Used for serialization with pickle.
+ @return container content that will be serialized
+ """
+ return dict((k, getattr(self, k)) for k in self.__slots__)
+
+ def __setstate__(self, state):
+ """
+ Used for deserialization with pickle.
+ Restores the object from serialized form.
+ @param state is an object created by __setstate__() method
+ """
+ for k, value in state.items():
+ setattr(self, k, value)
+
+def pkg_checksum_type(pkg):
+ """
+ @return integer representation of checksum type
+ """
+ if not isinstance(pkg, yum.packages.YumAvailablePackage):
+ raise TypeError("pkg must be an instance of YumAvailablePackage")
+ if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
+ return pkg.hdr[rpm.RPMTAG_FILEDIGESTALGO]
+ return CHECKSUMTYPE_STR2NUM[pkg.yumdb_info.checksum_type.lower()]
+
+def make_package_check_from_db(vpkg, file_name=None):
+ """
+ Create instance of PackageCheck from instance of
+ yum.packages._RPMVerifyPackage.
+
+ :param file_name: (``str``) If not None, causes result to have just
+ one instance of ``PackageFile`` matching this file_name.
+ If it's not found in the package, ``FileNotFound`` will be raised.
+ :rtype (``PackageCheck``)
+ """
+ if not isinstance(vpkg, yum.packages._RPMVerifyPackage):
+ raise TypeError("vpkg must be instance of"
+ " yum.packages._RPMVerifyPackage")
+ pkg = vpkg.po
+
+ res = PackageCheck(id(pkg), pkg_checksum_type(pkg))
+ files = res.files
+ for vpf in vpkg:
+ if file_name is not None and file_name != vpf.filename:
+ continue
+ files[vpf.filename] = PackageFile(
+ vpf.filename,
+ vpf.ftype,
+ pwd.getpwnam(vpf.user).pw_uid,
+ grp.getgrnam(vpf.group).gr_gid,
+ vpf.mode,
+ vpf.dev,
+ vpf.mtime,
+ vpf.size,
+ vpf.readlink,
+ vpf.digest[1]
+ )
+ if file_name is not None:
+ break
+ if file_name is not None and len(files) < 1:
+ raise errors.FileNotFound('File "%s" not found in package "%s".' % (
+ file_name, pkg.nevra))
+ return res
+
diff --git a/src/software/lmi/software/yumdb/packageinfo.py b/src/software/lmi/software/yumdb/packageinfo.py
new file mode 100644
index 0000000..7a40d19
--- /dev/null
+++ b/src/software/lmi/software/yumdb/packageinfo.py
@@ -0,0 +1,181 @@
+# -*- encoding: utf-8 -*-
+# Software Management Providers
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Module holding an abstraction for RPM package.
+"""
+
+from datetime import datetime
+import yum
+
+from lmi.software import util
+
+class PackageInfo(object):
+ """
+ Container for package metadata. It represents rpm package in yum
+ database. It's supposed to be passed from YumWorker to YumDB client
+ and vice-versa. Instances of YumAvailablePackage can not be exchanged
+ -- results in segfaults.
+
+ To speed up looking up of original yum package object on server, an
+ atribute "objid" is provided.
+ """
+ __slots__ = (
+ "objid",
+ "name", "epoch", "version", "release", "architecture",
+ 'summary', 'description', 'license', 'group', 'vendor',
+ "repoid", 'size',
+ 'installed', # boolean
+ 'install_time' # datetime instance
+ )
+
+ def __init__(self, objid, name, epoch, version, release, arch, **kwargs):
+ """
+ @param objid is an in of original yum package object, which is used
+ by server for subsequent operations on this package requested by client
+ """
+ self.objid = objid
+ self.name = name
+ self.epoch = epoch
+ self.version = version
+ self.release = release
+ self.architecture = arch
+ self.summary = kwargs.pop('summary', None)
+ self.description = kwargs.pop('description', None)
+ self.license = kwargs.pop('license', None)
+ self.group = kwargs.pop('group', None)
+ self.vendor = kwargs.pop('vendor', None)
+ self.repoid = kwargs.pop("repoid", None)
+ self.size = kwargs.pop('size', None)
+ if self.size is not None and not isinstance(self.size, (int, long)):
+ raise TypeError('size must be an integer')
+ self.installed = kwargs.pop('installed', None)
+ if self.installed is not None:
+ self.installed = bool(self.installed)
+ self.install_time = kwargs.pop('install_time', None)
+ if ( self.install_time is not None
+ and not isinstance(self.install_time, datetime)):
+ raise TypeError('install_time must be a datetime')
+
+ # *************************************************************************
+ # Properties
+ # *************************************************************************
+ @property
+ def ver(self):
+ """Shortcut for version property."""
+ return self.version
+
+ @property
+ def rel(self):
+ """Shortcut for release property."""
+ return self.release
+
+ @property
+ def arch(self):
+ """Shortcut for architecture property."""
+ return self.architecture
+
+ @property
+ def nevra(self):
+ """@return nevra of package with epoch always present."""
+ return self.get_nevra(with_epoch="ALWAYS")
+
+ @property
+ def evra(self):
+ """@return evra of package."""
+ return "%s:%s-%s.%s" % (
+ self.epoch if self.epoch and self.epoch != "(none)" else "0",
+ self.version,
+ self.release,
+ self.architecture)
+
+ @property
+ def key_props(self):
+ """
+ @return package properties as dictionary,
+ that uniquelly identify package in database
+ """
+ return dict((k, getattr(self, k)) for k in (
+ 'name', 'epoch', 'version', 'release', 'arch', 'repoid'))
+
+ # *************************************************************************
+ # Public methods
+ # *************************************************************************
+ def get_nevra(self, with_epoch='NOT_ZERO'):
+ """
+ @param with_epoch may be one of:
+ "NOT_ZERO" - include epoch only if it's not zero
+ "ALWAYS" - include epoch always
+ "NEVER" - do not include epoch at all
+ @return nevra of package
+ """
+ return util.make_nevra(self.name, self.epoch, self.version,
+ self.release, self.arch, with_epoch)
+
+ # *************************************************************************
+ # Special methods
+ # *************************************************************************
+ def __str__(self):
+ return self.nevra
+
+ def __getstate__(self):
+ """
+ Used for serialization with pickle.
+ @return container content that will be serialized
+ """
+ return dict((k, getattr(self, k)) for k in self.__slots__)
+
+ def __setstate__(self, state):
+ """
+ Used for deserialization with pickle.
+ Restores the object from serialized form.
+ @param state is an object created by __setstate__() method
+ """
+ for k, value in state.items():
+ setattr(self, k, value)
+
+ def __eq__(self, other):
+ return ( self.name == other.name
+ and self.version == other.version
+ and self.release == other.release
+ and self.arch == other.arch
+ and self.epoch == other.epoch
+ and ( (self.repoid is None or other.repoid is None)
+ or (self.repoid == other.repoid)))
+
+def make_package_from_db(pkg):
+ """
+ Create instance of PackageInfo from instance of
+ yum.packages.YumAvailablePackage.
+ @return instance of PackageInfo
+ """
+ metadata = dict((k, getattr(pkg, k)) for k in (
+ 'summary', 'description', 'license', 'group', 'vendor', 'size',
+ 'repoid'))
+ if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
+ metadata['installed'] = True
+ metadata['install_time'] = datetime.fromtimestamp(pkg.installtime)
+ else:
+ metadata['installed'] = False
+ res = PackageInfo(id(pkg), pkg.name, pkg.epoch, pkg.version, pkg.release,
+ pkg.arch, **metadata)
+ return res
+
diff --git a/src/software/lmi/software/yumdb/process.py b/src/software/lmi/software/yumdb/process.py
new file mode 100644
index 0000000..85abe61
--- /dev/null
+++ b/src/software/lmi/software/yumdb/process.py
@@ -0,0 +1,914 @@
+# -*- encoding: utf-8 -*-
+# Software Management Providers
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Module holding the code of separate process accessing the YUM API.
+"""
+
+import errno
+from itertools import chain
+import logging
+from multiprocessing import Process
+import os
+import Queue as TQueue # T as threaded
+import sys
+import time
+import traceback
+from urlgrabber.grabber import default_grabber
+import weakref
+import yum
+
+from lmi.software import util
+from lmi.software.yumdb import errors
+from lmi.software.yumdb import jobs
+from lmi.software.yumdb import packageinfo
+from lmi.software.yumdb import packagecheck
+from lmi.software.yumdb import repository
+from lmi.software.yumdb.jobmanager import JobManager
+from lmi.software.yumdb.util import trace_function, setup_logging
+
+# *****************************************************************************
+# Constants
+# *****************************************************************************
+# interval in seconds
+FREE_DATABASE_TIMEOUT = 60
+LOCK_WAIT_INTERVAL = 0.5
+RPMDB_PATH = '/var/lib/rpm/Packages'
+LOG = None
+
+# *****************************************************************************
+# Utilities
+# ****************************************************************************
+def _get_package_filter_function(filters):
+ """
+ @param filters is a dictionary, where keys are package property
+ names and values are their desired values.
+ @return a function used to filter list of packages
+ """
+ if not isinstance(filters, dict):
+ raise TypeError("filters must be a dictionary")
+
+ filters = dict((k, value) for k, value in filters.items()
+ if value is not None)
+
+ match = None
+ if "nevra" in filters:
+ match = util.RE_NEVRA.match(filters["nevra"])
+ elif "envra" in filters:
+ match = util.RE_ENVRA.match(filters["envra"])
+ if match is not None:
+ for attr in ("name", "epoch", "version", "release", "arch"):
+ match_attr = attr
+ filters[attr] = match.group(match_attr)
+ filters.pop('nevra', None)
+ filters.pop('envra', None)
+ elif "evra" in filters:
+ for prop_name in ("epoch", "version", "release", "epoch"):
+ filters.pop(prop_name, None)
+ filter_list = []
+ # properties are sorted by their filtering ability
+ # (the most unprobable property, that can match, comes first)
+ for prop_name in ("evra", "name", "version", "epoch",
+ "release", "repoid", "arch"):
+ if not prop_name in filters:
+ continue
+ filter_list.append((prop_name, filters.pop(prop_name)))
+ def _cmp_props(pkg):
+ """@return True if pkg matches properies filter"""
+ return all(getattr(pkg, p) == v for p, v in filter_list)
+ return _cmp_props
+
+class RepoFilterSetter(object):
+ """
+ A context manager, that will set a repository filter lasting
+ as long as the object itself.
+ """
+ def __init__(self, yum_base, include_repos=None, exclude_repos=None):
+ if not isinstance(yum_base, yum.YumBase):
+ raise TypeError("yum_base must be a YumBase instance")
+ self._yum_base = yum_base
+ self._include = include_repos
+ self._exclude = exclude_repos
+ # after __enter__ this will be dictionary containing (
+ # repoid, enabled) pairs
+ self._prev_states = None
+
+ def __enter__(self):
+ self._prev_states = { r.id: r.enabled
+ for r in self._yum_base.repos.repos.values()}
+ if isinstance(self._exclude, (list, tuple, set)):
+ exclude = ",".join(self._exclude)
+ else:
+ exclude = self._exclude
+ # set of repositories, that were affected
+ repos = set()
+ if exclude:
+ repos.update(self._yum_base.repos.disableRepo(exclude))
+ LOG.info('disabling repositories: [%s]', ", ".join(repos))
+ if isinstance(self._include, (list, tuple, set)):
+ include = ",".join(self._include)
+ else:
+ include = self._include
+ if include:
+ affected = self._yum_base.repos.enableRepo(include)
+ LOG.info('enabling repositories: [%s]', ", ".join(affected))
+ repos.update(affected)
+ for repoid, prev_enabled in self._prev_states.items():
+ if ( repoid not in repos
+ or ( bool(prev_enabled)
+ is bool(self._yum_base.repos.getRepo(repoid).enabled))):
+ # keep only manipulated repositories
+ del self._prev_states[repoid]
+ if len(self._prev_states):
+ for repoid in (r for r, v in self._prev_states.items() if v):
+ self._yum_base.pkgSack.sacks.pop(repoid, None)
+ self._yum_base.repos.populateSack()
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ # restore previous repository states
+ if len(self._prev_states):
+ LOG.info('restoring repositories: [%s]',
+ ", ".join(self._prev_states.keys()))
+ for repoid, enabled in self._prev_states.items():
+ repo = self._yum_base.repos.getRepo(repoid)
+ if enabled:
+ repo.enable()
+ else:
+ repo.disable()
+ for repoid in (r for r, v in self._prev_states.items() if not v):
+ self._yum_base.pkgSack.sacks.pop(repoid, None)
+ self._yum_base.repos.populateSack()
+
+# *****************************************************************************
+# Decorators
+# *****************************************************************************
+def _needs_database(method):
+ """
+ Decorator for YumWorker job handlers, that need to access the yum database.
+ It ensures, that database is initialized and locks it in case, that
+ no session is active.
+ """
+ logged = trace_function(method)
+ def _wrapper(self, *args, **kwargs):
+ """
+ Wrapper for the job handler method.
+ """
+ created_session = False
+ self._init_database() #pylint: disable=W0212
+ if self._session_level == 0: #pylint: disable=W0212
+ self._session_level = 1 #pylint: disable=W0212
+ created_session = True
+ self._lock_database() #pylint: disable=W0212
+ try:
+ LOG.debug("calling job handler %s with args=(%s)",
+ method.__name__,
+ ", ".join(chain(
+ (str(a) for a in args),
+ ("%s=%s"%(k, str(v)) for k, v in kwargs.items()))))
+ result = logged(self, *args, **kwargs)
+ LOG.debug("job handler %s finished", method.__name__)
+ return result
+ finally:
+ if created_session is True: #pylint: disable=W0212
+ self._session_level = 0 #pylint: disable=W0212
+ self._unlock_database() #pylint: disable=W0212
+ return _wrapper
+
+# *****************************************************************************
+# Classes
+# *****************************************************************************
+class YumWorker(Process):
+ """
+ The main process, that works with YUM API. It has two queues, one
+ for input jobs and second for results.
+
+ Jobs are dispatched by their class names to particular handler method.
+
+ It spawns a second thread for managing asynchronous jobs and queue
+ of incoming jobs. It's an instance of JobManager.
+ """
+
+ def __init__(self,
+ queue_in,
+ queue_out,
+ indication_manager,
+ yum_kwargs=None,
+ logging_config=None):
+ Process.__init__(self, name="YumWorker")
+ self._jobmgr = JobManager(queue_in, queue_out, indication_manager)
+ self._session_level = 0
+ self._session_ended = False
+
+ if yum_kwargs is None:
+ yum_kwargs = {}
+
+ self._yum_kwargs = yum_kwargs
+ self._yum_base = None
+
+ self._pkg_cache = None
+ # contains (repoid, time_stamp_of_config_file)
+ # plus (/repos/dir, ...) for each repo config directory
+ self._repodir_mtimes = {}
+ self._logging_config = logging_config
+
+ # *************************************************************************
+ # Private methods
+ # *************************************************************************
+ @trace_function
+ def _init_database(self):
+ """
+ Initializes yum base object, when it does no exists.
+ And updates the cache (when out of date).
+ """
+ if self._yum_base is None:
+ LOG.info("creating YumBase with kwargs=(%s)",
+ ", ".join(( "%s=%s"%(k, str(v))
+ for k, v in self._yum_kwargs.items())))
+ self._yum_base = yum.YumBase(**self._yum_kwargs)
+
+ @trace_function
+ def _free_database(self):
+ """
+ Release the yum base object to safe memory.
+ """
+ LOG.info("freing database")
+ self._pkg_cache.clear()
+ self._yum_base = None
+
+ @trace_function
+ def _lock_database(self):
+ """
+ Only one process is allowed to work with package database at given time.
+ That's why we lock it.
+
+ Try to lock it in loop, until success.
+ """
+ while True:
+ try:
+ LOG.info("trying to lock database - session level %d",
+ self._session_level)
+ self._yum_base.doLock()
+ LOG.info("successfully locked up")
+ break
+ except yum.Errors.LockError as exc:
+ LOG.warn("failed to lock")
+ if exc.errno in (errno.EPERM, errno.EACCES, errno.ENOSPC):
+ LOG.error("can't create lock file")
+ raise errors.DatabaseLockError("Can't create lock file.")
+ LOG.info("trying to lock again after %.1f seconds",
+ LOCK_WAIT_INTERVAL)
+ time.sleep(LOCK_WAIT_INTERVAL)
+
+ @trace_function
+ def _unlock_database(self):
+ """
+ The opposite to _lock_database() method.
+ """
+ if self._yum_base is not None:
+ LOG.info("unlocking database")
+ self._yum_base.closeRpmDB()
+ self._yum_base.doUnlock()
+
+ @trace_function
+ def _get_job(self):
+ """
+ Get job from JobManager thread.
+ If no job comes for long time, free database to save memory.
+ """
+ while True:
+ if self._session_ended and self._session_level == 0:
+ try:
+ return self._jobmgr.get_job(timeout=FREE_DATABASE_TIMEOUT)
+ except TQueue.Empty:
+ self._free_database()
+ self._session_ended = False
+ else:
+ return self._jobmgr.get_job()
+
+ @trace_function
+ def _transform_packages(self, packages,
+ cache_packages=True,
+ flush_cache=True):
+ """
+ Return instances of PackageInfo for each package in packages.
+ Cache all the packages.
+ @param packages list of YumAvailablePackage instances
+ @param cache_packages whether to update cache with packages
+ @param flush_cache whether to clear the cache before adding input
+ packages; makes sense only with cachee_packages=True
+ """
+ if cache_packages is True and flush_cache is True:
+ LOG.debug("flushing package cache")
+ self._pkg_cache.clear()
+ res = []
+ for orig in packages:
+ pkg = packageinfo.make_package_from_db(orig)
+ if cache_packages is True:
+ self._pkg_cache[pkg.objid] = orig
+ res.append(pkg)
+ return res
+
+ @trace_function
+ def _cache_packages(self, packages, flush_cache=True, transform=False):
+ """
+ Store packages in cache and return them.
+ @param flush_cache whether to clear the cache before adding new
+ packages
+ @param transform whether to return packages as PackageInfos
+ @return either list of original packages or PackageInfo instances
+ """
+ if transform is True:
+ return self._transform_packages(packages, flush_cache=flush_cache)
+ if flush_cache is True:
+ LOG.debug("flushing package cache")
+ self._pkg_cache.clear()
+ for pkg in packages:
+ self._pkg_cache[id(pkg)] = pkg
+ return packages
+
+ @trace_function
+ def _lookup_package(self, pkg):
+ """
+ Lookup the original package in cache.
+ If it was garbage collected already, make new query to find it.
+ @return instance of YumAvailablePackage
+ """
+ if not isinstance(pkg, packageinfo.PackageInfo):
+ raise TypeError("pkg must be instance of PackageInfo")
+ LOG.debug("looking up yum package %s with id=%d",
+ pkg, pkg.objid)
+ try:
+ result = self._pkg_cache[pkg.objid]
+ LOG.debug("lookup successful")
+ except KeyError:
+ LOG.warn("lookup of package %s with id=%d failed, trying"
+ " to query database", pkg, pkg.objid)
+ result = self._handle_filter_packages(
+ 'installed' if pkg.installed else 'available',
+ allow_duplicates=False,
+ sort=False,
+ transform=False,
+ **pkg.key_props)
+ if len(result) < 1:
+ LOG.warn("package %s not found", pkg)
+ raise errors.PackageNotFound(
+ "package %s could not be found" % pkg)
+ result = result[0]
+ return result
+
+ @trace_function
+ def _clear_repository_cache(self):
+ """
+ Clears the repository cache and their configuration directory
+ last modification times.
+ """
+ if self._yum_base is not None:
+ for repoid in self._yum_base.repos.repos.keys():
+ self._yum_base.repos.delete(repoid)
+ del self._yum_base.repos
+ del self._yum_base.pkgSack
+ self._repodir_mtimes.clear()
+
+ @trace_function
+ def _check_repository_configs(self):
+ """
+ Checks whether repository information is up to date with configuration
+ files by comparing timestamps. If not, repository cache will be
+ released.
+ """
+ dirty = False
+ if self._repodir_mtimes:
+ for repodir in self._yum_base.conf.reposdir:
+ if ( os.path.exists(repodir)
+ and ( not repodir in self._repodir_mtimes
+ or ( os.stat(repodir).st_mtime
+ > self._repodir_mtimes[repodir]))):
+ LOG.info("repository config dir %s changed", repodir)
+ dirty = True
+ break
+ if not dirty:
+ for repo in self._yum_base.repos.repos.values():
+ filename = repo.repofile
+ if ( not os.path.exists(filename)
+ or ( int(os.stat(filename).st_mtime)
+ > repo.repo_config_age)):
+ LOG.info('config file of repository "%s" changed',
+ repo.id)
+ dirty = True
+ break
+ if dirty is True:
+ LOG.info("repository cache is dirty, cleaning up ...")
+ self._clear_repository_cache()
+ self._yum_base.getReposFromConfig()
+ if dirty is True or not self._repodir_mtimes:
+ self._update_repodir_mtimes()
+
+ @trace_function
+ def _update_repodir_mtimes(self):
+ """
+ Updates the last modification times of repo configuration directories.
+ """
+ assert self._yum_base is not None
+ for repodir in self._yum_base.conf.reposdir:
+ if os.path.exists(repodir):
+ self._repodir_mtimes[repodir] = os.stat(repodir).st_mtime
+
+ @trace_function
+ def _do_work(self, job):
+ """
+ Dispatcher of incoming jobs. Job is passed to the right handler
+ depending on its class.
+ """
+ if not isinstance(job, jobs.YumJob):
+ raise TypeError("job must be instance of YumJob")
+ try:
+ handler = {
+ jobs.YumGetPackageList : self._handle_get_package_list,
+ jobs.YumFilterPackages : self._handle_filter_packages,
+ jobs.YumInstallPackage : self._handle_install_package,
+ jobs.YumRemovePackage : self._handle_remove_package,
+ jobs.YumUpdateToPackage : self._handle_update_to_package,
+ jobs.YumUpdatePackage : self._handle_update_package,
+ jobs.YumBeginSession : self._handle_begin_session,
+ jobs.YumEndSession : self._handle_end_session,
+ jobs.YumCheckPackage : self._handle_check_package,
+ jobs.YumCheckPackageFile : self._handle_check_package_file,
+ jobs.YumInstallPackageFromURI : \
+ self._handle_install_package_from_uri,
+ jobs.YumGetRepositoryList : \
+ self._handle_get_repository_list,
+ jobs.YumFilterRepositories : self._handle_filter_repositories,
+ jobs.YumSetRepositoryEnabled : \
+ self._handle_set_repository_enabled
+ }[job.__class__]
+ LOG.info("processing job %s(id=%d)",
+ job.__class__.__name__, job.jobid)
+ except KeyError:
+ LOG.error("No handler for job \"%s\"", job.__class__.__name__)
+ raise errors.UnknownJob("No handler for job \"%s\"." %
+ job.__class__.__name__)
+ return handler(**job.job_kwargs)
+
+ @trace_function
+ def _run_transaction(self, name):
+ """
+ Builds and runs the yum transaction and checks for errors.
+ @param name of transaction used only in error description on failure
+ """
+ LOG.info("building transaction %s", name)
+ (code, msgs) = self._yum_base.buildTransaction()
+ if code == 1:
+ LOG.error("building transaction %s failed: %s",
+ name, "\n".join(msgs))
+ raise errors.TransactionBuildFailed(
+ "Failed to build \"%s\" transaction: %s" % (
+ name, "\n".join(msgs)))
+ LOG.info("processing transaction %s", name)
+ self._yum_base.processTransaction()
+ self._yum_base.closeRpmDB()
+
+ @trace_function
+ def _main_loop(self):
+ """
+ This is a main loop called from run(). Jobs are handled here.
+ It accepts a job from input queue, handles it,
+ sends the result to output queue and marks the job as done.
+
+ It is terminated, when None is received from input queue.
+ """
+ while True:
+ job = self._get_job()
+ if job is not None: # not a terminate command
+ result = jobs.YumJob.RESULT_SUCCESS
+ try:
+ data = self._do_work(job)
+ except Exception: #pylint: disable=W0703
+ result = jobs.YumJob.RESULT_ERROR
+ # (type, value, traceback)
+ data = sys.exc_info()
+ # traceback is not pickable - replace it with formatted
+ # text
+ data = (data[0], data[1], traceback.format_tb(data[2]))
+ LOG.exception("job %s failed", job)
+ self._jobmgr.finish_job(job, result, data)
+ if job is None:
+ LOG.info("waiting for %s to finish", self._jobmgr.name)
+ self._jobmgr.join()
+ break
+
+ @trace_function
+ def _handle_begin_session(self):
+ """
+ Handler for session begin job.
+ """
+ self._session_level += 1
+ LOG.info("beginning session level %s", self._session_level)
+ if self._session_level == 1:
+ self._init_database()
+ self._lock_database()
+
+ @trace_function
+ def _handle_end_session(self):
+ """
+ Handler for session end job.
+ """
+ LOG.info("ending session level %d", self._session_level)
+ self._session_level = max(self._session_level - 1, 0)
+ if self._session_level == 0:
+ self._unlock_database()
+ self._session_ended = True
+
+ @_needs_database
+ def _handle_get_package_list(self, kind, allow_duplicates, sort,
+ include_repos=None, exclude_repos=None, transform=True):
+ """
+ Handler for listing packages job.
+ @param transform says, whether to return just a package abstractions
+ or original ones
+ @return [pkg1, pkg2, ...]
+ """
+ if kind == 'avail_notinst':
+ what = 'available'
+ elif kind == 'available':
+ what = 'all'
+ elif kind == 'avail_reinst':
+ what = 'all'
+ else:
+ what = kind
+ with RepoFilterSetter(self._yum_base, include_repos, exclude_repos):
+ LOG.debug("calling YumBase.doPackageLists(%s, showdups=%s)",
+ what, allow_duplicates)
+ pkglist = self._yum_base.doPackageLists(what,
+ showdups=allow_duplicates)
+ if kind == 'all':
+ result = pkglist.available + pkglist.installed
+ elif kind == 'available':
+ result = pkglist.available + pkglist.reinstall_available
+ elif kind == 'avail_reinst':
+ result = pkglist.reinstall_available
+ else: # get installed or available
+ result = getattr(pkglist, what)
+ if sort is True:
+ result.sort()
+ LOG.debug("returning %s packages", len(result))
+ return self._cache_packages(result, transform=transform)
+
+ @_needs_database
+ def _handle_filter_packages(self, kind, allow_duplicates, sort,
+ include_repos=None, exclude_repos=None,
+ transform=True, **filters):
+ """
+ Handler for filtering packages job.
+ @return [pkg1, pkg2, ...]
+ """
+ pkglist = self._handle_get_package_list(kind, allow_duplicates, False,
+ include_repos=include_repos, exclude_repos=exclude_repos,
+ transform=False)
+ matches = _get_package_filter_function(filters)
+ result = [p for p in pkglist if matches(p)]
+ if sort is True:
+ result.sort()
+ LOG.debug("%d packages matching", len(result))
+ if transform is True:
+ # caching has been already done by _handle_get_package_list()
+ result = self._transform_packages(result, cache_packages=False)
+ return result
+
+ @_needs_database
+ def _handle_install_package(self, pkg, force=False):
+ """
+ Handler for package installation job.
+ @return installed package instance
+ """
+ if isinstance(pkg, basestring):
+ pkgs = self._handle_filter_packages(
+ 'available' if force else 'avail_notinst',
+ allow_duplicates=False, sort=True,
+ transform=False, nevra=pkg)
+ if len(pkgs) < 1:
+ raise errors.PackageNotFound('No available package matches'
+ ' nevra "%s".' % pkg)
+ elif len(pkgs) > 1:
+ LOG.warn('multiple packages matches nevra "%s": [%s]',
+ pkg, ", ".join(p.nevra for p in pkgs))
+ pkg_desired = pkgs[-1]
+ else:
+ pkg_desired = self._lookup_package(pkg)
+ if isinstance(pkg_desired, yum.rpmsack.RPMInstalledPackage):
+ if force is False:
+ raise errors.PackageAlreadyInstalled(pkg)
+ action = "reinstall"
+ else:
+ action = "install"
+ getattr(self._yum_base, action)(pkg_desired)
+ self._run_transaction(action)
+ installed = self._handle_filter_packages("installed", False, False,
+ nevra=util.pkg2nevra(pkg_desired, with_epoch="ALWAYS"))
+ if len(installed) < 1:
+ raise errors.TransactionExecutionFailed(
+ "Failed to install desired package %s." % pkg)
+ return installed[0]
+
+ @_needs_database
+ def _handle_remove_package(self, pkg):
+ """
+ Handler for package removal job.
+ """
+ if isinstance(pkg, basestring):
+ pkgs = self._handle_filter_packages('installed',
+ allow_duplicates=False, sort=False,
+ transform=False, nevra=pkg)
+ if len(pkgs) < 1:
+ raise errors.PackageNotFound('No available package matches'
+ ' nevra "%s".' % pkg)
+ pkg = pkgs[-1]
+ else:
+ pkg = self._lookup_package(pkg)
+ self._yum_base.remove(pkg)
+ self._run_transaction("remove")
+
+ @_needs_database
+ def _handle_update_to_package(self, pkg):
+ """
+ Handler for specific package update job.
+ @return package corresponding to pkg after update
+ """
+ if isinstance(pkg, basestring):
+ pkgs = self._handle_filter_packages('available',
+ allow_duplicates=False, sort=False,
+ transform=False, nevra=pkg)
+ if len(pkgs) < 1:
+ raise errors.PackageNotFound('No available package matches'
+ ' nevra "%s".' % pkg)
+ pkg_desired = pkgs[-1]
+ else:
+ pkg_desired = self._lookup_package(pkg)
+ self._yum_base.update(update_to=True,
+ name=pkg_desired.name,
+ epoch=pkg_desired.epoch,
+ version=pkg_desired.version,
+ release=pkg_desired.release,
+ arch=pkg_desired.arch)
+ self._run_transaction("update")
+ installed = self._handle_filter_packages("installed", False, False,
+ **pkg.key_props)
+ if len(installed) < 1:
+ raise errors.TransactionExecutionFailed(
+ "Failed to update to desired package %s." % pkg)
+ return installed[0]
+
+ @_needs_database
+ def _handle_update_package(self, pkg, to_epoch, to_version, to_release,
+ _force=False):
+ """
+ Handler for package update job.
+ @return updated package instance
+ """
+ if isinstance(pkg, basestring):
+ pkgs = self._handle_filter_packages('installed',
+ allow_duplicates=False, sort=False,
+ transform=False, nevra=pkg)
+ if len(pkgs) < 1:
+ raise errors.PackageNotFound('No available package matches'
+ ' nevra "%s".' % pkg)
+ pkg = pkgs[-1]
+ else:
+ pkg = self._lookup_package(pkg)
+ if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
+ raise errors.PackageNotInstalled(pkg)
+ kwargs = { "name" : pkg.name, "arch" : pkg.arch }
+ if any(v is not None for v in (to_epoch, to_version, to_release)):
+ kwargs["update_to"] = True
+ if to_epoch:
+ kwargs["to_epoch"] = to_epoch
+ if to_version:
+ kwargs["to_version"] = to_version
+ if to_release:
+ kwargs["to_release"] = to_release
+ self._yum_base.update(**kwargs)
+ self._run_transaction("update")
+ kwargs = dict( (k[3:] if k.startswith("to_") else k, v)
+ for k, v in kwargs.items())
+ installed = self._handle_filter_packages(
+ "installed", False, False, **kwargs)
+ if len(installed) < 1:
+ raise errors.TransactionExecutionFailed(
+ "Failed to update package %s." % pkg)
+ return installed[0]
+
+ @_needs_database
+ def _handle_check_package(self, pkg, file_name=None):
+ """
+ @return PackageCheck instance for requested package
+ """
+ if isinstance(pkg, basestring):
+ pkgs = self._handle_filter_packages('installed',
+ allow_duplicates=False, sort=False,
+ transform=False, nevra=pkg)
+ if len(pkgs) < 1:
+ raise errors.PackageNotFound('No available package matches'
+ ' nevra "%s".' % pkg)
+ rpm = pkgs[-1]
+ pkg = self._transform_packages((rpm, ), cache_packages=False)[0]
+ else:
+ rpm = self._lookup_package(pkg)
+ if not isinstance(rpm, yum.rpmsack.RPMInstalledPackage):
+ raise errors.PackageNotInstalled(rpm)
+ vpkg = yum.packages._RPMVerifyPackage(rpm, rpm.hdr.fiFromHeader(),
+ packagecheck.pkg_checksum_type(rpm), [], True)
+ return (pkg, packagecheck.make_package_check_from_db(vpkg,
+ file_name=file_name))
+
+ @_needs_database
+ def _handle_check_package_file(self, pkg, file_name):
+ """
+ @return PackageCheck instance for requested package containing
+ just one PackageFile instance for given ``file_name``.
+ """
+ return self._handle_check_package(pkg, file_name)
+
+ @_needs_database
+ def _handle_install_package_from_uri(self, uri,
+ update_only=False, force=False):
+ """
+ @return installed PackageInfo instance
+ """
+ try:
+ pkg = yum.packages.YumUrlPackage(self,
+ ts=self._yum_base.rpmdb.readOnlyTS(), url=uri,
+ ua=default_grabber)
+ except yum.Errors.MiscError as exc:
+ raise errors.PackageOpenError(uri, str(exc))
+ installed = self._handle_filter_packages("installed", False, False,
+ nevra=util.pkg2nevra(pkg, with_epoch="ALWAYS"))
+ if installed and force is False:
+ raise errors.PackageAlreadyInstalled(pkg)
+ kwargs = { 'po' : pkg }
+ if installed:
+ action = 'reinstallLocal'
+ else:
+ action = 'installLocal'
+ kwargs = { 'updateonly' : update_only }
+ getattr(self._yum_base, action)(uri, **kwargs)
+ self._run_transaction('installLocal')
+ installed = self._handle_filter_packages("installed", False, False,
+ nevra=util.pkg2nevra(pkg, with_epoch="ALWAYS"))
+ if len(installed) < 1:
+ raise errors.TransactionExecutionFailed(
+ "Failed to install desired package %s." % pkg)
+ return installed[0]
+
+ @_needs_database
+ def _handle_get_repository_list(self, kind, transform=True):
+ """
+ @return list of yumdb.Repository instances
+ """
+ self._check_repository_configs()
+ if kind == 'enabled':
+ repos = sorted(self._yum_base.repos.listEnabled())
+ else:
+ repos = self._yum_base.repos.repos.values()
+ if kind == 'disabled':
+ repos = [repo for repo in repos if not repo.enabled]
+ repos.sort()
+ if transform:
+ repos = [repository.make_repository_from_db(r) for r in repos]
+ LOG.debug("returning %d repositories from %s",
+ len(repos), kind)
+ return repos
+
+ @_needs_database
+ def _handle_filter_repositories(self, kind, **filters):
+ """
+ @return list of yumdb.Repository instances -- filtered
+ """
+ filters = dict((k, v) for k, v in filters.items() if v is not None)
+ if 'repoid' in filters:
+ self._check_repository_configs()
+ try:
+ repo = repository.make_repository_from_db(
+ self._yum_base.repos.getRepo(filters["repoid"]))
+ if ( (kind == "enabled" and not repo.enabled)
+ or (kind == "disabled" and repo.enabled)):
+ LOG.warn(
+ 'no such repository with id="%s"matching filters',
+ filters['repoid'])
+ return []
+ LOG.debug(
+ "exactly one repository matching filters found")
+ return [repo]
+ except (KeyError, yum.Errors.RepoError):
+ LOG.warn('repository with id="%s" could not be found',
+ filters['repoid'])
+ raise errors.RepositoryNotFound(filters['repoid'])
+
+ repos = self._handle_get_repository_list(kind, transform=False)
+ result = []
+ for repo in repos:
+ # do the filtering and safe transformed repo into result
+ for prop, value in filters.items():
+ if repository.get_prop_from_yum_repo(repo, prop) != value:
+ # did not pass the filter
+ break
+ else: # all properties passed
+ result.append(repository.make_repository_from_db(repo))
+ LOG.debug("found %d repositories matching", len(result))
+ return result
+
+ @_needs_database
+ def _handle_set_repository_enabled(self, repo, enable):
+ """
+ @return previous enabled state
+ """
+ self._check_repository_configs()
+ if isinstance(repo, repository.Repository):
+ repoid = repo.repoid
+ else:
+ repoid = repo
+ try:
+ repo = self._yum_base.repos.getRepo(repoid)
+ except (KeyError, yum.Errors.RepoError):
+ raise errors.RepositoryNotFound(repoid)
+ res = repo.enabled
+ try:
+ if enable ^ res:
+ if enable is True:
+ LOG.info("enabling repository %s" % repo)
+ repo.enable()
+ else:
+ LOG.info("disabling repository %s" % repo)
+ repo.disable()
+ try:
+ yum.config.writeRawRepoFile(repo, only=["enabled"])
+ except Exception as exc:
+ raise errors.RepositoryChangeError(
+ 'failed to modify repository "%s": %s - %s' % (
+ repo, exc.__class__.__name__, str(exc)))
+ else:
+ LOG.info("no change for repo %s", repo)
+ except yum.Errors.RepoError as exc:
+ raise errors.RepositoryChangeError(
+ 'failed to modify repository "%s": %s' % (repo, str(exc)))
+ return res
+
+ # *************************************************************************
+ # Public properties
+ # *************************************************************************
+ @property
+ def uplink(self):
+ """
+ @return input queue for jobs
+ """
+ return self._jobmgr.queue_in
+
+ @property
+ def downlink(self):
+ """
+ @return output queue for job results
+ """
+ return self._jobmgr.queue_out
+
+ # *************************************************************************
+ # Public methods
+ # *************************************************************************
+ def run(self):
+ """
+ Thread's entry point. After initial setup it calls _main_loop().
+ """
+ if self._logging_config is not None:
+ setup_logging(self._logging_config)
+ global LOG
+ LOG = logging.getLogger(__name__)
+ LOG.info("running as pid=%d", self.pid)
+ self._jobmgr.start()
+ LOG.info("started %s as thread %s",
+ self._jobmgr.name, self._jobmgr.ident)
+ self._pkg_cache = weakref.WeakValueDictionary()
+
+ # This allows the code, that can be run both from broker and
+ # YumWorker, to check, whether it's called by this process.
+ from lmi.software.yumdb import YumDB
+ YumDB.RUNNING_UNDER_CIMOM_PROCESS = False
+
+ self._main_loop()
+ LOG.info("terminating")
+
diff --git a/src/software/lmi/software/yumdb/repository.py b/src/software/lmi/software/yumdb/repository.py
new file mode 100644
index 0000000..758ae21
--- /dev/null
+++ b/src/software/lmi/software/yumdb/repository.py
@@ -0,0 +1,202 @@
+# -*- encoding: utf-8 -*-
+# Software Management Providers
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Module holding an abstraction for YUM repository.
+"""
+
+from datetime import datetime
+import logging
+import yum
+import yum.Errors
+
+# maps names of Repository properties to their corresponding property
+# names in YumRepository object
+PROPERTY_NAME_MAP = {
+ "repoid" : "id",
+ "base_urls" : "baseurl",
+ "config_file" : "repofile",
+ "cost" : "cost",
+ "enabled" : "enabled",
+ "gpg_check" : "gpgcheck",
+ "last_edit" : "repo_config_age",
+ "mirror_list" : "mirrorlist",
+ "mirror_urls" : "mirrorurls",
+ "name" : "name",
+ "pkg_dir" : "pkgdir",
+ "ready" : "ready",
+ "repo_gpg_check" : "repo_gpgcheck",
+ "timeout" : "timeout"
+}
+
+def get_prop_from_yum_repo(repo, prop_name):
+ """
+ @param prop_name is one Repository properties
+ @return value of property from object of YumRepository
+ """
+ if not isinstance(repo, yum.yumRepo.YumRepository):
+ raise TypeError("repo must be in instance of yum.yumRepo.YumRepository")
+ if prop_name in PROPERTY_NAME_MAP:
+ val = getattr(repo, PROPERTY_NAME_MAP[prop_name])
+ if prop_name == "last_edit":
+ val = datetime.fromtimestamp(val)
+ elif prop_name == "mirror_urls" and not repo.mirrorlist:
+ val = None
+ elif prop_name == "ready":
+ val = val()
+ elif prop_name in {"arch", "basearch", "releasever"}:
+ val = repo.yumvar[prop_name]
+ elif prop_name in {"revision", "last_update"}:
+ if repo.enabled and repo.repoXML:
+ md = repo.repoXML
+ if prop_name == "last_update":
+ val = datetime.fromtimestamp(md.timestamp)
+ elif prop_name == "revision":
+ val = long(md.revision)
+ else:
+ val = getattr(repo.repoXML, prop_name)
+ else:
+ val = None
+ elif prop_name == "pkg_count":
+ # this needs populated sack: ydb.repos.populateSack(repo.id)
+ val = len(repo.sack)
+ else:
+ raise ValueError('Unknown repository property: "%s"' % prop_name)
+ return val
+
+class Repository(object):
+ """
+ Container for repository metadata. It represents yum repository.
+ It's supposed to be passed from YumWorker to YumDB client and
+ vice-versa.
+ """
+ __slots__ = (
+ "objid", # [int] id of python object on server process
+ "repoid", # [string] repository id name
+ # (name of config file)
+
+ "arch", # [string] architecture of packages
+ "basearch", # [string] base system architecture
+ "base_urls", # [list] base urls as strings
+ #"cache",
+ #"cache_dir",
+ "name", # [string] repository descriptive name
+ "config_file", # [string] file path to corresponding
+ # config file
+ "cost", # [int] cost of repository
+ "enabled", # [boolean] whether repo is enabled
+ "gpg_check", # [boolean] whether to check gpg signature
+ #"metadata_expire", # how long are metadata considered valid
+ "last_edit", # datetime of last config modification
+ "last_update", # datetime of last change of repository
+ # on server
+ "mirror_list", # url of mirrorlist, or None
+ "mirror_urls", # list of urls obtained from mirrorlist or None
+ #"persist_dir", #
+ #"pkg_count", # number of packages in directory
+ "pkg_dir", # directory with packages
+ #"proxy", # boolean saying whether this is a proxy
+ "ready", # boolean saying, whether it's ready for use
+ "releasever", # version of targeted distribution
+ "repo_gpg_check", # [boolean] whether to check gpg
+ # signarues of data
+ "revision",
+ "timeout", # timeout for requests
+ )
+
+ def __init__(self, objid, repoid, arch, basearch, base_urls,
+ config_file, cost, enabled, gpg_check, last_edit, last_update,
+ name, pkg_dir, ready, releasever, repo_gpg_check, revision,
+ timeout, mirror_list=None, mirror_urls=None):
+ for arg in ('last_edit', 'last_update'):
+ if ( locals()[arg] is not None
+ and not isinstance(locals()[arg], datetime)):
+ raise TypeError("%s must be an instance of datetime" % arg)
+ if not isinstance(timeout, float):
+ raise TypeError("timeout must be a float")
+ for arg in ('cost', 'revision'):
+ if ( locals()[arg] is not None
+ and not isinstance(locals()[arg], (int, long))):
+ raise TypeError("%s must be an integer" % arg)
+ self.objid = objid
+ self.repoid = repoid
+ self.arch = arch
+ self.basearch = basearch
+ self.base_urls = list(base_urls)
+ self.config_file = config_file
+ self.cost = cost
+ self.enabled = bool(enabled)
+ self.gpg_check = bool(gpg_check)
+ self.last_edit = last_edit
+ self.last_update = last_update
+ self.mirror_list = "" if not mirror_list else mirror_list
+ self.mirror_urls = [] if not mirror_urls else list(mirror_urls)
+ self.name = name
+ #self.pkg_count = pkg_count
+ self.pkg_dir = pkg_dir
+ self.ready = bool(ready)
+ self.releasever = releasever
+ self.repo_gpg_check = bool(repo_gpg_check)
+ self.revision = revision
+ self.timeout = timeout
+
+ def __str__(self):
+ return self.repoid
+
+ def __getstate__(self):
+ """
+ Used for serialization with pickle.
+ @return container content that will be serialized
+ """
+ return dict((k, getattr(self, k)) for k in self.__slots__)
+
+ def __setstate__(self, state):
+ """
+ Used for deserialization with pickle.
+ Restores the object from serialized form.
+ @param state is an object created by __setstate__() method
+ """
+ for k, value in state.items():
+ setattr(self, k, value)
+
+def make_repository_from_db(repo):
+ """
+ Create instance of Repository from instance of yum.yumRepo.YumRepository
+ @return instance of Repository
+ """
+ if not isinstance(repo, yum.yumRepo.YumRepository):
+ raise TypeError("repo must be in instance of yum.yumRepo.YumRepository")
+ metadata = {}
+ for prop_name in Repository.__slots__[1:]:
+ try:
+ metadata[prop_name] = get_prop_from_yum_repo(repo, prop_name)
+ except yum.Errors.RepoError as exc:
+ # some properties can cause error (like requesting ready)
+ logging.getLogger(__name__).warn(
+ 'failed to get property "%s" of repo "%s": %s' % (
+ prop_name, repo.name, exc))
+ if prop_name == "ready":
+ metadata[prop_name] = False
+ continue
+
+ res = Repository(id(repo), **metadata)
+ return res
+
diff --git a/src/software/lmi/software/yumdb/util.py b/src/software/lmi/software/yumdb/util.py
new file mode 100644
index 0000000..18fcf6b
--- /dev/null
+++ b/src/software/lmi/software/yumdb/util.py
@@ -0,0 +1,163 @@
+# Software Management Providers
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+
+"""
+Common utilities meant to be used only be ``yumdb`` subpackage.
+"""
+
+from itertools import chain
+import inspect
+import logging
+import os
+
+from lmi.common import cmpi_logging
+
+class DispatchingFormatter:
+ """
+ Formatter class for logging module. It allows to predefine different
+ format string for paricular module names.
+ """
+ def __init__(self, formatters, default):
+ """
+ *format* in parameters description can be either ``string`` or
+ another formatter object.
+
+ :param formatters (``dict``) Mapping of module names to *format*.
+ :param default Default *format*.
+ """
+ for k, formatter in formatters.items():
+ if isinstance(formatter, basestring):
+ formatters[k] = logging.Formatter(formatter)
+ self._formatters = formatters
+ if isinstance(default, basestring):
+ default = logging.Formatter(default)
+ self._default_formatter = default
+
+ def format(self, record):
+ """
+ Interface for logging module.
+ """
+ formatter = self._formatters.get(record.name, self._default_formatter)
+ return formatter.format(record)
+
+# *****************************************************************************
+# Decorators
+# *****************************************************************************
+def trace_function(func):
+ """
+ Decorator for logging entries and exits of function or method.
+ """
+ if not inspect.ismethod(func) and not inspect.isfunction(func):
+ raise TypeError("func must be a function")
+
+ def _print_value(val):
+ """
+ Used here for printing function arguments. Shortens the output
+ string, if that would be too long.
+ """
+ if isinstance(val, list):
+ if len(val) < 2:
+ return str(val)
+ else:
+ return "[%s, ...]" % _print_value(val[0])
+ return str(val)
+
+ logger = logging.getLogger(__name__+'.trace_function')
+ module = func.__module__.split('.')[-1]
+ lineno = inspect.currentframe().f_back.f_lineno
+
+ def _wrapper(self, *args, **kwargs):
+ """
+ Wrapper for function or method, that does the logging.
+ """
+ if logger.isEnabledFor(logging.DEBUG):
+ frm = inspect.currentframe()
+ logargs = {
+ "caller_file" : os.path.basename(os.path.splitext(
+ frm.f_back.f_code.co_filename)[0]),
+ "caller_lineno" : frm.f_back.f_lineno,
+ "module" : module,
+ "func" : func.__name__,
+ "lineno" : lineno,
+ "action" : "entering",
+ "args" : ", ".join(chain(
+ (_print_value(a) for a in args),
+ ( "%s=%s"%(k, _print_value(v))
+ for k, v in kwargs.items())))
+ }
+
+ if not logargs["args"]:
+ logargs["args"] = ""
+ else:
+ logargs["args"] = " with args=(%s)" % logargs["args"]
+ logger.debug("%(caller_file)s:%(caller_lineno)d - %(action)s"
+ " %(module)s:%(func)s:%(lineno)d%(args)s" , logargs)
+ try:
+ result = func(self, *args, **kwargs)
+ if logger.isEnabledFor(logging.DEBUG):
+ logargs["action"] = "exiting"
+ logger.debug("%(caller_file)s:%(caller_lineno)d - %(action)s"
+ " %(module)s:%(func)s:%(lineno)d", logargs)
+ except Exception as exc:
+ if logger.isEnabledFor(logging.DEBUG):
+ logargs['action'] = 'exiting'
+ logargs['error'] = str(exc)
+ logger.debug("%(caller_file)s:%(caller_lineno)d - %(action)s"
+ " %(module)s:%(func)s:%(lineno)d with error: %(error)s",
+ logargs)
+ raise
+ return result
+
+ return _wrapper
+
+def setup_logging(config):
+ """
+ This is meant to be used by ``YumWorker`` process to setup logging
+ independent of what providers are using. Unfortunately ``YumWorker``
+ can not use the same facilities as the rest of program, because
+ logging is done through *broker*.
+ """
+ try:
+ logging.config.dictConfig(config)
+ cmpi_logging.logger = logging.getLogger('lmi.software.yumdb')
+ except Exception: #pylint: disable=W0703
+ # logging is not set up but client expects us to work
+ # all messages are dumped to /dev/null
+ logging.config.dictConfig({
+ 'version' : 1,
+ 'disable_existing_loggers' : True,
+ 'handlers': {
+ 'null' : {
+ 'class': 'logging.handlers.FileHandler',
+ 'level': 'CRITICAL',
+ 'filename': '/dev/null'
+ }
+ },
+ 'loggers' : {
+ 'root' : {
+ 'level': 'CRITICAL',
+ 'handlers' : ['null'],
+ 'propagate' : False
+ }
+ }
+ })
+
+