# -*- coding: utf-8 -*-
# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
This contains the code that does the actual unioning of regions.
"""
# TODO: Weak references for memory management problems?
# STDLIB
import weakref
# THIRD-PARTY
import numpy as np
# LOCAL
from spherical_geometry import great_circle_arc as gca
from spherical_geometry import vector
from spherical_geometry.polygon import (SingleSphericalPolygon, SphericalPolygon,
MalformedPolygonError)
__all__ = ['Graph']
# Set to True to enable some sanity checks
DEBUG = True
# The following two functions are called by sorted to provide a consistent
# ordering of nodes and edges retrieved from the graph, since values are
# retrieved from sets in an order that varies from run to run
def node_order(node):
return hash(tuple(node._point))
def edge_order(edge):
return node_order(edge._nodes[0]) + node_order(edge._nodes[1])
[docs]
class Graph:
"""
A graph of nodes connected by edges. The graph is used to build
unions between polygons.
.. note::
This class is not meant to be used directly. Instead, use
`~spherical_geometry.polygon.SphericalPolygon.union` and
`~spherical_geometry.polygon.SphericalPolygon.intersection`.
"""
class Node:
"""
A `~Graph.Node` represents a single point, connected by an arbitrary
number of `~Graph.Edge` objects to other `~Graph.Node` objects.
"""
def __init__(self, point, source_polygons=[]):
"""
Parameters
----------
point : 3-sequence (*x*, *y*, *z*) coordinate
source_polygon : `~spherical_geometry.polygon.SphericalPolygon` instance, optional
The polygon(s) this node came from. Used for bookkeeping.
"""
self._point = np.asanyarray(point)
self._source_polygons = set(source_polygons)
self._edges = weakref.WeakSet()
def __repr__(self):
return "Node(%s %d)" % (str(self._point), len(self._edges))
def equals(self, other, thresh=1.e-9):
"""
Returns `True` if the location of this and the *other*
`~Graph.Node` are the same.
Parameters
----------
other : `~Graph.Node` instance
The other node.
thres : float
If difference is smaller than this, points are equal.
The default value of 2e-8 radians is set based on
empirical test cases. Relative threshold based on
the actual sizes of polygons is not implemented.
"""
return np.array_equal(self._point, other._point)
class Edge:
"""
An `~Graph.Edge` represents a connection between exactly two
`~Graph.Node` objects. This `~Graph.Edge` class has no direction.
"""
def __init__(self, A, B, source_polygons=[]):
"""
Parameters
----------
A, B : `~Graph.Node` instances
source_polygon : `~spherical_geometry.polygon.SphericalPolygon` instance, optional
The polygon this edge came from. Used for bookkeeping.
"""
self._nodes = [A, B]
for node in self._nodes:
node._edges.add(self)
self._source_polygons = set(source_polygons)
def __repr__(self):
nodes = self._nodes
return "Edge(%s -> %s)" % (nodes[0]._point, nodes[1]._point)
def follow(self, node):
"""
Follow along the edge from the given *node* to the other
node.
Parameters
----------
node : `~Graph.Node` instance
Returns
-------
other : `~Graph.Node` instance
"""
nodes = self._nodes
try:
return nodes[not nodes.index(node)]
except IndexError:
raise RuntimeError("Following from disconnected node")
def equals(self, other):
"""
Returns `True` if the other edge is between the same two nodes.
Parameters
----------
other : `~Graph.Edge` instance
Returns
-------
equals : bool
"""
if (self._nodes[0].equals(other._nodes[0]) and
self._nodes[1].equals(other._nodes[1])):
return True
if (self._nodes[1].equals(other._nodes[0]) and
self._nodes[0].equals(other._nodes[1])):
return True
return False
def __init__(self, polygons):
"""
Parameters
----------
polygons : sequence of `~spherical_geometry.polygon.SphericalPolygon` instances
Build a graph from this initial set of polygons.
"""
self._nodes = set()
self._edges = set()
self._source_polygons = set()
self.add_polygons(polygons)
[docs]
def add_polygons(self, polygons):
"""
Add more polygons to the graph.
.. note::
Must be called before `union` or `intersection`.
Parameters
----------
polygons : sequence of `~spherical_geometry.polygon.SphericalPolygon` instances
Set of polygons to add to the graph
"""
for polygon in polygons:
self.add_polygon(polygon)
[docs]
def add_polygon(self, polygon):
"""
Add a single polygon to the graph.
.. note::
Must be called before `union` or `intersection`.
Parameters
----------
polygon : `~spherical_geometry.polygon.SphericalPolygon` instance
Polygon to add to the graph
"""
points = polygon._points
if len(points) < 3:
return
self._source_polygons.add(polygon)
start_node = nodeA = self._add_node(points[0], [polygon])
for i in range(1, len(points) - 1):
nodeB = self._add_node(points[i], [polygon])
# Don't create self-pointing edges
if nodeB is not nodeA:
self._add_edge(nodeA, nodeB, [polygon])
nodeA = nodeB
# Close the polygon
self._add_edge(nodeA, start_node, [polygon])
def _add_node(self, point, source_polygons=[]):
"""
Add a node to the graph. It will be disconnected until used
in a call to `_add_edge`.
Parameters
----------
point : 3-sequence (*x*, *y*, *z*) coordinate
source_polygon : `~spherical_geometry.polygon.SphericalPolygon` instance, optional
The polygon this node came from. Used for bookkeeping.
Returns
-------
node : `~Graph.Node` instance
The new node
"""
# Any nodes whose Cartesian coordinates are closer together
# than 2 ** -32 will cause numerical problems in the
# intersection calculations, so we merge any nodes that
# are closer together than that.
# Don't add nodes that already exist. Update the existing
# node's source_polygons list to include the new polygon.
point = vector.normalize_vector(point)
if len(self._nodes):
nodes = list(self._nodes)
node_array = np.array([node._point for node in nodes])
diff = np.all(np.abs(point - node_array) < 2 ** -32, axis=-1)
indices = np.nonzero(diff)[0]
if len(indices):
node = nodes[indices[0]]
node._source_polygons.update(source_polygons)
return node
new_node = self.Node(point, source_polygons)
self._nodes.add(new_node)
return new_node
def _remove_node(self, node):
"""
Removes a node and all of the edges that touch it.
.. note::
It is assumed that *Node* is already a part of the graph.
Parameters
----------
node : `~Graph.Node` instance
"""
for edge in list(node._edges):
nodeB = edge.follow(node)
nodeB._edges.remove(edge)
if len(nodeB._edges) == 0:
self._nodes.remove(nodeB)
self._edges.remove(edge)
if node in self._nodes:
self._nodes.remove(node)
def _add_edge(self, A, B, source_polygons=[]):
"""
Add an edge between two nodes.
.. note::
It is assumed both nodes already belong to the graph.
Parameters
----------
A, B : `~Graph.Node` instances
source_polygons : `~spherical_geometry.polygon.SphericalPolygon` instance, optional
The polygon(s) this edge came from. Used for bookkeeping.
Returns
-------
edge : `~Graph.Edge` instance
The new edge
"""
if A not in self._nodes or B not in self._nodes:
raise ValueError("Nodes not in the graph.")
# Don't add any edges that already exist. Update the edge's
# source polygons list to include the new polygon. Care needs
# to be taken here to not create an Edge until we know we need
# one, otherwise the Edge will get hooked up to the nodes but
# be orphaned.
for edge in self._edges:
if ((A is edge._nodes[0] and
B is edge._nodes[1]) or
(A is edge._nodes[1] and
B is edge._nodes[0])):
edge._source_polygons.update(source_polygons)
return edge
new_edge = self.Edge(A, B, source_polygons)
self._edges.add(new_edge)
return new_edge
def _remove_edge(self, edge):
"""
Remove an edge from the graph. The nodes it points to remain intact.
.. note::
It is assumed that *edge* is already a part of the graph.
Parameters
----------
edge : `~Graph.Edge` instance
"""
if edge not in self._edges:
raise ValueError("Edge not in the graph.")
A, B = edge._nodes
A._edges.remove(edge)
if len(A._edges) == 0:
self._remove_node(A)
if A is not B:
B._edges.remove(edge)
if len(B._edges) == 0:
self._remove_node(B)
self._edges.remove(edge)
def _split_edge(self, edge, node):
"""
Splits an `~Graph.Edge` *edge* at `~Graph.Node` *node*, removing
*edge* and replacing it with two new `~Graph.Edge` instances.
It is intended that *E* is along the original edge, but that is
not enforced.
Parameters
----------
edge : `~Graph.Edge` instance
The edge to split
node : `~Graph.Node` instance
The node to insert
Returns
-------
edgeA, edgeB : `~Graph.Edge` instances
The two new edges on either side of *node*.
"""
if edge not in self._edges or node not in self._nodes:
raise ValueError("Either node or edge not in the graph.")
A, B = edge._nodes
edgeA = self._add_edge(A, node, edge._source_polygons)
edgeB = self._add_edge(node, B, edge._source_polygons)
if edge not in (edgeA, edgeB):
self._remove_edge(edge)
return [edgeA, edgeB]
def _sanity_check(self, msg, node_is_2=False):
"""
For debugging purposes: assert that edges and nodes are
connected to each other correctly and there are no orphaned
edges or nodes.
"""
if not DEBUG:
return
unique_edges = set()
for edge in self._edges:
for node in edge._nodes:
if edge not in node._edges or node not in self._nodes:
raise MalformedPolygonError(msg)
edge_repr = [tuple(x._point) for x in edge._nodes]
edge_repr.sort()
edge_repr = tuple(edge_repr)
# assert edge_repr not in unique_edges
unique_edges.add(edge_repr)
for node in self._nodes:
if node_is_2:
if len(node._edges) % 2 != 0:
raise MalformedPolygonError(msg)
else:
if not len(node._edges) >= 2:
raise MalformedPolygonError(msg)
for edge in node._edges:
if node not in edge._nodes or edge not in self._edges:
raise MalformedPolygonError(msg)
[docs]
def union(self):
"""
Once all of the polygons have been added to the graph,
join the polygons together.
Returns
-------
points : Nx3 array of (*x*, *y*, *z*) points
This is a list of points outlining the union of the
polygons that were given to the constructor.
"""
self._find_all_intersections()
self._sanity_check("union: find all intersections")
self._remove_interior_edges()
self._sanity_check("union: remove interior edges")
self._remove_degenerate_edges()
self._sanity_check("union: remove degenerate edges")
self._remove_3ary_edges()
self._sanity_check("union: remove 3ary edges")
self._remove_orphaned_nodes()
self._sanity_check("union: remove orphan nodes", True)
poly = self._trace()
for source_poly in self._source_polygons:
inside_point = source_poly.inside
break
else:
inside_point = None
return SphericalPolygon((poly, ), inside=inside_point)
[docs]
def intersection(self):
"""
Once all of the polygons have been added to the graph,
calculate the intersection.
Returns
-------
points : Nx3 array of (*x*, *y*, *z*) points
This is a list of points outlining the intersection of the
polygons that were given to the constructor.
"""
self._find_all_intersections()
self._sanity_check("intersection: find all intersections")
self._remove_exterior_edges()
self._sanity_check("intersection: remove exterior edges")
self._remove_cut_lines()
self._sanity_check("intersection: remove cut lines")
self._remove_orphaned_nodes()
self._sanity_check("intersection: remove orphan nodes", True)
poly = self._trace()
# If multiple polygons, the inside point can only be in one
if len(poly._polygons) == 1 and not self._contains_inside_point(poly):
poly = poly.invert_polygon()
return poly
[docs]
def disjoint_polygons(self):
"""
Convert a graph containing cut lines and self intersections
into a list of disjoint polygons
"""
changed = self._remove_cut_lines()
self._sanity_check("disjoint: remove cut lines")
changed = self._find_all_intersections() or changed
self._sanity_check("disjoint: find all intersections")
if changed:
polygons = self._trace_polygons()
else:
polygons = list(self._source_polygons)
return polygons
def _remove_cut_lines(self):
"""
Removes any cutlines that may already have existed in the
input polygons. This is so any cutlines in the final result
will be optimized to be as short as possible and won't
intersect each other.
This works by finding coincident edges that are reverse to
each other, and then splicing around them.
"""
# As this proceeds, edges are removed from the graph. It
# iterates over a static list of all edges that exist at the
# start, so each time one is selected, we need to ensure it
# still exists as part of the graph.
# This transforms the following (where = is the cut line)
#
# \ /
# A' + + B'
# | |
# A +====================+ B
#
# D +====================+ C
# | |
# D' + + C'
# / \
#
# to this:
#
# \ /
# A' + + B'
# | |
# A + + C
# | |
# D' + + C'
# / \
#
cut_lines = []
changed = False
for edge in self._edges:
A, B = edge._nodes
if len(A._edges) == 3 and len(B._edges) == 3:
cut_lines.append(edge)
changed = True
for edge in cut_lines:
if edge in self._edges:
self._remove_edge(edge)
return changed
def _get_edge_points(self, edges):
return (np.array([x._nodes[0]._point for x in edges]),
np.array([x._nodes[1]._point for x in edges]))
def _find_point_to_arc_intersections(self):
# For speed, we want to vectorize all of the intersection
# calculations. Therefore, there is a list of edges, and an
# array of points for all of the nodes. Then calculating the
# intersection between an edge and all other nodes becomes a
# fast, vectorized operation.
edges = sorted(self._edges, key=edge_order)
starts, ends = self._get_edge_points(edges)
nodes = sorted(self._nodes, key=node_order)
nodes_array = np.array([x._point for x in nodes])
# Split all edges by any nodes that intersect them
changed = False
while len(edges) > 1:
AB = edges.pop(0)
A, B = list(AB._nodes)
intersects = gca.intersects_point(
A._point, B._point, nodes_array)
intersection_indices = np.nonzero(intersects)[0]
for index in intersection_indices:
node = nodes[index]
if node not in AB._nodes:
changed = True
newA, newB = self._split_edge(AB, node)
new_edges = [
edge for edge in (newA, newB)
if edge not in edges]
for end_point in AB._nodes:
node._source_polygons.update(
end_point._source_polygons)
edges = edges + new_edges
break
return changed
def _find_arc_to_arc_intersections(self):
# For speed, we want to vectorize all of the intersection
# calculations. Therefore, there is a list of edges, and two
# arrays containing the end points of those edges. They all
# need to have things added and removed from them at the same
# time to keep them in sync, but of course the interface for
# doing so is different between Python lists and numpy arrays.
edges = sorted(self._edges, key=edge_order)
starts, ends = self._get_edge_points(edges)
# Calculate edge-to-edge intersections and break
# edges on the intersection point.
changed = False
while len(edges) > 1:
AB = edges.pop(0)
A = starts[0]
starts = starts[1:] # numpy equiv of "pop(0)"
B = ends[0]
ends = ends[1:] # numpy equiv of "pop(0)"
# Calculate the intersection points between AB and all
# other remaining edges
with np.errstate(invalid='ignore'):
intersections = gca.intersection(
A, B, starts, ends)
# intersects is `True` everywhere intersections has an
# actual intersection
intersects = np.isfinite(intersections[..., 0])
intersection_indices = np.nonzero(intersects)[0]
# Iterate through the candidate intersections, if any --
# we want to eliminate intersections that only intersect
# at the end points
for j in intersection_indices:
changed = True
CD = edges[j]
E = intersections[j]
# This is a bona-fide intersection, and E is the
# point at which the two lines intersect. Make a
# new node for it -- this must belong to the all
# of the source polygons of both of the edges that
# crossed.
# A
# |
# C--E--D
# |
# B
E = self._add_node(
E, AB._source_polygons | CD._source_polygons)
newA, newB = self._split_edge(AB, E)
newC, newD = self._split_edge(CD, E)
new_edges = [
edge for edge in (newA, newB, newC, newD)
if edge not in edges]
# Delete CD, and push the new edges to the
# front so they will be tested for intersection
# against all remaining edges.
edges = edges[:j] + edges[j+1:] + new_edges
new_starts, new_ends = self._get_edge_points(new_edges)
starts = np.vstack(
(starts[:j], starts[j+1:], new_starts))
ends = np.vstack(
(ends[:j], ends[j+1:], new_ends))
break
return changed
def _find_all_intersections(self):
"""
Find all the intersecting edges in the graph. For each
intersecting pair, four new edges are created around the
intersection point.
"""
changed = self._find_arc_to_arc_intersections()
changed = self._find_point_to_arc_intersections() or changed
return changed
def _remove_interior_edges(self):
"""
Removes any nodes that are contained inside other polygons.
What's left is the (possibly disjunct) outline.
"""
changed = False
polygons = self._source_polygons
for edge in self._edges:
edge._count = 0
A, B = edge._nodes
for polygon in polygons:
if (polygon not in edge._source_polygons and
((polygon in A._source_polygons or
polygon.contains_point(A._point)) and
(polygon in B._source_polygons or
polygon.contains_point(B._point))) and
polygon.contains_point(
gca.midpoint(A._point, B._point))):
edge._count += 1
for edge in list(self._edges):
if edge._count >= 1:
self._remove_edge(edge)
changed = True
changed = self._remove_orphaned_nodes() or changed
return changed
def _remove_exterior_edges(self):
"""
Removes any edges that are not contained in all of the source
polygons. What's left is the (possibly disjunct) outline.
"""
changed = False
polygons = self._source_polygons
for edge in self._edges:
edge._count = 0
A, B = edge._nodes
for polygon in polygons:
if polygon in edge._source_polygons:
edge._count += 1
elif ((polygon in A._source_polygons or
polygon.contains_point(A._point)) and
(polygon in B._source_polygons or
polygon.contains_point(B._point)) and
polygon.contains_point(
gca.midpoint(A._point, B._point))):
edge._count += 1
for edge in list(self._edges):
if edge._count < len(polygons):
self._remove_edge(edge)
changed = True
changed = self._remove_orphaned_nodes() or changed
return changed
def _remove_degenerate_edges(self):
"""
Remove edges where both endpoints are the same point
"""
changed = False
removals = []
for edge in self._edges:
if edge._nodes[0].equals(edge._nodes[1]):
removals.append(edge)
changed = True
for edge in removals:
if edge in self._edges:
self._remove_edge(edge)
return changed
def _remove_3ary_edges(self):
"""
Remove edges between pairs of nodes that have odd numbers of
edges. This removes triangles that can't be traced.
"""
changed = False
removals = []
for edge in self._edges:
nedges_a = len(edge._nodes[0]._edges)
nedges_b = len(edge._nodes[1]._edges)
if (nedges_a % 2 == 1 and nedges_a >= 3 and
nedges_b % 2 == 1 and nedges_b >= 3):
removals.append(edge)
changed = True
for edge in removals:
if edge in self._edges:
self._remove_edge(edge)
return changed
def _remove_orphaned_nodes(self):
"""
Remove nodes with fewer than 2 edges.
"""
changed = False
while True:
removes = []
for node in list(self._nodes):
if len(node._edges) < 2:
removes.append(node)
changed = True
if len(removes):
for node in removes:
if node in self._nodes:
self._remove_node(node)
else:
break
return changed
def _contains_inside_point(self, poly):
"""
Check if the polygons in the graph all contain
the interior point of a polygon
"""
for point in poly.inside:
for source_poly in self._source_polygons:
if not source_poly.contains_point(point):
return False
return True
def _trace_polygons(self):
"""
Given a graph that has had cutlines removed and all
intersections found, traces it to find a list of
disjoint polygons
"""
def edge_normal(edge, last_edge):
# THe normal vector to the plane defining the arc
normal = gca._cross_and_normalize(edge._nodes[0]._point,
edge._nodes[1]._point)
if last_edge is not None:
orientation = None
for i in (0, 1):
last_edge_point = last_edge._nodes[i]._point
for j in (0, 1):
point = edge._nodes[j]._point
if np.array_equal(last_edge_point, point):
if i == j:
orientation = -1.0
else:
orientation = 1.0
if orientation is None:
raise RuntimeError("Unconnected edge found when tracing")
normal = orientation * normal
return normal
def pick_next_edge(node, last_edge):
# Pick the next edge when arriving at a node from
# last_edge. If there's only one other edge, the choice
# is obvious. If there's more than one, disfavor an edge
# with the same normal as the previous edge, in order to
# trace 4-connected nodes into separate distinct shapes
# and avoid edge crossings.
candidates = []
for edge in node._edges:
if not edge._followed:
candidates.append(edge)
if len(candidates) == 0:
raise ValueError("No more edges to follow")
elif len(candidates) == 1 or last_edge is None:
return candidates[0]
last_edge_cross = edge_normal(last_edge, None)
edge_cross = [edge_normal(edge, last_edge) for edge in candidates]
edge_cross = np.asanyarray(edge_cross)
dot = gca.inner1d(edge_cross, last_edge_cross)
schwartz = zip(dot, candidates)
schwartz = sorted(schwartz, key=lambda x: x[0])
middle = len(candidates) // 2
return schwartz[middle][1]
polygons = []
edges = set(self._edges) # copy
for edge in self._edges:
edge._followed = False
while len(edges):
points = []
edge = edges.pop()
edge._followed = True
start_node = node = edge._nodes[0]
points.append(node._point)
node = edge._nodes[1]
points.append(node._point)
while True:
if not np.array_equal(points[-1], node._point):
points.append(node._point)
edge = pick_next_edge(node, edge)
edge._followed = True
edges.discard(edge)
node = edge.follow(node)
if node is start_node:
points.append(node._point)
break
polygon = SingleSphericalPolygon(points)
polygons.append(polygon)
return polygons
def _trace(self):
"""
Given a graph that has had cutlines removed and all
intersections found, traces it to find a resulting single
polygon.
"""
return SphericalPolygon(self._trace_polygons())