diff options
-rw-r--r-- | src/software/openlmi/software/yumdb/__init__.py | 30 |
1 files changed, 25 insertions, 5 deletions
diff --git a/src/software/openlmi/software/yumdb/__init__.py b/src/software/openlmi/software/yumdb/__init__.py index b5160b4..d64c0f4 100644 --- a/src/software/openlmi/software/yumdb/__init__.py +++ b/src/software/openlmi/software/yumdb/__init__.py @@ -242,6 +242,8 @@ class YumDB(singletonmixin.Singleton): self._reply_cond = threading.Condition(self._reply_lock) # ids of all expected jobs -- those to be processed by YumWorker self._expected = [] + # {job_id : reply, ... } + self._replies = {} cmpi_logging.logger.trace_info('YumDB: initialized') # ************************************************************************* @@ -291,6 +293,11 @@ class YumDB(singletonmixin.Singleton): Block on downlink queue to receive expected replies from worker process. Only one thread can be executing this code at any time. + Only one thread can block on downlink channel to obtain reply. If + it's reply for him, he takes it and leaves, otherwise he adds it to + _replies dictionary and notifies other threads. This thread is the + one, whose job appears as first in _expected list. + In case, that worker process terminated due to some error. Restart it and resend all the job requests again. """ @@ -301,18 +308,20 @@ class YumDB(singletonmixin.Singleton): jobout = self._worker.downlink.get( block=True, timeout=MAX_JOB_WAIT_TIME) if jobout.jobid == job.jobid: + cmpi_logging.logger.debug( + "[jobid=%d] received desired reply", job.jobid) with self._reply_lock: - cmpi_logging.logger.debug( - "[jobid=%d] received desired reply", job.jobid) self._expected.remove(job.jobid) if len(self._expected): self._reply_cond.notify() return jobout else: - # this should not happen - cmpi_logging.logger.error("[jobid=%d] received reply" + cmpi_logging.logger.info("[jobid=%d] received reply" " for another thread (jobid=%d)", job.jobid, jobout.jobid) + with self._reply_lock: + self._replies[jobout.jobid] = jobout + self._reply_cond.notifyAll() except TQueue.Empty: cmpi_logging.logger.warn("[jobid=%d] wait for job reply timeout" "(%d seconds) occured", job.jobid, MAX_JOB_WAIT_TIME) @@ -339,7 +348,18 @@ class YumDB(singletonmixin.Singleton): return job.jobid self._expected.append(job.jobid) while True: - if job.jobid not in self._expected: + if job.jobid in self._replies: + cmpi_logging.logger.debug( + "[jobid=%d] desired reply already received", + job.jobid) + try: + self._expected.remove(job.jobid) + except ValueError: + cmpi_logging.logger.warn( + "[jobid=%d] reply not in expected list", + job.jobid) + return self._replies.pop(job.jobid) + elif job.jobid not in self._expected: # process terminated, resending job cmpi_logging.logger.warn("[jobid=%d] job removed" " from expected list, sending request again", job.jobid) |