summaryrefslogtreecommitdiffstats
path: root/nova
diff options
context:
space:
mode:
authorVishvananda Ishaya <vishvananda@gmail.com>2010-12-22 22:04:30 +0000
committerTarmac <>2010-12-22 22:04:30 +0000
commit5f3f5acbddd66dfb3e8203724ed0ff9d0be3d5ae (patch)
treeebe0a7e1bc18fed15aa0eef26a16746f274ca1be /nova
parenteb64fe72160ca7c68809eaf0af91768f4eb5d8e8 (diff)
parent0704c0c4073f6c03959c113f90c51dfe4d72fd76 (diff)
Moves the ip allocation requests to the from the api host into calls to the network host made from the compute host.
Diffstat (limited to 'nova')
-rw-r--r--nova/api/ec2/cloud.py41
-rw-r--r--nova/compute/api.py62
-rw-r--r--nova/compute/manager.py65
-rw-r--r--nova/db/sqlalchemy/api.py5
-rw-r--r--nova/fakerabbit.py136
-rw-r--r--nova/network/linux_net.py2
-rw-r--r--nova/network/manager.py82
-rw-r--r--nova/rpc.py2
-rw-r--r--nova/tests/cloud_unittest.py30
-rw-r--r--nova/tests/compute_unittest.py1
-rw-r--r--nova/tests/network_unittest.py8
-rw-r--r--nova/tests/rpc_unittest.py29
-rw-r--r--nova/tests/scheduler_unittest.py1
-rw-r--r--nova/tests/virt_unittest.py7
-rw-r--r--nova/virt/libvirt.uml.xml.template.THIS27
-rw-r--r--nova/virt/libvirt_conn.py5
-rw-r--r--nova/virt/xenapi/vmops.py4
17 files changed, 279 insertions, 228 deletions
diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py
index e1a21f122..e09261f00 100644
--- a/nova/api/ec2/cloud.py
+++ b/nova/api/ec2/cloud.py
@@ -27,7 +27,6 @@ import datetime
import logging
import re
import os
-import time
from nova import context
import IPy
@@ -699,19 +698,24 @@ class CloudController(object):
context.project_id)
raise quota.QuotaError(_("Address quota exceeded. You cannot "
"allocate any more addresses"))
- network_topic = self._get_network_topic(context)
+ # NOTE(vish): We don't know which network host should get the ip
+ # when we allocate, so just send it to any one. This
+ # will probably need to move into a network supervisor
+ # at some point.
public_ip = rpc.call(context,
- network_topic,
+ FLAGS.network_topic,
{"method": "allocate_floating_ip",
"args": {"project_id": context.project_id}})
return {'addressSet': [{'publicIp': public_ip}]}
def release_address(self, context, public_ip, **kwargs):
- # NOTE(vish): Should we make sure this works?
floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
- network_topic = self._get_network_topic(context)
+ # NOTE(vish): We don't know which network host should get the ip
+ # when we deallocate, so just send it to any one. This
+ # will probably need to move into a network supervisor
+ # at some point.
rpc.cast(context,
- network_topic,
+ FLAGS.network_topic,
{"method": "deallocate_floating_ip",
"args": {"floating_address": floating_ip_ref['address']}})
return {'releaseResponse': ["Address released."]}
@@ -722,7 +726,10 @@ class CloudController(object):
fixed_address = db.instance_get_fixed_address(context,
instance_ref['id'])
floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
- network_topic = self._get_network_topic(context)
+ # NOTE(vish): Perhaps we should just pass this on to compute and
+ # let compute communicate with network.
+ network_topic = self.compute_api.get_network_topic(context,
+ internal_id)
rpc.cast(context,
network_topic,
{"method": "associate_floating_ip",
@@ -732,24 +739,18 @@ class CloudController(object):
def disassociate_address(self, context, public_ip, **kwargs):
floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
- network_topic = self._get_network_topic(context)
+ # NOTE(vish): Get the topic from the host name of the network of
+ # the associated fixed ip.
+ if not floating_ip_ref.get('fixed_ip'):
+ raise exception.ApiError('Address is not associated.')
+ host = floating_ip_ref['fixed_ip']['network']['host']
+ topic = db.queue_get_for(context, FLAGS.network_topic, host)
rpc.cast(context,
- network_topic,
+ topic,
{"method": "disassociate_floating_ip",
"args": {"floating_address": floating_ip_ref['address']}})
return {'disassociateResponse': ["Address disassociated."]}
- def _get_network_topic(self, context):
- """Retrieves the network host for a project"""
- network_ref = self.network_manager.get_network(context)
- host = network_ref['host']
- if not host:
- host = rpc.call(context,
- FLAGS.network_topic,
- {"method": "set_network_host",
- "args": {"network_id": network_ref['id']}})
- return db.queue_get_for(context, FLAGS.network_topic, host)
-
def run_instances(self, context, **kwargs):
max_count = int(kwargs.get('max_count', 1))
instances = self.compute_api.create_instances(context,
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 75434176e..4953fe559 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -53,6 +53,23 @@ class ComputeAPI(base.Base):
self.image_service = image_service
super(ComputeAPI, self).__init__(**kwargs)
+ def get_network_topic(self, context, instance_id):
+ try:
+ instance = self.db.instance_get_by_internal_id(context,
+ instance_id)
+ except exception.NotFound as e:
+ logging.warning("Instance %d was not found in get_network_topic",
+ instance_id)
+ raise e
+
+ host = instance['host']
+ if not host:
+ raise exception.Error("Instance %d has no host" % instance_id)
+ topic = self.db.queue_get_for(context, FLAGS.compute_topic, host)
+ return rpc.call(context,
+ topic,
+ {"method": "get_network_topic", "args": {'fake': 1}})
+
def create_instances(self, context, instance_type, image_id, min_count=1,
max_count=1, kernel_id=None, ramdisk_id=None,
display_name='', description='', key_name=None,
@@ -152,18 +169,6 @@ class ComputeAPI(base.Base):
instance = self.update_instance(context, instance_id, **updates)
instances.append(instance)
- # TODO(vish): This probably should be done in the scheduler
- # or in compute as a call. The network should be
- # allocated after the host is assigned and setup
- # can happen at the same time.
- address = self.network_manager.allocate_fixed_ip(context,
- instance_id,
- is_vpn)
- rpc.cast(elevated,
- self._get_network_topic(context),
- {"method": "setup_fixed_ip",
- "args": {"address": address}})
-
logging.debug(_("Casting to scheduler for %s/%s's instance %s"),
context.project_id, context.user_id, instance_id)
rpc.cast(context,
@@ -226,28 +231,6 @@ class ComputeAPI(base.Base):
state=0,
terminated_at=datetime.datetime.utcnow())
- # FIXME(ja): where should network deallocate occur?
- address = self.db.instance_get_floating_address(context,
- instance['id'])
- if address:
- logging.debug(_("Disassociating address %s") % address)
- # NOTE(vish): Right now we don't really care if the ip is
- # disassociated. We may need to worry about
- # checking this later. Perhaps in the scheduler?
- rpc.cast(context,
- self._get_network_topic(context),
- {"method": "disassociate_floating_ip",
- "args": {"floating_address": address}})
-
- address = self.db.instance_get_fixed_address(context, instance['id'])
- if address:
- logging.debug(_("Deallocating address %s") % address)
- # NOTE(vish): Currently, nothing needs to be done on the
- # network node until release. If this changes,
- # we will need to cast here.
- self.network_manager.deallocate_fixed_ip(context.elevated(),
- address)
-
host = instance['host']
if host:
rpc.cast(context,
@@ -317,14 +300,3 @@ class ComputeAPI(base.Base):
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "unrescue_instance",
"args": {"instance_id": instance['id']}})
-
- def _get_network_topic(self, context):
- """Retrieves the network host for a project"""
- network_ref = self.network_manager.get_network(context)
- host = network_ref['host']
- if not host:
- host = rpc.call(context,
- FLAGS.network_topic,
- {"method": "set_network_host",
- "args": {"network_id": network_ref['id']}})
- return self.db.queue_get_for(context, FLAGS.network_topic, host)
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index cc607f9d4..de114bdeb 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -40,6 +40,7 @@ import logging
from nova import exception
from nova import flags
from nova import manager
+from nova import rpc
from nova import utils
from nova.compute import power_state
@@ -48,6 +49,8 @@ flags.DEFINE_string('instances_path', '$state_path/instances',
'where instances are stored on disk')
flags.DEFINE_string('compute_driver', 'nova.virt.connection.get_connection',
'Driver to use for controlling virtualization')
+flags.DEFINE_string('stub_network', False,
+ 'Stub network related code')
class ComputeManager(manager.Manager):
@@ -82,6 +85,20 @@ class ComputeManager(manager.Manager):
state = power_state.NOSTATE
self.db.instance_set_state(context, instance_id, state)
+ def get_network_topic(self, context, **_kwargs):
+ """Retrieves the network host for a project on this host"""
+ # TODO(vish): This method should be memoized. This will make
+ # the call to get_network_host cheaper, so that
+ # it can pas messages instead of checking the db
+ # locally.
+ if FLAGS.stub_network:
+ host = FLAGS.network_host
+ else:
+ host = self.network_manager.get_network_host(context)
+ return self.db.queue_get_for(context,
+ FLAGS.network_topic,
+ host)
+
@exception.wrap_exception
def refresh_security_group(self, context, security_group_id, **_kwargs):
"""This call passes stright through to the virtualization driver."""
@@ -95,11 +112,30 @@ class ComputeManager(manager.Manager):
if instance_ref['name'] in self.driver.list_instances():
raise exception.Error(_("Instance has already been created"))
logging.debug(_("instance %s: starting..."), instance_id)
- self.network_manager.setup_compute_network(context, instance_id)
self.db.instance_update(context,
instance_id,
{'host': self.host})
+ self.db.instance_set_state(context,
+ instance_id,
+ power_state.NOSTATE,
+ 'networking')
+
+ is_vpn = instance_ref['image_id'] == FLAGS.vpn_image_id
+ # NOTE(vish): This could be a cast because we don't do anything
+ # with the address currently, but I'm leaving it as
+ # a call to ensure that network setup completes. We
+ # will eventually also need to save the address here.
+ if not FLAGS.stub_network:
+ address = rpc.call(context,
+ self.get_network_topic(context),
+ {"method": "allocate_fixed_ip",
+ "args": {"instance_id": instance_id,
+ "vpn": is_vpn}})
+
+ self.network_manager.setup_compute_network(context,
+ instance_id)
+
# TODO(vish) check to make sure the availability zone matches
self.db.instance_set_state(context,
instance_id,
@@ -125,9 +161,34 @@ class ComputeManager(manager.Manager):
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this machine."""
context = context.elevated()
- logging.debug(_("instance %s: terminating"), instance_id)
instance_ref = self.db.instance_get(context, instance_id)
+
+ if not FLAGS.stub_network:
+ address = self.db.instance_get_floating_address(context,
+ instance_ref['id'])
+ if address:
+ logging.debug(_("Disassociating address %s") % address)
+ # NOTE(vish): Right now we don't really care if the ip is
+ # disassociated. We may need to worry about
+ # checking this later.
+ rpc.cast(context,
+ self.get_network_topic(context),
+ {"method": "disassociate_floating_ip",
+ "args": {"floating_address": address}})
+
+ address = self.db.instance_get_fixed_address(context,
+ instance_ref['id'])
+ if address:
+ logging.debug(_("Deallocating address %s") % address)
+ # NOTE(vish): Currently, nothing needs to be done on the
+ # network node until release. If this changes,
+ # we will need to cast here.
+ self.network_manager.deallocate_fixed_ip(context.elevated(),
+ address)
+
+ logging.debug(_("instance %s: terminating"), instance_id)
+
volumes = instance_ref.get('volumes', []) or []
for volume in volumes:
self.detach_volume(context, instance_id, volume['id'])
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 5ba458241..52d0c389d 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -463,6 +463,7 @@ def floating_ip_get_by_address(context, address, session=None):
session = get_session()
result = session.query(models.FloatingIp).\
+ options(joinedload_all('fixed_ip.network')).\
filter_by(address=address).\
filter_by(deleted=can_read_deleted(context)).\
first()
@@ -659,13 +660,17 @@ def instance_get(context, instance_id, session=None):
if is_admin_context(context):
result = session.query(models.Instance).\
+ options(joinedload_all('fixed_ip.floating_ips')).\
options(joinedload('security_groups')).\
+ options(joinedload('volumes')).\
filter_by(id=instance_id).\
filter_by(deleted=can_read_deleted(context)).\
first()
elif is_user_context(context):
result = session.query(models.Instance).\
+ options(joinedload_all('fixed_ip.floating_ips')).\
options(joinedload('security_groups')).\
+ options(joinedload('volumes')).\
filter_by(project_id=context.project_id).\
filter_by(id=instance_id).\
filter_by(deleted=False).\
diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py
index 41e686cff..79d8b894d 100644
--- a/nova/fakerabbit.py
+++ b/nova/fakerabbit.py
@@ -25,6 +25,10 @@ from carrot.backends import base
from eventlet import greenthread
+EXCHANGES = {}
+QUEUES = {}
+
+
class Message(base.BaseMessage):
pass
@@ -68,81 +72,63 @@ class Queue(object):
return self._queue.get()
-class Backend(object):
- """ Singleton backend for testing """
- class __impl(base.BaseBackend):
- def __init__(self, *args, **kwargs):
- #super(__impl, self).__init__(*args, **kwargs)
- self._exchanges = {}
- self._queues = {}
-
- def _reset_all(self):
- self._exchanges = {}
- self._queues = {}
-
- def queue_declare(self, queue, **kwargs):
- if queue not in self._queues:
- logging.debug(_('Declaring queue %s'), queue)
- self._queues[queue] = Queue(queue)
-
- def exchange_declare(self, exchange, type, *args, **kwargs):
- if exchange not in self._exchanges:
- logging.debug(_('Declaring exchange %s'), exchange)
- self._exchanges[exchange] = Exchange(exchange, type)
-
- def queue_bind(self, queue, exchange, routing_key, **kwargs):
- logging.debug(_('Binding %s to %s with key %s'),
- queue, exchange, routing_key)
- self._exchanges[exchange].bind(self._queues[queue].push,
- routing_key)
-
- def declare_consumer(self, queue, callback, *args, **kwargs):
- self.current_queue = queue
- self.current_callback = callback
-
- def consume(self, *args, **kwargs):
- while True:
- item = self.get(self.current_queue)
- if item:
- self.current_callback(item)
- raise StopIteration()
- greenthread.sleep(0)
-
- def get(self, queue, no_ack=False):
- if not queue in self._queues or not self._queues[queue].size():
- return None
- (message_data, content_type, content_encoding) = \
- self._queues[queue].pop()
- message = Message(backend=self, body=message_data,
- content_type=content_type,
- content_encoding=content_encoding)
- message.result = True
- logging.debug(_('Getting from %s: %s'), queue, message)
- return message
-
- def prepare_message(self, message_data, delivery_mode,
- content_type, content_encoding, **kwargs):
- """Prepare message for sending."""
- return (message_data, content_type, content_encoding)
-
- def publish(self, message, exchange, routing_key, **kwargs):
- if exchange in self._exchanges:
- self._exchanges[exchange].publish(
- message, routing_key=routing_key)
-
- __instance = None
-
- def __init__(self, *args, **kwargs):
- if Backend.__instance is None:
- Backend.__instance = Backend.__impl(*args, **kwargs)
- self.__dict__['_Backend__instance'] = Backend.__instance
-
- def __getattr__(self, attr):
- return getattr(self.__instance, attr)
-
- def __setattr__(self, attr, value):
- return setattr(self.__instance, attr, value)
+class Backend(base.BaseBackend):
+ def queue_declare(self, queue, **kwargs):
+ global QUEUES
+ if queue not in QUEUES:
+ logging.debug(_('Declaring queue %s'), queue)
+ QUEUES[queue] = Queue(queue)
+
+ def exchange_declare(self, exchange, type, *args, **kwargs):
+ global EXCHANGES
+ if exchange not in EXCHANGES:
+ logging.debug(_('Declaring exchange %s'), exchange)
+ EXCHANGES[exchange] = Exchange(exchange, type)
+
+ def queue_bind(self, queue, exchange, routing_key, **kwargs):
+ global EXCHANGES
+ global QUEUES
+ logging.debug(_('Binding %s to %s with key %s'),
+ queue, exchange, routing_key)
+ EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
+
+ def declare_consumer(self, queue, callback, *args, **kwargs):
+ self.current_queue = queue
+ self.current_callback = callback
+
+ def consume(self, limit=None):
+ while True:
+ item = self.get(self.current_queue)
+ if item:
+ self.current_callback(item)
+ raise StopIteration()
+ greenthread.sleep(0)
+
+ def get(self, queue, no_ack=False):
+ global QUEUES
+ if not queue in QUEUES or not QUEUES[queue].size():
+ return None
+ (message_data, content_type, content_encoding) = QUEUES[queue].pop()
+ message = Message(backend=self, body=message_data,
+ content_type=content_type,
+ content_encoding=content_encoding)
+ message.result = True
+ logging.debug(_('Getting from %s: %s'), queue, message)
+ return message
+
+ def prepare_message(self, message_data, delivery_mode,
+ content_type, content_encoding, **kwargs):
+ """Prepare message for sending."""
+ return (message_data, content_type, content_encoding)
+
+ def publish(self, message, exchange, routing_key, **kwargs):
+ global EXCHANGES
+ if exchange in EXCHANGES:
+ EXCHANGES[exchange].publish(message, routing_key=routing_key)
def reset_all():
- Backend()._reset_all()
+ global EXCHANGES
+ global QUEUES
+ EXCHANGES = {}
+ QUEUES = {}
diff --git a/nova/network/linux_net.py b/nova/network/linux_net.py
index 771f1c932..931a89554 100644
--- a/nova/network/linux_net.py
+++ b/nova/network/linux_net.py
@@ -51,6 +51,8 @@ flags.DEFINE_bool('use_nova_chains', False,
'use the nova_ routing chains instead of default')
flags.DEFINE_string('dns_server', None,
'if set, uses specific dns server for dnsmasq')
+flags.DEFINE_string('dmz_cidr', '10.128.0.0/24',
+ 'dmz range that should be accepted')
def metadata_forward():
diff --git a/nova/network/manager.py b/nova/network/manager.py
index 9e1cf3903..16aa8f895 100644
--- a/nova/network/manager.py
+++ b/nova/network/manager.py
@@ -47,6 +47,7 @@ topologies. All of the network commands are issued to a subclass of
import datetime
import logging
import math
+import socket
import IPy
@@ -56,6 +57,7 @@ from nova import exception
from nova import flags
from nova import manager
from nova import utils
+from nova import rpc
FLAGS = flags.FLAGS
@@ -87,6 +89,10 @@ flags.DEFINE_bool('update_dhcp_on_disassociate', False,
'Whether to update dhcp when fixed_ip is disassociated')
flags.DEFINE_integer('fixed_ip_disassociate_timeout', 600,
'Seconds after which a deallocated ip is disassociated')
+flags.DEFINE_string('network_host', socket.gethostname(),
+ 'Network host to use for ip allocation in flat modes')
+flags.DEFINE_bool('fake_call', False,
+ 'If True, skip using the queue and make local calls')
class AddressAlreadyAllocated(exception.Error):
@@ -223,8 +229,8 @@ class NetworkManager(manager.Manager):
network_ref = self.db.fixed_ip_get_network(context, address)
self.driver.update_dhcp(context, network_ref['id'])
- def get_network(self, context):
- """Get the network for the current context."""
+ def get_network_host(self, context):
+ """Get the network host for the current context."""
raise NotImplementedError()
def create_networks(self, context, num_networks, network_size,
@@ -312,10 +318,6 @@ class FlatManager(NetworkManager):
"""Network is created manually."""
pass
- def setup_fixed_ip(self, context, address):
- """Currently no setup."""
- pass
-
def create_networks(self, context, cidr, num_networks, network_size,
*args, **kwargs):
"""Create networks based on parameters."""
@@ -336,14 +338,25 @@ class FlatManager(NetworkManager):
if network_ref:
self._create_fixed_ips(context, network_ref['id'])
- def get_network(self, context):
- """Get the network for the current context."""
- # NOTE(vish): To support mutilple network hosts, This could randomly
- # select from multiple networks instead of just
- # returning the one. It could also potentially be done
- # in the scheduler.
- return self.db.network_get_by_bridge(context,
- FLAGS.flat_network_bridge)
+ def get_network_host(self, context):
+ """Get the network host for the current context."""
+ network_ref = self.db.network_get_by_bridge(context,
+ FLAGS.flat_network_bridge)
+ # NOTE(vish): If the network has no host, use the network_host flag.
+ # This could eventually be a a db lookup of some sort, but
+ # a flag is easy to handle for now.
+ host = network_ref['host']
+ if not host:
+ topic = self.db.queue_get_for(context,
+ FLAGS.network_topic,
+ FLAGS.network_host)
+ if FLAGS.fake_call:
+ return self.set_network_host(context, network_ref['id'])
+ host = rpc.call(context,
+ FLAGS.network_topic,
+ {"method": "set_network_host",
+ "args": {"network_id": network_ref['id']}})
+ return host
def _on_set_network_host(self, context, network_id):
"""Called when this host becomes the host for a network."""
@@ -374,10 +387,16 @@ class FlatDHCPManager(FlatManager):
self.driver.ensure_bridge(network_ref['bridge'],
FLAGS.flat_interface)
- def setup_fixed_ip(self, context, address):
+ def allocate_fixed_ip(self, context, instance_id, *args, **kwargs):
"""Setup dhcp for this network."""
+ address = super(FlatDHCPManager, self).allocate_fixed_ip(context,
+ instance_id,
+ *args,
+ **kwargs)
network_ref = db.fixed_ip_get_network(context, address)
- self.driver.update_dhcp(context, network_ref['id'])
+ if not FLAGS.fake_network:
+ self.driver.update_dhcp(context, network_ref['id'])
+ return address
def deallocate_fixed_ip(self, context, address, *args, **kwargs):
"""Returns a fixed ip to the pool."""
@@ -446,28 +465,20 @@ class VlanManager(NetworkManager):
network_ref['id'],
instance_id)
self.db.fixed_ip_update(context, address, {'allocated': True})
+ if not FLAGS.fake_network:
+ self.driver.update_dhcp(context, network_ref['id'])
return address
def deallocate_fixed_ip(self, context, address, *args, **kwargs):
"""Returns a fixed ip to the pool."""
self.db.fixed_ip_update(context, address, {'allocated': False})
- def setup_fixed_ip(self, context, address):
- """Sets forwarding rules and dhcp for fixed ip."""
- network_ref = self.db.fixed_ip_get_network(context, address)
- self.driver.update_dhcp(context, network_ref['id'])
-
def setup_compute_network(self, context, instance_id):
"""Sets up matching network for compute hosts."""
network_ref = db.network_get_by_instance(context, instance_id)
self.driver.ensure_vlan_bridge(network_ref['vlan'],
network_ref['bridge'])
- def restart_nets(self):
- """Ensure the network for each user is enabled."""
- # TODO(vish): Implement this
- pass
-
def create_networks(self, context, cidr, num_networks, network_size,
vlan_start, vpn_start):
"""Create networks based on parameters."""
@@ -494,10 +505,23 @@ class VlanManager(NetworkManager):
if network_ref:
self._create_fixed_ips(context, network_ref['id'])
- def get_network(self, context):
+ def get_network_host(self, context):
"""Get the network for the current context."""
- return self.db.project_get_network(context.elevated(),
- context.project_id)
+ network_ref = self.db.project_get_network(context.elevated(),
+ context.project_id)
+ # NOTE(vish): If the network has no host, do a call to get an
+ # available host. This should be changed to go through
+ # the scheduler at some point.
+ host = network_ref['host']
+ if not host:
+ if FLAGS.fake_call:
+ return self.set_network_host(context, network_ref['id'])
+ host = rpc.call(context,
+ FLAGS.network_topic,
+ {"method": "set_network_host",
+ "args": {"network_id": network_ref['id']}})
+
+ return host
def _on_set_network_host(self, context, network_id):
"""Called when this host becomes the host for a network."""
diff --git a/nova/rpc.py b/nova/rpc.py
index 6e2cf051a..844088348 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -245,7 +245,7 @@ def msg_reply(msg_id, reply=None, failure=None):
logging.error(_("Returning exception %s to caller"), message)
logging.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
- conn = Connection.instance()
+ conn = Connection.instance(True)
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})
diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py
index 53a762310..70d2c44da 100644
--- a/nova/tests/cloud_unittest.py
+++ b/nova/tests/cloud_unittest.py
@@ -22,20 +22,18 @@ import logging
from M2Crypto import BIO
from M2Crypto import RSA
import os
-import StringIO
import tempfile
import time
from eventlet import greenthread
-from xml.etree import ElementTree
from nova import context
from nova import crypto
from nova import db
from nova import flags
from nova import rpc
+from nova import service
from nova import test
-from nova import utils
from nova.auth import manager
from nova.compute import power_state
from nova.api.ec2 import cloud
@@ -54,7 +52,8 @@ os.makedirs(IMAGES_PATH)
class CloudTestCase(test.TestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
- self.flags(connection_type='fake', images_path=IMAGES_PATH)
+ self.flags(connection_type='fake',
+ images_path=IMAGES_PATH)
self.conn = rpc.Connection.instance()
logging.getLogger().setLevel(logging.DEBUG)
@@ -62,27 +61,23 @@ class CloudTestCase(test.TestCase):
# set up our cloud
self.cloud = cloud.CloudController()
- # set up a service
- self.compute = utils.import_object(FLAGS.compute_manager)
- self.compute_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.compute_topic,
- proxy=self.compute)
- self.compute_consumer.attach_to_eventlet()
- self.network = utils.import_object(FLAGS.network_manager)
- self.network_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.network_topic,
- proxy=self.network)
- self.network_consumer.attach_to_eventlet()
+ # set up services
+ self.compute = service.Service.create(binary='nova-compute')
+ self.compute.start()
+ self.network = service.Service.create(binary='nova-network')
+ self.network.start()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('admin', 'admin', 'admin', True)
self.project = self.manager.create_project('proj', 'admin', 'proj')
self.context = context.RequestContext(user=self.user,
- project=self.project)
+ project=self.project)
def tearDown(self):
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
+ self.compute.kill()
+ self.network.kill()
super(CloudTestCase, self).tearDown()
def _create_key(self, name):
@@ -109,12 +104,13 @@ class CloudTestCase(test.TestCase):
{'address': address,
'host': FLAGS.host})
self.cloud.allocate_address(self.context)
- inst = db.instance_create(self.context, {})
+ inst = db.instance_create(self.context, {'host': FLAGS.host})
fixed = self.network.allocate_fixed_ip(self.context, inst['id'])
ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
+ greenthread.sleep(0.3)
self.cloud.disassociate_address(self.context,
public_ip=address)
self.cloud.release_address(self.context,
diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py
index 187ca31de..348bb3351 100644
--- a/nova/tests/compute_unittest.py
+++ b/nova/tests/compute_unittest.py
@@ -41,6 +41,7 @@ class ComputeTestCase(test.TestCase):
logging.getLogger().setLevel(logging.DEBUG)
super(ComputeTestCase, self).setUp()
self.flags(connection_type='fake',
+ stub_network=True,
network_manager='nova.network.manager.FlatManager')
self.compute = utils.import_object(FLAGS.compute_manager)
self.compute_api = compute_api.ComputeAPI()
diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py
index bcac20585..96473ac7c 100644
--- a/nova/tests/network_unittest.py
+++ b/nova/tests/network_unittest.py
@@ -26,6 +26,7 @@ from nova import context
from nova import db
from nova import exception
from nova import flags
+from nova import service
from nova import test
from nova import utils
from nova.auth import manager
@@ -40,6 +41,7 @@ class NetworkTestCase(test.TestCase):
# NOTE(vish): if you change these flags, make sure to change the
# flags in the corresponding section in nova-dhcpbridge
self.flags(connection_type='fake',
+ fake_call=True,
fake_network=True,
network_size=16,
num_networks=5)
@@ -56,16 +58,13 @@ class NetworkTestCase(test.TestCase):
# create the necessary network data for the project
user_context = context.RequestContext(project=self.projects[i],
user=self.user)
- network_ref = self.network.get_network(user_context)
- self.network.set_network_host(context.get_admin_context(),
- network_ref['id'])
+ host = self.network.get_network_host(user_context.elevated())
instance_ref = self._create_instance(0)
self.instance_id = instance_ref['id']
instance_ref = self._create_instance(1)
self.instance2_id = instance_ref['id']
def tearDown(self):
- super(NetworkTestCase, self).tearDown()
# TODO(termie): this should really be instantiating clean datastores
# in between runs, one failure kills all the tests
db.instance_destroy(context.get_admin_context(), self.instance_id)
@@ -73,6 +72,7 @@ class NetworkTestCase(test.TestCase):
for project in self.projects:
self.manager.delete_project(project)
self.manager.delete_user(self.user)
+ super(NetworkTestCase, self).tearDown()
def _create_instance(self, project_num, mac=None):
if not mac:
diff --git a/nova/tests/rpc_unittest.py b/nova/tests/rpc_unittest.py
index a2495e65a..6ea2edcab 100644
--- a/nova/tests/rpc_unittest.py
+++ b/nova/tests/rpc_unittest.py
@@ -33,7 +33,7 @@ class RpcTestCase(test.TestCase):
"""Test cases for rpc"""
def setUp(self):
super(RpcTestCase, self).setUp()
- self.conn = rpc.Connection.instance()
+ self.conn = rpc.Connection.instance(True)
self.receiver = TestReceiver()
self.consumer = rpc.AdapterConsumer(connection=self.conn,
topic='test',
@@ -79,6 +79,33 @@ class RpcTestCase(test.TestCase):
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
+ def test_nested_calls(self):
+ """Test that we can do an rpc.call inside another call"""
+ class Nested(object):
+ @staticmethod
+ def echo(context, queue, value):
+ """Calls echo in the passed queue"""
+ logging.debug("Nested received %s, %s", queue, value)
+ ret = rpc.call(context,
+ queue,
+ {"method": "echo",
+ "args": {"value": value}})
+ logging.debug("Nested return %s", ret)
+ return value
+
+ nested = Nested()
+ conn = rpc.Connection.instance(True)
+ consumer = rpc.AdapterConsumer(connection=conn,
+ topic='nested',
+ proxy=nested)
+ consumer.attach_to_eventlet()
+ value = 42
+ result = rpc.call(self.context,
+ 'nested', {"method": "echo",
+ "args": {"queue": "test",
+ "value": value}})
+ self.assertEqual(value, result)
+
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call
diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/scheduler_unittest.py
index d1756b8fb..df5e7afa5 100644
--- a/nova/tests/scheduler_unittest.py
+++ b/nova/tests/scheduler_unittest.py
@@ -78,6 +78,7 @@ class SimpleDriverTestCase(test.TestCase):
def setUp(self):
super(SimpleDriverTestCase, self).setUp()
self.flags(connection_type='fake',
+ stub_network=True,
max_cores=4,
max_gigabytes=4,
network_manager='nova.network.manager.FlatManager',
diff --git a/nova/tests/virt_unittest.py b/nova/tests/virt_unittest.py
index 7682f9662..9ad009510 100644
--- a/nova/tests/virt_unittest.py
+++ b/nova/tests/virt_unittest.py
@@ -33,6 +33,7 @@ flags.DECLARE('instances_path', 'nova.compute.manager')
class LibvirtConnTestCase(test.TestCase):
def setUp(self):
super(LibvirtConnTestCase, self).setUp()
+ self.flags(fake_call=True)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
@@ -88,9 +89,9 @@ class LibvirtConnTestCase(test.TestCase):
user_context = context.RequestContext(project=self.project,
user=self.user)
instance_ref = db.instance_create(user_context, instance)
- network_ref = self.network.get_network(user_context)
- self.network.set_network_host(context.get_admin_context(),
- network_ref['id'])
+ host = self.network.get_network_host(user_context.elevated())
+ network_ref = db.project_get_network(context.get_admin_context(),
+ self.project.id)
fixed_ip = {'address': self.test_ip,
'network_id': network_ref['id']}
diff --git a/nova/virt/libvirt.uml.xml.template.THIS b/nova/virt/libvirt.uml.xml.template.THIS
deleted file mode 100644
index 506f2ef72..000000000
--- a/nova/virt/libvirt.uml.xml.template.THIS
+++ /dev/null
@@ -1,27 +0,0 @@
-<domain type='%(type)s'>
- <name>%(name)s</name>
- <memory>%(memory_kb)s</memory>
- <os>
- <type>%(type)s</type>
- <kernel>/usr/bin/linux</kernel>
- <root>/dev/ubda1</root>
- </os>
- <devices>
- <disk type='file'>
- <source file='%(basepath)s/disk'/>
- <target dev='ubd0' bus='uml'/>
- </disk>
- <interface type='bridge'>
- <source bridge='%(bridge_name)s'/>
- <mac address='%(mac_address)s'/>
- <filterref filter="nova-instance-%(name)s">
- <parameter name="IP" value="%(ip_address)s" />
- <parameter name="DHCPSERVER" value="%(dhcp_server)s" />
- %(extra_params)s
- </filterref>
- </interface>
- <console type="file">
- <source path='%(basepath)s/console.log'/>
- </console>
- </devices>
-</domain>
diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py
index 16bcfe3c0..8d3a6a261 100644
--- a/nova/virt/libvirt_conn.py
+++ b/nova/virt/libvirt_conn.py
@@ -40,6 +40,7 @@ import logging
import os
import shutil
+from eventlet import greenthread
from eventlet import event
from eventlet import tpool
@@ -498,8 +499,8 @@ class LibvirtConnection(object):
# TODO(termie): cache?
logging.debug(_('instance %s: starting toXML method'),
instance['name'])
- network = db.project_get_network(context.get_admin_context(),
- instance['project_id'])
+ network = db.network_get_by_instance(context.get_admin_context(),
+ instance['id'])
# FIXME(vish): stick this in db
instance_type = instance['instance_type']
instance_type = instance_types.INSTANCE_TYPES[instance_type]
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index 3b00ce8bf..4d897af35 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -59,8 +59,8 @@ class VMOps(object):
raise Exception('Attempted to create non-unique name %s' %
instance.name)
- bridge = db.project_get_network(context.get_admin_context(),
- instance.project_id).bridge
+ bridge = db.network_get_by_instance(context.get_admin_context(),
+ instance['id'])['bridge']
network_ref = \
NetworkHelper.find_network_with_bridge(self._session, bridge)