Source code for raytools.io
# -*- coding: utf-8 -*-
# Copyright (c) 2019 Stephen Wasilewski
# =======================================================================
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# =======================================================================
"""functions for reading and writing"""
import re
import shlex
import os
import sys
import textwrap
from subprocess import Popen, PIPE
import numpy as np
[docs]
def get_nproc(nproc=None):
if nproc is not None:
return nproc
env_nproc = os.getenv('RAYTOOLS_PROC_CAP')
try:
return int(env_nproc)
except (ValueError, TypeError):
return os.cpu_count()
[docs]
def set_nproc(nproc):
if nproc is None:
return None
if type(nproc) != int:
raise ValueError('nproc must be an int')
if nproc < 1:
unset_nproc()
else:
os.environ['RAYTOOLS_PROC_CAP'] = str(nproc)
[docs]
def np2bytes(ar, dtype='<f'):
"""format ar as bytestring
Parameters
----------
ar: np.array
dtype: str
argument to pass to np.dtype()
Returns
-------
bytes
"""
dt = np.dtype(dtype)
return ar.astype(dt).tobytes()
[docs]
def np2bytefile(ar, outf, dtype='<f', mode='wb'):
"""save vectors to file
Parameters
----------
ar: np.array
array to write
outf: str
file to write to
dtype: str
argument to pass to np.dtype()
"""
f = open(outf, mode)
f.write(np2bytes(ar, dtype=dtype))
f.close()
[docs]
def bytes2np(buf, shape, dtype='<f'):
"""read ar from bytestring
Parameters
----------
buf: bytes, str
shape: tuple
array shape
dtype: str
argument to pass to np.dtype()
Returns
-------
np.array
"""
dt = np.dtype(dtype)
return np.frombuffer(buf, dtype=dt).reshape(*shape)
[docs]
def bytefile2np(f, shape, dtype='<f'):
"""read binary data from f
Parameters
----------
f: IOBase
file object to read array from
shape: tuple
array shape
dtype: str
argument to pass to np.dtype()
Returns
-------
ar.shape
necessary for reconstruction
"""
return bytes2np(f.read(), shape, dtype)
def _array2hdr(ar, imgf, header, pval, clean=False):
"""write 2d np.array to hdr image format
Parameters
----------
ar: np.array
image array
imgf: str
file path to write
header: list
list of header lines to append to image header
pval: str
pvalue command
Returns
-------
imgf
"""
if imgf is None:
f = None
else:
f = open(imgf, 'wb')
if header is not None:
if clean:
header = clean_header(header)
hdr = "' '".join(header)
getinfo = shlex.split(f"getinfo -a '{hdr}'")
p = Popen(pval.split(), stdin=PIPE, stdout=PIPE)
q = Popen(getinfo, stdin=p.stdout, stdout=f)
else:
p = Popen(pval.split(), stdin=PIPE, stdout=f)
q = p
p.stdin.write(np2bytes(ar))
p.stdin.flush()
q.communicate()
try:
f.close()
except AttributeError:
pass
return imgf
[docs]
def array2hdr(ar, imgf, header=None, clean=False):
"""write 2d np.array (x,y) to hdr image format
Parameters
----------
ar: np.array
image array
imgf: str
file path to right
header: list
list of header lines to append to image header
Returns
-------
imgf
"""
if len(ar.shape) > 2:
return carray2hdr(ar, imgf, header, clean=clean)
pval = f'pvalue -r -b -h -H -df -o -y {ar.shape[-1]} +x {ar.shape[-2]}'
return _array2hdr(ar.T[::-1], imgf, header, pval, clean=clean)
[docs]
def carray2hdr(ar, imgf, header=None, clean=False):
"""write color channel np.array (3, x, y) to hdr image format
Parameters
----------
ar: np.array
image array
imgf: str
file path to right
header: list
list of header lines to append to image header
Returns
-------
imgf
"""
pval = f'pvalue -r -h -H -df -o -y {ar.shape[-1]} +x {ar.shape[-2]}'
return _array2hdr(ar.T[::-1], imgf, header, pval, clean=clean)
def _hdr_in(pval, imgf, stdin):
p = Popen(shlex.split(pval), stdin=stdin, stdout=PIPE, stderr=PIPE)
shape = p.stdout.readline().strip().split()
try:
shape = (int(shape[-3]), int(shape[-1]))
except IndexError:
if imgf == "":
imgf = "-"
if os.path.isfile(imgf):
raise ValueError(f"Bad HDR file '{imgf}',\n"
f"pvalue msg: {p.stderr}")
else:
raise ValueError(f"HDR image file '{imgf}' not found,\n"
f"pvalue msg: {p.stderr}")
return p.stdout.read(), shape
[docs]
def hdr2array(imgf, stdin=None, header=False, vlambda=None):
"""read np.array from hdr image
Parameters
----------
imgf: file path of image
stdin:
passed to Popen (imgf should be "")
Returns
-------
ar: np.array
"""
if vlambda is not None:
imgd = hdr2carray(imgf, stdin)
imgd = np.einsum("ijk,i->jk", imgd, vlambda)
else:
pval = f'pvalue -b -h -df -o {imgf}'
imgd = bytes2np(*_hdr_in(pval, imgf, stdin)).T[:, ::-1]
if header:
return imgd, hdr_header(imgf)
return imgd
[docs]
def hdr2carray(imgf, stdin=None, header=False):
"""read np.array from color hdr image
Parameters
----------
imgf: file path of image
stdin:
passed to Popen (imgf should be "")
Returns
-------
ar: np.array
"""
pval = f'pvalue -n -h -df -o {imgf}'
data, shape = _hdr_in(pval, imgf, stdin)
shape = (3, *shape)
imgd = np.transpose(bytes2np(data, shape)[:, ::-1], (0, 2, 1))
if header:
return imgd, hdr_header(imgf)
return imgd
[docs]
def header_items(header, items):
header = CleanHeader(header, headerwidth=4096).header
items = [i.lower() for i in items]
out = [""] * len(items)
for line in header:
cl = line.strip()
if re.match(r".+:$", cl):
continue
sep = None
if "=" in cl:
sep = "="
try:
key, val = cl.split(sep, 1)
except ValueError:
key = cl
val = ''
key = key.lower()
if key in items:
out[items.index(key)] = val.strip()
return out
[docs]
def clean_header(header):
"""remove redundant entries from radiance image header, updating view,
purging pvalue and clasp_tmp file names"""
return CleanHeader(header).header
[docs]
def hdr_header(imgf, clean=False, items=None):
p = Popen(shlex.split(f"getinfo {imgf}"), stdout=PIPE,
stderr=PIPE).communicate()
err = p[1]
try:
err = err.decode("utf-8")
except AttributeError:
pass
if "bad header!" in err:
raise IOError(f"{err} - wrong file type?")
try:
header = p[0].decode("utf-8")
except UnicodeDecodeError:
raise IOError(f"{err} - wrong file type?")
if "cannot open" in header:
raise FileNotFoundError(f"{imgf} not found")
header = [i for i in header.strip().splitlines() if not re.match(r".*#?RADIANCE.*", i)]
if items is not None:
header = header_items(header, items=items)
elif clean:
header = clean_header(header)
return header
[docs]
def rgb2rad(rgb, vlambda=(0.265, 0.670, 0.065)):
return np.einsum('...j,j->...', rgb, vlambda)
[docs]
def rgb2lum(rgb, vlambda=(0.265, 0.670, 0.065)):
return np.einsum('...j,j->...', rgb, np.array(vlambda) * 179)
[docs]
def rgbe2lum(rgbe):
"""
convert from Radiance hdr rgbe 4-byte data format to floating point
luminance.
Parameters
----------
rgbe: np.array
r,g,b,e unsigned integers according to:
http://radsite.lbl.gov/radiance/refer/filefmts.pdf
Returns
-------
lum: luminance in cd/m^2
"""
v = np.power(2., rgbe[:, 3] - 128).reshape(-1, 1) / 256
rgb = np.where(rgbe[:, 0:3] == 0, 0, (rgbe[:, 0:3] + 0.5) * v)
# luminance = 179 * (0.265*R + 0.670*G + 0.065*B)
return rgb2lum(rgb)
[docs]
def load_txt(farray, **kwargs):
"""consistent error handing of np.loadtxt
Parameters
----------
farray: any
candidate to load
kwargs:
passed to np.loadtxt
Returns
-------
np.array
Raises
------
ValueError:
file exists, but is not loadable
FileNotFoundError:
farray is str, but file does not exist
TypeError:
farray is not str or bytes.
"""
if isinstance(farray, (str, bytes)):
if os.path.isfile(farray):
try:
return np.loadtxt(farray, **kwargs)
except (ValueError, AttributeError):
raise ValueError
else:
raise FileNotFoundError(f"{farray}")
else:
raise TypeError
[docs]
class CleanHeader(object):
"""takes lists of rgbe hdr format header lines (with tabbed hierarchy)
and processes to group redundant information and avoid overly long lines
(which throw errors with pfs tools). After initialization (or subsequent
call to text setter, the 'header' parameter holds the cleaned header result
as a list of header lines.
Parameters
----------
text: Union(str, list)
the header to process
spacespertab: int, optional
convert leading spaces to tabs
outtab: str, optional
output tab
headerwidth: int, optional
"""
def __init__(self, text, spacespertab=4, outtab="\t", headerwidth=150):
self.spacespertab = spacespertab
self.outtab = outtab
self.headerwidth = headerwidth
self._header = None
self.text = text
@property
def header(self):
"""the output header"""
return self._header
@property
def text(self):
"""text version of the input header"""
return self._text
@text.setter
def text(self, t):
"""process text into output header"""
if type(t) != str:
t = "\n".join(t)
self._text = t
data, view = self._preprocess()
outdata = self._clean_up(data, depth=-1)
hdr = []
for i in outdata:
try:
key, _ = i.strip().split("=", 1)
indent = re.match(r"\s*", i).group() + key + "= "
except ValueError:
indent = re.match(r"\s*", i).group() + " ... "
hdr.append(textwrap.fill(i, expand_tabs=False,
replace_whitespace=False,
drop_whitespace=False,
width=self.headerwidth,
subsequent_indent=indent))
header = "\n".join(hdr).splitlines()
if view is not None:
header.append(f"VIEW={view}")
self._header = header
def _preprocess(self, text=None):
"""process indentation into nested file hierarchy each file lines
creates a dictionary with 'content', 'depth', and 'name'. other
lines are filtered and stored as str items in the content list."""
if text is not None:
self._text = text
files = []
view = None # only store last found view
vals = [] # use to check for redundant lines
cs = [] # the current stack of file nesting
lastkey = ""
for i, h in enumerate(self._text.splitlines()):
cl = h.strip()
cl = re.sub(r"\S*clasp_tmp[^\s:]*", "<stdin>", cl)
inset = re.match(r"\s*", h).group()
inset = inset.replace(" " * self.spacespertab,
"\t").replace(" ", "")
depth = len(inset)
if re.match(r"(.+):$", cl):
curf = cl.rsplit("/", 1)[-1][:-1]
fi = dict(name=curf, depth=depth, content=[])
if len(cs) == 0:
cs.append(fi)
files.append(fi)
elif depth > cs[-1]['depth']:
cs[-1]['content'].append(fi)
cs.append(fi)
else:
le = depth - cs[-1]['depth']
cs = cs[:-(le+1)]
if len(cs) == 0:
cs.append(fi)
files.append(fi)
else:
cs[-1]['content'].append(fi)
cs.append(fi)
continue
sep = None
if "=" in cl:
sep = "="
try:
key, val = cl.split(sep, 1)
except ValueError:
key = cl
val = ''
if key in ["pvalue", "FORMAT"]: # don't care about this
continue
if sep is None:
sep = " "
if key == "VIEW": # only store last view
view = val
continue
if len(cs) > 0:
curf = cs[-1]['content']
else:
curf = files
if key == "..." or key == lastkey and len(curf) > 0: # unwrap previously cleaned header
curf[-1] += " " + val.lstrip()
vals[-1] += " " + val.lstrip()
elif cl in vals: # promote
idx = self._pop_and_promote(files, cl)
if idx is not None:
f = files
k = 0
for j in idx[:-2]:
k += 1
f = f[j]['content']
f.insert(idx[k], cl)
else:
vals.append(f"{key}{sep}{val}")
if len(cs) > 0 and depth <= cs[-1]['depth']:
le = cs[-1]['depth'] - depth
cs = cs[:-(le + 1)]
if len(cs) > 0:
cs[-1]['content'].append(vals[-1])
else:
files.append(vals[-1])
lastkey = key
return files, view
def _pop_and_promote(self, data, val, idx=None):
"""recursive function that pops existing value out of list and reutrns
idx (at eack depth level of data) where the value was. Used to take
duplicate header lines and promote them up to a higher tab level"""
if idx is None:
idx = []
for i, v in enumerate(data):
if type(v) == str:
if v == val:
if len(idx) == 0:
return None
data.pop(i)
return idx + [i]
continue
return self._pop_and_promote(v['content'], val, idx + [i])
return idx
def _clean_up(self, data, depth=0, da=0):
"""recursive function to format the prepocessed data into a new
header"""
outdata = []
for v in data:
if type(v) == str:
indent = self.outtab * (depth + 1 - da)
outdata.append(f"{indent}{v}")
elif len(v['content']) == 0:
continue
elif len(v['content']) == 1 and type(v['content'][0]) != str:
outdata += self._clean_up(v['content'], v['depth'], da+1)
elif np.all([type(i) != str for i in v['content']]):
outdata += self._clean_up(v['content'], v['depth'], da + 1)
else:
indent = self.outtab * (v['depth'] - da)
outdata.append(indent + v['name'] + ":")
outdata += self._clean_up(v['content'], v['depth'], da)
return outdata