summaryrefslogtreecommitdiffstats
path: root/plugins/omhdfs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/omhdfs')
-rw-r--r--plugins/omhdfs/omhdfs.c36
1 files changed, 26 insertions, 10 deletions
diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c
index 734c28cd..4fbf2ef4 100644
--- a/plugins/omhdfs/omhdfs.c
+++ b/plugins/omhdfs/omhdfs.c
@@ -276,16 +276,19 @@ fileClose(file_t *pFile)
{
DEFiRet;
+ if(pFile->fh == NULL)
+ FINALIZE;
+
if(pFile->nUsers > 1)
d_pthread_mutex_lock(&pFile->mut);
- if(pFile->fh != NULL) {
- hdfsCloseFile(pFile->fs, pFile->fh);
- pFile->fh = NULL;
- }
+
+ hdfsCloseFile(pFile->fs, pFile->fh);
+ pFile->fh = NULL;
if(pFile->nUsers > 1)
d_pthread_mutex_unlock(&pFile->mut);
+finalize_it:
RETiRet;
}
@@ -307,12 +310,26 @@ ENDfreeInstance
BEGINtryResume
CODESTARTtryResume
+ fileClose(pData->pFile);
+ fileOpen(pData->pFile);
+ if(pData->pFile->fh == NULL){
+ dbgprintf("omhdfs: tried to resume file %s, but still no luck...\n",
+ pData->pFile->name);
+ iRet = RS_RET_SUSPENDED;
+ }
ENDtryResume
BEGINdoAction
CODESTARTdoAction
DBGPRINTF(" (%s)\n", pData->pFile->name);
+ if(pData->pFile->fh == NULL) {
+ fileOpen(pData->pFile);
+ if(pData->pFile->fh == NULL) {
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+ }
iRet = fileWrite(pData->pFile, ppString[0]);
+finalize_it:
ENDdoAction
@@ -349,17 +366,16 @@ CODESTARTparseSelectorAct
CHKmalloc(pFile->hdfsHost = strdup((hdfsHost == NULL) ? "default" : (char*) hdfsHost));
pFile->hdfsPort = hdfsPort;
fileOpen(pFile);
+ if(pFile->fh == NULL){
+ errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - "
+ "retrying later", pFile->name);
+ iRet = RS_RET_SUSPENDED;
+ }
r = hashtable_insert(files, keybuf, pFile);
if(r == 0)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
fileObjAddUser(pFile);
-
- if(pFile->fh == NULL){
- errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - retrying later", pFile->name);
- iRet = RS_RET_SUSPENDED;
- }
-
pData->pFile = pFile;
CODE_STD_FINALIZERparseSelectorAct