diff options
author | Rick Harris <rconradharris@gmail.com> | 2012-06-25 22:49:17 +0000 |
---|---|---|
committer | Rick Harris <rconradharris@gmail.com> | 2012-10-03 21:29:46 +0000 |
commit | 216a83da476f2ea1ca3fcc6af3ac7dce6c20db27 (patch) | |
tree | c8cf8540f88950e08e0c12bc1d1f9d80258eadd1 /plugins | |
parent | 1d4506c16aec9674be6a3685ba585a8bbd9c1559 (diff) | |
download | nova-216a83da476f2ea1ca3fcc6af3ac7dce6c20db27.tar.gz nova-216a83da476f2ea1ca3fcc6af3ac7dce6c20db27.tar.xz nova-216a83da476f2ea1ca3fcc6af3ac7dce6c20db27.zip |
Add ability to download images via BitTorrent.
This patch adds a new dom0 plugin which supports downloading images via
BitTorrent. Torrent metadata files are assumed to be served from a
webserver which is specified by the `torrent_base_url` config.
Under the hood, the dom0 plugin calls out to rasterbar's libtorrent via
Python bindings in order to perform the initial download as well as the
seeding thereafter.
Implements BP xenserver-bittorrent-images
Change-Id: I824720a6e3a37317080a22cd7405d2a88172c3ef
Diffstat (limited to 'plugins')
4 files changed, 432 insertions, 0 deletions
diff --git a/plugins/xenserver/xenapi/contrib/rpmbuild/SPECS/openstack-xen-plugins.spec b/plugins/xenserver/xenapi/contrib/rpmbuild/SPECS/openstack-xen-plugins.spec index 2260392b1..19b508d2e 100644 --- a/plugins/xenserver/xenapi/contrib/rpmbuild/SPECS/openstack-xen-plugins.spec +++ b/plugins/xenserver/xenapi/contrib/rpmbuild/SPECS/openstack-xen-plugins.spec @@ -28,8 +28,10 @@ rm -rf $RPM_BUILD_ROOT %files %defattr(-,root,root,-) +/etc/xapi.d/plugins/_bittorrent_seeder /etc/xapi.d/plugins/agent /etc/xapi.d/plugins/bandwidth +/etc/xapi.d/plugins/bittorrent /etc/xapi.d/plugins/glance /etc/xapi.d/plugins/kernel /etc/xapi.d/plugins/migration diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/_bittorrent_seeder b/plugins/xenserver/xenapi/etc/xapi.d/plugins/_bittorrent_seeder new file mode 100755 index 000000000..88262139e --- /dev/null +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/_bittorrent_seeder @@ -0,0 +1,121 @@ +#!/usr/bin/env python + +# Copyright (c) 2012 Openstack, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Seed a bittorent image. This file should not be executed directly, rather it +should be kicked off by the `bittorent` dom0 plugin.""" + +import os +import sys +import time + +import libtorrent + +#FIXME(sirp): should this use pluginlib from 5.6? 
+from pluginlib_nova import * +configure_logging('_bittorrent_seeder') + + +def _daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): + """ + do the UNIX double-fork magic, see Stevens' "Advanced + Programming in the UNIX Environment" for details (ISBN 0201563177) + + Source: http://www.jejik.com/articles/2007/02/ + a_simple_unix_linux_daemon_in_python/ + """ + # 1st fork + try: + pid = os.fork() + if pid > 0: + # first parent returns + return False + except OSError, e: + logging.error("fork #1 failed: %d (%s)" % ( + e.errno, e.strerror)) + return + + # decouple from parent environment + os.chdir("/") + os.setsid() + os.umask(0) + + # 2nd fork + try: + pid = os.fork() + if pid > 0: + # second parent exits + sys.exit(0) + except OSError, e: + logging.error("fork #2 failed: %d (%s)" % ( + e.errno, e.strerror)) + return + + # redirect standard file descriptors + sys.stdout.flush() + sys.stderr.flush() + si = open(stdin, 'r') + so = open(stdout, 'a+') + se = open(stderr, 'a+', 0) + os.dup2(si.fileno(), sys.stdin.fileno()) + os.dup2(so.fileno(), sys.stdout.fileno()) + os.dup2(se.fileno(), sys.stderr.fileno()) + return True + + +def main(torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end): + seed_time = time.time() + torrent_seed_duration + logging.debug("Seeding '%s' for %d secs" % ( + torrent_path, torrent_seed_duration)) + + child = _daemonize() + if not child: + return + + # At this point we're the daemonized child... 
+ session = libtorrent.session() + session.listen_on(torrent_listen_port_start, torrent_listen_port_end) + + torrent_file = open(torrent_path, 'rb') + try: + torrent_data = torrent_file.read() + finally: + torrent_file.close() + + decoded_data = libtorrent.bdecode(torrent_data) + + info = libtorrent.torrent_info(decoded_data) + torrent = session.add_torrent( + info, seed_cache_path, + storage_mode=libtorrent.storage_mode_t.storage_mode_sparse) + try: + while time.time() < seed_time: + time.sleep(5) + finally: + session.remove_torrent(torrent) + + logging.debug("Seeding of '%s' finished" % torrent_path) + + +if __name__ == "__main__": + (torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end) = sys.argv[1:] + torrent_seed_duration = int(torrent_seed_duration) + torrent_listen_port_start = int(torrent_listen_port_start) + torrent_listen_port_end = int(torrent_listen_port_end) + + main(torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end) diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/bittorrent b/plugins/xenserver/xenapi/etc/xapi.d/plugins/bittorrent new file mode 100755 index 000000000..fef1862b1 --- /dev/null +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/bittorrent @@ -0,0 +1,299 @@ +#!/usr/bin/env python + +# Copyright (c) 2012 Openstack, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +"""Download images via BitTorrent.""" + +import errno +import inspect +import os +import random +import shutil +import tempfile +import time + +import libtorrent +import urllib2 +import XenAPIPlugin + +import utils + +#FIXME(sirp): should this use pluginlib from 5.6? +from pluginlib_nova import * +configure_logging('bittorrent') + +DEFAULT_TORRENT_CACHE = '/images/torrents' +DEFAULT_SEED_CACHE = '/images/seeds' +SEEDER_PROCESS = '_bittorrent_seeder' + + +def _make_torrent_cache(): + torrent_cache_path = os.environ.get( + 'TORRENT_CACHE', DEFAULT_TORRENT_CACHE) + + if not os.path.exists(torrent_cache_path): + os.mkdir(torrent_cache_path) + + return torrent_cache_path + + +def _fetch_torrent_file(torrent_cache_path, image_id, torrent_base_url): + torrent_path = os.path.join( + torrent_cache_path, image_id + '.torrent') + + if not os.path.exists(torrent_path): + torrent_url = torrent_base_url + "/%s.torrent" % image_id + logging.info("Downloading %s" % torrent_url) + + # Write contents to temporary path to ensure we don't have partially + # completed files in the cache. 
+ temp_directory = tempfile.mkdtemp(dir=torrent_cache_path) + try: + temp_path = os.path.join( + temp_directory, os.path.basename(torrent_path)) + temp_file = open(temp_path, 'wb') + try: + remote_torrent_file = urllib2.urlopen(torrent_url) + shutil.copyfileobj(remote_torrent_file, temp_file) + finally: + temp_file.close() + + os.rename(temp_path, torrent_path) + finally: + shutil.rmtree(temp_directory) + + return torrent_path + + +def _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed): + """Delete any torrent files that haven't been accessed recently.""" + if not torrent_max_last_accessed: + logging.debug("Reaping old torrent files disabled, skipping...") + return + + logging.debug("Preparing to reap old torrent files," + " torrent_max_last_accessed=%d" % torrent_max_last_accessed) + + for fname in os.listdir(torrent_cache_path): + torrent_path = os.path.join(torrent_cache_path, fname) + last_accessed = time.time() - os.path.getatime(torrent_path) + if last_accessed > torrent_max_last_accessed: + logging.debug("Reaping '%s', last_accessed=%d" % ( + torrent_path, last_accessed)) + utils.delete_if_exists(torrent_path) + + +def _download(torrent_path, save_as_path, torrent_listen_port_start, + torrent_listen_port_end, torrent_download_stall_cutoff): + session = libtorrent.session() + session.listen_on(torrent_listen_port_start, torrent_listen_port_end) + info = libtorrent.torrent_info( + libtorrent.bdecode(open(torrent_path, 'rb').read())) + + torrent = session.add_torrent( + info, save_as_path, + storage_mode=libtorrent.storage_mode_t.storage_mode_sparse) + + try: + last_progress = 0 + last_progress_updated = time.time() + + while not torrent.is_seed(): + s = torrent.status() + + progress = s.progress * 100 + + if progress != last_progress: + last_progress = progress + last_progress_updated = time.time() + + stall_duration = time.time() - last_progress_updated + if stall_duration > torrent_download_stall_cutoff: + logging.error( + "Download 
stalled: stall_duration=%d," + " torrent_download_stall_cutoff=%d" % ( + stall_duration, torrent_download_stall_cutoff)) + raise Exception("Bittorrent download stall detected, bailing!") + + logging.debug( + '%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d)' + ' %s %s' % (progress, s.download_rate / 1000, + s.upload_rate / 1000, s.num_peers, s.state, + torrent_path)) + time.sleep(1) + finally: + session.remove_torrent(torrent) + + logging.debug("Download of '%s' finished" % torrent_path) + + +def _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance, + torrent_max_seeder_processes_per_host): + if not torrent_seed_duration: + logging.debug("Seeding disabled, skipping...") + return False + + if os.path.exists(seed_path): + logging.debug("Seed is already present, skipping....") + return False + + rand = random.random() + if rand > torrent_seed_chance: + logging.debug("%.2f > %.2f, seeding randomly skipping..." % ( + rand, torrent_seed_chance)) + return False + + num_active_seeders = len(list(_active_seeder_processes())) + if (torrent_max_seeder_processes_per_host >= 0 and + num_active_seeders >= torrent_max_seeder_processes_per_host): + logging.debug("max number of seeder processes for this host reached" + " (%d), skipping..." 
% + torrent_max_seeder_processes_per_host) + return False + + return True + + +def _seed(torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end): + plugin_path = os.path.dirname(inspect.getabsfile(inspect.currentframe())) + seeder_path = os.path.join(plugin_path, SEEDER_PROCESS) + seed_cmd = "%s %s %s %d %d %d" % ( + seeder_path, torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end) + + seed_proc = utils.make_subprocess(seed_cmd) + utils.finish_subprocess(seed_proc, seed_cmd) + + +def _seed_if_needed(seed_cache_path, tarball_path, torrent_path, + torrent_seed_duration, torrent_seed_chance, + torrent_listen_port_start, torrent_listen_port_end, + torrent_max_seeder_processes_per_host): + seed_filename = os.path.basename(tarball_path) + seed_path = os.path.join(seed_cache_path, seed_filename) + + if _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance, + torrent_max_seeder_processes_per_host): + logging.debug("Preparing to seed '%s' for %d secs" % ( + seed_path, torrent_seed_duration)) + utils._rename(tarball_path, seed_path) + + # Daemonize and seed the image + _seed(torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end) + else: + utils.delete_if_exists(tarball_path) + + +def _extract_tarball(tarball_path, staging_path): + """Extract the tarball into the staging directory.""" + tarball_fileobj = open(tarball_path, 'rb') + try: + utils.extract_tarball(tarball_fileobj, staging_path) + finally: + tarball_fileobj.close() + + +def _active_seeder_processes(): + """Yields command-line of active seeder processes. 
+ + Roughly equivalent to performing ps | grep _bittorrent_seeder + """ + pids = [pid for pid in os.listdir('/proc') if pid.isdigit()] + for pid in pids: + try: + cmdline = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read() + except IOError, e: + if e.errno != errno.ENOENT: + raise + + if SEEDER_PROCESS in cmdline: + yield cmdline + + +def _reap_finished_seeds(seed_cache_path): + """Delete any cached seeds where the seeder process has died.""" + logging.debug("Preparing to reap finished seeds") + missing = {} + for fname in os.listdir(seed_cache_path): + seed_path = os.path.join(seed_cache_path, fname) + missing[seed_path] = None + + for cmdline in _active_seeder_processes(): + for seed_path in missing.keys(): + seed_filename = os.path.basename(seed_path) + if seed_filename in cmdline: + del missing[seed_path] + + for seed_path in missing: + logging.debug("Reaping cached seed '%s'" % seed_path) + utils.delete_if_exists(seed_path) + + +def _make_seed_cache(): + seed_cache_path = os.environ.get('SEED_CACHE', DEFAULT_SEED_CACHE) + if not os.path.exists(seed_cache_path): + os.mkdir(seed_cache_path) + return seed_cache_path + + +def download_vhd(session, image_id, torrent_base_url, torrent_seed_duration, + torrent_seed_chance, torrent_max_last_accessed, + torrent_listen_port_start, torrent_listen_port_end, + torrent_download_stall_cutoff, uuid_stack, sr_path, + torrent_max_seeder_processes_per_host): + """Download an image from BitTorrent, unbundle it, and then deposit the + VHDs into the storage repository + """ + seed_cache_path = _make_seed_cache() + torrent_cache_path = _make_torrent_cache() + + # Housekeeping + _reap_finished_seeds(seed_cache_path) + _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed) + + torrent_path = _fetch_torrent_file( + torrent_cache_path, image_id, torrent_base_url) + + staging_path = utils.make_staging_area(sr_path) + try: + tarball_filename = os.path.basename(torrent_path).replace( + '.torrent', '') + tarball_path 
= os.path.join(staging_path, tarball_filename) + + # Download tarball into staging area + _download(torrent_path, staging_path, torrent_listen_port_start, + torrent_listen_port_end, torrent_download_stall_cutoff) + + # Extract the tarball into the staging area + _extract_tarball(tarball_path, staging_path) + + # Move the VHDs from the staging area into the storage repository + vdi_list = utils.import_vhds(sr_path, staging_path, uuid_stack) + + # Seed image for others in the swarm + _seed_if_needed(seed_cache_path, tarball_path, torrent_path, + torrent_seed_duration, torrent_seed_chance, + torrent_listen_port_start, torrent_listen_port_end, + torrent_max_seeder_processes_per_host) + finally: + utils.cleanup_staging_area(staging_path) + + return vdi_list + + +if __name__ == '__main__': + utils.register_plugin_calls(download_vhd) diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py b/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py index 66f46a839..510687d7b 100644 --- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py @@ -27,6 +27,16 @@ import XenAPIPlugin CHUNK_SIZE = 8192 +def delete_if_exists(path): + try: + os.unlink(path) + except OSError, e: + if e.errno == errno.ENOENT: + logging.warning("'%s' was already deleted, skipping delete" % path) + else: + raise + + def _link(src, dst): logging.info("Hard-linking file '%s' -> '%s'" % (src, dst)) os.link(src, dst) |