#!/usr/bin/env python
# Copyright (C) 2012 Peter Hatina
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.

import sys

import pywbem


class CuraUserClient(object):
    """Command-line helper that lists and creates accounts over WBEM.

    Every public method returns a ``(success, message)`` tuple; ``message``
    is an empty string on success and a human-readable reason otherwise.
    Listing methods print their results to standard output.
    """

    USER_CLASS_NAME = "LMI_Account"
    GROUP_CLASS_NAME = "LMI_Group"
    MEMBER_OF_GROUP_CLASS_NAME = "LMI_MemberOfGroup"
    ASSIGNED_IDENTITY_CLASS_NAME = "LMI_AssignedAccountIdentity"
    ACCOUNT_MANAGEMENT_SERVICE_CLASS_NAME = "LMI_AccountManagementService"
    COMPUTER_SYSTEM_CLASS_NAME = "CIM_ComputerSystem"

    # Message produced by __getInstances when the enumeration itself
    # succeeded but yielded no (matching) instance.
    _NOT_FOUND_MESSAGE = "Not found"

    def __init__(self, hostname, username="", password=""):
        """Remember the credentials and create the WBEM connection object.

        The connection URL is built as ``"https://" + hostname``.
        """
        self.m_hostname = hostname
        self.m_username = username
        self.m_password = password
        self.m_cliconn = pywbem.WBEMConnection(
            "https://" + self.m_hostname,
            (self.m_username, self.m_password))

    def __getInstances(self, klass, filter_name=""):
        """Fetch all instances of CIM class ``klass``.

        When ``filter_name`` is non-empty, only instance names whose
        ``"Name"`` key equals it are fetched.

        Returns ``(instances, "")`` on success, or ``(None, message)`` when
        nothing was found or the server reported a CIM/authentication error.
        """
        try:
            inst_names = self.m_cliconn.EnumerateInstanceNames(klass)
            if filter_name:
                inst_names = [n for n in inst_names
                              if n["Name"] == filter_name]
            # GetInstance is kept inside the try block so that a failure
            # while fetching is reported the same way as a failure while
            # enumerating, instead of escaping as a traceback.
            inst_list = [self.m_cliconn.GetInstance(n, LocalOnly=False)
                         for n in inst_names]
        except pywbem.cim_operations.CIMError as e:
            return (None, e.args[1])
        except pywbem.cim_http.AuthError as e:
            return (None, e.args[0])
        if not inst_list:
            return (None, CuraUserClient._NOT_FOUND_MESSAGE)
        return (inst_list, "")

    def __failureMessage(self, rparam, not_found_message):
        """Pick the message to report after __getInstances returned nothing.

        A plain "nothing found" outcome is replaced by the caller-specific
        ``not_found_message``; any other text (CIM or authentication error)
        is passed through so the real cause is not hidden.
        """
        if not rparam or rparam == CuraUserClient._NOT_FOUND_MESSAGE:
            return not_found_message
        return rparam

    def __getUserInstances(self):
        """Return ``__getInstances`` result for the account class."""
        return self.__getInstances(CuraUserClient.USER_CLASS_NAME)

    def __getGroupInstances(self):
        """Return ``__getInstances`` result for the group class."""
        return self.__getInstances(CuraUserClient.GROUP_CLASS_NAME)

    def __getAccountManagementInstance(self):
        """Return ``__getInstances`` result for the management service."""
        return self.__getInstances(
            CuraUserClient.ACCOUNT_MANAGEMENT_SERVICE_CLASS_NAME)

    def __getComputerSystemInstance(self):
        """Return ``__getInstances`` result for the computer system class."""
        return self.__getInstances(CuraUserClient.COMPUTER_SYSTEM_CLASS_NAME)

    def __getMemberGroups(self, inst):
        """Return the objects associated with ``inst`` through its identities.

        ``inst`` is followed over the assigned-identity association, and each
        resulting identity over the member-of-group association.  May raise
        ``pywbem.CIMError``.
        """
        members = []
        identities = self.m_cliconn.Associators(
            inst.path,
            AssocClass=CuraUserClient.ASSIGNED_IDENTITY_CLASS_NAME)
        for identity in identities:
            accounts = self.m_cliconn.Associators(
                identity.path,
                AssocClass=CuraUserClient.MEMBER_OF_GROUP_CLASS_NAME)
            members.extend(accounts)
        return members

    def __getGroupMembers(self, inst):
        """Return the ``"Name"`` of every account reachable from the groups.

        ``inst`` is an iterable of group instances.  Each group is followed
        over the member-of-group association, and each resulting identity
        over the assigned-identity association.  May raise
        ``pywbem.CIMError``.
        """
        members = []
        for g in inst:
            identities = self.m_cliconn.Associators(
                g.path,
                AssocClass=CuraUserClient.MEMBER_OF_GROUP_CLASS_NAME)
            for i in identities:
                accounts = self.m_cliconn.Associators(
                    i.path,
                    AssocClass=CuraUserClient.ASSIGNED_IDENTITY_CLASS_NAME)
                members.extend(a["Name"] for a in accounts)
        return members

    def listUsers(self):
        """Print the host name followed by one passwd-style line per account.

        Returns ``(True, "")`` on success, ``(False, message)`` otherwise.
        """
        (inst_list, rparam) = self.__getUserInstances()
        if not inst_list:
            return (False, self.__failureMessage(rparam, "No users found"))
        sys.stdout.write("%s:\n" % self.m_hostname)
        try:
            for i in inst_list:
                groups = self.__getMemberGroups(i)
                # An account without any associated group gets an empty GID
                # field instead of aborting the listing with an IndexError.
                if groups:
                    gid = groups[0]["InstanceID"].split(":")[-1]
                else:
                    gid = ""
                sys.stdout.write("%s:x:%s:%s:%s:%s:%s\n" % (
                    i["Name"],
                    i["UserID"],
                    gid,
                    i["ElementName"],
                    i["HomeDirectory"],
                    i["LoginShell"]))
        except pywbem.cim_operations.CIMError as e:
            return (False, e.args[1])
        return (True, "")

    def listGroups(self):
        """Print one ``name:id`` line per group.

        The id is the last ``:``-separated field of the group's
        ``InstanceID``.  Returns ``(True, "")`` on success,
        ``(False, message)`` otherwise.
        """
        (inst_list, rparam) = self.__getGroupInstances()
        if not inst_list:
            return (False, self.__failureMessage(rparam, "No groups found"))
        for i in inst_list:
            sys.stdout.write("%s:%s\n" % (
                i["Name"], i["InstanceID"].split(":")[-1]))
        return (True, "")

    def listGroupMembers(self, member):
        """Print the account names belonging to the group named ``member``.

        Returns ``(True, "")`` on success, ``(False, message)`` otherwise.
        """
        (inst_list, rparam) = self.__getInstances(
            CuraUserClient.GROUP_CLASS_NAME, member)
        if not inst_list:
            return (False, self.__failureMessage(
                rparam, "No such group '%s' found" % member))
        try:
            members = self.__getGroupMembers(inst_list)
        except pywbem.cim_operations.CIMError as e:
            # Report association failures the same way listUsers does.
            return (False, e.args[1])
        for m in members:
            sys.stdout.write("%s\n" % m)
        return (True, "")

    def userAdd(self, user_name, **params):
        """Create account ``user_name`` through the account management service.

        Recognised keyword arguments (all optional): ``createHome``,
        ``createGroup``, ``shell``, ``systemAccount``, ``gid``, ``gecos``,
        ``homeDir``, ``password`` and ``uid``.  Value options are forwarded
        only when truthy; the two ``create*`` flags are forwarded (negated,
        as ``DontCreate*``) only when supplied.

        Returns ``(True, "")`` when the CreateAccount method reports return
        value 0, ``(False, message)`` otherwise.
        """
        (inst_management, rparam) = self.__getAccountManagementInstance()
        if not inst_management:
            return (False, rparam)
        # We are using the first and only instance present.
        # For KVM_ComputerSystem, there has to be some CLI option added.
        inst_management = inst_management[0]

        (inst_computer_system, rparam) = self.__getComputerSystemInstance()
        if not inst_computer_system:
            return (False, rparam)

        user_params = {
            "Name": user_name,
            "System": inst_computer_system[0].path,
        }
        # Omitted flags are left out so the provider applies its own default
        # rather than this client guessing one.
        if "createHome" in params:
            user_params["DontCreateHome"] = not params["createHome"]
        if "createGroup" in params:
            user_params["DontCreateGroup"] = not params["createGroup"]

        if params.get("shell"):
            user_params["Shell"] = params["shell"]
        if params.get("systemAccount"):
            user_params["SystemAccount"] = params["systemAccount"]
        if params.get("gid"):
            user_params["GID"] = pywbem.Uint32(params["gid"])
        if params.get("gecos"):
            user_params["GECOS"] = params["gecos"]
        if params.get("homeDir"):
            user_params["HomeDirectory"] = params["homeDir"]
        if params.get("password"):
            user_params["Password"] = params["password"]
        if params.get("uid"):
            user_params["UID"] = pywbem.Uint32(params["uid"])

        try:
            # InvokeMethod returns (method return value, output parameters);
            # the output parameters are not needed here.
            (rval, _out_params) = self.m_cliconn.InvokeMethod(
                "CreateAccount",
                inst_management.path,
                **user_params)
        except pywbem.CIMError as e:
            return (False, e.args[1])
        if rval != 0:
            return (False,
                    "CreateAccount failed with return value %s" % rval)
        return (True, "")