summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog21
-rw-r--r--configure.ac2
-rw-r--r--doc/manual.html2
-rw-r--r--doc/omfile.html10
-rw-r--r--doc/omlibdbi.html46
-rw-r--r--plugins/omlibdbi/omlibdbi.c151
-rw-r--r--runtime/debug.c6
-rw-r--r--runtime/stream.c140
-rw-r--r--runtime/stream.h7
-rw-r--r--tools/omfile.c15
10 files changed, 299 insertions, 101 deletions
diff --git a/ChangeLog b/ChangeLog
index 25ecdcc2..6d33904e 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,23 @@
-----------------------------------------------------------------------------
+---------------------------------------------------------------------------
+Version 7.3.1 [devel] 2012-10-??
+- change lumberjack cookie to "@cee" from "@cee "
+ CEE originally specified the cookie with SP, whereas other lumberjack
+ tools used it without space. In order to keep interop with lumberjack,
+ we now use the cookie without space as well. I hope this can be changed
+ in CEE as well when it is released at a later time.
+ Thanks to Miloslav Trmač for pointing this out and a similiar v7 patch.
+---------------------------------------------------------------------------
+Version 7.3.0 [devel] 2012-10-09
+- omlibdbi improvements, added
+ * support for config load phases & module() parameters
+ * support for default templates
+ * driverdirectory is now cleanly a global parameter, but can no longer
+ be specified as an action paramter. Note that in previous versions
+ this parameter was ignored in all but the first action definition
+- improved omfile zip writer to increase compression
+ This was achieved by somewhat reducing the robustness of the zip archive.
+ This is controlled by the new action parameter "VeryReliableZip".
+---------------------------------------------------------------------------
Version 7.1.12 [beta] 2012-10-??
- minor updates to better support newer systemd developments
Thanks to Michael Biebl for the patches.
diff --git a/configure.ac b/configure.ac
index 09217d09..f817ca06 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ(2.61)
-AC_INIT([rsyslog],[7.1.11],[rsyslog@lists.adiscon.com])
+AC_INIT([rsyslog],[7.3.0],[rsyslog@lists.adiscon.com])
AM_INIT_AUTOMAKE
m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])])
diff --git a/doc/manual.html b/doc/manual.html
index cfad1a6a..ea456c44 100644
--- a/doc/manual.html
+++ b/doc/manual.html
@@ -19,7 +19,7 @@ rsyslog support</a> available directly from the source!</p>
<p><b>Please visit the <a href="http://www.rsyslog.com/sponsors">rsyslog sponsor's page</a>
to honor the project sponsors or become one yourself!</b> We are very grateful for any help towards the
project goals.</p>
-<p><b>This documentation is for version 7.1.11 (beta branch) of rsyslog.</b>
+<p><b>This documentation is for version 7.3.0 (devel branch) of rsyslog.</b>
Visit the <i><a href="http://www.rsyslog.com/status">rsyslog status page</a></i></b>
to obtain current version information and project status.
</p><p><b>If you like rsyslog, you might
diff --git a/doc/omfile.html b/doc/omfile.html
index 23ecc034..e58ea3b0 100644
--- a/doc/omfile.html
+++ b/doc/omfile.html
@@ -28,6 +28,16 @@
<li><strong>ZipLevel </strong>0..9 [default 0]<br>
if greater 0, turns on gzip compression of the output file. The higher the number, the better the compression, but also the more CPU is required for zipping.<br></li><br>
+ <li><b>VeryReliableZip</b> [<b>on</b>/off] (v7.3.0+) - if ZipLevel is greater 0,
+ then this setting controls if extra headers are written to make the resulting file
+ extra hardened against malfunction. If set to off, data appended to previously unclean
+ closed files may not be accessible without extra tools.
+ Note that this risk is usually expected to be bearable, and thus "off" is the default mode.
+ The extra headers considerably
+ degrade compression, files with this option set to "on" may be four to five times as
+ large as files processed in "off" mode.
+ </li><br>
+
<li><strong>FlushInterval </strong>(not mandatory, default will be used)<br>
Defines a template to be used for the output. <br></li><br>
diff --git a/doc/omlibdbi.html b/doc/omlibdbi.html
index 008dcb81..e47c7f57 100644
--- a/doc/omlibdbi.html
+++ b/doc/omlibdbi.html
@@ -54,7 +54,23 @@ dlopen()ed plugin (as omlibdbi is). So in short, you probably save you
a lot of headache if you make sure you have at least libdbi version
0.8.3 on your system.
</p>
-<p><b>Action Parameters</b>:</p>
+<p><b>Module Parameters</b></p>
+<ul>
+<li><b>template</b><br>
+The default template to use. This template is used when no template is
+explicitely specified in the action() statement.
+<li><b>driverdirectory</b><br>
+Path to the libdbi drivers. Usually,
+you do not need to set it. If you installed libdbi-drivers at a
+non-standard location, you may need to specify the directory here. If
+you are unsure, do <b>not</b> use this configuration directive.
+Usually, everything works just fine.
+Note that this was an action() paramter in rsyslog versions below 7.3.0.
+However, only the first action's driverdirectory parameter was actually used.
+This has been cleaned up in 7.3.0, where this now is a module paramter.
+</li>
+</ul>
+<p><b>Action Parameters</b></p>
<ul>
<li><b>server</b><br>Name or address of the MySQL server
<li><b>db</b><br>Database to use
@@ -68,24 +84,18 @@ writiting "mysql" (suggest to use ommysql instead), "firebird" (Firbird
and InterBase), "ingres", "msql", "Oracle", "sqlite", "sqlite3",
"freetds" (for Microsoft SQL and Sybase) and "pgsql" (suggest to use
ompgsql instead).</li>
-<li><b>driverdirectory</b><br>
-Path to the libdbi drivers. Usually,
-you do not need to set it. If you installed libdbi-drivers at a
-non-standard location, you may need to specify the directory here. If
-you are unsure, do <b>not</b> use this configuration directive.
-Usually, everything works just fine.</li>
</ul>
<p><b>Legacy (pre-v6) Configuration Directives</b>:</p>
+<p>It is strongly recommended NOT to use legacy format.
<ul>
-<li><b>$ActionLibdbiDriverDirectory /path/to/dbd/drivers</b>
+<li><i>$ActionLibdbiDriverDirectory /path/to/dbd/drivers</i>
- like the driverdirectory action parameter.
-<li><strong>$ActionLibdbiDriver drivername</strong><br> - like the drivername action parameter.
-<li><span style="font-weight: bold;">$ActionLibdbiHost hostname</span> - like the server action parameter
-The host to connect to.</li>
-<li><b>$ActionLibdbiUserName user</b> - like the uid action parameter
-<li><b>$ActionlibdbiPassword</b> - like the pwd action parameter
-<li><b>$ActionlibdbiDBName db</b> - like the db action parameter
-<li><b>selector line: :omlibdbi:<i>;template</i></b><br>
+<li><i>$ActionLibdbiDriver drivername</i> - like the drivername action parameter
+<li><i>$ActionLibdbiHost hostname</i> - like the server action parameter
+<li><i>$ActionLibdbiUserName user</i> - like the uid action parameter
+<li><i>$ActionlibdbiPassword</i> - like the pwd action parameter
+<li><i>$ActionlibdbiDBName db</i> - like the db action parameter
+<li><i>selector line: :omlibdbi:<code>;template</code></i><br>
executes the recently configured omlibdbi action. The ;template part is
optional. If no template is provided, a default template is used (which
is currently optimized for MySQL - sorry, folks...)</li>
@@ -114,14 +124,14 @@ database "syslog_db" on mysqlsever.example.com. The server is MySQL and
being accessed under the account of "user" with password "pwd" (if you
have empty passwords, just remove the $ActionLibdbiPassword line).<br>
</p>
-<textarea rows="5" cols="60">$ModLoad omlibdbi
+<textarea rows="5" cols="60">module(load="omlibdbi")
*.* action(type="omlibdbi" driver="mysql"
server="mysqlserver.example.com" db="syslog_db"
uid="user" pwd="pwd"
</textarea>
-<p><b>Sample:</b></p>
+<p><b>Legacy Sample:</b></p>
<p>The same as above, but in legacy config format (pre rsyslog-v6):
-<textarea rows="10" cols="60">$ModLoad omlibdbi
+<textarea rows="8" cols="60">$ModLoad omlibdbi
$ActionLibdbiDriver mysql
$ActionLibdbiHost mysqlserver.example.com
$ActionLibdbiUserName user
diff --git a/plugins/omlibdbi/omlibdbi.c b/plugins/omlibdbi/omlibdbi.c
index 8f5fa944..d7f3cf41 100644
--- a/plugins/omlibdbi/omlibdbi.c
+++ b/plugins/omlibdbi/omlibdbi.c
@@ -81,15 +81,36 @@ typedef struct configSettings_s {
uchar *dbName; /* database to use */
} configSettings_t;
static configSettings_t cs;
+uchar *pszFileDfltTplName; /* name of the default template to use */
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+ uchar *dbiDrvrDir; /* where do the dbi drivers reside? */
+ uchar *tplName; /* default template */
+};
+
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
+
/* tables for interfacing with the v6 config system */
+/* module-global parameters */
+static struct cnfparamdescr modpdescr[] = {
+ { "template", eCmdHdlrGetWord, 0 },
+ { "driverdirectory", eCmdHdlrGetWord, 0 }
+};
+static struct cnfparamblk modpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+ };
/* action (instance) parameters */
static struct cnfparamdescr actpdescr[] = {
{ "server", eCmdHdlrGetWord, 1 },
{ "db", eCmdHdlrGetWord, 1 },
{ "uid", eCmdHdlrGetWord, 1 },
{ "pwd", eCmdHdlrGetWord, 1 },
- { "driverdirectory", eCmdHdlrGetWord, 0 },
{ "driver", eCmdHdlrGetWord, 1 },
{ "template", eCmdHdlrGetWord, 0 }
};
@@ -99,6 +120,20 @@ static struct cnfparamblk actpblk =
actpdescr
};
+/* this function gets the default template. It coordinates action between
+ * old-style and new-style configuration parts.
+ */
+static inline uchar*
+getDfltTpl(void)
+{
+ if(loadModConf != NULL && loadModConf->tplName != NULL)
+ return loadModConf->tplName;
+ else if(pszFileDfltTplName == NULL)
+ return (uchar*)" StdDBFmt";
+ else
+ return pszFileDfltTplName;
+}
+
BEGINinitConfVars /* (re)set config variables to default values */
CODESTARTinitConfVars
@@ -144,7 +179,6 @@ static void closeConn(instanceData *pData)
BEGINfreeInstance
CODESTARTfreeInstance
closeConn(pData);
- free(pData->dbiDrvrDir);
free(pData->drvrName);
free(pData->host);
free(pData->usrName);
@@ -302,6 +336,79 @@ CODESTARTdoAction
ENDdoAction
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ pModConf->tplName = NULL;
+ bLegacyCnfModGlobalsPermitted = 1;
+ENDbeginCnfLoad
+
+BEGINsetModCnf
+ struct cnfparamvals *pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if(pvals == NULL) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "omlibdbi: error processing "
+ "module config parameters [module(...)]");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("module (global) param blk for omlibdbi:\n");
+ cnfparamsPrint(&modpblk, pvals);
+ }
+
+ for(i = 0 ; i < modpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(modpblk.descr[i].name, "template")) {
+ loadModConf->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ if(pszFileDfltTplName != NULL) {
+ errmsg.LogError(0, RS_RET_DUP_PARAM, "omlibdbi: warning: default template "
+ "was already set via legacy directive - may lead to inconsistent "
+ "results.");
+ }
+ } else if(!strcmp(modpblk.descr[i].name, "driverdirectory")) {
+ loadModConf->dbiDrvrDir = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else {
+ dbgprintf("omlibdbi: program error, non-handled "
+ "param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
+ }
+ }
+ bLegacyCnfModGlobalsPermitted = 0;
+finalize_it:
+ if(pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ loadModConf = NULL; /* done loading */
+ /* free legacy config vars */
+ free(pszFileDfltTplName);
+ pszFileDfltTplName = NULL;
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ free(pModConf->tplName);
+ free(pModConf->dbiDrvrDir);
+ENDfreeCnf
+
+
+
+
static inline void
setInstParamDefaults(instanceData *pData)
{
@@ -311,6 +418,7 @@ setInstParamDefaults(instanceData *pData)
BEGINnewActInst
struct cnfparamvals *pvals;
+ uchar *tplToUse;
int i;
CODESTARTnewActInst
if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
@@ -332,28 +440,19 @@ CODESTARTnewActInst
pData->usrName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "pwd")) {
pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
- } else if(!strcmp(actpblk.descr[i].name, "driverdirectory")) {
- pData->dbiDrvrDir = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "driver")) {
pData->drvrName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else {
- dbgprintf("ommysql: program error, non-handled "
+ dbgprintf("omlibdbi: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
}
}
- if(pData->tplName == NULL) {
- CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*) strdup(" StdDBFmt"),
- OMSR_RQD_TPL_OPT_SQL));
- } else {
- CHKiRet(OMSRsetEntry(*ppOMSR, 0,
- (uchar*) strdup((char*) pData->tplName),
- OMSR_RQD_TPL_OPT_SQL));
- }
+ tplToUse = (pData->tplName == NULL) ? (uchar*)strdup((char*)getDfltTpl()) : pData->tplName;
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, tplToUse, OMSR_RQD_TPL_OPT_SQL));
CODE_STD_FINALIZERnewActInst
-dbgprintf("XXXX: added param, iRet %d\n", iRet);
cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
@@ -380,19 +479,17 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* NULL values are supported because drivers have different needs.
* They will err out on connect. -- rgerhards, 2008-02-15
*/
- if(cs.host != NULL)
- if((pData->host = (uchar*) strdup((char*)cs.host)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ if(cs.host != NULL)
+ CHKmalloc(pData->host = (uchar*) strdup((char*)cs.host));
if(cs.usrName != NULL)
- if((pData->usrName = (uchar*) strdup((char*)cs.usrName)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- if(cs.dbName != NULL)
- if((pData->dbName = (uchar*) strdup((char*)cs.dbName)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- if(cs.pwd != NULL)
- if((pData->pwd = (uchar*) strdup((char*)cs.pwd)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ CHKmalloc(pData->usrName = (uchar*) strdup((char*)cs.usrName));
+ if(cs.dbName != NULL)
+ CHKmalloc(pData->dbName = (uchar*) strdup((char*)cs.dbName));
+ if(cs.pwd != NULL)
+ CHKmalloc(pData->pwd = (uchar*) strdup((char*)cs.pwd));
if(cs.dbiDrvrDir != NULL)
- if((pData->dbiDrvrDir = (uchar*) strdup((char*)cs.dbiDrvrDir)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
- CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_RQD_TPL_OPT_SQL, (uchar*) " StdDBFmt"));
-
+ CHKmalloc(loadModConf->dbiDrvrDir = (uchar*) strdup((char*)cs.dbiDrvrDir));
+ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_RQD_TPL_OPT_SQL, getDfltTpl()));
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -413,6 +510,8 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
@@ -444,7 +543,7 @@ INITLegCnfVars
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionlibdbidriverdirectory", 0, eCmdHdlrGetWord, NULL, &cs.dbiDrvrDir, STD_LOADABLE_MODULE_ID));
+ CHKiRet(regCfSysLineHdlr2((uchar *)"actionlibdbidriverdirectory", 0, eCmdHdlrGetWord, NULL, &cs.dbiDrvrDir, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionlibdbidriver", 0, eCmdHdlrGetWord, NULL, &cs.drvrName, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionlibdbihost", 0, eCmdHdlrGetWord, NULL, &cs.host, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionlibdbiusername", 0, eCmdHdlrGetWord, NULL, &cs.usrName, STD_LOADABLE_MODULE_ID));
diff --git a/runtime/debug.c b/runtime/debug.c
index edc4a255..307a8bb8 100644
--- a/runtime/debug.c
+++ b/runtime/debug.c
@@ -927,12 +927,12 @@ dbgprint(obj_t *pObj, char *pszMsg, size_t lenMsg)
pszObjName = obj.GetName(pObj);
}
-// pthread_mutex_lock(&mutdbgprint);
-// pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint);
+ pthread_mutex_lock(&mutdbgprint);
+ pthread_cleanup_push(dbgMutexCancelCleanupHdlr, &mutdbgprint);
do_dbgprint(pszObjName, pszMsg, lenMsg);
-// pthread_cleanup_pop(1);
+ pthread_cleanup_pop(1);
}
#pragma GCC diagnostic warning "-Wempty-body"
diff --git a/runtime/stream.c b/runtime/stream.c
index 742799d2..064cb42a 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -74,11 +74,12 @@ DEFobjStaticHelpers
DEFobjCurrIf(zlibw)
/* forward definitions */
-static rsRetVal strmFlushInternal(strm_t *pThis);
+static rsRetVal strmFlushInternal(strm_t *pThis, int bFlushZip);
static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
static rsRetVal strmCloseFile(strm_t *pThis);
static void *asyncWriterThread(void *pPtr);
-static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
+static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush);
+static rsRetVal doZipFinish(strm_t *pThis);
static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
@@ -341,7 +342,7 @@ static rsRetVal strmCloseFile(strm_t *pThis)
(pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName);
if(pThis->tOperationsMode != STREAMMODE_READ) {
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
if(pThis->bAsyncWrite) {
strmWaitAsyncWriterDone(pThis);
}
@@ -675,6 +676,7 @@ BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro!
pThis->fd = -1;
pThis->fdDir = -1;
pThis->iUngetC = -1;
+ pThis->bVeryReliableZip = 0;
pThis->sType = STREAMTYPE_FILE_SINGLE;
pThis->sIOBufSize = glblGetIOBufSize();
pThis->tOpenMode = 0600;
@@ -777,6 +779,10 @@ stopWriter(strm_t *pThis)
BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */
int i;
CODESTARTobjDestruct(strm)
+ /* we need to stop the ZIP writer */
+ if(pThis->iZipLevel) {
+ doZipFinish(pThis);
+ }
if(pThis->bAsyncWrite)
/* Note: mutex will be unlocked in stopWriter! */
d_pthread_mutex_lock(&pThis->mut);
@@ -919,14 +925,14 @@ finalize_it:
/* write memory buffer to a stream object.
*/
static inline rsRetVal
-doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush)
{
DEFiRet;
ASSERT(pThis != NULL);
if(pThis->iZipLevel) {
- CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
+ CHKiRet(doZipWrite(pThis, pBuf, lenBuf, bFlush));
} else {
/* write without zipping */
CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
@@ -971,7 +977,7 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf)
* the background thread. -- rgerhards, 2009-07-07
*/
static rsRetVal
-strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlushZip)
{
DEFiRet;
@@ -990,7 +996,7 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
if(pThis->bAsyncWrite) {
CHKiRet(doAsyncWriteInternal(pThis, lenBuf));
} else {
- CHKiRet(doWriteInternal(pThis, pBuf, lenBuf));
+ CHKiRet(doWriteInternal(pThis, pBuf, lenBuf, bFlushZip));
}
@@ -1030,7 +1036,7 @@ asyncWriterThread(void *pPtr)
}
if(bTimedOut && pThis->iBufPtr > 0) {
/* if we timed out, we need to flush pending data */
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
bTimedOut = 0;
continue; /* now we should have data */
}
@@ -1056,7 +1062,7 @@ asyncWriterThread(void *pPtr)
bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
- doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
+ doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf, 0); // TODO: flush state
// TODO: error check????? 2009-07-06
--pThis->iCnt;
@@ -1166,63 +1172,101 @@ finalize_it:
* rgerhards, 2009-06-04
*/
static rsRetVal
-doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush)
{
- z_stream zstrm;
int zRet; /* zlib return state */
sbool bzInitDone = RSFALSE;
DEFiRet;
+ unsigned outavail;
assert(pThis != NULL);
assert(pBuf != NULL);
- /* allocate deflate state */
- zstrm.zalloc = Z_NULL;
- zstrm.zfree = Z_NULL;
- zstrm.opaque = Z_NULL;
- zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */
- /* see note in file header for the params we use with deflateInit2() */
- zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
- if(zRet != Z_OK) {
- DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
- ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ if(!pThis->bzInitDone) {
+ /* allocate deflate state */
+ pThis->zstrm.zalloc = Z_NULL;
+ pThis->zstrm.zfree = Z_NULL;
+ pThis->zstrm.opaque = Z_NULL;
+ /* see note in file header for the params we use with deflateInit2() */
+ zRet = zlibw.DeflateInit2(&pThis->zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
+ pThis->bzInitDone = RSTRUE;
}
bzInitDone = RSTRUE;
/* now doing the compression */
- zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */
- zstrm.avail_in = lenBuf;
+ pThis->zstrm.next_in = (Bytef*) pBuf;
+ pThis->zstrm.avail_in = lenBuf;
/* run deflate() on buffer until everything has been compressed */
do {
- DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
- zstrm.avail_out = pThis->sIOBufSize;
- zstrm.next_out = pThis->pZipBuf;
- zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */
- DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
- assert(zRet != Z_STREAM_ERROR); /* state not clobbered */
- if(zstrm.avail_out == pThis->sIOBufSize)
- break; /* this is valid, indicates end of compression --> see zlib howto */
- CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out));
- } while (zstrm.avail_out == 0);
- assert(zstrm.avail_in == 0); /* all input will be used */
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = pThis->sIOBufSize;
+ pThis->zstrm.next_out = pThis->pZipBuf;
+ zRet = zlibw.Deflate(&pThis->zstrm, bFlush ? Z_SYNC_FLUSH : Z_NO_FLUSH); /* no bad return value */
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail =pThis->sIOBufSize - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail));
+ }
+ } while (pThis->zstrm.avail_out == 0);
finalize_it:
- if(bzInitDone) {
- zRet = zlibw.DeflateEnd(&zstrm);
- if(zRet != Z_OK) {
- DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
+ if(pThis->bzInitDone && pThis->bVeryReliableZip) {
+ doZipFinish(pThis);
+ }
+ RETiRet;
+}
+
+
+
+/* finish zlib buffer, to be called before closing the ZIP file (if
+ * running in stream mode).
+ */
+static rsRetVal
+doZipFinish(strm_t *pThis)
+{
+ int zRet; /* zlib return state */
+ DEFiRet;
+ unsigned outavail;
+ assert(pThis != NULL);
+
+ if(!pThis->bzInitDone) {
+ FINALIZE;
+ }
+
+dbgprintf("AAAA: doZipFinish() called\n");
+ pThis->zstrm.avail_in = 0;
+ /* run deflate() on buffer until everything has been compressed */
+ do {
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = pThis->sIOBufSize;
+ pThis->zstrm.next_out = pThis->pZipBuf;
+ zRet = zlibw.Deflate(&pThis->zstrm, Z_FINISH); /* no bad return value */
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail = pThis->sIOBufSize - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail));
}
+ } while (pThis->zstrm.avail_out == 0);
+
+finalize_it:
+ zRet = zlibw.DeflateEnd(&pThis->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
}
+ pThis->bzInitDone = 0;
RETiRet;
}
-
/* flush stream output buffer to persistent storage. This can be called at any time
* and is automatically called when the output buffer is full.
* rgerhards, 2008-01-10
*/
static rsRetVal
-strmFlushInternal(strm_t *pThis)
+strmFlushInternal(strm_t *pThis, int bFlushZip)
{
DEFiRet;
@@ -1232,7 +1276,7 @@ strmFlushInternal(strm_t *pThis)
(long) pThis->iBufPtr, (pThis->iBufPtr == 0) ? " (no need to flush)" : "");
if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) {
- iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr);
+ iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr, bFlushZip);
}
RETiRet;
@@ -1254,7 +1298,7 @@ strmFlush(strm_t *pThis)
if(pThis->bAsyncWrite)
d_pthread_mutex_lock(&pThis->mut);
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 1));
finalize_it:
if(pThis->bAsyncWrite)
@@ -1277,7 +1321,7 @@ static rsRetVal strmSeek(strm_t *pThis, off64_t offs)
if(pThis->fd == -1) {
CHKiRet(strmOpenFile(pThis));
} else {
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 0));
}
long long i;
DBGOPRINT((obj_t*) pThis, "file %d seek, pos %llu\n", pThis->fd, (long long unsigned) offs);
@@ -1320,7 +1364,7 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
/* if the buffer is full, we need to flush before we can write */
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 0));
}
/* we now always have space for one character, so we simply copy it */
*(pThis->pIOBuf + pThis->iBufPtr) = c;
@@ -1390,7 +1434,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
iOffset = 0;
do {
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */
}
iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
if(iWrite > lenBuf)
@@ -1405,7 +1449,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
* write it. This seems more natural than waiting (hours?) for the next message...
*/
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */
}
finalize_it:
@@ -1433,6 +1477,7 @@ DEFpropSetMeth(strm, tOperationsMode, int)
DEFpropSetMeth(strm, tOpenMode, mode_t)
DEFpropSetMeth(strm, sType, strmType_t)
DEFpropSetMeth(strm, iZipLevel, int)
+DEFpropSetMeth(strm, bVeryReliableZip, int)
DEFpropSetMeth(strm, bSync, int)
DEFpropSetMeth(strm, sIOBufSize, size_t)
DEFpropSetMeth(strm, iSizeLimit, off_t)
@@ -1564,7 +1609,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
ISOBJ_TYPE_assert(pThis, strm);
ISOBJ_TYPE_assert(pStrm, strm);
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
CHKiRet(obj.BeginSerialize(pStrm, (obj_t*) pThis));
objSerializeSCALAR(pStrm, iCurrFNum, INT);
@@ -1757,6 +1802,7 @@ CODESTARTobjQueryInterface(strm)
pIf->SettOpenMode = strmSettOpenMode;
pIf->SetsType = strmSetsType;
pIf->SetiZipLevel = strmSetiZipLevel;
+ pIf->SetbVeryReliableZip = strmSetbVeryReliableZip;
pIf->SetbSync = strmSetbSync;
pIf->SetsIOBufSize = strmSetsIOBufSize;
pIf->SetiSizeLimit = strmSetiSizeLimit;
diff --git a/runtime/stream.h b/runtime/stream.h
index a01929f2..fdfefaa3 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -124,6 +124,8 @@ typedef struct strm_s {
sbool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */
sbool bStopWriter; /* shall writer thread terminate? */
sbool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */
+ sbool bzInitDone; /* did we do an init of zstrm already? */
+ sbool bVeryReliableZip; /* shall we write interim headers to create a very reliable ZIP file? */
int iFlushInterval; /* flush in which interval - 0, no flushing */
pthread_mutex_t mut;/* mutex for flush in async mode */
pthread_cond_t notFull;
@@ -132,6 +134,7 @@ typedef struct strm_s {
unsigned short iEnq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
unsigned short iDeq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
short iCnt; /* current nbr of elements in buffer */
+ z_stream zstrm; /* zip stream to use */
struct {
uchar *pBuf;
size_t lenBuf;
@@ -181,8 +184,10 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*);
/* v6 added */
rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr, int mode);
+ /* v7 added 2012-09-14 */
+ INTERFACEpropSetMeth(strm, bVeryReliableZip, int);
ENDinterface(strm)
-#define strmCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */
+#define strmCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */
/* prototypes */
diff --git a/tools/omfile.c b/tools/omfile.c
index 5b0bfb46..c7e0dc25 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -156,6 +156,7 @@ typedef struct _instanceData {
int iFlushInterval; /* how fast flush buffer on inactivity? */
sbool bFlushOnTXEnd; /* flush write buffers when transaction has ended? */
sbool bUseAsyncWriter; /* use async stream writer? */
+ sbool bVeryRobustZip;
} instanceData;
@@ -205,6 +206,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "ziplevel", eCmdHdlrInt, 0 }, /* legacy: omfileziplevel */
{ "flushinterval", eCmdHdlrInt, 0 }, /* legacy: omfileflushinterval */
{ "asyncwriting", eCmdHdlrBinary, 0 }, /* legacy: omfileasyncwriting */
+ { "veryrobustzip", eCmdHdlrBinary, 0 },
{ "flushontxend", eCmdHdlrBinary, 0 }, /* legacy: omfileflushontxend */
{ "iobuffersize", eCmdHdlrSize, 0 }, /* legacy: omfileiobuffersize */
{ "dirowner", eCmdHdlrUID, 0 }, /* legacy: dirowner */
@@ -269,7 +271,8 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tflush on TX end=%d\n", pData->bFlushOnTXEnd);
dbgprintf("\tflush interval=%d\n", pData->iFlushInterval);
dbgprintf("\tfile cache size=%d\n", pData->iDynaFileCacheSize);
- dbgprintf("\tcreate directories: %s\n", pData->bCreateDirs ? "yes" : "no");
+ dbgprintf("\tcreate directories: %s\n", pData->bCreateDirs ? "on" : "off");
+ dbgprintf("\tvery robust zip: %s\n", pData->bCreateDirs ? "on" : "off");
dbgprintf("\tfile owner %d, group %d\n", (int) pData->fileUID, (int) pData->fileGID);
dbgprintf("\tdirectory owner %d, group %d\n", (int) pData->dirUID, (int) pData->dirGID);
dbgprintf("\tdir create mode 0%3.3o, file create mode 0%3.3o\n",
@@ -292,7 +295,7 @@ setLegacyDfltTpl(void __attribute__((unused)) *pVal, uchar* newVal)
if(loadModConf != NULL && loadModConf->tplName != NULL) {
free(newVal);
- errmsg.LogError(0, RS_RET_ERR, "omfile default template already set via module "
+ errmsg.LogError(0, RS_RET_ERR, "omfile: default template already set via module "
"global parameter - can no longer be changed");
ABORT_FINALIZE(RS_RET_ERR);
}
@@ -536,6 +539,7 @@ prepareFile(instanceData *pData, uchar *newFileName)
CHKiRet(strm.SetFName(pData->pStrm, szBaseName, ustrlen(szBaseName)));
CHKiRet(strm.SetDir(pData->pStrm, szDirName, ustrlen(szDirName)));
CHKiRet(strm.SetiZipLevel(pData->pStrm, pData->iZipLevel));
+ CHKiRet(strm.SetbVeryReliableZip(pData->pStrm, pData->bVeryRobustZip));
CHKiRet(strm.SetsIOBufSize(pData->pStrm, (size_t) pData->iIOBufSize));
CHKiRet(strm.SettOperationsMode(pData->pStrm, STREAMMODE_WRITE_APPEND));
CHKiRet(strm.SettOpenMode(pData->pStrm, cs.fCreateMode));
@@ -843,6 +847,7 @@ BEGINendTransaction
CODESTARTendTransaction
/* Note: pStrm may be NULL if there was an error opening the stream */
if(pData->bFlushOnTXEnd && pData->pStrm != NULL) {
+dbgprintf("AAAA: flusing stream, endTx\n");
CHKiRet(strm.Flush(pData->pStrm));
}
finalize_it:
@@ -854,6 +859,7 @@ CODESTARTdoAction
DBGPRINTF("file to log to: %s\n", pData->f_fname);
CHKiRet(writeFile(ppString, iMsgOpts, pData));
if(!bCoreSupportsBatching && pData->bFlushOnTXEnd) {
+dbgprintf("AAAA: flusing stream, in Tx\n");
CHKiRet(strm.Flush(pData->pStrm));
}
finalize_it:
@@ -878,6 +884,7 @@ setInstParamDefaults(instanceData *pData)
pData->bCreateDirs = 1;
pData->bSyncFile = 0;
pData->iZipLevel = 0;
+ pData->bVeryRobustZip = 0;
pData->bFlushOnTXEnd = FLUSHONTX_DFLT;
pData->iIOBufSize = IOBUF_DFLT_SIZE;
pData->iFlushInterval = FLUSH_INTRVL_DFLT;
@@ -915,6 +922,8 @@ CODESTARTnewActInst
pData->iZipLevel = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "flushinterval")) {
pData->iFlushInterval = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "veryrobustzip")) {
+ pData->bVeryRobustZip = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "asyncwriting")) {
pData->bUseAsyncWriter = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "flushontxend")) {
@@ -1064,6 +1073,7 @@ CODESTARTparseSelectorAct
pData->iIOBufSize = (int) cs.iIOBufSize;
pData->iFlushInterval = cs.iFlushInterval;
pData->bUseAsyncWriter = cs.bUseAsyncWriter;
+ pData->bVeryRobustZip = 0; /* cannot be specified via legacy conf */
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -1090,7 +1100,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
cs.bUseAsyncWriter = USE_ASYNCWRITER_DFLT;
free(pszFileDfltTplName);
pszFileDfltTplName = NULL;
-
return RS_RET_OK;
}