summaryrefslogtreecommitdiffstats
path: root/server/parser/parsethread.c
blob: ff800731346df9c63a2038b6a200ffd6383d1c8b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
/*
 * Copyright (C) 2009 Red Hat Inc.
 *
 * This application is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; version 2.
 *
 * This application is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 */

/**
 * @file   parsethread.c
 * @author David Sommerseth <davids@redhat.com>
 * @date   Thu Oct 15 11:52:10 2009
 *
 * @brief  Contains the "main" function which each parser thread runs
 *
 *
 */

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <signal.h>
#include <pthread.h>
#include <libgen.h>
#include <errno.h>
#include <assert.h>

#include <eurephia_nullsafe.h>
#include <parsethread.h>
#include <pgsql.h>
#include <log.h>
#include <threadinfo.h>
#include <statuses.h>


/**
 * Does the same job as 'mkdir -p', but it expects a complete filename as input, and it will
 * extract the directory path from that filename.
 *
 * Each path element is checked with stat() and created with mkdir() (mode 0755) when it is
 * missing.  Another thread or process may create the very same directory between the
 * stat() and the mkdir() call; that case (EEXIST) is treated as success.
 *
 * @param log    Log context used when reporting errors
 * @param fname  Full filename containing the directory the report will reside.
 *
 * @return Returns 1 on success and 0 if fname is NULL.  On errors -1 is returned.
 */
static int make_report_dir(LogContext *log, const char *fname) {
	char *fname_cp = NULL, *dname = NULL, *chkdir = NULL;
	char *tok = NULL, *saveptr = NULL;
	int ret = -1;
	struct stat info;

	if( !fname ) {
		return 0;
	}

	// dirname() and strtok_r() may both modify their input, so work on a private copy
	fname_cp = strdup(fname);
	if( !fname_cp ) {
		writelog(log, LOG_ALERT,
			 "Could not allocate memory for directory path of: %s", fname);
		goto exit;
	}
	dname = dirname(fname_cp);

	// The rebuilt path is never longer than dname plus one trailing '/' and the
	// terminating NUL, so strlen(dname)+8 leaves room to spare
	chkdir = malloc_nullsafe(log, strlen(dname)+8);
	if( !chkdir ) {
		goto exit;
	}

	// Start from an explicitly terminated string; keep the leading '/' of absolute
	// paths, as strtok_r() below swallows all separators
	if( dname[0] == '/' ) {
		chkdir[0] = '/';
		chkdir[1] = '\0';
	} else {
		chkdir[0] = '\0';
	}

	// Traverse the directory path, and make sure the directory exists
	tok = strtok_r(dname, "/", &saveptr);
	while( tok ) {
		strcat(chkdir, tok);
		strcat(chkdir, "/");

		errno = 0;
		// Check if directory exists
		if( (stat(chkdir, &info) < 0) ) {
			switch( errno ) {
			case ENOENT: // If the directory does not exist, create it
				if( (mkdir(chkdir, 0755) < 0) && (errno != EEXIST) ) {
					// If creating dir failed, report error.  EEXIST is
					// not an error here: somebody else created the
					// directory after our stat() call.
					writelog(log, LOG_ALERT,
						 "Could not create directory: %s (%s)",
						 chkdir, strerror(errno));
					goto exit;
				}
				break;
			default: // If other failure, report that and exit
				writelog(log, LOG_ALERT,
					 "Could not access directory: %s (%s)",
					 chkdir, strerror(errno));
				goto exit;
			}
		}
		// Goto next path element
		tok = strtok_r(NULL, "/", &saveptr);
	}
	ret = 1;
 exit:
	free_nullsafe(fname_cp);
	free_nullsafe(chkdir);

	return ret;
}


/**
 * Builds up a proper full path of where to save the report.  The result has the
 * form "<destdir>/<clientid>/report-<rterid>.xml".
 *
 * @param log       Log context, handed over to the memory allocator
 * @param destdir   Destination directory for all reports
 * @param job       Parse job; its clientid member becomes the sub-directory name
 * @param rterid    rteval run ID, must not be negative
 *
 * @return Returns a pointer to a string with the new full path filename on success, otherwise NULL.
 *         The string is allocated here and must be freed by the caller.
 */
static char *get_destination_path(LogContext *log, const char *destdir,
				  parseJob_t *job, const int rterid)
{
        char *newfname = NULL;
        size_t retlen = 0;

        if( !job || !destdir || rterid < 0 ) {
                return NULL;
        }

        // Fixed parts of the format string: "/" + "/report-" + ".xml" = 13 characters.
        // A non-negative 32-bit int prints as at most 10 digits, and one byte is needed
        // for the terminating NUL, which gives the 24 extra bytes.
        retlen = strlen_nullsafe(job->clientid) + strlen(destdir) + 24;
        newfname = malloc_nullsafe(log, retlen+2);
        if( !newfname ) {
                return NULL;
        }

        snprintf(newfname, retlen, "%s/%s/report-%i.xml", destdir, job->clientid, rterid);

        return newfname;
}


/**
 * Compares the size of a file on disk against the configured maximum report size.
 *
 * @param thrdata  Pointer to a threadData_t structure; its max_report_size member is the limit
 *                 and its database connection provides the log context for error messages
 * @param fname    Name of the file whose size should be checked
 *
 * @return Returns 1 when the file size does not exceed the limit, and 0 when it does or when
 *         fname is NULL.  If stat() fails, the failure is logged and -1 is returned.
 */
inline int check_filesize(threadData_t *thrdata, const char *fname) {
	struct stat st;

	if( fname == NULL ) {
		return 0;
	}

	errno = 0;
	if( stat(fname, &st) >= 0 ) {
		// File could be inspected, the verdict is a plain size comparison
		return (st.st_size <= thrdata->max_report_size);
	}

	writelog(thrdata->dbc->log, LOG_ERR, "Failed to check report file '%s': %s",
		 fname, strerror(errno));
	return -1;
}


/**
 * The core parse function.  Parses an XML file and stores it in the database according to
 * the xmlparser.xsl template.  When all database registrations succeeded, the report file
 * is moved to its final destination and the SQL transaction is committed.
 *
 * @param thrdata  Pointer to a threadData_t structure with database connection, log context, settings, etc
 * @param job      Pointer to a parseJob_t structure containing the job information
 *
 * @return Return values:
 * @code
 *          STAT_SUCCESS  : Successfully registered report
 *          STAT_FTOOBIG  : XML report file is too big
 *          STAT_XMLFAIL  : Could not parse the XML report file
 *          STAT_SYSREG   : Failed to register the system into the systems or systems_hostname tables
 *          STAT_RTERIDREG: Failed to get a new rterid value
 *          STAT_GENDB    : Failed to start an SQL transaction (BEGIN)
 *          STAT_UNKNFAIL : Failed to generate the destination filename for the report
 *          STAT_RTEVRUNS : Failed to register the rteval run into rtevalruns or rtevalruns_details
 *          STAT_CYCLIC   : Failed to register the data into cyclic_statistics or cyclic_rawdata tables
 *          STAT_REPMOVE  : Failed to move the report file
 * @endcode
 */
inline int parse_report(threadData_t *thrdata, parseJob_t *job)
{
	int syskey = -1, rterid = -1;
	int rc = -1;
	xmlDoc *repxml = NULL;
	char *destfname = NULL;

	// Check file size - and reject too big files.  A stat() failure (-1) is not
	// treated as "too big"; such files are handed on to the XML parser below.
	if( check_filesize(thrdata, job->filename) == 0 ) {
		writelog(thrdata->dbc->log, LOG_ERR,
			 "Report file '%s' is too big, rejected", job->filename);
		return STAT_FTOOBIG;
	}

	repxml = xmlParseFile(job->filename);
	if( !repxml ) {
		writelog(thrdata->dbc->log, LOG_ERR,
			 "Could not parse XML file: %s", job->filename);
		return STAT_XMLFAIL;
	}

	// System registration and rterid allocation happen under the mtx_sysreg mutex.
	// The mutex is released before any result is evaluated, so none of the error
	// paths below can leave it locked.
	pthread_mutex_lock(thrdata->mtx_sysreg);
	syskey = db_register_system(thrdata->dbc, thrdata->xslt, repxml);
	if( syskey >= 0 ) {
		rterid = db_get_new_rterid(thrdata->dbc);
	}
	pthread_mutex_unlock(thrdata->mtx_sysreg);

	if( syskey < 0 ) {
		writelog(thrdata->dbc->log, LOG_ERR,
			 "Failed to register system (XML file: %s)", job->filename);
		rc = STAT_SYSREG;
		goto exit;
	}
	if( rterid < 0 ) {
		writelog(thrdata->dbc->log, LOG_ERR,
			 "Failed to register rteval run (XML file: %s)", job->filename);
		rc = STAT_RTERIDREG;
		goto exit;
	}

	if( db_begin(thrdata->dbc) < 1 ) {
		rc = STAT_GENDB;
		goto exit;
	}

	// Create a new filename of where to save the report
	destfname = get_destination_path(thrdata->dbc->log, thrdata->destdir, job, rterid);
	if( !destfname ) {
		writelog(thrdata->dbc->log, LOG_ERR,
			 "Failed to generate local report filename for (%i) %s",
			job->submid, job->filename);
		db_rollback(thrdata->dbc);
		rc = STAT_UNKNFAIL;
		goto exit;
	}

	if( db_register_rtevalrun(thrdata->dbc, thrdata->xslt, repxml, job->submid,
				  syskey, rterid, destfname) < 0 ) {
		writelog(thrdata->dbc->log, LOG_ERR,
			 "Failed to register rteval run (XML file: %s)",
			 job->filename);
		db_rollback(thrdata->dbc);
		rc = STAT_RTEVRUNS;
		goto exit;
	}

	if( db_register_cyclictest(thrdata->dbc, thrdata->xslt, repxml, rterid) != 1 ) {
		writelog(thrdata->dbc->log, LOG_ERR,
			 "Failed to register cyclictest data (XML file: %s)",
			 job->filename);
		db_rollback(thrdata->dbc);
		rc = STAT_CYCLIC;
		goto exit;
	}

	// When all database registrations are done, move the file to its right place
	if( make_report_dir(thrdata->dbc->log, destfname) < 1 ) { // Make sure report directory exists
		db_rollback(thrdata->dbc);
		rc = STAT_REPMOVE;
		goto exit;
	}

	if( rename(job->filename, destfname) < 0 ) { // Move the file
		writelog(thrdata->dbc->log, LOG_ERR,
			 "Failed to move report file from %s to %s (%s)",
			 job->filename, destfname, strerror(errno));
		db_rollback(thrdata->dbc);
		rc = STAT_REPMOVE;
		goto exit;
	}

	rc = STAT_SUCCESS;
	// NOTE(review): the result of db_commit() is not checked; if it can fail, the
	// file has already been moved at this point - confirm against pgsql.c
	db_commit(thrdata->dbc);

 exit:
	// destfname is NULL until get_destination_path() succeeded; releasing it here
	// covers the success path and every error path alike
	free_nullsafe(destfname);
	xmlFreeDoc(repxml);
	return rc;
}


/**
 * The parser thread.  This thread lives until a shutdown notification is received.  It pulls
 * messages on a POSIX MQ based message queue containing submission ID and full path to an XML
 * report to be parsed.
 *
 * The thread registers itself in the shared thread counter when it starts and always
 * unregisters itself again before it terminates, also when it terminates due to an error.
 *
 * @param thrargs Contains database connection, XSLT stylesheet, POSIX MQ descriptor, etc
 *
 * @return Returns 0 on successful operation, otherwise 1 on errors.  The value is handed
 *         over via pthread_exit().
 */
void *parsethread(void *thrargs) {
	threadData_t *args = (threadData_t *) thrargs;
	parseJob_t jobinfo;
	long exitcode = 0;

	writelog(args->dbc->log, LOG_DEBUG, "[Thread %i] Starting", args->id);
	pthread_mutex_lock(args->mtx_thrcnt);
	(*(args->threadcount)) += 1;
	pthread_mutex_unlock(args->mtx_thrcnt);

	// Polling loop
	while( *(args->shutdown) == 0 ) {
		int len = 0;
		int mq_errno = 0;
		unsigned int prio = 0;

		// Check if the database connection is alive before pulling any messages
		if( db_ping(args->dbc) != 1 ) {
			int last_worker = 0;

			writelog(args->dbc->log, LOG_EMERG,
				 "[Thread %i] Lost database conneciting: Shutting down thread.",
				 args->id);

			// The counter is modified under mtx_thrcnt, so read it under the
			// same mutex
			pthread_mutex_lock(args->mtx_thrcnt);
			last_worker = (*(args->threadcount) <= 1);
			pthread_mutex_unlock(args->mtx_thrcnt);

			if( last_worker ) {
				writelog(args->dbc->log, LOG_EMERG,
					 "No more worker threads available.  "
					 "Signaling for complete shutdown!");
				kill(getpid(), SIGUSR1);
			}
			exitcode = 1;
			goto exit;
		}

		// Retrieve a parse job from the message queue
		memset(&jobinfo, 0, sizeof(parseJob_t));
		errno = 0;
		len = mq_receive(args->msgq, (char *)&jobinfo, sizeof(parseJob_t), &prio);
		mq_errno = errno;  // Save it, later library calls may overwrite errno
		if( (len < 0) && (mq_errno != EAGAIN) ) {
			writelog(args->dbc->log, LOG_CRIT,
				 "Could not receive the message from queue: %s",
				 strerror(mq_errno));
			// Leave via the exit label, so this thread is removed from the
			// thread counter as well
			exitcode = 1;
			goto exit;
		}

		// Ignore whatever message if the shutdown flag is set.
		if( *(args->shutdown) != 0 ) {
			break;
		}

		// If we have a message, then process the parse job
		if( (mq_errno != EAGAIN) && (len > 0) ) {
			int res = 0;

			writelog(args->dbc->log, LOG_DEBUG,
				 "** Thread %i: Job recieved, submid: %i",
				 args->id, jobinfo.submid);

			// Mark the job as "in progress", if successful update, continue parsing it
			if( db_update_submissionqueue(args->dbc, jobinfo.submid, STAT_INPROG) ) {
				res = parse_report(args, &jobinfo);
				// Set the status for the submission
				db_update_submissionqueue(args->dbc, jobinfo.submid, res);
			} else {
				writelog(args->dbc->log, LOG_CRIT,
					 "Failed to mark submid %i as STAT_INPROG",
					 jobinfo.submid);
			}
		}
	}
	writelog(args->dbc->log, LOG_DEBUG, "[Thread %i] Shut down", args->id);
 exit:
	pthread_mutex_lock(args->mtx_thrcnt);
	(*(args->threadcount)) -= 1;
	pthread_mutex_unlock(args->mtx_thrcnt);

	pthread_exit((void *) exitcode);
}