diff options
Diffstat (limited to 'pki/base/common/src/com/netscape/certsrv/request')
-rw-r--r-- | pki/base/common/src/com/netscape/certsrv/request/ARequestNotifier.java | 207 |
1 files changed, 170 insertions, 37 deletions
diff --git a/pki/base/common/src/com/netscape/certsrv/request/ARequestNotifier.java b/pki/base/common/src/com/netscape/certsrv/request/ARequestNotifier.java index 4cc690b03..e4f742fff 100644 --- a/pki/base/common/src/com/netscape/certsrv/request/ARequestNotifier.java +++ b/pki/base/common/src/com/netscape/certsrv/request/ARequestNotifier.java @@ -20,9 +20,13 @@ package com.netscape.certsrv.request; import java.util.*; +import com.netscape.certsrv.base.EBaseException; import com.netscape.certsrv.apps.*; import com.netscape.certsrv.request.*; +import com.netscape.certsrv.ldap.*; import com.netscape.certsrv.logging.ILogger; +import com.netscape.certsrv.ca.ICertificateAuthority; +import com.netscape.certsrv.publish.IPublisherProcessor; /** * The ARequestNotifier class implements the IRequestNotifier interface, @@ -34,25 +38,40 @@ public class ARequestNotifier implements IRequestNotifier { private Hashtable mListeners = new Hashtable(); private Vector mNotifierThreads = new Vector(); private Vector mRequests = new Vector(); + private int mMaxRequests = 100; + private boolean mSearchForRequests = false; private int mMaxThreads = 1; + private ICertificateAuthority mCA = null; private boolean mIsPublishingQueueEnabled = false; - private int mPublishingQueuePriorityLevel = 2; private int mPublishingQueuePriority = 0; + private int mMaxPublishingQueuePageSize = 1; + private IRequestQueue mRequestQueue = null; public ARequestNotifier() { mPublishingQueuePriority = Thread.currentThread().getPriority(); } - public ARequestNotifier(boolean isPublishingQueueEnabled, int publishingQueuePriorityLevel) { + public ARequestNotifier (ICertificateAuthority ca, + boolean isPublishingQueueEnabled, + int publishingQueuePriorityLevel, + int maxNumberOfPublishingThreads, + int publishingQueuePageSize) { + mCA = ca; + if (mCA != null) mRequestQueue = mCA.getRequestQueue(); mIsPublishingQueueEnabled = isPublishingQueueEnabled; - mPublishingQueuePriorityLevel = publishingQueuePriorityLevel; + mMaxThreads = maxNumberOfPublishingThreads; + mMaxRequests = publishingQueuePageSize; - // Publishing Queue Priority Levels: 2 - maximum, 1 - raised, 0 - normal + // Publishing Queue Priority Levels: 2 - maximum, 1 - higher, 0 - normal, -1 - lower, -2 - minimum if (publishingQueuePriorityLevel > 1) { mPublishingQueuePriority = Thread.MAX_PRIORITY; } else if (publishingQueuePriorityLevel > 0) { mPublishingQueuePriority = (Thread.currentThread().getPriority() + Thread.MAX_PRIORITY) / 2; + } else if (publishingQueuePriorityLevel < -1) { + mPublishingQueuePriority = Thread.MIN_PRIORITY; + } else if (publishingQueuePriorityLevel < 0) { + mPublishingQueuePriority = (Thread.currentThread().getPriority() + Thread.MIN_PRIORITY) / 2; } else { mPublishingQueuePriority = Thread.currentThread().getPriority(); } @@ -132,15 +151,72 @@ public class ARequestNotifier implements IRequestNotifier { * * @return request */ - public IRequest getRequest() { + public synchronized IRequest getRequest() { IRequest r = null; + String id = null; - CMS.debug("getRequest 1 mRequests.size = " + mRequests.size()); + CMS.debug("getRequest mRequests=" + mRequests.size() + " mSearchForRequests=" + mSearchForRequests); + if (mSearchForRequests && mRequests.size() == 1) { + if (mCA != null && mRequestQueue == null) mRequestQueue = mCA.getRequestQueue(); + if (mRequestQueue != null) { + IRequestVirtualList list = mRequestQueue.getPagedRequestsByFilter( + new RequestId((String)mRequests.elementAt(0)), + "(&(requeststate=complete)(requesttype=enrollment))", + mMaxRequests, "requestId"); + int s = list.getSize() - list.getCurrentIndex(); + CMS.debug("getRequest list size: "+s); + for (int i = 0; i < s; i++) { + r = null; + try { + r = list.getElementAt(i); + } catch (Exception e) { + // handled below + } + if (r == null) { + continue; + } + if (i == 0 && ((String)mRequests.elementAt(0)).equals(r.getRequestId().toString())) { + if (s == 1) { + break; + } else { + continue; + } + } + if (mRequests.size() < mMaxRequests) { + mRequests.addElement(r.getRequestId().toString()); + CMS.debug("getRequest added "+r.getRequestType()+" request "+r.getRequestId().toString()+ + " to mRequests: " + mRequests.size()+" ("+mMaxRequests+")"); + } else { + break; + } + } + CMS.debug("getRequest done with adding requests to mRequests: " + mRequests.size()); + } else { + CMS.debug("getRequest has no access to the request queue"); + } + } if (mRequests.size() > 0) { - r = (IRequest)mRequests.elementAt(0); - if (r != null) mRequests.remove(0); + id = (String)mRequests.elementAt(0); + if (id != null) { + CMS.debug("getRequest getting request: " + id); + if (mCA != null && mRequestQueue == null) mRequestQueue = mCA.getRequestQueue(); + if (mRequestQueue != null) { + try { + r = mRequestQueue.findRequest(new RequestId(id)); + mRequests.remove(0); + CMS.debug("getRequest request "+ id + ((r != null)?" found":" not found")); + } catch (EBaseException e) { + CMS.debug("getRequest EBaseException " + e.toString()); + } + } else { + CMS.debug("getRequest has no access to the request queue"); + } + } + if (mRequests.size() == 0) { + mSearchForRequests = false; + } } - CMS.debug("getRequest 2 mRequests.size = " + mRequests.size()); + CMS.debug("getRequest mRequests=" + mRequests.size() + " mSearchForRequests=" + mSearchForRequests + " done"); return r; } @@ -164,28 +240,15 @@ public class ARequestNotifier implements IRequestNotifier { } /** - * Sets maximum number of publishing threads. - * - * @param maxNumberOfThreads integer - */ - public void setMaxNumberOfPublishingThreads(int maxNumberOfThreads) { - if (maxNumberOfThreads > 1) { - mMaxThreads = maxNumberOfThreads; - } - CMS.debug("Number of publishing threads set to " + mMaxThreads); - } - - /** * Removes a notifier thread from the pool of publishing queue threads. * * @param notifierThread Thread */ - public synchronized void removeNotifierThread(Thread notifierThread) { - CMS.debug("about removeNotifierThread "+ mNotifierThreads.size()); + public void removeNotifierThread(Thread notifierThread) { if (mNotifierThreads.size() > 0) { mNotifierThreads.remove(notifierThread); } - CMS.debug("removeNotifierThread done "+ mNotifierThreads.size()); + CMS.debug("Number of publishing threads: " + mNotifierThreads.size()); } /** @@ -209,26 +272,96 @@ public class ARequestNotifier implements IRequestNotifier { } /** + * Checks for available publishing connections + * + * @return true if there are available publishing connections, false otherwise + */ + private boolean checkAvailablePublishingConnections() { + boolean availableConnections = false; + + IPublisherProcessor pp = null; + if (mCA != null) pp = mCA.getPublisherProcessor(); + if (pp != null && pp.enabled()) { + ILdapConnModule ldapConnModule = pp.getLdapConnModule(); + if (ldapConnModule != null) { + ILdapConnFactory ldapConnFactory = ldapConnModule.getLdapConnFactory(); + if (ldapConnFactory != null) { + CMS.debug("checkAvailablePublishingConnections maxConn: " + ldapConnFactory.maxConn() + + " totalConn: " + ldapConnFactory.totalConn()); + if (ldapConnFactory.maxConn() > ldapConnFactory.totalConn()) { + availableConnections = true; + } + } else { + CMS.debug("checkAvailablePublishingConnections ldapConnFactory is not accessible"); + } + } else { + CMS.debug("checkAvailablePublishingConnections ldapConnModule is not accessible"); + } + } else { + CMS.debug("checkAvailablePublishingConnections PublisherProcessor is not " + + ((pp != null)?"enabled":"accessible")); + } + + return availableConnections; + } + + /** + * Checks if more publishing threads can be added. + * + * @return true if more publishing threads can be added, false otherwise + */ + private boolean morePublishingThreads() { + boolean moreThreads = false; + + if (mNotifierThreads.size() == 0) { + moreThreads = true; + } else if (mNotifierThreads.size() < mMaxThreads) { + CMS.debug("morePublishingThreads ("+mRequests.size()+">"+ + ((mMaxRequests * mNotifierThreads.size()) / mMaxThreads)+ + " "+"("+mMaxRequests+"*"+mNotifierThreads.size()+"):"+mMaxThreads); + // gradually add new publishing threads + if (mRequests.size() > ((mMaxRequests * mNotifierThreads.size()) / mMaxThreads)) { + // check for available publishing connections + if (checkAvailablePublishingConnections()) { + moreThreads = true; + } + } + } + CMS.debug("morePublishingThreads moreThreads: " + moreThreads); + + return moreThreads; + } + + + /** * Notifies all registered listeners about request. * * @param r request */ public synchronized void addToNotify(IRequest r) { - mRequests.addElement(r); - CMS.debug("addToNotify PublishingQueue: " + mRequests.size() + " Threads: " + mNotifierThreads.size() + ":" + mMaxThreads + - " (" + Thread.currentThread().getPriority() + ", " + mPublishingQueuePriority + ", " + Thread.MAX_PRIORITY + ")"); - if (mNotifierThreads.size() < mMaxThreads) { - try { - Thread notifierThread = new Thread(new RunListeners((IRequestNotifier)this)); - if (notifierThread != null) { - mNotifierThreads.addElement(notifierThread); - if (mPublishingQueuePriority > 0) { - notifierThread.setPriority(mPublishingQueuePriority); + //mRequests.addElement(r); + if (!mSearchForRequests) { + if (mRequests.size() < mMaxRequests) { + mRequests.addElement(r.getRequestId().toString()); + CMS.debug("addToNotify extended buffer to "+mRequests.size()+"("+mMaxRequests+")"+ + " requests by adding request "+r.getRequestId().toString()); + if (morePublishingThreads()) { + try { + Thread notifierThread = new Thread(new RunListeners((IRequestNotifier)this)); + if (notifierThread != null) { + mNotifierThreads.addElement(notifierThread); + CMS.debug("Number of publishing threads: " + mNotifierThreads.size()); + if (mPublishingQueuePriority > 0) { + notifierThread.setPriority(mPublishingQueuePriority); + } + notifierThread.start(); + } + } catch (Throwable e) { + CMS.debug("addToNotify exception: " + e.toString()); } - notifierThread.start(); } - } catch (Throwable e) { - CMS.debug("addToNotify exception: " + e.toString()); + } else { + mSearchForRequests = true; } } } |