From 670e81c9a8275a5509efd71dae66d9a267ec1574 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 1 Oct 2010 14:36:39 +0000 Subject: omhdfs: made action suspend/resume working --- plugins/omhdfs/omhdfs.c | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/plugins/omhdfs/omhdfs.c b/plugins/omhdfs/omhdfs.c index 734c28cd..4fbf2ef4 100644 --- a/plugins/omhdfs/omhdfs.c +++ b/plugins/omhdfs/omhdfs.c @@ -276,16 +276,19 @@ fileClose(file_t *pFile) { DEFiRet; + if(pFile->fh == NULL) + FINALIZE; + if(pFile->nUsers > 1) d_pthread_mutex_lock(&pFile->mut); - if(pFile->fh != NULL) { - hdfsCloseFile(pFile->fs, pFile->fh); - pFile->fh = NULL; - } + + hdfsCloseFile(pFile->fs, pFile->fh); + pFile->fh = NULL; if(pFile->nUsers > 1) d_pthread_mutex_unlock(&pFile->mut); +finalize_it: RETiRet; } @@ -307,12 +310,26 @@ ENDfreeInstance BEGINtryResume CODESTARTtryResume + fileClose(pData->pFile); + fileOpen(pData->pFile); + if(pData->pFile->fh == NULL){ + dbgprintf("omhdfs: tried to resume file %s, but still no luck...\n", + pData->pFile->name); + iRet = RS_RET_SUSPENDED; + } ENDtryResume BEGINdoAction CODESTARTdoAction DBGPRINTF(" (%s)\n", pData->pFile->name); + if(pData->pFile->fh == NULL) { + fileOpen(pData->pFile); + if(pData->pFile->fh == NULL) { + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + } iRet = fileWrite(pData->pFile, ppString[0]); +finalize_it: ENDdoAction @@ -349,17 +366,16 @@ CODESTARTparseSelectorAct CHKmalloc(pFile->hdfsHost = strdup((hdfsHost == NULL) ? "default" : (char*) hdfsHost)); pFile->hdfsPort = hdfsPort; fileOpen(pFile); + if(pFile->fh == NULL){ + errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - " + "retrying later", pFile->name); + iRet = RS_RET_SUSPENDED; + } r = hashtable_insert(files, keybuf, pFile); if(r == 0) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } fileObjAddUser(pFile); - - if(pFile->fh == NULL){ - errmsg.LogError(0, RS_RET_ERR_HDFS_OPEN, "omhdfs: failed to open %s - retrying later", pFile->name); - iRet = RS_RET_SUSPENDED; - } - pData->pFile = pFile; CODE_STD_FINALIZERparseSelectorAct -- cgit