summaryrefslogtreecommitdiffstats
path: root/base/common/python/pki/cert.py
blob: 3d05df0c34855afe0863ced8df6d439cac8c8465 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#!/usr/bin/python
'''
Created on Feb 13, 2014
Note: The implementation in this file has not been completed and is not tested.
This note should be removed when testing/implementation is complete.

@author: akoneru
'''
import pki.client as client
import pki.encoder as encoder
import json
import types

class CertId(object):
    '''
    Class encapsulating a certificate serial number
    '''

    def __init__(self, cert_id):
        ''' Constructor '''
        if str(cert_id).startswith('0x'):
            #hex number
            print 'hex number'
            self.id = cert_id
        else:
            self.id = cert_id

class CertData(object):
    '''
    Class containing certificate data as returned from getCert()
    '''

    def __init__(self):
        ''' Constructor '''
        self.Encoded = None

    @classmethod
    def from_json(cls, attr_list):
        ''' Return CertData object from JSON dict '''
        cert_data = cls()
        for key in attr_list:
            setattr(cert_data, key, attr_list[key])
        return cert_data

class CertDataInfo(object):
    '''
    Class containing information contained in a CertRecord on the CA.
    This data is returned when searching/listing certificate records.
    '''

    def __init__(self):
        ''' Constructor '''
        self.certId = None
        self.subjectDN = None
        self.status = None
        self.type = None
        self.version = None
        self.keyAlgorithmOID = None
        self.keyLength = None
        self.notValidBefore = None
        self.notValidAfter = None
        self.issuedOn = None
        self.issuedBy = None

    @classmethod
    def from_json(cls, attr_list):
        ''' Return CertDataInfo object from JSON dict '''
        cert_data_info = cls()
        for key in attr_list:
            setattr(cert_data_info, key, attr_list[key])
        return cert_data_info

class CertDataInfos(object):
    '''
    Class containing lists of CertDataInfo objects.
    This data is returned when searching/listing certificate records on the CA.
    '''

    def __init__(self):
        ''' Constructor '''
        self.certInfoList = []
        self.links = []

    @classmethod
    def from_json(cls, json_value):
        ''' Populate object from JSON input '''
        ret = cls()
        cert_infos = json_value['CertDataInfo']
        if not isinstance(cert_infos, types.ListType):
            ret.certInfoList.append(CertDataInfo.from_json(cert_infos))
        else:
            for cert_info in cert_infos:
                ret.certInfoList.append(CertDataInfo.from_json(cert_info))
        return ret

class CertSearchRequest(object):

    def __init__(self):
        self.serialNumberRangeInUse = False
        self.serialTo = None
        self.serialFrom = None
        self.subjectInUse = False
        self.eMail = None
        self.commonName = None
        self.userID = None
        self.orgUnit = None
        self.org = None
        self.locality = None
        self.state = None
        self.country = None
        self.matchExactly = None
        self.status = None
        self.revokedBy = None
        self.revokedOnFrom = None
        self.revokedOnTo = None
        self.revocationReason = None
        self.issuedBy = None
        self.issuedOnFrom = None
        self.issuedOnTo = None
        self.validNotBeforeFrom = None
        self.validNotBeforeTo = None
        self.validNotAfterFrom = None
        self.validNotAfterTo = None
        self.validityOperation = None
        self.validityCount = None
        self.validityUnit = None
        self.certTypeSubEmailCA = None
        self.certTypeSubSSLCA = None
        self.certTypeSecureEmail = None
        self.certTypeSSLClient = None
        self.certTypeSSLServer = None
        self.revokedByInUse = False
        self.revokedOnInUse = False
        self.revocationReasonInUse = None
        self.issuedByInUse = False
        self.issuedOnInUse = False
        self.validNotBeforeInUse = False
        self.validNotAfterInUse = False
        self.validityLengthInUse = False
        self.certTypeInUse = False


class CertClient(object):
    '''
    Class encapsulating and mirroring the functionality in the CertResouce Java interface class
    defining the REST API for Certificate resources.
    '''

    def __init__(self, connection):
        ''' Constructor '''
        #super(PKIResource, self).__init__(connection)
        self.connection = connection
        self.headers = {'Content-type': 'application/json',
                        'Accept': 'application/json'}
        self.cert_url = '/rest/certs'
        self.agent_cert_url = '/rest/agent/certs'

    def getCert(self, cert_id):
        ''' Return a CertData object for a particular certificate. '''
        url = self.cert_url + '/' + str(cert_id.id)
        response = self.connection.get(url, self.headers)
        certData = encoder.CustomTypeDecoder(response.json())
        return certData

    def listCerts(self, status = None):
        ''' Return a CertDataInfos object for specific certs
            Not sure I understand what this method is for.
        '''
        if status is not None:
            cert_search_request =  CertSearchRequest()
            cert_search_request.status = status

        response = self.connection.get(self.cert_url, self.headers)
        print response.json()

    def searchCerts(self, cert_search_request):
        ''' Return a CertDataInfos object containing the results of a cert search.'''
        url = self.cert_url + '/search'
        searchRequest = json.dumps(cert_search_request, cls=encoder.CustomTypeEncoder)
        r = self.connection.post(url, searchRequest, self.headers)
        print r.json()['CertDataInfos']
        return CertDataInfos.from_json(r.json()['CertDataInfos'])

    def getCerts(self, cert_search_request):
        ''' Doctring needed here. '''
        pass

    def reviewCert(self, cert_id):
        ''' Doc string needed here. '''
        pass

    def revokeCert(self, cert_id, cert_revoke_request):
        ''' Doc string needed here '''
        pass

    def revokeCACert(self, cert_id, cert_revoke_request):
        ''' Doc string needed here. '''
        pass

    def unrevokecert(self, cert_id, cert_unrevoke_request):
        ''' Doc string needed here '''
        pass

encoder.NOTYPES['CertData'] = CertData
encoder.NOTYPES['CertSearchRequest'] = CertSearchRequest


def main():
    connection = client.PKIConnection('http', 'localhost', '8080', 'ca')
    connection.authenticate('caadmin', 'Secret123')
    certResource = CertClient(connection)
    cert = certResource.getCert(CertId('0x6'))
    print cert

if __name__ == "__main__":
    main()