Source code for resqpy.lines._polyline_set

"""_polyline_set.py: Resqml polyline set module."""

import logging

log = logging.getLogger(__name__)

import os
import numpy as np

import resqpy.crs as rcrs
import resqpy.lines
import resqpy.property as rqp
import resqpy.olio.simple_lines as rsl
import resqpy.olio.uuid as bu
import resqpy.olio.vector_utilities as vu
import resqpy.olio.write_hdf5 as rwh5
import resqpy.olio.xml_et as rqet
import resqpy.lines._common as rql_c
from resqpy.olio.xml_namespaces import curly_namespace as ns


class PolylineSet(rql_c._BasePolyline):
    """Class for RESQML polyline set representation."""

    resqml_type = 'PolylineSetRepresentation'

[docs] def __init__(self, parent_model, uuid = None, polylines = None, irap_file = None, charisma_file = None, crs_uuid = None, title = None, originator = None, extra_metadata = None): """Initialises a new PolylineSet object. arguments: parent_model (model.Model object): the model which the new PolylineSetRepresentation belongs to uuid (uuid.UUID, optional): the uuid of an existing RESQML PolylineSetRepresentation object from which to initialise this resqpy PolylineSet polylines (optional): list of polyline objects from which to build the polylineset irap_file (str, optional): the name of a file in irap format from which to import the polyline set charisma_file (str, optional): the name of a file in charisma format from which to import the polyline set crs_uuid (UUID, optional): required if loading from an IRAP or Charisma file title (str, optional): the citation title to use for a new polyline set; ignored if uuid is not None originator (str, optional): the name of the person creating the polyline set, defaults to login id; ignored if uuid is not None extra_metadata (dict, optional): string key, value pairs to add as extra metadata for the polyline set; ignored if uuid is not None returns: the newly instantiated PolylineSet object note: this initialiser does not perform any conversion between CRSes; if loading from a list of Polylines, they must all share a common CRS (which must also match crs_uuid if supplied) :meta common: """ self.model = parent_model self.coordinates = None self.count_perpol = None self.polys = [] self.rep_int_root = None self.save_polys = False self.boolnotconstant = None self.boolvalue = None self.closed_array = None self.crs_uuid = crs_uuid self.indices = None self.property_collection = None super().__init__(model = parent_model, uuid = uuid, title = title, originator = originator, extra_metadata = extra_metadata) if self.root is not None: return if polylines is not None: # Create from list of polylines crs_list = [] if crs_uuid is None else [bu.uuid_from_string(crs_uuid)] for poly in polylines: crs_list.append(poly.crs_uuid) crs_set = set(crs_list) # following assumes consistent use of UUID or str assert len(crs_set) == 1, 'More than one CRS found in input polylines for polyline set' for crs_uuid in crs_set: self.crs_uuid = crs_uuid if self.crs_uuid is not None: break self.polys = polylines # Setting the title of the first polyline given as the PolylineSet title if not self.title: if len(polylines) > 1: self.title = f"{polylines[0].title} + {len(polylines)-1} polylines" else: self.title = polylines[0].title self.combine_polylines(polylines) elif irap_file is not None: # Create from an input IRAP file if crs_uuid is None: log.warning(f'applying generic model CRS when loading polylines from IRAP file: {irap_file}') self._set_from_irap(irap_file) elif charisma_file is not None: if crs_uuid is None: log.warning(f'applying generic model CRS when loading polylines from Charisma file: {charisma_file}') self._set_from_charisma(charisma_file)
def _load_from_xml(self): assert self.root is not None # polyline set xml node specified root = self.root self.rep_int_root = self.model.referenced_node(rqet.find_tag(root, 'RepresentedInterpretation')) self.closed_array = np.empty((0,), dtype = bool) for patch_node in rqet.list_of_tag(root, 'LinePatch'): # Loop over all LinePatches - likely just the one assert patch_node is not None # Required field geometry_node = rqet.find_tag(patch_node, 'Geometry') assert geometry_node is not None # Required field crs_root = self.model.referenced_node(rqet.find_tag(geometry_node, 'LocalCrs')) assert crs_root is not None # Required field uuid = rqet.uuid_for_part_root(crs_root) assert uuid is not None # Required field if self.crs_uuid is not None: assert bu.matching_uuids(self.crs_uuid, uuid), 'mixed CRS uuids in use in PolylineSet' self.crs_uuid = uuid closed_node = rqet.find_tag(patch_node, 'ClosedPolylines') assert closed_node is not None # Required field # The ClosedPolylines could be a BooleanConstantArray, or a BooleanArrayFromIndexArray closed_array = self.get_bool_array(closed_node) count_node = rqet.find_tag(patch_node, 'NodeCountPerPolyline') rql_c.load_hdf5_array(self, count_node, 'count_perpol', tag = 'Values', dtype = 'int') points_node = rqet.find_tag(geometry_node, 'Points') assert points_node is not None # Required field rql_c.load_hdf5_array(self, points_node, 'coordinates', tag = 'Coordinates') # Check that the number of bools aligns with the number of count_perpoly # Check that the total of the count_perpoly aligns with the number of coordinates assert len(self.count_perpol) == len(closed_array) assert np.sum(self.count_perpol) == len(self.coordinates) subpolys = self.convert_to_polylines(closed_array, self.count_perpol, self.coordinates, self.crs_uuid, self.rep_int_root) # Check we have the right number of polygons assert len(subpolys) == len(self.count_perpol) # Remove duplicate coordinates and count arrays (exist in polylines now) # delattr(self,'coordinates') # delattr(self,'count_perpol') self.polys.extend(subpolys) self.closed_array = np.concatenate((self.closed_array, closed_array)) self.bool_array_format() def _set_from_irap(self, irap_file): inpoints = rsl.read_lines(irap_file) self.count_perpol = [] closed_array = [] if not self.title: self.title = os.path.basename(irap_file).split(".")[0] for i, poly in enumerate(inpoints): if len(poly) > 1: # Polylines must have at least 2 points self.count_perpol.append(len(poly)) if vu.isclose(poly[0], poly[-1]): closed_array.append(True) else: closed_array.append(False) if i == 0: self.coordinates = poly else: self.coordinates = np.concatenate((self.coordinates, poly)) self.count_perpol = np.array(self.count_perpol) if self.crs_uuid is None: # If no crs_uuid is provided, assume the main model crs is valid self.crs_uuid = self.model.crs_uuid self.polys = self.convert_to_polylines(closed_array, self.count_perpol, self.coordinates, self.crs_uuid, self.rep_int_root) def _set_from_charisma(self, charisma_file): with open(charisma_file) as f: inpoints = f.readlines() self.count_perpol = [] closed_array = [] if not self.title: self.title = os.path.basename(charisma_file).split(".")[0] for i, line in enumerate(inpoints): line = line.split() coord_entry = np.array([[float(line[3]), float(line[4]), float(line[5])]]) if i == 0: self.coordinates = coord_entry stick = line[7] count = 1 else: self.coordinates = np.concatenate((self.coordinates, coord_entry)) count += 1 if stick != line[7] or i == len(inpoints) - 1: if count <= 2 and stick != line[7]: # Line has fewer than 2 points log.warning(f"Polylines must contain at least 2 points - ignoring point {self.coordinates[-2]}") self.coordinates = np.delete(self.coordinates, -2, 0) # Remove the second to last entry else: self.count_perpol.append(count - 1) closed_array.append(False) count = 1 stick = line[7] self.count_perpol = np.array(self.count_perpol) if self.crs_uuid is None: # If no crs_uuid is provided, assume the main model crs is valid self.crs_uuid = self.model.crs_uuid self.polys = self.convert_to_polylines(closed_array, self.count_perpol, self.coordinates, self.crs_uuid, self.rep_int_root)
[docs] def poly_index_containing_point_in_xy(self, p, mode = 'crossing'): """Returns the index of the first (closed) polyline containing point p in the xy plane, or None. :meta common: """ assert mode in ['crossing', 'winding'], 'unrecognised mode when looking for polygon containing point' for i, poly in enumerate(self.polys): if not poly.isclosed: continue if poly.point_is_inside_xy(p, mode = mode): return i return None
[docs] def create_xml(self, ext_uuid = None, add_as_part = True, add_relationships = True, title = None, originator = None, save_polylines = False): """Create xml from polylineset. args: save_polylines: If true, polylines are also saved individually :meta common: """ if ext_uuid is None: ext_uuid = self.model.h5_uuid() self.save_polys = save_polylines if title: self.title = title if not self.title: self.title = 'polyline set' if self.save_polys: for poly in self.polys: poly.create_xml(ext_uuid, add_relationships = add_relationships, originator = originator) polyset = super().create_xml(add_as_part = False, originator = originator) if self.rep_int_root is not None: rep_int = self.rep_int_root if "FaultInterpretation" in str(rqet.content_type(rep_int)): content_type = 'obj_FaultInterpretation' else: content_type = 'obj_HorizonInterpretation' self.model.create_ref_node('RepresentedInterpretation', rqet.citation_title_for_node(rep_int), rep_int.attrib['uuid'], content_type = content_type, root = polyset) # We convert all Polylines to the CRS of the first Polyline in the set, so set this as crs_uuid patch = rqet.SubElement(polyset, ns['resqml2'] + 'LinePatch') patch.set(ns['xsi'] + 'type', ns['resqml2'] + 'PolylineSetPatch') patch.text = '\n' pindex = rqet.SubElement(patch, ns['resqml2'] + 'PatchIndex') pindex.set(ns['xsi'] + 'type', ns['xsd'] + 'nonNegativeInteger') pindex.text = '0' if self.boolnotconstant: # We have mixed data - use a BooleanArrayFromIndexArray closed = rqet.SubElement(patch, ns['resqml2'] + 'ClosedPolylines') closed.set(ns['xsi'] + 'type', ns['xsd'] + 'BooleanArrayFromIndexArray') closed.text = '\n' bool_val = rqet.SubElement(closed, ns['resqml2'] + 'IndexIsTrue') bool_val.set(ns['xsi'] + 'type', ns['xsd'] + 'boolean') bool_val.text = str(not self.boolvalue).lower() count = rqet.SubElement(closed, ns['resqml2'] + 'Count') count.set(ns['xsi'] + 'type', ns['xsd'] + 'positiveInteger') count.text = str(len(self.count_perpol)) ind_val = rqet.SubElement(closed, ns['resqml2'] + 'Indices') ind_val.set(ns['xsi'] + 'type', ns['resqml2'] + 'IntegerHdf5Array') ind_val.text = '\n' iv_null = rqet.SubElement(ind_val, ns['resqml2'] + 'NullValue') iv_null.set(ns['xsi'] + 'type', ns['xsd'] + 'integer') iv_null.text = '-1' iv_values = rqet.SubElement(ind_val, ns['resqml2'] + 'Values') iv_values.set(ns['xsi'] + 'type', ns['eml'] + 'Hdf5Dataset') iv_values.text = '\n' self.model.create_hdf5_dataset_ref(ext_uuid, self.uuid, 'indices_patch0', root = iv_values) else: # All bools are the same - use a BooleanConstantArray closed = rqet.SubElement(patch, ns['resqml2'] + 'ClosedPolylines') closed.set(ns['xsi'] + 'type', ns['resqml2'] + 'BooleanConstantArray') closed.text = '\n' bool_val = rqet.SubElement(closed, ns['resqml2'] + 'Value') bool_val.set(ns['xsi'] + 'type', ns['xsd'] + 'boolean') bool_val.text = str(self.boolvalue).lower() count = rqet.SubElement(closed, ns['resqml2'] + 'Count') count.set(ns['xsi'] + 'type', ns['xsd'] + 'positiveInteger') count.text = str(len(self.count_perpol)) count_pp = rqet.SubElement(patch, ns['resqml2'] + 'NodeCountPerPolyline') count_pp.set(ns['xsi'] + 'type', ns['resqml2'] + 'IntegerHdf5Array') count_pp.text = '\n' null = rqet.SubElement(count_pp, ns['resqml2'] + 'NullValue') null.set(ns['xsi'] + 'type', ns['xsd'] + 'integer') null.text = '0' count_val = rqet.SubElement(count_pp, ns['resqml2'] + 'Values') count_val.set(ns['xsi'] + 'type', ns['eml'] + 'Hdf5Dataset') count_val.text = '\n' self.model.create_hdf5_dataset_ref(ext_uuid, self.uuid, 'NodeCountPerPolyline_patch0', root = count_val) geom = rqet.SubElement(patch, ns['resqml2'] + 'Geometry') geom.set(ns['xsi'] + 'type', ns['resqml2'] + 'PointGeometry') geom.text = '\n' self.model.create_crs_reference(crs_uuid = self.crs_uuid, root = geom) points = rqet.SubElement(geom, ns['resqml2'] + 'Points') points.set(ns['xsi'] + 'type', ns['resqml2'] + 'Point3dHdf5Array') points.text = '\n' coords = rqet.SubElement(points, ns['resqml2'] + 'Coordinates') coords.set(ns['xsi'] + 'type', ns['eml'] + 'Hdf5Dataset') coords.text = '\n' self.model.create_hdf5_dataset_ref(ext_uuid, self.uuid, 'points_patch0', root = coords) if add_as_part: self.model.add_part('obj_PolylineSetRepresentation', self.uuid, polyset) if add_relationships: crs_root = self.model.root_for_uuid(self.crs_uuid) self.model.create_reciprocal_relationship(polyset, 'destinationObject', crs_root, 'sourceObject') if self.rep_int_root is not None: # Optional self.model.create_reciprocal_relationship(polyset, 'destinationObject', self.rep_int_root, 'sourceObject') if self.save_polys: for poly in self.polys: self.model.create_reciprocal_relationship(polyset, 'destinationObject', poly.root_node, 'sourceObject') ext_part = rqet.part_name_for_object('obj_EpcExternalPartReference', ext_uuid, prefixed = False) ext_node = self.model.root_for_part(ext_part) self.model.create_reciprocal_relationship(polyset, 'mlToExternalPartProxy', ext_node, 'externalPartProxyToMl') return polyset
[docs] def write_hdf5(self, file_name = None, mode = 'a', save_polylines = False): """Create or append the coordinates, counts and indices hdf5 arrays to hdf5 file. :meta common: """ if self.uuid is None: self.uuid = bu.new_uuid() self.combine_polylines(self.polys) self.bool_array_format() self.save_polys = save_polylines if self.save_polys: for poly in self.polys: poly.write_hdf5(file_name) h5_reg = rwh5.H5Register(self.model) h5_reg.register_dataset(self.uuid, 'points_patch0', self.coordinates) h5_reg.register_dataset(self.uuid, 'NodeCountPerPolyline_patch0', self.count_perpol.astype(np.int32)) if self.boolnotconstant: h5_reg.register_dataset(self.uuid, 'indices_patch0', np.array(self.indices, dtype = np.int32)) h5_reg.write(file_name, mode = mode)
[docs] def get_bool_array(self, closed_node): # TODO: check for other permissible forms of the abstract boolean array """Returns a boolean array using details in the xml node location. arguments: closed_node (xml node): the node under which the boolean array information sits returns: 1D numpy bool array set True for those pplylines within the set which are marked as closed """ # if type of boolean array is BooleanConstantArray, uses the array value and count to generate # the array; if type of boolean array is BooleanArrayFromIndexArray, finds the "other" value # bool and indices of the "other" values, and inserts these into an array opposite to the main bool flavour = rqet.node_type(closed_node) if flavour == 'BooleanConstantArray': count = rqet.find_tag_int(closed_node, 'Count') value = rqet.bool_from_text(rqet.node_text(rqet.find_tag(closed_node, 'Value'))) return np.full((count,), value, dtype = bool) elif flavour == 'BooleanArrayFromIndexArray': count = rqet.find_tag_int(closed_node, 'Count') indices_node = rqet.find_tag(closed_node, 'Indices') assert indices_node is not None indices_arr = rql_c.load_hdf5_array(self, indices_node, 'indices_arr', tag = 'Values', dtype = 'int') istrue = rqet.bool_from_text(rqet.node_text(rqet.find_tag(closed_node, 'IndexIsTrue'))) out = np.full((count,), not istrue, dtype = bool) out[indices_arr] = istrue return out raise ValueError(f'unrecognised closed array xml node type: {flavour}')
[docs] def convert_to_polylines(self, closed_array = None, count_perpol = None, coordinates = None, crs_uuid = None, rep_int_root = None): """Returns a list of Polylines objects from a PolylineSet. note: all arguments are optional and by default the data will be taken from self args: closed_array: array containing a bool for each polygon in if it is open (False) or closed (True) count_perpol: array containing a list of polygon "lengths" for each polygon coordinates: array containing coordinates for all the polygons crs_uuid: crs_uuid for polylineset rep_int_root: represented interpretation root (optional) returns: list of polyline objects :meta common: """ from resqpy.lines._polyline import Polyline if count_perpol is None: count_perpol = self.count_perpol if closed_array is None: closed_array = np.zeros(len(count_perpol), dtype = bool) closed_node = rqet.find_nested_tags(self.root, ['LinePatch', 'ClosedPolylines']) if closed_node is not None: closed_array[:] = self.get_bool_array(closed_node) else: closed_array = None if coordinates is None: coordinates = self.coordinates if crs_uuid is None: crs_uuid = self.crs_uuid if rep_int_root is None: rep_int_root = self.rep_int_root polys = [] count = 0 for i in range(len(count_perpol)): if i != len(count_perpol) - 1: subset = coordinates[count:int(count_perpol[i]) + count].copy() else: subset = coordinates[count:int(count_perpol[i]) + count + 1].copy() # is this correct? duplicated_end_points = np.all(np.isclose(subset[0], subset[-1])) and len(subset) > 2 if closed_array is None: isclosed = duplicated_end_points else: isclosed = closed_array[i] if isclosed and duplicated_end_points: subset = subset[:-1] count += int(count_perpol[i]) subtitle = f"{self.title} {i+1}" polys.append( Polyline(self.model, is_closed = isclosed, set_coord = subset, set_crs = crs_uuid, title = subtitle, rep_int_root = rep_int_root)) return polys
[docs] def combine_polylines(self, polylines): """Combines the isclosed boolean array, coordinates and count data for a list of polyline objects. args: polylines: list of polyline objects """ self.count_perpol = [] self.closed_array = [] for poly in polylines: if poly == polylines[0]: master_crs = rcrs.Crs(self.model, uuid = poly.crs_uuid) self.crs_uuid = poly.crs_uuid self.coordinates = poly.coordinates.copy() else: curr_crs = rcrs.Crs(self.model, uuid = poly.crs_uuid) if not curr_crs.is_equivalent(master_crs): shifted = curr_crs.convert_array_to(master_crs, poly.coordinates) self.coordinates = np.concatenate((self.coordinates, shifted)) else: self.coordinates = np.concatenate((self.coordinates, poly.coordinates)) self.closed_array.append(poly.isclosed) self.count_perpol.append(int(len(poly.coordinates))) self.count_perpol = np.array(self.count_perpol) assert len(self.closed_array) == len(self.count_perpol) assert np.sum(self.count_perpol) == len(self.coordinates) self.polys = polylines
[docs] def bool_array_format(self): """Determines an appropriate output boolean array format depending on the closed_array bools. self.boolnotconstant - set to True if all are not open or all closed self.boolvalue - value of isclosed for all polylines, or for the majority of polylines if mixed self.indices - array of indices where the values are not self.boolvalue, if the polylines are mixed """ assert self.closed_array is not None self.indices = [] self.boolnotconstant = False if all(self.closed_array): self.boolvalue = True elif not any(self.closed_array): self.boolvalue = False else: if np.count_nonzero(self.closed_array) > (len(self.closed_array) // 2): self.boolvalue = True for i, val in enumerate(self.closed_array): if not val: self.indices.append(i) else: self.boolvalue = False for i, val in enumerate(self.closed_array): if val: self.indices.append(i) self.boolnotconstant = True
[docs] def set_interpretation_root(self, rep_int_root, recursive = True): """Updates the rep_int_root for the polylineset. args: rep_int_root: new rep_int_root recursive: boolean, if true will update individual polys with same root """ self.rep_int_root = rep_int_root if recursive: for poly in self.polys: poly.rep_int_root = rep_int_root
[docs] def convert_to_irap(self, file_name): """Output an irap file from a polyline set. If file_name exists, it will be overwritten. args: file_name: output file name for polyline set representation """ end_of_line = np.array([[999.0, 999.0, 999.0]]) for poly in self.polys: if poly == self.polys[0]: out_coords = poly.coordinates else: out_coords = np.concatenate((out_coords, poly.coordinates)) out_coords = np.concatenate((out_coords, end_of_line)) np.savetxt(file_name, out_coords, delimiter = ' ')
[docs] def convert_to_charisma(self, file_name): """Output to Charisma fault sticks from a polyline set. If file_name exists, it will be overwritten. args: file_name: output file name for polyline set representation """ faultname = self.title.replace(" ", "_") if self.title else 'fault' lines = [] for i, poly in enumerate(self.polys): for point in poly.coordinates: lines.append(f"INLINE-\t0\t0\t{point[0]}\t{point[1]}\t{point[2]}\t{faultname}\t{i+1}\n") with open(file_name, 'w') as f: for item in lines: f.write(item)
[docs] def extract_property_collection(self, refresh = False): """Returns a property collection for the polyline set. arguments: refresh (bool, default False): if True, the property collection is refreshed from the current state of the model; if False and the property collection has already been cached for the polyline set, then the cached copy is used returns: a PropertyCollection holding those properties in the model where this polyline set is the supporting representation notes: polyline set properties may have indexable element of 'nodes' or 'intervals'; for interval properties, the number of elements per polyline depends on whether the polyline is closed or not; it is currently left to calling code build the composite property array from data for individual polylines, or to find the subset of the combined property data that is applicable to an individual polyline """ if self.property_collection is None or refresh: self.property_collection = rqp.PropertyCollection(support = self) return self.property_collection