1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
# Copyright (C) 2015 Custodia Project Contributors - see LICENSE file
from custodia.httpd.consumer import HTTPConsumer
from custodia.httpd.server import HTTPError
from custodia.store.interface import CSStoreError
import json
import os
class Secrets(HTTPConsumer):
    """HTTP consumer that stores, retrieves and lists secrets.

    Secrets are kept in ``self.root.store`` under keys of the form
    ``keys/<namespace>/<name...>``. A request may only touch keys whose
    first path element is one of the namespaces granted to it.
    """

    def _get_key(self, namespaces, trail):
        """Return the store key for the path elements in *trail*.

        Raises HTTPError(403) when *trail* is empty or its first element
        is not one of *namespaces*.
        """
        # Check that the key is in one of the authorized namespaces
        if len(trail) < 1 or trail[0] not in namespaces:
            raise HTTPError(403)
        # NOTE(review): os.path.join uses the platform separator and
        # restarts at an absolute component; confirm trail elements can
        # never contain a separator.
        # pylint: disable=star-args
        return os.path.join('keys', *trail)

    def _get_filter(self, namespaces, trail, userfilter):
        """Return the store filter used to list the keys under *trail*.

        When *trail* does not start with an authorized namespace, the
        first entry of *namespaces* is prepended to it. The result is the
        key for that path, a ``/`` and then *userfilter*.
        """
        if len(trail) > 0 and trail[0] in namespaces:
            f = self._get_key(namespaces, trail)
        else:
            # Consider the first namespace as the default one
            f = self._get_key(namespaces, [namespaces[0]] + trail)
        return '%s/%s' % (f, userfilter)

    def _validate(self, value):
        """Check that *value* is a well-formed 'simple' secret message.

        A valid message is a JSON object with exactly two members:
        ``type`` equal to ``"simple"`` and ``value``.

        Raises ValueError describing the first problem found.
        """
        try:
            msg = json.loads(value)
        except Exception:
            # Any parser failure is reported uniformly as a ValueError.
            raise ValueError('Invalid JSON in payload')
        # Valid JSON may also be a list, string, number, boolean or null.
        # Those cannot carry attributes, and indexing them by name would
        # raise TypeError instead of the ValueError callers expect.
        if not isinstance(msg, dict) or 'type' not in msg:
            raise ValueError('Message type missing')
        if msg['type'] != 'simple':
            raise ValueError('Message type unknown')
        if 'value' not in msg:
            raise ValueError('Message value missing')
        if len(msg) != 2:
            raise ValueError('Unknown attributes in Message')

    def _namespaces(self, request):
        """Return the list of namespaces *request* is allowed to use.

        Raises HTTPError(403) when the request has no ``remote_user``.
        """
        if 'remote_user' not in request:
            raise HTTPError(403)
        # At the moment we just have one namespace, the user's name
        return [request['remote_user']]

    def GET(self, request, response):
        """Fetch one secret, or list secrets when the path ends in ``/``.

        An empty trail, or a trail whose last element is the empty
        string, is a listing: ``response['output']`` becomes a JSON
        object mapping each key name (without the ``keys/`` prefix) to
        its decoded stored value. The optional ``filter`` query argument
        is appended to the listing filter. Any other trail names a single
        key whose stored value is copied to ``response['output']`` as is.

        Raises HTTPError: 403 for an unauthorized path, 404 when nothing
        is found or listing fails in the store, 500 when fetching a
        single key fails in the store.
        """
        trail = request.get('trail', [])
        ns = self._namespaces(request)
        if len(trail) == 0 or trail[-1] == '':
            try:
                userfilter = request.get('query', dict()).get('filter', '')
                keyfilter = self._get_filter(ns, trail[:-1], userfilter)
                keydict = self.root.store.list(keyfilter)
                if keydict is None:
                    raise HTTPError(404)
                output = dict()
                for k in keydict:
                    # strip away the internal prefix for storing keys
                    name = k[len('keys/'):]
                    output[name] = json.loads(keydict[k])
                response['output'] = json.dumps(output)
            except CSStoreError:
                # NOTE(review): a store failure is reported as 404 here
                # but as 500 for a single key below; confirm this is
                # intended.
                raise HTTPError(404)
        else:
            key = self._get_key(ns, trail)
            try:
                output = self.root.store.get(key)
                if output is None:
                    raise HTTPError(404)
                response['output'] = output
            except CSStoreError:
                raise HTTPError(500)

    def PUT(self, request, response):
        """Store the JSON message in the request body under the trail.

        The request must carry an ``application/json`` Content-Type and a
        body that passes ``_validate``. The body may be bytes (decoded as
        UTF-8) or already-decoded text.

        Raises HTTPError: 405 when the trail is empty or ends in ``/``,
        400 for a wrong Content-Type, a missing body, undecodable bytes
        or an invalid message, 403 for an unauthorized path, 500 when the
        store fails.
        """
        trail = request.get('trail', [])
        ns = self._namespaces(request)
        if len(trail) == 0 or trail[-1] == '':
            raise HTTPError(405)

        headers = request.get('headers', dict())
        content_type = headers.get('Content-Type', '')
        if content_type.split(';')[0].strip() != 'application/json':
            raise HTTPError(400, 'Invalid Content-Type')

        body = request.get('body')
        if body is None:
            raise HTTPError(400)

        try:
            # Text needs no decoding, and bytes(text) without an encoding
            # is a TypeError on Python 3. Only byte-like bodies are
            # decoded. UnicodeDecodeError is a ValueError, so bad UTF-8
            # is answered with 400 like any other invalid payload.
            if isinstance(body, type(u'')):
                value = body
            else:
                value = bytes(body).decode('utf-8')
            self._validate(value)
        except ValueError as e:
            raise HTTPError(400, str(e))

        key = self._get_key(ns, trail)
        try:
            self.root.store.set(key, value)
        except CSStoreError:
            raise HTTPError(500)
# unit tests
import unittest
from custodia.store.sqlite import SqliteStore
class SecretsTests(unittest.TestCase):
    """Exercise Secrets PUT, GET and listing against a SQLite store.

    The numbered method names matter: unittest runs test methods in name
    order, and the GET and list tests read the key stored by the PUT test.
    """

    @classmethod
    def setUpClass(cls):
        # One consumer and one on-disk database are shared by all tests.
        consumer = cls.secrets = Secrets()
        consumer.root.store = SqliteStore({'dburi': 'testdb.sqlite'})

    @classmethod
    def tearDownClass(cls):
        # Best-effort cleanup: a missing database file is not an error.
        try:
            os.remove('testdb.sqlite')
        except OSError:
            pass

    def test_1_PUTKey(self):
        """Store a simple secret under the user's own namespace."""
        request = {
            'remote_user': 'test',
            'trail': ['test', 'key1'],
            'headers': {'Content-Type': 'application/json'},
            'body': '{"type":"simple","value":"1234"}',
        }
        response = {}
        self.secrets.PUT(request, response)

    def test_2_GETKey(self):
        """Read the stored secret back unchanged."""
        request = {'trail': ['test', 'key1'], 'remote_user': 'test'}
        response = {}
        self.secrets.GET(request, response)
        expected = '{"type":"simple","value":"1234"}'
        self.assertEqual(response['output'], expected)

    def test_3_LISTKeys(self):
        """List the namespace; a trailing empty element means a listing."""
        request = {'trail': ['test', ''], 'remote_user': 'test'}
        response = {}
        self.secrets.GET(request, response)
        listing = json.loads(response['output'])
        expected = {'test/key1': {'type': 'simple', 'value': '1234'}}
        self.assertEqual(listing, expected)
|