Source code for raytools.utility

# -*- coding: utf-8 -*-

# Copyright (c) 2019 Stephen Wasilewski
# =======================================================================
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# =======================================================================

"""progress bar"""
import shutil

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from concurrent.futures import wait, FIRST_COMPLETED
from multiprocessing import get_context

from tqdm import tqdm
from raytools import io


[docs] class TStqdm(tqdm): """object for managing tqdm progress bar, multiprcessing, and logging""" def __init__(self, instance=None, workers=False, position=0, desc=None, ncols=100, cap=None, context='fork', **kwargs): if str(workers).lower() in ('thread', 't', 'threads'): pool = ThreadPoolExecutor() elif workers: context = get_context(context) nproc = io.get_nproc(cap) pool = ProcessPoolExecutor(nproc, mp_context=context) else: pool = None self._instance = instance self.loglevel = position self.pool = pool self.wait = wait self.FIRST_COMPLETED = FIRST_COMPLETED if pool is None: self.nworkers = 0 else: self.nworkers = pool._max_workers ncols = min(ncols, shutil.get_terminal_size().columns) super().__init__(desc=self.ts_message(desc), position=position, ncols=ncols, **kwargs)
[docs] def ts_message(self, s): if self._instance is not None: p = type(self._instance).__name__ else: p = "" if s is None: s = f"{p}" else: s = f"{p} {s}" s = f"{' | ' * self.loglevel} {s}" return s
[docs] def write(self, s, file=None, end="\n", nolock=False): super().write(self.ts_message(s), file, end, nolock)
[docs] def set_description(self, desc=None, refresh=True): super().set_description(desc=self.ts_message(desc), refresh=refresh)
[docs] def pool_call(func, args, *fixed_args, cap=None, expandarg=True, desc="processing", workers=True, pbar=True, context='fork', **kwargs): """calls func for a sequence of arguments using a ProcessPool executor and a progress bar. result is equivalent to:: result = [] for arg in args: result.append(func(*args, *fixed_args, **kwargs)) return result Parameters ---------- func: callable the function to execute in parallel args: Sequence[Sequence] list of arguments (each item is expanded with '*' unless expandarg is false). first N args of func fixed_args: Sequence arguments passed to func that are the same for all calls (next N arguments after args) cap: int, optional execution cap for ProcessPool expandarg: bool, optional expand args with '*' when calling func desc: str, optional label for progress bar workers: Union[bool, str], optional return threadpool ('t', 'threads', 'thread') or processpool (True) pbar: bool, optional display progress bar while executing context: str, optional 'fork' or 'forkserver' this is for special cases that seem to emerge with certain hardware/os/python version combinations where 'fork' has issues. this option should go away once the correct platform settings are determined kwargs: additional keyword arguments passed to func Returns ------- sequence of results from func (order preserved) """ results = [] if not workers: result = [] for arg in args: if expandarg: result.append(func(*arg, *fixed_args, **kwargs)) else: result.append(func(arg, *fixed_args, **kwargs)) return result with TStqdm(workers=workers, total=len(args), cap=cap, desc=desc, disable=not pbar, context=context) as pbar: exc = pbar.pool futures = [] # submit asynchronous to process pool for arg in args: if expandarg: fu = exc.submit(func, *arg, *fixed_args, **kwargs) else: fu = exc.submit(func, arg, *fixed_args, **kwargs) futures.append(fu) # gather results (in order) for future in futures: results.append(future.result()) pbar.update(1) return results