# -*- coding: utf-8 -*-
# Copyright (c) 2019 Stephen Wasilewski
# =======================================================================
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# =======================================================================
"""progress bar"""
import shutil
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from concurrent.futures import wait, FIRST_COMPLETED
from multiprocessing import get_context
from tqdm import tqdm
from raytools import io
# --- progress bar with attached worker pool ---------------------------------
class TStqdm(tqdm):
    """tqdm progress bar that also owns a worker pool and prefixes messages.

    Every description and written message is passed through
    :meth:`ts_message`, which prepends a nesting indicator (one ``' | '``
    per ``position``) and the class name of ``instance``.

    The executor created here is owned by the bar: :meth:`close` shuts it
    down, so using the bar as a context manager releases the workers when
    the block is left.

    Parameters
    ----------
    instance: object, optional
        object whose class name is used as the message prefix. With None
        the prefix is an empty string.
    workers: Union[bool, str], optional
        'thread', 't' or 'threads' (case-insensitive) creates a
        ThreadPoolExecutor; any other truthy value creates a
        ProcessPoolExecutor; a falsy value creates no pool.
    position: int, optional
        forwarded to tqdm and also used as the nesting depth of the
        message prefix (stored as ``loglevel``).
    desc: str, optional
        progress bar label (prefixed by :meth:`ts_message`).
    ncols: int, optional
        requested bar width, limited to the current terminal width.
    cap: int, optional
        passed to ``io.get_nproc`` to choose the process pool size
        (ignored for thread pools and when no pool is created).
    context: str, optional
        multiprocessing start method handed to ``get_context`` (only used
        for process pools).
    kwargs:
        forwarded unchanged to ``tqdm.__init__``.

    Attributes
    ----------
    pool:
        the executor, or None when ``workers`` is falsy.
    nworkers: int
        worker count of the pool, 0 without a pool.
    wait, FIRST_COMPLETED:
        re-exports of ``concurrent.futures.wait`` and
        ``concurrent.futures.FIRST_COMPLETED`` so callers holding only the
        bar can wait on futures.
    """

    def __init__(self, instance=None, workers=False, position=0,
                 desc=None, ncols=100, cap=None, context='fork', **kwargs):
        if str(workers).lower() in ('thread', 't', 'threads'):
            pool = ThreadPoolExecutor()
        elif workers:
            context = get_context(context)
            nproc = io.get_nproc(cap)
            pool = ProcessPoolExecutor(nproc, mp_context=context)
        else:
            pool = None
        # these must be assigned before tqdm's __init__ runs, because the
        # description passed below is built by ts_message(), which reads them
        self._instance = instance
        self.loglevel = position
        self.pool = pool
        self.wait = wait
        self.FIRST_COMPLETED = FIRST_COMPLETED
        # NOTE: reads a private executor attribute; the executors offer no
        # public accessor for their worker count
        self.nworkers = 0 if pool is None else pool._max_workers
        ncols = min(ncols, shutil.get_terminal_size().columns)
        super().__init__(desc=self.ts_message(desc), position=position,
                         ncols=ncols, **kwargs)

    def ts_message(self, s):
        """Return ``s`` with the nesting indicator and class-name prefix.

        Parameters
        ----------
        s: str, optional
            message text. With None only the prefix is returned.

        Returns
        -------
        str
            ``' | '`` repeated ``loglevel`` times, a space, the class name
            of ``instance`` (empty without an instance) and, when ``s`` is
            not None, a space followed by ``s``.
        """
        if self._instance is not None:
            p = type(self._instance).__name__
        else:
            p = ""
        if s is None:
            s = f"{p}"
        else:
            s = f"{p} {s}"
        return f"{' | ' * self.loglevel} {s}"

    def write(self, s, file=None, end="\n", nolock=False):
        """Write ``s`` through tqdm's ``write`` after prefixing it.

        ``file``, ``end`` and ``nolock`` are forwarded unchanged.
        """
        super().write(self.ts_message(s), file, end, nolock)

    def set_description(self, desc=None, refresh=True):
        """Set the bar description to the prefixed form of ``desc``."""
        super().set_description(desc=self.ts_message(desc), refresh=refresh)

    def close(self):
        """Close the progress bar, then shut down the worker pool, if any.

        Shutting down waits for work that was already submitted, which is
        the same behaviour as leaving a ``with`` block of a bare executor.
        Calling this more than once is harmless.
        """
        try:
            super().close()
        finally:
            # getattr: close() can be reached on an instance whose __init__
            # raised before self.pool was assigned
            pool = getattr(self, "pool", None)
            if pool is not None:
                pool.shutdown()
# --- parallel map helper -----------------------------------------------------
def pool_call(func, args, *fixed_args, cap=None, expandarg=True,
              desc="processing", workers=True, pbar=True, context='fork',
              **kwargs):
    """calls func for a sequence of arguments using a pool executor
    and a progress bar. result is equivalent to::

        result = []
        for arg in args:
            result.append(func(*arg, *fixed_args, **kwargs))
        return result

    Parameters
    ----------
    func: callable
        the function to execute in parallel
    args: Sequence[Sequence]
        list of arguments (each item is expanded with '*' unless expandarg
        is false). first N args of func. must support ``len()`` when
        workers is truthy (used as the progress bar total).
    fixed_args: Sequence
        arguments passed to func that are the same for all calls (next N
        arguments after args)
    cap: int, optional
        execution cap for ProcessPool (forwarded to TStqdm)
    expandarg: bool, optional
        expand args with '*' when calling func
    desc: str, optional
        label for progress bar
    workers: Union[bool, str], optional
        use a thread pool ('t', 'threads', 'thread'), a process pool (True
        or any other truthy value), or, when falsy, call func serially in
        the current process without creating a pool or a progress bar
    pbar: bool, optional
        display progress bar while executing
    context: str, optional
        'fork' or 'forkserver' this is for special cases that seem to emerge
        with certain hardware/os/python version combinations where 'fork' has
        issues. this option should go away once the correct platform settings
        are determined
    kwargs:
        additional keyword arguments passed to func

    Returns
    -------
    list
        results from func (order of args preserved)

    Raises
    ------
    Exception
        whatever func raises is re-raised here. in pooled mode, calls that
        have not started yet are cancelled first.
    """
    def _positional(arg):
        # positional arguments for a single call of func
        if expandarg:
            return (*arg, *fixed_args)
        return (arg, *fixed_args)

    if not workers:
        return [func(*_positional(arg), **kwargs) for arg in args]

    # 'bar' rather than 'pbar': do not rebind the boolean parameter
    with TStqdm(workers=workers, total=len(args), cap=cap,
                desc=desc, disable=not pbar, context=context) as bar:
        exc = bar.pool
        futures = []
        results = []
        try:
            # submit asynchronous to the pool
            for arg in args:
                futures.append(exc.submit(func, *_positional(arg), **kwargs))
            # gather results in submission order; the bar therefore advances
            # in that order too, not in completion order
            for future in futures:
                results.append(future.result())
                bar.update(1)
        except BaseException:
            # drop queued work so the shutdown below only has to wait for
            # calls that are already running
            for future in futures:
                future.cancel()
            raise
        finally:
            # release worker processes/threads instead of leaving them to
            # garbage collection or interpreter exit
            exc.shutdown()
    return results