summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--roles/collectd/fedmsg-activation/files/fedmsg-map.py49
1 files changed, 26 insertions, 23 deletions
diff --git a/roles/collectd/fedmsg-activation/files/fedmsg-map.py b/roles/collectd/fedmsg-activation/files/fedmsg-map.py
index fffdabe51..42c97b2a5 100644
--- a/roles/collectd/fedmsg-activation/files/fedmsg-map.py
+++ b/roles/collectd/fedmsg-activation/files/fedmsg-map.py
@@ -22,7 +22,7 @@ for_collectd = 'verbose' not in sys.argv
active = collections.defaultdict(list)
inactive = collections.defaultdict(list)
-pool = multiprocessing.pool.ThreadPool(10)
+pool = multiprocessing.pool.ThreadPool(25)
def info(content="\n"):
if not for_collectd:
@@ -30,32 +30,33 @@ def info(content="\n"):
sys.stdout.flush()
def scan_one(item):
- name, endpoints = item
- for endpoint in endpoints:
- if not endpoint.startswith('tcp://'):
- raise ValueError("Don't know how to deal with %r" % endpoint)
- endpoint = endpoint[len('tcp://'):].split(':')
- connection = None
- try:
- connection = socket.create_connection(endpoint, timeout)
- actual = base64.b64encode(connection.recv(10))
- if actual != expected:
- inactive[name].append((
- endpoint, "%r is not %r" % (actual, expected)))
- info("F")
- else:
- active[name].append((endpoint, "all active"))
- info(".")
- except socket.error as e:
- inactive[name].append((endpoint, str(e)))
+ name, endpoint = item
+ if not endpoint.startswith('tcp://'):
+ raise ValueError("Don't know how to deal with %r" % endpoint)
+ endpoint = endpoint[len('tcp://'):].split(':')
+ connection = None
+ try:
+ connection = socket.create_connection(endpoint, timeout)
+ actual = base64.b64encode(connection.recv(10))
+ if actual != expected:
+ inactive[name].append((
+ endpoint, "%r is not %r" % (actual, expected)))
info("F")
- if connection:
- connection.close()
+ else:
+ active[name].append((endpoint, "all active"))
+ info(".")
+ except socket.error as e:
+ inactive[name].append((endpoint, str(e)))
+ info("F")
+ if connection:
+ connection.close()
def scan_all():
-
- pool.map(scan_one, config['endpoints'].items())
+ items = [(name, addr)
+ for name, endpoints in config['endpoints'].items()
+ for addr in endpoints]
+ pool.map(scan_one, items)
info()
@@ -128,3 +129,5 @@ else:
print(output)
if interval - delta > 0:
time.sleep(interval - delta)
+
+pool.close()