diff options
Diffstat (limited to 'src/software/lmi/software/yumdb')
-rw-r--r-- | src/software/lmi/software/yumdb/__init__.py | 677 | ||||
-rw-r--r-- | src/software/lmi/software/yumdb/errors.py | 111 | ||||
-rw-r--r-- | src/software/lmi/software/yumdb/jobmanager.py | 574 | ||||
-rw-r--r-- | src/software/lmi/software/yumdb/jobs.py | 668 | ||||
-rw-r--r-- | src/software/lmi/software/yumdb/packagecheck.py | 235 | ||||
-rw-r--r-- | src/software/lmi/software/yumdb/packageinfo.py | 181 | ||||
-rw-r--r-- | src/software/lmi/software/yumdb/process.py | 914 | ||||
-rw-r--r-- | src/software/lmi/software/yumdb/repository.py | 202 | ||||
-rw-r--r-- | src/software/lmi/software/yumdb/util.py | 163 |
9 files changed, 3725 insertions, 0 deletions
diff --git a/src/software/lmi/software/yumdb/__init__.py b/src/software/lmi/software/yumdb/__init__.py new file mode 100644 index 0000000..656b090 --- /dev/null +++ b/src/software/lmi/software/yumdb/__init__.py @@ -0,0 +1,677 @@ +# Software Management Providers +# +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Since yum API functions should not be called with different thread_ids +repeatedly in the same program, it's necessary to make these calls +in a single thread. But the provider needs to be able to clean up itself +when it's not needed. That's why the yum API needs to be accessed from +a separate process that is created and terminated when needed. + +This package contains all the bowels of this separate process together +with its management and communication facilities. + +YumDB is a context manager supposed to be used by any provider as the +only accessor to yum api. 
+""" + +import errno +import inspect +import os +import re +import time +from multiprocessing import Process, Queue #pylint: disable=W0404 +from pywbem.cim_provider2 import CIMProvider2 +import Queue as TQueue # T as threaded +import threading +import yum + +from lmi.common import cmpi_logging, singletonmixin +from lmi.common.IndicationManager import IndicationManager +from lmi.software.yumdb import jobs +from lmi.software.yumdb import errors +from lmi.software.yumdb.packageinfo import PackageInfo +from lmi.software.yumdb.packagecheck import PackageFile +from lmi.software.yumdb.packagecheck import PackageCheck +from lmi.software.yumdb.process import YumWorker +from lmi.software.yumdb.repository import Repository +from lmi.software.yumdb.util import DispatchingFormatter +from lmi.software.util import get_signal_name + +# Maximum time in seconds to wait for a job to accomplish. +# If timeout expires, spawned process is checked (it might +# be possibly killed) and is respawned in case it's dead. 
+MAX_JOB_WAIT_TIME = 30 + +# this may be used as an argument to YumWorker to setup logging +YUM_WORKER_DEBUG_LOGGING_CONFIG = { + "version" : 1, + 'disable_existing_loggers' : True, + "formatters": { + # this is a message format for logging function/method calls + # it's manually set up in YumWorker's init method + "default": { + "()": DispatchingFormatter, + "formatters" : { + "lmi.software.yumdb.util.trace_function": + "%(asctime)s %(levelname)s:%(message)s" + }, + "default" : "%(asctime)s %(levelname)s:%(module)s:" + "%(funcName)s:%(lineno)d - %(message)s" + }, + }, + "handlers": { + "file" : { + "class" : "logging.handlers.RotatingFileHandler", + "filename" : "/var/tmp/YumWorker.log", + "level" : "DEBUG", + "formatter": "default", + }, + }, + "loggers" : { + "root": { + "level": "DEBUG", + "handlers" : ["file"] + }, + "lmi.software.yumdb": { + "level" : "DEBUG", + "handlers" : ["file"], + "propagate" : False, + }, + } +} + +# ***************************************************************************** +# Utilities +# ***************************************************************************** +def log_reply_error(job, reply): + """ + Raises an exception in case of error occured in worker process + while processing job. 
+ """ + if isinstance(reply, (int, long)): + # asynchronous job + return + if not isinstance(reply, jobs.YumJob): + raise TypeError('expected instance of jobs.YumJob for reply, not "%s"' % + reply.__class__.__name__) + if reply.result == jobs.YumJob.RESULT_ERROR: + cmpi_logging.logger.error( + "YumDB: %s failed with error %s: %s", + job, reply.result_data[0].__name__, str(reply.result_data[1])) + cmpi_logging.logger.trace_warn( + "YumDB: %s exception traceback:\n%s%s: %s", + job, "".join(reply.result_data[2]), + reply.result_data[0].__name__, str(reply.result_data[1])) + reply.result_data[1].tb_printed = True + raise reply.result_data[1] + elif reply.result == jobs.YumJob.RESULT_TERMINATED: + cmpi_logging.logger.warn('YumDB: %s terminated', job) + else: + cmpi_logging.logger.debug('YumDB: %s completed with success', job) + +def _make_async_job(jobcls, *args, **kwargs): + """Creates asynchronous job, filling it with some metadata.""" + if not issubclass(jobcls, jobs.YumAsyncJob): + raise TypeError("jobcls must be a subclass of YumAsyncJob") + job = jobcls(*args, **kwargs) + if job.metadata is None: + job.metadata = {} + job.metadata['name'] = \ + type(job).__name__[len('Yum'):] + ('-%d' % job.jobid) + return job + +# ***************************************************************************** +# Decorators +# ***************************************************************************** +def job_request(async=False): + """ + Decorator factory for job entry points. They are YumDB methods. + All of them must return either job objects or jobid for asynchronous calls. + Job objects are processed by this decorator for caller to obtain only the + information he needs. + + It wraps them with logger wrapper and in case of asynchronous jobs, + it returns just the jobid. + """ + def _decorator(method): + """ + Decorator that just logs the method's call and returns job's result. 
+ """ + logged = cmpi_logging.trace_method(method) + def _new_func(self, *args, **kwargs): + """Wrapper for YumDB's method.""" + return logged(self, *args, **kwargs).result_data + return _new_func + + def _decorator_async(method): + """ + Decorator for methods accepting async argument. In case of async=True, + the method returns jobid. Job's result is returned otherwise. + """ + logged = cmpi_logging.trace_method(method) + def _new_func(self, *args, **kwargs): + """Wrapper for YumDB's method.""" + callargs = inspect.getcallargs(method, self, *args, **kwargs) + result = logged(self, *args, **kwargs) + if callargs.get('async', False): + return result + else: + return result.result_data + return _new_func + + return _decorator_async if async else _decorator + +class YumDB(singletonmixin.Singleton): + """ + Context manager for accessing yum/rpm database. + All requests are bundled into jobs -- instances of jobs.YumJob and + sent to YumWorker for processing. + + YumWorker is a separate process handling all calls to yum api. + Communication is done via queues (uplink and downlink). + Uplink is used to send jobs to YumWorker and downlink for obtaining + results. + + This is implemented in thread safe manner. + + It should be used as a context manager in case, we want to process + multiple jobs in single transaction. The example of usage: + with YumDB.getInstance() as ydb: + pkgs = ydb.filter_packages(...) + for pkg in pkgs: + ydb.install_package(pkg) + ... + Yum database stays locked in whole block of code under with statement. + """ + + # this is to inform Singleton, that __init__ should be called only once + ignoreSubsequent = True + + # This serves to all code base as a global variable used to check, + # whether YumDB instance is running under cimom broker or under worker + # process. This is important for code used in callback functions passed + # to worker responsible for creating instances of ConcreteJob. 
This code + # must avoid using calls to YumDB while running under worker. This + # + # Worker process must set this to False before starting its event handling + # loop. + RUNNING_UNDER_CIMOM_PROCESS = True + + @cmpi_logging.trace_method + def __init__(self, **kwargs): #pylint: disable=W0231 + """ + All arguments are passed to yum.YumBase constructor. + """ + self._process = None + if kwargs is None: + kwargs = {} + self._yum_kwargs = kwargs + + self._session_lock = threading.RLock() + self._session_level = 0 + + # used to guard access to _expected list and _process + self._reply_lock = threading.Lock() + # used to wait for job to be processed and received + self._reply_cond = threading.Condition(self._reply_lock) + # ids of all expected jobs -- those to be processed by YumWorker + self._expected = [] + # {job_id : reply, ... } + self._replies = {} + cmpi_logging.logger.trace_info('YumDB: initialized') + + # ************************************************************************* + # Private methods + # ************************************************************************* + @cmpi_logging.trace_method + def _handle_reply_timeout(self, job): + """ + This is called when timeout occurs while waiting on downlink queue for + reply. Delay can be caused by worker process's early termination (bug). + This handler tries to recover from such an situation. 
+ """ + if not self._worker.is_alive(): + if self._worker.exitcode < 0: + cmpi_logging.logger.error("[jobid=%d] worker" + " process(pid=%d) killed by signal %s", job.jobid, + self._worker.pid, get_signal_name(-self._process.exitcode)) + else: + cmpi_logging.logger.error("[jobid=%d] worker" + " process(pid=%d) is dead - exit code: %d", + job.jobid, self._process.pid, self._worker.exitcode) + with self._reply_lock: + self._process = None + cmpi_logging.logger.error( + "[jobid=%d] starting new worker process", job.jobid) + self._expected = [] + if not isinstance(job, jobs.YumBeginSession): + with self._session_lock: + if self._session_level > 0: + cmpi_logging.logger.info('restoring session ' + 'level=%d', self._session_level) + new_session_job = jobs.YumBeginSession() + self._worker.uplink.put(new_session_job) + reply = self._worker.downlink.get() + log_reply_error(new_session_job, reply) + self._worker.uplink.put(job) + self._expected.append(job.jobid) + # other waiting processes need to resend their requests + self._reply_cond.notifyAll() + else: + cmpi_logging.logger.info("[jobid=%d] process is running," + " waiting some more", job.jobid) + + @cmpi_logging.trace_method + def _receive_reply(self, job): + """ + Block on downlink queue to receive expected replies from worker + process. Only one thread can be executing this code at any time. + + Only one thread can block on downlink channel to obtain reply. If + it's reply for him, he takes it and leaves, otherwise he adds it to + _replies dictionary and notifies other threads. This thread is the + one, whose job appears as first in _expected list. + + In case, that worker process terminated due to some error. Restart it + and resend all the job requests again. 
+ """ + while True: + cmpi_logging.logger.debug("[jobid=%d] blocking on downlink queue", + job.jobid) + try: + jobout = self._worker.downlink.get( + block=True, timeout=MAX_JOB_WAIT_TIME) + if jobout.jobid == job.jobid: + cmpi_logging.logger.debug( + "[jobid=%d] received desired reply", job.jobid) + with self._reply_lock: + self._expected.remove(job.jobid) + if len(self._expected): + self._reply_cond.notify() + return jobout + else: + cmpi_logging.logger.info("[jobid=%d] received reply" + " for another thread (jobid=%d)", + job.jobid, jobout.jobid) + with self._reply_lock: + self._replies[jobout.jobid] = jobout + self._reply_cond.notifyAll() + except TQueue.Empty: + cmpi_logging.logger.warn("[jobid=%d] wait for job reply timeout" + "(%d seconds) occured", job.jobid, MAX_JOB_WAIT_TIME) + self._handle_reply_timeout(job) + + @cmpi_logging.trace_method + def _send_and_receive(self, job): + """ + Sends a request to server and blocks until job is processed by + YumWorker and reply is received. + + Only one thread can block on downlink channel to obtain reply. This + thread is the one, whose job appears as first in _expected list. Server + processes input jobs sequentially. That's why it's safe to presume, + that jobs are received in the same order as they were send. Thanks to + that we don't have to care about receiving replies for the other + waiting threads. 
+ + @return result of job + """ + with self._reply_lock: + self._worker.uplink.put(job) + if getattr(job, 'async', False) is True: + return job.jobid + self._expected.append(job.jobid) + while True: + if job.jobid in self._replies: + cmpi_logging.logger.debug( + "[jobid=%d] desired reply already received", + job.jobid) + try: + self._expected.remove(job.jobid) + except ValueError: + cmpi_logging.logger.warn( + "[jobid=%d] reply not in expected list", + job.jobid) + return self._replies.pop(job.jobid) + elif job.jobid not in self._expected: + # process terminated, resending job + cmpi_logging.logger.warn("[jobid=%d] job removed" + " from expected list, sending request again", job.jobid) + self._worker.uplink.put(job) + self._expected.append(job.jobid) + elif job.jobid == self._expected[0]: + # now it's our turn to block on downlink + break + else: # another thread blocks on downlink -> let's sleep + cmpi_logging.logger.debug( + "[jobid=%d] another %d threads expecting reply," + " suspending...", job.jobid, len(self._expected) - 1) + self._reply_cond.wait() + cmpi_logging.logger.debug( + "[jobid=%d] received reply, waking up", job.jobid) + return self._receive_reply(job) + + def _do_job(self, job): + """ + Sends the job to YumWorker process and waits for reply. + If reply is a tuple, there was an error, while job processing. + Incoming exception is in format: + (exception_type, exception_value, formated_traceback_as_string) + @return reply + """ + cmpi_logging.logger.trace_verbose("YumDB: doing %s", job) + reply = self._send_and_receive(job) + log_reply_error(job, reply) + cmpi_logging.logger.trace_verbose("YumDB: job %s done", job.jobid) + return reply + + @property + def _worker(self): + """ + YumWorker process accessor. It's created upon first need. 
+ """ + if self._process is None: + cmpi_logging.logger.trace_info("YumDB: starting YumWorker") + uplink = Queue() + downlink = Queue() + self._process = YumWorker(uplink, downlink, + indication_manager=IndicationManager.get_instance(), + yum_kwargs=self._yum_kwargs) + #logging_config=YUM_WORKER_DEBUG_LOGGING_CONFIG) + self._process.start() + cmpi_logging.logger.trace_info( + "YumDB: YumWorker started with pid=%s", self._process.pid) + return self._process + + # ************************************************************************* + # Special methods + # ************************************************************************* + def __del__(self): + """ + Ensure, that YumWorker process is correctly shutted down. + """ + self.clean_up() + singletonmixing.Singleton.__del__(self) + + @cmpi_logging.trace_method + def __enter__(self): + with self._session_lock: + if self._session_level == 0: + self._do_job(jobs.YumBeginSession()) + cmpi_logging.logger.trace_info('YumDB: new session started') + self._session_level += 1 + cmpi_logging.logger.trace_info('YumDB: nested to session level=%d', + self._session_level) + return self + + @cmpi_logging.trace_method + def __exit__(self, exc_type, exc_value, traceback): + with self._session_lock: + if self._session_level == 1: + self._do_job(jobs.YumEndSession()) + cmpi_logging.logger.trace_info('YumDB: session ended') + cmpi_logging.logger.trace_info('YumDB: emerged from session' + ' level=%d', self._session_level) + self._session_level = max(self._session_level - 1, 0) + + # ************************************************************************* + # Public methods + # ************************************************************************* + @cmpi_logging.trace_method + def clean_up(self): + """ + Shut down the YumWorker process. 
+ """ + with self._reply_lock: + if self._process is not None: + cmpi_logging.logger.info('YumDB: terminating YumWorker') + self._process.uplink.put(None) # terminating command + self._process.join() + cmpi_logging.logger.info('YumDB: YumWorker terminated') + self._process = None + else: + cmpi_logging.logger.warn("YumDB: clean_up called, when process" + " not initialized!") + + # ************************************************************************* + # Jobs with simple results + # ************************************************************************* + @job_request() + def get_package_list(self, kind, + allow_duplicates=False, + sort=False, + include_repos=None, + exclude_repos=None): + """ + @param kind is one of: jobs.YumGetPackageList.SUPPORTED_KINDS + @param allow_duplicates says, whether to list all found versions + of single package + @return [pkg1, pkg2, ...], pkgi is instance of yumdb.PackageInfo + """ + return self._do_job(jobs.YumGetPackageList( + kind, allow_duplicates=allow_duplicates, sort=sort, + include_repos=include_repos, exclude_repos=exclude_repos)) + + @job_request() + def filter_packages(self, kind, + allow_duplicates=False, + sort=False, + include_repos=None, + exclude_repos=None, + **filters): + """ + Similar to get_package_list(), but applies filter on packages. 
+ @see yumdb.jobs.YumFilterPackages job for supported filter keys + """ + return self._do_job(jobs.YumFilterPackages( + kind, allow_duplicates=allow_duplicates, sort=sort, + include_repos=include_repos, exclude_repos=exclude_repos, + **filters)) + + @job_request() + def get_repository_list(self, kind): + """ + @param kind is one of: jobs.YumGetRepositoryList.SUPPORTED_KINDS + @return [repo1, repo2, ...], repoi is instance of yumdb.Repository + """ + return self._do_job(jobs.YumGetRepositoryList(kind)) + + @job_request() + def filter_repositories(self, kind, **filters): + """ + Similar to get_repository_list(), but applies filter on repositories. + @see yumdb.jobs.YumFilterRepositories job for supported filter keys + """ + return self._do_job(jobs.YumFilterRepositories(kind, **filters)) + + @job_request() + def set_repository_enabled(self, repoid, enable): + """ + Enable or disable repository. + @param enable is a boolean + """ + return self._do_job(jobs.YumSetRepositoryEnabled(repoid, enable)) + + # ************************************************************************* + # Asynchronous jobs + # ************************************************************************* + @job_request(async=True) + def install_package(self, pkg, async=False, force=False, **metadata): + """ + Install package. + @param pkg is an instance of PackageInfo obtained with + get_package_list() or filter_packages() or a valid nevra as string. + Package must not be installed if force is False. 
+ """ + return self._do_job(_make_async_job(jobs.YumInstallPackage, + pkg, force=force, async=async, metadata=metadata)) + + @job_request(async=True) + def remove_package(self, pkg, async=False, **metadata): + """ + @param pkg is an instance of PackageInfo obtained with + get_package_list() or filter_packages(), which must be installed + """ + return self._do_job(_make_async_job(jobs.YumRemovePackage, + pkg, async=async, metadata=metadata)) + + @job_request(async=True) + def update_to_package(self, desired_pkg, async=False, **metadata): + """ + @param desired_pkg is an instance of PackageInfo, + which must be available + """ + return self._do_job(_make_async_job(jobs.YumUpdateToPackage, + desired_pkg, async=async, metadata=metadata)) + + @job_request(async=True) + def update_package(self, pkg, + async=False, + to_epoch=None, + to_version=None, + to_release=None, + force=False, + **metadata): + """ + @param pkg is an instance of PackageInfo, which must be installed + + The other parameters filter candidate available packages for update. + """ + return self._do_job(_make_async_job(jobs.YumUpdatePackage, + pkg, async, to_epoch, to_version, to_release, force=force, + metadata=metadata)) + + @job_request(async=True) + def check_package(self, pkg, async=False, **metadata): + """ + Return all necessary information from package database for package + verification. + + :param pkg: (``PackageInfo``) An instance of PackageInfo + representing installed package or its nevra string. + :rtype: (``PackageCheck``) + """ + return self._do_job(_make_async_job(jobs.YumCheckPackage, + pkg, async=async, metadata=metadata)) + + @job_request(async=True) + def check_package_file(self, pkg, file_name, async=False): + """ + Return all necessary information from package database concerning + on particular file of package. If ``pkg`` does not contain + ``file_name``, ``FileNotFound`` error is raised. 
+ + :param pkg: (``PackageInfo``) An instance of PackageInfo + representing installed package or its nevra string. + :rtype: (``PackageFile``) + """ + return self._do_job(_make_async_job(jobs.YumCheckPackageFile, + pkg, file_name, async=async)) + + @job_request(async=True) + def install_package_from_uri(self, uri, + async=False, update_only=False, force=False, **metadata): + """ + Install package from uri. + @param uri is either remote url or local path. + """ + return self._do_job(_make_async_job(jobs.YumInstallPackageFromURI, + uri, async, update_only, force=force, metadata=metadata)) + + # ************************************************************************* + # Control of asynchronous jobs + # ************************************************************************* + @job_request() + def get_job(self, jobid): + """ + Return instance of ``YumJob`` with given ``jobid``. + """ + return self._do_job(jobs.YumJobGet(jobid)) + + @job_request() + def get_job_list(self): + """ + Return list of all asynchronous jobs. + """ + return self._do_job(jobs.YumJobGetList()) + + @job_request() + def get_job_by_name(self, name): + """ + Return asynchronous job filtered by its name. + """ + return self._do_job(jobs.YumJobGetByName(name)) + + @job_request() + def set_job_priority(self, jobid, priority): + """ + Change priority of asynchronous job. This will change + its order in queue, if it is still enqeueued. + + Return object of job. + """ + return self._do_job(jobs.YumJobSetPriority(jobid, priority)) + + @job_request() + def update_job(self, jobid, **kwargs): + """ + Update metadata of job. + + :param kwargs: (``dict``) Is a dictionary of job's property names + with mapped new values. Only keys given will be changed in + desired job. + + **Note** that only keys, that do not affect job's priority or its + scheduling for deletion can be changed. See :ref:`YumJobUpdate`. 
+ """ + return self._do_job(jobs.YumJobUpdate(jobid, **kwargs)) + + @job_request() + def reschedule_job(self, jobid, + delete_on_completion, time_before_removal): + """ + Change the scheduling of job for deletion. + + :param delete_on_completion: (``bool``) Says, whether the job will + be scheduled for deletion at ``finished + time_before_removal`` + time. + :param time_before_removal: (``int``) Number of seconds, after the job + is finished, it will be kept alive. + """ + return self._do_job(jobs.YumJobReschedule(jobid, + delete_on_completion, time_before_removal)) + + @job_request() + def delete_job(self, jobid): + """ + Delete job object. This can be called only on finished job. + """ + return self._do_job(jobs.YumJobDelete(jobid)) + + @job_request() + def terminate_job(self, jobid): + """ + Terminate job. This can be called only on *NEW* job. + """ + return self._do_job(jobs.YumJobTerminate(jobid)) + diff --git a/src/software/lmi/software/yumdb/errors.py b/src/software/lmi/software/yumdb/errors.py new file mode 100644 index 0000000..44eb1c7 --- /dev/null +++ b/src/software/lmi/software/yumdb/errors.py @@ -0,0 +1,111 @@ +# -*- encoding: utf-8 -*- +# Software Management Providers +# +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. 
+# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Exceptions raisable by YumWorker. +""" + +class YumDBError(Exception): + """Base class for all errors under yumdb package.""" + +class DatabaseLockError(YumDBError): + """Raised, when the yum database can not be locked.""" + pass + +class TransactionError(YumDBError): + """Base exception representing yum transaction processing error.""" + pass +class TransactionBuildFailed(TransactionError): + """Raised, when transaction building fails.""" + pass +class PackageAlreadyInstalled(TransactionError): + """Raised, when trying to install already installed package.""" + def __init__(self, pkg): + TransactionError.__init__(self, + 'Package "%s" is already installed.' % pkg) +class PackageOpenError(TransactionError): + """Raised, when trying to open package obtained from URI.""" + def __init__(self, pkg, msg): + TransactionError.__init__(self, + 'Failed to open package "%s": %s' % (pkg, msg)) +class TransactionExecutionFailed(TransactionError): + """Raised, when YumBase.doTransaction() method fails.""" + pass + +class PackageError(YumDBError): + """Generic exception for error concerning package handling.""" + pass +class PackageNotFound(PackageError): + """Raised, when requested package could not be found.""" + pass +class PackageNotInstalled(PackageError): + """Raised, when requested package is not installed for desired action.""" + def __init__(self, pkg): + PackageError.__init__(self, 'Package "%s" is not installed.' % pkg) +class FileNotFound(PackageError): + """ + Raised, when requesting check on file that does not belong to + particular package. 
+ """ + pass + +class RepositoryError(YumDBError): + """Generic exception for error concerning repository handling.""" + pass +class RepositoryNotFound(RepositoryError): + """Raised, when requested repository cound not be found.""" + def __init__(self, repoid): + RepositoryError.__init__(self, "No such repository: %s" % repoid) +class RepositoryChangeError(RepositoryError): + """Raised, when modification of repository failed.""" + pass + +class JobError(YumDBError): + """Generic exception for job handling.""" + pass +class UnknownJob(JobError): + """Raised, when no handler is available for given job on worker.""" + pass +class InvalidURI(JobError): + """Raised, when passed uri is not a valid one.""" + def __init__(self, uri): + JobError.__init__(self, "Invalid uri: \"%s\"" % uri) +class InvalidNevra(JobError): + """Raised when trying to instantiate job with invalid nevra string.""" + pass +class JobControlError(JobError): + """Generic exception for management of asynchronous jobs.""" + pass +class JobNotFound(JobControlError): + """Raised upon request for not existing asynchronous job.""" + def __init__(self, target): + JobControlError.__init__(self, "job %s could not be found" % target) +class InvalidJobState(JobControlError): + """ + Raised when requested operation can not be executed on job in + its current state. + """ + pass + +class IndicationError(YumDBError): + """Generic error for indication handling.""" + pass + diff --git a/src/software/lmi/software/yumdb/jobmanager.py b/src/software/lmi/software/yumdb/jobmanager.py new file mode 100644 index 0000000..a224905 --- /dev/null +++ b/src/software/lmi/software/yumdb/jobmanager.py @@ -0,0 +1,574 @@ +# Software Management Providers +# +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. 
+# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +This is a module for ``JobManager`` which is a separate thread of +``YumWorker`` process. It keeps a cache of asynchronous jobs and handles +input and output queues. + +This module uses its own logging facilities because it runs in separeted +process not having access to broker logging features. + +Before using ``JobManager``, module's variable ``JOB_TO_MODEL`` should +be set to callable taking ``YumJob`` instance and returning its matching +CIM abstraction instance. +""" +import heapq +import inspect +import logging +import Queue +import sys +import threading +import time +import traceback + +from lmi.common.IndicationManager import IndicationManager +from lmi.common.JobManager import JobManager as JM +from lmi.software.yumdb import errors, jobs +from lmi.software.yumdb.util import trace_function + +# This is a callable, which must be initialized before JobManager is used. +# It should be a pointer to function, which takes a job and returns +# corresponding CIM instance. It's used for sending indications. +JOB_TO_MODEL = lambda job: None + +# Minimum time to keep asynchronous job in cache after completion. In seconds. 
MINIMUM_TIME_BEFORE_REMOVAL = 10   # in seconds

# replacement for cmpi_logging.logger
LOG = None

# *****************************************************************************
# Decorators
# *****************************************************************************
def job_handler(job_from_target=True):
    """
    Decorator for JobManager methods serving as handlers for control jobs.

    Decorator locks the job_lock of manager's instance.

    :param job_from_target (``bool``) When ``True``, the handler's ``job``
        argument (also accepted under the keyword ``target``) is treated
        as a job id and replaced with the matching job object from
        ``_async_jobs``; ``errors.JobNotFound`` is raised for unknown id.
        When ``False``, the handler is only called with the lock held.
    """
    def _wrapper_jft(method):
        """
        It consumes "target" keyword argument (which is job's id) and makes
        it an instance of YumJob. The method is then called with "job"
        argument instead of "target".
        """
        logged = trace_function(method)

        def _new_func(self, *args, **kwargs):
            """Wrapper around method."""
            if 'target' in kwargs:
                kwargs['job'] = kwargs.pop('target')
            # normalize positional/keyword arguments into one dictionary
            callargs = inspect.getcallargs(method, self, *args, **kwargs)
            target = callargs.pop('job')
            with self._job_lock:                    #pylint: disable=W0212
                if target not in self._async_jobs:  #pylint: disable=W0212
                    raise errors.JobNotFound(target)
                job = self._async_jobs[target]      #pylint: disable=W0212
                callargs['job'] = job
                return logged(**callargs)
        return _new_func

    def _simple_wrapper(method):
        """Just locks the job lock."""
        def _new_func(self, *args, **kwargs):
            """Wrapper around method."""
            with self._job_lock:                    #pylint: disable=W0212
                return method(self, *args, **kwargs)
        return _new_func

    if job_from_target:
        return _wrapper_jft
    else:
        return _simple_wrapper

class JobIndicationSender(object):
    """
    Makes creation and sending of indications easy. It keeps a reference
    to job, which can be *snapshotted* for making CIM instance out of it.
    These instances are then used to send indications via IndicationManager.

    Typical usage::

        sender = JobIndicationSender(im, job, [fltr_id1, fltr_id2])
         ... # modify job
        sender.snapshot()
        sender.send()

    **Note** that number of kept CIM instances won't exceed 2. First one
    is created upon instantiation and the second one by calling
    ``snapshot()``. Any successive call to ``snapshot()`` will overwrite
    the second instance.
    """

    def __init__(self, indication_manager, job,
            indications=JM.IND_JOB_CHANGED, new=None):
        """
        :param job (``YumJob``) Is job instance, which will be immediately
            snapshotted as old instance and later as a new one.
        :param indications (``list``) Can either be a list of indication ids
            or a single indication id.
        :param new (``YumJob``) A job instance stored as new.
        :raises ``TypeError`` for argument of unexpected type.
        """
        if not isinstance(indication_manager, IndicationManager):
            raise TypeError("indication_manager must be a subclass of"
                    " IndicationManager")
        if not isinstance(job, jobs.YumJob):
            raise TypeError("job must be an instance of YumJob")
        if new is not None and not isinstance(new, jobs.YumJob):
            raise TypeError("new must be an instance of YumJob")
        self._indication_manager = indication_manager
        self._job = job
        self._old_instance = JOB_TO_MODEL(job)
        if new is not None:
            # the *new* job must be converted here; converting ``job``
            # again would make the new instance a copy of the old one
            new = JOB_TO_MODEL(new)
        self._new_instance = new
        self._indications = set()
        self.indication_ids = indications

    @property
    def job(self):
        """
        Return instance of ``YumJob``.
        """
        return self._job

    @property
    def indication_ids(self):
        """
        Return set of indication filter IDs.
        """
        return self._indications.copy()

    @indication_ids.setter
    def indication_ids(self, indication_ids):
        """
        Set the indication filter IDs.

        :param indication_ids (``list``) Can be even single id.
        """
        if isinstance(indication_ids, basestring):
            indication_ids = set([indication_ids])
        self._indications = set(indication_ids)

    @trace_function
    def add_indication_ids(self, indication_ids):
        """
        Add filter IDs.

        :param indication_ids (``list``) Can be even single id.
        """
        if isinstance(indication_ids, basestring):
            indication_ids = set([indication_ids])
        self._indications.update(indication_ids)

    @trace_function
    def snapshot(self):
        """
        Make a second CIM instance, overwriting previous one (not the first).
        """
        self._new_instance = JOB_TO_MODEL(self._job)

    @trace_function
    def send(self, make_snapshot=False):
        """
        Send all requested indications for given job.

        :param make_snapshot (``bool``) Whether to call ``snapshot()``
            before sending.
        :raises ``errors.IndicationError`` if no indication id is set or
            a modification is to be sent without any snapshot made.
        """
        if not self._indications:
            raise errors.IndicationError(
                    "can not send any indication without id")
        if make_snapshot:
            self.snapshot()
        if (   JM.IND_JOB_CHANGED in self._indications
           and self._new_instance is None):
            raise errors.IndicationError("no snapshot made for modified job")
        for fltr_id in self._indications:
            if fltr_id == JM.IND_JOB_CREATED:
                LOG.debug("sending instance creation indication for job %s",
                        self._job)
                self._indication_manager.send_instcreation(
                        self._new_instance if self._new_instance is not None
                                           else self._old_instance,
                        fltr_id)
            else:
                LOG.debug("sending instance modification indication for job %s"
                        " with ID: %s", self._job, fltr_id)
                self._indication_manager.send_instmodification(
                        self._old_instance, self._new_instance,
                        fltr_id)

class JobManager(threading.Thread):
    """
    Separate thread for managing queue of jobs requested by client.
    There are three kinds of jobs, that are handled differently:
      * asynchronous - kept in _async_jobs dictionary until job is
            deleted by request or it expires;
            no reply is sent to client upon job's completion
      * synchronous  - reply is sent to client after job's completion;
            no reference to the job is kept afterwards
      * job control  - they are not enqueued in _job_queue for YumWorker
            to process, but are handled directly and in the FIFO order

    Both asynchronous and synchronous jobs are enqueued in _job_queue
    for YumWorker to obtain them. It's a priority queue sorting jobs by their
    priority.
    """
    # enumeration of actions, that may be enqueued in calendar
    ACTION_REMOVE = 0

    ACTION_NAMES = ['remove']

    def __init__(self, queue_in, queue_out, indication_manager):
        threading.Thread.__init__(self, name="JobManager")
        self._queue_in = queue_in
        self._queue_out = queue_out
        self._indication_manager = indication_manager
        self._terminate = False

        # (time, jobid, action)
        self._calendar = []
        # {jobid : job}
        self._async_jobs = {}

        # lock for critical access to _calendar, _async_jobs and _job_queue
        self._job_lock = threading.RLock()
        # priority queue of jobs that are processed by YumWorker
        self._job_queue = []
        # condition for YumWorker waiting on empty _job_queue
        self._job_enqueued = threading.Condition(self._job_lock)

    # *************************************************************************
    # Private methods
    # *************************************************************************
    @trace_function
    def _control_job(self, job):
        """
        Function dispatching job to handler for particular YumJob subclass.

        :raises ``errors.UnknownJob`` if there is no handler for job's class.
        """
        try:
            handler = {
                # these are from YumDB client
                jobs.YumJobGetList     : self._handle_get_list,
                jobs.YumJobGet         : self._handle_get,
                jobs.YumJobGetByName   : self._handle_get_by_name,
                jobs.YumJobSetPriority : self._handle_set_priority,
                jobs.YumJobReschedule  : self._handle_reschedule,
                jobs.YumJobUpdate      : self._handle_update,
                jobs.YumJobDelete      : self._handle_delete,
                jobs.YumJobTerminate   : self._handle_terminate,
            }[job.__class__]
            LOG.info("processing control job %s", str(job))
        except KeyError:
            raise errors.UnknownJob("No handler for job \"%s\"." %
                    job.__class__.__name__)
        return handler(**job.job_kwargs)

    @trace_function
    def _enqueue_job(self, job):
        """
        Insert incoming job into _job_queue.

        Control jobs are not enqueued; they are processed at once and
        the reply is put to output queue. ``None`` is a terminating
        command - it's passed on to the queue and the manager's main
        loop is asked to finish.
        """
        if isinstance(job, jobs.YumJobControl):
            result = job.RESULT_SUCCESS
            job.start()
            try:
                data = self._control_job(job)
            except Exception:   #pylint: disable=W0703
                # the error is sent back to client instead of being raised
                result = job.RESULT_ERROR
                data = sys.exc_info()
                data = (data[0], data[1], traceback.format_tb(data[2]))
                LOG.exception("control job %s failed", job)
            job.finish(result, data)
            LOG.debug("sending reply for %s: (%s, %s)", job,
                    job.ResultNames[job.result], job.result_data)
            self._queue_out.put(job)
        else:
            if job is None:
                LOG.debug('received terminating command')
                self._terminate = True
            LOG.debug('job %s enqued for YumWorker to handle', job)
            heapq.heappush(self._job_queue, job)
            if getattr(job, 'async', False) is True:
                ind = self._prepare_indication_for(job, JM.IND_JOB_CREATED)
                self._async_jobs[job.jobid] = job
                ind.send()
            self._job_enqueued.notify()

    @trace_function
    def _schedule_event(self, after, jobid, action):
        """
        Enqueue event into calendar. Event consists of time, jobid and
        action.

        If the same action is already scheduled for the job, only the
        earlier of both times is kept.

        :param after (``float``) Number of seconds from now.
        """
        schedule_at = time.time() + after
        for (sched, jid, act) in self._calendar:
            if jid == jobid and act == action:
                if sched <= schedule_at:    # same event already scheduled
                    return
                # schedule it for early time
                LOG.debug('rescheduling action %s on job %d to take place'
                        ' after %d seconds (instead of %d)',
                        self.ACTION_NAMES[action], jid, after,
                        sched - schedule_at + after)
                # safe - the iteration ends right after the modification
                self._calendar.remove((sched, jid, act))
                self._calendar.append((schedule_at, jid, act))
                heapq.heapify(self._calendar)
                return
        LOG.debug('scheduling action %s on job %d to take place after '
                ' %d seconds', self.ACTION_NAMES[action], jobid, after)
        heapq.heappush(self._calendar, (schedule_at, jobid, action))

    @trace_function
    def _run_event(self, jobid, action):
        """
        Process event from calendar.

        :raises ``ValueError`` for unsupported action.
        """
        if action == self.ACTION_REMOVE:
            with self._job_lock:
                # The job may have been deleted on request in the meantime
                # (_handle_delete does not touch the calendar). Such a stale
                # event must not raise, it would kill the manager's thread.
                if self._async_jobs.pop(jobid, None) is None:
                    LOG.debug('job(id=%d) scheduled for removal is already'
                            ' gone', jobid)
        else:
            msg = "unsupported action: %s" % action
            raise ValueError(msg)

    @trace_function
    def _prepare_indication_for(self, job, *args, **kwargs):
        """
        Return instance of ``JobIndicationSender``.
        """
        return JobIndicationSender(self._indication_manager, job,
                *args, **kwargs)

    # *************************************************************************
    # Job handlers
    # *************************************************************************
    @job_handler()
    def _handle_get(self, job):     #pylint: disable=R0201
        """@return job object"""
        return job

    @job_handler(False)
    def _handle_get_list(self):
        """@return list of all asynchronous jobs"""
        with self._job_lock:
            return sorted(self._async_jobs.values())

    @job_handler(False)
    def _handle_get_by_name(self, target):
        """
        @return job object filtered by name
        Raises ``errors.JobNotFound`` if no job has such a name.
        """
        for job in self._async_jobs.values():
            if 'name' in job.metadata and target == job.metadata['name']:
                return job
        raise errors.JobNotFound(target)

    @job_handler()
    def _handle_set_priority(self, job, new_priority):
        """
        Modify job's priority and updates its position in queue.
        @return modified job object
        """
        if not isinstance(new_priority, (int, long)):
            raise TypeError('priority must be an integer')
        if job.priority != new_priority:
            ind = self._prepare_indication_for(job)
            job.update(priority=new_priority)
            if job in self._job_queue:
                # restore the heap invariant broken by changed priority
                heapq.heapify(self._job_queue)
            ind.send(True)
        return job

    @job_handler()
    def _handle_reschedule(self, job,
            delete_on_completion,
            time_before_removal):
        """
        Changes job's schedule for its deletion.
        @return modified job object or None if nothing has changed
        """
        if (   job.delete_on_completion == delete_on_completion
           and job.time_before_removal == time_before_removal):
            return
        if job.finished and job.delete_on_completion:
            # drop the removal already scheduled with old settings
            for i, event in enumerate(self._calendar):
                if event[1] == job.jobid and event[2] == self.ACTION_REMOVE:
                    del self._calendar[i]
                    heapq.heapify(self._calendar)
                    break
        ind = self._prepare_indication_for(job)
        if delete_on_completion:
            schedule_at = time_before_removal
            if job.finished:
                # count the delay from the time of job's completion
                schedule_at = job.finished + schedule_at - time.time()
            self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE)
        job.delete_on_completion = delete_on_completion
        job.time_before_removal = time_before_removal
        ind.send(True)
        return job

    @job_handler()
    def _handle_update(self, job, data):    #pylint: disable=R0201
        """
        Updates any job metadata.
        @return modified job object
        """
        ind = self._prepare_indication_for(job)
        job.update(**data)
        ind.send(True)
        return job

    @job_handler()
    def _handle_delete(self, job):
        """
        Deletes finished asynchronous job.
        Raises ``errors.InvalidJobState`` for unfinished job.
        @return deleted job object
        """
        if not job.finished:
            raise errors.InvalidJobState(
                    'can not delete unfinished job "%s"' % job)
        try:
            self._job_queue.remove(job)
            heapq.heapify(self._job_queue)
            LOG.debug('job "%s" removed from queue', job)
        except ValueError:
            LOG.debug('job "%s" not started and not enqueued', job)
        del self._async_jobs[job.jobid]
        return job

    @job_handler()
    def _handle_terminate(self, job):
        """
        Terminates not started job.
        Raises ``errors.InvalidJobState`` for running or finished job.
        @return terminated job object
        """
        if job.started and not job.finished:
            raise errors.InvalidJobState('can not kill running job "%s"' % job)
        if job.finished:
            raise errors.InvalidJobState('job "%s" already finished' % job)
        self._job_queue.remove(job)
        heapq.heapify(self._job_queue)
        ind = self._prepare_indication_for(job)
        job.finish(result=job.RESULT_TERMINATED)
        ind.send(True)
        LOG.info('terminated not started job "%s"', job)
        return job

    # *************************************************************************
    # Public properties
    # *************************************************************************
    @property
    def queue_in(self):
        """Incoming queue for YumJob instances."""
        return self._queue_in

    @property
    def queue_out(self):
        """Output queue for results."""
        return self._queue_out

    # *************************************************************************
    # Public methods
    # *************************************************************************
    @trace_function
    def finish_job(self, job, result, result_data):
        """
        This should be called for any job by YumWorker after the job is
        processed.

        If the job is synchronous, reply is sent at once. Otherwise the result
        is stored for later client's query in the job itself.

        Raises ``errors.InvalidJobState`` if the job is not running.
        @return finished job object
        """
        with self._job_lock:
            if job.state != job.RUNNING:
                raise errors.InvalidJobState(
                        'can not finish not started job "%s"' % job)
            if getattr(job, 'async', False):
                ind = self._prepare_indication_for(job,
                        (JM.IND_JOB_CHANGED, JM.IND_JOB_PERCENT_UPDATED))
            job.finish(result, result_data)
            if getattr(job, 'async', False):
                if job.delete_on_completion:
                    schedule_at = max( job.time_before_removal
                                     , MINIMUM_TIME_BEFORE_REMOVAL)
                    self._schedule_event(schedule_at, job.jobid,
                            self.ACTION_REMOVE)
                if result == job.RESULT_SUCCESS:
                    ind.add_indication_ids(JM.IND_JOB_SUCCEEDED)
                elif result == job.RESULT_ERROR:
                    ind.add_indication_ids(JM.IND_JOB_FAILED)
                ind.send(True)
            else:
                LOG.debug("sending reply for %s: (%s, %s)", job,
                        job.ResultNames[job.result], job.result_data)
                self._queue_out.put(job)
            return job

    @trace_function
    def get_job(self, block=True, timeout=None):
        """
        Method supposed to be used only by YumWorker. It pops the first job
        from _job_queue, starts it and returns it.

        :param block (``bool``) Whether to wait for a job, if the queue
            is empty.
        :param timeout (``float``) Maximum number of seconds to wait.
            ``None`` means no limit.
        :raises ``Queue.Empty`` if there is no job and ``block`` is
            ``False`` or the timeout has expired.
        """
        start = time.time()
        with self._job_lock:
            if len(self._job_queue) == 0 and not block:
                raise Queue.Empty
            while len(self._job_queue) == 0:
                if timeout is None:
                    self._job_enqueued.wait()
                    continue
                # wait() may return before the time is out; wait again,
                # but just for the rest of requested time
                remaining = start + timeout - time.time()
                if remaining <= 0:
                    raise Queue.Empty
                LOG.debug('waiting for job for %s seconds', remaining)
                self._job_enqueued.wait(remaining)
            job = heapq.heappop(self._job_queue)
            if job is not None:
                if getattr(job, "async", False):
                    ind = self._prepare_indication_for(job,
                            (JM.IND_JOB_CHANGED, JM.IND_JOB_PERCENT_UPDATED))
                    job.start()
                    ind.send(True)
                else:
                    job.start()
            return job

    def run(self):
        """The entry point of thread."""
        global LOG      #pylint: disable=W0603
        LOG = logging.getLogger(__name__)
        LOG.info("%s thread started", self.name)

        while self._terminate is False:
            try:
                timeout = None
                with self._job_lock:
                    if len(self._calendar) > 0:
                        # the first event may already be overdue; negative
                        # timeout is refused by Queue.Queue.get()
                        timeout = max(0, self._calendar[0][0] - time.time())
                LOG.debug('waiting on input queue for job%s',
                    (' with timeout %s' % timeout)
                        if timeout is not None else '')
                job = self._queue_in.get(timeout=timeout)
                with self._job_lock:
                    self._enqueue_job(job)
                    while not self._queue_in.empty():
                        # this won't throw
                        self._enqueue_job(self._queue_in.get_nowait())

            except Queue.Empty:
                # timeout expired - run all the events that are due
                with self._job_lock:
                    while (   len(self._calendar)
                          and self._calendar[0][0] <= time.time()):
                        _, jobid, action = heapq.heappop(self._calendar)
                        LOG.info('running action %s on job(id=%d)',
                                self.ACTION_NAMES[action], jobid)
                        self._run_event(jobid, action)
        LOG.info('%s thread terminating', self.name)

# diff --git a/src/software/lmi/software/yumdb/jobs.py b/src/software/lmi/software/yumdb/jobs.py
# new file mode 100644
# index 0000000..346ff17
# --- /dev/null
# +++ b/src/software/lmi/software/yumdb/jobs.py
# @@ -0,0 +1,668 @@
# -*- encoding: utf-8 -*-
# Software Management Providers
#
# Copyright (C) 2012-2013 Red Hat, Inc.  All rights reserved.
+# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Define job classes representing kinds of jobs of worker process. +""" + +import os +import threading +import time +import yum + +from lmi.software import util +from lmi.software.yumdb import errors +from lmi.software.yumdb.packageinfo import PackageInfo +from lmi.software.yumdb.repository import Repository + +DEFAULT_JOB_PRIORITY = 10 +# in seconds +DEFAULT_TIME_BEFORE_REMOVAL = 60 * 5 + +class YumJob(object): #pylint: disable=R0903 + """ + Base class for any job, that is processable by YumWorker process. + It contains jobid attribute, that must be unique for + each job, it's counted from zero a incremented after each creation. 
+ + metadata attribute typically contain: + name - name of job, that is modifiable by user + method - identificator of method, that lead to creation of job + """ + __slots__ = ( 'jobid', 'created', 'started', 'finished', 'last_change' + , 'priority', 'result', 'result_data') + + # jobs can be created concurrently from multiple threads, that's + # why we need to make its creation thread safe + _JOB_ID_LOCK = threading.Lock() + _JOB_ID = 0 + + # job state enumeration + NEW, RUNNING, COMPLETED, TERMINATED, EXCEPTION = range(5) + # job result enumeration + RESULT_SUCCESS, RESULT_TERMINATED, RESULT_ERROR = range(3) + + ResultNames = ("success", "terminated", "error") + + @staticmethod + def _get_job_id(): + """ + Generates new job ids. It should be called only from constructor + of YumJob. Ensures, that each job has a unique number. + @return number of jobs created since program start -1 + """ + with YumJob._JOB_ID_LOCK: + val = YumJob._JOB_ID + YumJob._JOB_ID += 1 + return val + + @classmethod + def handle_ignore_job_props(cls): + """ + @return set of job properties, that does not count as job's handler + arguments - job handler does not care fore metadata, jobid, priority, + etc... 
+ """ + return set(YumJob.__slots__) + + def __init__(self, priority=10): + if not isinstance(priority, (int, long)): + raise TypeError("priority must be integer") + self.jobid = self._get_job_id() + self.started = None + self.finished = None + self.priority = priority + self.created = time.time() + self.last_change = self.created + self.result = None + self.result_data = None + + @property + def state(self): + """ + @return integer representing job's state + """ + if not self.started: + return self.NEW + if not self.finished: + return self.RUNNING + if self.result == self.RESULT_ERROR: + return self.EXCEPTION + if self.result == self.RESULT_TERMINATED: + return self.TERMINATED + return self.COMPLETED + + @property + def job_kwargs(self): + """ + Jobs are in worker handled in handlers specific for each subclass. + These handlers are methods of worker. They accepts concrete arguments + that can be obtained from job by invoking this property. + @return dictionary of keyword arguments of job + """ + kwargs = {} + cls = self.__class__ + while not cls in (YumJob, object): + for slot in cls.__slots__: + if ( not slot in kwargs + and not slot in cls.handle_ignore_job_props()): + kwargs[slot] = getattr(self, slot) + cls = cls.__bases__[0] + for prop in YumJob.__slots__: + kwargs.pop(prop, None) + return kwargs + + def start(self): + """Modify the state of job to RUNNING.""" + if self.started: + raise errors.InvalidJobState("can not start already started job") + self.started = time.time() + self.last_change = self.started + + def finish(self, result, data=None): + """ + Modify the state of job to one of {COMPLETED, EXCEPTION, TERMINATED}. + Depending on result parameter. 
+ """ + if not self.started and result != self.RESULT_TERMINATED: + raise errors.InvalidJobState("can not finish not started job") + self.finished = time.time() + if result == self.RESULT_TERMINATED: + self.started = self.finished + self.result = result + self.result_data = data + self.last_change = self.finished + + def update(self, **kwargs): + """Change job's properties.""" + change = False + for key, value in kwargs.items(): + if getattr(self, key) != value: + setattr(self, key, value) + change = True + if change is True: + self.last_change = time.time() + + def __eq__(self, other): + return self.__class__ is other.__class__ and self.jobid == other.jobid + + def __ne__(self, other): + return ( self.__class__ is not other.__class__ + or self.jobid != other.jobid) + + def __lt__(self, other): + """ + JobControl jobs have the highest priority. + """ + return ( ( isinstance(self, YumJobControl) + and not isinstance(other, YumJobControl)) + or ( self.priority < other.priority + or ( self.priority == other.priority + and ( self.jobid < other.jobid + or ( self.jobid == other.jobid + and (self.created < other.created)))))) + + def __cmp__(self, other): + if ( isinstance(self, YumJobControl) + and not isinstance(other, YumJobControl)): + return -1 + if ( not isinstance(self, YumJobControl) + and isinstance(other, YumJobControl)): + return 1 + if self.priority < other.priority: + return -1 + if self.priority > other.priority: + return 1 + if self.jobid < other.jobid: + return -1 + if self.jobid > other.jobid: + return 1 + if self.created < other.created: + return -1 + if self.created > other.created: + return 1 + return 0 + + def __str__(self): + return "%s(id=%d,p=%d)" % ( + self.__class__.__name__, self.jobid, self.priority) + + def __getstate__(self): + ret = self.job_kwargs + for prop in self.handle_ignore_job_props(): + ret[prop] = getattr(self, prop) + return ret + + def __setstate__(self, state): + for k, value in state.items(): + setattr(self, k, value) + +class 
YumAsyncJob(YumJob): #pylint: disable=R0903 + """ + Base class for jobs, that support asynchronnous execution. + No reply is sent upon job completition or error. The results are + kept on server. + """ + __slots__ = ( 'async' + , 'delete_on_completion' + , 'time_before_removal' + , 'metadata') + + @classmethod + def handle_ignore_job_props(cls): + return YumJob.handle_ignore_job_props().union(YumAsyncJob.__slots__) + + def __init__(self, priority=10, async=False, metadata=None): + YumJob.__init__(self, priority) + self.async = bool(async) + self.delete_on_completion = True + self.time_before_removal = DEFAULT_TIME_BEFORE_REMOVAL + if metadata is None and self.async is True: + metadata = {} + self.metadata = metadata + + def __str__(self): + return "%s(id=%d,p=%d%s%s)" % ( + self.__class__.__name__, self.jobid, + self.priority, + ',async' if self.async else '', + (',name="%s"'%self.metadata['name']) + if self.metadata and 'name' in self.metadata else '') + + def update(self, **kwargs): + if 'metadata' in kwargs: + self.metadata.update(kwargs.pop('metadata')) + return YumJob.update(self, **kwargs) + +# ***************************************************************************** +# Job control funtions +# ***************************************************************************** +class YumJobControl(YumJob): #pylint: disable=R0903 + """Base class for any job used for asynchronous jobs management.""" + pass + +class YumJobGetList(YumJobControl): #pylint: disable=R0903 + """Request for obtaining list of all asynchronous jobs.""" + pass + +class YumJobOnJob(YumJobControl): + """ + Base class for any control job acting upon particular asynchronous job. 
+ """ + __slots__ = ('target', ) + def __init__(self, target): + YumJobControl.__init__(self) + if not isinstance(target, (int, long)): + raise TypeError("target must be an integer") + self.target = target + +class YumJobGet(YumJobOnJob): #pylint: disable=R0903 + """Get job object by its id.""" + pass + +class YumJobGetByName(YumJobOnJob): #pylint: disable=R0903 + """Get job object by its name property.""" + def __init__(self, name): + YumJobOnJob.__init__(self, -1) + self.target = name + +class YumJobSetPriority(YumJobOnJob): #pylint: disable=R0903 + """Change priority of job.""" + __slots__ = ('new_priority', ) + + def __init__(self, target, priority): + YumJobOnJob.__init__(self, target) + self.new_priority = priority + +class YumJobUpdate(YumJobOnJob): #pylint: disable=R0903 + """ + .. _YumJobUpdate: + + Update job's metadata. There are some forbidden properties, that + can not be changed in this way. Those are all affecting job's priority + and its scheduling for deletion. Plus any that store job's state. + All forbidden properties are listed in ``FORBIDDEN_PROPERTIES``. 
+ """ + __slots__ = ('data', ) + FORBIDDEN_PROPERTIES = ( + 'async', 'jobid', 'created', 'started', 'priority', 'finished', + 'delete_on_completion', 'time_before_removal', 'last_change') + + def __init__(self, target, **kwargs): + YumJobOnJob.__init__(self, target) + assert not set.intersection( + set(YumJobUpdate.FORBIDDEN_PROPERTIES), set(kwargs)) + self.data = kwargs + +class YumJobReschedule(YumJobOnJob): #pylint: disable=R0903 + """Change the schedule of job's deletion.""" + __slots__ = ('delete_on_completion', 'time_before_removal') + def __init__(self, target, delete_on_completion, time_before_removal): + YumJobOnJob.__init__(self, target) + if not isinstance(time_before_removal, (int, long, float)): + raise TypeError("time_before_removal must be float") + self.delete_on_completion = bool(delete_on_completion) + self.time_before_removal = time_before_removal + +class YumJobDelete(YumJobOnJob): #pylint: disable=R0903 + """Delete job - can only be called on finished job.""" + pass + +class YumJobTerminate(YumJobOnJob): #pylint: disable=R0903 + """ + Can only be called on not yet started job. + Running job can not be terminated. + """ + pass + +# ***************************************************************************** +# Yum API functions +# ***************************************************************************** +class YumBeginSession(YumJob): #pylint: disable=R0903 + """ + Begin session on YumWorker which ensures that yum database is locked + during its lifetime. Sessions can be nested, but the number of + YumEndSession jobs must be processed to make the database unlocked. + """ + pass +class YumEndSession(YumJob): #pylint: disable=R0903 + """ + End the session started with YumBeginSession. If the last active session + is ended, database will be unlocked. + """ + pass + +class YumGetPackageList(YumJob): #pylint: disable=R0903 + """ + Job requesing a list of packages. 
+ Arguments: + kind - supported values are in SUPPORTED_KINDS tuple + * installed lists all installed packages; more packages with + the same name can be installed varying in their architecture + * avail_notinst lists all available, not installed packages; + allow_duplicates must be True to include older packages (but still + available) + * avail_reinst lists all installed packages, that are available; + package can be installed, but not available anymore due to updates + of repository, where only the newest packages are kept + * available lists a union of avail_notinst and avail_reinst + * all lists union of installed and avail_notinst + + allow_duplicates - whether multiple packages can be present + in result for single (name, arch) of package differing + in their version + + sort - whether to sort packages by nevra + + include_repos - either a string passable to RepoStorage.enableRepo() + or a list of repository names, that will be temporared enabled before + listing packages; this is applied after disabling of repositories + + exclude_repos - either a string passable to RepoStorage.disableRepo() + or a list of repository names, that will be temporared disabled before + listing packages; this is applied before enabling of repositories + + Worker replies with [pkg1, pkg2, ...]. 
+ """ + __slots__ = ('kind', 'allow_duplicates', 'sort', 'include_repos', + 'exclude_repos') + + SUPPORTED_KINDS = ( 'installed', 'available', 'avail_reinst' + , 'avail_notinst', 'all') + + def __init__(self, kind, allow_duplicates, sort=False, + include_repos=None, exclude_repos=None): + YumJob.__init__(self) + if not isinstance(kind, basestring): + raise TypeError("kind must be a string") + if not kind in self.SUPPORTED_KINDS: + raise ValueError("kind must be one of {%s}" % + ", ".join(self.SUPPORTED_KINDS)) + for arg in ('include_repos', 'exclude_repos'): + val = locals()[arg] + if ( not val is None + and not isinstance(arg, (tuple, list, basestring))): + raise TypeError("expected list or string for %s" % arg) + self.kind = kind + self.allow_duplicates = bool(allow_duplicates) + self.sort = bool(sort) + self.include_repos = include_repos + self.exclude_repos = exclude_repos + +class YumFilterPackages(YumGetPackageList): #pylint: disable=R0903 + """ + Job similar to YumGetPackageList, but allowing to specify + filter on packages. + Arguments (plus those in YumGetPackageList): + name, epoch, version, release, arch, nevra, envra, evra + + Some of those are redundant, but filtering is optimized for + speed, so supplying all of them won't affect performance. + + Worker replies with [pkg1, pkg2, ...]. 
+ """ + __slots__ = ( + 'name', 'epoch', 'version', 'release', 'arch', + 'nevra', 'envra', 'evra', 'repoid') + + def __init__(self, kind, allow_duplicates, + sort=False, include_repos=None, exclude_repos=None, + name=None, epoch=None, version=None, + release=None, arch=None, + nevra=None, evra=None, + envra=None, + repoid=None): + if nevra is not None and not util.RE_NEVRA.match(nevra): + raise ValueError("Invalid nevra: %s" % nevra) + if evra is not None and not util.RE_EVRA.match(evra): + raise ValueError("Invalid evra: %s" % evra) + if envra is not None and not util.RE_ENVRA.match(evra): + raise ValueError("Invalid envra: %s" % envra) + YumGetPackageList.__init__(self, kind, allow_duplicates, sort, + include_repos=include_repos, exclude_repos=exclude_repos) + self.name = name + self.epoch = None if epoch is None else str(epoch) + self.version = version + self.release = release + self.arch = arch + self.nevra = nevra + self.evra = evra + self.envra = envra + self.repoid = repoid + +class YumSpecificPackageJob(YumAsyncJob): #pylint: disable=R0903 + """ + Abstract job taking instance of yumdb.PackageInfo as argument or + package's nevra. + Arguments: + pkg - plays different role depending on job subclass; + can also be a nevra + """ + __slots__ = ('pkg', ) + def __init__(self, pkg, async=False, metadata=None): + if isinstance(pkg, basestring): + if not util.RE_NEVRA_OPT_EPOCH.match(pkg): + raise errors.InvalidNevra('not a valid nevra "%s"' % pkg) + elif not isinstance(pkg, PackageInfo): + raise TypeError("pkg must be either string or instance" + " of PackageInfo") + YumAsyncJob.__init__(self, async=async, metadata=metadata) + self.pkg = pkg + +class YumInstallPackage(YumSpecificPackageJob): #pylint: disable=R0903 + """ + Job requesting installation of specific package. + pkg argument should be available. 
+ Arguments: + pkg - same as in YumSpecificPackageJob + force is a boolean saying: + True -> reinstall the package if it's already installed + False -> fail if the package is already installed + + Worker replies with new instance of package. + """ + __slots__ = ('force', ) + def __init__(self, pkg, async=False, force=False, metadata=None): + YumSpecificPackageJob.__init__( + self, pkg, async=async, metadata=metadata) + self.force = bool(force) + +class YumRemovePackage(YumSpecificPackageJob): #pylint: disable=R0903 + """ + Job requesting removal of specific package. + pkg argument should be installed. + """ + pass + +class YumUpdateToPackage(YumSpecificPackageJob): #pylint: disable=R0903 + """ + Job requesting update to provided specific package. + Package is updated to epoch, version and release of this + provided available package. + + Worker replies with new instance of package. + """ + pass + +class YumUpdatePackage(YumSpecificPackageJob): #pylint: disable=R0903 + """ + Job requesting update of package, optionally reducing possible + candidate packages to ones with specific evr. + Arguments: + to_epoch, to_version, to_release + force is a boolean, that has meaning only when update_only is False: + True -> reinstall the package if it's already installed + False -> fail if the package is already installed + + The arguments more given, the more complete filter of candidates. + + Worker replies with new instance of package. 
+ """ + __slots__ = ('to_epoch', 'to_version', 'to_release', 'force') + + def __init__(self, pkg, async=False, + to_epoch=None, to_version=None, to_release=None, force=False, + metadata=None): + if not isinstance(pkg, PackageInfo): + raise TypeError("pkg must be instance of yumdb.PackageInfo") + YumSpecificPackageJob.__init__( + self, pkg, async=async, metadata=metadata) + self.to_epoch = to_epoch + self.to_version = to_version + self.to_release = to_release + self.force = bool(force) + +class YumCheckPackage(YumSpecificPackageJob): #pylint: disable=R0903 + """ + Request verification information for instaled package and its files. + + Arguments: + pkg - either instance of PackageInfo or nevra string. + In latter case it will be replaced for YumWorker with instance + of PackageInfo. + + Worker replies with ``(pkg_info, pkg_check)``. + where: + ``pkg_info`` - is instance of PackageInfo + ``pkg_check`` - new instance of yumdb.PackageCheck + """ + def __init__(self, pkg, async=False, metadata=None): + YumSpecificPackageJob.__init__(self, pkg, async=async, + metadata=metadata) + if isinstance(pkg, PackageInfo) and not pkg.installed: + raise ValueError("package must be installed to check it") + +class YumCheckPackageFile(YumCheckPackage): #pylint: disable=R0903 + """ + Request verification information for particular file of installed + package. + + Worker replies with ``(pkg_info, pkg_check)``. + where: + ``pkg_info`` - is instance of PackageInfo + ``pkg_check`` - new instance of yumdb.PackageCheck containing only + requested file. + """ + __slots__ = ('file_name', ) + def __init__(self, pkg, file_name, *args, **kwargs): + YumCheckPackage.__init__(self, pkg, *args, **kwargs) + if not isinstance(file_name, basestring): + raise TypeError("file_name must be string") + self.file_name = file_name + +class YumInstallPackageFromURI(YumAsyncJob): #pylint: disable=R0903 + """ + Job requesting installation of specific package from URI. 
class YumGetRepositoryList(YumJob):             #pylint: disable=R0903
    """
    Job asking for a list of repositories.

    Arguments:
        kind - one of the values listed in SUPPORTED_KINDS

    Worker replies with [repo1, repo2, ...].
    """
    __slots__ = ('kind', )

    SUPPORTED_KINDS = ('all', 'enabled', 'disabled')

    def __init__(self, kind):
        YumJob.__init__(self)
        # type is checked before membership, so a non-string always
        # yields TypeError
        if not isinstance(kind, basestring):
            raise TypeError("kind must be a string")
        if kind not in self.SUPPORTED_KINDS:
            allowed = ", ".join(self.SUPPORTED_KINDS)
            raise ValueError("kind must be one of {%s}" % allowed)
        self.kind = kind
class YumSpecificRepositoryJob(YumJob):         #pylint: disable=R0903
    """
    Abstract base of jobs operating on a single repository.

    Arguments:
        repo - (``Repository`` or ``str``) its exact meaning is given
            by the particular subclass
    """
    __slots__ = ('repo', )

    def __init__(self, repo):
        accepted_types = (Repository, basestring)
        if isinstance(repo, accepted_types):
            YumJob.__init__(self)
            self.repo = repo
            return
        raise TypeError(
                "repoid must be either instance of yumdb.Repository or string")
class PackageFile(object):
    """
    Metadata related to particular file on filesystem belonging to RPM package.
    Data contained here are from RPM database.

    Attributes:
      ``path``      - (``str``) Absolute path of file.
      ``file_type`` - (``int``) One of ``FILE_TYPE_*`` identifiers above.
            The constructor takes the type as its *name* (one of
            ``FILE_TYPE_NAMES``, case insensitive) and stores the
            matching integer.
      ``uid``       - (``int``) User ID.
      ``gid``       - (``int``) Group ID.
      ``mode``      - (``int``) Raw file mode.
      ``device``    - (``int``) Device number.
      ``mtime``     - (``int``) Last modification time in seconds.
      ``size``      - (``long``) File size as a number of bytes.
      ``link_target`` - (``str``) Link target of symlink. None if ``file_type``
            is not symlink.
      ``checksum``  - (``str``) Checksum as string in hexadecimal format.
            None if file is not a regular file.
    """
    __slots__ = ("path", "file_type", "uid", "gid", "mode", "device", "mtime",
            "size", "link_target", "checksum")

    def __init__(self, path, file_type, uid, gid, mode, device, mtime, size,
            link_target, checksum):
        """
        @param file_type is a name of file type (see ``FILE_TYPE_NAMES``);
        unrecognized name is logged and stored as ``FILE_TYPE_UNKNOWN``
        @raise TypeError when file_type is not a string or one of
        uid, gid, mode, mtime, size is not an integer
        @raise ValueError when path is not absolute
        """
        if not isinstance(file_type, basestring):
            raise TypeError("file_type must be a string")
        # explicit (name, value) pairs instead of poking into locals()
        for arg, value in (('uid', uid), ('gid', gid), ('mode', mode),
                ('mtime', mtime), ('size', size)):
            if not isinstance(value, (int, long)):
                raise TypeError("%s must be integer" % arg)
        if not os.path.isabs(path):
            raise ValueError("path must be an absolute path")
        self.path = path
        try:
            self.file_type = FILE_TYPE_NAMES.index(file_type.lower())
        except ValueError:
            logging.getLogger(__name__).error('unrecognized file type "%s" for'
                    ' file "%s"', file_type, path)
            # store the numeric identifier, not its name -- file_type is
            # an integer everywhere else
            self.file_type = FILE_TYPE_UNKNOWN
        self.uid = uid
        self.gid = gid
        self.mode = mode
        self.device = device
        self.mtime = mtime
        self.size = size
        # link target makes sense only for symlinks, checksum only for
        # regular files
        self.link_target = (link_target
                if self.file_type == FILE_TYPE_SYMLINK else None)
        self.checksum = checksum if self.file_type == FILE_TYPE_FILE else None

    @property
    def last_modification_datetime(self):
        """
        @return instance datetime for last modification time of file
        """
        return datetime.fromtimestamp(self.mtime)

    def __getstate__(self):
        """
        Used for serialization with pickle.
        @return container content that will be serialized
        """
        return dict((k, getattr(self, k)) for k in self.__slots__)

    def __setstate__(self, state):
        """
        Used for deserialization with pickle.
        Restores the object from serialized form.
        @param state is an object created by __getstate__() method
        """
        for k, value in state.items():
            setattr(self, k, value)
+ """ + __slots__ = ("objid", "file_checksum_type", "files") + + def __init__(self, objid, file_checksum_type, files=None): + """ + @param objid is an in of original yum package object, which is used + by server for subsequent operations on this package requested by client + """ + if files is not None and not isinstance( + files, (list, tuple, set, dict)): + raise TypeError("files must be an iterable container") + self.objid = objid + self.file_checksum_type = file_checksum_type + if not isinstance(files, dict): + self.files = OrderedDict() + if files is not None: + for file_check in sorted(files, key=lambda f: f.path): + self.files[file_check.path] = file_check + else: + for path in sorted(files): + self.files[path] = files[path] + + def __iter__(self): + return iter(self.files) + + def __len__(self): + return len(self.files) + + def __getitem__(self, filepath): + return self.files[filepath] + + def __setitem__(self, filepath, package_file): + if not isinstance(package_file, PackageFile): + raise TypeError("package_file must be a PackageFile instance") + self.files[filepath] = package_file + + def __contains__(self, fileobj): + if isinstance(fileobj, basestring): + return fileobj in self.files + elif isinstance(fileobj, PackageFile): + return fileobj.path in self.files + else: + raise TypeError("expected file path for argument") + + def __getstate__(self): + """ + Used for serialization with pickle. + @return container content that will be serialized + """ + return dict((k, getattr(self, k)) for k in self.__slots__) + + def __setstate__(self, state): + """ + Used for deserialization with pickle. + Restores the object from serialized form. 
def make_package_check_from_db(vpkg, file_name=None):
    """
    Build a ``PackageCheck`` out of an instance of
    yum.packages._RPMVerifyPackage.

    :param file_name: (``str``) When given, the result holds only the
        ``PackageFile`` whose name equals it. ``FileNotFound`` is raised
        if the package has no such file.
    :rtype (``PackageCheck``)
    """
    if not isinstance(vpkg, yum.packages._RPMVerifyPackage):
        raise TypeError(
                "vpkg must be instance of yum.packages._RPMVerifyPackage")
    pkg = vpkg.po
    single_file = file_name is not None

    check = PackageCheck(id(pkg), pkg_checksum_type(pkg))
    collected = check.files
    for entry in vpkg:
        if single_file and file_name != entry.filename:
            continue
        path = entry.filename
        ftype = entry.ftype
        # owner and group names are translated to numeric ids
        uid = pwd.getpwnam(entry.user).pw_uid
        gid = grp.getgrnam(entry.group).gr_gid
        collected[path] = PackageFile(
                path, ftype, uid, gid,
                entry.mode, entry.dev, entry.mtime, entry.size,
                entry.readlink, entry.digest[1])
        if single_file:
            # the only requested file was found
            break
    if single_file and not collected:
        raise errors.FileNotFound('File "%s" not found in package "%s".' % (
            file_name, pkg.nevra))
    return check
class PackageInfo(object):
    """
    Container for package metadata. It represents rpm package in yum
    database. It's supposed to be passed from YumWorker to YumDB client
    and vice-versa. Instances of YumAvailablePackage can not be exchanged
    -- results in segfaults.

    To speed up looking up of original yum package object on server, an
    attribute "objid" is provided.
    """
    __slots__ = (
            "objid",
            "name", "epoch", "version", "release", "architecture",
            'summary', 'description', 'license', 'group', 'vendor',
            "repoid", 'size',
            'installed',    # boolean
            'install_time'  # datetime instance
    )

    def __init__(self, objid, name, epoch, version, release, arch, **kwargs):
        """
        @param objid is an id of original yum package object, which is used
        by server for subsequent operations on this package requested by client
        @param kwargs may carry optional properties: summary, description,
        license, group, vendor, repoid, size, installed and install_time;
        any other keyword is ignored
        @raise TypeError when size is not an integer or install_time is not
        a datetime
        """
        self.objid = objid
        self.name = name
        self.epoch = epoch
        self.version = version
        self.release = release
        self.architecture = arch
        self.summary = kwargs.pop('summary', None)
        self.description = kwargs.pop('description', None)
        self.license = kwargs.pop('license', None)
        self.group = kwargs.pop('group', None)
        self.vendor = kwargs.pop('vendor', None)
        self.repoid = kwargs.pop("repoid", None)
        self.size = kwargs.pop('size', None)
        if self.size is not None and not isinstance(self.size, (int, long)):
            raise TypeError('size must be an integer')
        self.installed = kwargs.pop('installed', None)
        if self.installed is not None:
            self.installed = bool(self.installed)
        self.install_time = kwargs.pop('install_time', None)
        if (   self.install_time is not None
           and not isinstance(self.install_time, datetime)):
            raise TypeError('install_time must be a datetime')

    # *************************************************************************
    # Properties
    # *************************************************************************
    @property
    def ver(self):
        """Shortcut for version property."""
        return self.version

    @property
    def rel(self):
        """Shortcut for release property."""
        return self.release

    @property
    def arch(self):
        """Shortcut for architecture property."""
        return self.architecture

    @property
    def nevra(self):
        """@return nevra of package with epoch always present."""
        return self.get_nevra(with_epoch="ALWAYS")

    @property
    def evra(self):
        """
        @return evra of package; missing epoch (empty value or "(none)")
        is written as "0"
        """
        return "%s:%s-%s.%s" % (
                self.epoch if self.epoch and self.epoch != "(none)" else "0",
                self.version,
                self.release,
                self.architecture)

    @property
    def key_props(self):
        """
        @return package properties as dictionary,
        that uniquely identify package in database
        """
        return dict((k, getattr(self, k)) for k in (
            'name', 'epoch', 'version', 'release', 'arch', 'repoid'))

    # *************************************************************************
    # Public methods
    # *************************************************************************
    def get_nevra(self, with_epoch='NOT_ZERO'):
        """
        @param with_epoch may be one of:
            "NOT_ZERO" - include epoch only if it's not zero
            "ALWAYS"   - include epoch always
            "NEVER"    - do not include epoch at all
        @return nevra of package
        """
        return util.make_nevra(self.name, self.epoch, self.version,
                self.release, self.arch, with_epoch)

    # *************************************************************************
    # Special methods
    # *************************************************************************
    def __str__(self):
        return self.nevra

    def __getstate__(self):
        """
        Used for serialization with pickle.
        @return container content that will be serialized
        """
        return dict((k, getattr(self, k)) for k in self.__slots__)

    def __setstate__(self, state):
        """
        Used for deserialization with pickle.
        Restores the object from serialized form.
        @param state is an object created by __getstate__() method
        """
        for k, value in state.items():
            setattr(self, k, value)

    def __eq__(self, other):
        """
        Packages are equal, when their name, version, release, arch and
        epoch match. Repository ids are compared only if both are known.
        Objects lacking any of these attributes are never equal.
        """
        try:
            return (   self.name == other.name
                   and self.version == other.version
                   and self.release == other.release
                   and self.arch == other.arch
                   and self.epoch == other.epoch
                   and (  (self.repoid is None or other.repoid is None)
                       or (self.repoid == other.repoid)))
        except AttributeError:
            # other is not package-like (None, string, ...); let python
            # fall back to its default comparison instead of failing
            return NotImplemented

    def __ne__(self, other):
        """
        Negation of __eq__(). Python 2 does not derive it on its own.
        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result
+# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Module holding the code of separate process accessing the YUM API. +""" + +import errno +from itertools import chain +import logging +from multiprocessing import Process +import os +import Queue as TQueue # T as threaded +import sys +import time +import traceback +from urlgrabber.grabber import default_grabber +import weakref +import yum + +from lmi.software import util +from lmi.software.yumdb import errors +from lmi.software.yumdb import jobs +from lmi.software.yumdb import packageinfo +from lmi.software.yumdb import packagecheck +from lmi.software.yumdb import repository +from lmi.software.yumdb.jobmanager import JobManager +from lmi.software.yumdb.util import trace_function, setup_logging + +# ***************************************************************************** +# Constants +# ***************************************************************************** +# interval in seconds +FREE_DATABASE_TIMEOUT = 60 +LOCK_WAIT_INTERVAL = 0.5 +RPMDB_PATH = '/var/lib/rpm/Packages' +LOG = None + +# ***************************************************************************** +# Utilities +# **************************************************************************** +def _get_package_filter_function(filters): + """ + @param filters is a dictionary, where keys are package property + names and values are their desired values. 
+ @return a function used to filter list of packages + """ + if not isinstance(filters, dict): + raise TypeError("filters must be a dictionary") + + filters = dict((k, value) for k, value in filters.items() + if value is not None) + + match = None + if "nevra" in filters: + match = util.RE_NEVRA.match(filters["nevra"]) + elif "envra" in filters: + match = util.RE_ENVRA.match(filters["envra"]) + if match is not None: + for attr in ("name", "epoch", "version", "release", "arch"): + match_attr = attr + filters[attr] = match.group(match_attr) + filters.pop('nevra', None) + filters.pop('envra', None) + elif "evra" in filters: + for prop_name in ("epoch", "version", "release", "epoch"): + filters.pop(prop_name, None) + filter_list = [] + # properties are sorted by their filtering ability + # (the most unprobable property, that can match, comes first) + for prop_name in ("evra", "name", "version", "epoch", + "release", "repoid", "arch"): + if not prop_name in filters: + continue + filter_list.append((prop_name, filters.pop(prop_name))) + def _cmp_props(pkg): + """@return True if pkg matches properies filter""" + return all(getattr(pkg, p) == v for p, v in filter_list) + return _cmp_props + +class RepoFilterSetter(object): + """ + A context manager, that will set a repository filter lasting + as long as the object itself. 
+ """ + def __init__(self, yum_base, include_repos=None, exclude_repos=None): + if not isinstance(yum_base, yum.YumBase): + raise TypeError("yum_base must be a YumBase instance") + self._yum_base = yum_base + self._include = include_repos + self._exclude = exclude_repos + # after __enter__ this will be dictionary containing ( + # repoid, enabled) pairs + self._prev_states = None + + def __enter__(self): + self._prev_states = { r.id: r.enabled + for r in self._yum_base.repos.repos.values()} + if isinstance(self._exclude, (list, tuple, set)): + exclude = ",".join(self._exclude) + else: + exclude = self._exclude + # set of repositories, that were affected + repos = set() + if exclude: + repos.update(self._yum_base.repos.disableRepo(exclude)) + LOG.info('disabling repositories: [%s]', ", ".join(repos)) + if isinstance(self._include, (list, tuple, set)): + include = ",".join(self._include) + else: + include = self._include + if include: + affected = self._yum_base.repos.enableRepo(include) + LOG.info('enabling repositories: [%s]', ", ".join(affected)) + repos.update(affected) + for repoid, prev_enabled in self._prev_states.items(): + if ( repoid not in repos + or ( bool(prev_enabled) + is bool(self._yum_base.repos.getRepo(repoid).enabled))): + # keep only manipulated repositories + del self._prev_states[repoid] + if len(self._prev_states): + for repoid in (r for r, v in self._prev_states.items() if v): + self._yum_base.pkgSack.sacks.pop(repoid, None) + self._yum_base.repos.populateSack() + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + # restore previous repository states + if len(self._prev_states): + LOG.info('restoring repositories: [%s]', + ", ".join(self._prev_states.keys())) + for repoid, enabled in self._prev_states.items(): + repo = self._yum_base.repos.getRepo(repoid) + if enabled: + repo.enable() + else: + repo.disable() + for repoid in (r for r, v in self._prev_states.items() if not v): + self._yum_base.pkgSack.sacks.pop(repoid, None) + 
def _needs_database(method):
    """
    Decorator for YumWorker job handlers, that need to access the yum database.
    It ensures, that database is initialized and locks it in case, that
    no session is active. A session created here is ended again (and the
    database unlocked) when the handler finishes, no matter how.
    """
    logged = trace_function(method)
    def _wrapper(self, *args, **kwargs):
        """
        Wrapper for the job handler method.
        """
        created_session = False
        self._init_database()                   #pylint: disable=W0212
        if self._session_level == 0:            #pylint: disable=W0212
            self._session_level = 1             #pylint: disable=W0212
            created_session = True
            try:
                self._lock_database()           #pylint: disable=W0212
            except Exception:
                # no lock is held -- do not leave a phantom session behind,
                # otherwise following handlers would run without locking
                self._session_level = 0         #pylint: disable=W0212
                raise
        try:
            LOG.debug("calling job handler %s with args=(%s)",
                    method.__name__,
                    ", ".join(chain(
                        (str(a) for a in args),
                        ("%s=%s"%(k, str(v)) for k, v in kwargs.items()))))
            result = logged(self, *args, **kwargs)
            LOG.debug("job handler %s finished", method.__name__)
            return result
        finally:
            # tear down only the session created by this very call
            if created_session is True:
                self._session_level = 0         #pylint: disable=W0212
                self._unlock_database()         #pylint: disable=W0212
    return _wrapper
+ """ + + def __init__(self, + queue_in, + queue_out, + indication_manager, + yum_kwargs=None, + logging_config=None): + Process.__init__(self, name="YumWorker") + self._jobmgr = JobManager(queue_in, queue_out, indication_manager) + self._session_level = 0 + self._session_ended = False + + if yum_kwargs is None: + yum_kwargs = {} + + self._yum_kwargs = yum_kwargs + self._yum_base = None + + self._pkg_cache = None + # contains (repoid, time_stamp_of_config_file) + # plus (/repos/dir, ...) for each repo config directory + self._repodir_mtimes = {} + self._logging_config = logging_config + + # ************************************************************************* + # Private methods + # ************************************************************************* + @trace_function + def _init_database(self): + """ + Initializes yum base object, when it does no exists. + And updates the cache (when out of date). + """ + if self._yum_base is None: + LOG.info("creating YumBase with kwargs=(%s)", + ", ".join(( "%s=%s"%(k, str(v)) + for k, v in self._yum_kwargs.items()))) + self._yum_base = yum.YumBase(**self._yum_kwargs) + + @trace_function + def _free_database(self): + """ + Release the yum base object to safe memory. + """ + LOG.info("freing database") + self._pkg_cache.clear() + self._yum_base = None + + @trace_function + def _lock_database(self): + """ + Only one process is allowed to work with package database at given time. + That's why we lock it. + + Try to lock it in loop, until success. 
+ """ + while True: + try: + LOG.info("trying to lock database - session level %d", + self._session_level) + self._yum_base.doLock() + LOG.info("successfully locked up") + break + except yum.Errors.LockError as exc: + LOG.warn("failed to lock") + if exc.errno in (errno.EPERM, errno.EACCES, errno.ENOSPC): + LOG.error("can't create lock file") + raise errors.DatabaseLockError("Can't create lock file.") + LOG.info("trying to lock again after %.1f seconds", + LOCK_WAIT_INTERVAL) + time.sleep(LOCK_WAIT_INTERVAL) + + @trace_function + def _unlock_database(self): + """ + The opposite to _lock_database() method. + """ + if self._yum_base is not None: + LOG.info("unlocking database") + self._yum_base.closeRpmDB() + self._yum_base.doUnlock() + + @trace_function + def _get_job(self): + """ + Get job from JobManager thread. + If no job comes for long time, free database to save memory. + """ + while True: + if self._session_ended and self._session_level == 0: + try: + return self._jobmgr.get_job(timeout=FREE_DATABASE_TIMEOUT) + except TQueue.Empty: + self._free_database() + self._session_ended = False + else: + return self._jobmgr.get_job() + + @trace_function + def _transform_packages(self, packages, + cache_packages=True, + flush_cache=True): + """ + Return instances of PackageInfo for each package in packages. + Cache all the packages. 
+ @param packages list of YumAvailablePackage instances + @param cache_packages whether to update cache with packages + @param flush_cache whether to clear the cache before adding input + packages; makes sense only with cachee_packages=True + """ + if cache_packages is True and flush_cache is True: + LOG.debug("flushing package cache") + self._pkg_cache.clear() + res = [] + for orig in packages: + pkg = packageinfo.make_package_from_db(orig) + if cache_packages is True: + self._pkg_cache[pkg.objid] = orig + res.append(pkg) + return res + + @trace_function + def _cache_packages(self, packages, flush_cache=True, transform=False): + """ + Store packages in cache and return them. + @param flush_cache whether to clear the cache before adding new + packages + @param transform whether to return packages as PackageInfos + @return either list of original packages or PackageInfo instances + """ + if transform is True: + return self._transform_packages(packages, flush_cache=flush_cache) + if flush_cache is True: + LOG.debug("flushing package cache") + self._pkg_cache.clear() + for pkg in packages: + self._pkg_cache[id(pkg)] = pkg + return packages + + @trace_function + def _lookup_package(self, pkg): + """ + Lookup the original package in cache. + If it was garbage collected already, make new query to find it. 
+ @return instance of YumAvailablePackage + """ + if not isinstance(pkg, packageinfo.PackageInfo): + raise TypeError("pkg must be instance of PackageInfo") + LOG.debug("looking up yum package %s with id=%d", + pkg, pkg.objid) + try: + result = self._pkg_cache[pkg.objid] + LOG.debug("lookup successful") + except KeyError: + LOG.warn("lookup of package %s with id=%d failed, trying" + " to query database", pkg, pkg.objid) + result = self._handle_filter_packages( + 'installed' if pkg.installed else 'available', + allow_duplicates=False, + sort=False, + transform=False, + **pkg.key_props) + if len(result) < 1: + LOG.warn("package %s not found", pkg) + raise errors.PackageNotFound( + "package %s could not be found" % pkg) + result = result[0] + return result + + @trace_function + def _clear_repository_cache(self): + """ + Clears the repository cache and their configuration directory + last modification times. + """ + if self._yum_base is not None: + for repoid in self._yum_base.repos.repos.keys(): + self._yum_base.repos.delete(repoid) + del self._yum_base.repos + del self._yum_base.pkgSack + self._repodir_mtimes.clear() + + @trace_function + def _check_repository_configs(self): + """ + Checks whether repository information is up to date with configuration + files by comparing timestamps. If not, repository cache will be + released. 
+ """ + dirty = False + if self._repodir_mtimes: + for repodir in self._yum_base.conf.reposdir: + if ( os.path.exists(repodir) + and ( not repodir in self._repodir_mtimes + or ( os.stat(repodir).st_mtime + > self._repodir_mtimes[repodir]))): + LOG.info("repository config dir %s changed", repodir) + dirty = True + break + if not dirty: + for repo in self._yum_base.repos.repos.values(): + filename = repo.repofile + if ( not os.path.exists(filename) + or ( int(os.stat(filename).st_mtime) + > repo.repo_config_age)): + LOG.info('config file of repository "%s" changed', + repo.id) + dirty = True + break + if dirty is True: + LOG.info("repository cache is dirty, cleaning up ...") + self._clear_repository_cache() + self._yum_base.getReposFromConfig() + if dirty is True or not self._repodir_mtimes: + self._update_repodir_mtimes() + + @trace_function + def _update_repodir_mtimes(self): + """ + Updates the last modification times of repo configuration directories. + """ + assert self._yum_base is not None + for repodir in self._yum_base.conf.reposdir: + if os.path.exists(repodir): + self._repodir_mtimes[repodir] = os.stat(repodir).st_mtime + + @trace_function + def _do_work(self, job): + """ + Dispatcher of incoming jobs. Job is passed to the right handler + depending on its class. 
+ """ + if not isinstance(job, jobs.YumJob): + raise TypeError("job must be instance of YumJob") + try: + handler = { + jobs.YumGetPackageList : self._handle_get_package_list, + jobs.YumFilterPackages : self._handle_filter_packages, + jobs.YumInstallPackage : self._handle_install_package, + jobs.YumRemovePackage : self._handle_remove_package, + jobs.YumUpdateToPackage : self._handle_update_to_package, + jobs.YumUpdatePackage : self._handle_update_package, + jobs.YumBeginSession : self._handle_begin_session, + jobs.YumEndSession : self._handle_end_session, + jobs.YumCheckPackage : self._handle_check_package, + jobs.YumCheckPackageFile : self._handle_check_package_file, + jobs.YumInstallPackageFromURI : \ + self._handle_install_package_from_uri, + jobs.YumGetRepositoryList : \ + self._handle_get_repository_list, + jobs.YumFilterRepositories : self._handle_filter_repositories, + jobs.YumSetRepositoryEnabled : \ + self._handle_set_repository_enabled + }[job.__class__] + LOG.info("processing job %s(id=%d)", + job.__class__.__name__, job.jobid) + except KeyError: + LOG.error("No handler for job \"%s\"", job.__class__.__name__) + raise errors.UnknownJob("No handler for job \"%s\"." % + job.__class__.__name__) + return handler(**job.job_kwargs) + + @trace_function + def _run_transaction(self, name): + """ + Builds and runs the yum transaction and checks for errors. + @param name of transaction used only in error description on failure + """ + LOG.info("building transaction %s", name) + (code, msgs) = self._yum_base.buildTransaction() + if code == 1: + LOG.error("building transaction %s failed: %s", + name, "\n".join(msgs)) + raise errors.TransactionBuildFailed( + "Failed to build \"%s\" transaction: %s" % ( + name, "\n".join(msgs))) + LOG.info("processing transaction %s", name) + self._yum_base.processTransaction() + self._yum_base.closeRpmDB() + + @trace_function + def _main_loop(self): + """ + This is a main loop called from run(). Jobs are handled here. 
+ It accepts a job from input queue, handles it, + sends the result to output queue and marks the job as done. + + It is terminated, when None is received from input queue. + """ + while True: + job = self._get_job() + if job is not None: # not a terminate command + result = jobs.YumJob.RESULT_SUCCESS + try: + data = self._do_work(job) + except Exception: #pylint: disable=W0703 + result = jobs.YumJob.RESULT_ERROR + # (type, value, traceback) + data = sys.exc_info() + # traceback is not pickable - replace it with formatted + # text + data = (data[0], data[1], traceback.format_tb(data[2])) + LOG.exception("job %s failed", job) + self._jobmgr.finish_job(job, result, data) + if job is None: + LOG.info("waiting for %s to finish", self._jobmgr.name) + self._jobmgr.join() + break + + @trace_function + def _handle_begin_session(self): + """ + Handler for session begin job. + """ + self._session_level += 1 + LOG.info("beginning session level %s", self._session_level) + if self._session_level == 1: + self._init_database() + self._lock_database() + + @trace_function + def _handle_end_session(self): + """ + Handler for session end job. + """ + LOG.info("ending session level %d", self._session_level) + self._session_level = max(self._session_level - 1, 0) + if self._session_level == 0: + self._unlock_database() + self._session_ended = True + + @_needs_database + def _handle_get_package_list(self, kind, allow_duplicates, sort, + include_repos=None, exclude_repos=None, transform=True): + """ + Handler for listing packages job. + @param transform says, whether to return just a package abstractions + or original ones + @return [pkg1, pkg2, ...] 
+ """ + if kind == 'avail_notinst': + what = 'available' + elif kind == 'available': + what = 'all' + elif kind == 'avail_reinst': + what = 'all' + else: + what = kind + with RepoFilterSetter(self._yum_base, include_repos, exclude_repos): + LOG.debug("calling YumBase.doPackageLists(%s, showdups=%s)", + what, allow_duplicates) + pkglist = self._yum_base.doPackageLists(what, + showdups=allow_duplicates) + if kind == 'all': + result = pkglist.available + pkglist.installed + elif kind == 'available': + result = pkglist.available + pkglist.reinstall_available + elif kind == 'avail_reinst': + result = pkglist.reinstall_available + else: # get installed or available + result = getattr(pkglist, what) + if sort is True: + result.sort() + LOG.debug("returning %s packages", len(result)) + return self._cache_packages(result, transform=transform) + + @_needs_database + def _handle_filter_packages(self, kind, allow_duplicates, sort, + include_repos=None, exclude_repos=None, + transform=True, **filters): + """ + Handler for filtering packages job. + @return [pkg1, pkg2, ...] + """ + pkglist = self._handle_get_package_list(kind, allow_duplicates, False, + include_repos=include_repos, exclude_repos=exclude_repos, + transform=False) + matches = _get_package_filter_function(filters) + result = [p for p in pkglist if matches(p)] + if sort is True: + result.sort() + LOG.debug("%d packages matching", len(result)) + if transform is True: + # caching has been already done by _handle_get_package_list() + result = self._transform_packages(result, cache_packages=False) + return result + + @_needs_database + def _handle_install_package(self, pkg, force=False): + """ + Handler for package installation job. 
+ @return installed package instance + """ + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages( + 'available' if force else 'avail_notinst', + allow_duplicates=False, sort=True, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + elif len(pkgs) > 1: + LOG.warn('multiple packages matches nevra "%s": [%s]', + pkg, ", ".join(p.nevra for p in pkgs)) + pkg_desired = pkgs[-1] + else: + pkg_desired = self._lookup_package(pkg) + if isinstance(pkg_desired, yum.rpmsack.RPMInstalledPackage): + if force is False: + raise errors.PackageAlreadyInstalled(pkg) + action = "reinstall" + else: + action = "install" + getattr(self._yum_base, action)(pkg_desired) + self._run_transaction(action) + installed = self._handle_filter_packages("installed", False, False, + nevra=util.pkg2nevra(pkg_desired, with_epoch="ALWAYS")) + if len(installed) < 1: + raise errors.TransactionExecutionFailed( + "Failed to install desired package %s." % pkg) + return installed[0] + + @_needs_database + def _handle_remove_package(self, pkg): + """ + Handler for package removal job. + """ + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages('installed', + allow_duplicates=False, sort=False, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + pkg = pkgs[-1] + else: + pkg = self._lookup_package(pkg) + self._yum_base.remove(pkg) + self._run_transaction("remove") + + @_needs_database + def _handle_update_to_package(self, pkg): + """ + Handler for specific package update job. + @return package corresponding to pkg after update + """ + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages('available', + allow_duplicates=False, sort=False, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' 
% pkg) + pkg_desired = pkgs[-1] + else: + pkg_desired = self._lookup_package(pkg) + self._yum_base.update(update_to=True, + name=pkg_desired.name, + epoch=pkg_desired.epoch, + version=pkg_desired.version, + release=pkg_desired.release, + arch=pkg_desired.arch) + self._run_transaction("update") + installed = self._handle_filter_packages("installed", False, False, + **pkg.key_props) + if len(installed) < 1: + raise errors.TransactionExecutionFailed( + "Failed to update to desired package %s." % pkg) + return installed[0] + + @_needs_database + def _handle_update_package(self, pkg, to_epoch, to_version, to_release, + _force=False): + """ + Handler for package update job. + @return updated package instance + """ + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages('installed', + allow_duplicates=False, sort=False, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + pkg = pkgs[-1] + else: + pkg = self._lookup_package(pkg) + if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage): + raise errors.PackageNotInstalled(pkg) + kwargs = { "name" : pkg.name, "arch" : pkg.arch } + if any(v is not None for v in (to_epoch, to_version, to_release)): + kwargs["update_to"] = True + if to_epoch: + kwargs["to_epoch"] = to_epoch + if to_version: + kwargs["to_version"] = to_version + if to_release: + kwargs["to_release"] = to_release + self._yum_base.update(**kwargs) + self._run_transaction("update") + kwargs = dict( (k[3:] if k.startswith("to_") else k, v) + for k, v in kwargs.items()) + installed = self._handle_filter_packages( + "installed", False, False, **kwargs) + if len(installed) < 1: + raise errors.TransactionExecutionFailed( + "Failed to update package %s." 
% pkg) + return installed[0] + + @_needs_database + def _handle_check_package(self, pkg, file_name=None): + """ + @return PackageCheck instance for requested package + """ + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages('installed', + allow_duplicates=False, sort=False, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + rpm = pkgs[-1] + pkg = self._transform_packages((rpm, ), cache_packages=False)[0] + else: + rpm = self._lookup_package(pkg) + if not isinstance(rpm, yum.rpmsack.RPMInstalledPackage): + raise errors.PackageNotInstalled(rpm) + vpkg = yum.packages._RPMVerifyPackage(rpm, rpm.hdr.fiFromHeader(), + packagecheck.pkg_checksum_type(rpm), [], True) + return (pkg, packagecheck.make_package_check_from_db(vpkg, + file_name=file_name)) + + @_needs_database + def _handle_check_package_file(self, pkg, file_name): + """ + @return PackageCheck instance for requested package containing + just one PackageFile instance for given ``file_name``. 
+ """ + return self._handle_check_package(pkg, file_name) + + @_needs_database + def _handle_install_package_from_uri(self, uri, + update_only=False, force=False): + """ + @return installed PackageInfo instance + """ + try: + pkg = yum.packages.YumUrlPackage(self, + ts=self._yum_base.rpmdb.readOnlyTS(), url=uri, + ua=default_grabber) + except yum.Errors.MiscError as exc: + raise errors.PackageOpenError(uri, str(exc)) + installed = self._handle_filter_packages("installed", False, False, + nevra=util.pkg2nevra(pkg, with_epoch="ALWAYS")) + if installed and force is False: + raise errors.PackageAlreadyInstalled(pkg) + kwargs = { 'po' : pkg } + if installed: + action = 'reinstallLocal' + else: + action = 'installLocal' + kwargs = { 'updateonly' : update_only } + getattr(self._yum_base, action)(uri, **kwargs) + self._run_transaction('installLocal') + installed = self._handle_filter_packages("installed", False, False, + nevra=util.pkg2nevra(pkg, with_epoch="ALWAYS")) + if len(installed) < 1: + raise errors.TransactionExecutionFailed( + "Failed to install desired package %s." 
% pkg) + return installed[0] + + @_needs_database + def _handle_get_repository_list(self, kind, transform=True): + """ + @return list of yumdb.Repository instances + """ + self._check_repository_configs() + if kind == 'enabled': + repos = sorted(self._yum_base.repos.listEnabled()) + else: + repos = self._yum_base.repos.repos.values() + if kind == 'disabled': + repos = [repo for repo in repos if not repo.enabled] + repos.sort() + if transform: + repos = [repository.make_repository_from_db(r) for r in repos] + LOG.debug("returning %d repositories from %s", + len(repos), kind) + return repos + + @_needs_database + def _handle_filter_repositories(self, kind, **filters): + """ + @return list of yumdb.Repository instances -- filtered + """ + filters = dict((k, v) for k, v in filters.items() if v is not None) + if 'repoid' in filters: + self._check_repository_configs() + try: + repo = repository.make_repository_from_db( + self._yum_base.repos.getRepo(filters["repoid"])) + if ( (kind == "enabled" and not repo.enabled) + or (kind == "disabled" and repo.enabled)): + LOG.warn( + 'no such repository with id="%s"matching filters', + filters['repoid']) + return [] + LOG.debug( + "exactly one repository matching filters found") + return [repo] + except (KeyError, yum.Errors.RepoError): + LOG.warn('repository with id="%s" could not be found', + filters['repoid']) + raise errors.RepositoryNotFound(filters['repoid']) + + repos = self._handle_get_repository_list(kind, transform=False) + result = [] + for repo in repos: + # do the filtering and safe transformed repo into result + for prop, value in filters.items(): + if repository.get_prop_from_yum_repo(repo, prop) != value: + # did not pass the filter + break + else: # all properties passed + result.append(repository.make_repository_from_db(repo)) + LOG.debug("found %d repositories matching", len(result)) + return result + + @_needs_database + def _handle_set_repository_enabled(self, repo, enable): + """ + @return previous enabled 
state + """ + self._check_repository_configs() + if isinstance(repo, repository.Repository): + repoid = repo.repoid + else: + repoid = repo + try: + repo = self._yum_base.repos.getRepo(repoid) + except (KeyError, yum.Errors.RepoError): + raise errors.RepositoryNotFound(repoid) + res = repo.enabled + try: + if enable ^ res: + if enable is True: + LOG.info("enabling repository %s" % repo) + repo.enable() + else: + LOG.info("disabling repository %s" % repo) + repo.disable() + try: + yum.config.writeRawRepoFile(repo, only=["enabled"]) + except Exception as exc: + raise errors.RepositoryChangeError( + 'failed to modify repository "%s": %s - %s' % ( + repo, exc.__class__.__name__, str(exc))) + else: + LOG.info("no change for repo %s", repo) + except yum.Errors.RepoError as exc: + raise errors.RepositoryChangeError( + 'failed to modify repository "%s": %s' % (repo, str(exc))) + return res + + # ************************************************************************* + # Public properties + # ************************************************************************* + @property + def uplink(self): + """ + @return input queue for jobs + """ + return self._jobmgr.queue_in + + @property + def downlink(self): + """ + @return output queue for job results + """ + return self._jobmgr.queue_out + + # ************************************************************************* + # Public methods + # ************************************************************************* + def run(self): + """ + Thread's entry point. After initial setup it calls _main_loop(). 
+ """ + if self._logging_config is not None: + setup_logging(self._logging_config) + global LOG + LOG = logging.getLogger(__name__) + LOG.info("running as pid=%d", self.pid) + self._jobmgr.start() + LOG.info("started %s as thread %s", + self._jobmgr.name, self._jobmgr.ident) + self._pkg_cache = weakref.WeakValueDictionary() + + # This allows the code, that can be run both from broker and + # YumWorker, to check, whether it's called by this process. + from lmi.software.yumdb import YumDB + YumDB.RUNNING_UNDER_CIMOM_PROCESS = False + + self._main_loop() + LOG.info("terminating") + diff --git a/src/software/lmi/software/yumdb/repository.py b/src/software/lmi/software/yumdb/repository.py new file mode 100644 index 0000000..758ae21 --- /dev/null +++ b/src/software/lmi/software/yumdb/repository.py @@ -0,0 +1,202 @@ +# -*- encoding: utf-8 -*- +# Software Management Providers +# +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Module holding an abstraction for YUM repository. 
+""" + +from datetime import datetime +import logging +import yum +import yum.Errors + +# maps names of Repository properties to their corresponding property +# names in YumRepository object +PROPERTY_NAME_MAP = { + "repoid" : "id", + "base_urls" : "baseurl", + "config_file" : "repofile", + "cost" : "cost", + "enabled" : "enabled", + "gpg_check" : "gpgcheck", + "last_edit" : "repo_config_age", + "mirror_list" : "mirrorlist", + "mirror_urls" : "mirrorurls", + "name" : "name", + "pkg_dir" : "pkgdir", + "ready" : "ready", + "repo_gpg_check" : "repo_gpgcheck", + "timeout" : "timeout" +} + +def get_prop_from_yum_repo(repo, prop_name): + """ + @param prop_name is one Repository properties + @return value of property from object of YumRepository + """ + if not isinstance(repo, yum.yumRepo.YumRepository): + raise TypeError("repo must be in instance of yum.yumRepo.YumRepository") + if prop_name in PROPERTY_NAME_MAP: + val = getattr(repo, PROPERTY_NAME_MAP[prop_name]) + if prop_name == "last_edit": + val = datetime.fromtimestamp(val) + elif prop_name == "mirror_urls" and not repo.mirrorlist: + val = None + elif prop_name == "ready": + val = val() + elif prop_name in {"arch", "basearch", "releasever"}: + val = repo.yumvar[prop_name] + elif prop_name in {"revision", "last_update"}: + if repo.enabled and repo.repoXML: + md = repo.repoXML + if prop_name == "last_update": + val = datetime.fromtimestamp(md.timestamp) + elif prop_name == "revision": + val = long(md.revision) + else: + val = getattr(repo.repoXML, prop_name) + else: + val = None + elif prop_name == "pkg_count": + # this needs populated sack: ydb.repos.populateSack(repo.id) + val = len(repo.sack) + else: + raise ValueError('Unknown repository property: "%s"' % prop_name) + return val + +class Repository(object): + """ + Container for repository metadata. It represents yum repository. + It's supposed to be passed from YumWorker to YumDB client and + vice-versa. 
+ """ + __slots__ = ( + "objid", # [int] id of python object on server process + "repoid", # [string] repository id name + # (name of config file) + + "arch", # [string] architecture of packages + "basearch", # [string] base system architecture + "base_urls", # [list] base urls as strings + #"cache", + #"cache_dir", + "name", # [string] repository descriptive name + "config_file", # [string] file path to corresponding + # config file + "cost", # [int] cost of repository + "enabled", # [boolean] whether repo is enabled + "gpg_check", # [boolean] whether to check gpg signature + #"metadata_expire", # how long are metadata considered valid + "last_edit", # datetime of last config modification + "last_update", # datetime of last change of repository + # on server + "mirror_list", # url of mirrorlist, or None + "mirror_urls", # list of urls obtained from mirrorlist or None + #"persist_dir", # + #"pkg_count", # number of packages in directory + "pkg_dir", # directory with packages + #"proxy", # boolean saying whether this is a proxy + "ready", # boolean saying, whether it's ready for use + "releasever", # version of targeted distribution + "repo_gpg_check", # [boolean] whether to check gpg + # signarues of data + "revision", + "timeout", # timeout for requests + ) + + def __init__(self, objid, repoid, arch, basearch, base_urls, + config_file, cost, enabled, gpg_check, last_edit, last_update, + name, pkg_dir, ready, releasever, repo_gpg_check, revision, + timeout, mirror_list=None, mirror_urls=None): + for arg in ('last_edit', 'last_update'): + if ( locals()[arg] is not None + and not isinstance(locals()[arg], datetime)): + raise TypeError("%s must be an instance of datetime" % arg) + if not isinstance(timeout, float): + raise TypeError("timeout must be a float") + for arg in ('cost', 'revision'): + if ( locals()[arg] is not None + and not isinstance(locals()[arg], (int, long))): + raise TypeError("%s must be an integer" % arg) + self.objid = objid + self.repoid = repoid 
+ self.arch = arch + self.basearch = basearch + self.base_urls = list(base_urls) + self.config_file = config_file + self.cost = cost + self.enabled = bool(enabled) + self.gpg_check = bool(gpg_check) + self.last_edit = last_edit + self.last_update = last_update + self.mirror_list = "" if not mirror_list else mirror_list + self.mirror_urls = [] if not mirror_urls else list(mirror_urls) + self.name = name + #self.pkg_count = pkg_count + self.pkg_dir = pkg_dir + self.ready = bool(ready) + self.releasever = releasever + self.repo_gpg_check = bool(repo_gpg_check) + self.revision = revision + self.timeout = timeout + + def __str__(self): + return self.repoid + + def __getstate__(self): + """ + Used for serialization with pickle. + @return container content that will be serialized + """ + return dict((k, getattr(self, k)) for k in self.__slots__) + + def __setstate__(self, state): + """ + Used for deserialization with pickle. + Restores the object from serialized form. + @param state is an object created by __setstate__() method + """ + for k, value in state.items(): + setattr(self, k, value) + +def make_repository_from_db(repo): + """ + Create instance of Repository from instance of yum.yumRepo.YumRepository + @return instance of Repository + """ + if not isinstance(repo, yum.yumRepo.YumRepository): + raise TypeError("repo must be in instance of yum.yumRepo.YumRepository") + metadata = {} + for prop_name in Repository.__slots__[1:]: + try: + metadata[prop_name] = get_prop_from_yum_repo(repo, prop_name) + except yum.Errors.RepoError as exc: + # some properties can cause error (like requesting ready) + logging.getLogger(__name__).warn( + 'failed to get property "%s" of repo "%s": %s' % ( + prop_name, repo.name, exc)) + if prop_name == "ready": + metadata[prop_name] = False + continue + + res = Repository(id(repo), **metadata) + return res + diff --git a/src/software/lmi/software/yumdb/util.py b/src/software/lmi/software/yumdb/util.py new file mode 100644 index 
0000000..18fcf6b --- /dev/null +++ b/src/software/lmi/software/yumdb/util.py @@ -0,0 +1,163 @@ +# Software Management Providers +# +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# + +""" +Common utilities meant to be used only be ``yumdb`` subpackage. +""" + +from itertools import chain +import inspect +import logging +import os + +from lmi.common import cmpi_logging + +class DispatchingFormatter: + """ + Formatter class for logging module. It allows to predefine different + format string for paricular module names. + """ + def __init__(self, formatters, default): + """ + *format* in parameters description can be either ``string`` or + another formatter object. + + :param formatters (``dict``) Mapping of module names to *format*. + :param default Default *format*. + """ + for k, formatter in formatters.items(): + if isinstance(formatter, basestring): + formatters[k] = logging.Formatter(formatter) + self._formatters = formatters + if isinstance(default, basestring): + default = logging.Formatter(default) + self._default_formatter = default + + def format(self, record): + """ + Interface for logging module. 
+ """ + formatter = self._formatters.get(record.name, self._default_formatter) + return formatter.format(record) + +# ***************************************************************************** +# Decorators +# ***************************************************************************** +def trace_function(func): + """ + Decorator for logging entries and exits of function or method. + """ + if not inspect.ismethod(func) and not inspect.isfunction(func): + raise TypeError("func must be a function") + + def _print_value(val): + """ + Used here for printing function arguments. Shortens the output + string, if that would be too long. + """ + if isinstance(val, list): + if len(val) < 2: + return str(val) + else: + return "[%s, ...]" % _print_value(val[0]) + return str(val) + + logger = logging.getLogger(__name__+'.trace_function') + module = func.__module__.split('.')[-1] + lineno = inspect.currentframe().f_back.f_lineno + + def _wrapper(self, *args, **kwargs): + """ + Wrapper for function or method, that does the logging. 
+ """ + if logger.isEnabledFor(logging.DEBUG): + frm = inspect.currentframe() + logargs = { + "caller_file" : os.path.basename(os.path.splitext( + frm.f_back.f_code.co_filename)[0]), + "caller_lineno" : frm.f_back.f_lineno, + "module" : module, + "func" : func.__name__, + "lineno" : lineno, + "action" : "entering", + "args" : ", ".join(chain( + (_print_value(a) for a in args), + ( "%s=%s"%(k, _print_value(v)) + for k, v in kwargs.items()))) + } + + if not logargs["args"]: + logargs["args"] = "" + else: + logargs["args"] = " with args=(%s)" % logargs["args"] + logger.debug("%(caller_file)s:%(caller_lineno)d - %(action)s" + " %(module)s:%(func)s:%(lineno)d%(args)s" , logargs) + try: + result = func(self, *args, **kwargs) + if logger.isEnabledFor(logging.DEBUG): + logargs["action"] = "exiting" + logger.debug("%(caller_file)s:%(caller_lineno)d - %(action)s" + " %(module)s:%(func)s:%(lineno)d", logargs) + except Exception as exc: + if logger.isEnabledFor(logging.DEBUG): + logargs['action'] = 'exiting' + logargs['error'] = str(exc) + logger.debug("%(caller_file)s:%(caller_lineno)d - %(action)s" + " %(module)s:%(func)s:%(lineno)d with error: %(error)s", + logargs) + raise + return result + + return _wrapper + +def setup_logging(config): + """ + This is meant to be used by ``YumWorker`` process to setup logging + independent of what providers are using. Unfortunately ``YumWorker`` + can not use the same facilities as the rest of program, because + logging is done through *broker*. 
+ """ + try: + logging.config.dictConfig(config) + cmpi_logging.logger = logging.getLogger('lmi.software.yumdb') + except Exception: #pylint: disable=W0703 + # logging is not set up but client expects us to work + # all messages are dumped to /dev/null + logging.config.dictConfig({ + 'version' : 1, + 'disable_existing_loggers' : True, + 'handlers': { + 'null' : { + 'class': 'logging.handlers.FileHandler', + 'level': 'CRITICAL', + 'filename': '/dev/null' + } + }, + 'loggers' : { + 'root' : { + 'level': 'CRITICAL', + 'handlers' : ['null'], + 'propagate' : False + } + } + }) + + |