diff options
-rw-r--r-- | pki/base/common/src/com/netscape/certsrv/request/ARequestNotifier.java | 169 |
1 files changed, 160 insertions, 9 deletions
diff --git a/pki/base/common/src/com/netscape/certsrv/request/ARequestNotifier.java b/pki/base/common/src/com/netscape/certsrv/request/ARequestNotifier.java index a52050e04..851cbee12 100644 --- a/pki/base/common/src/com/netscape/certsrv/request/ARequestNotifier.java +++ b/pki/base/common/src/com/netscape/certsrv/request/ARequestNotifier.java @@ -20,9 +20,12 @@ package com.netscape.certsrv.request; import java.util.*; +import com.netscape.certsrv.apps.*; import com.netscape.certsrv.request.*; import com.netscape.certsrv.logging.ILogger; - +import com.netscape.certsrv.ldap.*; +import com.netscape.certsrv.publish.IPublisherProcessor; +import com.netscape.certsrv.ca.ICertificateAuthority; /** * The ARequestNotifier class implements the IRequestNotifier interface, @@ -31,7 +34,35 @@ import com.netscape.certsrv.logging.ILogger; * @version $Revision: 14561 $, $Date: 2007-05-01 10:28:56 -0700 (Tue, 01 May 2007) $ */ public class ARequestNotifier implements IRequestNotifier { - Hashtable mListeners = new Hashtable(); + private Hashtable mListeners = new Hashtable(); + private Vector mNotifierThreads = new Vector(); + private Vector mRequests = new Vector(); + private boolean checkMaxThreads = true; + private int maxThreads = 1; + private ICertificateAuthority mCA = null; + private boolean mIsPublishingQueueEnabled = false; + private int mPublishingQueuePriorityLevel = 2; + private int mPublishingQueuePriority = 0; + + + public ARequestNotifier() { + mPublishingQueuePriority = Thread.currentThread().getPriority(); + } + + public ARequestNotifier(ICertificateAuthority ca, boolean isPublishingQueueEnabled, int publishingQueuePriorityLevel) { + mCA = ca; + mIsPublishingQueueEnabled = isPublishingQueueEnabled; + mPublishingQueuePriorityLevel = publishingQueuePriorityLevel; + + // Publishing Queue Priority Levels: 2 - maximum, 1 - raised, 0 - normal + if (publishingQueuePriorityLevel > 1) { + mPublishingQueuePriority = Thread.MAX_PRIORITY; + } else if (publishingQueuePriorityLevel > 0) { + mPublishingQueuePriority = (Thread.currentThread().getPriority() + Thread.MAX_PRIORITY) / 2; + } else { + mPublishingQueuePriority = Thread.currentThread().getPriority(); + } + } /** * Registers a request listener. @@ -94,6 +125,64 @@ public class ARequestNotifier implements IRequestNotifier { } /** + * Gets list of listeners. + * + * @return enumeration of listeners + */ + public Enumeration getListeners() { + return mListeners.elements(); + } + + /** + * Gets request from publishing queue. + * + * @return request + */ + public IRequest getRequest() { + IRequest r = null; + + CMS.debug("getRequest 1 mRequests.size = " + mRequests.size()); + if (mRequests.size() > 0) { + r = (IRequest)mRequests.elementAt(0); + if (r != null) mRequests.remove(0); + } + CMS.debug("getRequest 2 mRequests.size = " + mRequests.size()); + + return r; + } + + /** + * Gets number of requests in publishing queue. + * + * @return number of requests in publishing queue + */ + public int getNumberOfRequests() { + return mRequests.size(); + } + + /** + * Checks if publishing queue is enabled. + * + * @return true if publishing queue is enabled, false otherwise + */ + public boolean isPublishingQueueEnabled() { + return mIsPublishingQueueEnabled; + } + + /** + * Removes a notifier thread from the pool of publishing queue threads. + * + * @param notifierThread Thread + */ + public synchronized void removeNotifierThread(Thread notifierThread) { + CMS.debug("about removeNotifierThread "+ mNotifierThreads.size()); + if (mNotifierThreads.size() > 0) { + mNotifierThreads.remove(notifierThread); + } + CMS.debug("removeNotifierThread done "+ mNotifierThreads.size()); + } + + /** * Notifies all registered listeners about request. * * @param r request @@ -112,6 +201,47 @@ public class ARequestNotifier implements IRequestNotifier { */ } } + + /** + * Notifies all registered listeners about request. + * + * @param r request + */ + public synchronized void addToNotify(IRequest r) { + CMS.debug("checkMaxThreads = " + checkMaxThreads); + if (checkMaxThreads) { + if (mCA != null) { + IPublisherProcessor publisher = mCA.getPublisherProcessor(); + ILdapConnModule ldapConnModule = null; + ILdapConnFactory ldapConnFactory = null; + if (publisher != null) ldapConnModule = publisher.getLdapConnModule(); + if (ldapConnModule != null) ldapConnFactory = ldapConnModule.getLdapConnFactory(); + if (ldapConnFactory != null) { + int n = ldapConnFactory.maxConn(); + if (n > 1) maxThreads = n - 1; + CMS.debug("maxThreads = " + maxThreads); + } + } + checkMaxThreads = false; + } + mRequests.addElement(r); + CMS.debug("addToNotify PublishingQueue: " + mRequests.size() + " Threads: " + mNotifierThreads.size() + ":" + maxThreads + + " (" + Thread.currentThread().getPriority() + ", " + mPublishingQueuePriority + ", " + Thread.MAX_PRIORITY + ")"); + if (mNotifierThreads.size() < maxThreads) { + try { + Thread notifierThread = new Thread(new RunListeners((IRequestNotifier)this)); + if (notifierThread != null) { + mNotifierThreads.addElement(notifierThread); + if (mPublishingQueuePriority > 0) { + notifierThread.setPriority(mPublishingQueuePriority); + } + notifierThread.start(); + } + } catch (Throwable e) { + CMS.debug("addToNotify exception: " + e.toString()); + } + } + } } @@ -122,6 +252,7 @@ public class ARequestNotifier implements IRequestNotifier { class RunListeners implements Runnable { IRequest mRequest = null; Enumeration mListeners = null; + IRequestNotifier mRequestNotifier = null; /** * RunListeners class constructor. @@ -135,16 +266,36 @@ class RunListeners implements Runnable { } /** + * RunListeners class constructor. + * + * @param r request + * @param listeners list of listeners + */ + public RunListeners(IRequestNotifier requestNotifier) { + mRequestNotifier = requestNotifier; + mListeners = mRequestNotifier.getListeners(); + } + + /** * RunListeners thread implementation. */ public void run() { - if (mListeners != null) { - while (mListeners.hasMoreElements()) { - IRequestListener l = - (IRequestListener) mListeners.nextElement(); - - l.accept(mRequest); + CMS.debug("RunListeners::"+((mRequestNotifier != null && mRequestNotifier.getNumberOfRequests() > 0)?" Queue: "+mRequestNotifier.getNumberOfRequests():" noQueue")+ + " "+((mRequest != null)?" SingleRequest":" noSingleRequest")); + do { + if (mRequestNotifier != null) mRequest = (IRequest)mRequestNotifier.getRequest(); + if (mListeners != null && mRequest != null) { + while (mListeners.hasMoreElements()) { + IRequestListener l = (IRequestListener) mListeners.nextElement(); + CMS.debug("RunListeners: IRequestListener = " + l.getClass().getName()); + l.accept(mRequest); + } } - } + CMS.debug("RunListeners: "+((mRequestNotifier != null && mRequestNotifier.getNumberOfRequests() > 0)?" Queue: "+mRequestNotifier.getNumberOfRequests():" noQueue")+ + " "+((mRequest != null)?" SingleRequest":" noSingleRequest")); + if (mRequestNotifier != null) mListeners = mRequestNotifier.getListeners(); + } while (mRequestNotifier != null && mRequestNotifier.getNumberOfRequests() > 0); + + if (mRequestNotifier != null) mRequestNotifier.removeNotifierThread(Thread.currentThread()); } } |