summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMichal Minar <miminar@redhat.com>2013-03-26 10:51:52 +0100
committerMichal Minar <miminar@redhat.com>2013-03-26 11:04:34 +0100
commite4c58708ed07313059044a76c20a1336060cd473 (patch)
tree144d70674e43ad51346e6797622d52e1b55c0c4d /src
parent6565f1261bf5ec326e780a2b2becbb8200c63b7c (diff)
downloadopenlmi-providers-e4c58708ed07313059044a76c20a1336060cd473.tar.gz
openlmi-providers-e4c58708ed07313059044a76c20a1336060cd473.tar.xz
openlmi-providers-e4c58708ed07313059044a76c20a1336060cd473.zip
fixed concurrency issue
readded _replies dictionary to YumDB client to cache answers from YumWorker process, that can change the order of jobs to process
Diffstat (limited to 'src')
-rw-r--r--src/software/openlmi/software/yumdb/__init__.py30
1 files changed, 25 insertions, 5 deletions
diff --git a/src/software/openlmi/software/yumdb/__init__.py b/src/software/openlmi/software/yumdb/__init__.py
index b5160b4..d64c0f4 100644
--- a/src/software/openlmi/software/yumdb/__init__.py
+++ b/src/software/openlmi/software/yumdb/__init__.py
@@ -242,6 +242,8 @@ class YumDB(singletonmixin.Singleton):
self._reply_cond = threading.Condition(self._reply_lock)
# ids of all expected jobs -- those to be processed by YumWorker
self._expected = []
+ # {job_id : reply, ... }
+ self._replies = {}
cmpi_logging.logger.trace_info('YumDB: initialized')
# *************************************************************************
@@ -291,6 +293,11 @@ class YumDB(singletonmixin.Singleton):
Block on downlink queue to receive expected replies from worker
process. Only one thread can be executing this code at any time.
+ Only one thread can block on downlink channel to obtain reply. If
+ it's reply for him, he takes it and leaves, otherwise he adds it to
+ _replies dictionary and notifies other threads. This thread is the
+ one, whose job appears as first in _expected list.
+
In case, that worker process terminated due to some error. Restart it
and resend all the job requests again.
"""
@@ -301,18 +308,20 @@ class YumDB(singletonmixin.Singleton):
jobout = self._worker.downlink.get(
block=True, timeout=MAX_JOB_WAIT_TIME)
if jobout.jobid == job.jobid:
+ cmpi_logging.logger.debug(
+ "[jobid=%d] received desired reply", job.jobid)
with self._reply_lock:
- cmpi_logging.logger.debug(
- "[jobid=%d] received desired reply", job.jobid)
self._expected.remove(job.jobid)
if len(self._expected):
self._reply_cond.notify()
return jobout
else:
- # this should not happen
- cmpi_logging.logger.error("[jobid=%d] received reply"
+ cmpi_logging.logger.info("[jobid=%d] received reply"
" for another thread (jobid=%d)",
job.jobid, jobout.jobid)
+ with self._reply_lock:
+ self._replies[jobout.jobid] = jobout
+ self._reply_cond.notifyAll()
except TQueue.Empty:
cmpi_logging.logger.warn("[jobid=%d] wait for job reply timeout"
"(%d seconds) occured", job.jobid, MAX_JOB_WAIT_TIME)
@@ -339,7 +348,18 @@ class YumDB(singletonmixin.Singleton):
return job.jobid
self._expected.append(job.jobid)
while True:
- if job.jobid not in self._expected:
+ if job.jobid in self._replies:
+ cmpi_logging.logger.debug(
+ "[jobid=%d] desired reply already received",
+ job.jobid)
+ try:
+ self._expected.remove(job.jobid)
+ except ValueError:
+ cmpi_logging.logger.warn(
+ "[jobid=%d] reply not in expected list",
+ job.jobid)
+ return self._replies.pop(job.jobid)
+ elif job.jobid not in self._expected:
# process terminated, resending job
cmpi_logging.logger.warn("[jobid=%d] job removed"
" from expected list, sending request again", job.jobid)