summaryrefslogtreecommitdiffstats
path: root/custodia/secrets.py
blob: 8956bd54a70452d34493505708cb595ac39d5399 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Copyright (C) 2015  Custodia Project Contributors - see LICENSE file

from custodia.httpd.consumer import HTTPConsumer
from custodia.httpd.server import HTTPError
from custodia.store.interface import CSStoreError
import json
import os


class Secrets(HTTPConsumer):

    def _get_key(self, namespaces, trail):
        # Check tht the keys is in one of the authorized namespaces
        if len(trail) < 1 or trail[0] not in namespaces:
            raise HTTPError(403)
        # pylint: disable=star-args
        return os.path.join('keys', *trail)

    def _get_filter(self, namespaces, trail, userfilter):
        f = None
        if len(trail) > 0:
            for ns in namespaces:
                if ns == trail[0]:
                    f = self._get_key(namespaces, trail)
                break
        if f is None:
            # Consider the first namespace as the default one
            t = [namespaces[0]] + trail
            f = self._get_key(namespaces, t)
        return '%s/%s' % (f, userfilter)

    def _validate(self, value):
        try:
            msg = json.loads(value)
        except Exception:
            raise ValueError('Invalid JSON in payload')
        if 'type' not in msg:
            raise ValueError('Message type missing')
        if msg['type'] != 'simple':
            raise ValueError('Message type unknown')
        if 'value' not in msg:
            raise ValueError('Message value missing')
        if len(msg.keys()) != 2:
            raise ValueError('Unknown attributes in Message')

    def _namespaces(self, request):
        if 'remote_user' not in request:
            raise HTTPError(403)
        # At the moment we just have one namespace, the user's name
        return [request['remote_user']]

    def GET(self, request, response):
        trail = request.get('trail', [])
        ns = self._namespaces(request)
        if len(trail) == 0 or trail[-1] == '':
            try:
                userfilter = request.get('query', dict()).get('filter', '')
                keyfilter = self._get_filter(ns, trail[:-1], userfilter)
                keydict = self.root.store.list(keyfilter)
                if keydict is None:
                    raise HTTPError(404)
                output = dict()
                for k in keydict:
                    # strip away the internal prefix for storing keys
                    name = k[len('keys/'):]
                    output[name] = json.loads(keydict[k])
                response['output'] = json.dumps(output)
            except CSStoreError:
                raise HTTPError(404)
        else:
            key = self._get_key(ns, trail)
            try:
                output = self.root.store.get(key)
                if output is None:
                    raise HTTPError(404)
                response['output'] = output
            except CSStoreError:
                raise HTTPError(500)

    def PUT(self, request, response):
        trail = request.get('trail', [])
        ns = self._namespaces(request)
        if len(trail) == 0 or trail[-1] == '':
            raise HTTPError(405)
        else:
            content_type = request.get('headers',
                                       dict()).get('Content-Type', '')
            if content_type.split(';')[0].strip() != 'application/json':
                raise HTTPError(400, 'Invalid Content-Type')
            body = request.get('body')
            if body is None:
                raise HTTPError(400)
            value = bytes(body).decode('utf-8')
            try:
                self._validate(value)
            except ValueError as e:
                raise HTTPError(400, str(e))

            key = self._get_key(ns, trail)
            try:
                self.root.store.set(key, value)
            except CSStoreError:
                raise HTTPError(500)


# unit tests
import unittest
from custodia.store.sqlite import SqliteStore


class SecretsTests(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.secrets = Secrets()
        cls.secrets.root.store = SqliteStore({'dburi': 'testdb.sqlite'})

    @classmethod
    def tearDownClass(self):
        try:
            os.unlink('testdb.sqlite')
        except OSError:
            pass

    def test_1_PUTKey(self):
        req = {'headers': {'Content-Type': 'application/json'},
               'remote_user': 'test',
               'trail': ['test', 'key1'],
               'body': '{"type":"simple","value":"1234"}'}
        rep = {}
        self.secrets.PUT(req, rep)

    def test_2_GETKey(self):
        req = {'remote_user': 'test',
               'trail': ['test', 'key1']}
        rep = {}
        self.secrets.GET(req, rep)
        self.assertEqual(rep['output'],
                         '{"type":"simple","value":"1234"}')

    def test_3_LISTKeys(self):
        req = {'remote_user': 'test',
               'trail': ['test', '']}
        rep = {}
        self.secrets.GET(req, rep)
        self.assertEqual(json.loads(rep['output']),
                         json.loads('{"test/key1":'\
                                    '{"type":"simple","value":"1234"}}'))