summaryrefslogtreecommitdiffstats
path: root/plugins/xenserver
diff options
context:
space:
mode:
authorRick Harris <rconradharris@gmail.com>2012-06-25 22:49:17 +0000
committerRick Harris <rconradharris@gmail.com>2012-10-03 21:29:46 +0000
commit216a83da476f2ea1ca3fcc6af3ac7dce6c20db27 (patch)
treec8cf8540f88950e08e0c12bc1d1f9d80258eadd1 /plugins/xenserver
parent1d4506c16aec9674be6a3685ba585a8bbd9c1559 (diff)
downloadnova-216a83da476f2ea1ca3fcc6af3ac7dce6c20db27.tar.gz
nova-216a83da476f2ea1ca3fcc6af3ac7dce6c20db27.tar.xz
nova-216a83da476f2ea1ca3fcc6af3ac7dce6c20db27.zip
Add ability to download images via BitTorrent.
This patch adds a new dom0 plugin which supports downloading images via BitTorrent. Torrent metadata files are assumed to be served from a webserver which is specified by the `torrent_base_url` config. Under the hood, the dom0 plugins calls out to rasterbar's libtorrent via Python bindings in order perform the initial download as well as the seeding thereafter. Implements BP xenserver-bittorrent-images Change-Id: I824720a6e3a37317080a22cd7405d2a88172c3ef
Diffstat (limited to 'plugins/xenserver')
-rw-r--r--plugins/xenserver/xenapi/contrib/rpmbuild/SPECS/openstack-xen-plugins.spec2
-rwxr-xr-xplugins/xenserver/xenapi/etc/xapi.d/plugins/_bittorrent_seeder121
-rwxr-xr-xplugins/xenserver/xenapi/etc/xapi.d/plugins/bittorrent299
-rw-r--r--plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py10
4 files changed, 432 insertions, 0 deletions
diff --git a/plugins/xenserver/xenapi/contrib/rpmbuild/SPECS/openstack-xen-plugins.spec b/plugins/xenserver/xenapi/contrib/rpmbuild/SPECS/openstack-xen-plugins.spec
index 2260392b1..19b508d2e 100644
--- a/plugins/xenserver/xenapi/contrib/rpmbuild/SPECS/openstack-xen-plugins.spec
+++ b/plugins/xenserver/xenapi/contrib/rpmbuild/SPECS/openstack-xen-plugins.spec
@@ -28,8 +28,10 @@ rm -rf $RPM_BUILD_ROOT
%files
%defattr(-,root,root,-)
+/etc/xapi.d/plugins/_bittorrent_seeder
/etc/xapi.d/plugins/agent
/etc/xapi.d/plugins/bandwidth
+/etc/xapi.d/plugins/bittorrent
/etc/xapi.d/plugins/glance
/etc/xapi.d/plugins/kernel
/etc/xapi.d/plugins/migration
diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/_bittorrent_seeder b/plugins/xenserver/xenapi/etc/xapi.d/plugins/_bittorrent_seeder
new file mode 100755
index 000000000..88262139e
--- /dev/null
+++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/_bittorrent_seeder
@@ -0,0 +1,121 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2012 Openstack, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Seed a bittorent image. This file should not be executed directly, rather it
+should be kicked off by the `bittorent` dom0 plugin."""
+
+import os
+import sys
+import time
+
+import libtorrent
+
+#FIXME(sirp): should this use pluginlib from 5.6?
+from pluginlib_nova import *
+configure_logging('_bittorrent_seeder')
+
+
+def _daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
+ """
+ do the UNIX double-fork magic, see Stevens' "Advanced
+ Programming in the UNIX Environment" for details (ISBN 0201563177)
+
+ Source: http://www.jejik.com/articles/2007/02/
+ a_simple_unix_linux_daemon_in_python/
+ """
+ # 1st fork
+ try:
+ pid = os.fork()
+ if pid > 0:
+ # first parent returns
+ return False
+ except OSError, e:
+ logging.error("fork #1 failed: %d (%s)" % (
+ e.errno, e.strerror))
+ return
+
+ # decouple from parent environment
+ os.chdir("/")
+ os.setsid()
+ os.umask(0)
+
+ # 2nd fork
+ try:
+ pid = os.fork()
+ if pid > 0:
+ # second parent exits
+ sys.exit(0)
+ except OSError, e:
+ logging.error("fork #2 failed: %d (%s)" % (
+ e.errno, e.strerror))
+ return
+
+ # redirect standard file descriptors
+ sys.stdout.flush()
+ sys.stderr.flush()
+ si = open(stdin, 'r')
+ so = open(stdout, 'a+')
+ se = open(stderr, 'a+', 0)
+ os.dup2(si.fileno(), sys.stdin.fileno())
+ os.dup2(so.fileno(), sys.stdout.fileno())
+ os.dup2(se.fileno(), sys.stderr.fileno())
+ return True
+
+
+def main(torrent_path, seed_cache_path, torrent_seed_duration,
+ torrent_listen_port_start, torrent_listen_port_end):
+ seed_time = time.time() + torrent_seed_duration
+ logging.debug("Seeding '%s' for %d secs" % (
+ torrent_path, torrent_seed_duration))
+
+ child = _daemonize()
+ if not child:
+ return
+
+ # At this point we're the daemonized child...
+ session = libtorrent.session()
+ session.listen_on(torrent_listen_port_start, torrent_listen_port_end)
+
+ torrent_file = open(torrent_path, 'rb')
+ try:
+ torrent_data = torrent_file.read()
+ finally:
+ torrent_file.close()
+
+ decoded_data = libtorrent.bdecode(torrent_data)
+
+ info = libtorrent.torrent_info(decoded_data)
+ torrent = session.add_torrent(
+ info, seed_cache_path,
+ storage_mode=libtorrent.storage_mode_t.storage_mode_sparse)
+ try:
+ while time.time() < seed_time:
+ time.sleep(5)
+ finally:
+ session.remove_torrent(torrent)
+
+ logging.debug("Seeding of '%s' finished" % torrent_path)
+
+
+if __name__ == "__main__":
+ (torrent_path, seed_cache_path, torrent_seed_duration,
+ torrent_listen_port_start, torrent_listen_port_end) = sys.argv[1:]
+ torrent_seed_duration = int(torrent_seed_duration)
+ torrent_listen_port_start = int(torrent_listen_port_start)
+ torrent_listen_port_end = int(torrent_listen_port_end)
+
+ main(torrent_path, seed_cache_path, torrent_seed_duration,
+ torrent_listen_port_start, torrent_listen_port_end)
diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/bittorrent b/plugins/xenserver/xenapi/etc/xapi.d/plugins/bittorrent
new file mode 100755
index 000000000..fef1862b1
--- /dev/null
+++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/bittorrent
@@ -0,0 +1,299 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2012 Openstack, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Download images via BitTorrent."""
+
+import errno
+import inspect
+import os
+import random
+import shutil
+import tempfile
+import time
+
+import libtorrent
+import urllib2
+import XenAPIPlugin
+
+import utils
+
+#FIXME(sirp): should this use pluginlib from 5.6?
+from pluginlib_nova import *
+configure_logging('bittorrent')
+
+DEFAULT_TORRENT_CACHE = '/images/torrents'
+DEFAULT_SEED_CACHE = '/images/seeds'
+SEEDER_PROCESS = '_bittorrent_seeder'
+
+
+def _make_torrent_cache():
+ torrent_cache_path = os.environ.get(
+ 'TORRENT_CACHE', DEFAULT_TORRENT_CACHE)
+
+ if not os.path.exists(torrent_cache_path):
+ os.mkdir(torrent_cache_path)
+
+ return torrent_cache_path
+
+
+def _fetch_torrent_file(torrent_cache_path, image_id, torrent_base_url):
+ torrent_path = os.path.join(
+ torrent_cache_path, image_id + '.torrent')
+
+ if not os.path.exists(torrent_path):
+ torrent_url = torrent_base_url + "/%s.torrent" % image_id
+ logging.info("Downloading %s" % torrent_url)
+
+ # Write contents to temporary path to ensure we don't have partially
+ # completed files in the cache.
+ temp_directory = tempfile.mkdtemp(dir=torrent_cache_path)
+ try:
+ temp_path = os.path.join(
+ temp_directory, os.path.basename(torrent_path))
+ temp_file = open(temp_path, 'wb')
+ try:
+ remote_torrent_file = urllib2.urlopen(torrent_url)
+ shutil.copyfileobj(remote_torrent_file, temp_file)
+ finally:
+ temp_file.close()
+
+ os.rename(temp_path, torrent_path)
+ finally:
+ shutil.rmtree(temp_directory)
+
+ return torrent_path
+
+
+def _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed):
+ """Delete any torrent files that haven't been accessed recently."""
+ if not torrent_max_last_accessed:
+ logging.debug("Reaping old torrent files disabled, skipping...")
+ return
+
+ logging.debug("Preparing to reap old torrent files,"
+ " torrent_max_last_accessed=%d" % torrent_max_last_accessed)
+
+ for fname in os.listdir(torrent_cache_path):
+ torrent_path = os.path.join(torrent_cache_path, fname)
+ last_accessed = time.time() - os.path.getatime(torrent_path)
+ if last_accessed > torrent_max_last_accessed:
+ logging.debug("Reaping '%s', last_accessed=%d" % (
+ torrent_path, last_accessed))
+ utils.delete_if_exists(torrent_path)
+
+
+def _download(torrent_path, save_as_path, torrent_listen_port_start,
+ torrent_listen_port_end, torrent_download_stall_cutoff):
+ session = libtorrent.session()
+ session.listen_on(torrent_listen_port_start, torrent_listen_port_end)
+ info = libtorrent.torrent_info(
+ libtorrent.bdecode(open(torrent_path, 'rb').read()))
+
+ torrent = session.add_torrent(
+ info, save_as_path,
+ storage_mode=libtorrent.storage_mode_t.storage_mode_sparse)
+
+ try:
+ last_progress = 0
+ last_progress_updated = time.time()
+
+ while not torrent.is_seed():
+ s = torrent.status()
+
+ progress = s.progress * 100
+
+ if progress != last_progress:
+ last_progress = progress
+ last_progress_updated = time.time()
+
+ stall_duration = time.time() - last_progress_updated
+ if stall_duration > torrent_download_stall_cutoff:
+ logging.error(
+ "Download stalled: stall_duration=%d,"
+ " torrent_download_stall_cutoff=%d" % (
+ stall_duration, torrent_download_stall_cutoff))
+ raise Exception("Bittorrent download stall detected, bailing!")
+
+ logging.debug(
+ '%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d)'
+ ' %s %s' % (progress, s.download_rate / 1000,
+ s.upload_rate / 1000, s.num_peers, s.state,
+ torrent_path))
+ time.sleep(1)
+ finally:
+ session.remove_torrent(torrent)
+
+ logging.debug("Download of '%s' finished" % torrent_path)
+
+
+def _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance,
+ torrent_max_seeder_processes_per_host):
+ if not torrent_seed_duration:
+ logging.debug("Seeding disabled, skipping...")
+ return False
+
+ if os.path.exists(seed_path):
+ logging.debug("Seed is already present, skipping....")
+ return False
+
+ rand = random.random()
+ if rand > torrent_seed_chance:
+ logging.debug("%.2f > %.2f, seeding randomly skipping..." % (
+ rand, torrent_seed_chance))
+ return False
+
+ num_active_seeders = len(list(_active_seeder_processes()))
+ if (torrent_max_seeder_processes_per_host >= 0 and
+ num_active_seeders >= torrent_max_seeder_processes_per_host):
+ logging.debug("max number of seeder processes for this host reached"
+ " (%d), skipping..." %
+ torrent_max_seeder_processes_per_host)
+ return False
+
+ return True
+
+
+def _seed(torrent_path, seed_cache_path, torrent_seed_duration,
+ torrent_listen_port_start, torrent_listen_port_end):
+ plugin_path = os.path.dirname(inspect.getabsfile(inspect.currentframe()))
+ seeder_path = os.path.join(plugin_path, SEEDER_PROCESS)
+ seed_cmd = "%s %s %s %d %d %d" % (
+ seeder_path, torrent_path, seed_cache_path, torrent_seed_duration,
+ torrent_listen_port_start, torrent_listen_port_end)
+
+ seed_proc = utils.make_subprocess(seed_cmd)
+ utils.finish_subprocess(seed_proc, seed_cmd)
+
+
+def _seed_if_needed(seed_cache_path, tarball_path, torrent_path,
+ torrent_seed_duration, torrent_seed_chance,
+ torrent_listen_port_start, torrent_listen_port_end,
+ torrent_max_seeder_processes_per_host):
+ seed_filename = os.path.basename(tarball_path)
+ seed_path = os.path.join(seed_cache_path, seed_filename)
+
+ if _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance,
+ torrent_max_seeder_processes_per_host):
+ logging.debug("Preparing to seed '%s' for %d secs" % (
+ seed_path, torrent_seed_duration))
+ utils._rename(tarball_path, seed_path)
+
+ # Daemonize and seed the image
+ _seed(torrent_path, seed_cache_path, torrent_seed_duration,
+ torrent_listen_port_start, torrent_listen_port_end)
+ else:
+ utils.delete_if_exists(tarball_path)
+
+
+def _extract_tarball(tarball_path, staging_path):
+ """Extract the tarball into the staging directory."""
+ tarball_fileobj = open(tarball_path, 'rb')
+ try:
+ utils.extract_tarball(tarball_fileobj, staging_path)
+ finally:
+ tarball_fileobj.close()
+
+
+def _active_seeder_processes():
+ """Yields command-line of active seeder processes.
+
+ Roughly equivalent to performing ps | grep _bittorrent_seeder
+ """
+ pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
+ for pid in pids:
+ try:
+ cmdline = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read()
+ except IOError, e:
+ if e.errno != errno.ENOENT:
+ raise
+
+ if SEEDER_PROCESS in cmdline:
+ yield cmdline
+
+
+def _reap_finished_seeds(seed_cache_path):
+ """Delete any cached seeds where the seeder process has died."""
+ logging.debug("Preparing to reap finished seeds")
+ missing = {}
+ for fname in os.listdir(seed_cache_path):
+ seed_path = os.path.join(seed_cache_path, fname)
+ missing[seed_path] = None
+
+ for cmdline in _active_seeder_processes():
+ for seed_path in missing.keys():
+ seed_filename = os.path.basename(seed_path)
+ if seed_filename in cmdline:
+ del missing[seed_path]
+
+ for seed_path in missing:
+ logging.debug("Reaping cached seed '%s'" % seed_path)
+ utils.delete_if_exists(seed_path)
+
+
+def _make_seed_cache():
+ seed_cache_path = os.environ.get('SEED_CACHE', DEFAULT_SEED_CACHE)
+ if not os.path.exists(seed_cache_path):
+ os.mkdir(seed_cache_path)
+ return seed_cache_path
+
+
+def download_vhd(session, image_id, torrent_base_url, torrent_seed_duration,
+ torrent_seed_chance, torrent_max_last_accessed,
+ torrent_listen_port_start, torrent_listen_port_end,
+ torrent_download_stall_cutoff, uuid_stack, sr_path,
+ torrent_max_seeder_processes_per_host):
+ """Download an image from BitTorrent, unbundle it, and then deposit the
+ VHDs into the storage repository
+ """
+ seed_cache_path = _make_seed_cache()
+ torrent_cache_path = _make_torrent_cache()
+
+ # Housekeeping
+ _reap_finished_seeds(seed_cache_path)
+ _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed)
+
+ torrent_path = _fetch_torrent_file(
+ torrent_cache_path, image_id, torrent_base_url)
+
+ staging_path = utils.make_staging_area(sr_path)
+ try:
+ tarball_filename = os.path.basename(torrent_path).replace(
+ '.torrent', '')
+ tarball_path = os.path.join(staging_path, tarball_filename)
+
+ # Download tarball into staging area
+ _download(torrent_path, staging_path, torrent_listen_port_start,
+ torrent_listen_port_end, torrent_download_stall_cutoff)
+
+ # Extract the tarball into the staging area
+ _extract_tarball(tarball_path, staging_path)
+
+ # Move the VHDs from the staging area into the storage repository
+ vdi_list = utils.import_vhds(sr_path, staging_path, uuid_stack)
+
+ # Seed image for others in the swarm
+ _seed_if_needed(seed_cache_path, tarball_path, torrent_path,
+ torrent_seed_duration, torrent_seed_chance,
+ torrent_listen_port_start, torrent_listen_port_end,
+ torrent_max_seeder_processes_per_host)
+ finally:
+ utils.cleanup_staging_area(staging_path)
+
+ return vdi_list
+
+
+if __name__ == '__main__':
+ utils.register_plugin_calls(download_vhd)
diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py b/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py
index 66f46a839..510687d7b 100644
--- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py
+++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py
@@ -27,6 +27,16 @@ import XenAPIPlugin
CHUNK_SIZE = 8192
+def delete_if_exists(path):
+ try:
+ os.unlink(path)
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ logging.warning("'%s' was already deleted, skipping delete" % path)
+ else:
+ raise
+
+
def _link(src, dst):
logging.info("Hard-linking file '%s' -> '%s'" % (src, dst))
os.link(src, dst)