diff options
Diffstat (limited to 'jenkins_jobs/parallel.py')
-rw-r--r-- | jenkins_jobs/parallel.py | 24 |
1 files changed, 13 insertions, 11 deletions
diff --git a/jenkins_jobs/parallel.py b/jenkins_jobs/parallel.py index 5c0da07c..a409491e 100644 --- a/jenkins_jobs/parallel.py +++ b/jenkins_jobs/parallel.py @@ -33,11 +33,12 @@ class TaskFunc(dict): """ Simple class to wrap around the information needed to run a function. """ + def __init__(self, n_ord, func, args=None, kwargs=None): - self['func'] = func - self['args'] = args or [] - self['kwargs'] = kwargs or {} - self['ord'] = n_ord + self["func"] = func + self["args"] = args or [] + self["kwargs"] = kwargs or {} + self["ord"] = n_ord class Worker(threading.Thread): @@ -47,6 +48,7 @@ class Worker(threading.Thread): If the string 'done' is passed instead of a TaskFunc instance, the thread will end. """ + def __init__(self, in_queue, out_queue): threading.Thread.__init__(self) self.in_queue = in_queue @@ -55,15 +57,14 @@ class Worker(threading.Thread): def run(self): while True: task = self.in_queue.get() - if task == 'done': + if task == "done": return try: - res = task['func'](*task['args'], - **task['kwargs']) + res = task["func"](*task["args"], **task["kwargs"]) except Exception as exc: res = exc traceback.print_exc() - self.out_queue.put((task['ord'], res)) + self.out_queue.put((task["ord"], res)) def concurrent(func): @@ -102,8 +103,8 @@ def concurrent(func): array with the results of the executions in the same order the parameters were passed. """ - n_workers = kwargs.pop('n_workers', 0) - p_kwargs = kwargs.pop('concurrent', []) + n_workers = kwargs.pop("n_workers", 0) + p_kwargs = kwargs.pop("concurrent", []) # if only one parameter is passed inside the concurrent dict, run the # original function as is, no need for pools if len(p_kwargs) == 1: @@ -133,7 +134,7 @@ def concurrent(func): in_queue.put(TaskFunc(n_ord, func, args, f_kwargs)) n_ord += 1 for _ in range(n_workers): - in_queue.put('done') + in_queue.put("done") # Wait for the results logging.debug("Waiting for workers to finish processing") @@ -148,4 +149,5 @@ def concurrent(func): results = [r[1] for r in sorted(results)] logging.debug("Concurrent task finished") return results + return concurrentized |