Source code for fluidfoam.readpostpro

"""Read OpenFoam PostProcessing Files for Python
=======================================================================
This module provides functions to list and read OpenFoam PostProcessing Files:

.. autofunction:: readforce
.. autofunction:: readprobes
"""
import os
import numpy as np
from glob import glob


def _find_latesttime(path):
    dir_list = os.listdir(path)
    time_list = []

    for directory in dir_list:
        try:
            float(directory)
            time_list.append(directory)
        except:
            pass
    time_list.sort(key=float)
    return time_list[-1]


[docs]def readforce(path, namepatch="forces", time_name="0", name="forces"): """read the data contained in the force file . create the forces variables in the Forcesfile object Args: path: str\n namepatch: str\n time_name: str ('latestTime' and 'mergeTime' are supported)\n name: str Returns: array: array of force field; size of the array is the size of the time A way you might use me is:\n force = readforce(path='path_of_OpenFoam_case', namepatch='forces', time_name='0', name='forces') """ path_namepatch = glob(f'{path}/**/'+namepatch, recursive=True)[0] if time_name is "latestTime": time_name = _find_latesttime(path_namepatch) elif time_name is "mergeTime": time_list = [] dir_list = os.listdir(path_namepatch) for directory in dir_list: try: float(directory) time_list.append(directory) except: pass time_list.sort(key=float) time_list = np.array(time_list) for timename in time_list: tab = readforce(path, namepatch, timename, name) if "tab_merge" in locals(): for jj in range(np.size(tab[:, 0])): if tab[jj, 0] > tab_merge[-1, 0]: break else: continue if jj + 1 < np.size(tab[:, 0]): tab_merge = np.concatenate([tab_merge, tab[jj:, :]]) else: tab_merge = tab return tab_merge with open(os.path.join(path_namepatch, time_name, name + ".dat"), "rb") as f: content = f.read() data = content.split(b"\n") j = 0 header = True for dummy, line in enumerate(data[:-1]): if "#".encode() in line: j += 1 elif "#".encode() not in line and header is True: header = False line = line.replace(b"(", b"") line = line.replace(b")", b"") line = line.split() tab = np.zeros([len(data) - j - 1, len(line)], dtype=float) j = 0 tab[j, :] = np.array(line, dtype=float) else: j += 1 line = line.replace(b"(", b"") line = line.replace(b")", b"") line = line.split() tab[j, :] = np.array(line, dtype=float) return tab
[docs]def readprobes(path, probes_name="probes", time_name="0", name="U"): """read the data contained in the force file . create the forces variables in the Forcesfile object Args: path: str\n probes_name: str\n time_name: str ('latestTime' and 'mergeTime' are supported)\n name: str Returns: array: array of time values and array of probes data; A way you might use me is:\n probe_data = read('path_of_OpenFoam_case', '0', 'probes', 'U') """ time_vect = None tab = None jj = 0 probes_loc = None path_probes_name = glob(f'{path}/**/'+probes_name, recursive=True)[0] if time_name is "latestTime": time_name = _find_latesttime(path_probes_name) elif time_name is "mergeTime": time_list = [] dir_list = os.listdir(path_probes_name) for directory in dir_list: try: float(directory) time_list.append(directory) except: pass time_list.sort(key=float) time_list = np.array(time_list) for timename in time_list: probes_loc, time_vect, tab = readprobes( path, probes_name, timename, name) if "tab_merge" in locals(): for jj in range(np.size(time_vect[:])): if time_vect[jj] > timevect_merge[-1]: break else: continue if jj + 1 < np.size(time_vect[:]): timevect_merge = np.concatenate( [timevect_merge, time_vect[jj:]] ) tab_merge = np.concatenate([tab_merge, tab[jj:, :]]) else: timevect_merge = time_vect tab_merge = tab return probes_loc, timevect_merge, tab_merge with open(os.path.join(path_probes_name, time_name, name), "rb") as f: content = f.readlines() j = 0 for dummy, line in enumerate(content): if "#".encode() in line: j += 1 elif "#".encode() not in line: break n_probes = j-2 probes_loc = np.zeros([n_probes, 3], dtype=float) j = 0 header = True for dummy, line in enumerate(content): if "#".encode() in line: if j<n_probes: for k in range(3): probes_loc[j, k] = (line.split( b"(")[1].split(b")")[0].split()[k]) j += 1 elif "#".encode() not in line and header: header = False line = line.replace(b")", b"") line = line.split(b"(") try: dim = len(line[1].split()) except IndexError: dim = 1 line = line[0].split() time_vect = np.zeros(len(content) - j) time_vect[0] = line[0] tab = np.zeros([len(content) - j, len(line) - 1, dim], dtype=float) print( "Reading file " + os.path.join( path_probes_name, time_name, name ) ) print( str(len(line) - 1) + " probes over " + str(len(tab[:, 0, 0])) + " timesteps" ) for k, probedata in enumerate(line[1:]): values = probedata.split() for l, vect in enumerate(values): tab[0, k, l] = np.array(vect, dtype=float) j = 0 else: j += 1 line = line.replace(b")", b"") line = line.split(b"(") try: time_vect[j] = line[0] except ValueError: line = line[0].split() time_vect[j] = line[0] for k, probedata in enumerate(line[1:]): values = probedata.split() for l, vect in enumerate(values): tab[j, k, l] = np.array(vect, dtype=float) if time_vect is None: time_vect = np.array([]) if tab is None: tab = np.array([]) return probes_loc, time_vect, tab
if __name__ == "__main__": rep = "../output_samples/ascii" probes_loc, time_vect, dummy = readprobes(rep, time_name="latestTime")