diff options
| author | Vishvananda Ishaya <vishvananda@gmail.com> | 2010-07-19 14:45:39 -0500 |
|---|---|---|
| committer | Vishvananda Ishaya <vishvananda@gmail.com> | 2010-07-19 14:45:39 -0500 |
| commit | 19a8556917ffebf9f85e68df4a841e1ea97124d2 (patch) | |
| tree | 3e45898c6f4a58fd17759ad29d8117294d4914ad | |
| parent | 5d647826d6345bfdd87bad10995319679f39679f (diff) | |
| parent | d52925f1f3293a4ad1692bd1aab846c713f7c3de (diff) | |
| download | nova-19a8556917ffebf9f85e68df4a841e1ea97124d2.tar.gz nova-19a8556917ffebf9f85e68df4a841e1ea97124d2.tar.xz nova-19a8556917ffebf9f85e68df4a841e1ea97124d2.zip | |
merged trunk
| -rw-r--r-- | .bzrignore | 1 | ||||
| -rwxr-xr-x | bin/nova-objectstore | 25 | ||||
| -rw-r--r-- | debian/control | 2 | ||||
| -rw-r--r-- | debian/nova-objectstore.install | 1 | ||||
| -rw-r--r-- | debian/nova-objectstore.links | 1 | ||||
| -rw-r--r-- | debian/nova-objectstore.nginx.conf | 17 | ||||
| -rw-r--r-- | nova/compute/linux_net.py | 4 | ||||
| -rw-r--r-- | nova/flags.py | 1 | ||||
| -rw-r--r-- | nova/objectstore/handler.py | 385 | ||||
| -rw-r--r-- | nova/rpc.py | 9 | ||||
| -rw-r--r-- | run_tests.py | 9 |
11 files changed, 218 insertions, 237 deletions
diff --git a/.bzrignore b/.bzrignore new file mode 100644 index 000000000..93fc868a3 --- /dev/null +++ b/.bzrignore @@ -0,0 +1 @@ +run_tests.err.log diff --git a/bin/nova-objectstore b/bin/nova-objectstore index 521f3d5d1..9385fd299 100755 --- a/bin/nova-objectstore +++ b/bin/nova-objectstore @@ -18,33 +18,32 @@ # under the License. """ - Tornado daemon for nova objectstore. Supports S3 API. + Twisted daemon for nova objectstore. Supports S3 API. """ import logging -from tornado import httpserver -from tornado import ioloop from nova import flags -from nova import server from nova import utils -from nova.auth import users +from nova import twistd from nova.objectstore import handler FLAGS = flags.FLAGS -def main(argv): +def main(): # FIXME: if this log statement isn't here, no logging # appears from other files and app won't start daemonized - logging.debug('Started HTTP server on %s' % (FLAGS.s3_internal_port)) - app = handler.Application(users.UserManager()) - server = httpserver.HTTPServer(app) - server.listen(FLAGS.s3_internal_port) - ioloop.IOLoop.instance().start() - + logging.debug('Started HTTP server on %s' % (FLAGS.s3_port)) + app = handler.get_application() + print app + return app +# NOTE(soren): Stolen from nova-compute if __name__ == '__main__': + twistd.serve(__file__) + +if __name__ == '__builtin__': utils.default_flagfile() - server.serve('nova-objectstore', main) + application = main() diff --git a/debian/control b/debian/control index 17414bb7a..a6d12f36e 100644 --- a/debian/control +++ b/debian/control @@ -91,7 +91,7 @@ Description: Nova Cloud Computing - API frontend Package: nova-objectstore Architecture: all -Depends: nova-common (= ${binary:Version}), nginx, ${python:Depends}, ${misc:Depends} +Depends: nova-common (= ${binary:Version}), ${python:Depends}, ${misc:Depends} Description: Nova Cloud Computing - object store Nova is a cloud computing fabric controller (the main part of an IaaS system) built to match the popular AWS EC2 and S3 APIs. It is written in diff --git a/debian/nova-objectstore.install b/debian/nova-objectstore.install index 3ed93ff37..c5b3d997a 100644 --- a/debian/nova-objectstore.install +++ b/debian/nova-objectstore.install @@ -1,3 +1,2 @@ bin/nova-objectstore usr/bin debian/nova-objectstore.conf etc/nova -debian/nova-objectstore.nginx.conf etc/nginx/sites-available diff --git a/debian/nova-objectstore.links b/debian/nova-objectstore.links deleted file mode 100644 index 38e33948e..000000000 --- a/debian/nova-objectstore.links +++ /dev/null @@ -1 +0,0 @@ -/etc/nginx/sites-available/nova-objectstore.nginx.conf /etc/nginx/sites-enabled/nova-objectstore.nginx.conf diff --git a/debian/nova-objectstore.nginx.conf b/debian/nova-objectstore.nginx.conf deleted file mode 100644 index b63424150..000000000 --- a/debian/nova-objectstore.nginx.conf +++ /dev/null @@ -1,17 +0,0 @@ -server { - listen 3333 default; - server_name localhost; - client_max_body_size 10m; - - access_log /var/log/nginx/localhost.access.log; - - location ~ /_images/.+ { - root /var/lib/nova/images; - rewrite ^/_images/(.*)$ /$1 break; - } - - location / { - proxy_pass http://localhost:3334/; - } -} - diff --git a/nova/compute/linux_net.py b/nova/compute/linux_net.py index eb9614194..48e07da66 100644 --- a/nova/compute/linux_net.py +++ b/nova/compute/linux_net.py @@ -37,13 +37,13 @@ def execute(cmd, addl_env=None): logging.debug("FAKE NET: %s" % cmd) return "fake", 0 else: - return nova.utils.execute(cmd, addl_env=addl_env) + return utils.execute(cmd, addl_env=addl_env) def runthis(desc, cmd): if FLAGS.fake_network: return execute(cmd) else: - return nova.utils.runthis(desc,cmd) + return utils.runthis(desc,cmd) def Popen(cmd): if FLAGS.fake_network: diff --git a/nova/flags.py b/nova/flags.py index 60245a349..06ea1e007 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -37,7 +37,6 @@ DEFINE_bool = DEFINE_bool # http://code.google.com/p/python-gflags/source/browse/trunk/gflags.py#39 DEFINE_integer('s3_port', 3333, 's3 port') -DEFINE_integer('s3_internal_port', 3334, 's3 port') DEFINE_string('s3_host', '127.0.0.1', 's3 host') #DEFINE_string('cloud_topic', 'cloud', 'the topic clouds listen on') DEFINE_string('compute_topic', 'compute', 'the topic compute nodes listen on') diff --git a/nova/objectstore/handler.py b/nova/objectstore/handler.py index 8377a57a6..b2ed3d482 100644 --- a/nova/objectstore/handler.py +++ b/nova/objectstore/handler.py @@ -1,10 +1,11 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. # -# Copyright 2009 Facebook +# Copyright 2010 OpenStack LLC. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Copyright 2009 Facebook # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -37,15 +38,21 @@ S3 client with this module:: """ import datetime -import os -import json import logging +import json import multiprocessing -from tornado import escape, web +import os +from tornado import escape import urllib +from twisted.application import internet, service +from twisted.web.resource import Resource +from twisted.web import server, static + + from nova import exception from nova import flags +from nova.auth import users from nova.endpoint import api from nova.objectstore import bucket from nova.objectstore import image @@ -53,241 +60,213 @@ from nova.objectstore import image FLAGS = flags.FLAGS - -def catch_nova_exceptions(target): - # FIXME: find a way to wrap all handlers in the web.Application.__init__ ? - def wrapper(*args, **kwargs): - try: - return target(*args, **kwargs) - except exception.NotFound: - raise web.HTTPError(404) - except exception.NotAuthorized: - raise web.HTTPError(403) - - return wrapper - - -class Application(web.Application): +def render_xml(request, value): + assert isinstance(value, dict) and len(value) == 1 + request.setHeader("Content-Type", "application/xml; charset=UTF-8") + + name = value.keys()[0] + request.write('<?xml version="1.0" encoding="UTF-8"?>\n') + request.write('<' + escape.utf8(name) + + ' xmlns="http://doc.s3.amazonaws.com/2006-03-01">') + _render_parts(value.values()[0], request.write) + request.write('</' + escape.utf8(name) + '>') + request.finish() + +def finish(request, content=None): + if content: + request.write(content) + request.finish() + +def _render_parts(value, write_cb): + if isinstance(value, basestring): + write_cb(escape.xhtml_escape(value)) + elif isinstance(value, int) or isinstance(value, long): + write_cb(str(value)) + elif isinstance(value, datetime.datetime): + write_cb(value.strftime("%Y-%m-%dT%H:%M:%S.000Z")) + elif isinstance(value, dict): + for name, subvalue in value.iteritems(): + if not isinstance(subvalue, list): + subvalue = [subvalue] + for subsubvalue in subvalue: + write_cb('<' + escape.utf8(name) + '>') + _render_parts(subsubvalue, write_cb) + write_cb('</' + escape.utf8(name) + '>') + else: + raise Exception("Unknown S3 value type %r", value) + +def get_argument(request, key, default_value): + if key in request.args: + return request.args[key][0] + return default_value + +def get_context(request): + try: + # Authorization Header format: 'AWS <access>:<secret>' + access, sep, secret = request.getHeader('Authorization').split(' ')[1].rpartition(':') + um = users.UserManager.instance() + print 'um %s' % um + (user, project) = um.authenticate(access, secret, {}, request.method, request.host, request.uri, False) + # FIXME: check signature here! + return api.APIRequestContext(None, user, project) + except exception.Error, ex: + logging.debug("Authentication Failure: %s" % ex) + raise exception.NotAuthorized + +class S3(Resource): """Implementation of an S3-like storage server based on local files.""" - def __init__(self, user_manager): - web.Application.__init__(self, [ - (r"/", RootHandler), - (r"/_images/(.+)", ImageDownloadHandler), - (r"/_images/", ImageHandler), - (r"/([^/]+)/(.+)", ObjectHandler), - (r"/([^/]+)/", BucketHandler), - ]) - self.buckets_path = os.path.abspath(FLAGS.buckets_path) - self.images_path = os.path.abspath(FLAGS.images_path) - - if not os.path.exists(self.buckets_path): - raise Exception("buckets_path does not exist") - if not os.path.exists(self.images_path): - raise Exception("images_path does not exist") - self.user_manager = user_manager - - -class BaseRequestHandler(web.RequestHandler): - SUPPORTED_METHODS = ("PUT", "GET", "DELETE", "HEAD") - - @property - def context(self): - if not hasattr(self, '_context'): - try: - # Authorization Header format: 'AWS <access>:<secret>' - access, sep, secret = self.request.headers['Authorization'].split(' ')[1].rpartition(':') - (user, project) = self.application.user_manager.authenticate(access, secret, {}, self.request.method, self.request.host, self.request.path, False) - # FIXME: check signature here! - self._context = api.APIRequestContext(self, user, project) - except exception.Error, ex: - logging.debug("Authentication Failure: %s" % ex) - raise web.HTTPError(403) - return self._context - - def render_xml(self, value): - assert isinstance(value, dict) and len(value) == 1 - self.set_header("Content-Type", "application/xml; charset=UTF-8") - name = value.keys()[0] - parts = [] - parts.append('<' + escape.utf8(name) + - ' xmlns="http://doc.s3.amazonaws.com/2006-03-01">') - self._render_parts(value.values()[0], parts) - parts.append('</' + escape.utf8(name) + '>') - self.finish('<?xml version="1.0" encoding="UTF-8"?>\n' + - ''.join(parts)) - - def _render_parts(self, value, parts=[]): - if isinstance(value, basestring): - parts.append(escape.xhtml_escape(value)) - elif isinstance(value, int) or isinstance(value, long): - parts.append(str(value)) - elif isinstance(value, datetime.datetime): - parts.append(value.strftime("%Y-%m-%dT%H:%M:%S.000Z")) - elif isinstance(value, dict): - for name, subvalue in value.iteritems(): - if not isinstance(subvalue, list): - subvalue = [subvalue] - for subsubvalue in subvalue: - parts.append('<' + escape.utf8(name) + '>') - self._render_parts(subsubvalue, parts) - parts.append('</' + escape.utf8(name) + '>') - else: - raise Exception("Unknown S3 value type %r", value) - - def head(self, *args, **kwargs): - return self.get(*args, **kwargs) + def getChild(self, name, request): + request.context = get_context(request) + if name == '': + return self + elif name == '_images': + return ImageResource() + else: + return BucketResource(name) -class RootHandler(BaseRequestHandler): - def get(self): - buckets = [b for b in bucket.Bucket.all() if b.is_authorized(self.context)] + def render_GET(self, request): + buckets = [b for b in bucket.Bucket.all() if b.is_authorized(request.context)] - self.render_xml({"ListAllMyBucketsResult": { + render_xml(request, {"ListAllMyBucketsResult": { "Buckets": {"Bucket": [b.metadata for b in buckets]}, }}) + return server.NOT_DONE_YET + +class BucketResource(Resource): + def __init__(self, name): + Resource.__init__(self) + self.name = name + def getChild(self, name, request): + if name == '': + return self + else: + return ObjectResource(bucket.Bucket(self.name), name) -class BucketHandler(BaseRequestHandler): - @catch_nova_exceptions - def get(self, bucket_name): - logging.debug("List keys for bucket %s" % (bucket_name)) + def render_GET(self, request): + logging.debug("List keys for bucket %s" % (self.name)) - bucket_object = bucket.Bucket(bucket_name) + bucket_object = bucket.Bucket(self.name) - if not bucket_object.is_authorized(self.context): - raise web.HTTPError(403) + if not bucket_object.is_authorized(request.context): + raise exception.NotAuthorized - prefix = self.get_argument("prefix", u"") - marker = self.get_argument("marker", u"") - max_keys = int(self.get_argument("max-keys", 1000)) - terse = int(self.get_argument("terse", 0)) + prefix = get_argument(request, "prefix", u"") + marker = get_argument(request, "marker", u"") + max_keys = int(get_argument(request, "max-keys", 1000)) + terse = int(get_argument(request, "terse", 0)) results = bucket_object.list_keys(prefix=prefix, marker=marker, max_keys=max_keys, terse=terse) - self.render_xml({"ListBucketResult": results}) + render_xml(request, {"ListBucketResult": results}) + return server.NOT_DONE_YET - @catch_nova_exceptions - def put(self, bucket_name): - logging.debug("Creating bucket %s" % (bucket_name)) - bucket.Bucket.create(bucket_name, self.context) - self.finish() + def render_PUT(self, request): + logging.debug("Creating bucket %s" % (self.name)) + try: + print 'user is %s' % request.context + except Exception, e: + logging.exception(e) + logging.debug("calling bucket.Bucket.create(%r, %r)" % (self.name, request.context)) + bucket.Bucket.create(self.name, request.context) + return '' - @catch_nova_exceptions - def delete(self, bucket_name): - logging.debug("Deleting bucket %s" % (bucket_name)) - bucket_object = bucket.Bucket(bucket_name) + def render_DELETE(self, request): + logging.debug("Deleting bucket %s" % (self.name)) + bucket_object = bucket.Bucket(self.name) - if not bucket_object.is_authorized(self.context): - raise web.HTTPError(403) + if not bucket_object.is_authorized(request.context): + raise exception.NotAuthorized bucket_object.delete() - self.set_status(204) - self.finish() - - -class ObjectHandler(BaseRequestHandler): - @catch_nova_exceptions - def get(self, bucket_name, object_name): - logging.debug("Getting object: %s / %s" % (bucket_name, object_name)) - - bucket_object = bucket.Bucket(bucket_name) + request.setResponseCode(204) + return '' - if not bucket_object.is_authorized(self.context): - raise web.HTTPError(403) - obj = bucket_object[urllib.unquote(object_name)] - self.set_header("Content-Type", "application/unknown") - self.set_header("Last-Modified", datetime.datetime.utcfromtimestamp(obj.mtime)) - self.set_header("Etag", '"' + obj.md5 + '"') - self.finish(obj.read()) +class ObjectResource(Resource): + def __init__(self, bucket, name): + Resource.__init__(self) + self.bucket = bucket + self.name = name - @catch_nova_exceptions - def put(self, bucket_name, object_name): - logging.debug("Putting object: %s / %s" % (bucket_name, object_name)) - bucket_object = bucket.Bucket(bucket_name) + def render_GET(self, request): + logging.debug("Getting object: %s / %s" % (self.bucket.name, self.name)) - if not bucket_object.is_authorized(self.context): - raise web.HTTPError(403) + if not self.bucket.is_authorized(request.context): + raise exception.NotAuthorized - key = urllib.unquote(object_name) - bucket_object[key] = self.request.body - self.set_header("Etag", '"' + bucket_object[key].md5 + '"') - self.finish() + obj = self.bucket[urllib.unquote(self.name)] + request.setHeader("Content-Type", "application/unknown") + request.setHeader("Last-Modified", datetime.datetime.utcfromtimestamp(obj.mtime)) + request.setHeader("Etag", '"' + obj.md5 + '"') + return static.File(obj.path).render_GET(request) - @catch_nova_exceptions - def delete(self, bucket_name, object_name): - logging.debug("Deleting object: %s / %s" % (bucket_name, object_name)) - bucket_object = bucket.Bucket(bucket_name) + def render_PUT(self, request): + logging.debug("Putting object: %s / %s" % (self.bucket.name, self.name)) - if not bucket_object.is_authorized(self.context): - raise web.HTTPError(403) + if not self.bucket.is_authorized(request.context): + raise exception.NotAuthorized - del bucket_object[urllib.unquote(object_name)] - self.set_status(204) - self.finish() + key = urllib.unquote(self.name) + request.content.seek(0, 0) + self.bucket[key] = request.content.read() + request.setHeader("Etag", '"' + self.bucket[key].md5 + '"') + finish(request) + return server.NOT_DONE_YET + def render_DELETE(self, request): + logging.debug("Deleting object: %s / %s" % (self.bucket.name, self.name)) -class ImageDownloadHandler(BaseRequestHandler): - SUPPORTED_METHODS = ("GET", ) + if not self.bucket.is_authorized(request.context): + raise exception.NotAuthorized - @catch_nova_exceptions - def get(self, image_id): - """ send the decrypted image file + del self.bucket[urllib.unquote(self.name)] + request.setResponseCode(204) + return '' - streaming content through python is slow and should only be used - in development mode. You should serve files via a web server - in production. - """ +class ImageResource(Resource): + isLeaf = True - self.set_header("Content-Type", "application/octet-stream") - - READ_SIZE = 64*1024 - - img = image.Image(image_id) - with open(img.image_path, 'rb') as fp: - s = fp.read(READ_SIZE) - while s: - self.write(s) - s = fp.read(READ_SIZE) - - self.finish() - -class ImageHandler(BaseRequestHandler): - SUPPORTED_METHODS = ("POST", "PUT", "GET", "DELETE") + def getChild(self, name, request): + if name == '': + return self + else: + request.setHeader("Content-Type", "application/octet-stream") + img = image.Image(name) + return static.File(img.image_path) - @catch_nova_exceptions - def get(self): + def render_GET(self, request): """ returns a json listing of all images that a user has permissions to see """ images = [i for i in image.Image.all() if i.is_authorized(self.context)] - self.finish(json.dumps([i.metadata for i in images])) + request.write(json.dumps([i.metadata for i in images])) + return server.NOT_DONE_YET - @catch_nova_exceptions - def put(self): + def render_PUT(self, request): """ create a new registered image """ - image_id = self.get_argument('image_id', u'') - image_location = self.get_argument('image_location', u'') + image_id = get_argument(request, 'image_id', u'') + image_location = get_argument(request, 'image_location', u'') image_path = os.path.join(FLAGS.images_path, image_id) if not image_path.startswith(FLAGS.images_path) or \ os.path.exists(image_path): - raise web.HTTPError(403) + raise exception.NotAuthorized bucket_object = bucket.Bucket(image_location.split("/")[0]) manifest = image_location[len(image_location.split('/')[0])+1:] - if not bucket_object.is_authorized(self.context): - raise web.HTTPError(403) + if not bucket_object.is_authorized(request.context): + raise exception.NotAuthorized p = multiprocessing.Process(target=image.Image.register_aws_image, - args=(image_id, image_location, self.context)) + args=(image_id, image_location, request.context)) p.start() - self.finish() + return '' - @catch_nova_exceptions - def post(self): + def render_POST(self, request): """ update image attributes: public/private """ image_id = self.get_argument('image_id', u'') @@ -295,22 +274,30 @@ class ImageHandler(BaseRequestHandler): image_object = image.Image(image_id) - if not image_object.is_authorized(self.context): - raise web.HTTPError(403) + if not image.is_authorized(request.context): + raise exception.NotAuthorized image_object.set_public(operation=='add') - self.finish() + return '' - @catch_nova_exceptions - def delete(self): + def render_DELETE(self, request): """ delete a registered image """ image_id = self.get_argument("image_id", u"") image_object = image.Image(image_id) - if not image_object.is_authorized(self.context): - raise web.HTTPError(403) + if not image.is_authorized(request.context): + raise exception.NotAuthorized image_object.delete() - self.set_status(204) + request.setResponseCode(204) + return '' + +def get_application(): + root = S3() + factory = server.Site(root) + application = service.Application("objectstore") + objectStoreService = internet.TCPServer(FLAGS.s3_port, factory) + objectStoreService.setServiceParent(application) + return application diff --git a/nova/rpc.py b/nova/rpc.py index 58a2b29cf..ef463e84b 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -110,6 +110,7 @@ class TopicConsumer(Consumer): self.queue = topic self.routing_key = topic self.exchange = FLAGS.control_exchange + self.durable = False super(TopicConsumer, self).__init__(connection=connection) @@ -195,7 +196,10 @@ def call(topic, msg): conn = Connection.instance() d = defer.Deferred() consumer = DirectConsumer(connection=conn, msg_id=msg_id) - consumer.register_callback(lambda data, message: d.callback(data)) + def deferred_receive(data, message): + message.ack() + d.callback(data) + consumer.register_callback(deferred_receive) injected = consumer.attach_to_tornado() # clean up after the injected listened and return x @@ -233,7 +237,8 @@ def send_message(topic, message, wait=True): exchange=msg_id, auto_delete=True, exchange_type="direct", - routing_key=msg_id) + routing_key=msg_id, + durable=False) consumer.register_callback(generic_response) publisher = messaging.Publisher(connection=Connection.instance(), diff --git a/run_tests.py b/run_tests.py index eb26459c5..db8a582ea 100644 --- a/run_tests.py +++ b/run_tests.py @@ -39,6 +39,7 @@ Due to our use of multiprocessing it we frequently get some ignorable """ import __main__ +import os import sys @@ -66,6 +67,9 @@ FLAGS = flags.FLAGS flags.DEFINE_bool('flush_db', True, 'Flush the database before running fake tests') +flags.DEFINE_string('tests_stderr', 'run_tests.err.log', + 'Path to where to pipe STDERR during test runs. Default = "run_tests.err.log"') + if __name__ == '__main__': OptionsClass = twistd.WrapTwistedOptions(trial_script.Options) config = OptionsClass() @@ -85,6 +89,11 @@ if __name__ == '__main__': else: from nova.tests.real_flags import * + # Establish redirect for STDERR + sys.stderr.flush() + err = open(FLAGS.tests_stderr, 'w+', 0) + os.dup2(err.fileno(), sys.stderr.fileno()) + if len(argv) == 1 and len(config['tests']) == 0: # If no tests were specified run the ones imported in this file # NOTE(termie): "tests" is not a flag, just some Trial related stuff |
