summaryrefslogtreecommitdiffstats
path: root/jenkins_jobs/parallel.py
diff options
context:
space:
mode:
Diffstat (limited to 'jenkins_jobs/parallel.py')
-rw-r--r--jenkins_jobs/parallel.py24
1 files changed, 13 insertions, 11 deletions
diff --git a/jenkins_jobs/parallel.py b/jenkins_jobs/parallel.py
index 5c0da07c..a409491e 100644
--- a/jenkins_jobs/parallel.py
+++ b/jenkins_jobs/parallel.py
@@ -33,11 +33,12 @@ class TaskFunc(dict):
"""
Simple class to wrap around the information needed to run a function.
"""
+
def __init__(self, n_ord, func, args=None, kwargs=None):
- self['func'] = func
- self['args'] = args or []
- self['kwargs'] = kwargs or {}
- self['ord'] = n_ord
+ self["func"] = func
+ self["args"] = args or []
+ self["kwargs"] = kwargs or {}
+ self["ord"] = n_ord
class Worker(threading.Thread):
@@ -47,6 +48,7 @@ class Worker(threading.Thread):
If the string 'done' is passed instead of a TaskFunc instance, the thread
will end.
"""
+
def __init__(self, in_queue, out_queue):
threading.Thread.__init__(self)
self.in_queue = in_queue
@@ -55,15 +57,14 @@ class Worker(threading.Thread):
def run(self):
while True:
task = self.in_queue.get()
- if task == 'done':
+ if task == "done":
return
try:
- res = task['func'](*task['args'],
- **task['kwargs'])
+ res = task["func"](*task["args"], **task["kwargs"])
except Exception as exc:
res = exc
traceback.print_exc()
- self.out_queue.put((task['ord'], res))
+ self.out_queue.put((task["ord"], res))
def concurrent(func):
@@ -102,8 +103,8 @@ def concurrent(func):
array with the results of the executions in the same order the
parameters were passed.
"""
- n_workers = kwargs.pop('n_workers', 0)
- p_kwargs = kwargs.pop('concurrent', [])
+ n_workers = kwargs.pop("n_workers", 0)
+ p_kwargs = kwargs.pop("concurrent", [])
# if only one parameter is passed inside the concurrent dict, run the
# original function as is, no need for pools
if len(p_kwargs) == 1:
@@ -133,7 +134,7 @@ def concurrent(func):
in_queue.put(TaskFunc(n_ord, func, args, f_kwargs))
n_ord += 1
for _ in range(n_workers):
- in_queue.put('done')
+ in_queue.put("done")
# Wait for the results
logging.debug("Waiting for workers to finish processing")
@@ -148,4 +149,5 @@ def concurrent(func):
results = [r[1] for r in sorted(results)]
logging.debug("Concurrent task finished")
return results
+
return concurrentized