diff options
author | Michal Minar <miminar@redhat.com> | 2014-04-14 10:06:05 +0200 |
---|---|---|
committer | Michal Minar <miminar@redhat.com> | 2014-06-25 15:17:17 +0200 |
commit | 477583819f4bd419476a7cf3c4c8ae005310796e (patch) | |
tree | ff99ead5c6e585853bb18163f40ec602f72389fd | |
parent | dd58a5e76e9708981dbca3193830bb41b069db11 (diff) | |
download | openlmi-providers-477583819f4bd419476a7cf3c4c8ae005310796e.tar.gz openlmi-providers-477583819f4bd419476a7cf3c4c8ae005310796e.tar.xz openlmi-providers-477583819f4bd419476a7cf3c4c8ae005310796e.zip |
added job manager library
-rw-r--r-- | CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/CMakeLists.txt | 4 | ||||
-rw-r--r-- | src/libs/jobmanager/CMakeLists.txt | 46 | ||||
-rw-r--r-- | src/libs/jobmanager/job_manager.c | 3323 | ||||
-rw-r--r-- | src/libs/jobmanager/job_manager.h | 483 | ||||
-rw-r--r-- | src/libs/jobmanager/lmi_job.c | 2094 | ||||
-rw-r--r-- | src/libs/jobmanager/lmi_job.h | 605 | ||||
-rw-r--r-- | src/libs/jobmanager/lmi_job_serialization.c | 172 | ||||
-rw-r--r-- | src/libs/jobmanager/openlmijobmanager.pc.in | 11 |
9 files changed, 6739 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 37eae82..0513c46 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,6 +43,7 @@ option(WITH-REALMD "Build RealmD provider" ON) option(WITH-PCP "Build PCP provider" ON) option(WITH-INDMANAGER "Build indication manager" ON) option(WITH-INDSENDER "Build indication sender" ON) +option(WITH-JOBMANAGER "Build job manager" ON) option(WITH-SOFTWARE "Build software provider" ON) option(WITH-SOFTWARE-DBUS "Use experimental dbus implementation of software provider" OFF) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5157c3d..0b8a8ff 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -50,6 +50,10 @@ if (WITH-INDSENDER) add_subdirectory(libs/indsender) endif (WITH-INDSENDER) +if (WITH-JOBMANAGER) + add_subdirectory(libs/jobmanager) +endif (WITH-JOBMANAGER) + if (WITH-SOFTWARE) if (WITH-SOFTWARE-DBUS) add_subdirectory(software-dbus) diff --git a/src/libs/jobmanager/CMakeLists.txt b/src/libs/jobmanager/CMakeLists.txt new file mode 100644 index 0000000..6a41bf0 --- /dev/null +++ b/src/libs/jobmanager/CMakeLists.txt @@ -0,0 +1,46 @@ +# don't be pedantic over using pointers to functions (needed for marshallers) +STRING(REGEX REPLACE "-pedantic *" "" NEW_C_FLAGS ${CMAKE_C_FLAGS}) +set(CMAKE_C_FLAGS ${NEW_C_FLAGS}) + +pkg_check_modules(GOBJECT REQUIRED gobject-2.0) +pkg_check_modules(GIO REQUIRED gio-2.0) +pkg_check_modules(JSON-GLib REQUIRED json-glib-1.0) + +add_library(openlmijobmanager SHARED + lmi_job.c + lmi_job_serialization.c + job_manager.c +) + +set(OPENLMIJOBMANAGER_VERSION_MAJOR 0) +set(OPENLMIJOBMANAGER_VERSION_MINOR 0) +set(OPENLMIJOBMANAGER_VERSION_PATCH 1) +set(OPENLMIJOBMANAGER_VERSION "${OPENLMIJOBMANAGER_VERSION_MAJOR}.${OPENLMIJOBMANAGER_VERSION_MINOR}.${OPENLMIJOBMANAGER_VERSION_PATCH}") + +set_target_properties(openlmijobmanager PROPERTIES VERSION ${OPENLMIJOBMANAGER_VERSION}) +set_target_properties(openlmijobmanager PROPERTIES SOVERSION ${OPENLMIJOBMANAGER_VERSION_MAJOR}) + +#include_directories(${CMAKE_CURRENT_BINARY_DIR} ${CMPI_INCLUDE_DIR} ${GLIB_INCLUDE_DIRS} ${LIBUSER_INCLUDE_DIRS} ${CMAKE_SOURCE_DIR}/src/indmanager) +include_directories( + ${GOBJECT_INCLUDE_DIRS} + ${GIO_INCLUDE_DIRS} + ${JSON-GLib_INCLUDE_DIRS} + ${CMAKE_SOURCE_DIR}/src/libs/indsender +) + +target_link_libraries(openlmijobmanager + openlmicommon + openlmiindsender + ${GLIB_LIBRARIES} + ${GOBJECT_LIBRARIES} + ${GIO_LIBRARIES} + ${JSON-GLib_LIBRARIES} + pthread + dl +) + +install(TARGETS openlmijobmanager DESTINATION lib${LIB_SUFFIX}) +install(FILES job_manager.h lmi_job.h DESTINATION include/openlmi) + +configure_file(openlmijobmanager.pc.in ${CMAKE_CURRENT_BINARY_DIR}/openlmijobmanager.pc @ONLY) +install(FILES ${CMAKE_CURRENT_BINARY_DIR}/openlmijobmanager.pc DESTINATION lib${LIB_SUFFIX}/pkgconfig) diff --git a/src/libs/jobmanager/job_manager.c b/src/libs/jobmanager/job_manager.c new file mode 100644 index 0000000..d2869b2 --- /dev/null +++ b/src/libs/jobmanager/job_manager.c @@ -0,0 +1,3323 @@ +/* + * Copyright (C) 2014 Red Hat, Inc. All rights reserved. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: Michal Minar <miminar@redhat.com> + */ + +#include <ctype.h> +#include <pthread.h> +#include <stdint.h> +#include <stdio.h> +#include <string.h> +#include "openlmi.h" +#include "openlmi-utils.h" +#include "ind_sender.h" +#include "job_manager.h" + +#define JOB_MANAGER_THREAD_NAME "JobManager" +#define MINIMUM_TIME_BEFORE_REMOVAL (5*60) +#define DEFAULT_TIME_BEFORE_REMOVAL MINIMUM_TIME_BEFORE_REMOVAL +#define DEFAULT_CIM_CLASS_NAME "LMI_ConcreteJob" +#define JOB_FILE_EXTENSION ".json" +#define JOB_FILE_NAME_REGEX ("^(.*)-(\\d+)\\" JOB_FILE_EXTENSION "$") +#define CIM_INST_METHOD_CALL_CLASS_NAME "CIM_InstMethodCall" +#define CIM_ERROR_CLASS_NAME "CIM_Error" + +#define CIM_CONCRETE_JOB_COMMUNICATION_STATUS_NOT_AVAILABLE 0 +#define CIM_CONCRETE_JOB_LOCAL_OR_UTC_TIME_UTC_TIME 2 +#define CIM_ERROR_SOURCE_FORMAT_OBJECT_PATH 2 +#define INST_METHOD_CALL_RETURN_VALUE_TYPE_0 2 + +typedef struct _JobTypeInfo { + const gchar *cim_class_name; + JobToCimInstanceCallback convert_func; + MakeJobParametersCallback make_job_params_func; + MethodResultValueTypeEnum return_value_type; + JobProcessCallback process_func; + gboolean running_job_cancellable; + gboolean use_persistent_storage; +}JobTypeInfo; + +typedef struct _PendingJobKey { + guint priority; + guint number; +}PendingJobKey; + +typedef enum { + JOB_ACTION_REMOVE, + JOB_ACTION_LAST +}JobActionEnum; + +typedef struct _CalendarEvent { + /* time of event's execution in milliseconds since epoch */ + guint64 time; + LmiJob *job; + JobActionEnum action; +}CalendarEvent; + +typedef struct _JobModifiedParams { + LmiJob *job; + LmiJobPropEnum property; + GVariant *old_value; +}JobModifiedParams; + +#define JM_CRITICAL_CRITICAL_BEGIN \ + { \ + pthread_t _thread_id = pthread_self(); \ + _clog(CLOG_COLOR_MAGENTA, "[tid=%lu] locking", _thread_id); \ + pthread_mutex_lock(&_lock); \ + _clog(CLOG_COLOR_MAGENTA, "[tid=%lu] locked", _thread_id); \ + } + +#define JM_CRITICAL_CRITICAL_END \ + { \ + pthread_t _thread_id = pthread_self(); \ + _clog(CLOG_COLOR_MAGENTA, "[tid=%lu] unlocking", _thread_id); \ + pthread_mutex_unlock(&_lock); \ + _clog(CLOG_COLOR_MAGENTA, "[tid=%lu] unlocked", _thread_id); \ + } + +/** + * Guards `_initialized_counter` in jobmgr_init() and jobmgr_cleanup() functions. + */ +pthread_mutex_t _init_lock = PTHREAD_MUTEX_INITIALIZER; +/** + * Guards private date structures of Job Manager. + * It is a recursive lock. + */ +pthread_mutex_t _lock = PTHREAD_MUTEX_INITIALIZER; +const CMPIBroker *_cb = NULL; +const CMPIContext *_cmpi_ctx = NULL; +gchar *_provider_name = NULL; +guint _initialized_counter = 0; +gboolean _concurrent_processing = FALSE; +/* Maps GType of job to JobTypeInfo. This does not change after manager's init. + */ +static GTree *_job_type_map = NULL; +/* Maps job number to job instance. It contains all known jobs accessible with + * GetInstance() call. No other job container of this job manager manipulates + * reference count of jobs. Once the job is created, it is placed here and its + * reference count is set to 1. Once it's deleted it's removed from here and + * its reference count decreased. + */ +static GTree *_job_map = NULL; +/* Priority queue of pending jobs. Key is a pair (job's priority, job's number) + * represented by `PendingJobKey`. Value is corresponding job. Job can not be + * key because its priority may change anytime, therefore its lookup wouldn't + * work. + */ +static GTree *_job_queue = NULL; +/* Set of running jobs. The upper limit of jobs is determined by + * *concurrent_processing* flag. If `true`, there is no limit. Otherwise at + * most 1 job can be present. Ponter to corresponding GTask is an associated + * value. + */ +static GTree *_running_jobs = NULL; +/* Calendar for inner events. Event is some action scheduled for execution on + * particular job. Tree contains just keys. Key is of type CalendarEvent. + * If some action on particular job, which is already present, is being scheduled, + * only the one scheduled for earlier time will be kept. + */ +static GTree *_event_calendar = NULL; +/** + * Thread object executing main loop. Its behaviour is defined in *run()* + * function. + */ +static GThread *_manager_thread = NULL; +/** + * Context belonging to the main loop. + */ +static GMainContext *_main_ctx = NULL; +/** + * Main loop object executed by *_manager_thread*. It handles calendar events + * and all job modified and finished signals. + */ +static GMainLoop *_main_loop = NULL; + +static const gchar *_action_names[] = { + "remove" +}; + +static const gchar *_filter_queries[] = { + /* STATIC_FILTER_ID_CHANGED */ + "SELECT * FROM LMI_%sInstModification WHERE " + "SourceInstance ISA %s AND " + "SourceInstance.CIM_ConcreteJob::JobState <> " + "PreviousInstance.CIM_ConcreteJob::JobState", + /* STATIC_FILTER_ID_CREATED */ + "SELECT * FROM LMI_%sInstCreation WHERE " + "SourceInstance ISA %s", + /* STATIC_FILTER_ID_DELETED */ + "SELECT * FROM LMI_%sInstDeletion WHERE " + "SourceInstance ISA %s", + /* STATIC_FILTER_ID_FAILED */ + "SELECT * FROM LMI_%sInstModification WHERE " + "SourceInstance ISA %s AND " + "SourceInstance.CIM_ConcreteJob::JobState = 10", + /* STATIC_FILTER_ID_PERCENT_UPDATED */ + "SELECT * FROM LMI_%sInstModification WHERE " + "SourceInstance ISA %s AND " + "SourceInstance.CIM_ConcreteJob::PercentComplete <> " + "PreviousInstance.CIM_ConcreteJob::PercentComplete", + /* STATIC_FILTER_ID_SUCCEEDED */ + "SELECT * FROM LMI_%sInstModification WHERE " + "SourceInstance ISA %s AND " + "SourceInstance.CIM_ConcreteJob::JobState = 7", +}; + +/* Forward declarations */ +static gint cmp_calendar_events(gconstpointer a, + gconstpointer b, + gpointer unused); +static gint cmp_g_types(gconstpointer a, + gconstpointer b, + gpointer unused); +static gint cmp_job_file_names(gconstpointer a, + gconstpointer b, + gpointer counter); +static gint cmp_lmi_job_queue_keys(gconstpointer a, + gconstpointer b, + gpointer unused); +static gint cmp_pointers(gconstpointer a, + gconstpointer b, + gpointer unused); +static gint cmp_uints(gconstpointer a, + gconstpointer b, + gpointer unused); +static CMPIStatus delete_job(LmiJob *job); +static void job_deletion_request_changed_callback(LmiJob *job, + gboolean delete_on_completion, + gint64 time_before_removal, + gpointer unused); +static gboolean dup_job_id(gpointer key, + gpointer value, + gpointer data); +static gboolean dup_running_job_id(gpointer key, + gpointer value, + gpointer data); +static void find_and_delete_calendar_event(gpointer item, + gpointer unused); +static gboolean find_event_by_job(gpointer key, + gpointer value, + gpointer data); +static void free_calendar_event(gpointer ptr); +static const gchar *get_persistent_storage_path(void); +static void job_finished_callback(LmiJob *job, + LmiJobStateEnum old_state, + LmiJobStateEnum new_state, + GVariant *result, + const gchar *error, + gpointer unused); +static void job_modified_callback(LmiJob *job, + LmiJobPropEnum property, + GVariant *old_value, + GVariant *new_value, + gpointer unused); +static void job_modified_params_free(JobModifiedParams *params); +static void job_process_start(GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable); +static void job_processed_callback(GObject *object, + GAsyncResult *result, + gpointer user_data); +static CMPIUint16 job_state_to_cim(LmiJobStateEnum state); +static CMPIArray *job_state_to_cim_operational_status(LmiJobStateEnum state); +static const gchar *job_state_to_status(LmiJobStateEnum state); +static CMPIStatus job_to_cim_instance(const LmiJob *job, + CMPIInstance **instance); +static gboolean launch_jobs(gpointer unused); +static LmiJob *load_job_from_file(const gchar *job_file_path); +static gboolean load_job_from_file_cb(gpointer key, + gpointer value, + gpointer data); +static void load_jobs_from_persistent_storage(void); +static const JobTypeInfo *lookup_job_type_info(GType job_type); +static const JobTypeInfo *lookup_job_type_info_for_job(const LmiJob *job); +static gchar *make_job_file_path(const LmiJob *job); +static CMPIStatus make_method_parameters_cim_instance_for_job( + const LmiJob *job, + const gchar *cim_class_name, + CMPIInstance **instance); +static gsize make_method_parameters_class_name(gchar *buf, + gsize buflen, + const gchar *method_name, + gboolean output); +static gboolean process_calendar_event(gpointer unused); +static gboolean process_job_finished_callback(gpointer user_data); +static gboolean process_job_modified_callback(gpointer user_data); +static gboolean register_job_classes_with_ind_sender(gpointer key, + gpointer value, + gpointer data); +static gboolean register_job(LmiJob *job, gboolean is_new); +static CMPIStatus register_static_filters(const gchar *cim_class_name); +static gboolean reschedule_present_event(gpointer key, + gpointer value, + gpointer data); +static gpointer run(gpointer data); +static void save_job(const LmiJob *job, const JobTypeInfo *info); +static void schedule_event(guint64 timeout, + LmiJob *job, + JobActionEnum action); +static gboolean stop_search_for_job_by_id(gpointer key, + gpointer value, + gpointer data); +static gboolean stop_search_for_job_by_name(gpointer key, + gpointer value, + gpointer data); +static gboolean stop_search_for_job_by_number(gpointer key, + gpointer value, + gpointer data); + +static gint cmp_g_types(gconstpointer a, + gconstpointer b, + gpointer unused) +{ + GType aval = (GType) GPOINTER_TO_UINT(a); + GType bval = (GType) GPOINTER_TO_UINT(b); + if (aval < bval) + return -1; + if (aval > bval) + return 1; + return 0; +} + +static gint cmp_uints(gconstpointer a, + gconstpointer b, + gpointer unused) +{ + guint va = (guint) GPOINTER_TO_UINT(a); + guint vb = (guint) GPOINTER_TO_UINT(b); + if (va < vb) + return -1; + if (va > vb) + return 1; + return 0; +} + +static gint cmp_job_file_names(gconstpointer a, + gconstpointer b, + gpointer counter) +{ + gchar const *na = a; + gchar const *nb = b; + gchar const *fst_num_start; + gchar const *snd_num_start; + guint64 fst_id; + guint64 snd_id; + + g_assert((fst_num_start = rindex(na, '-')) != NULL); + g_assert((snd_num_start = rindex(nb, '-')) != NULL); + ++fst_num_start; + ++snd_num_start; + fst_id = g_ascii_strtoull(fst_num_start, NULL, 10); + snd_id = g_ascii_strtoull(snd_num_start, NULL, 10); + if (fst_id < snd_id) + return -1; + if (fst_id > snd_id) + return 1; + return 0; +} + +static gint cmp_lmi_job_queue_keys(gconstpointer a, + gconstpointer b, + gpointer unused) +{ + const PendingJobKey *fst = a; + const PendingJobKey *snd = b; + if (fst->priority < snd->priority) + return -1; + if (fst->priority > snd->priority) + return 1; + if (fst->number < snd->number) + return -1; + if (fst->number > snd->number) + return 1; + return 0; +} + +static gint cmp_pointers(gconstpointer a, + gconstpointer b, + gpointer unused) +{ + guint fst = GPOINTER_TO_UINT(a); + guint snd = GPOINTER_TO_UINT(b); + if (fst < snd) + return -1; + if (snd > fst) + return 1; + return 0; +} + +static gint cmp_calendar_events(gconstpointer a, + gconstpointer b, + gpointer unused) +{ + const CalendarEvent *fst = a; + const CalendarEvent *snd = b; + if (fst->time < snd->time) + return -1; + if (fst->time > snd->time) + return 1; + guint na = lmi_job_get_number(fst->job); + guint nb = lmi_job_get_number(snd->job); + if (na < nb) + return -1; + if (na > nb) + return 1; + return 0; +} + +static void free_calendar_event(gpointer ptr) +{ + CalendarEvent *event = (CalendarEvent *) ptr; + g_assert(event); + g_free(event); +} + +static const JobTypeInfo *lookup_job_type_info(GType job_type) +{ + const JobTypeInfo *info = NULL; + g_assert(_job_type_map); + g_assert(g_type_is_a(job_type, LMI_TYPE_JOB)); + + if (!g_tree_lookup_extended(_job_type_map, + GUINT_TO_POINTER(job_type), NULL, (gpointer *) &info)) + { + lmi_error("Job type \"%s\" is not registered with job manager!", + g_type_name(job_type)); + } + return info; +} + +static const JobTypeInfo *lookup_job_type_info_for_job(const LmiJob *job) +{ + return lookup_job_type_info(G_OBJECT_TYPE(job)); +} + +static CMPIUint16 job_state_to_cim(LmiJobStateEnum state) +{ + CMPIUint16 res; + switch (state) { + case LMI_JOB_STATE_ENUM_NEW: + res = CIM_CONCRETE_JOB_JOB_STATE_NEW; + break; + case LMI_JOB_STATE_ENUM_RUNNING: + res = CIM_CONCRETE_JOB_JOB_STATE_RUNNING; + break; + case LMI_JOB_STATE_ENUM_COMPLETED: + res = CIM_CONCRETE_JOB_JOB_STATE_COMPLETED; + break; + case LMI_JOB_STATE_ENUM_TERMINATED: + res = CIM_CONCRETE_JOB_JOB_STATE_TERMINATED; + break; + case LMI_JOB_STATE_ENUM_EXCEPTION: + res = CIM_CONCRETE_JOB_JOB_STATE_EXCEPTION; + break; + default: + g_assert_not_reached(); /* impossible */ + } + return res; +} + +static const gchar *job_state_to_status(LmiJobStateEnum state) +{ + const gchar *res; + switch (state) { + case LMI_JOB_STATE_ENUM_NEW: + res = "Enqueued"; + break; + case LMI_JOB_STATE_ENUM_RUNNING: + res = "Running"; + break; + case LMI_JOB_STATE_ENUM_COMPLETED: + res = "Completed successfully"; + break; + case LMI_JOB_STATE_ENUM_TERMINATED: + res = "Terminated"; + break; + case LMI_JOB_STATE_ENUM_EXCEPTION: + res = "Failed"; + break; + default: + g_assert_not_reached(); /* impossible */ + } + return res; +} + +static CMPIArray *job_state_to_cim_operational_status(LmiJobStateEnum state) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + CMPIUint16 fst, snd = 0; + CMPIValue value; + CMPIArray *res = CMNewArray(_cb, + state == LMI_JOB_STATE_ENUM_COMPLETED ? 2 : 1, + CMPI_uint16, &status); + + if (!res || status.rc) { + if (status.msg) { + lmi_error(CMGetCharPtr(status.msg)); + } else { + lmi_error("Memory allocation failed"); + } + } else { + switch (state) { + case LMI_JOB_STATE_ENUM_NEW: + fst = CIM_CONCRETE_JOB_OPERATIONAL_STATUS_DORMANT; + break; + case LMI_JOB_STATE_ENUM_RUNNING: + fst = CIM_CONCRETE_JOB_OPERATIONAL_STATUS_OK; + break; + case LMI_JOB_STATE_ENUM_COMPLETED: + fst = CIM_CONCRETE_JOB_OPERATIONAL_STATUS_OK; + snd = CIM_CONCRETE_JOB_OPERATIONAL_STATUS_COMPLETED; + break; + case LMI_JOB_STATE_ENUM_TERMINATED: + fst = CIM_CONCRETE_JOB_OPERATIONAL_STATUS_STOPPED; + break; + case LMI_JOB_STATE_ENUM_EXCEPTION: + fst = CIM_CONCRETE_JOB_OPERATIONAL_STATUS_ERROR; + break; + default: + g_assert_not_reached(); /* impossible */ + } + + value.uint16 = fst; + status = CMSetArrayElementAt(res, 0, &value, CMPI_uint16); + if (!status.rc && snd) { + value.uint16 = snd; + status = CMSetArrayElementAt(res, 1, &value, CMPI_uint16); + } + if (status.rc) { + lmi_error("Memory allocation failed"); + CMRelease(res); + res = NULL; + } + } + return res; +} + +static CMPIStatus make_inst_method_call_instance_for_job(const LmiJob *job, + gboolean pre, + CMPIInstance **instance) +{ + gchar *namespace; + CMPIObjectPath *op; + CMPIStatus status = {CMPI_RC_OK, NULL}; + CMPIValue value; + const JobTypeInfo *info; + gchar buf[BUFLEN]; + CMPIArray *arr; + LmiJobStateEnum state; + g_assert(instance != NULL); + + if ((namespace = lmi_read_config("CIM", "Namespace")) == NULL) { + lmi_error("Memory allocation failed"); + goto err; + } + + op = CMNewObjectPath(_cb, namespace, + CIM_INST_METHOD_CALL_CLASS_NAME, &status); + if (op == NULL || status.rc) + goto namespace_err; + g_free(namespace); + namespace = NULL; + + if ((*instance = CMNewInstance(_cb, op, &status)) == NULL || status.rc) + goto op_err; + op = NULL; + + info = lookup_job_type_info_for_job(job); + + JOB_CRITICAL_BEGIN(job); + + if ((status = jobmgr_job_to_cim_instance(job, &value.inst)).rc) + goto critical_end_err; + if ((status = CMSetProperty(*instance, "SourceInstance", + &value, CMPI_instance)).rc) + { + CMRelease(value.inst); + goto critical_end_err; + } + + if ((status = jobmgr_job_to_cim_op(job, &op)).rc) + goto critical_end_err; + if ((value.string = CMObjectPathToString(op, &status)) == NULL) { + lmi_error("Memory allocation failed"); + goto critical_end_err; + } + CMRelease(op); + op = NULL; + if ((status = CMSetProperty(*instance, "SourceInstanceModelPath", + &value, CMPI_string)).rc) + goto string_err; + + if ((value.string = CMNewString(_cb, lmi_job_get_method_name(job), + &status)) == NULL) + { + lmi_error("Memory allocation failed"); + goto critical_end_err; + } + if ((status = CMSetProperty(*instance, "MethodName", + &value, CMPI_string)).rc) + goto string_err; + + value.boolean = pre != FALSE; + if ((status = CMSetProperty(*instance, "PreCall", &value, CMPI_boolean)).rc) + goto critical_end_err; + + value.uint16 = ( info->return_value_type + + INST_METHOD_CALL_RETURN_VALUE_TYPE_0); + if ((status = CMSetProperty(*instance, "ReturnValueType", + &value, CMPI_uint16)).rc) + goto critical_end_err; + + if (!pre) { + state = lmi_job_get_state(job); + if ((arr = CMNewArray(_cb, state == LMI_JOB_STATE_ENUM_EXCEPTION ? 1:0, + CMPI_instance, &status)) == NULL) + { + lmi_error("Memory allocation failed"); + goto critical_end_err; + } + if (state == LMI_JOB_STATE_ENUM_EXCEPTION) { + if ((status = jobmgr_job_to_cim_error(job, &value.inst)).rc) + goto arr_err; + if ((status = CMSetArrayElementAt( + arr, 0, &value.inst, CMPI_instance)).rc) + { + CMRelease(value.inst); + goto arr_err; + } + } + value.array = arr; + if ((status = CMSetProperty(*instance, "Error", + &value, CMPI_instanceA)).rc) + { + lmi_error("Memory allocation failed"); + goto arr_err; + } + + if (lmi_job_get_state(job) == LMI_JOB_STATE_ENUM_COMPLETED) { + make_method_parameters_class_name(buf, BUFLEN, + lmi_job_get_method_name(job), TRUE); + if ((status = make_method_parameters_cim_instance_for_job( + job, buf, &value.inst)).rc) + goto critical_end_err; + if ((status = info->make_job_params_func( + _cb, _cmpi_ctx, job, + FALSE, /* TODO: change this to TRUE when + * tog-pegasus properly supports unknown classes */ + TRUE, value.inst)).rc) + { + CMRelease(value.inst); + goto critical_end_err; + } + if ((status = CMSetProperty(*instance, "MethodParameters", + &value, CMPI_instance)).rc) + { + CMRelease(value.inst); + goto critical_end_err; + } + } + } + + JOB_CRITICAL_END(job); + + return status; + +arr_err: + CMRelease(value.array); + goto critical_end_err; +string_err: + CMRelease(value.string); +critical_end_err: + JOB_CRITICAL_END(job); + CMRelease(*instance); + *instance = NULL; + goto err; +op_err: + if (op) + CMRelease(op); +namespace_err: + g_free(namespace); +err: + if (!status.rc) + CMSetStatus(&status, CMPI_RC_ERR_FAILED); + return status; +} + +/** + * Make a CIM class name for instance representing method parameters. + * This is constructed dependeing on *output* parameter: + * + * a) *output* is `FALSE` + * `__MethodParameters_<method_name>` + * b) *output* is `TRUE` + * `__MethodParameters_<method_name>_Result` + * + * Both classes need to be properly registered wih CIMOM. The latter is a + * subclass of the former. + * + * @param output Whether the desired instance shall contain output parameters. + */ +static gsize make_method_parameters_class_name(gchar *buf, + gsize buflen, + const gchar *method_name, + gboolean output) +{ + return g_snprintf(buf, buflen, "__MethodParameters_%s%s", + method_name, output ? "_Result" : ""); +} + +/** + * Create an instance of class holding method parameters. + * + * @param cim_class_name Name of CIM class obtained with + * `make_method_parameters_class_name()`. + */ +static CMPIStatus make_method_parameters_cim_instance_for_job( + const LmiJob *job, + const gchar *cim_class_name, + CMPIInstance **instance) +{ + gchar *namespace; + CMPIObjectPath *op; + CMPIStatus status = {CMPI_RC_OK, NULL}; + g_assert(instance != NULL); + + if ((namespace = lmi_read_config("CIM", "Namespace")) == NULL) { + lmi_error("Memory allocation failed"); + goto err; + } + + op = CMNewObjectPath(_cb, namespace, cim_class_name, &status); + if (op == NULL || status.rc) + goto namespace_err; + g_free(namespace); + namespace = NULL; + + if ((*instance = CMNewInstance(_cb, op, &status)) == NULL || status.rc) + goto op_err; + + return status; + +op_err: + CMRelease(op); +namespace_err: + g_free(namespace); +err: + if (!status.rc) + CMSetStatus(&status, CMPI_RC_ERR_FAILED); + return status; +} + +/** + * Set `JobInParameters` and `JobOutParameters` of `LMI_ConcreteJob` instance. + * + * Note that `JobInParameters are not currently set due to shortcomings of + * tog-pegasus broker. + * + * `JobOutParameters` shall be set only for completed job. + */ +static CMPIStatus fill_cim_job_parameters(const LmiJob *job, + CMPIInstance *instance) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + CMPIValue value; + gchar buf[BUFLEN] = {0}; + const JobTypeInfo *info; + const gchar *method_name; + g_assert(instance != NULL); + + if ((method_name = lmi_job_get_method_name(job)) != NULL) { + info = lookup_job_type_info_for_job(job); + /* + * TODO: uncomment this, when Pegasus properly supports instances of + * unknown classes + * + make_method_parameters_class_name( + buf, BUFLEN, method_name, FALSE); + if ((status = make_method_parameters_cim_instance_for_job( + job, buf, value.inst)).rc) + goto err; + if ((status = info->make_job_params_func(cb, job, + TRUE, FALSE, value.inst)).rc) + goto inst_err; + if ((status = CMSetProperty(value.inst, "JobInParameters", + &value, CMPI_instance)).rc) + { + CMRelease(value.inst); + goto inst_err; + } + */ + + if (lmi_job_get_state(job) == LMI_JOB_STATE_ENUM_COMPLETED) { + make_method_parameters_class_name(buf, BUFLEN, method_name, TRUE); + if ((status = make_method_parameters_cim_instance_for_job( + job, buf, &value.inst)).rc) + goto err; + if ((status = info->make_job_params_func( + _cb, _cmpi_ctx, job, FALSE, TRUE, value.inst)).rc) + goto inst_err; + if ((status = CMSetProperty(instance, "JobOutParameters", + &value, CMPI_instance)).rc) + goto inst_err; + } + } + + return status; + +inst_err: + lmi_error("Memory allocation failed"); + CMRelease(value.inst); +err: + return status; +} + +/** + * Create an instance of `LMI_ConcreteJob` corresponding to given job. + * + * Note this does not call `convert_func()` callback to fill additional + * properties. + */ +static CMPIStatus job_to_cim_instance(const LmiJob *job, + CMPIInstance **instance) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + CMPIObjectPath *op; + CMPIInstance *inst; + CMPIValue value; + CMPIData data; + LmiJobStateEnum state; + CMPIUint64 elapsed = 0; + g_assert(instance); + g_assert(LMI_IS_JOB(job)); + g_assert(_initialized_counter > 0); + + if ((status = jobmgr_job_to_cim_op(job, &op)).rc) + goto err; + + if ((inst = CMNewInstance(_cb, op, &status)) == NULL || status.rc) + goto op_err; + + data = CMGetKey(op, "InstanceID", NULL); + value.string = CMClone(data.value.string, &status); + if (value.string == NULL || status.rc) + goto inst_err; + if ((status = CMSetProperty(inst, "InstanceID", &value, CMPI_string)).rc) + goto string_err; + + value.uint16 = CIM_CONCRETE_JOB_COMMUNICATION_STATUS_NOT_AVAILABLE; + if ((status = CMSetProperty(inst, "CommunicationStatus", + &value, CMPI_uint16)).rc) + goto inst_err; + + value.boolean = lmi_job_get_delete_on_completion(job); + if ((status = CMSetProperty(inst, "DeleteOnCompletion", + &value, CMPI_boolean)).rc) + goto inst_err; + + if (lmi_job_is_finished(job)) { + elapsed = ( lmi_job_get_time_of_last_state_change(job) + - lmi_job_get_start_time(job)); + } else if (lmi_job_get_state(job) == LMI_JOB_STATE_ENUM_RUNNING) { + elapsed = time(NULL) - lmi_job_get_start_time(job); + } + value.dateTime = CMNewDateTimeFromBinary(_cb, + elapsed * 1000000L, TRUE, &status); + if (value.dateTime == NULL || status.rc) + goto inst_err; + if ((status = CMSetProperty(inst, "ElapsedTime", + &value, CMPI_dateTime)).rc) + goto datetime_err; + + value.string = CMNewString(_cb, + lmi_job_get_jobid(job), &status); + if (value.string == NULL || status.rc) + goto inst_err; + if ((status = CMSetProperty(inst, "ElementName", &value, CMPI_string)).rc) + goto string_err; + + state = lmi_job_get_state(job); + value.uint16 = state == LMI_JOB_STATE_ENUM_EXCEPTION ? 1 : 0; + if ((status = CMSetProperty(inst, "ErrorCode", &value, CMPI_uint16)).rc) + goto inst_err; + + if ((status = fill_cim_job_parameters(job, inst)).rc) { + goto inst_err; + } + + value.uint16 = job_state_to_cim(state); + if ((status = CMSetProperty(inst, "JobState", &value, CMPI_uint16)).rc) + goto inst_err; + + if ( (value.string = CMNewString(_cb, + job_state_to_status(state), &status)) == NULL + || status.rc) + goto inst_err; + if ((status = CMSetProperty(inst, "JobStatus", &value, CMPI_string)).rc) + goto string_err; + + value.uint16 = CIM_CONCRETE_JOB_LOCAL_OR_UTC_TIME_UTC_TIME; + if ((status = CMSetProperty(inst, "LocalOrUtcTime", + &value, CMPI_uint16)).rc) + goto inst_err; + + if (lmi_job_get_method_name(job)) { + if ( (value.string = CMNewString(_cb, + lmi_job_get_method_name(job), &status)) == NULL + || status.rc) + goto inst_err; + }else { + value.string = NULL; + } + if ((status = CMSetProperty(inst, "MethodName", &value, CMPI_string)).rc) + goto string_err; + + if (lmi_job_get_name(job)) { + if ( (value.string = CMNewString(_cb, + lmi_job_get_name(job), &status)) == NULL + || status.rc) + goto inst_err; + }else { + value.string = NULL; + } + if ((status = CMSetProperty(inst, "Name", &value, CMPI_string)).rc) + goto string_err; + + value.uint16 = lmi_job_get_percent_complete(job); + if ((status = CMSetProperty(inst, "PercentComplete", + &value, CMPI_uint16)).rc) + goto inst_err; + + if ((value.array = job_state_to_cim_operational_status(state)) == NULL) + goto inst_err; + if ((status = CMSetProperty(inst, "OperationalStatus", + &value, CMPI_uint16A)).rc) + goto array_err; + + value.uint32 = lmi_job_get_priority(job); + if ((status = CMSetProperty(inst, "Priority", &value, CMPI_uint32)).rc) + goto inst_err; + + if (state != LMI_JOB_STATE_ENUM_NEW) { + value.dateTime = CMNewDateTimeFromBinary(_cb, + lmi_job_get_start_time(job) * 1000000L, + FALSE, &status); + if (value.dateTime == NULL || status.rc) + goto inst_err; + if ((status = CMSetProperty(inst, "StartTime", + &value, CMPI_dateTime)).rc) + goto datetime_err; + } + + value.dateTime = CMNewDateTimeFromBinary(_cb, + lmi_job_get_time_before_removal(job) * 1000000L, + TRUE, &status); + if (value.dateTime == NULL || status.rc) + goto inst_err; + if ((status = CMSetProperty(inst, "TimeBeforeRemoval", + &value, CMPI_dateTime)).rc) + goto datetime_err; + + value.dateTime = CMNewDateTimeFromBinary(_cb, + lmi_job_get_time_of_last_state_change(job) * 1000000L, + FALSE, &status); + if (value.dateTime == NULL || status.rc) + goto inst_err; + if ((status = CMSetProperty(inst, "TimeOfLastStateChange", + &value, CMPI_dateTime)).rc) + goto datetime_err; + + value.dateTime = CMNewDateTimeFromBinary(_cb, + lmi_job_get_time_submitted(job) * 1000000L, + FALSE, &status); + if (value.dateTime == NULL || status.rc) + goto inst_err; + if ((status = CMSetProperty(inst, "TimeSubmitted", + &value, CMPI_dateTime)).rc) + goto datetime_err; + + *instance = inst; + return status; + +array_err: + CMRelease(value.array); + goto inst_err; +datetime_err: + CMRelease(value.dateTime); + goto inst_err; +string_err: + CMRelease(value.string); +inst_err: + CMRelease(inst); +op_err: + CMRelease(op); +err: + if (!status.rc) { + lmi_error("Memory allocation failed"); + CMSetStatus(&status, CMPI_RC_ERR_FAILED); + } + return status; +} + +struct _CalendarEventContainer { + LmiJob *job; + GList *matches; +}; + +static CMPIStatus delete_job(LmiJob *job) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + const JobTypeInfo *info; + gchar *file_path = NULL; + GFile *file = NULL; + gchar *jobid = NULL; + struct _CalendarEventContainer cec = {job, NULL}; + char err_buf[BUFLEN] = {0}; + CMPIInstance *inst = NULL; + GError *gerror = NULL; + + JOB_CRITICAL_BEGIN(job); + + if ((jobid = lmi_job_get_jobid(job)) == NULL) + goto mem_err; + if (!lmi_job_is_finished(job)) { + snprintf(err_buf, BUFLEN, + "Can't delete unfinished job \"%s\"!", jobid); + CMSetStatusWithChars(_cb, &status, CMPI_RC_ERR_FAILED, err_buf); + lmi_error(err_buf); + goto done; + } + if ((info = lookup_job_type_info_for_job(job)) == NULL) { + CMSetStatusWithChars(_cb, &status, CMPI_RC_ERR_NOT_FOUND, + "Can't delete unregistered job!"); + goto done; + } + + g_signal_handlers_disconnect_by_func(job, job_modified_callback, NULL); + g_signal_handlers_disconnect_by_func(job, job_finished_callback, NULL); + g_signal_handlers_disconnect_by_func(job, + job_deletion_request_changed_callback, NULL); + + g_tree_foreach(_event_calendar, find_event_by_job, &cec); + if (cec.matches) { + g_list_foreach(cec.matches, find_and_delete_calendar_event, NULL); + g_list_free(cec.matches); + } + + if (info->use_persistent_storage) { + if ((file_path = make_job_file_path(job)) == NULL) + goto mem_err; + if ((file = g_file_new_for_path(file_path)) == NULL) + goto mem_err; + if (!g_file_delete(file, NULL, &gerror)) { + lmi_error("Failed to delete job's file \"%s\": %s", + file_path, gerror->message); + goto err; + } + g_clear_object(&file); + g_free(file_path); + file_path = NULL; + } + + /* create source instance for instance deletion indication */ + if ( (status = job_to_cim_instance(job, &inst)).rc + || ( info->convert_func + && (status = (*info->convert_func) (_cb, _cmpi_ctx, job, inst)).rc)) + { + if (status.msg) { + lmi_error("Failed to create instance deletion indication: %s", + status.msg); + } else { + lmi_error("Failed to create instance deletion indication!"); + } + } + + JOB_CRITICAL_END(job); + /* this effectively destroys the job object */ + g_tree_remove(_job_map, GUINT_TO_POINTER(lmi_job_get_number(job))); + lmi_info("Deleted job instance \"%s\".", jobid); + + /* send instance deletion indication */ + if (!status.rc && (status = ind_sender_send_instdeletion( + _cb, _cmpi_ctx, inst, STATIC_FILTER_ID_DELETED)).rc) + { + if (status.msg) { + lmi_error("Failed to send instance deletion indication: %s", + status.msg); + } else { + lmi_error("Failed to send instance deletion indication!"); + } + } else if (!status.rc) { + lmi_info("Sent instance deletion indication for job \"%s\".", + jobid); + } + + if (inst) + CMRelease(inst); + g_free(jobid); + return status; + +mem_err: + lmi_error("Memory allocation failed"); +err: + g_clear_error(&gerror); + g_clear_object(&file); + g_free(file_path); + CMSetStatus(&status, CMPI_RC_ERR_FAILED); +done: + JOB_CRITICAL_END(job); + g_free(jobid); + return status; +} + +struct _DuplicateEventContainer { + CalendarEvent *subject; + CalendarEvent *duplicate; +}; + +/** + * Traversing function for `_event_calendar` that just founds duplicate event + * ignoring scheduled time. + */ +static gboolean reschedule_present_event(gpointer key, + gpointer value, + gpointer data) +{ + CalendarEvent *event = key; + struct _DuplicateEventContainer *cnt = data; + if (event->action == cnt->subject->action && + event->job == cnt->subject->job) + { /* same event is already scheduled */ + cnt->duplicate = event; + return TRUE; + } + return FALSE; +} + +static gboolean process_calendar_event(gpointer unused) +{ + CalendarEvent *event; + JobActionEnum action; + LmiJob *job; + gchar *jobid = NULL; + guint64 timeout; + guint64 current_millis; + g_assert(g_main_context_is_owner(_main_ctx)); + + JM_CRITICAL_CRITICAL_BEGIN; + while ( (event = lmi_tree_peek_first(_event_calendar, NULL)) + && event->time <= ((guint64) time(NULL))*1000L) + { + action = event->action; + job = event->job; + jobid = lmi_job_get_jobid(job); + lmi_debug("Processing scheduled event \"%s\" on job \"%s\".", + _action_names[event->action], jobid); + g_tree_remove(_event_calendar, event); + + switch (action) { + case JOB_ACTION_REMOVE: + /* last check before removal */ + if (lmi_job_get_delete_on_completion(job)) + delete_job(job); + break; + default: + g_assert_not_reached(); /* no other actions yet */ + break; + } + } + if (event) { + current_millis = ((guint64) time(NULL))*1000L; + if (current_millis > event->time) { + timeout = 0; + } else { + timeout = event->time - current_millis; + } + if (jobid == NULL) + jobid = lmi_job_get_jobid(event->job); + /* Can't use: + * g_timeout_add((guint) timeout, process_calendar_event, NULL); + * since it does not ensure the callback will be called in desired + * context (_main_ctx). */ + GSource *source = g_timeout_source_new((guint) timeout); + g_source_set_callback(source, process_calendar_event, NULL, NULL); + g_source_attach(source, _main_ctx); + lmi_debug("Next event \"%s\" will be executed" + " after \"%llu\" milliseconds on job \"%s\".", + _action_names[event->action], timeout, jobid); + } + JM_CRITICAL_CRITICAL_END; + g_free(jobid); + return FALSE; +} + +/** + * @param timeout Number of milliseconds to wait before running the action. + */ +static void schedule_event(guint64 timeout, + LmiJob *job, + JobActionEnum action) +{ + CalendarEvent *event; + struct _DuplicateEventContainer cnt; + gchar *jobid = NULL; + guint64 current_time; + g_assert(LMI_IS_JOB(job)); + + current_time = (guint64) time(NULL); + + if ((event = g_new(CalendarEvent, 1)) == NULL) { + lmi_error("Memory allocation failed"); + } else { + event->time = current_time*1000L + timeout; + event->job = job; + event->action = action; + cnt.subject = event; + cnt.duplicate = NULL; + g_tree_foreach(_event_calendar, reschedule_present_event, &cnt); + if (cnt.duplicate && cnt.duplicate->time > event->time) + { /* remove duplicate event only if it's scheduled later */ + lmi_debug("Removing duplicate event [%s] on job %s " + " timing out in %lu seconds.", _action_names[event->action], + jobid, cnt.duplicate->time/1000L); + g_tree_remove(_event_calendar, cnt.duplicate); + } + if (!cnt.duplicate || cnt.duplicate->time > event->time) { + g_tree_insert(_event_calendar, event, NULL); + jobid = lmi_job_get_jobid(job); + lmi_debug("Scheduled %s action on job %s to occur after %lu" + " seconds.", _action_names[event->action], + jobid, timeout/1000L); + if (event == lmi_tree_peek_first(_event_calendar, NULL)) { + GSource *source = g_timeout_source_new(timeout); + g_source_set_callback(source, process_calendar_event, NULL, NULL); + g_source_attach(source, _main_ctx); + } + } else { + g_free(event); + } + } + g_free(jobid); +} + +/** + * Callback run when job moves to finished process. + */ +static void job_processed_callback(GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + LmiJob *job = LMI_JOB(object); + gchar *jobid; + g_assert(g_main_context_is_owner(_main_ctx)); + + JM_CRITICAL_CRITICAL_BEGIN; + JOB_CRITICAL_BEGIN(job); + + if ((jobid = lmi_job_get_jobid(job)) == NULL) { + lmi_error("Memory allocation failed"); + goto err; + } + lmi_debug("Job #%u (jobid=%s, state=%s) processed.", + lmi_job_get_number(job), jobid, + lmi_job_state_to_string(lmi_job_get_state(job))); + if (!lmi_job_is_finished(job)) { + lmi_error("Job processing callback did not finish a job \"%s\"!" + " Setting state to EXCEPTION!", jobid); + lmi_job_finish_exception(job, + LMI_JOB_STATUS_CODE_ENUM_FAILED, + "job processing callback terminated unexpectedly"); + } + + g_tree_remove(_running_jobs, job); + if (lmi_job_get_delete_on_completion(job)) { + time_t tbr = lmi_job_get_time_before_removal(job); + schedule_event(((guint64) tbr)*1000L, job, JOB_ACTION_REMOVE); + } + if (!_concurrent_processing && lmi_tree_peek_first(_job_queue, NULL)) + { /* check for another pending job and launch it */ + /* Can't use: + * g_idle_add(launch_job, NULL); + * since it does not ensure the callback will be called in desired + * context (_main_ctx). */ + GSource *source = g_idle_source_new(); + g_source_set_callback(source, launch_jobs, NULL, NULL); + g_source_attach(source, _main_ctx); + } + +err: + JOB_CRITICAL_END(job); + JM_CRITICAL_CRITICAL_END; +} + +/** + * This shall be run in its own thread. It runs synchronous callback provided + * at job type's registration which finishes the job. + */ +static void job_process_start(GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + LmiJob *job = LMI_JOB(source_object); + const JobTypeInfo *info; + g_assert(G_IS_TASK(task)); + g_assert(LMI_IS_JOB(job)); + g_assert(!lmi_job_is_finished(job)); + + JOB_CRITICAL_BEGIN(job); + if (g_task_return_error_if_cancelled(task)) { + lmi_job_finish_terminate(job); + JOB_CRITICAL_END(job); + } else { + /* re-launched serialized job may be already in RUNNING state */ + if (lmi_job_get_state(job) == LMI_JOB_STATE_ENUM_NEW) + lmi_job_start(job); + JOB_CRITICAL_END(job); + info = lookup_job_type_info_for_job(job); + (*info->process_func) (job, cancellable); + g_task_return_boolean(task, TRUE); + } + g_object_unref(task); +} + +/** + * Launch asynchonous thread to process one or more jobs prepaired to be run. + * Such a job is removed from `_job_queue` and appended to `_running_jobs`. + */ +static gboolean launch_jobs(gpointer unused) +{ + const JobTypeInfo *info; + LmiJob *job; + LmiJob *running; + gchar *jobid = NULL, *running_jobid; + GTask *task = NULL; + GCancellable *cancellable = NULL; + PendingJobKey job_key; + g_assert(g_main_context_is_owner(_main_ctx)); + + JM_CRITICAL_CRITICAL_BEGIN; + + while (lmi_tree_peek_first(_job_queue, (gpointer *) &job)) { + if ((jobid = lmi_job_get_jobid(job)) == NULL) + goto err; + + info = lookup_job_type_info_for_job(job); + + if ( _concurrent_processing + || (running = lmi_tree_peek_first(_running_jobs, NULL)) == NULL) + { + if (info->running_job_cancellable) { + if ((cancellable = g_cancellable_new()) == NULL) + goto err; + } + if ((task = g_task_new(job, cancellable, + job_processed_callback, task)) == NULL) + goto err; + g_tree_insert(_running_jobs, job, task); + job_key.priority = lmi_job_get_priority(job); + job_key.number = lmi_job_get_number(job); + g_tree_remove(_job_queue, &job_key); + g_task_run_in_thread(task, job_process_start); + lmi_debug("Launched task to process job \"%s\".", jobid); + } else { + running_jobid = lmi_job_get_jobid(running); + lmi_debug("Waiting for another job (%s) to finish before starting \"%s\".", + running_jobid, jobid); + g_free(running_jobid); + break; + } + g_free(jobid); + jobid = NULL; + } + + JM_CRITICAL_CRITICAL_END; + return FALSE; + +err: + JM_CRITICAL_CRITICAL_END; + g_clear_object(&task); + g_free(jobid); + lmi_error("Memory allocation failed"); + return FALSE; +} + +/** + * Compute and cache persistent storage path. + * + * @note Since static variable is not guearded, caller must use global + * *_lock* as a guard for this function. + * @return Base directory path, where job files for particular provider + * shall be stored. It must not be freed. + */ +static const gchar *get_persistent_storage_path(void) +{ + static gchar *path = NULL; + g_assert(_provider_name); + + if (!path) { + path = g_strdup_printf("%s/%s", + JOB_MANAGER_PERSISTENT_STORAGE_PREFIX, + _provider_name); + if (path == NULL) { + lmi_error("Memory allocation failed"); + } else { + for ( size_t i=strlen(JOB_MANAGER_PERSISTENT_STORAGE_PREFIX) + 1 + ; i < strlen(JOB_MANAGER_PERSISTENT_STORAGE_PREFIX) + + strlen(_provider_name); ++i) + { + path[i] = tolower(path[i]); + } + } + } + return path; +} + +/** + * Make an absolute path of job file. + */ +static gchar *make_job_file_path(const LmiJob *job) +{ + const gchar *storage_path; + gchar *file_path = NULL; + g_assert(LMI_IS_JOB(job)); + + storage_path = get_persistent_storage_path(); + if (storage_path == NULL) + goto err; + file_path = g_strdup_printf("%s/%s-%u.json", storage_path, + g_type_name(G_OBJECT_TYPE(job)), lmi_job_get_number(job)); + if (file_path == NULL) + goto err; + + return file_path; + +err: + lmi_error("Memory allocation failed"); + return file_path; +} + +static void save_job(const LmiJob *job, const JobTypeInfo *info) +{ + gchar *file_path = NULL; + GFile *file = NULL, *parent = NULL; + GError *gerror = NULL; + gchar *tmp = NULL; + g_assert(LMI_IS_JOB(job)); + + if (!info) + info = lookup_job_type_info_for_job(job); + + if (info->use_persistent_storage) { + if ((file_path = make_job_file_path(job)) == NULL) + goto mem_err; + if ((file = g_file_new_for_commandline_arg(file_path)) == NULL) + goto mem_err; + if ((parent = g_file_get_parent(file)) == NULL) + goto mem_err; + if ((tmp = g_file_get_parse_name(parent)) == NULL) + goto mem_err; + + // make sure that job storage dir exists + if (!g_file_make_directory_with_parents(parent, NULL, &gerror)) { + if (gerror->code != G_IO_ERROR_EXISTS) { + lmi_error("Could not make directory for job persistent" + " storage at \"%s\": %s", tmp, gerror->message); + goto done; + } + } else { + lmi_debug("Created persistent storage for jobs at \"%s\".", tmp); + } + lmi_job_dump_to_file(job, file_path); + } + goto done; + +mem_err: + lmi_error("Memory allocation failed"); +done: + g_clear_object(&parent); + g_clear_object(&file); + g_free(file_path); + g_free(tmp); + g_clear_error(&gerror); +} + +/** + * Every job must be registered with this function. If the job is not new, it + * will be put into `_job_queue` as a pending job. + * + * @param is_new Indicates whether the job is completely new (it was created by + * a call to `jobmgr_new_job()`) or it was loased from a file. + * @return Whether the job was successfully registered. + */ +static gboolean register_job(LmiJob *job, gboolean is_new) +{ + gchar *jobid; + const JobTypeInfo *info; + CMPIStatus status; + CMPIInstance *inst = NULL; + g_assert(LMI_IS_JOB(job)); + + if ((info = lookup_job_type_info_for_job(job)) == NULL) + return FALSE; + if ((jobid = lmi_job_get_jobid(job)) == NULL) { + lmi_error("Memory allocation failed"); + return FALSE; + } + + g_signal_connect(job, "modified", G_CALLBACK(job_modified_callback), NULL); + g_signal_connect(job, "finished", G_CALLBACK(job_finished_callback), NULL); + g_signal_connect(job, "deletion-request-changed", + G_CALLBACK(job_deletion_request_changed_callback), NULL); + + g_tree_insert(_job_map, GUINT_TO_POINTER(lmi_job_get_number(job)), job); + g_object_ref(job); + save_job(job, info); + + lmi_debug("New job \"%s\" of type %s registered with job manager.", + jobid, g_type_name(G_OBJECT_TYPE(job))); + + if (is_new) { + /* send instance creation indication */ + if ((status = job_to_cim_instance(job, &inst)).rc || + (info->convert_func && (status = (*info->convert_func) ( + _cb, _cmpi_ctx, job, inst)).rc)) + { + if (status.msg) { + lmi_error("Failed to create instance creation indication: %s", + status.msg); + } else { + lmi_error("Failed to create instance creation indication!"); + } + } else if ((status = ind_sender_send_instcreation(_cb, _cmpi_ctx, inst, + STATIC_FILTER_ID_CREATED)).rc) + { + if (status.msg) { + lmi_error("Failed to send instance creation indication: %s", + status.msg); + } else { + lmi_error("Failed to send instance creation indication!"); + } + } else { + lmi_info("Sent instance creation indication for job \"%s\".", + jobid); + } + if (inst) + CMRelease(inst); + } else { + if (lmi_job_is_finished(job) && lmi_job_get_delete_on_completion(job)) + { + time_t seconds_finished = time(NULL) \ + - lmi_job_get_time_of_last_state_change(job); + time_t tbr = lmi_job_get_time_before_removal(job); + tbr = seconds_finished >= tbr ? 0 : (tbr - seconds_finished); + schedule_event(((guint64) tbr)*1000L, job, JOB_ACTION_REMOVE); + } else if (!lmi_job_is_finished(job)) { + jobmgr_run_job(job); + } + } + + g_free(jobid); + return TRUE; +} + +static LmiJob *load_job_from_file(gchar const *job_file_path) +{ + LmiJob *res = NULL; + gchar *tmp = NULL; + GType job_type; + gchar job_type_name[BUFLEN]; + const gchar *basename; + + g_assert((tmp = rindex(job_file_path, '-')) != NULL); + if ((basename = rindex(job_file_path, '/')) != NULL) { + ++basename; + } else { + basename = job_file_path; + } + lmi_debug("Loading job file \"%s\".", basename); + strncpy(job_type_name, basename, MIN(tmp - basename, BUFLEN)); + job_type_name[MIN(tmp - basename, BUFLEN)] = 0; + if ((job_type = g_type_from_name(job_type_name)) == 0) { + lmi_error("Job type \"%s\" is not known to glib type system.", + job_type_name); + goto err; + } + if ((res = lmi_job_load_from_file(job_file_path, job_type)) != NULL) { + if (!register_job(res, FALSE)) { + g_clear_object(&res); + } + } +err: + return res; +} + +/** + * Traversing function of a dictionary with job file paths. + * Each job is loaded and registered with job manager. + */ +static gboolean load_job_from_file_cb(gpointer key, + gpointer value, + gpointer data) +{ + const gchar *file_path = key; + guint *count = data; + + LmiJob *res = load_job_from_file(file_path); + if (res && count) { + *count = *count + 1; + } + return FALSE; +} + +static void load_jobs_from_persistent_storage(void) +{ + GFileEnumerator *fenum = NULL; + GFile *storage, *file = NULL; + GFileInfo *info = NULL; + GTree *sorted_job_paths = NULL; + GError *gerror = NULL; + const gchar *storage_path; + const gchar *name; + size_t length; + int count = 0; + GRegex *regex; + gchar *path; + g_assert(g_main_context_is_owner(_main_ctx)); + g_assert(_provider_name); + + storage_path = get_persistent_storage_path(); + if (storage_path == NULL) + { + lmi_error("Memory allocation failed"); + goto err; + } + storage = g_file_new_for_commandline_arg(storage_path); + if (storage == NULL) { + lmi_error("Memory allocation failed"); + goto err; + } + + if ((fenum = g_file_enumerate_children(storage, + G_FILE_ATTRIBUTE_STANDARD_NAME "," + G_FILE_ATTRIBUTE_STANDARD_TYPE "," + G_FILE_ATTRIBUTE_ACCESS_CAN_READ, + 0, NULL, &gerror)) == NULL) + { + lmi_error("Job persistent storage \"%s\" can not be enumerated: %s", + storage_path, gerror->message); + goto err; + } + + if ((sorted_job_paths = g_tree_new_full( + cmp_job_file_names, NULL, g_free, NULL)) == NULL) + { + lmi_error("Memory allocation failed"); + goto err; + } + if ((regex = g_regex_new(JOB_FILE_NAME_REGEX, 0, 0, &gerror)) == NULL) { + lmi_error("Failed to compile regular expression: %s", gerror->message); + goto sorted_job_paths_err; + } + + while ((info = g_file_enumerator_next_file(fenum, NULL, &gerror)) != NULL) { + name = g_file_info_get_name(info); + length = strlen(name); + + /* filter files by extension */ + if (length <= strlen(JOB_FILE_EXTENSION) || + strncmp(name + length - strlen(JOB_FILE_EXTENSION), + JOB_FILE_EXTENSION, strlen(JOB_FILE_EXTENSION))) + goto next_info; + /* filter out non-regular files */ + if (g_file_info_get_file_type(info) != G_FILE_TYPE_REGULAR && + g_file_info_get_file_type(info) != G_FILE_TYPE_SYMBOLIC_LINK) + goto next_info; + /* filter out not accessible */ + if (!g_file_info_get_attribute_boolean(info, + G_FILE_ATTRIBUTE_ACCESS_CAN_READ)) { + goto next_info; + } + + /* filter by reqular expression */ + if (!g_regex_match(regex, name, 0, NULL)) { + lmi_warn("File \"%s\" does not match regular expression" + " \"%s\".", name, JOB_FILE_NAME_REGEX); + goto next_info; + } + + if ((file = g_file_get_child(storage, name)) == NULL) { + lmi_error("Memory allocation failed"); + goto regex_err; + } + if ((path = g_file_get_path(file)) == NULL) { + g_clear_object(&file); + lmi_error("Memory allocation failed"); + goto regex_err; + } + g_clear_object(&file); + g_tree_insert(sorted_job_paths, path, NULL); + +next_info: + g_clear_object(&info); + } + if (gerror) { + lmi_error("Can't read persistent storage: %s", gerror->message); + } else { + g_tree_foreach(sorted_job_paths, load_job_from_file_cb, &count); + lmi_info("Successfully loaded %u jobs of %s provider.", + count, _provider_name); + } + +regex_err: + g_clear_object(&info); + g_regex_unref(regex); +sorted_job_paths_err: + g_tree_unref(sorted_job_paths); +err: + g_clear_object(&fenum); + g_clear_object(&storage); + g_clear_error(&gerror); + return; +} + +static void job_modified_params_free(JobModifiedParams *params) +{ + g_assert(params); + + g_variant_unref(params->old_value); + g_free(params); +} + +static gboolean process_job_modified_callback(gpointer user_data) +{ + CMPIInstance *old, *new; + CMPIStatus status = {CMPI_RC_OK, NULL}; + LmiJobStateEnum state; + const JobTypeInfo *info; + CMPIValue value; + gchar *jobid = NULL; + int filter_index = 0; + const char *filter_ids[] = {NULL, NULL, NULL}; + char err_buf[BUFLEN] = {0}; + PendingJobKey *job_key = NULL; + JobModifiedParams *params = user_data; + g_assert(_initialized_counter); + g_assert(LMI_IS_JOB(params->job)); + g_assert(g_main_context_is_owner(_main_ctx)); + + info = lookup_job_type_info_for_job(params->job); + + JM_CRITICAL_CRITICAL_BEGIN; + JOB_CRITICAL_BEGIN(params->job); + + if ((jobid = lmi_job_get_jobid(params->job)) == NULL) + goto memory_err; + + lmi_debug("Running modified callback for job %s triggered by" + " \"%s\" property.", lmi_job_get_jobid(params->job), + lmi_job_prop_to_string(params->property)); + + save_job(params->job, info); + + /* process just following properties */ + if (params->property != LMI_JOB_PROP_ENUM_STATE && + params->property != LMI_JOB_PROP_ENUM_PRIORITY && + params->property != LMI_JOB_PROP_ENUM_PERCENT_COMPLETE) + goto done; + + if ((status = job_to_cim_instance(params->job, &new)).rc) + { + if (status.msg) { + snprintf(err_buf, BUFLEN, + "Failed to create source instance for job \"%s\": %s!", + jobid, CMGetCharPtr(status.msg)); + } else { + snprintf(err_buf, BUFLEN, + "Failed to create source instance for job \"%s\"!", + jobid); + } + goto jobid_err; + } + + if (info->convert_func) { + if ((status = (*info->convert_func) ( + _cb, _cmpi_ctx, params->job, new)).rc) + { + if (status.msg) + strncpy(err_buf, CMGetCharPtr(status.msg), BUFLEN); + goto new_err; + } + } + + if ((old = CMClone(new, &status)) == NULL || status.rc) + goto new_err; + + switch (params->property) { + case LMI_JOB_PROP_ENUM_STATE: + state = g_variant_get_uint32(params->old_value); + value.uint16 = job_state_to_cim(state); + CMSetProperty(old, "JobState", &value, CMPI_uint16); + if ((value.array = job_state_to_cim_operational_status(state)) == NULL) + goto old_err; + CMSetProperty(old, "OperationalStatus", &value, CMPI_uint16A); + if ((value.string = CMNewString(_cb, + job_state_to_status(state), + &status)) == NULL || status.rc) + goto old_err; + CMSetProperty(old, "JobStatus", &value, CMPI_string); + switch (lmi_job_get_state(params->job)) { + case LMI_JOB_STATE_ENUM_COMPLETED: + filter_ids[filter_index++] = STATIC_FILTER_ID_SUCCEEDED; + break; + case LMI_JOB_STATE_ENUM_EXCEPTION: + filter_ids[filter_index++] = STATIC_FILTER_ID_FAILED; + break; + default: + break; + } + filter_ids[filter_index++] = STATIC_FILTER_ID_CHANGED; + break; + + case LMI_JOB_PROP_ENUM_PERCENT_COMPLETE: + value.uint16 = g_variant_get_uint32(params->old_value); + CMSetProperty(old, "PercentComplete", &value, CMPI_uint16); + filter_ids[filter_index++] = STATIC_FILTER_ID_PERCENT_UPDATED; + break; + + case LMI_JOB_PROP_ENUM_PRIORITY: + if ((job_key = g_new(PendingJobKey, 1)) == NULL) + goto old_err; + if (g_tree_lookup(_job_queue, job_key)) { + /* if job is pending, reinsert it into queue */ + job_key->priority = g_variant_get_uint32(params->old_value); + job_key->number = lmi_job_get_number(params->job); + g_tree_remove(_job_queue, job_key); + job_key->priority = lmi_job_get_priority(params->job); + g_tree_insert(_job_queue, job_key, params->job); + } + value.uint32 = g_variant_get_uint32(params->old_value); + CMSetProperty(old, "Priority", &value, CMPI_uint32); + break; + + default: + break; + } + + while (filter_index > 0) { + --filter_index; + if ((status = ind_sender_send_instmodification(_cb, _cmpi_ctx, + old, new, filter_ids[filter_index])).rc) + { + if (status.msg) { + snprintf(err_buf, BUFLEN, + "Failed to send instance modification \"%s\" for job" + " \"%s\": %s!", filter_ids[filter_index], jobid, + CMGetCharPtr(status.msg)); + } else { + snprintf(err_buf, BUFLEN, + "Failed to send instance modification \"%s\" for job" + " \"%s\"!", filter_ids[filter_index], jobid); + } + goto old_err; + } + lmi_info("Sent instance modification indication \"%s\" for job" + " \"%s\" due to property \"%s\".", filter_ids[filter_index], + jobid, lmi_job_prop_to_string(params->property)); + } + + CMRelease(old); + CMRelease(new); +done: + JOB_CRITICAL_END(params->job); + JM_CRITICAL_CRITICAL_END; + g_free(jobid); + return FALSE; + +old_err: + CMRelease(old); +new_err: + CMRelease(new); +jobid_err: + g_free(jobid); +memory_err: + JOB_CRITICAL_END(params->job); + JM_CRITICAL_CRITICAL_END; + if (!err_buf[0]) { + lmi_error("Memory allocation failed"); + } else { + lmi_error(err_buf); + } + return FALSE; +} + +/** + * Callback for calling `process_job_modified_callback`. It just passes + * parameters and ensures the callback is called in correct context. + */ +static void job_modified_callback(LmiJob *job, + LmiJobPropEnum property, + GVariant *old_value, + GVariant *new_value, + gpointer unused) +{ + JobModifiedParams *params; + + if ((params = g_new(JobModifiedParams, 1)) == NULL) { + lmi_error("Memory allocation failed"); + return; + } + params->job = job; + params->property = property; + params->old_value = g_variant_ref(old_value); + + g_main_context_invoke_full( + _main_ctx, + G_PRIORITY_DEFAULT, + process_job_modified_callback, + params, + (GDestroyNotify) job_modified_params_free); +} + +static gboolean process_job_finished_callback(gpointer user_data) +{ + LmiJob *job = LMI_JOB(user_data); + g_assert(_initialized_counter); + g_assert(g_main_context_is_owner(_main_ctx)); + + JM_CRITICAL_CRITICAL_BEGIN; + JOB_CRITICAL_BEGIN(job); + + gchar *jobid = lmi_job_get_jobid(job); + PendingJobKey job_key = {lmi_job_get_priority(job), lmi_job_get_number(job)}; + + if (!jobid) + goto memory_err; + if (lmi_job_get_delete_on_completion(job)) { + time_t tbr = lmi_job_get_time_before_removal(job); + schedule_event(((guint64) tbr)*1000L, job, JOB_ACTION_REMOVE); + } + g_tree_remove(_job_queue, &job_key); + + JOB_CRITICAL_END(job); + JM_CRITICAL_CRITICAL_END; + g_free(jobid); + return FALSE; + +memory_err: + JOB_CRITICAL_END(job); + JM_CRITICAL_CRITICAL_END; + lmi_error("Memory allocation failed"); + return FALSE; +} + +/** + * This callback just passes relevant arguments to + * `process_job_finished_callback()` and ensures it is executed in the right + * context. + */ +static void job_finished_callback(LmiJob *job, + LmiJobStateEnum old_state, + LmiJobStateEnum new_state, + GVariant *result, + const gchar *error, + gpointer unused) +{ + g_main_context_invoke(_main_ctx, process_job_finished_callback, job); +} + +static gboolean process_job_deletion_request_changed_callback( + gpointer user_data) +{ + LmiJob *job = LMI_JOB(user_data); + gboolean doc; + gint64 tbr; + struct _DuplicateEventContainer cnt; + gchar *jobid; + CalendarEvent event = {0, job, JOB_ACTION_REMOVE}; + guint64 current_millis = time(NULL) * 1000L; + g_assert(_initialized_counter); + g_assert(g_main_context_is_owner(_main_ctx)); + + JM_CRITICAL_CRITICAL_BEGIN; + JOB_CRITICAL_BEGIN(job); + doc = lmi_job_get_delete_on_completion(job); + tbr = lmi_job_get_time_before_removal(job); + + if (doc) { + /* schedule finished job for deletion */ + if (lmi_job_is_finished(job)) { + tbr = ( tbr*1000L + /* milliseconds elapsed since finished */ + - ( current_millis + - lmi_job_get_time_of_last_state_change(job)*1000L)); + tbr = MAX(tbr, 0); + schedule_event(tbr, job, event.action); + } + } else { + /* remove scheduled removal event from calendar if any */ + cnt.subject = &event; + cnt.duplicate = NULL; + g_tree_foreach(_event_calendar, reschedule_present_event, &cnt); + if (cnt.duplicate) { + jobid = lmi_job_get_jobid(job); + tbr = cnt.duplicate->time - current_millis; + tbr = MAX(tbr, 0); + lmi_debug("Removing event [%s] on job %s " + " timing out in %lu seconds.", _action_names[event.action], + jobid, tbr/1000L); + g_tree_remove(_event_calendar, cnt.duplicate); + g_free(jobid); + } + } + JOB_CRITICAL_END(job); + JM_CRITICAL_CRITICAL_END; + + return FALSE; +} + +/** + * This callback just passes relevant arguments to + * `process_job_deletion_request_changed_callback()` and ensures it is executed + * in the right context. + */ +static void job_deletion_request_changed_callback(LmiJob *job, + gboolean delete_on_completion, + gint64 time_before_removal, + gpointer unused) +{ + g_main_context_invoke(_main_ctx, + process_job_deletion_request_changed_callback, job); +} + +/** + * Traversing function for `_job_type_map` tree. It registeres all known job + * type classes with indication sender. + */ +static gboolean register_job_classes_with_ind_sender(gpointer key, + gpointer value, + gpointer data) +{ + JobTypeInfo *info = value; + CMPIStatus *status = data; + + if (status->rc) + return TRUE; + lmi_debug("Registering static filters for %s.", info->cim_class_name); + *status = register_static_filters(info->cim_class_name); + return status->rc ? TRUE:FALSE; +} + +static CMPIStatus register_static_filters(const gchar *cim_class_name) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + StaticFilter filters[IND_SENDER_STATIC_FILTER_ENUM_LAST]; + int i; + + for (i=0; i < IND_SENDER_STATIC_FILTER_ENUM_LAST; ++i) { + filters[i].id = ind_sender_static_filter_names[i]; + if ((filters[i].query = g_strdup_printf( + _filter_queries[i], _provider_name, cim_class_name)) == NULL) + { + lmi_error("Memory allocation failed"); + CMSetStatus(&status, CMPI_RC_ERR_FAILED); + goto err; + } + } + status = ind_sender_add_static_filters( + cim_class_name, + filters, + IND_SENDER_STATIC_FILTER_ENUM_LAST); + +err: + while (i >= 0) { + g_free((char *) filters[--i].query); + } + return status; +} + +/** + * Traversing function for `_job_type_map`. + * + * @param data is a pointer to boolean which is set to TRUE if + * any registered job type needs persistent storage. + */ +static gboolean is_persistent_storage_needed(gpointer key, + gpointer value, + gpointer data) +{ + const JobTypeInfo *info = value; + gboolean *result = data; + if (info->use_persistent_storage) { + *result = TRUE; + return TRUE; + } + return FALSE; +} + +/** + * Executes main loop. + * + * It takes care of calendar events and executes pending jobs. + */ +static gpointer run(gpointer data) +{ + gboolean load_persistent_storage = FALSE; + pthread_cond_t *initialized_cond = data; + + lmi_debug("Starting job processing thread."); + CBAttachThread(_cb, _cmpi_ctx); + g_main_context_push_thread_default(_main_ctx); + + g_tree_foreach(_job_type_map, + is_persistent_storage_needed, + &load_persistent_storage); + if (load_persistent_storage) { + lmi_debug("Loading persistent jobs."); + load_jobs_from_persistent_storage(); + } + + JM_CRITICAL_CRITICAL_BEGIN; + pthread_cond_signal(initialized_cond); + JM_CRITICAL_CRITICAL_END; + g_main_loop_run(_main_loop); + lmi_debug("Terminating main loop."); + g_main_context_pop_thread_default(_main_ctx); + CBDetachThread(_cb, _cmpi_ctx); + return NULL; +} + +CMPIStatus jobmgr_init(const CMPIBroker *cb, + const CMPIContext *ctx, + const gchar *provider_name, + gboolean concurrent_processing, + const JobTypeEntry *job_types, + gsize n_job_types) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + pthread_cond_t thread_started = PTHREAD_COND_INITIALIZER; + pthread_mutexattr_t lock_attr; + g_assert(cb); + g_assert(ctx); + g_assert(provider_name); + + lmi_debug("Began initialization."); + if ((!_job_type_map || lmi_tree_peek_first(_job_type_map, NULL) == NULL) && + n_job_types < 1) + { + lmi_error("Can't initialize job manager without" + " any registered job type!"); + CMSetStatusWithChars(cb, &status, CMPI_RC_ERR_FAILED, + "No job type registered for job manager to work."); + return status; + } + + pthread_mutex_lock(&_init_lock); + if (_initialized_counter > 0) { + ++_initialized_counter; + goto done; + } + + pthread_mutexattr_init(&lock_attr); + pthread_mutexattr_settype(&lock_attr, PTHREAD_MUTEX_RECURSIVE); + pthread_mutex_init(&_lock, &lock_attr); + pthread_mutexattr_destroy(&lock_attr); + + JM_CRITICAL_CRITICAL_BEGIN; + + if ((status = ind_sender_init(provider_name)).rc) + goto done; + + _cb = cb; + _concurrent_processing = concurrent_processing; + if ((_provider_name = g_strdup(provider_name)) == NULL) + goto err; + if ((_job_map = g_tree_new_full( + cmp_uints, NULL, NULL, g_object_unref)) == NULL) + goto provider_name_err; + if ((_job_queue = g_tree_new_full( + cmp_lmi_job_queue_keys, NULL, g_free, NULL)) == NULL) + goto job_map_err; + if ((_running_jobs = g_tree_new_full( + cmp_pointers, NULL, + g_object_unref, NULL)) == NULL) + goto job_queue_err; + if ((_event_calendar = g_tree_new_full( + cmp_calendar_events, NULL, + free_calendar_event, NULL)) == NULL) + goto running_jobs_err; + if ((_main_ctx = g_main_context_new()) == NULL) + goto event_calendar_err; + if ((_main_loop = g_main_loop_new(_main_ctx, FALSE)) == NULL) + goto main_ctx_err; + + if (n_job_types > 0) { + for (gsize i=0; i < n_job_types; ++i) { + status = jobmgr_register_job_type( + job_types[i].job_type, + job_types[i].cim_class_name, + job_types[i].convert_func, + job_types[i].make_job_params_func, + job_types[i].return_value_type, + job_types[i].process_func, + job_types[i].running_job_cancellable, + job_types[i].use_persistent_storage); + if (status.rc) + goto main_loop_err; + } + } + lmi_debug("Registering static filters with indication sender."); + g_tree_foreach(_job_type_map, register_job_classes_with_ind_sender, &status); + if (status.rc) + goto main_loop_err; + + if ((_cmpi_ctx = CBPrepareAttachThread(cb, ctx)) == NULL) + goto main_loop_err; + /* We need to be initialized even before thread is started because first + * thing the thread do is loading serialized jobs using functions checking + * for job manager's initialization. */ + ++_initialized_counter; + if ((_manager_thread = g_thread_new(JOB_MANAGER_THREAD_NAME, + run, (gpointer) &thread_started)) == NULL) + goto initialized_counter_err; + + /* wait for spawned thread to initialize itself and load serialized jobs */ + pthread_cond_wait(&thread_started, &_lock); + + JM_CRITICAL_CRITICAL_END; + pthread_mutex_unlock(&_init_lock); + lmi_debug("Job manager initialized for %s.", provider_name); + return status; + +initialized_counter_err: + --_initialized_counter; +main_loop_err: + g_main_loop_unref(_main_loop); + _main_loop = NULL; +main_ctx_err: + g_main_context_unref(_main_ctx); + _main_ctx = NULL; +event_calendar_err: + g_tree_unref(_event_calendar); + _event_calendar = NULL; +running_jobs_err: + g_tree_unref(_running_jobs); + _running_jobs = NULL; +job_queue_err: + g_tree_unref(_job_queue); + _job_queue = NULL; +job_map_err: + g_tree_unref(_job_map); + _job_map = NULL; +provider_name_err: + g_free(_provider_name); + _provider_name = NULL; +err: + ind_sender_cleanup(); + _cb = NULL; + _cmpi_ctx = NULL; + if (!status.rc) { + lmi_error("Memory allocation failed"); + CMSetStatus(&status, CMPI_RC_ERR_FAILED); + } +done: + JM_CRITICAL_CRITICAL_END; + pthread_mutex_unlock(&_init_lock); + return status; +} + +CMPIStatus jobmgr_cleanup(void) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + + pthread_mutex_lock(&_init_lock); + JM_CRITICAL_CRITICAL_BEGIN; + + if (_initialized_counter == 0) { + lmi_error("Job Manager cleanup() called more times than init()!"); + } else { + if (_initialized_counter == 1) { + g_assert(_manager_thread); + g_main_loop_quit(_main_loop); + g_thread_join(_manager_thread); + g_main_loop_unref(_main_loop); + _main_loop = NULL; + g_main_context_unref(_main_ctx); + _main_ctx = NULL; + _manager_thread = NULL; + g_tree_unref(_event_calendar); + _event_calendar = NULL; + g_tree_unref(_running_jobs); + _running_jobs = NULL; + g_tree_unref(_job_queue); + _job_queue = NULL; + g_tree_unref(_job_map); + _job_map = NULL; + g_free(_provider_name); + _provider_name = NULL; + g_tree_unref(_job_type_map); + _job_type_map = NULL; + status = ind_sender_cleanup(); + } + --_initialized_counter; + } + + JM_CRITICAL_CRITICAL_END; + pthread_mutex_unlock(&_init_lock); + + lmi_debug("Job manager cleanup finished."); + + return status; +} + +CMPIStatus jobmgr_register_job_type(GType job_type, + const gchar *cim_class_name, + JobToCimInstanceCallback convert_func, + MakeJobParametersCallback make_job_params_func, + MethodResultValueTypeEnum return_value_type, + JobProcessCallback process_func, + gboolean running_job_cancellable, + gboolean use_persistent_storage) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + JobTypeInfo *info; + g_assert(!_initialized_counter); + g_assert(g_type_is_a(job_type, LMI_TYPE_JOB)); + + if ( _job_type_map == NULL + && (_job_type_map = g_tree_new_full( + cmp_g_types, NULL, NULL, g_free)) == NULL) + goto err; + + if ((info = g_new(JobTypeInfo, 1)) == NULL) + goto err; + if ((info->cim_class_name = g_strdup(cim_class_name ? + cim_class_name : DEFAULT_CIM_CLASS_NAME)) == NULL) + goto err; + info->running_job_cancellable = running_job_cancellable; + info->convert_func = convert_func; + info->make_job_params_func = make_job_params_func; + info->return_value_type = return_value_type; + info->process_func = process_func; + info->use_persistent_storage = use_persistent_storage; + g_tree_insert(_job_type_map, GUINT_TO_POINTER(job_type), info); + + lmi_debug("Registered job type %s for job class %s.", + g_type_name(job_type), cim_class_name); + + return status; + +err: + lmi_error("Memory allocation failed"); + CMSetStatus(&status, CMPI_RC_ERR_FAILED); + return status; +} + +LmiJob *jobmgr_new_job(GType job_type) +{ + LmiJob *res = NULL; + g_assert(_initialized_counter); + + JM_CRITICAL_CRITICAL_BEGIN; + if ((res = g_object_new(job_type, NULL)) == NULL) + goto err; + if (!register_job(res, TRUE)) + goto err; + + JM_CRITICAL_CRITICAL_END; + return res; + +err: + g_clear_object(&res); + lmi_error("Failed to create new job!"); + JM_CRITICAL_CRITICAL_END; + return res; +} + +CMPIStatus jobmgr_run_job(LmiJob *job) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + const JobTypeInfo *info; + PendingJobKey *job_key = NULL; + gchar *jobid; + g_assert(_initialized_counter); + g_assert(LMI_IS_JOB(job)); + + if ((info = lookup_job_type_info_for_job(job)) == NULL) { + CMSetStatus(&status, CMPI_RC_ERR_FAILED); + goto err; + } + + JM_CRITICAL_CRITICAL_BEGIN; + JOB_CRITICAL_BEGIN(job); + if ((job_key = g_new(PendingJobKey, 1)) == NULL) { + lmi_error("Memory allocation failed"); + goto job_lock_release; + } + job_key->priority = lmi_job_get_priority(job); + job_key->number = lmi_job_get_number(job); + if (g_tree_lookup_extended(_job_queue, &job_key, NULL, NULL)) { + jobid = lmi_job_get_jobid(job); + lmi_warn("Can't run job \"%s\" second time.", jobid); + g_free(jobid); + CMSetStatusWithChars(_cb, &status, + CMPI_RC_ERR_FAILED, "Job already started!"); + goto job_key_err; + } + JOB_CRITICAL_END(job); + g_tree_insert(_job_queue, job_key, job); + GSource *source = g_idle_source_new(); + g_source_set_callback(source, launch_jobs, NULL, NULL); + g_source_attach(source, _main_ctx); + + JM_CRITICAL_CRITICAL_END; + return status; + +job_key_err: + g_free(job_key); +job_lock_release: + JOB_CRITICAL_END(job); + JM_CRITICAL_CRITICAL_END; +err: + return status; +} + +CMPIStatus jobmgr_run_job_id(const char *jobid) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + LmiJob *job; + g_assert(_initialized_counter); + + JM_CRITICAL_CRITICAL_BEGIN; + job = g_tree_lookup(_job_map, jobid); + JM_CRITICAL_CRITICAL_END; + if (!job) { + char *msg; + if (asprintf(&msg, "No such job with id \"%s\"!", jobid)) { + CMSetStatusWithChars(_cb, &status, CMPI_RC_ERR_NOT_FOUND, + msg); + free(msg); + } else { + lmi_error("No such job with id \"%s\"!", jobid); + CMSetStatus(&status, CMPI_RC_ERR_NOT_FOUND); + } + }else { + status = jobmgr_run_job(job); + } + + return status; +} + +static gboolean dup_job_id(gpointer key, + gpointer value, + gpointer data) +{ + gchar ***dest_ptr = data; + if ((**dest_ptr = lmi_job_get_jobid(value)) == NULL) + /* Stop iteration. */ + return TRUE; + (*dest_ptr)++; + return FALSE; +} + +gchar **jobmgr_get_job_ids(guint *count) +{ + gchar **res = NULL, **ptr; + guint size; + g_assert(_initialized_counter); + + JM_CRITICAL_CRITICAL_BEGIN; + size = g_tree_nnodes(_job_map); + if ((res = g_new(gchar *, size)) == NULL) + goto err; + ptr = res; + g_tree_foreach(_job_map, dup_job_id, &ptr); + if (ptr - res < size) + goto res_err; + *ptr = NULL; + if (count) + *count = size; + JM_CRITICAL_CRITICAL_END; + + return res; + +res_err: + while (ptr > res) + g_free(*(--ptr)); + g_free(res); + res = NULL; +err: + JM_CRITICAL_CRITICAL_END; + lmi_error("Memory allocation failed"); + return res; +} + +static gboolean dup_job_number(gpointer key, + gpointer value, + gpointer data) +{ + guint **dest_ptr = data; + **dest_ptr = lmi_job_get_number(value); + (*dest_ptr)++; + return FALSE; +} + + +guint *jobmgr_get_job_numbers(guint *count) +{ + guint *res = NULL, *ptr; + guint size; + g_assert(_initialized_counter); + + JM_CRITICAL_CRITICAL_BEGIN; + size = g_tree_nnodes(_job_map); + if ((res = g_new(guint, size + 1)) == NULL) + goto err; + ptr = res; + g_tree_foreach(_job_map, dup_job_number, &ptr); + if (ptr - res < size) + goto res_err; + *ptr = 0; + if (count) + *count = size; + JM_CRITICAL_CRITICAL_END; + + return res; + +res_err: + g_free(res); + res = NULL; +err: + JM_CRITICAL_CRITICAL_END; + lmi_error("Memory allocation failed"); + return res; +} + +gchar **jobmgr_get_pending_job_ids(guint *count) +{ + gchar **res = NULL, **ptr; + guint size; + g_assert(_initialized_counter); + + JM_CRITICAL_CRITICAL_BEGIN; + size = g_tree_nnodes(_job_queue); + if ((res = g_new(gchar *, size + 1)) == NULL) + goto err; + ptr = res; + g_tree_foreach(_job_queue, dup_job_id, &ptr); + if (ptr - res < size) + goto res_err; + *ptr = NULL; + if (count) + *count = size; + JM_CRITICAL_CRITICAL_END; + + return res; + +res_err: + g_free(res); + while (ptr > res) + g_free(*(--ptr)); + g_free(res); + res = NULL; +err: + JM_CRITICAL_CRITICAL_END; + lmi_error("Memory allocation failed"); + return res; +} + +guint *jobmgr_get_pending_job_numbers(guint *count) +{ + guint *res = NULL, *ptr; + guint size; + g_assert(_initialized_counter); + + JM_CRITICAL_CRITICAL_BEGIN; + size = g_tree_nnodes(_job_queue); + if ((res = g_new(guint, size + 1)) == NULL) + goto err; + ptr = res; + g_tree_foreach(_job_queue, dup_job_number, &ptr); + if (ptr - res < size) + goto res_err; + *ptr = 0; + if (count) + *count = size; + JM_CRITICAL_CRITICAL_END; + + return res; + +res_err: + g_free(res); + res = NULL; +err: + JM_CRITICAL_CRITICAL_END; + lmi_error("Memory allocation failed"); + return res; +} + +static gboolean dup_running_job_id(gpointer key, + gpointer value, + gpointer data) +{ + gchar ***dest_ptr = data; + if ((**dest_ptr = lmi_job_get_jobid(key)) == NULL) + /* Stop iteration. */ + return TRUE; + (*dest_ptr)++; + return FALSE; +} + +gchar **jobmgr_get_running_job_ids(guint *count) +{ + gchar **res = NULL, **ptr; + guint size; + g_assert(_initialized_counter); + + JM_CRITICAL_CRITICAL_BEGIN; + size = g_tree_nnodes(_running_jobs); + if ((res = g_new(gchar *, size + 1)) == NULL) + goto err; + ptr = res; + g_tree_foreach(_running_jobs, dup_running_job_id, &ptr); + if (ptr - res < size) + goto res_err; + *ptr = NULL; + if (count) + *count = size; + JM_CRITICAL_CRITICAL_END; + + return res; + +res_err: + while (ptr > res) + g_free(*(--ptr)); + g_free(res); + res = NULL; +err: + JM_CRITICAL_CRITICAL_END; + lmi_error("Memory allocation failed"); + return res; +} + +static gboolean dup_running_job_number(gpointer key, + gpointer value, + gpointer data) +{ + guint **dest_ptr = data; + **dest_ptr = lmi_job_get_number(key); + (*dest_ptr)++; + return FALSE; +} + +guint *jobmgr_get_running_job_numbers(guint *count) +{ + guint *res = NULL, *ptr; + guint size; + g_assert(_initialized_counter); + + JM_CRITICAL_CRITICAL_BEGIN; + size = g_tree_nnodes(_running_jobs); + if ((res = g_new(guint, size + 1)) == NULL) + goto err; + ptr = res; + g_tree_foreach(_running_jobs, dup_running_job_number, &ptr); + if (ptr - res < size) + goto res_err; + *ptr = 0; + if (count) + *count = size; + JM_CRITICAL_CRITICAL_END; + + return res; + +res_err: + g_free(res); + res = NULL; +err: + JM_CRITICAL_CRITICAL_END; + lmi_error("Memory allocation failed"); + return res; +} + +struct _JobNameContainer { + const gchar * name; + LmiJob *job; +}; + +static gboolean stop_search_for_job_by_id(gpointer key, + gpointer value, + gpointer data) +{ + LmiJob *job = LMI_JOB(value); + struct _JobNameContainer *jnc = data; + if (!g_strcmp0(lmi_job_get_jobid(job), jnc->name)) { + jnc->job = job; + return TRUE; + } + return FALSE; +} + +LmiJob *jobmgr_get_job_by_id(const gchar *jobid) +{ + struct _JobNameContainer jnc = {jobid, NULL}; + g_assert(_initialized_counter); + g_assert(jobid); + + JM_CRITICAL_CRITICAL_BEGIN; + g_tree_foreach(_job_map, stop_search_for_job_by_id, &jnc); + JM_CRITICAL_CRITICAL_END; + if (jnc.job) + g_object_ref(jnc.job); + + return jnc.job; +} + +struct _JobNumberContainer { + guint number; + LmiJob *job; +}; + +static gboolean stop_search_for_job_by_number(gpointer key, + gpointer value, + gpointer data) +{ + LmiJob *job = LMI_JOB(value); + struct _JobNumberContainer *jnc = data; + if (lmi_job_get_number(job) == jnc->number) { + jnc->job = job; + return TRUE; + } + return FALSE; +} + +LmiJob *jobmgr_get_job_by_number(guint number) +{ + struct _JobNumberContainer jnc = {number, NULL}; + g_assert(_initialized_counter); + + JM_CRITICAL_CRITICAL_BEGIN; + g_tree_foreach(_job_map, stop_search_for_job_by_number, &jnc); + JM_CRITICAL_CRITICAL_END; + if (jnc.job) + g_object_ref(jnc.job); + + return jnc.job; +} + +/** + * Traversing function for `_job_map` tree. It tries to find job by name and + * assignes it to `_JobNameContainer`. + */ +static gboolean stop_search_for_job_by_name(gpointer key, + gpointer value, + gpointer data) +{ + LmiJob *job = LMI_JOB(value); + struct _JobNameContainer *jnc = data; + if (!g_strcmp0(lmi_job_get_name(job), jnc->name)) { + jnc->job = job; + return TRUE; + } + return FALSE; +} + +LmiJob *jobmgr_get_job_by_name(const gchar *name) +{ + struct _JobNameContainer jnc = {name, NULL}; + g_assert(_initialized_counter); + g_assert(name); + + JM_CRITICAL_CRITICAL_BEGIN; + g_tree_foreach(_job_map, stop_search_for_job_by_name, &jnc); + JM_CRITICAL_CRITICAL_END; + if (jnc.job) + g_object_ref(jnc.job); + + return jnc.job; +} + +CMPIStatus jobmgr_job_to_cim_op(const LmiJob *job, CMPIObjectPath **op) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + gchar *namespace = NULL; + char instance_id[BUFLEN]; + CMPIValue value; + const JobTypeInfo *info; + g_assert(op); + g_assert(LMI_IS_JOB(job)); + g_assert(_initialized_counter > 0); + + if ((info = lookup_job_type_info_for_job(job)) == NULL) { + CMSetStatusWithChars(_cb, &status, CMPI_RC_ERR_NOT_FOUND, + "Job type is not registered with job manager."); + goto done; + } + if ((namespace = lmi_read_config("CIM", "Namespace")) == NULL) + goto err; + if ((*op = CMNewObjectPath(_cb, namespace, info->cim_class_name, + &status)) == NULL || status.rc) + goto namespace_err; + g_free(namespace); + + g_snprintf(instance_id, BUFLEN, "LMI:%s:%u", info->cim_class_name, + lmi_job_get_number(job)); + + value.string = CMNewString(_cb, instance_id, &status); + if (value.string == NULL || status.rc) + goto op_err; + + if ((status = CMAddKey(*op, "InstanceID", &value, CMPI_string)).rc) + goto string_err; + +done: + return status; + +string_err: + CMRelease(value.string); +op_err: + CMRelease(*op); +namespace_err: + g_free(namespace); +err: + lmi_error("Memory allocation failed"); + if (!status.rc) + CMSetStatus(&status, CMPI_RC_ERR_FAILED); + return status; +} + +CMPIStatus jobmgr_job_to_cim_instance(const LmiJob *job, + CMPIInstance **instance) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + const JobTypeInfo *info; + g_assert(instance); + + if ((info = lookup_job_type_info_for_job(job)) == NULL) { + CMSetStatusWithChars(_cb, &status, CMPI_RC_ERR_NOT_FOUND, + "Job type is not registered with job manager."); + goto done; + } + + JOB_CRITICAL_BEGIN(job); + status = job_to_cim_instance(job, instance); + if (!status.rc) { + if (info->convert_func && (status = (*info->convert_func) ( + _cb, _cmpi_ctx, job, *instance)).rc) + { + CMRelease(*instance); + instance = NULL; + } + } + JOB_CRITICAL_END(job); + +done: + return status; +} + +CMPIStatus jobmgr_job_to_method_result_op(const LmiJob *job, + const gchar *class_name, + CMPIObjectPath **op) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + gchar *namespace = NULL; + char instance_id[BUFLEN]; + CMPIValue value; + const gchar *cim_class_name = class_name; + gchar buf[BUFLEN]; + g_assert(op); + g_assert(LMI_IS_JOB(job)); + g_assert(_initialized_counter > 0); + + if (class_name == NULL) { + g_snprintf(buf, BUFLEN, "LMI_%sMethodResult", _provider_name); + cim_class_name = buf; + } + + if ((namespace = lmi_read_config("CIM", "Namespace")) == NULL) + goto err; + if ((*op = CMNewObjectPath(_cb, namespace, buf, &status)) == NULL) + goto namespace_err; + g_free(namespace); + namespace = NULL; + + g_snprintf(instance_id, BUFLEN, "LMI:%s:%u", cim_class_name, + lmi_job_get_number(job)); + + value.string = CMNewString(_cb, instance_id, &status); + if (value.string == NULL || status.rc) + goto op_err; + + if ((status = CMAddKey(*op, "InstanceID", &value, CMPI_string)).rc) + goto string_err; + + return status; + +string_err: + CMRelease(value.string); +op_err: + CMRelease(*op); +namespace_err: + g_free(namespace); +err: + lmi_error("Memory allocation failed"); + if (!status.rc) + CMSetStatus(&status, CMPI_RC_ERR_FAILED); + return status; +} + +CMPIStatus jobmgr_job_to_method_result_instance(const LmiJob *job, + const gchar *class_name, + CMPIInstance **instance) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + CMPIObjectPath *op; + CMPIInstance *inst; + CMPIValue value; + CMPIData data; + gchar buf[BUFLEN]; + const gchar *method_name; + g_assert(instance); + g_assert(LMI_IS_JOB(job)); + g_assert(_initialized_counter > 0); + + JOB_CRITICAL_BEGIN(job); + + if ((method_name = lmi_job_get_method_name(job)) == NULL) { + g_snprintf(buf, BUFLEN, "Can't create LMI_MethodResult instance" + " out of job #%u with method-name unset.", lmi_job_get_number(job)); + goto err; + } + + if ((status = jobmgr_job_to_method_result_op(job, class_name, &op)).rc) + goto err; + + if ((inst = CMNewInstance(_cb, op, &status)) == NULL || status.rc) + goto op_err; + + data = CMGetKey(op, "InstanceID", NULL); + value.string = CMClone(data.value.string, &status); + if (value.string == NULL || status.rc) + goto inst_err; + if ((status = CMSetProperty(inst, "InstanceID", &value, CMPI_string)).rc) + goto string_err; + + g_snprintf(buf, BUFLEN, "Result of method %s", + lmi_job_get_method_name(job)); + if ((value.string = CMNewString(_cb, buf, &status)) == NULL) + goto inst_err; + if ((status = CMSetProperty(inst, "Caption", &value, CMPI_string)).rc) + goto string_err; + + g_snprintf(buf, BUFLEN, "Result of asynchronous job #%d created" + " as a result of \"%s\" method's invocation.", + lmi_job_get_number(job), lmi_job_get_method_name(job)); + if ((value.string = CMNewString(_cb, buf, &status)) == NULL) + goto inst_err; + if ((status = CMSetProperty(inst, "Description", &value, CMPI_string)).rc) + goto string_err; + + g_snprintf(buf, BUFLEN, "MethodResult-%u", lmi_job_get_number(job)); + if ((value.string = CMNewString(_cb, buf, &status)) == NULL) + goto inst_err; + if ((status = CMSetProperty(inst, "ElementName", &value, CMPI_string)).rc) + goto string_err; + + if ((status = make_inst_method_call_instance_for_job( + job, TRUE, &value.inst)).rc) + goto inst_err; + if ((status = CMSetProperty(inst, "PreCallIndication", + &value, CMPI_instance)).rc) + { + CMRelease(value.inst); + goto inst_err; + } + + if (lmi_job_is_finished(job)) { + if ((status = make_inst_method_call_instance_for_job( + job, FALSE, &value.inst)).rc) + goto inst_err; + if ((status = CMSetProperty(inst, "PostCallIndication", + &value, CMPI_instance)).rc) + { + CMRelease(value.inst); + goto inst_err; + } + } + + JOB_CRITICAL_END(job); + + *instance = inst; + return status; + +string_err: + CMRelease(value.string); +inst_err: + CMRelease(inst); + *instance = NULL; + goto err; +op_err: + CMRelease(op); +err: + if (!status.rc) { + lmi_error("Memory allocation failed"); + CMSetStatus(&status, CMPI_RC_ERR_FAILED); + } + JOB_CRITICAL_END(job); + return status; +} + +CMPIStatus jobmgr_job_to_cim_error(const LmiJob *job, + CMPIInstance **instance) +{ + gchar *namespace; + CMPIObjectPath *op = NULL; + CMPIStatus status = {CMPI_RC_OK, NULL}; + CMPIValue value; + g_assert(instance != NULL); + + JOB_CRITICAL_BEGIN(job); + if (lmi_job_get_state(job) != LMI_JOB_STATE_ENUM_EXCEPTION) { + /* make the CIMError instance only for failed job */ + JOB_CRITICAL_END(job); + return status; + } + + if ((namespace = lmi_read_config("CIM", "Namespace")) == NULL) { + lmi_error("Memory allocation failed"); + goto err; + } + + op = CMNewObjectPath(_cb, namespace, CIM_ERROR_CLASS_NAME, &status); + if (op == NULL || status.rc) + goto namespace_err; + g_free(namespace); + namespace = NULL; + + if ((*instance = CMNewInstance(_cb, op, &status)) == NULL || status.rc) + goto op_err; + op = NULL; + + + value.uint32 = lmi_job_get_status_code(job); + if ((status = CMSetProperty(*instance, "CIMStatusCode", + &value, CMPI_uint32)).rc) + goto instance_err; + + if ((status = jobmgr_job_to_cim_op(job, &op)).rc) + goto instance_err; + if ((value.string = CMObjectPathToString(op, &status)) == NULL) + goto instance_err; + CMRelease(op); + op = NULL; + if ((status = CMSetProperty(*instance, "ErrorSource", + &value, CMPI_string)).rc) + goto string_err; + + value.uint16 = CIM_ERROR_SOURCE_FORMAT_OBJECT_PATH; + if ((status = CMSetProperty(*instance, "ErrorSourceFormat", + &value, CMPI_uint16)).rc) + goto instance_err; + + value.uint16 = lmi_job_get_error_type(job); + if ((status = CMSetProperty(*instance, "ErrorType", + &value, CMPI_uint16)).rc) + goto instance_err; + + if (lmi_job_get_error(job) != NULL) { + if ((value.string = CMNewString( + _cb, lmi_job_get_error(job), &status)) == NULL) + goto string_err; + if ((status = CMSetProperty(*instance, "Message", + &value, CMPI_string)).rc) + goto string_err; + } + + JOB_CRITICAL_END(job); + + return status; + +string_err: + CMRelease(value.string); +instance_err: + CMRelease(*instance); + *instance = NULL; + JOB_CRITICAL_END(job); +op_err: + if (op) + CMRelease(op); +namespace_err: + g_free(namespace); +err: + if (!status.rc) { + lmi_error("Memory allocation failed"); + CMSetStatus(&status, CMPI_RC_ERR_FAILED); + } + return status; +} + +/** + * Traversing function for `_event_calendar` tree. + */ +static gboolean find_event_by_job(gpointer key, + gpointer value, + gpointer data) +{ + struct _CalendarEventContainer *cec = data; + CalendarEvent *ev = key; + if (ev->job == cec->job) { + cec->matches = g_list_append(cec->matches, ev); + } + return FALSE; +} + +static void find_and_delete_calendar_event(gpointer item, + gpointer unused) +{ + CalendarEvent *ev = item; + g_tree_remove(_event_calendar, ev); +} + +CMPIStatus jobmgr_delete_job(LmiJob *job) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + g_assert(_initialized_counter); + + JM_CRITICAL_CRITICAL_BEGIN; + status = delete_job(job); + JM_CRITICAL_CRITICAL_END; + + return status; +} + +CMPIStatus jobmgr_terminate_job(LmiJob *job) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + const JobTypeInfo *info; + GTask *task; + GCancellable *cancellable; + gchar *jobid = NULL; + char err_buf[BUFLEN] = {0}; + g_assert(_initialized_counter); + + if ((info = lookup_job_type_info_for_job(job)) == NULL) { + CMSetStatusWithChars(_cb, &status, CMPI_RC_ERR_NOT_FOUND, + "Can't terminate unregistered job!"); + goto err; + } + + if ((jobid = lmi_job_get_jobid(job)) == NULL) + goto memory_err; + + JM_CRITICAL_CRITICAL_BEGIN; + JOB_CRITICAL_BEGIN(job); + if (lmi_job_is_finished(job)) { + snprintf(err_buf, BUFLEN, + "Can't terminate finished job \"%s\"!", jobid); + CMSetStatusWithChars(_cb, &status, CMPI_RC_ERR_FAILED, err_buf); + goto release_lock_err; + } + if (!info->running_job_cancellable && + lmi_job_get_state(job) == LMI_JOB_STATE_ENUM_RUNNING) + { + snprintf(err_buf, BUFLEN, + "Can't terminate running job \"%s\"!", jobid); + CMSetStatusWithChars(_cb, &status, CMPI_RC_ERR_NOT_SUPPORTED, err_buf); + goto release_lock_err; + } + + if ((task = g_tree_lookup(_running_jobs, job)) == NULL) { + /* job has not been started yet */ + if (!lmi_job_finish_terminate(job)) { + snprintf(err_buf, BUFLEN, + "Failed to terminate job \"%s\"!", jobid); + CMSetStatusWithChars(_cb, &status, CMPI_RC_ERR_FAILED, err_buf); + goto release_lock_err; + } + } else { + lmi_debug("Cancelling job \"%s\".", jobid); + cancellable = g_task_get_cancellable(task); + g_cancellable_cancel(cancellable); + } + lmi_info("Job #%u (id=%s) terminated successfully.", + lmi_job_get_number(job), jobid); + + JOB_CRITICAL_END(job); + JM_CRITICAL_CRITICAL_END; + + lmi_job_wait_until_finished(job); + if (lmi_job_get_state(job) != LMI_JOB_STATE_ENUM_TERMINATED) { + snprintf(err_buf, BUFLEN, + "Failed to terminate job \"%s\"!", jobid); + CMSetStatusWithChars(_cb, &status, CMPI_RC_ERR_FAILED, err_buf); + goto release_lock_err; + } + + g_free(jobid); + return status; + +release_lock_err: + JOB_CRITICAL_END(job); + JM_CRITICAL_CRITICAL_END; +memory_err: + if (err_buf[0]) { + lmi_error(err_buf); + } else { + lmi_error("Memory allocation failed"); + } + if (!status.rc) + CMSetStatus(&status, CMPI_RC_ERR_FAILED); + g_free(jobid); +err: + return status; +} + +struct _JobTypeSearchContainer { + const gchar *cim_class_name; + GType job_type; +}; + +static gboolean find_job_type_for_cim_class_name(gpointer key, + gpointer value, + gpointer data) +{ + GType job_type = (GType) GPOINTER_TO_UINT(key); + const JobTypeInfo *info = value; + struct _JobTypeSearchContainer *jtsc = data; + + if (!g_strcmp0(info->cim_class_name, jtsc->cim_class_name)) { + jtsc->job_type = job_type; + return TRUE; + } + return FALSE; +} + +CMPIStatus jobmgr_get_job_matching_op(const CMPIObjectPath *path, + LmiJob **job) +{ + CMPIStatus status = {CMPI_RC_OK, NULL}; + CMPIData data; + CMPIString *str; + const char *instance_id; + const char *class_name_start; + const char *class_name_end; + guint64 number; + gchar *endptr; + LmiJob *jobptr = NULL; + struct _JobTypeSearchContainer jtsc = {NULL, G_TYPE_INVALID}; + const JobTypeInfo *info; + gchar error[BUFLEN]; + g_assert(_initialized_counter); + + if ((str = CMGetClassName(path, &status)) == NULL) + return status; + data = CMGetKey(path, "InstanceID", &status); + if (status.rc) + return status; + if (data.state != (CMPI_keyValue | CMPI_goodValue) || + data.type != CMPI_string) + { + g_snprintf(error, BUFLEN, "InstanceID must be non-empty string."); + goto err; + } + + instance_id = CMGetCharsPtr(data.value.string, NULL); + if (g_ascii_strncasecmp(LMI_ORGID ":", instance_id, strlen(LMI_ORGID) + 1)) { + g_snprintf(error, BUFLEN, + "Invalid InstanceID \"%s\" of job object path.", instance_id); + goto err; + } + class_name_start = instance_id + strlen(LMI_ORGID) + 1; + class_name_end = index(class_name_start, ':'); + if (class_name_end == NULL) { + g_snprintf(error, BUFLEN, + "Invalid InstanceID \"%s\" of job object path" + " (expected ':' after class name).", instance_id); + goto err; + } + if ((jtsc.cim_class_name = g_strndup(class_name_start, + class_name_end - class_name_start)) == NULL) + { + lmi_error("Memory allocation failed."); + CMSetStatus(&status, CMPI_RC_ERR_FAILED); + goto err; + } + + g_tree_foreach(_job_type_map, find_job_type_for_cim_class_name, &jtsc); + + if (jtsc.job_type == G_TYPE_INVALID) { + lmi_error("Class name \'%s\" not found in %u items!", + jtsc.cim_class_name, g_tree_nnodes(_job_type_map)); + g_snprintf(error, BUFLEN, + "Class name \"%s\" in job's InstanceID does not belong" + " to any known job type.", jtsc.cim_class_name); + goto err; + } + + number = g_ascii_strtoull(class_name_end + 1, &endptr, 10); + if (number == G_MAXUINT64 || number == 0 || number > (guint64) G_MAXINT32) { + g_snprintf(error, BUFLEN, + "Missing or invalid job number in job's InstanceID" + " \"%s\".", instance_id); + goto err; + } + + if (*endptr != '\0') { + g_snprintf(error, BUFLEN, + "Junk after job number in job's InstanceID \"%s\".", + instance_id); + goto err; + } + + if ((jobptr = jobmgr_get_job_by_number(number)) == NULL) { + g_snprintf(error, BUFLEN, "Job #%u does not exist.", (guint32) number); + goto err; + } + + info = lookup_job_type_info_for_job(jobptr); + if (g_ascii_strcasecmp(info->cim_class_name, jtsc.cim_class_name)) { + g_snprintf(error, BUFLEN, + "No job matching given InstanceID \"%s\".", instance_id); + goto err; + } + g_free((gchar *) jtsc.cim_class_name); + + if (job) { + *job = jobptr; + } else { + g_object_unref(jobptr); + } + + return status; + +err: + g_clear_object(&jobptr); + g_free((gchar *) jtsc.cim_class_name); + if (!status.rc) { + if (error[0]) { + CMSetStatusWithChars(_cb, &status, CMPI_RC_ERR_NOT_FOUND, error); + } else { + CMSetStatus(&status, CMPI_RC_ERR_NOT_FOUND); + } + } + if (error[0]) + lmi_warn(error); + return status; +} diff --git a/src/libs/jobmanager/job_manager.h b/src/libs/jobmanager/job_manager.h new file mode 100644 index 0000000..1dd0107 --- /dev/null +++ b/src/libs/jobmanager/job_manager.h @@ -0,0 +1,483 @@ +/* + * Copyright (C) 2014 Red Hat, Inc. All rights reserved. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: Michal Minar <miminar@redhat.com> + */ + +/** + * Job Manager module. + * + * Typical usage: + * 1. Register callbacks for all type of jobs you want to support. + * If just one job type is enough, your registration will look + * like this: + * + * jobmgr_register_job_type( + * LMI_TYPE_JOB, // job type + * "LMI_SoftwareJob", // CIM class name + * job_to_cim_cb, // conversion function + * // function converting in/out parameters to CIM values + * make_job_params_cb, + * // return value job of async method + * METHOD_RESULT_VALUE_TYPE_UINT32, + * job_process_cb, // job processing function + * // whether the jobs can be processed in parallel + * true, + * // whether to make the jobs persistent + * true); + * + * + * 2. Place `jobmgr_init()` to your provider's init. + * 3. Define your callbacks - see descriptions of + * `JobToCimInstanceCallback` and `JobProcessCallback`. + * + * 4. In your implementation of asynchronous method: + * + * 1. Check input arguments. + * 2. Create a job with `jobmgr_new_job()`. + * 3. Set its method name and input parameters according + * to method being implemented. Optionally set other + * properties such as priority, name, etc. + * 4. Enqueue the job for execution with `jobmgr_run_job()`. + * 5. Return standard value for "Asynchronous Job Started". + * + * 5. Place jobmgr_cleanup()` to your provider's cleanup. + */ + +#ifndef JOB_MANAGER_H +#define JOB_MANAGER_H + +#include <cmpimacs.h> +#include "lmi_job.h" + +#ifndef JOB_MANAGER_PERSISTENT_STORAGE_PREFIX + #define JOB_MANAGER_PERSISTENT_STORAGE_PREFIX "/var/lib/openlmi-providers/jobs" +#endif + +#define CIM_CONCRETE_JOB_JOB_STATE_NEW 2 +#define CIM_CONCRETE_JOB_JOB_STATE_STARTING 3 +#define CIM_CONCRETE_JOB_JOB_STATE_RUNNING 4 +#define CIM_CONCRETE_JOB_JOB_STATE_SUSPENDED 5 +#define CIM_CONCRETE_JOB_JOB_STATE_SHUTTING_DOWN 6 +#define CIM_CONCRETE_JOB_JOB_STATE_COMPLETED 7 +#define CIM_CONCRETE_JOB_JOB_STATE_TERMINATED 8 +#define CIM_CONCRETE_JOB_JOB_STATE_KILLED 9 +#define CIM_CONCRETE_JOB_JOB_STATE_EXCEPTION 10 +#define CIM_CONCRETE_JOB_JOB_STATE_SERVICE 11 +#define CIM_CONCRETE_JOB_JOB_STATE_QUERY_PENDING 12 + +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_UNKNOWN 0 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_OTHER 1 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_OK 2 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_DEGRADED 3 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_STRESSED 4 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_PREDICTIVE_FAILURE 5 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_ERROR 6 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_NON_RECOVERABLE_ERROR 7 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_STARTING 8 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_STOPPING 9 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_STOPPED 10 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_IN_SERVICE 11 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_NO_CONTACT 12 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_LOST_COMMUNICATION 13 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_ABORTED 14 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_DORMANT 15 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_SUPPORTING_ENTITY_IN_ERROR 16 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_COMPLETED 17 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_POWER_MODE 18 +#define CIM_CONCRETE_JOB_OPERATIONAL_STATUS_RELOCATING 19 + +/** + * Type of value that can be set to result property of job. This value is used + * in CIM_InstMethodCall.ReturnValueType property of associated + * LMI_MethodResult instance. + */ +typedef enum { + METHOD_RESULT_VALUE_TYPE_BOOLEAN, + METHOD_RESULT_VALUE_TYPE_STRING, + METHOD_RESULT_VALUE_TYPE_CHAR16, + METHOD_RESULT_VALUE_TYPE_UINT8, + METHOD_RESULT_VALUE_TYPE_SINT8, + METHOD_RESULT_VALUE_TYPE_UINT16, + METHOD_RESULT_VALUE_TYPE_SINT16, + METHOD_RESULT_VALUE_TYPE_UINT32, + METHOD_RESULT_VALUE_TYPE_SINT32, + METHOD_RESULT_VALUE_TYPE_UINT64, + METHOD_RESULT_VALUE_TYPE_SINT64, + METHOD_RESULT_VALUE_TYPE_DATETIME, + METHOD_RESULT_VALUE_TYPE_REAL32, + METHOD_RESULT_VALUE_TYPE_REAL64, + METHOD_RESULT_VALUE_TYPE_REFERENCE, + METHOD_RESULT_VALUE_TYPE_LAST, +}MethodResultValueTypeEnum; + +/** + * Callback that converts job instance to instance of `CIM_ConcreteJob`. Some + * properties are already set. Modify them at will. This callback shall + * be used at least to set `JobInParameters` and `JobOutParameters` properties. + * + * This callback is invoked anytime a new indication is about to be created and + * sent. + * + * These properties will be already set: + * + * * InstanceID + * * CommunicationStatus + * * DeleteOnCompletion + * * ElapsedTime + * * ElementName + * * ErrorCode + * * JobState + * * JobStatus + * * LocalOrUtcTime + * * MethodName + * * Name + * * PercentComplete + * * OperationalStatus + * * Priority + * * StartTime + * * TimeBeforeRemoval + * * TimeOfLastStateChange + * * TimeSubmitted + */ +typedef CMPIStatus (*JobToCimInstanceCallback) (const CMPIBroker *cb, + const CMPIContext *ctx, + const LmiJob *job, + CMPIInstance *instance); + +/** + * Callback that fills properties of `__MethodResult_<MethodName>` instance + * with regard to given job. This callback is called when making an instance of + * `LMI_ConreteJob` to create values of properties `JobInParameters` and + * `JobOutParameters`. And also when making an instance of `CIM_InstMethodCall` + * to get a value of `MethodParameters` property. The latter is contained in + * `PostCallIndication` and `PreCallIndication` properties of associated job + * method result instance. + * + * @param include_input Whether the input parameters of asynchronous method + * shall be set. + * @param include_output Whether the output parameters of asynchronous method + * shall be set. + */ +typedef CMPIStatus (*MakeJobParametersCallback) (const CMPIBroker *cb, + const CMPIContext *ctx, + const LmiJob *job, + gboolean include_input, + gboolean include_output, + CMPIInstance *instance); + +/** + * Callback invoked when job is run. It is a synchronous operation that: + * + * 1. starts an operation on back-end + * 2. sets job's `jobid` obtained from back-end + * 3. monitors the process and updates `percent-complete` property of job + * 4. when the job is about to be successfully completed, its job + * output parameters need to be properly filled; this needs to be + * done before a call to `lmi_job_finish_ok()` + * 5. calls one of `lmi_job_finish_ok()`, `lmi_job_finish_exception()`, + * `lmi_job_finish_terminate()` + * + * If *concurrent_processing* is on, multiple callbacks can be run + * in parallel. + * + * @note This callback can be called twice or more times for the same job if + * the provider is restarted during its execution. Therefore you should + * check for job's status and especially jobid to see, whether this is + * a job that may have already finished. + */ +typedef void (*JobProcessCallback) (LmiJob *job, GCancellable *cancellable); + +/** + * Used in `jobmgr_init` to register job types. + */ +typedef struct _JobTypeEntry { + GType job_type; /*!< Type representing a subclass of `LmiJob` + * that shall be handled by job manager. */ + const gchar *cim_class_name;/*!< Name of CIM class of generated CIM + * instances representing jobs. If `NULL`, + * `LMI_ConcreteJob` will be used. */ + JobToCimInstanceCallback convert_func; /*!< A callback that should set + * additional CIM instance's properties. Invoked + * right before an indication is sent. May be `NULL`. */ + MakeJobParametersCallback make_job_params_func; + /*!< A callback that should fill input and output + * parameters of job's asynchronous method to prepaired + * method. May be `NULL`. */ + MethodResultValueTypeEnum return_value_type; + /*!< Type of value that can be set to result property + * of job. This value is used in + * CIM_InstMethodCall.ReturnValueType property of + * associated LMI_MethodResult instance. */ + JobProcessCallback process_func; /*!< A callback that should execute + * given job and finally set it to final state. */ + gboolean running_job_cancellable; /*!< Does a job support cancellation? + *Â If yes, *process_func* will be passed *cancellable* + * object that needs to be checked during job's + * processing for cancelled state. It enables the use + * of `jobmgr_terminate_job()` function. */ + gboolean use_persistent_storage; /*!< Whether to write job's state to + *Â file under persistent storage directory whenever it + * gets modified or created. Persistent storage is read + * upon job manager's initialization to populate + * internal structures with saved jobs. */ +}JobTypeEntry; + +/** + * Initialize job manager. Call this function in your provider's initialization + * function if you need to use job manager. Indication sender is initialized + * by job manager as well. + * + * @note This needs to be called after `lmi_init()` and after all the job types + * are registered with `jobmgr_register_job_type()`. + * + * @param provider_name Name of provider. It should not contain any special + * characters because it is used as a directory name in job persistent + * storage path. For example if you implement *Software* provider, pass + * "Software" here. The same string is then used for indication filters + * registration with indication sender. + * @param concurrent_processing Says whether multiple jobs can be run in + * parallel. If `false`, it will be ensured that at most one job can be in + * *RUNNING* state. The others will stay *NEW* until currently running job + * completes. + * @param job_type_entries Optional job type information. If NULL or empty, it's + * expected that job types were already registered with + * `jobmgr_register_job_type()`. + * @param n_job_types Number of entries in `job_type_entries` array. + */ +CMPIStatus jobmgr_init(const CMPIBroker *cb, + const CMPIContext *ctx, + const gchar *provider_name, + gboolean concurrent_processing, + const JobTypeEntry *job_type_entries, + gsize n_job_types); + +/** + * Cleanup job manager. Include this in you provider's cleanup handler if your + * init called `jobmgr_init()`. + */ +CMPIStatus jobmgr_cleanup(void); + +/** + * Provider may define subclasses of `LmiJob` if it needs to differentiate + * between jobs invoked from different CIM methods. Typical provider will + * register just one `job_type` - `LmiJob` in particular. + * + * It also takes care of static filter registrations with indication sender. + * @see ind_sender_add_static_filters(). + * + * This shall be called before the `jobmgr_init()` at least once. + * + * @param job_type Type representing a subclass of `LmiJob` that shall be + * handled by job manager. + * @param cim_class_name Name of CIM class of generated CIM instances + * representing jobs. If `NULL`, `LMI_ConcreteJob` will be used. + * @param convert_func A callback that should set additional CIM instance's + * properties. Invoked right before an indication is sent. + * @param make_job_params_func A callback that should fill input and output + * parameters of job's asynchronous method to prepaired method. May be + * `NULL`. + * @param return_value_type Type of value that can be set to result property of + * job. This value is used in CIM_InstMethodCall.ReturnValueType property + * of associated LMI_MethodResult instance. + * @param process_func A callback that should execute given job and finally + * set it to final state. + * @param running_job_cancellable Does a job support cancellation? If yes, + * *process_func* will be passed *cancellable* object that needs to be checked + * during job's processing for cancelled state. It enables the use of + * `jobmgr_terminate_job()` function. + * @param use_persistent_storage Whether to write job's state to file under + * persistent storage directory whenever it gets modified or created. + * Persistent storage is read upon job manager's initialization to populate + * internal structures with saved jobs. + */ +CMPIStatus jobmgr_register_job_type(GType job_type, + const gchar *cim_class_name, + JobToCimInstanceCallback convert_func, + MakeJobParametersCallback make_job_params_func, + MethodResultValueTypeEnum return_value_type, + JobProcessCallback process_func, + gboolean running_job_cancellable, + gboolean use_persistent_storage); + +/** + * Create a new job. Caller should then set at least *method-name* property and + * input parameters and then run it with `job_mgr_run_job()`. + * + * @note You should decrease its reference count with `g_object_unref()` after + * you are done with it. + */ +LmiJob *jobmgr_new_job(GType job_type); + +/** + * This needs to be called to run the newly created job after the essential + * properties are set. Be sure not to modify job's state before calling this function. + * + * Depending on *concurrent_processing* option and number of pending jobs, given + * job will be started either immediately or will be appended to pending ones. + */ +CMPIStatus jobmgr_run_job(LmiJob *job); + +/** + * Get jobids of all known jobs. + * + * @note that jobid is not constant for the whole life of job. + * Prefer `jobmgr_get_job_numbers()`. + * + * Release the result with `g_strfreev()`. + */ +gchar **jobmgr_get_job_ids(guint *count); + +/** + * Get numbers of all known jobs. + * + * Release the result with `g_free()`. + */ +guint *jobmgr_get_job_numbers(guint *count); + +/** + * Get jobids of all pending jobs. Those in *NEW* state that were requested to + * be run with `jobmgr_run_job()`. + * + * @note that jobid is not constant for the whole life of job. + * Prefer `jobmgr_get_job_pending_job_ids()`. + * + * Release the result with `g_strfreev()`. + */ +gchar **jobmgr_get_pending_job_ids(guint *count); + +/** + * Get numbers of all pending jobs. Those in *NEW* state that were requested to + * be run with `jobmgr_run_job()`. + * + * Release the result with `g_free()`. + */ +guint *jobmgr_get_pending_job_numbers(guint *count); + +/** + * Get jobids of all running jobs. Those in *RUNNING* state. + * + * @note that jobid is not constant for the whole life of job. + * Prefer `jobmgr_get_job_running_job_ids()`. + * + * Release the result with `g_strfreev()`. + */ +gchar **jobmgr_get_running_job_ids(guint *count); + +/** + * Get numbers of all running jobs. Those in *RUNNING* state. + * + * Release the result with `g_free()`. + */ +guint *jobmgr_get_running_job_numbers(guint *count); + +/** + * Get a job with given `jobid`. + * + * @note You should decrease its reference count with `g_object_unref()` after + * you are done with it. + */ +LmiJob *jobmgr_get_job_by_id(const gchar *jobid); + +/** + * Get a job with given number. + * + * @note You should decrease its reference count with `g_object_unref()` after + * you are done with it. + */ +LmiJob *jobmgr_get_job_by_number(guint number); + +/** + * Get a job with given name. + * + * @note Name property is freely modifiable by client, thus it may not be + * unique. This function stops the search upon first matching job. + * + * @note You should decrease its reference count with `g_object_unref()` after + * you are done with it. + */ +LmiJob *jobmgr_get_job_by_name(const gchar *name); + +/** + * Delete the job. Only completed jobs can be deleted. + * + * @note Don't forget to unreference the job yourself after a call to this + * function. + */ +CMPIStatus jobmgr_delete_job(LmiJob *job); + +/** + * Terminate running job. This may succeed only if the *running_cancellable* flag + * was set during job type's registration. + */ +CMPIStatus jobmgr_terminate_job(LmiJob *job); + +/****************************************************************************** + * CIM related functionality + *****************************************************************************/ + +/** + * Get a CIM object path for a job. + */ +CMPIStatus jobmgr_job_to_cim_op(const LmiJob *job, CMPIObjectPath **op); + +/** + * Convert a job to CIM instance. + * + * This calls `convert_func()` callback provided during job type's registration + * to fill additional properties. + */ +CMPIStatus jobmgr_job_to_cim_instance(const LmiJob *job, + CMPIInstance **instance); + +/** + * Get an object path of `LMI_MethodResult` for given job. + * + * @param class_name: A desired cim class name of resulting method result + * instance. If `NULL`, it will be set to + * `LMI_<provider_name>MethodResult`. + */ +CMPIStatus jobmgr_job_to_method_result_op(const LmiJob *job, + const gchar *class_name, + CMPIObjectPath **op); + +/** + * Make an instance of `LMI_MethodResult` for given job. + * + * @param class_name: A desired cim class name of resulting method result + * instance. If `NULL`, it will be set to + * `LMI_<provider_name>MethodResult`. + * @param instance: Pointer to a pointer where newly allocated method result + * instance will be stored. + */ +CMPIStatus jobmgr_job_to_method_result_instance(const LmiJob *job, + const gchar *class_name, + CMPIInstance **instance); + +/** + * Make a `CIM_Error` instance corresponding to failed job. + * + * If a job is not in `EXCEPTION` state, `instance` parameter won't be touched. + */ +CMPIStatus jobmgr_job_to_cim_error(const LmiJob *job, CMPIInstance **instance); + +/** + * Find and get job matching object path. + */ +CMPIStatus jobmgr_get_job_matching_op(const CMPIObjectPath *path, LmiJob **job); + +#endif /* end of include guard: JOB_MANAGER_H */ diff --git a/src/libs/jobmanager/lmi_job.c b/src/libs/jobmanager/lmi_job.c new file mode 100644 index 0000000..41a91d4 --- /dev/null +++ b/src/libs/jobmanager/lmi_job.c @@ -0,0 +1,2094 @@ +/* + * Copyright (C) 2014 Red Hat, Inc. All rights reserved. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: Michal Minar <miminar@redhat.com> + */ + +#include <string.h> +#include <time.h> +#include <pthread.h> +#include <json-glib/json-glib.h> +#include "openlmi.h" +#include "openlmi-utils.h" +#include "lmi_job.h" + +#define DEFAULT_PRIORITY 128 +#define DEFAULT_TIME_BEFORE_REMOVAL 5 * 60 +#define MINIMUM_TIME_BEFORE_REMOVAL 10 +#define DEFAULT_DELETE_ON_COMPLETION TRUE +#define IS_FINAL_STATE(state) \ + ( state == LMI_JOB_STATE_ENUM_COMPLETED \ + || state == LMI_JOB_STATE_ENUM_TERMINATED \ + || state == LMI_JOB_STATE_ENUM_EXCEPTION) + +/* Forward declarations */ +static void lmi_job_finalize(GObject *object); +static void lmi_job_serializable_iface_init(JsonSerializableIface *iface); + +#define LMI_JOB_GET_PRIVATE(o) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((o), LMI_TYPE_JOB, LmiJobPrivate)) + +#define PRIV_CRITICAL_CRITICAL_BEGIN(priv) \ + { \ + pthread_t _thread_id = pthread_self(); \ + _clog(CLOG_COLOR_GREEN, "[tid=%lu] locking job %u", \ + _thread_id, priv->number); \ + pthread_mutex_lock(&priv->lock); \ + _clog(CLOG_COLOR_GREEN, "[tid=%lu] locked job %u", \ + _thread_id, priv->number); \ + } + +#define PRIV_CRITICAL_CRITICAL_END(priv) \ + { \ + pthread_t _thread_id = pthread_self(); \ + _clog(CLOG_COLOR_GREEN, "[tid=%lu] unlocking job %u", \ + _thread_id, priv->number); \ + pthread_mutex_unlock(&priv->lock); \ + _clog(CLOG_COLOR_GREEN, "[tid=%lu] unlocked job %u", \ + _thread_id, priv->number); \ + } + +/** + * Publicly accessible properties. + */ +enum _PublicProperties { + PROP_0, + PROP_NUMBER, + /* !< Constant, unique, typed as uint32 accessible as "number". + */ + PROP_NAME, /*!< String property initially set to `NULL`. It's modifiable + * by cim client. Accessible as "name". */ + PROP_JOBID, /*!< String property initially set to `NULL`. Job processing + * function should set it to value identifying job in backend. + * Note that `lmi_job_get_jobid()` never returns `NULL` unless + * memory error occurs. Job number is returned as a fallback + * value. Accessible as "jobid". */ + PROP_STATE, /*!< State of job typed as `LmiJobStateEnum`. Accessible as + * "state". */ + PROP_PRIORITY, + /*!< Priority of job typed as uint32. Modifiable by client. + * Defaults to `DEFAULT_PRIORITY`. Accessible as "priority". */ + PROP_METHOD_NAME, + /*!< String property holding a name of asynchronous method that + * created the job. Accessible as "method-name". */ + PROP_TIME_SUBMITTED, + /*!< Time of job's creation as a number of seconds since the + * Epoch. Accessible as "time-submitted". */ + PROP_TIME_BEFORE_REMOVAL, + /*!< Number of seconds the job is kept alive after its + * completion. Typed as int64. Accessible as + * "time-before-removal". */ + PROP_TIME_OF_LAST_STATE_CHANGE, + /*!< Time of job's last state change as a number of seconds + * since the Epoch. Accessible as "time-of-last-state-change". + */ + PROP_START_TIME, + /*!< Time the job was started as a number of seconds since the + * Epoch. Accessible as "start-time". */ + PROP_DELETE_ON_COMPLETION, + /*!< Boolean property saying whether the job shall be + * automatically deleted after its completion. Job is not + * deleted immediatelly. "time-before-removal" can be used to + * modify a time interval. Accessible as + * "delete-on-completion". */ + PROP_PERCENT_COMPLETE, + /*!< Uint32 property ranging from 0 to 100. Accessible as + * "percent-complete". */ + PROP_STATUS_CODE, + /*!< CIM status code property set upon error. It's set together + * with state *EXCEPTION*. Accessible as "status-code". + * Defaults to `CIM_ERR_FAILED` for job in `EXCEPTION` state. + * Please refer to `CIM_Error.CIMStatusCode` description in + * CIM schema. */ + PROP_RESULT, + /*!< Result of operation. Typed as `GVariant` without any + * limitation to type it can hold. Accessible as "result". + * Defaults to `NULL` which is represented by GVariant of type + * `G_VARIANT_TYPE_HANDLE` with value 0 when using getter and + * setter of gobject framework. `lmi_job_get_result()` returns + * `NULL` instead. This property can only be set together with + * state *COMPLETED*. */ + PROP_ERROR_TYPE, + /*!< Uint32 property holding error type. Accessible as + * "error-type". It defaults to `Unknown` (0). Please refer to + * `CIM_Error.ErrorType` description in CIM schema for more + * information. */ + PROP_ERROR, /*!< String property holding an error message. Accessible as + * "error". It can only be set together with state *EXCEPTION*. + */ + PROP_LAST, +}; + +#define PROP_IN_PARAMETERS (PROP_LAST + 0) +#define PROP_OUT_PARAMETERS (PROP_LAST + 1) + +/** + * Privately accessible properties. + * These are meant to be accessed only by json serializer. + */ +enum _PrivateProperties { + PROP_PRIVATE_IN_PARAMETERS, + /*!< Private input properties of asynchronous method that + * created particular job. They are stored in hash table with + * name of property as a key and `GVariant` as a value. + * Accessible as "in-parameters" only through + * `JsonSerializableIface` interface. */ + PROP_PRIVATE_OUT_PARAMETERS, + /*! Private output properties. Accessible as "out-parameters". */ + PROP_PRIVATE_LAST, +}; + +#define PROP_IN_PARAMETERS_NAME \ + s_prop_private_names[PROP_PRIVATE_IN_PARAMETERS] +#define PROP_OUT_PARAMETERS_NAME \ + s_prop_private_names[PROP_PRIVATE_OUT_PARAMETERS] + +enum { + SIGNAL_MODIFIED, + SIGNAL_STATE_CHANGED, + SIGNAL_FINISHED, + SIGNAL_PRIORITY_CHANGED, + SIGNAL_DELETION_REQUEST_CHANGED, + SIGNAL_LAST +}; + +/* Used to deserialize result property. First string is a variant type, second + * is its value serialized with `g_variant_print`. */ +#define PARAM_VARIANT_TYPE_STR "{ss}" +/* This is an dictionary entry where key is a name of input/output parameter + * and value is another dictionary entry matching `PARAM_VARIANT_TYPE_STR` above. */ +#define PARAMS_ENTRY_VARIANT_TYPE_STR "{sv}" +/* This is variant type used for serialization of input/output parameters. */ +#define PARAMS_VARIANT_TYPE_STR ("a" PARAMS_ENTRY_VARIANT_TYPE_STR) +#define PARAMS_VARIANT_TYPE \ + ((const GVariantType *) PARAMS_VARIANT_TYPE_STR) + +/** + * Array of job signals. + */ +static guint _signals[SIGNAL_LAST] = { 0 }; +/** + * Guard for `_last_job_number`. + */ +static pthread_mutex_t _job_number_lock = PTHREAD_MUTEX_INITIALIZER; +/** + * Number of newest job created. It is automatically incremented after each + * newly created job. 0 is not a valid job number. This variable shall be + * accessed only in critical section guarded with `_last_job_number`. + */ +static guint _last_job_number = 0; +/** + * Storage for private properties which are not stored in LmiJobClass. + */ +static GParamSpec *_private_prop_specs[PROP_PRIVATE_LAST] = { NULL }; + +struct _LmiJobPrivate { + pthread_mutex_t lock; + guint number; + gchar *name; + gchar *jobid; + LmiJobStateEnum state; + guint priority; + gchar *method_name; + time_t time_submitted; + time_t time_before_removal; + time_t time_of_last_state_change; + time_t start_time; + gboolean delete_on_completion; + GHashTable *in_parameters; + GHashTable *out_parameters; + guint percent_complete; + GVariant *result; + LmiJobStatusCodeEnum status_code; + LmiJobErrorTypeEnum error_type; + gchar *error; +}; + +/** + * Define LMI_TYPE_JOB implementing JsonSerializableIface interface. + */ +G_DEFINE_TYPE_WITH_CODE(LmiJob, lmi_job, G_TYPE_OBJECT, + G_IMPLEMENT_INTERFACE(JSON_TYPE_SERIALIZABLE, + lmi_job_serializable_iface_init)); + +static const gchar *s_job_state_enum_names[] = { + "new", "running", "completed", "terminated", "exception" }; +static const gchar *s_job_prop_enum_names[] = { + "name", "jobid", "state", "priority", "method-name", + "time-before-removal", "time-of-last-state-change", "start-time", + "delete-on-completion", "percent-complete", "status-code", + "result", "error-type", "error" }; +static const gchar *s_prop_private_names[] = { + "in-parameters", "out-parameters" }; + +static void lmi_job_get_property(GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + LmiJob *job = LMI_JOB(object); + LmiJobPrivate *priv = job->priv; + GVariant *tmp; + + switch (prop_id) { + case PROP_NUMBER: + g_value_set_uint(value, priv->number); + break; + case PROP_NAME: + g_value_set_string(value, priv->name); + break; + case PROP_JOBID: + g_value_set_string(value, priv->jobid); + break; + case PROP_STATE: + g_value_set_uint(value, priv->state); + break; + case PROP_PRIORITY: + g_value_set_uint(value, priv->priority); + break; + case PROP_METHOD_NAME: + g_value_set_string(value, priv->method_name); + break; + case PROP_TIME_SUBMITTED: + g_value_set_int64(value, priv->time_submitted); + break; + case PROP_TIME_BEFORE_REMOVAL: + g_value_set_int64(value, priv->time_before_removal); + break; + case PROP_TIME_OF_LAST_STATE_CHANGE: + g_value_set_int64(value, priv->time_of_last_state_change); + break; + case PROP_START_TIME: + g_value_set_int64(value, priv->start_time); + break; + case PROP_DELETE_ON_COMPLETION: + g_value_set_boolean(value, priv->delete_on_completion); + break; + case PROP_PERCENT_COMPLETE: + g_value_set_uint(value, priv->percent_complete); + break; + case PROP_STATUS_CODE: + g_value_set_uint(value, priv->status_code); + break; + case PROP_RESULT: + if (priv->result) { + tmp = priv->result; + } else { + tmp = g_variant_new_handle(0); + } + g_value_set_variant(value, tmp); + break; + case PROP_ERROR_TYPE: + g_value_set_uint(value, priv->error_type); + break; + case PROP_ERROR: + g_value_set_string(value, priv->error); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void lmi_job_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + LmiJob *job = LMI_JOB(object); + LmiJobPrivate *priv = job->priv; + GVariant *variant; + GValue defval = G_VALUE_INIT; + + PRIV_CRITICAL_CRITICAL_BEGIN(priv); + switch (prop_id) { + case PROP_NUMBER: + priv->number = MAX(priv->number, g_value_get_uint(value)); + pthread_mutex_lock(&_job_number_lock); + if (priv->number > _last_job_number) + { + _last_job_number = priv->number; + } + pthread_mutex_unlock(&_job_number_lock); + break; + case PROP_NAME: + lmi_job_set_name(job, g_value_get_string(value)); + break; + case PROP_JOBID: + lmi_job_set_jobid(job, g_value_get_string(value)); + break; + case PROP_STATE: + priv->state = g_value_get_uint(value); + break; + case PROP_PRIORITY: + lmi_job_set_priority(job, g_value_get_uint(value)); + break; + case PROP_METHOD_NAME: + lmi_job_set_method_name(job, g_value_get_string(value)); + break; + case PROP_TIME_SUBMITTED: + g_value_init(&defval, G_TYPE_INT64); + g_value_copy(value, &defval); + if (!g_param_value_defaults(pspec, &defval)) + priv->time_submitted = g_value_get_int64(value); + break; + case PROP_TIME_BEFORE_REMOVAL: + lmi_job_set_time_before_removal(job, g_value_get_int64(value)); + break; + case PROP_TIME_OF_LAST_STATE_CHANGE: + g_value_init(&defval, G_TYPE_INT64); + g_value_copy(value, &defval); + if (!g_param_value_defaults(pspec, &defval)) + priv->time_of_last_state_change = g_value_get_int64(value); + break; + case PROP_START_TIME: + g_value_init(&defval, G_TYPE_INT64); + g_value_copy(value, &defval); + if (!g_param_value_defaults(pspec, &defval)) + priv->start_time = g_value_get_int64(value); + break; + case PROP_DELETE_ON_COMPLETION: + lmi_job_set_delete_on_completion(job, g_value_get_boolean(value)); + break; + case PROP_PERCENT_COMPLETE: + lmi_job_set_percent_complete(job, g_value_get_uint(value)); + break; + case PROP_STATUS_CODE: + priv->status_code = g_value_get_uint(value); + break; + case PROP_RESULT: + variant = g_value_get_variant(value); + if (priv->result != variant) { + if (priv->result) { + g_variant_unref(priv->result); + } + if ((g_variant_is_of_type(variant, G_VARIANT_TYPE_HANDLE) && + g_variant_get_handle(variant) == 0) || variant == NULL) + { + priv->result = NULL; + } else { + priv->result = g_variant_ref(variant); + } + } + break; + case PROP_ERROR_TYPE: + lmi_job_set_error_type(job, g_value_get_uint(value)); + break; + case PROP_ERROR: + priv->error = g_strdup(g_value_get_string(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } + PRIV_CRITICAL_CRITICAL_END(priv); +} + +/** + * Marshaller used for `modified()` signal. + */ +static void g_cclosure_marshal_VOID__UINT_VARIANT_VARIANT( + GClosure *closure, + GValue *return_value G_GNUC_UNUSED, + guint n_param_values, + const GValue *param_values, + gpointer invocation_hint G_GNUC_UNUSED, + gpointer marshal_data) +{ + typedef void (*GMarshalFunc_VOID__UINT_VARIANT_VARIANT) ( + gpointer data1, + guint arg_1, + GVariant *arg_2, + GVariant *arg_3, + gpointer data2); + register GMarshalFunc_VOID__UINT_VARIANT_VARIANT callback; + register GCClosure *cc = (GCClosure*) closure; + register gpointer data1, data2; + + g_return_if_fail (n_param_values == 4); + + if (G_CCLOSURE_SWAP_DATA (closure)) { + data1 = closure->data; + data2 = g_value_peek_pointer(param_values + 0); + } else { + data1 = g_value_peek_pointer(param_values + 0); + data2 = closure->data; + } + callback = (GMarshalFunc_VOID__UINT_VARIANT_VARIANT) ( + marshal_data ? marshal_data : cc->callback); + + callback( + data1, + g_value_get_uint(param_values + 1), + g_value_get_variant(param_values + 2), + g_value_get_variant(param_values + 3), + data2); +} + +/** + * Marshaller used for `priority-changed()` and `state-changed()` signals. + */ +static void g_cclosure_marshal_VOID__UINT_UINT( + GClosure *closure, + GValue *return_value G_GNUC_UNUSED, + guint n_param_values, + const GValue *param_values, + gpointer invocation_hint G_GNUC_UNUSED, + gpointer marshal_data) +{ + typedef void (*GMarshalFunc_VOID__UINT_UINT) ( + gpointer data1, + guint arg_1, + guint arg_2, + gpointer data2); + register GMarshalFunc_VOID__UINT_UINT callback; + register GCClosure *cc = (GCClosure*) closure; + register gpointer data1, data2; + + g_return_if_fail (n_param_values == 3); + + if (G_CCLOSURE_SWAP_DATA (closure)) { + data1 = closure->data; + data2 = g_value_peek_pointer(param_values + 0); + } else { + data1 = g_value_peek_pointer(param_values + 0); + data2 = closure->data; + } + callback = (GMarshalFunc_VOID__UINT_UINT) ( + marshal_data ? marshal_data : cc->callback); + + callback( + data1, + g_value_get_uint(param_values + 1), + g_value_get_uint(param_values + 2), + data2); +} + +/** + * Marshaller used for `finished()` signal. + */ +static void g_cclosure_marshal_VOID__UINT_UINT_VARIANT_STRING( + GClosure *closure, + GValue *return_value G_GNUC_UNUSED, + guint n_param_values, + const GValue *param_values, + gpointer invocation_hint G_GNUC_UNUSED, + gpointer marshal_data) +{ + typedef void (*GMarshalFunc_VOID__UINT_UINT_VARIANT_STRING) ( + gpointer data1, + guint arg_1, + guint arg_2, + GVariant *arg_4, + const gchar *arg_5, + gpointer data2); + register GMarshalFunc_VOID__UINT_UINT_VARIANT_STRING callback; + register GCClosure *cc = (GCClosure*) closure; + register gpointer data1, data2; + + g_return_if_fail (n_param_values == 5); + + if (G_CCLOSURE_SWAP_DATA (closure)) { + data1 = closure->data; + data2 = g_value_peek_pointer(param_values + 0); + } else { + data1 = g_value_peek_pointer(param_values + 0); + data2 = closure->data; + } + callback = (GMarshalFunc_VOID__UINT_UINT_VARIANT_STRING) ( + marshal_data ? marshal_data : cc->callback); + + callback( + data1, + g_value_get_uint(param_values + 1), + g_value_get_uint(param_values + 2), + g_value_get_variant(param_values + 3), + g_value_get_string(param_values + 4), + data2); +} + +/** + * Marshaller used for `deletion-request-changed()` signal. + */ +static void g_cclosure_marshal_VOID__BOOLEAN_INT64( + GClosure *closure, + GValue *return_value G_GNUC_UNUSED, + guint n_param_values, + const GValue *param_values, + gpointer invocation_hint G_GNUC_UNUSED, + gpointer marshal_data) +{ + typedef void (*GMarshalFunc_VOID__BOOLEAN_INT64) ( + gpointer data1, + gboolean arg_1, + gint64 arg_2, + gpointer data2); + register GMarshalFunc_VOID__BOOLEAN_INT64 callback; + register GCClosure *cc = (GCClosure*) closure; + register gpointer data1, data2; + + g_return_if_fail (n_param_values == 3); + + if (G_CCLOSURE_SWAP_DATA (closure)) { + data1 = closure->data; + data2 = g_value_peek_pointer(param_values + 0); + } else { + data1 = g_value_peek_pointer(param_values + 0); + data2 = closure->data; + } + callback = (GMarshalFunc_VOID__BOOLEAN_INT64) ( + marshal_data ? marshal_data : cc->callback); + + callback( + data1, + g_value_get_boolean(param_values + 1), + g_value_get_int64(param_values + 2), + data2); +} + +static void lmi_job_class_init(LmiJobClass *klass) +{ + GParamSpec *pspec; + g_type_class_add_private(klass, sizeof(LmiJobPrivate)); + GObjectClass *gobject_class = G_OBJECT_CLASS(klass); + gobject_class->get_property = lmi_job_get_property; + gobject_class->set_property = lmi_job_set_property; + gobject_class->finalize = lmi_job_finalize; + + pspec = g_param_spec_uint("number", + NULL, "Sequential number of job. Guaranteed to be unique.", + 0, G_MAXUINT, 1, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, PROP_NUMBER, pspec); + + pspec = g_param_spec_string( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_NAME], + NULL, "Modifiable job name.", NULL, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, PROP_NAME, pspec); + + pspec = g_param_spec_string( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_JOBID], + NULL, "Job identification string. Must be unique.", + NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, PROP_JOBID, pspec); + + pspec = g_param_spec_uint( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_STATE], + NULL, "Job state.", + LMI_JOB_STATE_ENUM_NEW, LMI_JOB_STATE_ENUM_LAST - 1, + LMI_JOB_STATE_ENUM_NEW, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, PROP_STATE, pspec); + + pspec = g_param_spec_uint( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_PRIORITY], + NULL, "The urgency of execution of the Job." + " The lower the number, the higher the priority.", + 0, G_MAXUINT, DEFAULT_PRIORITY, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, PROP_PRIORITY, pspec); + + pspec = g_param_spec_string( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_METHOD_NAME], + NULL, "If not NULL, the name of the intrinsic operation or " + "extrinsic method for which this Job represents an invocation.", + NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, PROP_METHOD_NAME, pspec); + + pspec = g_param_spec_int64("time-submitted", + NULL, "Time of job's creation.", + G_MININT64, G_MAXINT64, 0, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, PROP_TIME_SUBMITTED, pspec); + + pspec = g_param_spec_int64( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_TIME_BEFORE_REMOVAL], + NULL, "The amount of time in seconds that the Job is retained after" + " it has finished executing.", + 0, G_MAXINT64, DEFAULT_TIME_BEFORE_REMOVAL, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, + PROP_TIME_BEFORE_REMOVAL, pspec); + + pspec = g_param_spec_int64( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_TIME_OF_LAST_STATE_CHANGE], + NULL, "The time when the state of the Job last changed.", + G_MININT64, G_MAXINT64, 0, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, + PROP_TIME_OF_LAST_STATE_CHANGE, pspec); + + pspec = g_param_spec_int64( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_START_TIME], + NULL, "The time that the Job was actually started.", + G_MININT64, G_MAXINT64, 0, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, + PROP_START_TIME, pspec); + + pspec = g_param_spec_boolean( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_DELETE_ON_COMPLETION], + NULL, "Indicates whether or not the job should be automatically" + " deleted upon completion.", + DEFAULT_DELETE_ON_COMPLETION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, + PROP_DELETE_ON_COMPLETION, pspec); + + pspec = g_param_spec_uint( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_PERCENT_COMPLETE], + NULL, "The percentage of the job that has completed at the time" + " that this value is requested.", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, + PROP_PERCENT_COMPLETE, pspec); + + pspec = g_param_spec_uint( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_STATUS_CODE], + NULL, "CIM status code of completed job.", + LMI_JOB_STATUS_CODE_ENUM_OK, LMI_JOB_STATUS_CODE_ENUM_LAST - 1, + LMI_JOB_STATUS_CODE_ENUM_OK, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, PROP_STATUS_CODE, pspec); + + pspec = g_param_spec_variant( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_RESULT], + NULL, "Result of invoked asynchronous method upon successful" + " execution.", + G_VARIANT_TYPE_ANY, NULL, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, PROP_RESULT, pspec); + + pspec = g_param_spec_uint( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_ERROR_TYPE], + NULL, "Error type corresponding to CIM_Error.ErrorType.", + LMI_JOB_ERROR_TYPE_ENUM_UNKNOWN, LMI_JOB_ERROR_TYPE_ENUM_LAST - 1, + LMI_JOB_ERROR_TYPE_ENUM_UNKNOWN, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, PROP_ERROR_TYPE, pspec); + + pspec = g_param_spec_string( + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_ERROR], + NULL, "Error description filled upon failure.", NULL, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS); + g_object_class_install_property(gobject_class, PROP_ERROR, pspec); + + _signals[SIGNAL_MODIFIED] = + g_signal_new("modified", + G_TYPE_FROM_CLASS(gobject_class), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET(LmiJobClass, modified), + NULL, NULL, g_cclosure_marshal_VOID__UINT_VARIANT_VARIANT, + G_TYPE_NONE, 3, G_TYPE_UINT, G_TYPE_VARIANT, G_TYPE_VARIANT); + + _signals[SIGNAL_STATE_CHANGED] = + g_signal_new("state-changed", + G_TYPE_FROM_CLASS(gobject_class), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET(LmiJobClass, state_changed), + NULL, NULL, g_cclosure_marshal_VOID__UINT_UINT, + G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT); + + _signals[SIGNAL_FINISHED] = + g_signal_new("finished", + G_TYPE_FROM_CLASS(gobject_class), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET(LmiJobClass, finished), + NULL, NULL, g_cclosure_marshal_VOID__UINT_UINT_VARIANT_STRING, + G_TYPE_NONE, 4, G_TYPE_UINT, G_TYPE_UINT, + G_TYPE_VARIANT, G_TYPE_STRING); + + _signals[SIGNAL_PRIORITY_CHANGED] = + g_signal_new("priority-changed", + G_TYPE_FROM_CLASS(gobject_class), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET(LmiJobClass, priority_changed), + NULL, NULL, g_cclosure_marshal_VOID__UINT_UINT, + G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT); + + _signals[SIGNAL_DELETION_REQUEST_CHANGED] = + g_signal_new("deletion-request-changed", + G_TYPE_FROM_CLASS(gobject_class), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET(LmiJobClass, deletion_request_changed), + NULL, NULL, g_cclosure_marshal_VOID__BOOLEAN_INT64, + G_TYPE_NONE, 2, G_TYPE_BOOLEAN, G_TYPE_INT64); + +} + +/** + * Generator of unique job numbers. Each call increments `_last_job_number`. + */ +static guint lmi_job_generate_number() +{ + guint res; + pthread_mutex_lock(&_job_number_lock); + res = ++_last_job_number; + pthread_mutex_unlock(&_job_number_lock); + return res; +} + +static void lmi_job_init(LmiJob *self) +{ + LmiJobPrivate *priv = LMI_JOB_GET_PRIVATE(self); + pthread_mutexattr_t lock_attr; + self->priv = priv; + + pthread_mutexattr_init(&lock_attr); + pthread_mutexattr_settype(&lock_attr, PTHREAD_MUTEX_RECURSIVE); + pthread_mutex_init(&priv->lock, &lock_attr); + pthread_mutexattr_destroy(&lock_attr); + priv->number = lmi_job_generate_number(); + priv->name = NULL; + priv->jobid = NULL; + priv->state = LMI_JOB_STATE_ENUM_NEW; + priv->priority = DEFAULT_PRIORITY; + priv->method_name = NULL; + priv->time_submitted = time(NULL); + priv->time_before_removal = DEFAULT_TIME_BEFORE_REMOVAL; + priv->time_of_last_state_change = priv->time_submitted; + priv->start_time = -1; + priv->delete_on_completion = DEFAULT_DELETE_ON_COMPLETION; + priv->in_parameters = NULL; + priv->out_parameters = NULL; + priv->percent_complete = 0; + priv->status_code = LMI_JOB_STATUS_CODE_ENUM_OK; + priv->result = NULL; + priv->error_type = LMI_JOB_ERROR_TYPE_ENUM_UNKNOWN; + priv->error = NULL; +} + +static void lmi_job_finalize(GObject *object) +{ + LmiJob *self = LMI_JOB(object); + LmiJobPrivate *priv = self->priv; + + g_free(priv->name); + g_free(priv->jobid); + if (priv->in_parameters) + g_hash_table_unref(priv->in_parameters); + if (priv->out_parameters) + g_hash_table_unref(priv->out_parameters); + g_free(priv->method_name); + if (priv->result) + g_variant_unref(priv->result); + g_free(priv->error); + pthread_mutex_destroy(&priv->lock); + + G_OBJECT_CLASS(lmi_job_parent_class)->finalize(object); +} + +/** + * Initialize private properties. These are used only for (de)serialization. + */ +static gboolean init_private_prop_specs() { + if (_private_prop_specs[PROP_PRIVATE_IN_PARAMETERS] == NULL) { + _private_prop_specs[PROP_PRIVATE_IN_PARAMETERS] = g_param_spec_variant( + PROP_IN_PARAMETERS_NAME, + NULL, "Input parameters of asynchronous method.", + PARAMS_VARIANT_TYPE, NULL, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS); + if (_private_prop_specs[PROP_PRIVATE_IN_PARAMETERS] == NULL) + goto err; + _private_prop_specs[PROP_PRIVATE_OUT_PARAMETERS] = g_param_spec_variant( + PROP_OUT_PARAMETERS_NAME, + NULL, "Output parameters of asynchronous method.", + PARAMS_VARIANT_TYPE, NULL, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS); + if (_private_prop_specs[PROP_PRIVATE_OUT_PARAMETERS] == NULL) { + g_free(_private_prop_specs[PROP_PRIVATE_IN_PARAMETERS]); + _private_prop_specs[PROP_PRIVATE_IN_PARAMETERS] = NULL; + goto err; + } + } + return TRUE; + +err: + lmi_error("Memory allocation failed"); + return FALSE; +} + +/** + * Serializer needs to know about our private properties. We need to add them + * to a list of property specifications. + * + * @return NULL-terminated array of pointers to param specifications. Release it + * with `g_free()`. + */ +static GParamSpec **lmi_job_serializable_iface_list_properties( + JsonSerializable *serializable, + guint *n_pspecs) +{ + guint index = 0; + GParamSpec **pspecs = NULL; + GParamSpec **pspecs_orig; + g_assert(_private_prop_specs[PROP_PRIVATE_IN_PARAMETERS] != NULL); + g_assert(_private_prop_specs[PROP_PRIVATE_OUT_PARAMETERS] != NULL); + + if ((pspecs_orig = g_object_class_list_properties(G_OBJECT_GET_CLASS( + LMI_JOB(serializable)), NULL)) == NULL) + goto err; + if ((pspecs = g_malloc_n(sizeof(GParamSpec *), PROP_LAST + PROP_PRIVATE_LAST)) == NULL) + goto pspecs_orig_err; + + while (index < PROP_LAST - 1) { + pspecs[index] = pspecs_orig[index]; + ++index; + } + pspecs[index++] = _private_prop_specs[PROP_PRIVATE_IN_PARAMETERS]; + pspecs[index++] = _private_prop_specs[PROP_PRIVATE_OUT_PARAMETERS]; + pspecs[index] = NULL; + if (n_pspecs) + *n_pspecs = index; + + return pspecs; + +pspecs_orig_err: + g_free(pspecs_orig); +err: + lmi_error("Memory allocation failed"); + return pspecs; +} + +/** + * Convert given variant to a dictionary entry (having "{ss}" variant type) + * where key is a type string of variant and value is its serialized value + * obtained with `g_variant_print()`. + * + * We need to know a type of variant we are parsing. And because *result* and + * *in/out-parameters* can contain arbitrary value, we need to convert them + * to string we know how to parse. + */ +static GVariant *make_param_entry(GVariant *source) +{ + gchar *value; + GVariant *result = NULL; + + if ((value = g_variant_print(source, FALSE)) == NULL) { + lmi_error("Failed to serialize variant!"); + } else if ((result = g_variant_new(PARAM_VARIANT_TYPE_STR, + g_variant_get_type_string(source), value)) == NULL) + { + g_free(value); + lmi_error("Memory allocation failed"); + } + return result; +} + +/** + * Get private properties of job. Input and output parameters are turned + * into a dictionary variant. + */ +static void lmi_job_serializable_iface_get_property( + JsonSerializable *serializable, + GParamSpec *pspec, + GValue *value) +{ + GVariantBuilder variant_builder; + GVariant *variant; + GHashTable *params; + GHashTableIter pi; + gpointer param_key; + gpointer param_value; + LmiJob *job = LMI_JOB(serializable); + int prop_id = -1; + g_assert(_private_prop_specs[PROP_PRIVATE_IN_PARAMETERS] != NULL); + g_assert(_private_prop_specs[PROP_PRIVATE_OUT_PARAMETERS] != NULL); + + if (!g_strcmp0(pspec->name, PROP_IN_PARAMETERS_NAME)) { + prop_id = PROP_IN_PARAMETERS; + } else if (!g_strcmp0(pspec->name, PROP_OUT_PARAMETERS_NAME)) { + prop_id = PROP_OUT_PARAMETERS; + } + + if (prop_id >= 0) { + params = prop_id == PROP_IN_PARAMETERS ? + job->priv->in_parameters : job->priv->out_parameters; + g_variant_builder_init(&variant_builder, PARAMS_VARIANT_TYPE); + if (params) { + g_hash_table_iter_init(&pi, params); + while (g_hash_table_iter_next(&pi, ¶m_key, ¶m_value)) { + g_variant_builder_add(&variant_builder, + PARAMS_ENTRY_VARIANT_TYPE_STR, param_key, param_value); + } + } + if ((variant = g_variant_builder_end(&variant_builder)) == NULL) + goto variant_builder_err; + g_value_set_variant(value, variant); + } else { + /* handle public properties */ + g_object_get_property(G_OBJECT(serializable), pspec->name, value); + } + return; + +variant_builder_err: + g_variant_builder_clear(&variant_builder); + lmi_error("Memory allocation failed"); +} + +/** + * Set deserialized values of private properties. + */ +static void lmi_job_serializable_iface_set_property( + JsonSerializable *serializable, + GParamSpec *pspec, + const GValue *value) +{ + int prop_id = -1; + GVariant *variant; + GVariantIter iter; + gchar *param_key; + GVariant *param_value; + LmiJob *job = LMI_JOB(serializable); + g_assert(_private_prop_specs[PROP_PRIVATE_IN_PARAMETERS] != NULL); + g_assert(_private_prop_specs[PROP_PRIVATE_OUT_PARAMETERS] != NULL); + + if (!g_strcmp0(pspec->name, PROP_IN_PARAMETERS_NAME)) { + prop_id = PROP_IN_PARAMETERS; + } else if (!g_strcmp0(pspec->name, PROP_OUT_PARAMETERS_NAME)) { + prop_id = PROP_OUT_PARAMETERS; + } + + if (prop_id >= 0) { + if ((variant = g_value_get_variant(value)) == NULL) + return; + g_variant_iter_init(&iter, variant); + while (g_variant_iter_loop(&iter, PARAMS_ENTRY_VARIANT_TYPE_STR, ¶m_key, ¶m_value)) + { + if (prop_id == PROP_IN_PARAMETERS) { + lmi_job_set_in_param(job, param_key, param_value); + } else { + lmi_job_set_out_param(job, param_key, param_value); + } + } + } else { + /* handle public properties */ + g_object_set_property(G_OBJECT(serializable), pspec->name, value); + } +} + +/** + * Serialize any `GVariant` properties. Variants need to be preprocessed + * before they can be passed to json serializer since it stores values + * without type information. Therefore any variant is stored as a + * dictionary entry holding strings. Where key is variant's type and + * value is its serialized value. + */ +static JsonNode *lmi_job_serializable_iface_serialize_property( + JsonSerializable *serializable, + const gchar *property_name, + const GValue *value, + GParamSpec *pspec) +{ + JsonNode *result = NULL; + int prop_id = -1; + GVariant *params_variant; + GVariant *param_key; + GVariant *param_value; + GVariant *entry; + GVariantBuilder builder; + GVariantIter iter; + g_assert(_private_prop_specs[PROP_PRIVATE_IN_PARAMETERS] != NULL); + g_assert(_private_prop_specs[PROP_PRIVATE_OUT_PARAMETERS] != NULL); + + if (!g_strcmp0(pspec->name, PROP_IN_PARAMETERS_NAME)) { + prop_id = PROP_IN_PARAMETERS; + } else if (!g_strcmp0(pspec->name, PROP_OUT_PARAMETERS_NAME)) { + prop_id = PROP_OUT_PARAMETERS; + } else if (!g_strcmp0(pspec->name, + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_RESULT])) + { + prop_id = PROP_RESULT; + } + + switch (prop_id) { + case PROP_RESULT: + params_variant = g_value_get_variant(value); + entry = make_param_entry(params_variant); + if (entry) { + result = json_gvariant_serialize(entry); + g_variant_unref(entry); + } + break; + + case PROP_IN_PARAMETERS: + case PROP_OUT_PARAMETERS: + params_variant = g_value_get_variant(value); + g_variant_builder_init(&builder, (const GVariantType *) "a{s{ss}}"); + g_variant_iter_init(&iter, params_variant); + while (g_variant_iter_next(&iter, "{&sv}", ¶m_key, ¶m_value)) { + if ((entry = make_param_entry(param_value)) == NULL) { + lmi_error("Failed to serialize \"%s\" parameter or %s!", + param_key, property_name); + goto variant_builder_err; + } + g_variant_unref(param_value); + g_variant_builder_add(&builder, "{s@{ss}}", param_key, entry); + } + if ((params_variant = g_variant_builder_end(&builder)) == NULL) { + lmi_error("Failed to serialize \"%s\" property!", property_name); + goto variant_builder_err; + } + result = json_gvariant_serialize(params_variant); + g_variant_unref(params_variant); + break; + + default: + /* non-variant properties */ + result = json_serializable_default_serialize_property( + serializable, property_name, value, pspec); + break; + } + + return result; + +variant_builder_err: + g_variant_builder_clear(&builder); + return result; +} + +/** + * Reconstruct original variant converted by `make_param_entry()`. + * + * @param name Property name. Used just for error reporting. + * @param entry Dictionary entry variant. + * @return Original variant. + */ +static GVariant *parse_param_entry(const char *name, GVariant *entry) +{ + GError *gerror = NULL; + GVariant *result = NULL; + gchar *result_type; + gchar *result_value; + + g_variant_get(entry, "{&s&s}", &result_type, &result_value); + result = g_variant_parse((const GVariantType *) result_type, + result_value, NULL, NULL, &gerror); + + if (result == NULL) { + lmi_error("Failed to parse %s entry: %s.", name, gerror->message); + g_free(gerror); + } + return result; +} + +/** + * Reconstruct original variant converted by `make_param_entry()`. + * + * @param name Property name. Used just for error reporting. + * @param node Json node that is expected to hold dictionary entry variant + * produced with `make_param_entry()`. + * @param Original variant. + */ +static GVariant *parse_param_entry_node(const char *name, JsonNode *node) +{ + GError *gerror = NULL; + GVariant *variant; + GVariant *result = NULL; + + if ((variant = json_gvariant_deserialize(node, PARAM_VARIANT_TYPE_STR, &gerror)) != NULL) + { + result = parse_param_entry(name, variant); + g_variant_unref(variant); + } else { + lmi_error("Failed to deserialize %s entry: %s.", name, gerror->message); + g_error_free(gerror); + } + return result; +} + +/** + * Handle properties holding variants. + */ +static gboolean lmi_job_serializable_iface_deserialize_property( + JsonSerializable *serializable, + const gchar *property_name, + GValue *value, + GParamSpec *pspec, + JsonNode *property_node) +{ + int prop_id = -1; + GVariant *variant; + GVariantIter vi; + GVariantBuilder *builder; + GVariant *parsed; + GError *gerror = NULL; + gchar *key; + GVariant *param; + g_assert(_private_prop_specs[PROP_PRIVATE_IN_PARAMETERS] != NULL); + g_assert(_private_prop_specs[PROP_PRIVATE_OUT_PARAMETERS] != NULL); + + if (!g_strcmp0(pspec->name, PROP_IN_PARAMETERS_NAME)) { + prop_id = PROP_IN_PARAMETERS; + } else if (!g_strcmp0(pspec->name, PROP_OUT_PARAMETERS_NAME)) { + prop_id = PROP_OUT_PARAMETERS; + } else if (!g_strcmp0(pspec->name, + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_RESULT])) + { + prop_id = PROP_RESULT; + } + + switch (prop_id) { + case PROP_RESULT: + if ((parsed = parse_param_entry_node("result", property_node)) == NULL) + goto err; + g_value_set_variant(value, parsed); + break; + + case PROP_IN_PARAMETERS: + case PROP_OUT_PARAMETERS: + if ((variant = json_gvariant_deserialize(property_node, + "a{s{ss}}", &gerror)) == NULL) + goto parse_err; + + if ((builder = g_variant_builder_new(PARAMS_VARIANT_TYPE)) == NULL) + goto variant_err; + g_variant_iter_init(&vi, variant); + while (g_variant_iter_next(&vi, "{&s@{ss}}", &key, ¶m)) + { + if ((parsed = parse_param_entry(key, param)) == NULL) { + g_variant_unref(param); + goto builder_err; + } + g_variant_unref(param); + g_variant_builder_add(builder, PARAMS_ENTRY_VARIANT_TYPE_STR, + key, parsed); + } + if ((parsed = g_variant_builder_end(builder)) == NULL) + goto builder_err; + g_variant_unref(variant); + g_value_set_variant(value, parsed); + break; + + default: + /* non-variant properties */ + return json_serializable_default_deserialize_property( + serializable, property_name, value, pspec, property_node); + } + return TRUE; + +parse_err: + lmi_error("Failed to parse \"%s\" property: %s.", + pspec->name, gerror->message); + g_free(gerror); + goto err; +builder_err: + g_variant_builder_unref(builder); +variant_err: + g_variant_unref(variant); +err: + return FALSE; +} + +/** + * Make sure private properties will be found by serializer. + */ +static GParamSpec *lmi_job_serializable_iface_find_property( + JsonSerializable *serializable, + const char *name) +{ + int prop_id = 0; + g_assert(_private_prop_specs[PROP_PRIVATE_IN_PARAMETERS] != NULL); + g_assert(_private_prop_specs[PROP_PRIVATE_OUT_PARAMETERS] != NULL); + + while (prop_id < PROP_PRIVATE_LAST) { + if (!g_strcmp0(name, s_prop_private_names[prop_id])) { + return _private_prop_specs[prop_id]; + } + ++prop_id; + } + + /* property must be public */ + return g_object_class_find_property( + G_OBJECT_GET_CLASS(serializable), name); +} + +static void lmi_job_serializable_iface_init(JsonSerializableIface *iface) +{ + iface->serialize_property = lmi_job_serializable_iface_serialize_property; + iface->deserialize_property = lmi_job_serializable_iface_deserialize_property; + iface->find_property = lmi_job_serializable_iface_find_property; + iface->list_properties = lmi_job_serializable_iface_list_properties; + iface->set_property = lmi_job_serializable_iface_set_property; + iface->get_property = lmi_job_serializable_iface_get_property; + init_private_prop_specs(); +} + +LmiJob *lmi_job_new() +{ + LmiJob *job; + job = g_object_new(LMI_TYPE_JOB, NULL); + return LMI_JOB(job); +} + +guint lmi_job_get_number(const LmiJob *job) +{ + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + guint res = job->priv->number; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +const gchar *lmi_job_get_name(const LmiJob *job) +{ + gchar *res = NULL; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + if (job->priv->name) + res = job->priv->name; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +gchar *lmi_job_get_jobid(const LmiJob *job) +{ + gchar *res = NULL; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + if (job->priv->jobid) { + res = g_strdup(job->priv->jobid); + } else { + res = g_strdup_printf("%u", job->priv->number); + } + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +gboolean lmi_job_has_own_jobid(const LmiJob *job) +{ + gboolean res; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + res = job->priv->jobid != NULL; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +LmiJobStateEnum lmi_job_get_state(const LmiJob *job) +{ + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + LmiJobStateEnum res = job->priv->state; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +guint lmi_job_get_priority(const LmiJob *job) +{ + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + guint res = job->priv->priority; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +const gchar *lmi_job_get_method_name(const LmiJob *job) +{ + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + const gchar *res = job->priv->method_name; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +time_t lmi_job_get_time_submitted(const LmiJob *job) +{ + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + time_t res = job->priv->time_submitted; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +time_t lmi_job_get_time_before_removal(const LmiJob *job) +{ + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + time_t res = job->priv->time_before_removal; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +time_t lmi_job_get_time_of_last_state_change(const LmiJob *job) +{ + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + time_t res = job->priv->time_of_last_state_change; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +time_t lmi_job_get_start_time(const LmiJob *job) +{ + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + time_t res = job->priv->start_time; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +gboolean lmi_job_get_delete_on_completion(const LmiJob *job) +{ + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + gboolean res = job->priv->delete_on_completion; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +guint lmi_job_get_percent_complete(const LmiJob *job) +{ + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + guint res = job->priv->percent_complete; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +LmiJobStatusCodeEnum lmi_job_get_status_code(const LmiJob *job) +{ + LmiJobStatusCodeEnum res; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + res = job->priv->status_code; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +GVariant *lmi_job_get_result(const LmiJob *job) +{ + GVariant *res = NULL; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + res = job->priv->result; + if (res) + g_variant_ref(res); + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +guint32 lmi_job_get_result_code(const LmiJob *job) +{ + GVariant *res = lmi_job_get_result(job); + + if (res == NULL) + return G_MAXUINT32; + if (!g_variant_is_of_type(res, G_VARIANT_TYPE_UINT32)) + return G_MAXUINT32 - 1; + return g_variant_get_uint32(res); +} + +LmiJobErrorTypeEnum lmi_job_get_error_type(const LmiJob *job) +{ + LmiJobErrorTypeEnum res; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + res = job->priv->error_type; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +const gchar *lmi_job_get_error(const LmiJob *job) +{ + gchar *res = NULL; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + res = job->priv->error; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +gboolean lmi_job_is_finished(const LmiJob *job) +{ + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + gboolean res = IS_FINAL_STATE(job->priv->state); + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +void lmi_job_set_name(LmiJob *job, const gchar *name) +{ + GVariant *vold, *vnew; + const gchar *old; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + old = job->priv->name; + if ( (old && name && strcmp(name, old)) + || (!old && name) || (old && !name)) + { + job->priv->name = g_strdup(name); + vold = g_variant_new_string(old ? old:""); + vnew = g_variant_new_string(name ? name:""); + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_NAME]); + g_signal_emit(job, _signals[SIGNAL_MODIFIED], 0, + LMI_JOB_PROP_ENUM_NAME, vold, vnew); + } + PRIV_CRITICAL_CRITICAL_END(job->priv); +} + +void lmi_job_set_jobid(LmiJob *job, const gchar *jobid) +{ + GVariant *vold, *vnew; + const gchar *old; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + old = job->priv->jobid; + if ( (old && jobid && strcmp(jobid, old)) + || (!old && jobid) || (old && !jobid)) + { + job->priv->jobid = g_strdup(jobid); + vold = g_variant_new_string(old ? old:""); + vnew = g_variant_new_string(jobid ? jobid:""); + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_JOBID]); + g_signal_emit(job, _signals[SIGNAL_MODIFIED], 0, + LMI_JOB_PROP_ENUM_NAME, vold, vnew); + lmi_debug("Job #%u has just got an id=%s assigned.", + lmi_job_get_number(job), jobid); + } + PRIV_CRITICAL_CRITICAL_END(job->priv); +} + +void lmi_job_set_priority(LmiJob *job, guint priority) +{ + GVariant *vold, *vnew; + guint old; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + old = job->priv->priority; + if (old != priority) { + job->priv->priority = priority; + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_PRIORITY]); + g_signal_emit(job, _signals[SIGNAL_PRIORITY_CHANGED], 0, + old, priority); + vold = g_variant_new_uint32(old); + vnew = g_variant_new_uint32(priority); + g_signal_emit(job, _signals[SIGNAL_MODIFIED], 0, + LMI_JOB_PROP_ENUM_JOBID, vold, vnew); + } + PRIV_CRITICAL_CRITICAL_END(job->priv); +} + +void lmi_job_set_method_name(LmiJob *job, const gchar *method_name) +{ + GVariant *vold, *vnew; + const gchar *old; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + old = job->priv->method_name; + if ( (old && method_name && strcmp(method_name, old)) + || (!old && method_name) || (old && !method_name)) + { + job->priv->method_name = g_strdup(method_name); + vold = g_variant_new_string(old ? old:""); + vnew = g_variant_new_string(method_name ? method_name:""); + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_METHOD_NAME]); + g_signal_emit(job, _signals[SIGNAL_MODIFIED], 0, + LMI_JOB_PROP_ENUM_METHOD_NAME, vold, vnew); + } + PRIV_CRITICAL_CRITICAL_END(job->priv); +} + +void lmi_job_set_time_before_removal(LmiJob *job, + gint64 time_before_removal) +{ + GVariant *vold, *vnew; + gint64 old; + g_assert(LMI_IS_JOB(job)); + + if (time_before_removal < MINIMUM_TIME_BEFORE_REMOVAL) + time_before_removal = MINIMUM_TIME_BEFORE_REMOVAL; + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + old = job->priv->time_before_removal; + if (old != time_before_removal) { + job->priv->time_before_removal = time_before_removal; + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_TIME_BEFORE_REMOVAL]); + g_signal_emit(job, _signals[SIGNAL_DELETION_REQUEST_CHANGED], 0, + job->priv->delete_on_completion, time_before_removal); + vold = g_variant_new_int64(old); + vnew = g_variant_new_int64(time_before_removal); + g_signal_emit(job, _signals[SIGNAL_MODIFIED], 0, + LMI_JOB_PROP_ENUM_TIME_BEFORE_REMOVAL, vold, vnew); + } + PRIV_CRITICAL_CRITICAL_END(job->priv); +} + +void lmi_job_set_delete_on_completion(LmiJob *job, + gboolean delete_on_completion) +{ + GVariant *vold, *vnew; + gboolean old; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + old = job->priv->delete_on_completion; + if (old != delete_on_completion) { + job->priv->delete_on_completion = delete_on_completion; + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_DELETE_ON_COMPLETION]); + g_signal_emit(job, _signals[SIGNAL_DELETION_REQUEST_CHANGED], 0, + delete_on_completion, job->priv->time_before_removal); + vold = g_variant_new_boolean(old); + vnew = g_variant_new_boolean(delete_on_completion); + g_signal_emit(job, _signals[SIGNAL_MODIFIED], 0, + LMI_JOB_PROP_ENUM_DELETE_ON_COMPLETION, vold, vnew); + } + PRIV_CRITICAL_CRITICAL_END(job->priv); +} + +void lmi_job_set_percent_complete(LmiJob *job, + guint percent_complete) +{ + GVariant *vold, *vnew; + guint old; + g_assert(LMI_IS_JOB(job)); + + if (percent_complete > 100) + percent_complete = 100; + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + old = job->priv->percent_complete; + if (old != percent_complete) { + job->priv->percent_complete = percent_complete; + vold = g_variant_new_uint32(old); + vnew = g_variant_new_uint32(percent_complete); + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_PERCENT_COMPLETE]); + g_signal_emit(job, _signals[SIGNAL_MODIFIED], 0, + LMI_JOB_PROP_ENUM_PERCENT_COMPLETE, vold, vnew); + } + PRIV_CRITICAL_CRITICAL_END(job->priv); +} + +void lmi_job_set_error_type(LmiJob *job, LmiJobErrorTypeEnum error_type) +{ + GVariant *vold, *vnew; + guint old; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + old = job->priv->error_type; + if (old != error_type) { + job->priv->error_type = error_type; + vold = g_variant_new_uint32(old); + vnew = g_variant_new_uint32(error_type); + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_ERROR_TYPE]); + g_signal_emit(job, _signals[SIGNAL_MODIFIED], 0, + LMI_JOB_PROP_ENUM_ERROR_TYPE, vold, vnew); + } + PRIV_CRITICAL_CRITICAL_END(job->priv); +} + +gboolean lmi_job_set_state(LmiJob *job, + LmiJobStateEnum state, + LmiJobStatusCodeEnum status_code, + GVariant *result, + const gchar *error) +{ + GVariant *vold, *vnew; + gchar *jobid = NULL; + LmiJobStateEnum old_state; + time_t old_start_time; + time_t old_time_of_last_state_change; + guint old_percent_complete; + guint old_status_code; + time_t current_time; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + old_state = job->priv->state; + old_start_time = job->priv->start_time; + old_percent_complete = job->priv->percent_complete; + old_status_code = job->priv->status_code; + old_time_of_last_state_change = job->priv->time_of_last_state_change; + current_time = time(NULL); + + if ((jobid = lmi_job_get_jobid(job)) == NULL) + goto err; + + /* check inputs */ + if ( old_state != LMI_JOB_STATE_ENUM_NEW + && state == LMI_JOB_STATE_ENUM_RUNNING) + { + error = "Can not start job \"%s\" second time!"; + goto bad_input; + } + + if ( old_state != LMI_JOB_STATE_ENUM_RUNNING + && state == LMI_JOB_STATE_ENUM_COMPLETED) + { + if (old_state == LMI_JOB_STATE_ENUM_NEW) { + error = "Can not finish unstarted job \"%s\"!"; + } else { + error = "Can not finish already finished job \"%s\"!"; + } + goto bad_input; + } + + if ( IS_FINAL_STATE(old_state) + && IS_FINAL_STATE(state)) + { + error = "Can not finish already finished job \"%s\"!"; + goto bad_input; + } + + if ( IS_FINAL_STATE(old_state) + && !IS_FINAL_STATE(state)) + { + error = "Can not change state of finished job \"%s\"!"; + goto bad_input; + } + + /* modify job */ + job->priv->state = state; + job->priv->time_of_last_state_change = current_time; + job->priv->status_code = status_code; + lmi_debug("State of job \"%s\" changed from \"%s\" to \"%s\".", + jobid, s_job_state_enum_names[old_state], s_job_state_enum_names[state]); + if (old_state == LMI_JOB_STATE_ENUM_NEW) { + job->priv->start_time = current_time; + } + if (IS_FINAL_STATE(state)) { + if (state == LMI_JOB_STATE_ENUM_COMPLETED && result) { + job->priv->result = g_variant_ref_sink(result); + } else if (state == LMI_JOB_STATE_ENUM_EXCEPTION && error) { + job->priv->error = g_strdup(error); + } + if (state == LMI_JOB_STATE_ENUM_COMPLETED) { + job->priv->percent_complete = 100; + } else { + job->priv->percent_complete = 0; + } + } + + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_STATE]); + if (result != NULL) + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_RESULT]); + if (error != NULL) + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_ERROR]); + if (IS_FINAL_STATE(state)) { + g_signal_emit(job, _signals[SIGNAL_FINISHED], 0, + old_state, state, + job->priv->result == NULL + ? g_variant_new_handle(0) + : job->priv->result, + job->priv->error); + } + + /* emit signals */ + g_signal_emit(job, _signals[SIGNAL_STATE_CHANGED], 0, + old_state, state); + + vold = g_variant_new_uint32(old_state); + vnew = g_variant_new_uint32(state); + g_signal_emit(job, _signals[SIGNAL_MODIFIED], 0, + LMI_JOB_PROP_ENUM_STATE, vold, vnew); + + if (old_state == LMI_JOB_STATE_ENUM_NEW) { + vold = g_variant_new_int64(old_start_time); + vnew = g_variant_new_int64(job->priv->start_time); + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_START_TIME]); + g_signal_emit(job, _signals[SIGNAL_MODIFIED], 0, + LMI_JOB_PROP_ENUM_START_TIME, vold, vnew); + } + + if (old_percent_complete != job->priv->percent_complete) { + vold = g_variant_new_uint32(old_percent_complete); + vnew = g_variant_new_uint32(job->priv->percent_complete); + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_PERCENT_COMPLETE]); + g_signal_emit(job, _signals[SIGNAL_MODIFIED], 0, + LMI_JOB_PROP_ENUM_PERCENT_COMPLETE, vold, vnew); + } + + if (old_status_code != job->priv->status_code) { + vold = g_variant_new_uint32(old_status_code); + vnew = g_variant_new_uint32(job->priv->status_code); + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_STATUS_CODE]); + g_signal_emit(job, _signals[SIGNAL_MODIFIED], 0, + LMI_JOB_PROP_ENUM_STATUS_CODE, vold, vnew); + } + + vold = g_variant_new_int64(old_time_of_last_state_change); + vnew = g_variant_new_int64(job->priv->time_of_last_state_change); + g_object_notify(G_OBJECT(job), + s_job_prop_enum_names[LMI_JOB_PROP_ENUM_TIME_OF_LAST_STATE_CHANGE]); + g_signal_emit(job, _signals[SIGNAL_MODIFIED], 0, + LMI_JOB_PROP_ENUM_TIME_OF_LAST_STATE_CHANGE, vold, vnew); + + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return TRUE; + +bad_input: + lmi_error(error, jobid); + g_free(jobid); +err: + PRIV_CRITICAL_CRITICAL_END(job->priv); + return FALSE; +} + +gboolean lmi_job_start(LmiJob *job) +{ + return lmi_job_set_state(job, LMI_JOB_STATE_ENUM_RUNNING, + LMI_JOB_STATUS_CODE_ENUM_OK, NULL, NULL); +} + +gboolean lmi_job_finish_ok(LmiJob *job, GVariant *result) +{ + return lmi_job_set_state(job, LMI_JOB_STATE_ENUM_COMPLETED, + LMI_JOB_STATUS_CODE_ENUM_OK, result, NULL); +} + +gboolean lmi_job_finish_ok_with_code(LmiJob *job, guint32 exit_code) +{ + gboolean result = false; + GVariant *variant = g_variant_new_uint32(exit_code); + if (variant != NULL) { + result = lmi_job_finish_ok(job, variant); + } else { + lmi_error("Memory allocation failed"); + } + return result; +} + +gboolean lmi_job_finish_exception(LmiJob *job, + LmiJobStatusCodeEnum status_code, + const gchar *error) +{ + return lmi_job_set_state(job, LMI_JOB_STATE_ENUM_EXCEPTION, + status_code, NULL, error); +} + +gboolean lmi_job_finish_terminate(LmiJob *job) +{ + return lmi_job_set_state(job, LMI_JOB_STATE_ENUM_TERMINATED, + LMI_JOB_STATUS_CODE_ENUM_OK, NULL, NULL); +} + +gchar **lmi_job_get_in_param_keys(const LmiJob *job) +{ + gchar **res ; + guint size = 0; + GHashTableIter hi; + gpointer key; + int index = 0; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + if (job->priv->in_parameters) + size = g_hash_table_size(job->priv->in_parameters); + res = g_new(gchar *, size + 1); + if (res && size > 1) { + g_hash_table_iter_init(&hi, job->priv->in_parameters); + while (g_hash_table_iter_next(&hi, &key, NULL)) { + res[index++] = g_strdup((const char *) key); + } + } + res[index] = NULL; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +gboolean lmi_job_has_in_param(const LmiJob *job, const gchar *param) +{ + gboolean res = FALSE; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + if (job->priv->in_parameters) + res = g_hash_table_lookup_extended(job->priv->in_parameters, param, + NULL, NULL); + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +GVariant *lmi_job_get_in_param(const LmiJob *job, const gchar *param) +{ + GVariant *res = NULL; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + if (job->priv->in_parameters) { + res = g_hash_table_lookup(job->priv->in_parameters, param); + if (res) + g_variant_ref(res); + } + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +gboolean lmi_job_set_in_param(LmiJob *job, const gchar *param, GVariant *value) +{ + gchar *key; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + if (!job->priv->in_parameters) + job->priv->in_parameters = g_hash_table_new_full( + lmi_str_lcase_hash_func, lmi_str_icase_equal, g_free, + (GDestroyNotify) g_variant_unref); + if (!job->priv->in_parameters) + goto err; + if ((key = g_strdup(param)) == NULL) + goto err; + g_hash_table_insert(job->priv->in_parameters, key, value); + g_variant_ref_sink(value); + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return TRUE; +err: + PRIV_CRITICAL_CRITICAL_END(job->priv); + return FALSE; +} + +gchar **lmi_job_get_out_param_keys(const LmiJob *job) +{ + gchar **res = NULL; + guint size = 0; + GHashTableIter hi; + gpointer key; + int index = 0; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + if (job->priv->out_parameters) + size = g_hash_table_size(job->priv->out_parameters); + res = g_new(gchar *, size + 1); + if (res && size > 1) { + g_hash_table_iter_init(&hi, job->priv->out_parameters); + while (g_hash_table_iter_next(&hi, &key, NULL)) { + res[index++] = g_strdup((const char *) key); + } + } + res[index] = NULL; + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +gboolean lmi_job_has_out_param(const LmiJob *job, const gchar *param) +{ + gboolean res = FALSE; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + if (job->priv->out_parameters) + res = g_hash_table_lookup_extended(job->priv->out_parameters, param, + NULL, NULL); + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +GVariant *lmi_job_get_out_param(const LmiJob *job, const gchar *param) +{ + GVariant *res = NULL; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + if (job->priv->out_parameters) { + res = g_hash_table_lookup(job->priv->out_parameters, param); + if (res) + g_variant_ref(res); + } + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return res; +} + +gboolean lmi_job_set_out_param(LmiJob *job, const gchar *param, GVariant *value) +{ + gchar *key; + g_assert(LMI_IS_JOB(job)); + + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); + if (!job->priv->out_parameters) + job->priv->out_parameters = g_hash_table_new_full( + lmi_str_lcase_hash_func, lmi_str_icase_equal, g_free, + (GDestroyNotify) g_variant_unref); + if (!job->priv->out_parameters) + goto err; + if ((key = g_strdup(param)) == NULL) + goto err; + g_hash_table_insert(job->priv->out_parameters, key, value); + g_variant_ref_sink(value); + PRIV_CRITICAL_CRITICAL_END(job->priv); + + return TRUE; +err: + PRIV_CRITICAL_CRITICAL_END(job->priv); + return FALSE; +} + +const gchar *lmi_job_state_to_string(LmiJobStateEnum state) +{ + g_assert( state >= LMI_JOB_STATE_ENUM_NEW + && state < LMI_JOB_STATE_ENUM_LAST); + return s_job_state_enum_names[state]; +} + +const gchar *lmi_job_prop_to_string(LmiJobPropEnum prop) +{ + g_assert(prop >= 0 && prop < LMI_JOB_PROP_ENUM_LAST); + return s_job_prop_enum_names[prop]; +} + +void lmi_job_critical_section_begin(const LmiJob *job) +{ + g_assert(LMI_IS_JOB(job)); + PRIV_CRITICAL_CRITICAL_BEGIN(job->priv); +} + +void lmi_job_critical_section_end(const LmiJob *job) +{ + g_assert(LMI_IS_JOB(job)); + PRIV_CRITICAL_CRITICAL_END(job->priv); +} + +static void process_finished_signal(LmiJob *job, + LmiJobStateEnum old_state, + LmiJobStateEnum new_state, + GVariant *result, + const gchar *error, + gpointer data) +{ + GMainLoop *main_loop = data; + g_main_loop_quit(main_loop); +} + +static gboolean unlock_job(gpointer data) +{ + LmiJob *job = LMI_JOB(data); + JOB_CRITICAL_END(job); + return FALSE; +} + +void lmi_job_wait_until_finished(LmiJob *job) +{ + /* Let's run private main loop which waits for *finished* signal of job. */ + GMainContext *ctx = NULL; + GMainLoop *main_loop = NULL; + gchar *jobid; + GSource *source = NULL; + + JOB_CRITICAL_BEGIN(job); + if (!lmi_job_is_finished(job)) { + ctx = g_main_context_new(); + if (ctx == NULL) + goto critical_end_err; + main_loop = g_main_loop_new(ctx, FALSE); + if (main_loop == NULL) + goto ctx_err; + g_main_context_push_thread_default(ctx); + + /* Callback needs reference to our private main loop because it + * needs to stop it */ + g_signal_connect(job, "finished", G_CALLBACK(process_finished_signal), + main_loop); + + /* First thing that needs to be done when the loop is running is + * unlocking the job so it can be finished in another thread. */ + if ((source = g_idle_source_new()) == NULL) + goto pop_ctx_err; + g_source_set_callback(source, unlock_job, job, NULL); + g_source_attach(source, ctx); + + jobid = lmi_job_get_jobid(job); + lmi_debug("Waiting for transition to finished state of job \"%s\".", + jobid); + g_free(jobid); + + /* Block until the main loop is stopped from *process_finished_signal* + * callback. */ + g_main_loop_run(main_loop); + + g_signal_handlers_disconnect_by_func( + job, process_finished_signal, NULL); + g_main_context_pop_thread_default(ctx); + g_main_loop_unref(main_loop); + g_main_context_unref(ctx); + } else { + JOB_CRITICAL_END(job); + } + return; + +pop_ctx_err: + g_signal_handlers_disconnect_by_func(job, process_finished_signal, NULL); + g_main_context_pop_thread_default(ctx); + g_main_loop_unref(main_loop); +ctx_err: + g_main_context_unref(ctx); +critical_end_err: + JOB_CRITICAL_END(job); + lmi_error("Memory allocation failed"); +} + diff --git a/src/libs/jobmanager/lmi_job.h b/src/libs/jobmanager/lmi_job.h new file mode 100644 index 0000000..a2f8f36 --- /dev/null +++ b/src/libs/jobmanager/lmi_job.h @@ -0,0 +1,605 @@ +/* + * Copyright (C) 2014 Red Hat, Inc. All rights reserved. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: Michal Minar <miminar@redhat.com> + */ + +#include <glib-object.h> +#include <gio/gio.h> + +#ifndef LMI_JOB_H +#define LMI_JOB_H + +#define LMI_TYPE_JOB (lmi_job_get_type ()) +#define LMI_JOB(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), LMI_TYPE_JOB, LmiJob)) +#define LMI_IS_JOB(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), LMI_TYPE_JOB)) +#define LMI_JOB_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), LMI_TYPE_JOB, LmiJobClass)) +#define LMI_IS_JOB_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), LMI_TYPE_JOB)) +#define LMI_JOB_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), LMI_TYPE_JOB, LmiJobClass)) + +typedef struct _LmiJobPrivate LmiJobPrivate; +typedef struct _LmiJob LmiJob; +typedef struct _LmiJobClass LmiJobClass; + +#define CLOG_COLOR_BLACK 30 +#define CLOG_COLOR_RED 31 +#define CLOG_COLOR_GREEN 32 +#define CLOG_COLOR_YELLOW 33 +#define CLOG_COLOR_BLUE 34 +#define CLOG_COLOR_MAGENTA 35 +#define CLOG_COLOR_CYAN 36 +#define CLOG_COLOR_WHITE 37 + +#ifdef DEBUG_LOCKING + #include <stdio.h> + + #define _clog(color_code, msg, ...) \ + fprintf(stderr, "\x1b[%um" __FILE__ ":%u " msg "\x1b[0m\n", \ + color_code, __LINE__, ##__VA_ARGS__) +#else + static inline void _clog(unsigned int color_code, const char *msg, ...) + { + (void) msg; + } +#endif + +#define JOB_CRITICAL_BEGIN(job) \ + { \ + pthread_t _thread_id = pthread_self(); \ + _clog(CLOG_COLOR_CYAN, "[tid=%lu] locking job", _thread_id); \ + lmi_job_critical_section_begin(job); \ + _clog(CLOG_COLOR_CYAN, "[tid=%lu] locked job", _thread_id); \ + } + +#define JOB_CRITICAL_END(job) \ + { \ + pthread_t _thread_id = pthread_self(); \ + _clog(CLOG_COLOR_CYAN, "[tid=%lu] unlocking job", _thread_id); \ + lmi_job_critical_section_end(job); \ + _clog(CLOG_COLOR_CYAN, "[tid=%lu] unlocked job", _thread_id); \ + } + +/** + * Possible job states. Job begins with *NEW* state and transition to *RUNNING* + * when it's started. Next state is one of *COMPLETED*, *TERMINATED* and + * *EXCEPTION*. These states are called *final* states. No other transition + * can be made from final state. + */ +typedef enum { + LMI_JOB_STATE_ENUM_NEW, + LMI_JOB_STATE_ENUM_RUNNING, + LMI_JOB_STATE_ENUM_COMPLETED, + LMI_JOB_STATE_ENUM_TERMINATED, + LMI_JOB_STATE_ENUM_EXCEPTION, + LMI_JOB_STATE_ENUM_LAST +}LmiJobStateEnum; + +/** + * Used as a value of `status-code` property of job. + * + * Can only be set with `lmi_job_finish_exception()`. + */ +typedef enum { + LMI_JOB_STATUS_CODE_ENUM_OK = 0, + LMI_JOB_STATUS_CODE_ENUM_FAILED = 1, + LMI_JOB_STATUS_CODE_ENUM_ACCESS_DENIED = 2, + LMI_JOB_STATUS_CODE_ENUM_INVALID_NAMESPACE = 3, + LMI_JOB_STATUS_CODE_ENUM_INVALID_PARAMETER = 4, + LMI_JOB_STATUS_CODE_ENUM_INVALID_CLASS = 5, + LMI_JOB_STATUS_CODE_ENUM_NOT_FOUND = 6, + LMI_JOB_STATUS_CODE_ENUM_NOT_SUPPORTED = 7, + LMI_JOB_STATUS_CODE_ENUM_CLASS_HAS_CHILDREN = 8, + LMI_JOB_STATUS_CODE_ENUM_CLASS_HAS_INSTANCES = 9, + LMI_JOB_STATUS_CODE_ENUM_INVALID_SUPERCLASS = 10, + LMI_JOB_STATUS_CODE_ENUM_ALREADY_EXISTS = 11, + LMI_JOB_STATUS_CODE_ENUM_NO_SUCH_PROPERTY = 12, + LMI_JOB_STATUS_CODE_ENUM_TYPE_MISMATCH = 13, + LMI_JOB_STATUS_CODE_ENUM_QUERY_LANGUAGE_NOT_SUPPORTED = 14, + LMI_JOB_STATUS_CODE_ENUM_INVALID_QUERY = 15, + LMI_JOB_STATUS_CODE_ENUM_METHOD_NOT_AVAILABLE = 16, + LMI_JOB_STATUS_CODE_ENUM_METHOD_NOT_FOUND = 17, + LMI_JOB_STATUS_CODE_ENUM_UNEXPECTED_RESPONSE = 18, + LMI_JOB_STATUS_CODE_ENUM_INVALID_RESPONSE_DESTINATION = 19, + LMI_JOB_STATUS_CODE_ENUM_NAMESPACE_NOT_EMPTY = 20, + LMI_JOB_STATUS_CODE_ENUM_INVALID_ENUMERATION_CONTEXT = 21, + LMI_JOB_STATUS_CODE_ENUM_INVALID_OPERATION_TIMEOUT = 22, + LMI_JOB_STATUS_CODE_ENUM_PULL_HAS_BEEN_ABANDONED = 23, + LMI_JOB_STATUS_CODE_ENUM_PULL_CANNOT_BE_ABANDONED = 24, + LMI_JOB_STATUS_CODE_ENUM_FILTERED_ENUMERATION_NOT_SUPPORTED = 25, + LMI_JOB_STATUS_CODE_ENUM_CONTINUATION_ON_ERROR_NOT_SUPPORTED = 26, + LMI_JOB_STATUS_CODE_ENUM_SERVER_LIMITS_EXCEEDED = 27, + LMI_JOB_STATUS_CODE_ENUM_SERVER_IS_SHUTTING_DOWN = 28, + LMI_JOB_STATUS_CODE_ENUM_QUERY_FEATURE_NOT_SUPPORTED = 29, + LMI_JOB_STATUS_CODE_ENUM_LAST +}LmiJobStatusCodeEnum; + +/** + * Used as a value of `error-type` property of job. + * + * Shall only be set when is in `EXCEPTION` state. + */ +typedef enum { + LMI_JOB_ERROR_TYPE_ENUM_UNKNOWN = 0, + LMI_JOB_ERROR_TYPE_ENUM_OTHER = 1, + LMI_JOB_ERROR_TYPE_ENUM_COMMUNICATIONS_ERROR = 2, + LMI_JOB_ERROR_TYPE_ENUM_QUALITY_OF_SERVICE_ERROR = 3, + LMI_JOB_ERROR_TYPE_ENUM_SOFTWARE_ERROR = 4, + LMI_JOB_ERROR_TYPE_ENUM_HARDWARE_ERROR = 5, + LMI_JOB_ERROR_TYPE_ENUM_ENVIRONMENTAL_ERROR = 6, + LMI_JOB_ERROR_TYPE_ENUM_SECURITY_ERROR = 7, + LMI_JOB_ERROR_TYPE_ENUM_OVERSUBSCRIPTION_ERROR = 8, + LMI_JOB_ERROR_TYPE_ENUM_UNAVAILABLE_RESOURCE_ERROR = 9, + LMI_JOB_ERROR_TYPE_ENUM_UNSUPPORTED_OPERATION_ERROR = 10, + LMI_JOB_ERROR_TYPE_ENUM_LAST +}LmiJobErrorTypeEnum; + +/** + * This identifies modifiable properties of job. It's particularly useful for + * *modified* signal of job instance to identify property that got changed. + */ +typedef enum { + LMI_JOB_PROP_ENUM_NAME, + LMI_JOB_PROP_ENUM_JOBID, + LMI_JOB_PROP_ENUM_STATE, + LMI_JOB_PROP_ENUM_PRIORITY, + LMI_JOB_PROP_ENUM_METHOD_NAME, + LMI_JOB_PROP_ENUM_TIME_BEFORE_REMOVAL, + LMI_JOB_PROP_ENUM_TIME_OF_LAST_STATE_CHANGE, + LMI_JOB_PROP_ENUM_START_TIME, + LMI_JOB_PROP_ENUM_DELETE_ON_COMPLETION, + LMI_JOB_PROP_ENUM_PERCENT_COMPLETE, + LMI_JOB_PROP_ENUM_STATUS_CODE, + LMI_JOB_PROP_ENUM_RESULT, + LMI_JOB_PROP_ENUM_ERROR_TYPE, + LMI_JOB_PROP_ENUM_ERROR, + LMI_JOB_PROP_ENUM_LAST +}LmiJobPropEnum; + +struct _LmiJob { + GObject parent; + LmiJobPrivate *priv; +}; + +struct _LmiJobClass { + GObjectClass parent_class; + + /* Follows a list of instance signals. They are listed in ascending + * priority - first listed will be emitted last. */ + + /** + * Signal emitted whenever there is a change to some property. + */ + void (*modified) (LmiJob *job, + LmiJobPropEnum property, + GVariant *old_value, + GVariant *new_value); + + /** + * Signal emitted when there is a change to `time-before-removal` + * or `delete-on-completion` properties. + */ + void (*deletion_request_changed) (LmiJob *job, + gboolean delete_on_completion, + gint64 time_before_removal); + + /** + * Signal emitted when a change to job's priority occurs. + */ + void (*priority_changed) ( + LmiJob *job, + guint old_priority, + guint new_priority); + + /** + * Signal emitted whenever job changes its state. It is usually triggered + * just twice per one job. Once upon a transition from *NEW* to *RUNNING* + * and then from *RUNNING* to one of final states. + */ + void (*state_changed) (LmiJob *job, + LmiJobStateEnum old_state, + LmiJobStateEnum new_state); + + /** + * Signal emitted when job's state changes to final one. + * + * @param result Result of a job when job's state is *COMPLETED*. + * `G_VARIANT_TYPE_HANDLE` with value 0 is passed otherwise. + * @param error Error message filled when job's state is *EXCEPTION*. + */ + void (*finished) (LmiJob *job, + LmiJobStateEnum old_state, + LmiJobStateEnum new_state, + GVariant *result, + const gchar *error); + +}; + +GType lmi_job_get_type(); + +/** + * Create a new job. Number generator is invoked to get unique number. Other + * properties are set to their default values. + */ +LmiJob *lmi_job_new(); + +/** + * Get a unique number of job. + */ +guint lmi_job_get_number(const LmiJob *job); + +/** + * Get the name of job. Default value is `NULL`. This value is assigned to + * `Name` property of corresponding `CIM_ConcreteJob` instance. + * + * Do not free returned value. + */ +const gchar *lmi_job_get_name(const LmiJob *job); + +/** + * Get the jobid. This is used to associate job's instance with corresponding + * job of your provider's backend. + * + * If not set with `lmi_job_set_jobid`, job's number in decimal representation + * will be returned. + * + * Result needs to be freed with `g_free()`. + */ +gchar *lmi_job_get_jobid(const LmiJob *job); + +/** + * Is the jobid set? + */ +gboolean lmi_job_has_own_jobid(const LmiJob *job); + +/** + * Get a current state of job. + */ +LmiJobStateEnum lmi_job_get_state(const LmiJob *job); + +/** + * Get job's priority. Default value is 128. + */ +guint lmi_job_get_priority(const LmiJob *job); + +/** + * Get a name of CIM method whose invocation led to creation of this job. This + * property needs to be set with `lmi_job_set_method_name()` shortly after + * job's creation. + * + * Do not free returned value. + */ +const gchar *lmi_job_get_method_name(const LmiJob *job); + +/** + * Get a time of job's creation. Readable property, initialized upon job's + * creation. + */ +time_t lmi_job_get_time_submitted(const LmiJob *job); + +/** + * Get number of seconds the job is kept after its completion. If + * `delete-on-completion` property is set and job comes to final state, it will + * be deleted after certain number of seconds returned by this function. + * + * Default and minimum value is 5 minutes. + */ +time_t lmi_job_get_time_before_removal(const LmiJob *job); + +/** + * Get a time of job's last state change. + */ +time_t lmi_job_get_time_of_last_state_change(const LmiJob *job); + +/** + * Get a time of job's execution start. If job is not started, 0 will be + * returned. + */ +time_t lmi_job_get_start_time(const LmiJob *job); + +/** + * Shall the job be deleted after its completion? It's not deleted + * immediately but after `time-before-removal` seconds. + */ +gboolean lmi_job_get_delete_on_completion(const LmiJob *job); + +/** + * Get the percentage of completion, an integer ranging from 0 to 100. + * This number shall be updated during job's *RUNNING* state. When + * job finishes successfully, it is set to 100. + */ +guint lmi_job_get_percent_complete(const LmiJob *job); + +/** + * Get CIM status code of job. This shall be set together with `EXCEPTION` + * state. It won't be modified if job completes successfully. + */ +LmiJobStatusCodeEnum lmi_job_get_status_code(const LmiJob *job); + +/** + * Get result of completed job. Unless job is in *COMPLETED* state, this + * returns `NULL`. + * + * Call `g_variant_unref()` when the result is not needed anymore. + */ +GVariant *lmi_job_get_result(const LmiJob *job); + +/** + * Get result code of completed job. Unless job is in *COMPLETED* state, this + * returns G_MAXUINT. If the result is not type of unsigned int, G_MAXUINT-1 is + * returned. + */ +guint32 lmi_job_get_result_code(const LmiJob *job); + +/** + * Get error type. This is optionally set when job completes with error. + */ +LmiJobErrorTypeEnum lmi_job_get_error_type(const LmiJob *job); + +/** + * Get error message of failed job. Unless is in *EXCEPTION* state, this + * returns `NULL`. + * + * Do not free returned value. + */ +const gchar *lmi_job_get_error(const LmiJob *job); + +/** + * Check, whether job is in final state. + */ +gboolean lmi_job_is_finished(const LmiJob *job); + +/** + * Change the name of job. Affects `Name` property of corresponding + * `CIM_ConreteJob`. Should be used immediately after job's creation. + * + * Emits *modified* signal. + */ +void lmi_job_set_name(LmiJob *job, const gchar *name); + +/** + * Set the job id. Should be used immediately after job's creation. + * + * Emits *modified* signal. + */ +void lmi_job_set_jobid(LmiJob *job, const gchar *jobid); + +/** + * Change job's priority. + * + * Emits *priority-changed* and *modified* signals. + */ +void lmi_job_set_priority(LmiJob *job, guint priority); + +/** + * Set the name of CIM method creating given job. + * + * Emits *modified* signal. + */ +void lmi_job_set_method_name(LmiJob *job, const gchar *method_name); + +/** + * Change time interval of job's removal after its completion. + * + * Emits *deletion-request-changed* and *modified* signals. + */ +void lmi_job_set_time_before_removal(LmiJob *job, + glong time_before_removal); + +/** + * Specify whether job shall be deleted after its completion. + * + * Emits *deletion-request-changed* and *modified* signals. + */ +void lmi_job_set_delete_on_completion(LmiJob *job, + gboolean delete_on_completion); + +/** + * Change percentage of job's completion. + * + * Emits *modified* signal. + * + * @param percent_complete Is an integer ranging from 0 to 100. + */ +void lmi_job_set_percent_complete(LmiJob *job, guint percent_complete); + +/** + * Change error type of job. This shall be used only for failed job. + */ +void lmi_job_set_error_type(LmiJob *job, LmiJobErrorTypeEnum error_type); + +/** + * Change job's state to *RUNNING*. Job must be in *NEW* state. + * + * Emits *state-changed* and *modified* signals. + */ +gboolean lmi_job_start(LmiJob *job); + +/** + * Change job's state to *COMPLETED* and set the result. + * + * Emits *finished*, *state-changed* and *modified* signals. + * + * @param result Result of asynchronous method. If it is a floating variant, + * it will be singed, otherise it will be referenced. + * @returns Whether the state was successfully set. + */ +gboolean lmi_job_finish_ok(LmiJob *job, GVariant *result); + +/** + * Change job's state to *COMPLETED* and set the result code. + * + * Emits *finished*, *state-changed* and *modified* signals. + * + * @param result Result of asynchronous method. Unsigned integer + * GVariant will be created to hold this value. + * Use `lmi_job_get_result_code()` to get the result back. + * @returns Whether the state was successfully set. + */ +gboolean lmi_job_finish_ok_with_code(LmiJob *job, guint32 exit_code); + +/** + * Change job's state to *EXCEPTION* and set the error message. + * + * Emits *finished*, *state-changed* and *modified* signals. + * + * @returns Whether the state was successfully set. + */ +gboolean lmi_job_finish_exception(LmiJob *job, + LmiJobStatusCodeEnum status_code, + const gchar *error); + +/** + * Change job's state to *TERMINATED*. + * + * Emits *finished*, *state-changed* and *modified* signals. + * + * @returns Whether the state was successfully set. + */ +gboolean lmi_job_finish_terminate(LmiJob *job); + +/** + * Get names of input parameters of corresponding CIM method that caused + * job's creation. + * + * Release the result with `g_strfreev().` + * + * @returns `NULL`-terminated array of input parameter names. + */ +gchar **lmi_job_get_in_param_keys(const LmiJob *job); + +/** + * Is input parameter set for given job? Check is done in case-insensitive way. + */ +gboolean lmi_job_has_in_param(const LmiJob *job, const gchar *param); + +/** + * Get the value of input parameter of CIM method. + * + * Unreference the result with `g_variant_unref()` when you don't need it. + * + * @param param Name of queried input parameter. Lookup is done in + * case-insensitive way. + * @returns variant that shall be unreferenced when not needed. + * `NULL` is returned for missing key. + */ +GVariant *lmi_job_get_in_param(const LmiJob *job, const gchar *param); + +/** + * Set the value of input parameter of CIM method whose invocation led to creation + * of given job. + * + * @param param Name of input parameter which is compared in case-insensitive way + * to others. + * @param value Any value of any type. If it is floating variant, it will be + * singed, otherwise it will be referenced. + */ +gboolean lmi_job_set_in_param(LmiJob *job, const gchar *param, GVariant *value); + +/** + * Get names of outputput parameters of corresponding CIM method that caused + * job's creation. + * + * Release the result with `g_strfreev().` + * + * @returns `NULL`-terminated array of output parameter names. + */ +gchar **lmi_job_get_out_param_keys(const LmiJob *job); + +/** + * Is output parameter set for given job? Check is done in case-insensitive way. + */ +gboolean lmi_job_has_out_param(const LmiJob *job, const gchar *param); + +/** + * Get the value of output parameter of CIM method. + * + * Unreference the result with `g_variant_unref()` when you don't need it. + * + * @param param Name of queried output parameter. Lookup is done in + * case-insensitive way. + * @returns variant that shall be unreferenced when not needed. + * `NULL` is returned for missing key. + */ +GVariant *lmi_job_get_out_param(const LmiJob *job, const gchar *param); + +/** + * Set the value of output parameter of CIM method whose invocation led to creation + * of given job. + * + * @param param Name of output parameter which is compared in case-insensitive way + * to others. + * @param value Any value of any type. If it is floating variant, it will be + * singed, otherwise it will be referenced. */ +gboolean lmi_job_set_out_param(LmiJob *job, const gchar *param, GVariant *value); + +/** + * Get a name of job's state. + */ +const gchar *lmi_job_state_to_string(LmiJobStateEnum state); +/** + * Get a name of job's property. + */ +const gchar *lmi_job_prop_to_string(LmiJobPropEnum prop); + +/** + * Use this function in pair with `lmi_job_critical_section_end()` to surround + * critical section, where job's internal data must stay untouched by other + * threads. + * + * @note Accessing and modifying job from concurrent threads is itself thread-safe. + * This allows for exclusive use of job across several get and sets. + * + * Don't forget to call `lmi_job_critical_section_end()` when you're finished. + */ +void lmi_job_critical_section_begin(const LmiJob *job); + +/** + * Use at the end of critical section starting with + * `lmi_job_critical_section_begin()`. + */ +void lmi_job_critical_section_end(const LmiJob *job); + +/** + * Serialize job object to output stream. + * + * @return TRUE if job was successfully written. + */ +gboolean lmi_job_dump(const LmiJob *job, GOutputStream *stream); + +/** + * Serialize job object to a file. + * + * @return TRUE if file was successfully written. + */ +gboolean lmi_job_dump_to_file(const LmiJob *job, const gchar *file_path); + +/** + * Deserialize job object from input stream. + */ +LmiJob *lmi_job_load(GInputStream *stream, GType job_type); + +/** + * Deserialize job object from file. + */ +LmiJob *lmi_job_load_from_file(const gchar *file_path, GType job_type); + +/** + * Block until the job is finished. + */ +void lmi_job_wait_until_finished(LmiJob *job); + +#endif /* end of include guard: LMI_JOB_H */ diff --git a/src/libs/jobmanager/lmi_job_serialization.c b/src/libs/jobmanager/lmi_job_serialization.c new file mode 100644 index 0000000..54dc87d --- /dev/null +++ b/src/libs/jobmanager/lmi_job_serialization.c @@ -0,0 +1,172 @@ +/* + * Copyright (C) 2014 Red Hat, Inc. All rights reserved. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: Michal Minar <miminar@redhat.com> + */ + +#include <json-glib/json-glib.h> +#include "openlmi.h" +#include "lmi_job.h" + +gboolean lmi_job_dump(const LmiJob *job, GOutputStream *stream) +{ + gchar *data = NULL; + gsize length; + gchar *jobid = NULL; + GError *gerror = NULL; + g_assert(LMI_IS_JOB(job)); + g_assert(G_IS_OUTPUT_STREAM(stream)); + + JOB_CRITICAL_BEGIN(job); + if ((jobid = lmi_job_get_jobid(job)) == NULL) { + lmi_error("Memory allocation failed"); + goto err; + } + + if ((data = json_gobject_to_data(G_OBJECT(job), &length)) == NULL) { + lmi_error("Failed to serialize job \"%s\" to json.", jobid); + goto err; + } + if (!g_output_stream_write_all(stream, data, length, NULL, NULL, &gerror)) { + lmi_error("Failed to dump job \"%s\" to output stream: %s", + jobid, gerror->message); + goto err; + } + JOB_CRITICAL_END(job); + g_free(data); + + lmi_debug("Job \"%s\" dumped to output stream.", jobid); + g_free(jobid); + + return TRUE; + +err: + g_free(data); + g_free(jobid); + JOB_CRITICAL_END(job); + return FALSE; +} + +gboolean lmi_job_dump_to_file(const LmiJob *job, const gchar *file_path) +{ + GFile *file; + GFileOutputStream *stream; + GError *gerror = NULL; + gchar *jobid = NULL; + gboolean res = false; + g_assert(LMI_IS_JOB(job)); + + JOB_CRITICAL_BEGIN(job); + if ((file = g_file_new_for_commandline_arg(file_path)) == NULL) + goto err; + if ((jobid = lmi_job_get_jobid(job)) == NULL) { + lmi_error("Memory allocation failed"); + goto err; + } + if ((stream = g_file_replace(file, NULL, FALSE, 0, NULL, &gerror)) == NULL) { + lmi_error("Failed to write job file \"%s\" for job \"%s\": %s", + file_path, jobid, gerror->message); + goto err; + } + res = lmi_job_dump(job, G_OUTPUT_STREAM(stream)); + + if (!g_output_stream_close(G_OUTPUT_STREAM(stream), NULL, &gerror)) { + lmi_error("Failed to close an output stream of file \"%s\": %s", + file_path, gerror->message); + res = FALSE; + } + g_object_unref(stream); +err: + g_clear_error(&gerror); + g_free(jobid); + g_object_unref(file); + JOB_CRITICAL_END(job); + return res; +} + +LmiJob *lmi_job_load(GInputStream *stream, GType job_type) +{ + GObject *object = NULL; + LmiJob *res = NULL; + JsonParser *parser = NULL; + JsonNode *root_node = NULL; + GError *gerror = NULL; + gchar *jobid = NULL; + g_assert(G_IS_INPUT_STREAM(stream)); + g_assert(g_type_is_a(job_type, LMI_TYPE_JOB)); + + if ((parser = json_parser_new()) == NULL) { + lmi_error("Memory allocation failed"); + goto err; + } + if (!json_parser_load_from_stream(parser, stream, NULL, &gerror)) + { + lmi_error("Failed to create parser from input stream."); + goto err; + } + if ((root_node = json_parser_get_root(parser)) == NULL) { + lmi_error("Failed to parse lmi job from input stream."); + goto err; + } + if ((object = json_gobject_deserialize(job_type, root_node)) == NULL) { + lmi_error("Failed to deserialize lmi job."); + goto err; + } + g_object_unref(parser); + res = LMI_JOB(object); + if ((jobid = lmi_job_get_jobid(res)) == NULL) { + lmi_error("Memory allocation failed"); + goto err; + } + + lmi_debug("Successfully parsed and created lmi job \"%s\".", jobid); + + g_free(jobid); + return res; + +err: + g_clear_object(&object); + g_clear_object(&parser); + return res; +} + +LmiJob *lmi_job_load_from_file(const gchar *file_path, GType job_type) +{ + GFile *file = NULL; + GFileInputStream *stream = NULL; + GError *gerror = NULL; + LmiJob *job = NULL; + + if ((file = g_file_new_for_commandline_arg(file_path)) == NULL) + goto err; + if ((stream = g_file_read(file, NULL, &gerror)) == NULL) { + lmi_error("Failed to read job file \"%s\": %s", + file_path, gerror->message); + goto err; + } + job = lmi_job_load(G_INPUT_STREAM(stream), job_type); + + if (!g_input_stream_close(G_INPUT_STREAM(stream), NULL, &gerror)) { + lmi_error("Failed to close input stream of file \"%s\": %s", + file_path, gerror->message); + } + g_object_unref(stream); +err: + g_clear_error(&gerror); + g_clear_object(&file); + return job; +} diff --git a/src/libs/jobmanager/openlmijobmanager.pc.in b/src/libs/jobmanager/openlmijobmanager.pc.in new file mode 100644 index 0000000..3194399 --- /dev/null +++ b/src/libs/jobmanager/openlmijobmanager.pc.in @@ -0,0 +1,11 @@ +prefix=@CMAKE_INSTALL_PREFIX@ +exec_prefix=${prefix} +includedir=${prefix}/include +libdir=${exec_prefix}/lib@LIB_SUFFIX@ + +Name: openlmijobmanager +Description: OpenLMI job manager support +Version: @OPENLMIJOBMANAGER_VERSION@ +Requires: openlmicommon openlmiindsender +Libs: -L${libdir} -lopenlmijobmanager +CFlags: -I${includedir}/openlmi |