1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
# Copyright 2008, Red Hat, Inc
# Steve Salevan <ssalevan@redhat.com>
#
# This software may be freely redistributed under the terms of the GNU
# general public license.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import time
import func_module
import func.overlord.client as fc
import func.jobthing as jobthing
from certmaster import utils as cm_utils
from func.overlord import delegation_tools as dtools
#boolean value appended to job id list to denote direct/delegated calls
DIRECT = False
DELEGATED = True
class DelegationModule(func_module.FuncModule):
version = "0.0.1"
api_version = "0.0.1"
description = "Minion-side module to support delegation on sub-Overlords."
def run(self,module,method,args,delegation_list,async,nforks):
"""
Delegates commands down the path of delegation
supplied as an argument
"""
result_dict = {}
job_id_list = []
#separate list passed to us into minions we can call directly and
#further delegation paths
(single_paths, grouped_paths) = dtools.group_paths(delegation_list)
#run delegated calls
for group in grouped_paths.keys():
overlord = fc.Overlord(group,
async=async,
nforks=nforks)
path_list = grouped_paths[group]
delegation_results = overlord.delegation.run(module,
method,
args,
path_list,
async,
nforks)
if async:
job_id_list.append([overlord,
delegation_results,
group,
True])
else:
#These are delegated calls, so we need to strip away the
#hash that surrounds the results
if cm_utils.is_error(delegation_results[group]):
result_dict.update(delegation_results)
else:
result_dict.update(delegation_results[group])
#run direct calls
for minion in single_paths:
overlord = fc.Overlord(minion,
async=async,
nforks=nforks)
overlord_module = getattr(overlord,module)
results = getattr(overlord_module,method)(*args[:])
if async:
job_id_list.append([overlord,
results,
minion,
False])
else:
result_dict.update(results)
#poll async calls
while len(job_id_list) > 0:
for job in job_id_list:
(return_code, async_results) = job[0].job_status(job[1])
if return_code == jobthing.JOB_ID_RUNNING:
pass #it's still going, ignore it this cycle
elif return_code == jobthing.JOB_ID_PARTIAL:
pass #yep, it's still rolling
elif return_code == jobthing.JOB_ID_REMOTE_ERROR:
result_dict.update(async_results)
job_id_list.remove(job)
else: #it's done or it's had an error, pass it up
if job[3] == DIRECT:
#this is a direct call, so we only need to
#update the hash with the pertinent results
results = async_results
elif job[3] == DELEGATED:
#this is a delegated call, so we need to strip
#away the nesting hash
results = async_results[job[2]]
else:
#and this code should never be reached
results = {}
result_dict.update(results)
job_id_list.remove(job)
time.sleep(0.1) #pause a bit so that we don't flood our minions
return result_dict
|