summaryrefslogtreecommitdiffstats
path: root/plugins/omhdfs/omhdfs.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/omhdfs/omhdfs.c')
-rw-r--r--plugins/omhdfs/omhdfs.c42
1 files changed, 33 insertions, 9 deletions
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
index 42ed834f..eefea722 100644
--- a/plugins/omhdfs/omhdfs.c
+++ b/plugins/omhdfs/omhdfs.c
@@ -47,6 +47,7 @@
#include "unicode-helper.h"
#include "errmsg.h"
#include "hashtable.h"
+#include "hashtable_itr.h"
MODULE_TYPE_OUTPUT
@@ -200,7 +201,6 @@ fileObjDestruct(file_t **ppFile)
static void
fileObjDestruct4Hashtable(void *ptr)
{
- dbgprintf("omfile: fileObjDestruct4Hashtable called\n");
file_t *pFile = (file_t*) ptr;
fileObjDestruct(&pFile);
}
@@ -214,6 +214,7 @@ fileOpen(file_t *pFile)
assert(pFile->fh == NULL);
if(pFile->nUsers > 1)
d_pthread_mutex_lock(&pFile->mut);
+
DBGPRINTF("omhdfs: try to connect to HDFS at host '%s', port %d\n",
pFile->hdfsHost, pFile->hdfsPort);
pFile->fs = hdfsConnect(pFile->hdfsHost, pFile->hdfsPort);
@@ -256,6 +257,17 @@ fileWrite(file_t *pFile, uchar *buf)
assert(pFile->fh != NULL);
if(pFile->nUsers > 1)
d_pthread_mutex_lock(&pFile->mut);
+
+ /* open file if not open. This must be done *here* and while mutex-protected
+ * because of HUP handling (which is async to normal processing!).
+ */
+ if(pFile->fh == NULL) {
+ fileOpen(pFile);
+ if(pFile->fh == NULL) {
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ }
+
lenWrite = strlen((char*) buf);
tSize num_written_bytes = hdfsWrite(pFile->fs, pFile->fh, buf, lenWrite);
if((unsigned) num_written_bytes != lenWrite) {
@@ -321,15 +333,8 @@ ENDtryResume
BEGINdoAction
CODESTARTdoAction
- DBGPRINTF(" (%s)\n", pData->pFile->name);
- if(pData->pFile->fh == NULL) {
- fileOpen(pData->pFile);
- if(pData->pFile->fh == NULL) {
- ABORT_FINALIZE(RS_RET_SUSPENDED);
- }
- }
+ DBGPRINTF("omuxsock: action to to write to %s\n", pData->pFile->name);
iRet = fileWrite(pData->pFile, ppString[0]);
-finalize_it:
ENDdoAction
@@ -382,6 +387,24 @@ CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
+BEGINdoHUP
+ file_t *pFile;
+ struct hashtable_itr *itr;
+CODESTARTdoHUP
+ /* Iterator constructor only returns a valid iterator if
+ * the hashtable is not empty */
+ itr = hashtable_iterator(files);
+ if(hashtable_count(files) > 0)
+ {
+ do {
+ pFile = (file_t *) hashtable_iterator_value(itr);
+ fileClose(pFile);
+ DBGPRINTF("imuxsock: HUP, closing file %s\n", pFile->name);
+ } while (hashtable_iterator_advance(itr));
+ }
+ENDdoHUP
+
+
/* Reset config variables for this module to default values.
* rgerhards, 2007-07-17
*/
@@ -415,6 +438,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_doHUP
ENDqueryEtryPt