"""_polyline_set.py: Resqml polyline set module."""
import logging
log = logging.getLogger(__name__)
import os
import numpy as np
import resqpy.crs as rcrs
import resqpy.lines
import resqpy.olio.simple_lines as rsl
import resqpy.olio.uuid as bu
import resqpy.olio.vector_utilities as vu
import resqpy.olio.write_hdf5 as rwh5
import resqpy.olio.xml_et as rqet
import resqpy.lines._common as rql_c
from resqpy.olio.xml_namespaces import curly_namespace as ns
class PolylineSet(rql_c._BasePolyline):
"""Class for RESQML polyline set representation."""
resqml_type = 'PolylineSetRepresentation'
[docs] def __init__(self,
parent_model,
uuid = None,
polylines = None,
irap_file = None,
charisma_file = None,
crs_uuid = None,
title = None,
originator = None,
extra_metadata = None):
"""Initialises a new PolylineSet object.
arguments:
parent_model (model.Model object): the model which the new PolylineSetRepresentation belongs to
uuid (uuid.UUID, optional): the uuid of an existing RESQML PolylineSetRepresentation object from
which to initialise this resqpy PolylineSet
polylines (optional): list of polyline objects from which to build the polylineset
irap_file (str, optional): the name of a file in irap format from which to import the polyline set
charisma_file (str, optional): the name of a file in charisma format from which to import the polyline set
crs_uuid (UUID, optional): required if loading from an IRAP or Charisma file
title (str, optional): the citation title to use for a new polyline set;
ignored if uuid is not None
originator (str, optional): the name of the person creating the polyline set, defaults to login id;
ignored if uuid is not None
extra_metadata (dict, optional): string key, value pairs to add as extra metadata for the polyline set;
ignored if uuid is not None
returns:
the newly instantiated PolylineSet object
note:
this initialiser does not perform any conversion between CRSes; if loading from a list of Polylines,
they must all share a common CRS (which must also match crs_uuid if supplied)
:meta common:
"""
self.model = parent_model
self.coordinates = None
self.count_perpol = None
self.polys = []
self.rep_int_root = None
self.save_polys = False
self.boolnotconstant = None
self.boolvalue = None
self.closed_array = None
self.crs_uuid = crs_uuid
self.indices = None
super().__init__(model = parent_model,
uuid = uuid,
title = title,
originator = originator,
extra_metadata = extra_metadata)
if self.root is not None:
return
if polylines is not None: # Create from list of polylines
crs_list = [] if crs_uuid is None else [bu.uuid_from_string(crs_uuid)]
for poly in polylines:
crs_list.append(poly.crs_uuid)
crs_set = set(crs_list)
# following assumes consistent use of UUID or str
assert len(crs_set) == 1, 'More than one CRS found in input polylines for polyline set'
for crs_uuid in crs_set:
self.crs_uuid = crs_uuid
if self.crs_uuid is not None:
break
self.polys = polylines
# Setting the title of the first polyline given as the PolylineSet title
if not self.title:
if len(polylines) > 1:
self.title = f"{polylines[0].title} + {len(polylines)-1} polylines"
else:
self.title = polylines[0].title
elif irap_file is not None: # Create from an input IRAP file
if crs_uuid is None:
log.warning(f'applying generic model CRS when loading polylines from IRAP file: {irap_file}')
self._set_from_irap(irap_file)
elif charisma_file is not None:
if crs_uuid is None:
log.warning(f'applying generic model CRS when loading polylines from Charisma file: {charisma_file}')
self._set_from_charisma(charisma_file)
def _load_from_xml(self):
assert self.root is not None # polyline set xml node specified
root = self.root
self.rep_int_root = self.model.referenced_node(rqet.find_tag(root, 'RepresentedInterpretation'))
self.closed_array = np.empty((0,), dtype = bool)
for patch_node in rqet.list_of_tag(root, 'LinePatch'): # Loop over all LinePatches - likely just the one
assert patch_node is not None # Required field
geometry_node = rqet.find_tag(patch_node, 'Geometry')
assert geometry_node is not None # Required field
crs_root = self.model.referenced_node(rqet.find_tag(geometry_node, 'LocalCrs'))
assert crs_root is not None # Required field
uuid = rqet.uuid_for_part_root(crs_root)
assert uuid is not None # Required field
if self.crs_uuid is not None:
assert bu.matching_uuids(self.crs_uuid, uuid), 'mixed CRS uuids in use in PolylineSet'
self.crs_uuid = uuid
closed_node = rqet.find_tag(patch_node, 'ClosedPolylines')
assert closed_node is not None # Required field
# The ClosedPolylines could be a BooleanConstantArray, or a BooleanArrayFromIndexArray
closed_array = self.get_bool_array(closed_node)
count_node = rqet.find_tag(patch_node, 'NodeCountPerPolyline')
rql_c.load_hdf5_array(self, count_node, 'count_perpol', tag = 'Values', dtype = 'int')
points_node = rqet.find_tag(geometry_node, 'Points')
assert points_node is not None # Required field
rql_c.load_hdf5_array(self, points_node, 'coordinates', tag = 'Coordinates')
# Check that the number of bools aligns with the number of count_perpoly
# Check that the total of the count_perpoly aligns with the number of coordinates
assert len(self.count_perpol) == len(closed_array)
assert np.sum(self.count_perpol) == len(self.coordinates)
subpolys = self.convert_to_polylines(closed_array, self.count_perpol, self.coordinates, self.crs_uuid,
self.rep_int_root)
# Check we have the right number of polygons
assert len(subpolys) == len(self.count_perpol)
# Remove duplicate coordinates and count arrays (exist in polylines now)
# delattr(self,'coordinates')
# delattr(self,'count_perpol')
self.polys.extend(subpolys)
self.closed_array = np.concatenate((self.closed_array, closed_array))
self.bool_array_format()
def _set_from_irap(self, irap_file):
inpoints = rsl.read_lines(irap_file)
self.count_perpol = []
closed_array = []
if not self.title:
self.title = os.path.basename(irap_file).split(".")[0]
for i, poly in enumerate(inpoints):
if len(poly) > 1: # Polylines must have at least 2 points
self.count_perpol.append(len(poly))
if vu.isclose(poly[0], poly[-1]):
closed_array.append(True)
else:
closed_array.append(False)
if i == 0:
self.coordinates = poly
else:
self.coordinates = np.concatenate((self.coordinates, poly))
self.count_perpol = np.array(self.count_perpol)
if self.crs_uuid is None: # If no crs_uuid is provided, assume the main model crs is valid
self.crs_uuid = self.model.crs_uuid
self.polys = self.convert_to_polylines(closed_array, self.count_perpol, self.coordinates, self.crs_uuid,
self.rep_int_root)
def _set_from_charisma(self, charisma_file):
with open(charisma_file) as f:
inpoints = f.readlines()
self.count_perpol = []
closed_array = []
if not self.title:
self.title = os.path.basename(charisma_file).split(".")[0]
for i, line in enumerate(inpoints):
line = line.split()
coord_entry = np.array([[float(line[3]), float(line[4]), float(line[5])]])
if i == 0:
self.coordinates = coord_entry
stick = line[7]
count = 1
else:
self.coordinates = np.concatenate((self.coordinates, coord_entry))
count += 1
if stick != line[7] or i == len(inpoints) - 1:
if count <= 2 and stick != line[7]: # Line has fewer than 2 points
log.warning(f"Polylines must contain at least 2 points - ignoring point {self.coordinates[-2]}")
self.coordinates = np.delete(self.coordinates, -2, 0) # Remove the second to last entry
else:
self.count_perpol.append(count - 1)
closed_array.append(False)
count = 1
stick = line[7]
self.count_perpol = np.array(self.count_perpol)
if self.crs_uuid is None: # If no crs_uuid is provided, assume the main model crs is valid
self.crs_uuid = self.model.crs_uuid
self.polys = self.convert_to_polylines(closed_array, self.count_perpol, self.coordinates, self.crs_uuid,
self.rep_int_root)
[docs] def poly_index_containing_point_in_xy(self, p, mode = 'crossing'):
"""Returns the index of the first (closed) polyline containing point p in the xy plane, or None.
:meta common:
"""
assert mode in ['crossing', 'winding'], 'unrecognised mode when looking for polygon containing point'
for i, poly in enumerate(self.polys):
if not poly.isclosed:
continue
if poly.point_is_inside_xy(p, mode = mode):
return i
return None
[docs] def create_xml(self,
ext_uuid = None,
add_as_part = True,
add_relationships = True,
title = None,
originator = None,
save_polylines = False):
"""Create xml from polylineset.
args:
save_polylines: If true, polylines are also saved individually
:meta common:
"""
if ext_uuid is None:
ext_uuid = self.model.h5_uuid()
self.save_polys = save_polylines
if title:
self.title = title
if not self.title:
self.title = 'polyline set'
if self.save_polys:
for poly in self.polys:
poly.create_xml(ext_uuid, add_relationships = add_relationships, originator = originator)
polyset = super().create_xml(add_as_part = False, originator = originator)
if self.rep_int_root is not None:
rep_int = self.rep_int_root
if "FaultInterpretation" in str(rqet.content_type(rep_int)):
content_type = 'obj_FaultInterpretation'
else:
content_type = 'obj_HorizonInterpretation'
self.model.create_ref_node('RepresentedInterpretation',
rqet.citation_title_for_node(rep_int),
rep_int.attrib['uuid'],
content_type = content_type,
root = polyset)
# We convert all Polylines to the CRS of the first Polyline in the set, so set this as crs_uuid
patch = rqet.SubElement(polyset, ns['resqml2'] + 'LinePatch')
patch.set(ns['xsi'] + 'type', ns['resqml2'] + 'PolylineSetPatch')
patch.text = '\n'
pindex = rqet.SubElement(patch, ns['resqml2'] + 'PatchIndex')
pindex.set(ns['xsi'] + 'type', ns['xsd'] + 'nonNegativeInteger')
pindex.text = '0'
if self.boolnotconstant:
# We have mixed data - use a BooleanArrayFromIndexArray
closed = rqet.SubElement(patch, ns['resqml2'] + 'ClosedPolylines')
closed.set(ns['xsi'] + 'type', ns['xsd'] + 'BooleanArrayFromIndexArray')
closed.text = '\n'
bool_val = rqet.SubElement(closed, ns['resqml2'] + 'IndexIsTrue')
bool_val.set(ns['xsi'] + 'type', ns['xsd'] + 'boolean')
bool_val.text = str(not self.boolvalue).lower()
count = rqet.SubElement(closed, ns['resqml2'] + 'Count')
count.set(ns['xsi'] + 'type', ns['xsd'] + 'positiveInteger')
count.text = str(len(self.count_perpol))
ind_val = rqet.SubElement(closed, ns['resqml2'] + 'Indices')
ind_val.set(ns['xsi'] + 'type', ns['resqml2'] + 'IntegerHdf5Array')
ind_val.text = '\n'
iv_null = rqet.SubElement(ind_val, ns['resqml2'] + 'NullValue')
iv_null.set(ns['xsi'] + 'type', ns['xsd'] + 'integer')
iv_null.text = '-1'
iv_values = rqet.SubElement(ind_val, ns['resqml2'] + 'Values')
iv_values.set(ns['xsi'] + 'type', ns['eml'] + 'Hdf5Dataset')
iv_values.text = '\n'
self.model.create_hdf5_dataset_ref(ext_uuid, self.uuid, 'indices_patch0', root = iv_values)
else:
# All bools are the same - use a BooleanConstantArray
closed = rqet.SubElement(patch, ns['resqml2'] + 'ClosedPolylines')
closed.set(ns['xsi'] + 'type', ns['resqml2'] + 'BooleanConstantArray')
closed.text = '\n'
bool_val = rqet.SubElement(closed, ns['resqml2'] + 'Value')
bool_val.set(ns['xsi'] + 'type', ns['xsd'] + 'boolean')
bool_val.text = str(self.boolvalue).lower()
count = rqet.SubElement(closed, ns['resqml2'] + 'Count')
count.set(ns['xsi'] + 'type', ns['xsd'] + 'positiveInteger')
count.text = str(len(self.count_perpol))
count_pp = rqet.SubElement(patch, ns['resqml2'] + 'NodeCountPerPolyline')
count_pp.set(ns['xsi'] + 'type', ns['resqml2'] + 'IntegerHdf5Array')
count_pp.text = '\n'
null = rqet.SubElement(count_pp, ns['resqml2'] + 'NullValue')
null.set(ns['xsi'] + 'type', ns['xsd'] + 'integer')
null.text = '0'
count_val = rqet.SubElement(count_pp, ns['resqml2'] + 'Values')
count_val.set(ns['xsi'] + 'type', ns['eml'] + 'Hdf5Dataset')
count_val.text = '\n'
self.model.create_hdf5_dataset_ref(ext_uuid, self.uuid, 'NodeCountPerPolyline_patch0', root = count_val)
geom = rqet.SubElement(patch, ns['resqml2'] + 'Geometry')
geom.set(ns['xsi'] + 'type', ns['resqml2'] + 'PointGeometry')
geom.text = '\n'
self.model.create_crs_reference(crs_uuid = self.crs_uuid, root = geom)
points = rqet.SubElement(geom, ns['resqml2'] + 'Points')
points.set(ns['xsi'] + 'type', ns['resqml2'] + 'Point3dHdf5Array')
points.text = '\n'
coords = rqet.SubElement(points, ns['resqml2'] + 'Coordinates')
coords.set(ns['xsi'] + 'type', ns['eml'] + 'Hdf5Dataset')
coords.text = '\n'
self.model.create_hdf5_dataset_ref(ext_uuid, self.uuid, 'points_patch0', root = coords)
if add_as_part:
self.model.add_part('obj_PolylineSetRepresentation', self.uuid, polyset)
if add_relationships:
crs_root = self.model.root_for_uuid(self.crs_uuid)
self.model.create_reciprocal_relationship(polyset, 'destinationObject', crs_root, 'sourceObject')
if self.rep_int_root is not None: # Optional
self.model.create_reciprocal_relationship(polyset, 'destinationObject', self.rep_int_root,
'sourceObject')
if self.save_polys:
for poly in self.polys:
self.model.create_reciprocal_relationship(polyset, 'destinationObject', poly.root_node,
'sourceObject')
ext_part = rqet.part_name_for_object('obj_EpcExternalPartReference', ext_uuid, prefixed = False)
ext_node = self.model.root_for_part(ext_part)
self.model.create_reciprocal_relationship(polyset, 'mlToExternalPartProxy', ext_node,
'externalPartProxyToMl')
return polyset
[docs] def write_hdf5(self, file_name = None, mode = 'a', save_polylines = False):
"""Create or append the coordinates, counts and indices hdf5 arrays to hdf5 file.
:meta common:
"""
if self.uuid is None:
self.uuid = bu.new_uuid()
self.combine_polylines(self.polys)
self.bool_array_format()
self.save_polys = save_polylines
if self.save_polys:
for poly in self.polys:
poly.write_hdf5(file_name)
h5_reg = rwh5.H5Register(self.model)
h5_reg.register_dataset(self.uuid, 'points_patch0', self.coordinates)
h5_reg.register_dataset(self.uuid, 'NodeCountPerPolyline_patch0', self.count_perpol.astype(np.int32))
if self.boolnotconstant:
h5_reg.register_dataset(self.uuid, 'indices_patch0', np.array(self.indices, dtype = np.int32))
h5_reg.write(file_name, mode = mode)
[docs] def get_bool_array(self, closed_node):
# TODO: check for other permissible forms of the abstract boolean array
"""Returns a boolean array using details in the xml node location.
arguments:
closed_node (xml node): the node under which the boolean array information sits
returns:
1D numpy bool array set True for those pplylines within the set which are marked as closed
"""
# if type of boolean array is BooleanConstantArray, uses the array value and count to generate
# the array; if type of boolean array is BooleanArrayFromIndexArray, finds the "other" value
# bool and indices of the "other" values, and inserts these into an array opposite to the main bool
flavour = rqet.node_type(closed_node)
if flavour == 'BooleanConstantArray':
count = rqet.find_tag_int(closed_node, 'Count')
value = rqet.bool_from_text(rqet.node_text(rqet.find_tag(closed_node, 'Value')))
return np.full((count,), value, dtype = bool)
elif flavour == 'BooleanArrayFromIndexArray':
count = rqet.find_tag_int(closed_node, 'Count')
indices_node = rqet.find_tag(closed_node, 'Indices')
assert indices_node is not None
indices_arr = rql_c.load_hdf5_array(self, indices_node, 'indices_arr', tag = 'Values', dtype = 'int')
istrue = rqet.bool_from_text(rqet.node_text(rqet.find_tag(closed_node, 'IndexIsTrue')))
out = np.full((count,), not istrue, dtype = bool)
out[indices_arr] = istrue
return out
raise ValueError(f'unrecognised closed array xml node type: {flavour}')
[docs] def convert_to_polylines(self,
closed_array = None,
count_perpol = None,
coordinates = None,
crs_uuid = None,
rep_int_root = None):
"""Returns a list of Polylines objects from a PolylineSet.
note:
all arguments are optional and by default the data will be taken from self
args:
closed_array: array containing a bool for each polygon in if it is open (False) or closed (True)
count_perpol: array containing a list of polygon "lengths" for each polygon
coordinates: array containing coordinates for all the polygons
crs_uuid: crs_uuid for polylineset
rep_int_root: represented interpretation root (optional)
returns:
list of polyline objects
:meta common:
"""
from resqpy.lines._polyline import Polyline
if count_perpol is None:
count_perpol = self.count_perpol
if closed_array is None:
closed_array = np.zeros(len(count_perpol), dtype = bool)
closed_node = rqet.find_nested_tags(self.root, ['LinePatch', 'ClosedPolylines'])
if closed_node is not None:
closed_array[:] = self.get_bool_array(closed_node)
if coordinates is None:
coordinates = self.coordinates
if crs_uuid is None:
crs_uuid = self.crs_uuid
if rep_int_root is None:
rep_int_root = self.rep_int_root
polys = []
count = 0
for i in range(len(count_perpol)):
if i != len(count_perpol) - 1:
subset = coordinates[count:int(count_perpol[i]) + count].copy()
else:
subset = coordinates[count:int(count_perpol[i]) + count + 1].copy()
if vu.isclose(subset[0], subset[-1]):
isclosed = True
else:
isclosed = closed_array[i]
count += int(count_perpol[i])
subtitle = f"{self.title} {i+1}"
polys.append(
Polyline(self.model,
is_closed = isclosed,
set_coord = subset,
set_crs = crs_uuid,
title = subtitle,
rep_int_root = rep_int_root))
return polys
[docs] def combine_polylines(self, polylines):
"""Combines the isclosed boolean array, coordinates and count data for a list of polyline objects.
args:
polylines: list of polyline objects
"""
self.count_perpol = []
self.closed_array = []
for poly in polylines:
if poly == polylines[0]:
master_crs = rcrs.Crs(self.model, uuid = poly.crs_uuid)
self.crs_uuid = poly.crs_uuid
self.coordinates = poly.coordinates.copy()
else:
curr_crs = rcrs.Crs(self.model, uuid = poly.crs_uuid)
if not curr_crs.is_equivalent(master_crs):
shifted = curr_crs.convert_array_to(master_crs, poly.coordinates)
self.coordinates = np.concatenate((self.coordinates, shifted))
else:
self.coordinates = np.concatenate((self.coordinates, poly.coordinates))
self.closed_array.append(poly.isclosed)
self.count_perpol.append(int(len(poly.coordinates)))
self.count_perpol = np.array(self.count_perpol)
assert len(self.closed_array) == len(self.count_perpol)
assert np.sum(self.count_perpol) == len(self.coordinates)
self.polys = polylines
[docs] def set_interpretation_root(self, rep_int_root, recursive = True):
"""Updates the rep_int_root for the polylineset.
args:
rep_int_root: new rep_int_root
recursive: boolean, if true will update individual polys with same root
"""
self.rep_int_root = rep_int_root
if recursive:
for poly in self.polys:
poly.rep_int_root = rep_int_root
[docs] def convert_to_irap(self, file_name):
"""Output an irap file from a polyline set.
If file_name exists, it will be overwritten.
args:
file_name: output file name for polyline set representation
"""
end_of_line = np.array([[999.0, 999.0, 999.0]])
for poly in self.polys:
if poly == self.polys[0]:
out_coords = poly.coordinates
else:
out_coords = np.concatenate((out_coords, poly.coordinates))
out_coords = np.concatenate((out_coords, end_of_line))
np.savetxt(file_name, out_coords, delimiter = ' ')
[docs] def convert_to_charisma(self, file_name):
"""Output to Charisma fault sticks from a polyline set.
If file_name exists, it will be overwritten.
args:
file_name: output file name for polyline set representation
"""
faultname = self.title.replace(" ", "_") if self.title else 'fault'
lines = []
for i, poly in enumerate(self.polys):
for point in poly.coordinates:
lines.append(f"INLINE-\t0\t0\t{point[0]}\t{point[1]}\t{point[2]}\t{faultname}\t{i+1}\n")
with open(file_name, 'w') as f:
for item in lines:
f.write(item)