From 9d0a6d88d54ad38c4ccc564aa417beed310712fc Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Fri, 14 Sep 2012 00:56:42 -0700 Subject: Deserialize user_data in xml servers request Also adds better validation for b64 encoded data since python will happily ignore illegal bytes in the base64 encoded data. Fixes bug 1050797 Change-Id: I4f380d029ec2d51f130a6dbd581410873e414216 --- nova/api/openstack/compute/servers.py | 32 +++++++++++++++----- nova/tests/api/openstack/compute/test_servers.py | 38 ++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 7 deletions(-) diff --git a/nova/api/openstack/compute/servers.py b/nova/api/openstack/compute/servers.py index f448d3fb9..b83ca0668 100644 --- a/nova/api/openstack/compute/servers.py +++ b/nova/api/openstack/compute/servers.py @@ -16,11 +16,12 @@ import base64 import os +import re import socket -from xml.dom import minidom import webob from webob import exc +from xml.dom import minidom from nova.api.openstack import common from nova.api.openstack.compute import ips @@ -167,6 +168,10 @@ class CommonDeserializer(wsgi.MetadataXMLDeserializer): if metadata_node is not None: server["metadata"] = self.extract_metadata(metadata_node) + user_data_node = self.find_first_child_named(server_node, "user_data") + if user_data_node is not None: + server["user_data"] = self.extract_text(user_data_node) + personality = self._extract_personality(server_node) if personality is not None: server["personality"] = personality @@ -500,9 +505,8 @@ class Controller(wsgi.Controller): except TypeError: expl = _('Bad personality format') raise exc.HTTPBadRequest(explanation=expl) - try: - contents = base64.b64decode(contents) - except TypeError: + contents = self._decode_base64(contents) + if contents is None: expl = _('Personality content for %s cannot be decoded') % path raise exc.HTTPBadRequest(explanation=expl) injected_files.append((path, contents)) @@ -572,13 +576,27 @@ class Controller(wsgi.Controller): return networks + # NOTE(vish): Without this regex, b64decode will happily + # ignore illegal bytes in the base64 encoded + # data. + B64_REGEX = re.compile('^(?:[A-Za-z0-9+\/]{4})*' + '(?:[A-Za-z0-9+\/]{2}==' + '|[A-Za-z0-9+\/]{3}=)?$') + + def _decode_base64(self, data): + data = re.sub(r'\s', '', data) + if not self.B64_REGEX.match(data): + return None + try: + return base64.b64decode(data) + except TypeError: + return None + def _validate_user_data(self, user_data): """Check if the user_data is encoded properly.""" if not user_data: return - try: - user_data = base64.b64decode(user_data) - except TypeError: + if self._decode_base64(user_data) is None: expl = _('Userdata content cannot be decoded') raise exc.HTTPBadRequest(explanation=expl) diff --git a/nova/tests/api/openstack/compute/test_servers.py b/nova/tests/api/openstack/compute/test_servers.py index 1a2026900..aa8f6132b 100644 --- a/nova/tests/api/openstack/compute/test_servers.py +++ b/nova/tests/api/openstack/compute/test_servers.py @@ -16,6 +16,7 @@ # License for the specific language governing permissions and limitations # under the License. +import base64 import datetime import urlparse @@ -94,6 +95,43 @@ class MockSetAdminPassword(object): self.password = password +class Base64ValidationTest(test.TestCase): + def setUp(self): + super(Base64ValidationTest, self).setUp() + self.ext_mgr = extensions.ExtensionManager() + self.ext_mgr.extensions = {} + self.controller = servers.Controller(self.ext_mgr) + + def test_decode_base64(self): + value = "A random string" + result = self.controller._decode_base64(base64.b64encode(value)) + self.assertEqual(result, value) + + def test_decode_base64_binary(self): + value = "\x00\x12\x75\x99" + result = self.controller._decode_base64(base64.b64encode(value)) + self.assertEqual(result, value) + + def test_decode_base64_whitespace(self): + value = "A random string" + encoded = base64.b64encode(value) + white = "\n \n%s\t%s\n" % (encoded[:2], encoded[2:]) + result = self.controller._decode_base64(white) + self.assertEqual(result, value) + + def test_decode_base64_invalid(self): + invalid = "A random string" + result = self.controller._decode_base64(invalid) + self.assertEqual(result, None) + + def test_decode_base64_illegal_bytes(self): + value = "A random string" + encoded = base64.b64encode(value) + white = ">\x01%s*%s()" % (encoded[:2], encoded[2:]) + result = self.controller._decode_base64(white) + self.assertEqual(result, None) + + class ServersControllerTest(test.TestCase): def setUp(self): -- cgit