class GraphImpl[VD, ED] extends Graph[VD, ED] with Serializable
An implementation of org.apache.spark.graphx.Graph to support computation on graphs.
Graphs are represented using two RDDs: vertices, which contains vertex attributes and the routing information for shipping vertex attributes to edge partitions, and replicatedVertexView, which contains edges and the vertex attributes mentioned by each edge.
- Source
- GraphImpl.scala
Instance Constructors
-
new
GraphImpl()(implicit arg0: ClassTag[VD], arg1: ClassTag[ED])
Default constructor is provided to support serialization
- Attributes
- protected
-
new
GraphImpl(vertices: VertexRDD[VD], replicatedVertexView: ReplicatedVertexView[VD, ED])(implicit arg0: ClassTag[VD], arg1: ClassTag[ED])
- Attributes
- protected
Value Members
-
final
def
!=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
##(): Int
- Definition Classes
- AnyRef → Any
-
final
def
==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
def
aggregateMessages[A](sendMsg: (EdgeContext[VD, ED, A]) ⇒ Unit, mergeMsg: (A, A) ⇒ A, tripletFields: TripletFields = TripletFields.All)(implicit arg0: ClassTag[A]): VertexRDD[A]
Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied sendMsg function is invoked on each edge of the graph, generating 0 or more messages to be sent to either vertex in the edge. The mergeMsg function is then used to combine all messages destined to the same vertex.
- A
the type of message to be sent to each vertex
- sendMsg
runs on each edge, sending messages to neighboring vertices using the EdgeContext.
- mergeMsg
used to combine messages from sendMsg destined to the same vertex. This combiner should be commutative and associative.
- tripletFields
which fields should be included in the EdgeContext passed to the sendMsg function. If not all fields are needed, specifying this can improve performance.
- Definition Classes
- Graph
We can use this function to compute the in-degree of each vertex:
val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "twittergraph") // sc: an existing SparkContext
val inDeg: RDD[(VertexId, Int)] = rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
- Note
By expressing computation at the edge level we achieve maximum parallelism. This is one of the core functions in the Graph API that enables neighborhood-level computation. For example, this function can be used to count neighbors satisfying a predicate or to implement PageRank.
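The in-degree computation described for aggregateMessages can be sketched end to end on a toy graph. This is a minimal sketch, not the library's own example: the local master, the application name, and the three-edge graph are assumptions made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexId, VertexRDD}

// Assumption: a local context; reuse the context your application already has.
val conf = new SparkConf().setMaster("local[2]").setAppName("aggregateMessages-sketch")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical toy graph with edges 1->2, 1->3 and 2->3; attributes are unused.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(1L, 3L, 0), Edge(2L, 3L, 0)))
val graph: Graph[Int, Int] = Graph.fromEdges(edges, 0)

// sendMsg sends 1 to the destination of every edge; mergeMsg sums the messages.
// TripletFields.None declares that sendMsg reads no vertex or edge attributes.
val inDeg: VertexRDD[Int] = graph.aggregateMessages[Int](
  ctx => ctx.sendToDst(1),
  _ + _,
  TripletFields.None)

// Vertices that receive no message (vertex 1 here) do not appear in the result.
val inDegMap: Map[VertexId, Int] = inDeg.collect().toMap
```

Because the result only contains vertices that received at least one message, callers that need a value for every vertex typically join it back onto the graph, for example with outerJoinVertices.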
-
def
aggregateMessagesWithActiveSet[A](sendMsg: (EdgeContext[VD, ED, A]) ⇒ Unit, mergeMsg: (A, A) ⇒ A, tripletFields: TripletFields, activeSetOpt: Option[(VertexRDD[_], EdgeDirection)])(implicit arg0: ClassTag[A]): VertexRDD[A]
Aggregates values from the neighboring edges and vertices of each vertex. The user-supplied sendMsg function is invoked on each edge of the graph, generating 0 or more messages to be sent to either vertex in the edge. The mergeMsg function is then used to combine all messages destined to the same vertex.
This variant can take an active set to restrict the computation and is intended for internal use only.
- A
the type of message to be sent to each vertex
- sendMsg
runs on each edge, sending messages to neighboring vertices using the EdgeContext.
- mergeMsg
used to combine messages from sendMsg destined to the same vertex. This combiner should be commutative and associative.
- tripletFields
which fields should be included in the EdgeContext passed to the sendMsg function. If not all fields are needed, specifying this can improve performance.
- activeSetOpt
an efficient way to run the aggregation on a subset of the edges if desired. This is done by specifying a set of "active" vertices and an edge direction. The sendMsg function will then run only on edges connected to active vertices in the specified direction. If the direction is In, sendMsg will only be run on edges with destination in the active set. If the direction is Out, sendMsg will only be run on edges originating from vertices in the active set. If the direction is Either, sendMsg will be run on edges with *either* vertex in the active set. If the direction is Both, sendMsg will be run on edges with *both* vertices in the active set. The active set must have the same index as the graph's vertices.
-
final
def
asInstanceOf[T0]: T0
- Definition Classes
- Any
-
def
cache(): Graph[VD, ED]
Caches the vertices and edges associated with this graph at the previously-specified target storage levels, which default to MEMORY_ONLY.
-
def
checkpoint(): Unit
Marks this Graph for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir() and all references to its parent RDDs will be removed. It is strongly recommended that this Graph is persisted in memory; otherwise saving it to a file will require recomputation.
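The recommended order of calls (set a checkpoint directory, persist, mark for checkpointing, then run a job) might look like the following. It is a sketch under stated assumptions: the temporary local directory and the toy graph are hypothetical, and a cluster deployment would normally point the checkpoint directory at a reliable store.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Assumption: a local context; reuse the context your application already has.
val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-sketch")
val sc = SparkContext.getOrCreate(conf)

// Assumption: a throwaway local directory created only for this sketch.
sc.setCheckpointDir(Files.createTempDirectory("graphx-checkpoint").toString)

// Hypothetical toy graph; cache() keeps it in memory so that writing the
// checkpoint does not have to recompute it.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(1L, 3L, 0), Edge(2L, 3L, 0)))
val graph: Graph[Int, Int] = Graph.fromEdges(edges, 0).cache()

// Marking is lazy: nothing is written until jobs run on the vertices and edges.
graph.checkpoint()
val numVertices = graph.vertices.count()
val numEdges = graph.edges.count()
```

After the jobs have run, isCheckpointed and getCheckpointFiles can be used to inspect the outcome.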
-
def
clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native()
-
val
edges: EdgeRDDImpl[ED, VD]
An RDD containing the edges and their associated attributes. The entries in the RDD contain just the source id and target id along with the edge data.
- returns
an RDD containing the edges in this graph
-
final
def
eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
equals(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
def
finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( classOf[java.lang.Throwable] )
-
def
getCheckpointFiles: Seq[String]
Gets the names of the files to which this Graph was checkpointed.
-
final
def
getClass(): Class[_]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
-
def
groupEdges(merge: (ED, ED) ⇒ ED): Graph[VD, ED]
Merges multiple edges between two vertices into a single edge. For correct results, the graph must have been partitioned using partitionBy.
- merge
the user-supplied commutative and associative function to merge edge attributes for duplicate edges.
- returns
The resulting graph with a single edge for each (source, dest) vertex pair.
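The requirement to call partitionBy first can be illustrated with a small sketch. The toy edges and the choice of PartitionStrategy.EdgePartition2D are assumptions for illustration; any strategy that places identical (source, dest) pairs in the same partition serves the same purpose.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}

// Assumption: a local context; reuse the context your application already has.
val conf = new SparkConf().setMaster("local[2]").setAppName("groupEdges-sketch")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical multigraph: two parallel edges 1->2 plus a single edge 2->3.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 2L, 4), Edge(2L, 3L, 10)), 2)
val graph: Graph[Int, Int] = Graph.fromEdges(edges, 0)

// Repartition so that duplicate edges are colocated, then sum their attributes.
val merged = graph
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .groupEdges(_ + _)

val mergedEdges = merged.edges.collect().map(e => (e.srcId, e.dstId, e.attr)).toSet
```

Without the partitionBy call, duplicates that happen to live in different edge partitions would not be merged, because groupEdges only combines edges within a partition.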
-
def
hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
-
def
isCheckpointed: Boolean
Return whether this Graph has been checkpointed or not.
-
final
def
isInstanceOf[T0]: Boolean
- Definition Classes
- Any
-
def
mapEdges[ED2](f: (PartitionID, Iterator[Edge[ED]]) ⇒ Iterator[ED2])(implicit arg0: ClassTag[ED2]): Graph[VD, ED2]
Transforms each edge attribute using the map function, passing it a whole partition at a time. The map function is given an iterator over edges within a logical partition as well as the partition's ID, and it should return a new iterator over the new values of each edge. The new iterator's elements must correspond one-to-one with the old iterator's elements. If adjacent vertex values are desired, use mapTriplets.
- ED2
the new edge data type
-
def
mapEdges[ED2](map: (Edge[ED]) ⇒ ED2)(implicit arg0: ClassTag[ED2]): Graph[VD, ED2]
Transforms each edge attribute in the graph using the map function. The map function is not passed the vertex value for the vertices adjacent to the edge. If vertex values are desired, use mapTriplets.
- ED2
the new edge data type
- map
the function from an edge object to a new edge value.
- Definition Classes
- Graph
This function might be used to initialize edge attributes.
- Note
This graph is not changed, and the new graph has the same structure. As a consequence the underlying index structures can be reused.
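A minimal sketch of initializing edge attributes with mapEdges follows. The toy graph and the doubling rule are assumptions for illustration; the parameter type on the lambda is spelled out only to keep overload resolution between the two mapEdges variants unambiguous.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Assumption: a local context; reuse the context your application already has.
val conf = new SparkConf().setMaster("local[2]").setAppName("mapEdges-sketch")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical toy graph with integer edge weights.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 2), Edge(1L, 3L, 3)))
val graph: Graph[Int, Int] = Graph.fromEdges(edges, 0)

// Produce a new graph whose edge attributes are Doubles; the structure is unchanged.
val scaled: Graph[Int, Double] = graph.mapEdges((e: Edge[Int]) => e.attr * 2.0)

val scaledEdges = scaled.edges.collect().map(e => (e.srcId, e.dstId, e.attr)).toSet
val originalEdges = graph.edges.collect().map(e => (e.srcId, e.dstId, e.attr)).toSet
```

The original graph still holds its integer attributes afterwards, which is what the note about the graph not being changed refers to.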
-
def
mapTriplets[ED2](f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) ⇒ Iterator[ED2], tripletFields: TripletFields)(implicit arg0: ClassTag[ED2]): Graph[VD, ED2]
Transforms each edge attribute a partition at a time using the map function, passing it the adjacent vertex attributes as well. The map function is given an iterator over edge triplets within a logical partition and should yield a new iterator over the new values of each edge in the order in which they are provided. If adjacent vertex values are not required, consider using mapEdges instead.
- ED2
the new edge data type
- tripletFields
which fields should be included in the edge triplet passed to the map function. If not all fields are needed, specifying this can improve performance.
-
def
mapTriplets[ED2](map: (EdgeTriplet[VD, ED]) ⇒ ED2, tripletFields: TripletFields)(implicit arg0: ClassTag[ED2]): Graph[VD, ED2]
Transforms each edge attribute using the map function, passing it the adjacent vertex attributes as well. If adjacent vertex values are not required, consider using mapEdges instead.
- ED2
the new edge data type
- map
the function from an edge object to a new edge value.
- tripletFields
which fields should be included in the edge triplet passed to the map function. If not all fields are needed, specifying this can improve performance.
- Definition Classes
- Graph
This function might be used to initialize edge attributes based on the attributes associated with each vertex:
val rawGraph: Graph[Int, Int] = someLoadFunction()
val graph = rawGraph.mapTriplets[Int](triplet => triplet.srcAttr - triplet.dstAttr)
- Note
This does not change the structure of the graph or modify the values of this graph. As a consequence the underlying index structures can be reused.
-
def
mapTriplets[ED2](map: (EdgeTriplet[VD, ED]) ⇒ ED2)(implicit arg0: ClassTag[ED2]): Graph[VD, ED2]
Transforms each edge attribute using the map function, passing it the adjacent vertex attributes as well. If adjacent vertex values are not required, consider using mapEdges instead.
- ED2
the new edge data type
- map
the function from an edge object to a new edge value.
- Definition Classes
- Graph
This function might be used to initialize edge attributes based on the attributes associated with each vertex:
val rawGraph: Graph[Int, Int] = someLoadFunction()
val graph = rawGraph.mapTriplets[Int](triplet => triplet.srcAttr - triplet.dstAttr)
- Note
This does not change the structure of the graph or modify the values of this graph. As a consequence the underlying index structures can be reused.
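The two per-triplet overloads of mapTriplets can be compared on a toy graph. This is a sketch under stated assumptions: the vertex values and edges are hypothetical, and the second call passes TripletFields.Src only because its map function reads nothing but the source attribute.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, TripletFields}

// Assumption: a local context; reuse the context your application already has.
val conf = new SparkConf().setMaster("local[2]").setAppName("mapTriplets-sketch")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical toy graph: vertex attributes 10, 4 and 1; edge attributes unused.
val vertices = sc.parallelize(Seq((1L, 10), (2L, 4), (3L, 1)))
val edges = sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(1L, 3L, 0)))
val graph: Graph[Int, Int] = Graph(vertices, edges)

// New edge attribute = source attribute minus destination attribute.
val diffGraph = graph.mapTriplets((t: EdgeTriplet[Int, Int]) => t.srcAttr - t.dstAttr)

// Only the source attribute is read, so only that field is requested.
val srcGraph = graph.mapTriplets((t: EdgeTriplet[Int, Int]) => t.srcAttr, TripletFields.Src)

val diffs = diffGraph.edges.collect().map(e => (e.srcId, e.dstId, e.attr)).toSet
val srcs = srcGraph.edges.collect().map(e => (e.srcId, e.dstId, e.attr)).toSet
```

Requesting fewer fields is purely an optimization hint; a map function that reads a field it did not request should not be expected to see a meaningful value.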
-
def
mapVertices[VD2](f: (VertexId, VD) ⇒ VD2)(implicit arg0: ClassTag[VD2], eq: =:=[VD, VD2] = null): Graph[VD2, ED]
Transforms each vertex attribute in the graph using the map function.
- VD2
the new vertex data type
- Definition Classes
- GraphImpl → Graph
We might use this operation to change the vertex values from one type to another to initialize an algorithm:
val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "hdfs://file") // sc: an existing SparkContext
val root: VertexId = 42L
var bfsGraph = rawGraph.mapVertices[Int]((vid, data) => if (vid == root) 0 else Int.MaxValue)
- Note
The new graph has the same structure. As a consequence the underlying index structures can be reused.
-
def
mask[VD2, ED2](other: Graph[VD2, ED2])(implicit arg0: ClassTag[VD2], arg1: ClassTag[ED2]): Graph[VD, ED]
Restricts the graph to only the vertices and edges that are also in other, but keeps the attributes from this graph.
- other
the graph to project this graph onto
- returns
a graph with vertices and edges that exist in both the current graph and other, with vertex and edge data from the current graph
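A typical use of mask is to project the original attributes onto a graph derived from the same source. The sketch below assumes a hypothetical toy graph and derives other from graph itself, so that both share the same edge partitioning.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Assumption: a local context; reuse the context your application already has.
val conf = new SparkConf().setMaster("local[2]").setAppName("mask-sketch")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical toy graph with String attributes on vertices and edges.
val edges = sc.parallelize(Seq(Edge(1L, 2L, "a"), Edge(2L, 3L, "b"), Edge(3L, 4L, "c")))
val graph: Graph[String, String] = Graph.fromEdges(edges, "v").cache()

// A derived graph with different vertex attributes and vertex 4 removed.
val other: Graph[Int, String] = graph
  .mapVertices((id, attr) => 0)
  .subgraph(vpred = (id, attr) => id <= 3L)

// Keep only what also exists in `other`, with the attributes of `graph`.
val masked: Graph[String, String] = graph.mask(other)

val maskedVertices = masked.vertices.collect().toMap
val maskedEdges = masked.edges.collect().map(e => (e.srcId, e.dstId, e.attr)).toSet
```

Deriving other from graph is a deliberate choice here: it avoids making any assumption about how two independently loaded graphs would line up partition by partition.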
-
final
def
ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
final
def
notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
-
final
def
notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
-
val
ops: GraphOps[VD, ED]
The associated GraphOps object.
-
def
outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])(updateF: (VertexId, VD, Option[U]) ⇒ VD2)(implicit arg0: ClassTag[U], arg1: ClassTag[VD2], eq: =:=[VD, VD2] = null): Graph[VD2, ED]
Joins the vertices with entries in the other RDD and merges the results using updateF. The input table should contain at most one entry for each vertex. If no entry in other is provided for a particular vertex in the graph, updateF receives None.
- U
the type of entry in the table of updates
- VD2
the new vertex value type
- other
the table to join with the vertices in the graph. The table should contain at most one entry for each vertex.
- Definition Classes
- GraphImpl → Graph
This function is used to update the vertices with new values based on external data. For example, we could add the out-degree to each vertex record:
val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "webgraph") // sc: an existing SparkContext
val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees
val graph = rawGraph.outerJoinVertices(outDeg) { (vid, data, optDeg) => optDeg.getOrElse(0) }
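The out-degree join can also be run on a small in-memory graph, which makes the None case visible. The toy edges and the "page" default attribute are assumptions for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexRDD}

// Assumption: a local context; reuse the context your application already has.
val conf = new SparkConf().setMaster("local[2]").setAppName("outerJoinVertices-sketch")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical toy graph with edges 1->2, 1->3 and 2->3.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(1L, 3L, 0), Edge(2L, 3L, 0)))
val graph: Graph[String, Int] = Graph.fromEdges(edges, "page")

// outDegrees has no entry for vertex 3, which has no outgoing edges.
val outDeg: VertexRDD[Int] = graph.outDegrees

// Replace each vertex attribute with its out-degree, defaulting to 0 when absent.
val withDeg: Graph[Int, Int] = graph.outerJoinVertices(outDeg) {
  (vid, attr, optDeg) => optDeg.getOrElse(0)
}

val degrees = withDeg.vertices.collect().toMap
```

Vertex 3 ends up with 0 because the update function receives None for it, matching the behavior described for vertices without an entry in the joined table.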
-
def
partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
Repartitions the edges in the graph according to partitionStrategy.
-
def
partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
Repartitions the edges in the graph according to partitionStrategy.
-
def
persist(newLevel: StorageLevel): Graph[VD, ED]
Caches the vertices and edges associated with this graph at the specified storage level, ignoring any target storage levels previously set.
-
val
replicatedVertexView: ReplicatedVertexView[VD, ED]
-
def
reverse: Graph[VD, ED]
Reverses all edges in the graph.
-
def
subgraph(epred: (EdgeTriplet[VD, ED]) ⇒ Boolean = x => true, vpred: (VertexId, VD) ⇒ Boolean = (a, b) => true): Graph[VD, ED]
Restricts the graph to only the vertices and edges satisfying the predicates. The resulting subgraph satisfies
V' = {v : for all v in V where vpred(v)}
E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)}
- epred
the edge predicate, which takes a triplet and evaluates to true if the edge is to remain in the subgraph. Note that only edges where both vertices satisfy the vertex predicate are considered.
- vpred
the vertex predicate, which takes a vertex object and evaluates to true if the vertex is to be included in the subgraph
- returns
the subgraph containing only the vertices and edges that satisfy the predicates
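The interaction of the two predicates can be checked on a toy graph. This is a minimal sketch: the vertex ages, the edge weights, and both thresholds are hypothetical values chosen so that one edge is dropped by vpred and another by epred.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Assumption: a local context; reuse the context your application already has.
val conf = new SparkConf().setMaster("local[2]").setAppName("subgraph-sketch")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical data: vertex attribute = age, edge attribute = weight.
val vertices = sc.parallelize(Seq((1L, 30), (2L, 15), (3L, 40), (4L, 25)))
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 5), Edge(1L, 3L, 3), Edge(3L, 4L, 1), Edge(4L, 1L, 2)))
val graph: Graph[Int, Int] = Graph(vertices, edges)

// Keep vertices aged 18 or more, and edges of weight 2 or more between them.
val sub = graph.subgraph(
  epred = t => t.attr >= 2,
  vpred = (id, age) => age >= 18)

val keptVertices = sub.vertices.collect().map(_._1).toSet
val keptEdges = sub.edges.collect().map(e => (e.srcId, e.dstId)).toSet
```

Edge 1->2 is dropped even though its weight passes epred, because vertex 2 fails vpred; edge 3->4 is dropped by epred alone.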
-
final
def
synchronized[T0](arg0: ⇒ T0): T0
- Definition Classes
- AnyRef
-
def
toString(): String
- Definition Classes
- AnyRef → Any
-
lazy val
triplets: RDD[EdgeTriplet[VD, ED]]
Return an RDD that brings edges together with their source and destination vertices.
-
def
unpersist(blocking: Boolean = false): Graph[VD, ED]
Uncaches both vertices and edges of this graph.
-
def
unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative algorithms that modify the vertex attributes but reuse the edges. This method can be used to uncache the vertex attributes of previous iterations once they are no longer needed, improving GC performance.
- blocking
Whether to block until all data is unpersisted (default: false)
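The iterative pattern described for unpersistVertices could be sketched as a loop that materializes the new vertices before releasing the old ones. The toy graph, the three iterations, and the increment rule are assumptions for illustration only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Assumption: a local context; reuse the context your application already has.
val conf = new SparkConf().setMaster("local[2]").setAppName("unpersistVertices-sketch")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical toy graph; every vertex starts with attribute 0.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(1L, 3L, 0)))
var g: Graph[Int, Int] = Graph.fromEdges(edges, 0).cache()

for (_ <- 1 to 3) {
  val prev = g
  // Each iteration changes only the vertex attributes; the edges are reused.
  g = prev.mapVertices((id, attr) => attr + 1).cache()
  // Materialize the new vertices before dropping the previous iteration's copy.
  g.vertices.count()
  prev.unpersistVertices(blocking = false)
}

val finalAttrs = g.vertices.collect().toMap
```

Materializing before unpersisting is the design choice that matters: releasing the previous vertices first would force them to be recomputed when the new graph is evaluated.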
-
val
vertices: VertexRDD[VD]
An RDD containing the vertices and their associated attributes.
-
final
def
wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native()