author     Vishvananda Ishaya <vishvananda@gmail.com>    2010-12-22 22:04:30 +0000
committer  Tarmac <>                                     2010-12-22 22:04:30 +0000
commit     5f3f5acbddd66dfb3e8203724ed0ff9d0be3d5ae (patch)
tree       ebe0a7e1bc18fed15aa0eef26a16746f274ca1be
parent     eb64fe72160ca7c68809eaf0af91768f4eb5d8e8 (diff)
parent     0704c0c4073f6c03959c113f90c51dfe4d72fd76 (diff)
Moves the ip allocation requests from the api host into calls to the network host made from the compute host.
-rw-r--r--  nova/api/ec2/cloud.py                     41
-rw-r--r--  nova/compute/api.py                       62
-rw-r--r--  nova/compute/manager.py                   65
-rw-r--r--  nova/db/sqlalchemy/api.py                  5
-rw-r--r--  nova/fakerabbit.py                       136
-rw-r--r--  nova/network/linux_net.py                  2
-rw-r--r--  nova/network/manager.py                   82
-rw-r--r--  nova/rpc.py                                2
-rw-r--r--  nova/tests/cloud_unittest.py              30
-rw-r--r--  nova/tests/compute_unittest.py             1
-rw-r--r--  nova/tests/network_unittest.py             8
-rw-r--r--  nova/tests/rpc_unittest.py                29
-rw-r--r--  nova/tests/scheduler_unittest.py           1
-rw-r--r--  nova/tests/virt_unittest.py                7
-rw-r--r--  nova/virt/libvirt.uml.xml.template.THIS   27
-rw-r--r--  nova/virt/libvirt_conn.py                  5
-rw-r--r--  nova/virt/xenapi/vmops.py                  4
17 files changed, 279 insertions, 228 deletions
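
Taken together, the patch replaces the api host's direct allocation with a three-hop flow: the api asks the instance's compute host for its network topic, and the compute host calls its network host, which performs the allocation. A minimal sketch of that flow; rpc_call, queue_get_for, and HANDLERS are stand-ins for nova.rpc.call, db.queue_get_for, and the real message consumers, not Nova code:

    # Stand-ins only: a toy dispatcher playing the role of the rabbit queues.
    def queue_get_for(topic, host):
        """Hypothetical naming for a host-specific queue, e.g. 'network.netnode1'."""
        return '%s.%s' % (topic, host)

    def rpc_call(topic, message):
        """Stand-in for nova.rpc.call: dispatch to a registered handler."""
        handler = HANDLERS[(topic, message['method'])]
        return handler(**message.get('args', {}))

    def allocate_fixed_ip(instance_id, vpn=False):
        """Network host side: owns the actual ip allocation."""
        return '10.0.0.5'  # made-up address

    def get_network_topic():
        """Compute host side: resolves which network host serves it."""
        network_host = 'netnode1'  # in Nova: network_manager.get_network_host()
        return queue_get_for('network', network_host)

    HANDLERS = {
        ('compute.node1', 'get_network_topic'): get_network_topic,
        ('network.netnode1', 'allocate_fixed_ip'): allocate_fixed_ip,
    }

    # The api host no longer allocates anything itself; it routes via compute.
    topic = rpc_call('compute.node1', {'method': 'get_network_topic'})
    address = rpc_call(topic, {'method': 'allocate_fixed_ip',
                               'args': {'instance_id': 1, 'vpn': False}})
    assert address == '10.0.0.5'
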
diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py
index e1a21f122..e09261f00 100644
--- a/nova/api/ec2/cloud.py
+++ b/nova/api/ec2/cloud.py
@@ -27,7 +27,6 @@ import datetime
import logging
import re
import os
-import time
from nova import context
import IPy
@@ -699,19 +698,24 @@ class CloudController(object):
context.project_id)
raise quota.QuotaError(_("Address quota exceeded. You cannot "
"allocate any more addresses"))
- network_topic = self._get_network_topic(context)
+ # NOTE(vish): We don't know which network host should get the ip
+ # when we allocate, so just send it to any one. This
+ # will probably need to move into a network supervisor
+ # at some point.
public_ip = rpc.call(context,
- network_topic,
+ FLAGS.network_topic,
{"method": "allocate_floating_ip",
"args": {"project_id": context.project_id}})
return {'addressSet': [{'publicIp': public_ip}]}
def release_address(self, context, public_ip, **kwargs):
- # NOTE(vish): Should we make sure this works?
floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
- network_topic = self._get_network_topic(context)
+ # NOTE(vish): We don't know which network host should get the ip
+ # when we deallocate, so just send it to any one. This
+ # will probably need to move into a network supervisor
+ # at some point.
rpc.cast(context,
- network_topic,
+ FLAGS.network_topic,
{"method": "deallocate_floating_ip",
"args": {"floating_address": floating_ip_ref['address']}})
return {'releaseResponse': ["Address released."]}
@@ -722,7 +726,10 @@ class CloudController(object):
fixed_address = db.instance_get_fixed_address(context,
instance_ref['id'])
floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
- network_topic = self._get_network_topic(context)
+ # NOTE(vish): Perhaps we should just pass this on to compute and
+ # let compute communicate with network.
+ network_topic = self.compute_api.get_network_topic(context,
+ internal_id)
rpc.cast(context,
network_topic,
{"method": "associate_floating_ip",
@@ -732,24 +739,18 @@ class CloudController(object):
def disassociate_address(self, context, public_ip, **kwargs):
floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
- network_topic = self._get_network_topic(context)
+ # NOTE(vish): Get the topic from the host name of the network of
+ # the associated fixed ip.
+ if not floating_ip_ref.get('fixed_ip'):
+ raise exception.ApiError('Address is not associated.')
+ host = floating_ip_ref['fixed_ip']['network']['host']
+ topic = db.queue_get_for(context, FLAGS.network_topic, host)
rpc.cast(context,
- network_topic,
+ topic,
{"method": "disassociate_floating_ip",
"args": {"floating_address": floating_ip_ref['address']}})
return {'disassociateResponse': ["Address disassociated."]}
- def _get_network_topic(self, context):
- """Retrieves the network host for a project"""
- network_ref = self.network_manager.get_network(context)
- host = network_ref['host']
- if not host:
- host = rpc.call(context,
- FLAGS.network_topic,
- {"method": "set_network_host",
- "args": {"network_id": network_ref['id']}})
- return db.queue_get_for(context, FLAGS.network_topic, host)
-
def run_instances(self, context, **kwargs):
max_count = int(kwargs.get('max_count', 1))
instances = self.compute_api.create_instances(context,
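
Two routing styles are in play above: a cast to the bare FLAGS.network_topic is served by whichever network worker dequeues it first (acceptable for allocating and releasing floating ips), while db.queue_get_for builds a host-specific queue so that associate/disassociate reach the one network host holding the NAT and dnsmasq state for the fixed ip. A sketch of the distinction; the queue names are assumptions about the output shape of db.queue_get_for:

    # Illustrative only; Nova's real queue_get_for lives in nova.db.api.
    NETWORK_TOPIC = 'network'

    def queue_get_for(topic, host):
        return '%s.%s' % (topic, host)

    anycast = NETWORK_TOPIC                               # any network worker
    targeted = queue_get_for(NETWORK_TOPIC, 'netnode1')   # only host 'netnode1'
    print(anycast, targeted)                              # network network.netnode1
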
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 75434176e..4953fe559 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -53,6 +53,23 @@ class ComputeAPI(base.Base):
self.image_service = image_service
super(ComputeAPI, self).__init__(**kwargs)
+ def get_network_topic(self, context, instance_id):
+ try:
+ instance = self.db.instance_get_by_internal_id(context,
+ instance_id)
+ except exception.NotFound as e:
+ logging.warning("Instance %d was not found in get_network_topic",
+ instance_id)
+ raise e
+
+ host = instance['host']
+ if not host:
+ raise exception.Error("Instance %d has no host" % instance_id)
+ topic = self.db.queue_get_for(context, FLAGS.compute_topic, host)
+ return rpc.call(context,
+ topic,
+ {"method": "get_network_topic", "args": {'fake': 1}})
+
def create_instances(self, context, instance_type, image_id, min_count=1,
max_count=1, kernel_id=None, ramdisk_id=None,
display_name='', description='', key_name=None,
@@ -152,18 +169,6 @@ class ComputeAPI(base.Base):
instance = self.update_instance(context, instance_id, **updates)
instances.append(instance)
- # TODO(vish): This probably should be done in the scheduler
- # or in compute as a call. The network should be
- # allocated after the host is assigned and setup
- # can happen at the same time.
- address = self.network_manager.allocate_fixed_ip(context,
- instance_id,
- is_vpn)
- rpc.cast(elevated,
- self._get_network_topic(context),
- {"method": "setup_fixed_ip",
- "args": {"address": address}})
-
logging.debug(_("Casting to scheduler for %s/%s's instance %s"),
context.project_id, context.user_id, instance_id)
rpc.cast(context,
@@ -226,28 +231,6 @@ class ComputeAPI(base.Base):
state=0,
terminated_at=datetime.datetime.utcnow())
- # FIXME(ja): where should network deallocate occur?
- address = self.db.instance_get_floating_address(context,
- instance['id'])
- if address:
- logging.debug(_("Disassociating address %s") % address)
- # NOTE(vish): Right now we don't really care if the ip is
- # disassociated. We may need to worry about
- # checking this later. Perhaps in the scheduler?
- rpc.cast(context,
- self._get_network_topic(context),
- {"method": "disassociate_floating_ip",
- "args": {"floating_address": address}})
-
- address = self.db.instance_get_fixed_address(context, instance['id'])
- if address:
- logging.debug(_("Deallocating address %s") % address)
- # NOTE(vish): Currently, nothing needs to be done on the
- # network node until release. If this changes,
- # we will need to cast here.
- self.network_manager.deallocate_fixed_ip(context.elevated(),
- address)
-
host = instance['host']
if host:
rpc.cast(context,
@@ -317,14 +300,3 @@ class ComputeAPI(base.Base):
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "unrescue_instance",
"args": {"instance_id": instance['id']}})
-
- def _get_network_topic(self, context):
- """Retrieves the network host for a project"""
- network_ref = self.network_manager.get_network(context)
- host = network_ref['host']
- if not host:
- host = rpc.call(context,
- FLAGS.network_topic,
- {"method": "set_network_host",
- "args": {"network_id": network_ref['id']}})
- return self.db.queue_get_for(context, FLAGS.network_topic, host)
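
ComputeAPI.get_network_topic now fails loudly when the instance is unknown (exception.NotFound) or has not yet landed on a host (exception.Error). A sketch of a caller-side guard; the mapping to ApiError is an assumption for illustration, not code from the patch:

    # Hypothetical helper: translate lookup failures for an api caller.
    from nova import exception

    def network_topic_or_api_error(compute_api, context, internal_id):
        try:
            return compute_api.get_network_topic(context, internal_id)
        except exception.NotFound:
            # bad instance id: surface a user-facing error
            raise exception.ApiError('Instance %s not found' % internal_id)
        except exception.Error:
            # instance exists but has no host yet (still scheduling)
            raise exception.ApiError('Instance %s is not running' % internal_id)
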
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index cc607f9d4..de114bdeb 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -40,6 +40,7 @@ import logging
from nova import exception
from nova import flags
from nova import manager
+from nova import rpc
from nova import utils
from nova.compute import power_state
@@ -48,6 +49,8 @@ flags.DEFINE_string('instances_path', '$state_path/instances',
'where instances are stored on disk')
flags.DEFINE_string('compute_driver', 'nova.virt.connection.get_connection',
'Driver to use for controlling virtualization')
+flags.DEFINE_bool('stub_network', False,
+ 'Stub network related code')
class ComputeManager(manager.Manager):
@@ -82,6 +85,20 @@ class ComputeManager(manager.Manager):
state = power_state.NOSTATE
self.db.instance_set_state(context, instance_id, state)
+ def get_network_topic(self, context, **_kwargs):
+ """Retrieves the network host for a project on this host"""
+ # TODO(vish): This method should be memoized. This will make
+ # the call to get_network_host cheaper, so that
+ # it can pass messages instead of checking the db
+ # locally.
+ if FLAGS.stub_network:
+ host = FLAGS.network_host
+ else:
+ host = self.network_manager.get_network_host(context)
+ return self.db.queue_get_for(context,
+ FLAGS.network_topic,
+ host)
+
@exception.wrap_exception
def refresh_security_group(self, context, security_group_id, **_kwargs):
"""This call passes stright through to the virtualization driver."""
@@ -95,11 +112,30 @@ class ComputeManager(manager.Manager):
if instance_ref['name'] in self.driver.list_instances():
raise exception.Error(_("Instance has already been created"))
logging.debug(_("instance %s: starting..."), instance_id)
- self.network_manager.setup_compute_network(context, instance_id)
self.db.instance_update(context,
instance_id,
{'host': self.host})
+ self.db.instance_set_state(context,
+ instance_id,
+ power_state.NOSTATE,
+ 'networking')
+
+ is_vpn = instance_ref['image_id'] == FLAGS.vpn_image_id
+ # NOTE(vish): This could be a cast because we don't do anything
+ # with the address currently, but I'm leaving it as
+ # a call to ensure that network setup completes. We
+ # will eventually also need to save the address here.
+ if not FLAGS.stub_network:
+ address = rpc.call(context,
+ self.get_network_topic(context),
+ {"method": "allocate_fixed_ip",
+ "args": {"instance_id": instance_id,
+ "vpn": is_vpn}})
+
+ self.network_manager.setup_compute_network(context,
+ instance_id)
+
# TODO(vish) check to make sure the availability zone matches
self.db.instance_set_state(context,
instance_id,
@@ -125,9 +161,34 @@ class ComputeManager(manager.Manager):
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this machine."""
context = context.elevated()
- logging.debug(_("instance %s: terminating"), instance_id)
instance_ref = self.db.instance_get(context, instance_id)
+
+ if not FLAGS.stub_network:
+ address = self.db.instance_get_floating_address(context,
+ instance_ref['id'])
+ if address:
+ logging.debug(_("Disassociating address %s") % address)
+ # NOTE(vish): Right now we don't really care if the ip is
+ # disassociated. We may need to worry about
+ # checking this later.
+ rpc.cast(context,
+ self.get_network_topic(context),
+ {"method": "disassociate_floating_ip",
+ "args": {"floating_address": address}})
+
+ address = self.db.instance_get_fixed_address(context,
+ instance_ref['id'])
+ if address:
+ logging.debug(_("Deallocating address %s") % address)
+ # NOTE(vish): Currently, nothing needs to be done on the
+ # network node until release. If this changes,
+ # we will need to cast here.
+ self.network_manager.deallocate_fixed_ip(context.elevated(),
+ address)
+
+ logging.debug(_("instance %s: terminating"), instance_id)
+
volumes = instance_ref.get('volumes', []) or []
for volume in volumes:
self.detach_volume(context, instance_id, volume['id'])
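
The spawn path now passes through an explicit 'networking' state and uses a blocking rpc.call for allocate_fixed_ip, as the NOTE says, so network setup is guaranteed to have finished before the VM boots; terminate mirrors it by disassociating and deallocating before tearing the instance down. A condensed sketch of the spawn ordering, with every helper stubbed:

    # Only the sequence mirrors the patch; the bodies are stubs.
    def set_state(instance_id, state):
        print('instance %s: %s' % (instance_id, state))

    def get_network_topic():
        return 'network.netnode1'  # stand-in for ComputeManager.get_network_topic

    def rpc_call(topic, message):
        return '10.0.0.5'          # pretend the network host allocated an ip

    def setup_compute_network(instance_id):
        pass                       # bridge/vlan setup stays local to compute

    def run_instance(instance_id, stub_network=False):
        set_state(instance_id, 'networking')
        if not stub_network:
            # a call, not a cast: the address exists and network setup has
            # completed before the instance boots
            address = rpc_call(get_network_topic(),
                               {'method': 'allocate_fixed_ip',
                                'args': {'instance_id': instance_id,
                                         'vpn': False}})
            setup_compute_network(instance_id)
        set_state(instance_id, 'spawning')

    run_instance(42)
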
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 5ba458241..52d0c389d 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -463,6 +463,7 @@ def floating_ip_get_by_address(context, address, session=None):
session = get_session()
result = session.query(models.FloatingIp).\
+ options(joinedload_all('fixed_ip.network')).\
filter_by(address=address).\
filter_by(deleted=can_read_deleted(context)).\
first()
@@ -659,13 +660,17 @@ def instance_get(context, instance_id, session=None):
if is_admin_context(context):
result = session.query(models.Instance).\
+ options(joinedload_all('fixed_ip.floating_ips')).\
options(joinedload('security_groups')).\
+ options(joinedload('volumes')).\
filter_by(id=instance_id).\
filter_by(deleted=can_read_deleted(context)).\
first()
elif is_user_context(context):
result = session.query(models.Instance).\
+ options(joinedload_all('fixed_ip.floating_ips')).\
options(joinedload('security_groups')).\
+ options(joinedload('volumes')).\
filter_by(project_id=context.project_id).\
filter_by(id=instance_id).\
filter_by(deleted=False).\
diff --git a/nova/fakerabbit.py b/nova/fakerabbit.py
index 41e686cff..79d8b894d 100644
--- a/nova/fakerabbit.py
+++ b/nova/fakerabbit.py
@@ -25,6 +25,10 @@ from carrot.backends import base
from eventlet import greenthread
+EXCHANGES = {}
+QUEUES = {}
+
+
class Message(base.BaseMessage):
pass
@@ -68,81 +72,63 @@ class Queue(object):
return self._queue.get()
-class Backend(object):
- """ Singleton backend for testing """
- class __impl(base.BaseBackend):
- def __init__(self, *args, **kwargs):
- #super(__impl, self).__init__(*args, **kwargs)
- self._exchanges = {}
- self._queues = {}
-
- def _reset_all(self):
- self._exchanges = {}
- self._queues = {}
-
- def queue_declare(self, queue, **kwargs):
- if queue not in self._queues:
- logging.debug(_('Declaring queue %s'), queue)
- self._queues[queue] = Queue(queue)
-
- def exchange_declare(self, exchange, type, *args, **kwargs):
- if exchange not in self._exchanges:
- logging.debug(_('Declaring exchange %s'), exchange)
- self._exchanges[exchange] = Exchange(exchange, type)
-
- def queue_bind(self, queue, exchange, routing_key, **kwargs):
- logging.debug(_('Binding %s to %s with key %s'),
- queue, exchange, routing_key)
- self._exchanges[exchange].bind(self._queues[queue].push,
- routing_key)
-
- def declare_consumer(self, queue, callback, *args, **kwargs):
- self.current_queue = queue
- self.current_callback = callback
-
- def consume(self, *args, **kwargs):
- while True:
- item = self.get(self.current_queue)
- if item:
- self.current_callback(item)
- raise StopIteration()
- greenthread.sleep(0)
-
- def get(self, queue, no_ack=False):
- if not queue in self._queues or not self._queues[queue].size():
- return None
- (message_data, content_type, content_encoding) = \
- self._queues[queue].pop()
- message = Message(backend=self, body=message_data,
- content_type=content_type,
- content_encoding=content_encoding)
- message.result = True
- logging.debug(_('Getting from %s: %s'), queue, message)
- return message
-
- def prepare_message(self, message_data, delivery_mode,
- content_type, content_encoding, **kwargs):
- """Prepare message for sending."""
- return (message_data, content_type, content_encoding)
-
- def publish(self, message, exchange, routing_key, **kwargs):
- if exchange in self._exchanges:
- self._exchanges[exchange].publish(
- message, routing_key=routing_key)
-
- __instance = None
-
- def __init__(self, *args, **kwargs):
- if Backend.__instance is None:
- Backend.__instance = Backend.__impl(*args, **kwargs)
- self.__dict__['_Backend__instance'] = Backend.__instance
-
- def __getattr__(self, attr):
- return getattr(self.__instance, attr)
-
- def __setattr__(self, attr, value):
- return setattr(self.__instance, attr, value)
+class Backend(base.BaseBackend):
+ def queue_declare(self, queue, **kwargs):
+ global QUEUES
+ if queue not in QUEUES:
+ logging.debug(_('Declaring queue %s'), queue)
+ QUEUES[queue] = Queue(queue)
+
+ def exchange_declare(self, exchange, type, *args, **kwargs):
+ global EXCHANGES
+ if exchange not in EXCHANGES:
+ logging.debug(_('Declaring exchange %s'), exchange)
+ EXCHANGES[exchange] = Exchange(exchange, type)
+
+ def queue_bind(self, queue, exchange, routing_key, **kwargs):
+ global EXCHANGES
+ global QUEUES
+ logging.debug(_('Binding %s to %s with key %s'),
+ queue, exchange, routing_key)
+ EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
+
+ def declare_consumer(self, queue, callback, *args, **kwargs):
+ self.current_queue = queue
+ self.current_callback = callback
+
+ def consume(self, limit=None):
+ while True:
+ item = self.get(self.current_queue)
+ if item:
+ self.current_callback(item)
+ raise StopIteration()
+ greenthread.sleep(0)
+
+ def get(self, queue, no_ack=False):
+ global QUEUES
+ if not queue in QUEUES or not QUEUES[queue].size():
+ return None
+ (message_data, content_type, content_encoding) = QUEUES[queue].pop()
+ message = Message(backend=self, body=message_data,
+ content_type=content_type,
+ content_encoding=content_encoding)
+ message.result = True
+ logging.debug(_('Getting from %s: %s'), queue, message)
+ return message
+
+ def prepare_message(self, message_data, delivery_mode,
+ content_type, content_encoding, **kwargs):
+ """Prepare message for sending."""
+ return (message_data, content_type, content_encoding)
+
+ def publish(self, message, exchange, routing_key, **kwargs):
+ global EXCHANGES
+ if exchange in EXCHANGES:
+ EXCHANGES[exchange].publish(message, routing_key=routing_key)
def reset_all():
- Backend()._reset_all()
+ global EXCHANGES
+ global QUEUES
+ EXCHANGES = {}
+ QUEUES = {}
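
Replacing the singleton Backend with module-level EXCHANGES and QUEUES means every Backend instance carrot creates sees the same broker state, which is what the nested-call work needs: the fresh reply connection from msg_reply and the consumer's original connection must share queues. A stripped-down illustration of the property (not the fakerabbit code itself):

    # Module-level state is shared by every Backend instance; no singleton
    # machinery required.
    QUEUES = {}

    class Backend(object):
        def queue_declare(self, queue):
            QUEUES.setdefault(queue, [])

        def publish(self, queue, message):
            QUEUES[queue].append(message)

        def get(self, queue):
            items = QUEUES.get(queue)
            return items.pop(0) if items else None

    replier = Backend()    # e.g. the fresh connection used by msg_reply
    consumer = Backend()   # e.g. the consumer's original connection
    replier.queue_declare('test')
    replier.publish('test', 'hello')
    assert consumer.get('test') == 'hello'  # both see the same queue
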
diff --git a/nova/network/linux_net.py b/nova/network/linux_net.py
index 771f1c932..931a89554 100644
--- a/nova/network/linux_net.py
+++ b/nova/network/linux_net.py
@@ -51,6 +51,8 @@ flags.DEFINE_bool('use_nova_chains', False,
'use the nova_ routing chains instead of default')
flags.DEFINE_string('dns_server', None,
'if set, uses specific dns server for dnsmasq')
+flags.DEFINE_string('dmz_cidr', '10.128.0.0/24',
+ 'dmz range that should be accepted')
def metadata_forward():
diff --git a/nova/network/manager.py b/nova/network/manager.py
index 9e1cf3903..16aa8f895 100644
--- a/nova/network/manager.py
+++ b/nova/network/manager.py
@@ -47,6 +47,7 @@ topologies. All of the network commands are issued to a subclass of
import datetime
import logging
import math
+import socket
import IPy
@@ -56,6 +57,7 @@ from nova import exception
from nova import flags
from nova import manager
from nova import utils
+from nova import rpc
FLAGS = flags.FLAGS
@@ -87,6 +89,10 @@ flags.DEFINE_bool('update_dhcp_on_disassociate', False,
'Whether to update dhcp when fixed_ip is disassociated')
flags.DEFINE_integer('fixed_ip_disassociate_timeout', 600,
'Seconds after which a deallocated ip is disassociated')
+flags.DEFINE_string('network_host', socket.gethostname(),
+ 'Network host to use for ip allocation in flat modes')
+flags.DEFINE_bool('fake_call', False,
+ 'If True, skip using the queue and make local calls')
class AddressAlreadyAllocated(exception.Error):
@@ -223,8 +229,8 @@ class NetworkManager(manager.Manager):
network_ref = self.db.fixed_ip_get_network(context, address)
self.driver.update_dhcp(context, network_ref['id'])
- def get_network(self, context):
- """Get the network for the current context."""
+ def get_network_host(self, context):
+ """Get the network host for the current context."""
raise NotImplementedError()
def create_networks(self, context, num_networks, network_size,
@@ -312,10 +318,6 @@ class FlatManager(NetworkManager):
"""Network is created manually."""
pass
- def setup_fixed_ip(self, context, address):
- """Currently no setup."""
- pass
-
def create_networks(self, context, cidr, num_networks, network_size,
*args, **kwargs):
"""Create networks based on parameters."""
@@ -336,14 +338,25 @@ class FlatManager(NetworkManager):
if network_ref:
self._create_fixed_ips(context, network_ref['id'])
- def get_network(self, context):
- """Get the network for the current context."""
- # NOTE(vish): To support mutilple network hosts, This could randomly
- # select from multiple networks instead of just
- # returning the one. It could also potentially be done
- # in the scheduler.
- return self.db.network_get_by_bridge(context,
- FLAGS.flat_network_bridge)
+ def get_network_host(self, context):
+ """Get the network host for the current context."""
+ network_ref = self.db.network_get_by_bridge(context,
+ FLAGS.flat_network_bridge)
+ # NOTE(vish): If the network has no host, use the network_host flag.
+ # This could eventually be a db lookup of some sort, but
+ # a flag is easy to handle for now.
+ host = network_ref['host']
+ if not host:
+ topic = self.db.queue_get_for(context,
+ FLAGS.network_topic,
+ FLAGS.network_host)
+ if FLAGS.fake_call:
+ return self.set_network_host(context, network_ref['id'])
+ host = rpc.call(context,
+ FLAGS.network_topic,
+ {"method": "set_network_host",
+ "args": {"network_id": network_ref['id']}})
+ return host
def _on_set_network_host(self, context, network_id):
"""Called when this host becomes the host for a network."""
@@ -374,10 +387,16 @@ class FlatDHCPManager(FlatManager):
self.driver.ensure_bridge(network_ref['bridge'],
FLAGS.flat_interface)
- def setup_fixed_ip(self, context, address):
+ def allocate_fixed_ip(self, context, instance_id, *args, **kwargs):
"""Setup dhcp for this network."""
+ address = super(FlatDHCPManager, self).allocate_fixed_ip(context,
+ instance_id,
+ *args,
+ **kwargs)
network_ref = db.fixed_ip_get_network(context, address)
- self.driver.update_dhcp(context, network_ref['id'])
+ if not FLAGS.fake_network:
+ self.driver.update_dhcp(context, network_ref['id'])
+ return address
def deallocate_fixed_ip(self, context, address, *args, **kwargs):
"""Returns a fixed ip to the pool."""
@@ -446,28 +465,20 @@ class VlanManager(NetworkManager):
network_ref['id'],
instance_id)
self.db.fixed_ip_update(context, address, {'allocated': True})
+ if not FLAGS.fake_network:
+ self.driver.update_dhcp(context, network_ref['id'])
return address
def deallocate_fixed_ip(self, context, address, *args, **kwargs):
"""Returns a fixed ip to the pool."""
self.db.fixed_ip_update(context, address, {'allocated': False})
- def setup_fixed_ip(self, context, address):
- """Sets forwarding rules and dhcp for fixed ip."""
- network_ref = self.db.fixed_ip_get_network(context, address)
- self.driver.update_dhcp(context, network_ref['id'])
-
def setup_compute_network(self, context, instance_id):
"""Sets up matching network for compute hosts."""
network_ref = db.network_get_by_instance(context, instance_id)
self.driver.ensure_vlan_bridge(network_ref['vlan'],
network_ref['bridge'])
- def restart_nets(self):
- """Ensure the network for each user is enabled."""
- # TODO(vish): Implement this
- pass
-
def create_networks(self, context, cidr, num_networks, network_size,
vlan_start, vpn_start):
"""Create networks based on parameters."""
@@ -494,10 +505,23 @@ class VlanManager(NetworkManager):
if network_ref:
self._create_fixed_ips(context, network_ref['id'])
- def get_network(self, context):
+ def get_network_host(self, context):
"""Get the network for the current context."""
- return self.db.project_get_network(context.elevated(),
- context.project_id)
+ network_ref = self.db.project_get_network(context.elevated(),
+ context.project_id)
+ # NOTE(vish): If the network has no host, do a call to get an
+ # available host. This should be changed to go through
+ # the scheduler at some point.
+ host = network_ref['host']
+ if not host:
+ if FLAGS.fake_call:
+ return self.set_network_host(context, network_ref['id'])
+ host = rpc.call(context,
+ FLAGS.network_topic,
+ {"method": "set_network_host",
+ "args": {"network_id": network_ref['id']}})
+
+ return host
def _on_set_network_host(self, context, network_id):
"""Called when this host becomes the host for a network."""
diff --git a/nova/rpc.py b/nova/rpc.py
index 6e2cf051a..844088348 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -245,7 +245,7 @@ def msg_reply(msg_id, reply=None, failure=None):
logging.error(_("Returning exception %s to caller"), message)
logging.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
- conn = Connection.instance()
+ conn = Connection.instance(True)
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})
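
msg_reply now requests a fresh connection rather than the shared singleton; under eventlet, publishing a reply on the same connection a consumer is blocked on is what made nested calls hang. A hypothetical sketch of the instance() shape implied by the call sites above; the real signature in nova/rpc.py may differ:

    # Hypothetical, inferred from Connection.instance(True) in this diff.
    class Connection(object):
        _instance = None

        @classmethod
        def instance(cls, new=False):
            if new:
                return cls()           # dedicated connection, e.g. for replies
            if cls._instance is None:
                cls._instance = cls()  # process-wide shared connection
            return cls._instance
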
diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py
index 53a762310..70d2c44da 100644
--- a/nova/tests/cloud_unittest.py
+++ b/nova/tests/cloud_unittest.py
@@ -22,20 +22,18 @@ import logging
from M2Crypto import BIO
from M2Crypto import RSA
import os
-import StringIO
import tempfile
import time
from eventlet import greenthread
-from xml.etree import ElementTree
from nova import context
from nova import crypto
from nova import db
from nova import flags
from nova import rpc
+from nova import service
from nova import test
-from nova import utils
from nova.auth import manager
from nova.compute import power_state
from nova.api.ec2 import cloud
@@ -54,7 +52,8 @@ os.makedirs(IMAGES_PATH)
class CloudTestCase(test.TestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
- self.flags(connection_type='fake', images_path=IMAGES_PATH)
+ self.flags(connection_type='fake',
+ images_path=IMAGES_PATH)
self.conn = rpc.Connection.instance()
logging.getLogger().setLevel(logging.DEBUG)
@@ -62,27 +61,23 @@ class CloudTestCase(test.TestCase):
# set up our cloud
self.cloud = cloud.CloudController()
- # set up a service
- self.compute = utils.import_object(FLAGS.compute_manager)
- self.compute_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.compute_topic,
- proxy=self.compute)
- self.compute_consumer.attach_to_eventlet()
- self.network = utils.import_object(FLAGS.network_manager)
- self.network_consumer = rpc.AdapterConsumer(connection=self.conn,
- topic=FLAGS.network_topic,
- proxy=self.network)
- self.network_consumer.attach_to_eventlet()
+ # set up services
+ self.compute = service.Service.create(binary='nova-compute')
+ self.compute.start()
+ self.network = service.Service.create(binary='nova-network')
+ self.network.start()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('admin', 'admin', 'admin', True)
self.project = self.manager.create_project('proj', 'admin', 'proj')
self.context = context.RequestContext(user=self.user,
- project=self.project)
+ project=self.project)
def tearDown(self):
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
+ self.compute.kill()
+ self.network.kill()
super(CloudTestCase, self).tearDown()
def _create_key(self, name):
@@ -109,12 +104,13 @@ class CloudTestCase(test.TestCase):
{'address': address,
'host': FLAGS.host})
self.cloud.allocate_address(self.context)
- inst = db.instance_create(self.context, {})
+ inst = db.instance_create(self.context, {'host': FLAGS.host})
fixed = self.network.allocate_fixed_ip(self.context, inst['id'])
ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
+ greenthread.sleep(0.3)
self.cloud.disassociate_address(self.context,
public_ip=address)
self.cloud.release_address(self.context,
diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py
index 187ca31de..348bb3351 100644
--- a/nova/tests/compute_unittest.py
+++ b/nova/tests/compute_unittest.py
@@ -41,6 +41,7 @@ class ComputeTestCase(test.TestCase):
logging.getLogger().setLevel(logging.DEBUG)
super(ComputeTestCase, self).setUp()
self.flags(connection_type='fake',
+ stub_network=True,
network_manager='nova.network.manager.FlatManager')
self.compute = utils.import_object(FLAGS.compute_manager)
self.compute_api = compute_api.ComputeAPI()
diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py
index bcac20585..96473ac7c 100644
--- a/nova/tests/network_unittest.py
+++ b/nova/tests/network_unittest.py
@@ -26,6 +26,7 @@ from nova import context
from nova import db
from nova import exception
from nova import flags
+from nova import service
from nova import test
from nova import utils
from nova.auth import manager
@@ -40,6 +41,7 @@ class NetworkTestCase(test.TestCase):
# NOTE(vish): if you change these flags, make sure to change the
# flags in the corresponding section in nova-dhcpbridge
self.flags(connection_type='fake',
+ fake_call=True,
fake_network=True,
network_size=16,
num_networks=5)
@@ -56,16 +58,13 @@ class NetworkTestCase(test.TestCase):
# create the necessary network data for the project
user_context = context.RequestContext(project=self.projects[i],
user=self.user)
- network_ref = self.network.get_network(user_context)
- self.network.set_network_host(context.get_admin_context(),
- network_ref['id'])
+ host = self.network.get_network_host(user_context.elevated())
instance_ref = self._create_instance(0)
self.instance_id = instance_ref['id']
instance_ref = self._create_instance(1)
self.instance2_id = instance_ref['id']
def tearDown(self):
- super(NetworkTestCase, self).tearDown()
# TODO(termie): this should really be instantiating clean datastores
# in between runs, one failure kills all the tests
db.instance_destroy(context.get_admin_context(), self.instance_id)
@@ -73,6 +72,7 @@ class NetworkTestCase(test.TestCase):
for project in self.projects:
self.manager.delete_project(project)
self.manager.delete_user(self.user)
+ super(NetworkTestCase, self).tearDown()
def _create_instance(self, project_num, mac=None):
if not mac:
diff --git a/nova/tests/rpc_unittest.py b/nova/tests/rpc_unittest.py
index a2495e65a..6ea2edcab 100644
--- a/nova/tests/rpc_unittest.py
+++ b/nova/tests/rpc_unittest.py
@@ -33,7 +33,7 @@ class RpcTestCase(test.TestCase):
"""Test cases for rpc"""
def setUp(self):
super(RpcTestCase, self).setUp()
- self.conn = rpc.Connection.instance()
+ self.conn = rpc.Connection.instance(True)
self.receiver = TestReceiver()
self.consumer = rpc.AdapterConsumer(connection=self.conn,
topic='test',
@@ -79,6 +79,33 @@ class RpcTestCase(test.TestCase):
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
+ def test_nested_calls(self):
+ """Test that we can do an rpc.call inside another call"""
+ class Nested(object):
+ @staticmethod
+ def echo(context, queue, value):
+ """Calls echo in the passed queue"""
+ logging.debug("Nested received %s, %s", queue, value)
+ ret = rpc.call(context,
+ queue,
+ {"method": "echo",
+ "args": {"value": value}})
+ logging.debug("Nested return %s", ret)
+ return value
+
+ nested = Nested()
+ conn = rpc.Connection.instance(True)
+ consumer = rpc.AdapterConsumer(connection=conn,
+ topic='nested',
+ proxy=nested)
+ consumer.attach_to_eventlet()
+ value = 42
+ result = rpc.call(self.context,
+ 'nested', {"method": "echo",
+ "args": {"queue": "test",
+ "value": value}})
+ self.assertEqual(value, result)
+
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call
diff --git a/nova/tests/scheduler_unittest.py b/nova/tests/scheduler_unittest.py
index d1756b8fb..df5e7afa5 100644
--- a/nova/tests/scheduler_unittest.py
+++ b/nova/tests/scheduler_unittest.py
@@ -78,6 +78,7 @@ class SimpleDriverTestCase(test.TestCase):
def setUp(self):
super(SimpleDriverTestCase, self).setUp()
self.flags(connection_type='fake',
+ stub_network=True,
max_cores=4,
max_gigabytes=4,
network_manager='nova.network.manager.FlatManager',
diff --git a/nova/tests/virt_unittest.py b/nova/tests/virt_unittest.py
index 7682f9662..9ad009510 100644
--- a/nova/tests/virt_unittest.py
+++ b/nova/tests/virt_unittest.py
@@ -33,6 +33,7 @@ flags.DECLARE('instances_path', 'nova.compute.manager')
class LibvirtConnTestCase(test.TestCase):
def setUp(self):
super(LibvirtConnTestCase, self).setUp()
+ self.flags(fake_call=True)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
@@ -88,9 +89,9 @@ class LibvirtConnTestCase(test.TestCase):
user_context = context.RequestContext(project=self.project,
user=self.user)
instance_ref = db.instance_create(user_context, instance)
- network_ref = self.network.get_network(user_context)
- self.network.set_network_host(context.get_admin_context(),
- network_ref['id'])
+ host = self.network.get_network_host(user_context.elevated())
+ network_ref = db.project_get_network(context.get_admin_context(),
+ self.project.id)
fixed_ip = {'address': self.test_ip,
'network_id': network_ref['id']}
diff --git a/nova/virt/libvirt.uml.xml.template.THIS b/nova/virt/libvirt.uml.xml.template.THIS
deleted file mode 100644
index 506f2ef72..000000000
--- a/nova/virt/libvirt.uml.xml.template.THIS
+++ /dev/null
@@ -1,27 +0,0 @@
-<domain type='%(type)s'>
- <name>%(name)s</name>
- <memory>%(memory_kb)s</memory>
- <os>
- <type>%(type)s</type>
- <kernel>/usr/bin/linux</kernel>
- <root>/dev/ubda1</root>
- </os>
- <devices>
- <disk type='file'>
- <source file='%(basepath)s/disk'/>
- <target dev='ubd0' bus='uml'/>
- </disk>
- <interface type='bridge'>
- <source bridge='%(bridge_name)s'/>
- <mac address='%(mac_address)s'/>
- <filterref filter="nova-instance-%(name)s">
- <parameter name="IP" value="%(ip_address)s" />
- <parameter name="DHCPSERVER" value="%(dhcp_server)s" />
- %(extra_params)s
- </filterref>
- </interface>
- <console type="file">
- <source path='%(basepath)s/console.log'/>
- </console>
- </devices>
-</domain>
diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py
index 16bcfe3c0..8d3a6a261 100644
--- a/nova/virt/libvirt_conn.py
+++ b/nova/virt/libvirt_conn.py
@@ -40,6 +40,7 @@ import logging
import os
import shutil
+from eventlet import greenthread
from eventlet import event
from eventlet import tpool
@@ -498,8 +499,8 @@ class LibvirtConnection(object):
# TODO(termie): cache?
logging.debug(_('instance %s: starting toXML method'),
instance['name'])
- network = db.project_get_network(context.get_admin_context(),
- instance['project_id'])
+ network = db.network_get_by_instance(context.get_admin_context(),
+ instance['id'])
# FIXME(vish): stick this in db
instance_type = instance['instance_type']
instance_type = instance_types.INSTANCE_TYPES[instance_type]
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index 3b00ce8bf..4d897af35 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -59,8 +59,8 @@ class VMOps(object):
raise Exception('Attempted to create non-unique name %s' %
instance.name)
- bridge = db.project_get_network(context.get_admin_context(),
- instance.project_id).bridge
+ bridge = db.network_get_by_instance(context.get_admin_context(),
+ instance['id'])['bridge']
network_ref = \
NetworkHelper.find_network_with_bridge(self._session, bridge)