summaryrefslogtreecommitdiffstats
path: root/BitTorrent/TorrentQueue.py
diff options
context:
space:
mode:
Diffstat (limited to 'BitTorrent/TorrentQueue.py')
-rw-r--r--BitTorrent/TorrentQueue.py848
1 files changed, 848 insertions, 0 deletions
diff --git a/BitTorrent/TorrentQueue.py b/BitTorrent/TorrentQueue.py
new file mode 100644
index 0000000..4187a80
--- /dev/null
+++ b/BitTorrent/TorrentQueue.py
@@ -0,0 +1,848 @@
+# The contents of this file are subject to the BitTorrent Open Source License
+# Version 1.1 (the License). You may not copy or use this file, in either
+# source code or executable form, except in compliance with the License. You
+# may obtain a copy of the License at http://www.bittorrent.com/license/.
+#
+# Software distributed under the License is distributed on an AS IS basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
+# for the specific language governing rights and limitations under the
+# License.
+
+# Written by Uoti Urpala
+
+from __future__ import division
+
+import os
+import sys
+import threading
+import traceback
+
+
+from BitTorrent import GetTorrent
+from BitTorrent.platform import bttime
+from BitTorrent.download import Feedback, Multitorrent
+from BitTorrent.bencode import bdecode
+from BitTorrent.ConvertedMetainfo import ConvertedMetainfo
+from BitTorrent.prefs import Preferences
+from BitTorrent import BTFailure, BTShutdown, INFO, WARNING, ERROR, CRITICAL
+from BitTorrent import configfile
+from BitTorrent import FAQ_URL
+import BitTorrent
+
+
+RUNNING = 0
+RUN_QUEUED = 1
+QUEUED = 2
+KNOWN = 3
+ASKING_LOCATION = 4
+
+
+class TorrentInfo(object):
+
+ def __init__(self, config):
+ self.metainfo = None
+ self.dl = None
+ self.state = None
+ self.completion = None
+ self.finishtime = None
+ self.uptotal = 0
+ self.uptotal_old = 0
+ self.downtotal = 0
+ self.downtotal_old = 0
+ self.config = config
+
+ def _set_dlpath(self, value):
+ self.config['save_as'] = value
+
+ def _get_dlpath(self):
+ return self.config['save_as']
+
+ dlpath = property(_get_dlpath, _set_dlpath)
+
+
+def decode_position(l, pred, succ, default=None):
+ if default is None:
+ default = len(l)
+ if pred is None and succ is None:
+ return default
+ if pred is None:
+ return 0
+ if succ is None:
+ return len(l)
+ try:
+ if l[0] == succ and pred not in l:
+ return 0
+ if l[-1] == pred and succ not in l:
+ return len(l)
+ i = l.index(pred)
+ if l[i+1] == succ:
+ return i+1
+ except (ValueError, IndexError):
+ pass
+ return default
+
+
+class TorrentQueue(Feedback):
+
+ def __init__(self, config, ui_options, ipc):
+ self.ui_options = ui_options
+ self.ipc = ipc
+ self.config = config
+ self.config['def_running_torrents'] = 1 # !@# XXX
+ self.config['max_running_torrents'] = 100 # !@# XXX
+ self.doneflag = threading.Event()
+ self.torrents = {}
+ self.starting_torrent = None
+ self.running_torrents = []
+ self.queue = []
+ self.other_torrents = []
+ self.last_save_time = 0
+ self.last_version_check = 0
+ self.initialized = 0
+
+ def run(self, ui, ui_wrap, startflag):
+ try:
+ self.ui = ui
+ self.run_ui_task = ui_wrap
+ self.multitorrent = Multitorrent(self.config, self.doneflag,
+ self.global_error, listen_fail_ok=True)
+ self.rawserver = self.multitorrent.rawserver
+ self.ipc.set_rawserver(self.rawserver)
+ self.ipc.start(self.external_command)
+ try:
+ self._restore_state()
+ except BTFailure, e:
+ self.torrents = {}
+ self.running_torrents = []
+ self.queue = []
+ self.other_torrents = []
+ self.global_error(ERROR, _("Could not load saved state: ")+str(e))
+ else:
+ for infohash in self.running_torrents + self.queue + \
+ self.other_torrents:
+ t = self.torrents[infohash]
+ if t.dlpath is not None:
+ t.completion = self.multitorrent.get_completion(
+ self.config, t.metainfo, t.dlpath)
+ state = t.state
+ if state == RUN_QUEUED:
+ state = RUNNING
+ self.run_ui_task(self.ui.new_displayed_torrent, infohash,
+ t.metainfo, t.dlpath, state, t.config,
+ t.completion, t.uptotal, t.downtotal, )
+ self._check_queue()
+ self.initialized = 1
+ startflag.set()
+ except Exception, e:
+ # dump a normal exception traceback
+ traceback.print_exc()
+ # set the error flag
+ self.initialized = -1
+ # signal the gui thread to stop waiting
+ startflag.set()
+ return
+
+ self._queue_loop()
+ self.multitorrent.rawserver.listen_forever()
+ if self.doneflag.isSet():
+ self.run_ui_task(self.ui.quit)
+ self.multitorrent.close_listening_socket()
+ self.ipc.stop()
+ for infohash in list(self.running_torrents):
+ t = self.torrents[infohash]
+ if t.state == RUN_QUEUED:
+ continue
+ t.dl.shutdown()
+ if t.dl is not None: # possibly set to none by failed()
+ totals = t.dl.get_total_transfer()
+ t.uptotal = t.uptotal_old + totals[0]
+ t.downtotal = t.downtotal_old + totals[1]
+ self._dump_state()
+
+ def _check_version(self):
+ now = bttime()
+ if self.last_version_check > 0 and \
+ self.last_version_check > now - 24*60*60:
+ return
+ self.last_version_check = now
+ self.run_ui_task(self.ui.check_version)
+
+ def _dump_config(self):
+ configfile.save_ui_config(self.config, 'bittorrent',
+ self.ui_options, self.global_error)
+ for infohash,t in self.torrents.items():
+ ec = lambda level, message: self.error(t.metainfo, level, message)
+ config = t.config.getDict()
+ if config:
+ configfile.save_torrent_config(self.config['data_dir'],
+ infohash, config, ec)
+
+ def _dump_state(self):
+ self.last_save_time = bttime()
+ r = []
+ def write_entry(infohash, t):
+ if t.dlpath is None:
+ assert t.state == ASKING_LOCATION
+ r.append(infohash.encode('hex') + '\n')
+ else:
+ r.append(infohash.encode('hex') + ' ' + str(t.uptotal) + ' ' +
+ str(t.downtotal)+' '+t.dlpath.encode('string_escape')+'\n')
+ r.append('BitTorrent UI state file, version 3\n')
+ r.append('Running torrents\n')
+ for infohash in self.running_torrents:
+ write_entry(infohash, self.torrents[infohash])
+ r.append('Queued torrents\n')
+ for infohash in self.queue:
+ write_entry(infohash, self.torrents[infohash])
+ r.append('Known torrents\n')
+ for infohash in self.other_torrents:
+ write_entry(infohash, self.torrents[infohash])
+ r.append('End\n')
+ f = None
+ try:
+ filename = os.path.join(self.config['data_dir'], 'ui_state')
+ f = file(filename + '.new', 'wb')
+ f.write(''.join(r))
+ f.close()
+ if os.access(filename, os.F_OK):
+ os.remove(filename) # no atomic rename on win32
+ os.rename(filename + '.new', filename)
+ except Exception, e:
+ self.global_error(ERROR, _("Could not save UI state: ") + str(e))
+ if f is not None:
+ f.close()
+
+ def _restore_state(self):
+ def decode_line(line):
+ hashtext = line[:40]
+ try:
+ infohash = hashtext.decode('hex')
+ except:
+ raise BTFailure(_("Invalid state file contents"))
+ if len(infohash) != 20:
+ raise BTFailure(_("Invalid state file contents"))
+ try:
+ path = os.path.join(self.config['data_dir'], 'metainfo',
+ hashtext)
+ f = file(path, 'rb')
+ data = f.read()
+ f.close()
+ except Exception, e:
+ try:
+ f.close()
+ except:
+ pass
+ self.global_error(ERROR,
+ (_("Error reading file \"%s\".") % path) +
+ " (" + str(e)+ "), " +
+ _("cannot restore state completely"))
+ return None
+ if infohash in self.torrents:
+ raise BTFailure(_("Invalid state file (duplicate entry)"))
+ t = TorrentInfo(Preferences(self.config))
+ self.torrents[infohash] = t
+ try:
+ t.metainfo = ConvertedMetainfo(bdecode(data))
+ except Exception, e:
+ self.global_error(ERROR,
+ (_("Corrupt data in \"%s\", cannot restore torrent.") % path) +
+ '('+str(e)+')')
+ return None
+ t.metainfo.reported_errors = True # suppress redisplay on restart
+ if infohash != t.metainfo.infohash:
+ self.global_error(ERROR,
+ (_("Corrupt data in \"%s\", cannot restore torrent.") % path) +
+ _("(infohash mismatch)"))
+ return None
+ if len(line) == 41:
+ t.dlpath = None
+ return infohash, t
+ try:
+ if version < 2:
+ t.dlpath = line[41:-1].decode('string_escape')
+ elif version == 3:
+ up, down, dlpath = line[41:-1].split(' ', 2)
+ t.uptotal = t.uptotal_old = int(up)
+ t.downtotal = t.downtotal_old = int(down)
+ t.dlpath = dlpath.decode('string_escape')
+ elif version >= 4:
+ up, down = line[41:-1].split(' ', 1)
+ t.uptotal = t.uptotal_old = int(up)
+ t.downtotal = t.downtotal_old = int(down)
+ except ValueError: # unpack, int(), decode()
+ raise BTFailure(_("Invalid state file (bad entry)"))
+ config = configfile.read_torrent_config(self.config,
+ self.config['data_dir'],
+ infohash, self.global_error)
+ t.config.update(config)
+ return infohash, t
+ filename = os.path.join(self.config['data_dir'], 'ui_state')
+ if not os.path.exists(filename):
+ return
+ f = None
+ try:
+ f = file(filename, 'rb')
+ lines = f.readlines()
+ f.close()
+ except Exception, e:
+ if f is not None:
+ f.close()
+ raise BTFailure(str(e))
+ i = iter(lines)
+ try:
+ txt = 'BitTorrent UI state file, version '
+ version = i.next()
+ if not version.startswith(txt):
+ raise BTFailure(_("Bad UI state file"))
+ try:
+ version = int(version[len(txt):-1])
+ except:
+ raise BTFailure(_("Bad UI state file version"))
+ if version > 4:
+ raise BTFailure(_("Unsupported UI state file version (from "
+ "newer client version?)"))
+ if version < 3:
+ if i.next() != 'Running/queued torrents\n':
+ raise BTFailure(_("Invalid state file contents"))
+ else:
+ if i.next() != 'Running torrents\n':
+ raise BTFailure(_("Invalid state file contents"))
+ while True:
+ line = i.next()
+ if line == 'Queued torrents\n':
+ break
+ t = decode_line(line)
+ if t is None:
+ continue
+ infohash, t = t
+ if t.dlpath is None:
+ raise BTFailure(_("Invalid state file contents"))
+ t.state = RUN_QUEUED
+ self.running_torrents.append(infohash)
+ while True:
+ line = i.next()
+ if line == 'Known torrents\n':
+ break
+ t = decode_line(line)
+ if t is None:
+ continue
+ infohash, t = t
+ if t.dlpath is None:
+ raise BTFailure(_("Invalid state file contents"))
+ t.state = QUEUED
+ self.queue.append(infohash)
+ while True:
+ line = i.next()
+ if line == 'End\n':
+ break
+ t = decode_line(line)
+ if t is None:
+ continue
+ infohash, t = t
+ if t.dlpath is None:
+ t.state = ASKING_LOCATION
+ else:
+ t.state = KNOWN
+ self.other_torrents.append(infohash)
+ except StopIteration:
+ raise BTFailure(_("Invalid state file contents"))
+
+ def _queue_loop(self):
+ if self.doneflag.isSet():
+ return
+ self.rawserver.add_task(self._queue_loop, 20)
+ now = bttime()
+ self._check_version()
+ if self.queue and self.starting_torrent is None:
+ mintime = now - self.config['next_torrent_time'] * 60
+ minratio = self.config['next_torrent_ratio'] / 100
+ if self.config['seed_forever']:
+ minratio = 1e99
+ else:
+ mintime = 0
+ minratio = self.config['last_torrent_ratio'] / 100
+ if self.config['seed_last_forever']:
+ minratio = 1e99
+ if minratio >= 1e99:
+ return
+ for infohash in self.running_torrents:
+ t = self.torrents[infohash]
+ myminratio = minratio
+ if t.dl:
+ if self.queue and t.dl.config['seed_last_forever']:
+ myminratio = 1e99
+ elif t.dl.config['seed_forever']:
+ myminratio = 1e99
+ if t.state == RUN_QUEUED:
+ continue
+ totals = t.dl.get_total_transfer()
+ # not updated for remaining torrents if one is stopped, who cares
+ t.uptotal = t.uptotal_old + totals[0]
+ t.downtotal = t.downtotal_old + totals[1]
+ if t.finishtime is None or t.finishtime > now - 120:
+ continue
+ if t.finishtime > mintime:
+ if t.uptotal < t.metainfo.total_bytes * myminratio:
+ continue
+ self.change_torrent_state(infohash, RUNNING, KNOWN)
+ break
+ if self.running_torrents and self.last_save_time < now - 300:
+ self._dump_state()
+
+ def _check_queue(self):
+ if self.starting_torrent is not None or self.config['pause']:
+ return
+ for infohash in self.running_torrents:
+ if self.torrents[infohash].state == RUN_QUEUED:
+ self.starting_torrent = infohash
+ t = self.torrents[infohash]
+ t.state = RUNNING
+ t.finishtime = None
+ t.dl = self.multitorrent.start_torrent(t.metainfo, t.config,
+ self, t.dlpath)
+ return
+ if not self.queue or len(self.running_torrents) >= \
+ self.config['def_running_torrents']:
+ return
+ infohash = self.queue.pop(0)
+ self.starting_torrent = infohash
+ t = self.torrents[infohash]
+ assert t.state == QUEUED
+ t.state = RUNNING
+ t.finishtime = None
+ self.running_torrents.append(infohash)
+ t.dl = self.multitorrent.start_torrent(t.metainfo, t.config, self,
+ t.dlpath)
+ self._send_state(infohash)
+
+ def _send_state(self, infohash):
+ t = self.torrents[infohash]
+ state = t.state
+ if state == RUN_QUEUED:
+ state = RUNNING
+ pos = None
+ if state in (KNOWN, RUNNING, QUEUED):
+ l = self._get_list(state)
+ if l[-1] != infohash:
+ pos = l.index(infohash)
+ self.run_ui_task(self.ui.torrent_state_changed, infohash, t.dlpath,
+ state, t.completion, t.uptotal_old, t.downtotal_old, pos)
+
+ def _stop_running(self, infohash):
+ t = self.torrents[infohash]
+ if t.state == RUN_QUEUED:
+ self.running_torrents.remove(infohash)
+ t.state = KNOWN
+ return True
+ assert t.state == RUNNING
+ shutdown_succeded = t.dl.shutdown()
+ if not shutdown_succeded:
+ self.run_ui_task(self.ui.open_log)
+ self.error(t.metainfo, ERROR, "Unable to stop torrent. Please send this application log to bugs@bittorrent.com .")
+ return False
+ if infohash == self.starting_torrent:
+ self.starting_torrent = None
+ try:
+ self.running_torrents.remove(infohash)
+ except ValueError:
+ self.other_torrents.remove(infohash)
+ return False
+ else:
+ t.state = KNOWN
+ totals = t.dl.get_total_transfer()
+ t.uptotal_old += totals[0]
+ t.uptotal = t.uptotal_old
+ t.downtotal_old += totals[1]
+ t.downtotal = t.downtotal_old
+ t.dl = None
+ t.completion = self.multitorrent.get_completion(self.config,
+ t.metainfo, t.dlpath)
+ return True
+
+ def external_command(self, action, *datas):
+ if action == 'start_torrent':
+ assert len(datas) == 2
+ self.start_new_torrent_by_name(datas[0], save_as=datas[1])
+ elif action == 'show_error':
+ assert len(datas) == 1
+ self.global_error(ERROR, datas[0])
+ elif action == 'no-op':
+ pass
+
+ def remove_torrent(self, infohash):
+ if infohash not in self.torrents:
+ return
+ state = self.torrents[infohash].state
+ if state == QUEUED:
+ self.queue.remove(infohash)
+ elif state in (RUNNING, RUN_QUEUED):
+ self._stop_running(infohash)
+ self._check_queue()
+ else:
+ self.other_torrents.remove(infohash)
+ self.run_ui_task(self.ui.removed_torrent, infohash)
+ del self.torrents[infohash]
+
+ for d in ['metainfo', 'resume']:
+ filename = os.path.join(self.config['data_dir'], d,
+ infohash.encode('hex'))
+ try:
+ os.remove(filename)
+ except Exception, e:
+ self.global_error(WARNING,
+ (_("Could not delete cached %s file:")%d) +
+ str(e))
+ ec = lambda level, message: self.global_error(level, message)
+ configfile.remove_torrent_config(self.config['data_dir'],
+ infohash, ec)
+ self._dump_state()
+
+ def set_save_location(self, infohash, dlpath):
+ torrent = self.torrents.get(infohash)
+ if torrent is None or torrent.state == RUNNING:
+ return
+ torrent.dlpath = dlpath
+ self._dump_config()
+ torrent.completion = self.multitorrent.get_completion(self.config,
+ torrent.metainfo, dlpath)
+ if torrent.state == ASKING_LOCATION:
+ torrent.state = KNOWN
+ self.change_torrent_state(infohash, KNOWN, QUEUED)
+ else:
+ self._send_state(infohash)
+ self._dump_state()
+
+ def _get_torrent_then_callback(self, name, save_as=None):
+ data, errors = GetTorrent.get_quietly(name)
+
+ if data:
+ self.start_new_torrent(data, save_as)
+ for error in errors:
+ self.run_ui_task(self.ui.global_error, ERROR, error)
+
+ def start_new_torrent_by_name(self, name, save_as=None):
+ t = threading.Thread(target=self._get_torrent_then_callback,
+ args=(name, save_as,))
+ t.setDaemon(True)
+ t.start()
+
+ def start_new_torrent(self, data, save_as=None):
+ t = TorrentInfo(Preferences(self.config))
+ try:
+ t.metainfo = ConvertedMetainfo(bdecode(data))
+ except Exception, e:
+ self.global_error(ERROR, _("This is not a valid torrent file. (%s)")
+ % str(e))
+ return
+ infohash = t.metainfo.infohash
+ if infohash in self.torrents:
+ real_state = self.torrents[infohash].state
+ if real_state in (RUNNING, RUN_QUEUED):
+ self.error(t.metainfo, ERROR,
+ _("This torrent (or one with the same contents) is "
+ "already running."))
+ elif real_state == QUEUED:
+ self.error(t.metainfo, ERROR,
+ _("This torrent (or one with the same contents) is "
+ "already waiting to run."))
+ elif real_state == ASKING_LOCATION:
+ pass
+ elif real_state == KNOWN:
+ self.change_torrent_state(infohash, KNOWN, newstate=QUEUED)
+ else:
+ raise BTFailure(_("Torrent in unknown state %d") % real_state)
+ return
+
+ path = os.path.join(self.config['data_dir'], 'metainfo',
+ infohash.encode('hex'))
+ try:
+ f = file(path+'.new', 'wb')
+ f.write(data)
+ f.close()
+ if os.access(path, os.F_OK):
+ os.remove(path) # no atomic rename on win32
+ os.rename(path+'.new', path)
+ except Exception, e:
+ try:
+ f.close()
+ except:
+ pass
+ self.global_error(ERROR, _("Could not write file ") + path +
+ ' (' + str(e) + '), ' +
+ _("torrent will not be restarted "
+ "correctly on client restart"))
+
+ config = configfile.read_torrent_config(self.config,
+ self.config['data_dir'],
+ infohash, self.global_error)
+ if config:
+ t.config.update(config)
+ if save_as:
+ self.run_ui_task(self.ui.set_config, 'save_as', save_as)
+ else:
+ save_as = None
+
+ self.torrents[infohash] = t
+ t.state = ASKING_LOCATION
+ self.other_torrents.append(infohash)
+ self._dump_state()
+ self.run_ui_task(self.ui.new_displayed_torrent, infohash,
+ t.metainfo, save_as, t.state, t.config)
+
+ def show_error(level, text):
+ self.run_ui_task(self.ui.error, infohash, level, text)
+ t.metainfo.show_encoding_errors(show_error)
+
+ def set_config(self, option, value, ihash=None):
+ if not ihash:
+ oldvalue = self.config[option]
+ self.config[option] = value
+ self.multitorrent.set_option(option, value)
+ if option == 'pause':
+ if value:# and not oldvalue:
+ self.set_zero_running_torrents()
+ elif not value:# and oldvalue:
+ self._check_queue()
+ else:
+ torrent = self.torrents[ihash]
+ if torrent.state == RUNNING:
+ torrent.dl.set_option(option, value)
+ if option in ('forwarded_port', 'maxport'):
+ torrent.dl.change_port()
+ torrent.config[option] = value
+ self._dump_config()
+
+ def request_status(self, infohash, want_spew, want_fileinfo):
+ torrent = self.torrents.get(infohash)
+ if torrent is None or torrent.state != RUNNING:
+ return
+ status = torrent.dl.get_status(want_spew, want_fileinfo)
+ if torrent.finishtime is not None:
+ now = bttime()
+ uptotal = status['upTotal'] + torrent.uptotal_old
+ downtotal = status['downTotal'] + torrent.downtotal_old
+ ulspeed = status['upRate2']
+ if self.queue:
+ ratio = torrent.dl.config['next_torrent_ratio'] / 100
+ if torrent.dl.config['seed_forever']:
+ ratio = 1e99
+ else:
+ ratio = torrent.dl.config['last_torrent_ratio'] / 100
+ if torrent.dl.config['seed_last_forever']:
+ ratio = 1e99
+ if ulspeed <= 0 or ratio >= 1e99:
+ rem = 1e99
+ elif downtotal == 0:
+ rem = (torrent.metainfo.total_bytes * ratio - uptotal) / ulspeed
+ else:
+ rem = (downtotal * ratio - uptotal) / ulspeed
+ if self.queue and not torrent.dl.config['seed_forever']:
+ rem = min(rem, torrent.finishtime +
+ torrent.dl.config['next_torrent_time'] * 60 - now)
+ rem = max(rem, torrent.finishtime + 120 - now)
+ if rem <= 0:
+ rem = 1
+ if rem >= 1e99:
+ rem = None
+ status['timeEst'] = rem
+ self.run_ui_task(self.ui.update_status, infohash, status)
+
+ def _get_list(self, state):
+ if state == KNOWN:
+ return self.other_torrents
+ elif state == QUEUED:
+ return self.queue
+ elif state in (RUNNING, RUN_QUEUED):
+ return self.running_torrents
+ assert False
+
+ def change_torrent_state(self, infohash, oldstate, newstate=None,
+ pred=None, succ=None, replaced=None, force_running=False):
+ t = self.torrents.get(infohash)
+ if t is None or (t.state != oldstate and not (t.state == RUN_QUEUED and
+ oldstate == RUNNING)):
+ return
+ if newstate is None:
+ newstate = oldstate
+ assert oldstate in (KNOWN, QUEUED, RUNNING)
+ assert newstate in (KNOWN, QUEUED, RUNNING)
+ pos = None
+ if oldstate != RUNNING and newstate == RUNNING and replaced is None:
+ if len(self.running_torrents) >= (force_running and self.config[
+ 'max_running_torrents'] or self.config['def_running_torrents']):
+ if force_running:
+ self.global_error(ERROR,
+ _("Can't run more than %d torrents "
+ "simultaneously. For more info see the"
+ " FAQ at %s.")%
+ (self.config['max_running_torrents'],
+ FAQ_URL))
+ newstate = QUEUED
+ pos = 0
+ l = self._get_list(newstate)
+ if newstate == oldstate:
+ origpos = l.index(infohash)
+ del l[origpos]
+ if pos is None:
+ pos = decode_position(l, pred, succ, -1)
+ if pos == -1 or l == origpos:
+ l.insert(origpos, infohash)
+ return
+ l.insert(pos, infohash)
+ self._dump_state()
+ self.run_ui_task(self.ui.reorder_torrent, infohash, pos)
+ return
+ if pos is None:
+ pos = decode_position(l, pred, succ)
+ if newstate == RUNNING:
+ newstate = RUN_QUEUED
+ if replaced and len(self.running_torrents) >= \
+ self.config['def_running_torrents']:
+ t2 = self.torrents.get(replaced)
+ if t2 is None or t2.state not in (RUNNING, RUN_QUEUED):
+ return
+ if self.running_torrents.index(replaced) < pos:
+ pos -= 1
+ if self._stop_running(replaced):
+ t2.state = QUEUED
+ self.queue.insert(0, replaced)
+ self._send_state(replaced)
+ else:
+ self.other_torrents.append(replaced)
+ if oldstate == RUNNING:
+ if newstate == QUEUED and len(self.running_torrents) <= \
+ self.config['def_running_torrents'] and pos == 0:
+ return
+ if not self._stop_running(infohash):
+ if newstate == KNOWN:
+ self.other_torrents.insert(pos, infohash)
+ self.run_ui_task(self.ui.reorder_torrent, infohash, pos)
+ else:
+ self.other_torrents.append(infohash)
+ return
+ else:
+ self._get_list(oldstate).remove(infohash)
+ t.state = newstate
+ l.insert(pos, infohash)
+ self._check_queue() # sends state if it starts the torrent from queue
+ if t.state != RUNNING or newstate == RUN_QUEUED:
+ self._send_state(infohash)
+ self._dump_state()
+
+ def set_zero_running_torrents(self):
+ newrun = []
+ for infohash in list(self.running_torrents):
+ t = self.torrents[infohash]
+ if self._stop_running(infohash):
+ newrun.append(infohash)
+ t.state = RUN_QUEUED
+ else:
+ self.other_torrents.append(infohash)
+ self.running_torrents = newrun
+
+ def check_completion(self, infohash, filelist=False):
+ t = self.torrents.get(infohash)
+ if t is None:
+ return
+ r = self.multitorrent.get_completion(self.config, t.metainfo,
+ t.dlpath, filelist)
+ if r is None or not filelist:
+ self.run_ui_task(self.ui.update_completion, infohash, r)
+ else:
+ self.run_ui_task(self.ui.update_completion, infohash, *r)
+
+ def global_error(self, level, text):
+ self.run_ui_task(self.ui.global_error, level, text)
+
+ # callbacks from torrent instances
+
+ def failed(self, torrent, is_external):
+ infohash = torrent.infohash
+ if infohash == self.starting_torrent:
+ self.starting_torrent = None
+ self.running_torrents.remove(infohash)
+ t = self.torrents[infohash]
+ t.state = KNOWN
+ if is_external:
+ t.completion = self.multitorrent.get_completion(
+ self.config, t.metainfo, t.dlpath)
+ else:
+ t.completion = None
+ totals = t.dl.get_total_transfer()
+ t.uptotal_old += totals[0]
+ t.uptotal = t.uptotal_old
+ t.downtotal_old += totals[1]
+ t.downtotal = t.downtotal_old
+ t.dl = None
+ self.other_torrents.append(infohash)
+ self._send_state(infohash)
+ if not self.doneflag.isSet():
+ self._check_queue()
+ self._dump_state()
+
+ def finished(self, torrent):
+ """called when a download reaches 100%"""
+ infohash = torrent.infohash
+ t = self.torrents[infohash]
+ totals = t.dl.get_total_transfer()
+ if t.downtotal == 0 and t.downtotal_old == 0 and totals[1] == 0:
+ self.set_config('seed_forever', True, infohash)
+ self.set_config('seed_last_forever', True, infohash)
+ self.request_status(infohash, False, False)
+
+ if infohash == self.starting_torrent:
+ t = self.torrents[infohash]
+ if self.queue:
+ ratio = t.config['next_torrent_ratio'] / 100
+ if t.config['seed_forever']:
+ ratio = 1e99
+ msg = _("Not starting torrent as there are other torrents "
+ "waiting to run, and this one already meets the "
+ "settings for when to stop seeding.")
+ else:
+ ratio = t.config['last_torrent_ratio'] / 100
+ if t.config['seed_last_forever']:
+ ratio = 1e99
+ msg = _("Not starting torrent as it already meets the "
+ "settings for when to stop seeding the last "
+ "completed torrent.")
+ if ratio < 1e99 and t.uptotal >= t.metainfo.total_bytes * ratio:
+ raise BTShutdown(msg)
+ self.torrents[torrent.infohash].finishtime = bttime()
+
+ def started(self, torrent):
+ infohash = torrent.infohash
+ assert infohash == self.starting_torrent
+ self.starting_torrent = None
+ self._check_queue()
+
+ def error(self, torrent, level, text):
+ self.run_ui_task(self.ui.error, torrent.infohash, level, text)
+
+
+class ThreadWrappedQueue(object):
+
+ def __init__(self, wrapped):
+ self.wrapped = wrapped
+
+ def set_done(self):
+ self.wrapped.doneflag.set()
+ # add a dummy task to make sure the thread wakes up and notices flag
+ def dummy():
+ pass
+ self.wrapped.rawserver.external_add_task(dummy, 0)
+
+# OW
+def _makemethod(methodname):
+ def wrapper(self, *args, **kws):
+ def f():
+ getattr(self.wrapped, methodname)(*args, **kws)
+ self.wrapped.rawserver.external_add_task(f, 0)
+ return wrapper
+
+# also OW
+for methodname in ("request_status set_config start_new_torrent "
+ "start_new_torrent_by_name remove_torrent set_save_location "
+ "change_torrent_state check_completion").split():
+ setattr(ThreadWrappedQueue, methodname, _makemethod(methodname))
+del _makemethod, methodname