Source code for resqpy.surface._pointset

"""PointSet class based on RESQML standard."""

# RMS and ROXAR are registered trademarks of Roxar Software Solutions AS, an Emerson company
# GOCAD is also a trademark of Emerson

import logging

log = logging.getLogger(__name__)

import numpy as np

import resqpy.crs as rcrs
import resqpy.olio.uuid as bu
import resqpy.olio.vector_utilities as vec
import resqpy.olio.write_hdf5 as rwh5
import resqpy.olio.xml_et as rqet
import resqpy.property as rqp
import resqpy.surface
import resqpy.surface._base_surface as rqsb
from resqpy.olio.xml_namespaces import curly_namespace as ns


class PointSet(rqsb.BaseSurface):
    """Class for RESQML Point Set Representation within resqpy model object."""  # TODO: Work in Progress

    resqml_type = 'PointSetRepresentation'

[docs] def __init__(self, parent_model, uuid = None, load_hdf5 = False, points_array = None, crs_uuid = None, polyset = None, polyline = None, random_point_count = None, charisma_file = None, irap_file = None, title = None, originator = None, extra_metadata = None, remove_trailing_999 = True): """Creates an empty Point Set object and optionally populates from xml or other source. arguments: parent_model (model.Model object): the model to which the new point set belongs uuid (uuid.UUID, optional): if present, the object is populated from the RESQML PointSetRepresentation with this uuid load_hdf5 (boolean, default False): if True and uuid is present, the actual points are pre-loaded into a numpy array; otherwise the points will be loaded on demand points_array (numpy float array of shape (..., 2 or 3), optional): if present, the xy(&z) data which will constitute the point set; missing z will be set to zero; ignored if uuid is not None crs_uuid (uuid.UUID, optional): if present, identifies the coordinate reference system for the points; ignored if uuid is not None; if None, 'imported' points will be associated with the default crs of the parent model polyset (optional): if present, creates a pointset from points in a polylineset polyline (optional): if present and random_point_count is None or zero, creates a pointset from points in a polyline; if present and random_point_count is set, creates random points within the (closed, convex) polyline random_point_count (int, optional): if present and polyline is present then the number of random points to generate within the (closed) polyline in the xy plane, with z set to 0.0 charisma_file (optional): if present, creates a pointset from a charisma 3d interpretation file irap_file (optional): if present, creates a pointset from an IRAP classic points format file title (str, optional): the citation title to use for a new point set; ignored if uuid is not None originator (str, optional): the name of the person creating the point set, defaults to login id; ignored if uuid is not None extra_metadata (dict, optional): string key, value pairs to add as extra metadata for the point set; ignored if uuid is not None remove_trailing_999 (bool, default True): if True, when loading from an irap_file, if the last point in the file is triple 999.0, then it is excluded from the point set returns: newly created PointSet object :meta common: """ self.crs_uuid = crs_uuid self.patch_count = None self.patch_ref_list = [] # ordered list of (patch hdf5 ext uuid, path in hdf5, point count) self.patch_array_list = [] # ordered list of numpy float arrays (or None before loading), each of shape (N, 3) self.full_array = None # composite points (all patches) self.represented_interpretation_root = None self.property_collection = None super().__init__(model = parent_model, uuid = uuid, title = title, originator = originator, extra_metadata = extra_metadata) if self.root is not None: if load_hdf5: self.load_all_patches() elif points_array is not None: assert self.crs_uuid is not None, 'missing crs uuid when establishing point set from array' self.add_patch(points_array) elif polyline is not None: # Points from or within polyline self.from_polyline(polyline, random_point_count) elif polyset is not None: # Points from polylineSet self.from_polyset(polyset) elif charisma_file is not None: # Points from Charisma 3D interpretation lines self.from_charisma(charisma_file) elif irap_file is not None: # Points from IRAP simple points self.from_irap(irap_file, remove_trailing_999) if not self.title: self.title = 'point set'
def _load_from_xml(self): assert self.root is not None self.patch_count = rqet.count_tag(self.root, 'NodePatch') assert self.patch_count, 'no patches found in xml for point set' self.patch_array_list = [None for _ in range(self.patch_count)] patch_index = 0 for child in rqet.list_of_tag(self.root, 'NodePatch'): point_count = rqet.find_tag_int(child, 'Count') geom_node = rqet.find_tag(child, 'Geometry') assert geom_node is not None, 'geometry missing in xml for point set patch' crs_uuid = rqet.find_nested_tags_text(geom_node, ['LocalCrs', 'UUID']) assert crs_uuid, 'crs uuid missing in geometry xml for point set patch' self.check_crs_match(crs_uuid) ext_uuid = rqet.find_nested_tags_text(geom_node, ['Points', 'Coordinates', 'HdfProxy', 'UUID']) assert ext_uuid, 'missing hdf5 uuid in geometry xml for point set patch' hdf5_path = rqet.find_nested_tags_text(geom_node, ['Points', 'Coordinates', 'PathInHdfFile']) assert hdf5_path, 'missing internal hdf5 path in geometry xml for point set patch' self.patch_ref_list.append((ext_uuid, hdf5_path, point_count)) patch_index += 1 ref_node = rqet.find_tag(self.root, 'RepresentedInterpretation') if ref_node is not None: interp_root = self.model.referenced_node(ref_node) self.set_represented_interpretation_root(interp_root) # note: load of patches handled elsewhere
[docs] def from_charisma(self, charisma_file): """Instantiate a pointset using points from an input charisma file arguments: charisma_file: a charisma 3d interpretation file """ with open(charisma_file, 'r') as surf: cpoints = np.loadtxt(surf, usecols = (6, 7, 8), encoding = 'uft-8') self.add_patch(cpoints) assert self.crs_uuid is not None, 'crs uuid missing when establishing point set from charisma file' if not self.title: self.title = charisma_file
[docs] def from_irap(self, irap_file, remove_trailing_999 = True): """Instantiate a pointset using points from an input irap file. arguments: irap_file (str): an existing IRAP classic points format file remove_trailing_999 (bool, default True): if True, and the last point in the irap file is triple 999.0, then it is excluded """ with open(irap_file, 'r') as points: cpoints = np.loadtxt(points, encoding = 'uft-8') assert len(cpoints), f'no points loaded from irap file: {irap_file}' assert cpoints.ndim == 2 and cpoints.shape[1] == 3 if remove_trailing_999 and np.all(np.isclose(cpoints[-1], 999.0)): self.add_patch(cpoints[:-1]) else: self.add_patch(cpoints) assert self.crs_uuid is not None, 'crs uuid missing when establishing point set from irap file' if not self.title: self.title = irap_file
[docs] def from_polyline(self, polyline, random_point_count = None): """Instantiates a pointset using points from an input polyline (PolylineRepresentation) object arguments: polyline (resqpy.lines.Polyline object): if random_point_count is None or zero, creates a pointset from points in a polyline; if present and random_point_count is set, creates random points within the (closed, convex) polyline random_point_count (int, optional): if present then the number of random points to generate within the (closed) polyline in the xy plane, with z set to 0.0 """ if random_point_count: assert polyline.is_convex() points = np.zeros((random_point_count, 3)) points[:, :2] = np.random.random((random_point_count, 2)) for p_i in range(random_point_count): points[p_i, :2] = polyline.denormalised_xy(points[p_i, 0], points[p_i, 1]) self.add_patch(points) else: self.add_patch(polyline.coordinates) if polyline.rep_int_root is not None: self.set_represented_interpretation_root(polyline.rep_int_root) self.check_crs_match(polyline.crs_uuid) if not self.title: self.title = polyline.title
[docs] def from_polyset(self, polyset): """Instantiates a pointset using points from an input polylineset (PolylineSetRepresentation) object arguments: polyset (resqpy.lines.PolylineSet object): a polylineset object to generate the pointset from """ master_crs = rcrs.Crs(self.model, uuid = polyset.polys[0].crs_uuid) if polyset.polys[0].isclosed and vec.isclose(polyset.polys[0].coordinates[0], polyset.polys[0].coordinates[-1]): poly_coords = polyset.polys[0].coordinates[:-1].copy() else: poly_coords = polyset.polys[0].coordinates.copy() for poly in polyset.polys[1:]: curr_crs = rcrs.Crs(self.model, uuid = poly.crs_uuid) assert master_crs is not None assert poly_coords is not None if not curr_crs.is_equivalent(master_crs): shifted = curr_crs.convert_array_to(master_crs, poly.coordinates) poly_coords = concat_polyset_points(poly.isclosed, shifted, poly_coords) else: poly_coords = concat_polyset_points(poly.isclosed, poly.coordinates, poly_coords) self.add_patch(poly_coords) if polyset.rep_int_root is not None: self.set_represented_interpretation_root(polyset.rep_int_root) self.check_crs_match(master_crs.uuid) if not self.title: self.title = polyset.title
[docs] def check_crs_match(self, crs_uuid): """Checks if the point set's crs_uuid matches a given crs_uuid. notes: if the current model crs_uuid is None, the new crs_uuid is used as the model crs_uuid; if a mismatch is found an assertion error is raised """ if self.crs_uuid is None: self.crs_uuid = crs_uuid else: assert bu.matching_uuids(crs_uuid, self.crs_uuid), 'mixed coordinate reference systems in point set'
[docs] def set_represented_interpretation_root(self, interp_root): """Makes a note of the xml root of the represented interpretation.""" self.represented_interpretation_root = interp_root
[docs] def single_patch_array_ref(self, patch_index): """Load numpy array for one patch of the point set from hdf5, cache and return it.""" assert 0 <= patch_index < self.patch_count, 'point set patch index out of range' if self.patch_array_list[patch_index] is not None: return self.patch_array_list[patch_index] h5_key_pair = (self.patch_ref_list[patch_index][0], self.patch_ref_list[patch_index][1] ) # ext uuid, path in hdf5 try: self.model.h5_array_element(h5_key_pair, cache_array = True, object = self, array_attribute = 'temp_points', dtype = 'float') except Exception: log.exception('hdf5 points failure for point set patch ' + str(patch_index)) assert self.temp_points.ndim == 2 and self.temp_points.shape[1] == 3, \ 'unsupported dimensionality to points array' self.patch_array_list[patch_index] = self.temp_points.copy() delattr(self, 'temp_points') return self.patch_array_list[patch_index]
[docs] def load_all_patches(self): """Load hdf5 data for all patches and cache as separate numpy arrays; not usually called directly.""" for patch_index in range(self.patch_count): self.single_patch_array_ref(patch_index)
[docs] def change_crs(self, required_crs): """Changes the crs of the point set, also sets a new uuid if crs changed. notes: this method is usually used to change the coordinate system for a temporary resqpy object; to add as a new part, call write_hdf5() and create_xml() methods """ old_crs = rcrs.Crs(self.model, uuid = self.crs_uuid) self.crs_uuid = required_crs.uuid if required_crs == old_crs or not self.patch_ref_list: log.debug(f'no crs change needed for {self.title}') return log.debug(f'crs change needed for {self.title} from {old_crs.title} to {required_crs.title}') self.load_all_patches() self.patch_ref_list = [] for patch_points in self.patch_array_list: required_crs.convert_array_from(old_crs, patch_points) self.patch_ref_list.append((None, None, len(patch_points))) self.full_array = None # clear cached full array for point set self.uuid = bu.new_uuid() # hope this doesn't cause problems
[docs] def trim_to_xyz_box(self, xyz_box): """Converts point set to a single patch, holding only those points within the xyz box. arguments: xyz_box (numpy float array of shape (2, 3)): the minimum and maximum range to keep in x,y,z notes: usually used to reduce the point set volume for a temprary object; a new uuid is assigned; to add as a new part, call write_hdf5() and create_xml() methods """ points = self.full_array_ref() keep_mask = np.where( np.logical_and(np.all(points >= np.expand_dims(xyz_box[0], axis = 0), axis = -1), np.all(points <= np.expand_dims(xyz_box[1], axis = 0), axis = -1))) self.patch_count = 0 self.patch_ref_list = [] self.patch_array_list = [] self.full_array = None self.add_patch(points[keep_mask, :].copy()) self.uuid = bu.new_uuid() # hope this doesn't cause problems
[docs] def trim_to_xy_polygon(self, xy_polygon): """Converts point set to a single patch, holding only those points within the polygon when projected in xy. arguments: xy_polygon (closed convex resqpy.lines.Polyline): the polygon outlining the area in xy within which points are kept notes: usually used to reduce the point set volume for a temprary object; a new uuid is assigned; to add as a new part, call write_hdf5() and create_xml() methods """ points = self.full_array_ref() keep_mask = xy_polygon.points_are_inside_xy(points) self.patch_count = 0 self.patch_ref_list = [] self.patch_array_list = [] self.full_array = None self.add_patch(points[keep_mask, :].copy()) self.uuid = bu.new_uuid() # hope this doesn't cause problems
[docs] def minimum_xy_area_rectangle(self, delta_theta = 5.0): """Returns the xy projection rectangle of minimum area that contains the points. arguments: delta_theta (float, default 5.0): the incremental angle in degrees to test against returns: (d1, d2, r) where d1 and d2 are lengths of sides of an xy plane rectangle that just contains the points, and d1 <= d2, and r is a bearing in degrees of a d2 (longer) side between 0.0 and 180.0 """ def try_angle(pset, theta): p = pset.full_array_ref() m = vec.rotation_matrix_3d_axial(2, theta) p = vec.rotate_array(m, p)[:, :2] dxy = np.nanmax(p, axis = 0) - np.nanmin(p, axis = 0) return dxy assert self.patch_count > 0 theta = 0.0 min_dxy = try_angle(self, theta) min_area = min_dxy[0] * min_dxy[1] min_theta = theta while theta < 180.0 - delta_theta: theta += delta_theta dxy = try_angle(self, theta) area = dxy[0] * dxy[1] if area < min_area: min_dxy = dxy.copy() min_area = area min_theta = theta if min_dxy[0] <= min_dxy[1]: return min_dxy[0], min_dxy[1], min_theta return min_dxy[1], min_dxy[0], min_theta + 90.0 if min_theta < 90.0 else min_theta - 90.0
[docs] def full_array_ref(self): """Return a single numpy float array of shape (N, 3) containing all points from all patches. :meta common: """ if self.full_array is not None: return self.full_array self.load_all_patches() if self.patch_count == 1: # optimisation, as usually the case self.full_array = self.patch_array_list[0] return self.full_array point_count = 0 for patch_index in range(self.patch_count): point_count += self.patch_ref_list[patch_index][2] self.full_array = np.empty((point_count, 3)) full_index = 0 for patch_index in range(self.patch_count): self.full_array[full_index:full_index + self.patch_ref_list[patch_index][2]] = self.patch_array_list[patch_index] full_index += self.patch_ref_list[patch_index][2] assert full_index == point_count, 'point count mismatch when constructing full array for point set' return self.full_array
[docs] def add_patch(self, points_array): """Extend the current point set with a new patch of points.""" assert points_array.ndim >= 2 and points_array.shape[-1] in [2, 3] if points_array.shape[-1] == 2: shape = list(points_array.shape) shape[-1] = 3 p = np.zeros(shape) p[..., :2] = points_array points_array = p self.patch_array_list.append(points_array.reshape(-1, 3).copy()) self.patch_ref_list.append((None, None, points_array.size // 3)) self.full_array = None if self.patch_count is None: self.patch_count = 0 self.patch_count += 1
[docs] def extract_property_collection(self, refresh = False): """Returns a property collection for the point set. arguments: refresh (bool, default False): if True, the property collection is refreshed from the current state of the model; if False and the property collection has already been cached for the point set, then the cached copy is used returns: a PropertyCollection holding those properties in the model where this point set is the supporting representation note: point set properties have indexable element of 'nodes' """ if self.property_collection is None or refresh: self.property_collection = rqp.PropertyCollection(support = self) return self.property_collection
[docs] def write_hdf5(self, file_name = None, mode = 'a'): """Create or append to an hdf5 file, writing datasets for the point set patches after caching arrays. :meta common: """ if not file_name: file_name = self.model.h5_file_name() if self.uuid is None: self.uuid = bu.new_uuid() # NB: patch arrays must all have been set up prior to calling this function h5_reg = rwh5.H5Register(self.model) for patch_index in range(self.patch_count): h5_reg.register_dataset(self.uuid, 'points_{}'.format(patch_index), self.patch_array_list[patch_index]) h5_reg.write(file_name, mode = mode)
[docs] def create_xml(self, ext_uuid = None, add_as_part = True, add_relationships = True, title = None, originator = None): """Creates a point set representation xml node from this point set object and optionally adds as part of model. arguments: ext_uuid (uuid.UUID): the uuid of the hdf5 external part holding the points array(s) add_as_part (boolean, default True): if True, the newly created xml node is added as a part in the model add_relationships (boolean, default True): if True, a relationship xml part is created relating the new point set part to the crs part (and optional interpretation part) title (string): used as the citation Title text; should be meaningful to a human originator (string, optional): the name of the human being who created the point set representation part; default is to use the login name returns: the newly created point set representation xml node :meta common: """ if ext_uuid is None: ext_uuid = self.model.h5_uuid() ps_node = super().create_xml(add_as_part = False, title = title, originator = originator) if self.crs_uuid is None: self.crs_uuid = self.model.crs_uuid # maverick use of model's default crs if self.represented_interpretation_root is not None: interp_root = self.represented_interpretation_root interp_uuid = bu.uuid_from_string(interp_root.attrib['uuid']) interp_part = self.model.part_for_uuid(interp_uuid) self.model.create_ref_node('RepresentedInterpretation', rqet.find_nested_tags_text(interp_root, ['Citation', 'Title']), interp_uuid, content_type = self.model.type_of_part(interp_part), root = ps_node) for patch_index in range(self.patch_count): p_node = rqet.SubElement(ps_node, ns['resqml2'] + 'NodePatch') p_node.set(ns['xsi'] + 'type', ns['resqml2'] + 'NodePatch') p_node.text = '\n' pi_node = rqet.SubElement(p_node, ns['resqml2'] + 'PatchIndex') pi_node.set(ns['xsi'] + 'type', ns['xsd'] + 'nonNegativeInteger') pi_node.text = str(patch_index) ct_node = rqet.SubElement(p_node, ns['resqml2'] + 'Count') ct_node.set(ns['xsi'] + 'type', ns['xsd'] + 'positiveInteger') ct_node.text = str(self.patch_ref_list[patch_index][2]) geom = rqet.SubElement(p_node, ns['resqml2'] + 'Geometry') geom.set(ns['xsi'] + 'type', ns['resqml2'] + 'PointGeometry') geom.text = '\n' self.model.create_crs_reference(crs_uuid = self.crs_uuid, root = geom) points_node = rqet.SubElement(geom, ns['resqml2'] + 'Points') points_node.set(ns['xsi'] + 'type', ns['resqml2'] + 'Point3dHdf5Array') points_node.text = '\n' coords = rqet.SubElement(points_node, ns['resqml2'] + 'Coordinates') coords.set(ns['xsi'] + 'type', ns['eml'] + 'Hdf5Dataset') coords.text = '\n' self.model.create_hdf5_dataset_ref(ext_uuid, self.uuid, 'points_{}'.format(patch_index), root = coords) if add_as_part: self.model.add_part('obj_PointSetRepresentation', self.uuid, ps_node) if add_relationships: # todo: add multiple crs'es (one per patch)? crs_root = self.model.root_for_uuid(self.crs_uuid) self.model.create_reciprocal_relationship(ps_node, 'destinationObject', crs_root, 'sourceObject') if self.represented_interpretation_root is not None: self.model.create_reciprocal_relationship(ps_node, 'destinationObject', self.represented_interpretation_root, 'sourceObject') ext_part = rqet.part_name_for_object('obj_EpcExternalPartReference', ext_uuid, prefixed = False) ext_node = self.model.root_for_part(ext_part) self.model.create_reciprocal_relationship(ps_node, 'mlToExternalPartProxy', ext_node, 'externalPartProxyToMl') return ps_node
[docs] def convert_to_charisma(self, file_name): """Output to Charisma 3D interpretation format from a pointset. If file_name exists, it will be overwritten. args: file_name: output file name to save to """ # hznname = self.title.replace(" ","_") lines = [] self.load_all_patches() for patch in self.patch_array_list: for points in patch: lines.append(f"INLINE :\t1 XLINE :\t1\t{points[0]}\t{points[1]}\t{points[2]}\n") with open(file_name, 'w') as f: for item in lines: f.write(item)
[docs] def convert_to_irap(self, file_name): """Output to IRAP simple points format from a pointset. If file_name exists, it will be overwritten. args: file_name: output file name to save to """ # hznname = self.title.replace(" ","_") lines = [] self.load_all_patches() for patch in self.patch_array_list: for points in patch: lines.append(f"{points[0]} {points[1]} {points[2]}\n") with open(file_name, 'w') as f: for item in lines: f.write(item)
def concat_polyset_points(closed, coordinates, poly_coords): """Concatenates two numpy arrays of coordinates for polylines, omitting the final (duplicated) point in a polyline if the polyline is closed. arguments: closed (bool): True if polyline is closed coordinates (np.array): Coordinates to be concatenated poly_coords (np.array): Array to concatenate the given coordinates to returns: poly_coords (np.array) with additional coordinates concatenated """ if closed and vec.isclose(coordinates[0], coordinates[-1]): poly_coords = np.concatenate((poly_coords, coordinates[:-1])) else: poly_coords = np.concatenate((poly_coords, coordinates)) return poly_coords