summaryrefslogtreecommitdiffstats
path: root/func/minion/modules/delegation.py
blob: b8d6c8da47daaea2e1808905c9bfc5668581a28e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# Copyright 2008, Red Hat, Inc
# Steve Salevan <ssalevan@redhat.com>
#
# This software may be freely redistributed under the terms of the GNU
# general public license.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.

import time
import func_module
import func.overlord.client as fc
import func.jobthing as jobthing

from certmaster import utils as cm_utils
from func.overlord import delegation_tools as dtools

#boolean value appended to job id list to denote direct/delegated calls
DIRECT = False
DELEGATED = True

class DelegationModule(func_module.FuncModule):
    
    version = "0.0.1"
    api_version = "0.0.1"
    description = "Minion-side module to support delegation on sub-Overlords."
    
    def run(self,module,method,args,delegation_list,async,nforks):
        """
        Delegates commands down the path of delegation
        supplied as an argument
        """
        result_dict = {}
        job_id_list = []
        
        #separate list passed to us into minions we can call directly and 
        #further delegation paths
        (single_paths, grouped_paths) = dtools.group_paths(delegation_list)
        
        #run delegated calls
        for group in grouped_paths.keys():
            overlord = fc.Overlord(group,
                                   async=async,
                                   nforks=nforks)
            path_list = grouped_paths[group]
            delegation_results = overlord.delegation.run(module,
                                                         method,
                                                         args,
                                                         path_list,
                                                         async,
                                                         nforks)
            if async:
                job_id_list.append([overlord, 
                                    delegation_results,
                                    group,
                                    True])
            else:
                #These are delegated calls, so we need to strip away the
                #hash that surrounds the results
                if cm_utils.is_error(delegation_results[group]):
                    result_dict.update(delegation_results)
                else:
                    result_dict.update(delegation_results[group])
        
        #run direct calls
        for minion in single_paths:
            overlord = fc.Overlord(minion, 
                                   async=async,
                                   nforks=nforks)
            overlord_module = getattr(overlord,module)
            results = getattr(overlord_module,method)(*args[:])
            if async:
                job_id_list.append([overlord,
                                    results,
                                    minion,
                                    False])
            else:
                result_dict.update(results)
        
        #poll async calls
        while len(job_id_list) > 0:
            for job in job_id_list:
                (return_code, async_results) = job[0].job_status(job[1])
                if return_code == jobthing.JOB_ID_RUNNING:
                    pass #it's still going, ignore it this cycle
                elif return_code == jobthing.JOB_ID_PARTIAL:
                    pass #yep, it's still rolling
                elif return_code == jobthing.JOB_ID_REMOTE_ERROR:
                    result_dict.update(async_results)
                    job_id_list.remove(job)
                else: #it's done or it's had an error, pass it up
                    if job[3] == DIRECT:
                        #this is a direct call, so we only need to
                        #update the hash with the pertinent results
                        results = async_results
                    elif job[3] == DELEGATED:
                        #this is a delegated call, so we need to strip
                        #away the nesting hash
                        results = async_results[job[2]]
                    else:
                        #and this code should never be reached
                        results = {}
                    result_dict.update(results)
                    job_id_list.remove(job)
            time.sleep(0.1) #pause a bit so that we don't flood our minions
        
        return result_dict