Package org.apache.spark.graphx
Class Graph<VD,ED>
Object
org.apache.spark.graphx.Graph<VD,ED>
- Type Parameters:
VD
- the vertex attribute typeED
- the edge attribute type
- All Implemented Interfaces:
Serializable
,scala.Serializable
- Direct Known Subclasses:
GraphImpl
The Graph abstractly represents a graph with arbitrary objects
associated with vertices and edges. The graph provides basic
operations to access and manipulate the data associated with
vertices and edges as well as the underlying structure. Like Spark
RDDs, the graph is a functional data-structure in which mutating
operations return new graphs.
- See Also:
- Note:
GraphOps
contains additional convenience operations and graph algorithms.
-
Method Summary
Modifier and TypeMethodDescription<A> VertexRDD<A>
aggregateMessages
(scala.Function1<EdgeContext<VD, ED, A>, scala.runtime.BoxedUnit> sendMsg, scala.Function2<A, A, A> mergeMsg, TripletFields tripletFields, scala.reflect.ClassTag<A> evidence$11) Aggregates values from the neighboring edges and vertices of each vertex.static <VD,
ED> Graph<VD, ED> apply
(RDD<scala.Tuple2<Object, VD>> vertices, RDD<Edge<ED>> edges, VD defaultVertexAttr, StorageLevel edgeStorageLevel, StorageLevel vertexStorageLevel, scala.reflect.ClassTag<VD> evidence$18, scala.reflect.ClassTag<ED> evidence$19) Construct a graph from a collection of vertices and edges with attributes.cache()
Caches the vertices and edges associated with this graph at the previously-specified target storage levels, which default toMEMORY_ONLY
.abstract void
Mark this Graph for checkpointing.edges()
An RDD containing the edges and their associated attributes.static <VD,
ED> Graph<VD, ED> fromEdges
(RDD<Edge<ED>> edges, VD defaultValue, StorageLevel edgeStorageLevel, StorageLevel vertexStorageLevel, scala.reflect.ClassTag<VD> evidence$16, scala.reflect.ClassTag<ED> evidence$17) Construct a graph from a collection of edges.fromEdgeTuples
(RDD<scala.Tuple2<Object, Object>> rawEdges, VD defaultValue, scala.Option<PartitionStrategy> uniqueEdges, StorageLevel edgeStorageLevel, StorageLevel vertexStorageLevel, scala.reflect.ClassTag<VD> evidence$15) Construct a graph from a collection of edges encoded as vertex id pairs.abstract scala.collection.Seq<String>
Gets the name of the files to which this Graph was checkpointed.static <VD,
ED> GraphOps<VD, ED> graphToGraphOps
(Graph<VD, ED> g, scala.reflect.ClassTag<VD> evidence$20, scala.reflect.ClassTag<ED> evidence$21) Implicitly extracts theGraphOps
member from a graph.groupEdges
(scala.Function2<ED, ED, ED> merge) Merges multiple edges between two vertices into a single edge.abstract boolean
Return whether this Graph has been checkpointed or not.Transforms each edge attribute in the graph using the map function.mapEdges
(scala.Function2<Object, scala.collection.Iterator<Edge<ED>>, scala.collection.Iterator<ED2>> map, scala.reflect.ClassTag<ED2> evidence$5) Transforms each edge attribute using the map function, passing it a whole partition at a time.mapTriplets
(scala.Function1<EdgeTriplet<VD, ED>, ED2> map, TripletFields tripletFields, scala.reflect.ClassTag<ED2> evidence$7) Transforms each edge attribute using the map function, passing it the adjacent vertex attributes as well.mapTriplets
(scala.Function1<EdgeTriplet<VD, ED>, ED2> map, scala.reflect.ClassTag<ED2> evidence$6) Transforms each edge attribute using the map function, passing it the adjacent vertex attributes as well.mapTriplets
(scala.Function2<Object, scala.collection.Iterator<EdgeTriplet<VD, ED>>, scala.collection.Iterator<ED2>> map, TripletFields tripletFields, scala.reflect.ClassTag<ED2> evidence$8) Transforms each edge attribute a partition at a time using the map function, passing it the adjacent vertex attributes as well.mapVertices
(scala.Function2<Object, VD, VD2> map, scala.reflect.ClassTag<VD2> evidence$3, scala.Predef.$eq$colon$eq<VD, VD2> eq) Transforms each vertex attribute in the graph using the map function.mask
(Graph<VD2, ED2> other, scala.reflect.ClassTag<VD2> evidence$9, scala.reflect.ClassTag<ED2> evidence$10) Restricts the graph to only the vertices and edges that are also inother
, but keeps the attributes from this graph.ops()
The associatedGraphOps
object.outerJoinVertices
(RDD<scala.Tuple2<Object, U>> other, scala.Function3<Object, VD, scala.Option<U>, VD2> mapFunc, scala.reflect.ClassTag<U> evidence$13, scala.reflect.ClassTag<VD2> evidence$14, scala.Predef.$eq$colon$eq<VD, VD2> eq) Joins the vertices with entries in thetable
RDD and merges the results usingmapFunc
.partitionBy
(PartitionStrategy partitionStrategy) Repartitions the edges in the graph according topartitionStrategy
.partitionBy
(PartitionStrategy partitionStrategy, int numPartitions) Repartitions the edges in the graph according topartitionStrategy
.persist
(StorageLevel newLevel) Caches the vertices and edges associated with this graph at the specified storage level, ignoring any target storage levels previously set.reverse()
Reverses all edges in the graph.Restricts the graph to only the vertices and edges satisfying the predicates.abstract RDD<EdgeTriplet<VD,
ED>> triplets()
An RDD containing the edge triplets, which are edges along with the vertex data associated with the adjacent vertices.unpersist
(boolean blocking) Uncaches both vertices and edges of this graph.unpersistVertices
(boolean blocking) Uncaches only the vertices of this graph, leaving the edges alone.vertices()
An RDD containing the vertices and their associated attributes.
-
Method Details
-
fromEdgeTuples
public static <VD> Graph<VD,Object> fromEdgeTuples(RDD<scala.Tuple2<Object, Object>> rawEdges, VD defaultValue, scala.Option<PartitionStrategy> uniqueEdges, StorageLevel edgeStorageLevel, StorageLevel vertexStorageLevel, scala.reflect.ClassTag<VD> evidence$15) Construct a graph from a collection of edges encoded as vertex id pairs.- Parameters:
rawEdges
- a collection of edges in (src, dst) formdefaultValue
- the vertex attributes with which to create vertices referenced by the edgesuniqueEdges
- if multiple identical edges are found they are combined and the edge attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enableuniqueEdges
, aPartitionStrategy
must be provided.edgeStorageLevel
- the desired storage level at which to cache the edges if necessaryvertexStorageLevel
- the desired storage level at which to cache the vertices if necessaryevidence$15
- (undocumented)- Returns:
- a graph with edge attributes containing either the count of duplicate edges or 1
(if
uniqueEdges
isNone
) and vertex attributes containing the total degree of each vertex.
-
fromEdges
public static <VD,ED> Graph<VD,ED> fromEdges(RDD<Edge<ED>> edges, VD defaultValue, StorageLevel edgeStorageLevel, StorageLevel vertexStorageLevel, scala.reflect.ClassTag<VD> evidence$16, scala.reflect.ClassTag<ED> evidence$17) Construct a graph from a collection of edges.- Parameters:
edges
- the RDD containing the set of edges in the graphdefaultValue
- the default vertex attribute to use for each vertexedgeStorageLevel
- the desired storage level at which to cache the edges if necessaryvertexStorageLevel
- the desired storage level at which to cache the vertices if necessaryevidence$16
- (undocumented)evidence$17
- (undocumented)- Returns:
- a graph with edge attributes described by
edges
and vertices given by all vertices inedges
with valuedefaultValue
-
apply
public static <VD,ED> Graph<VD,ED> apply(RDD<scala.Tuple2<Object, VD>> vertices, RDD<Edge<ED>> edges, VD defaultVertexAttr, StorageLevel edgeStorageLevel, StorageLevel vertexStorageLevel, scala.reflect.ClassTag<VD> evidence$18, scala.reflect.ClassTag<ED> evidence$19) Construct a graph from a collection of vertices and edges with attributes. Duplicate vertices are picked arbitrarily and vertices found in the edge collection but not in the input vertices are assigned the default attribute.- Parameters:
vertices
- the "set" of vertices and their attributesedges
- the collection of edges in the graphdefaultVertexAttr
- the default vertex attribute to use for vertices that are mentioned in edges but not in verticesedgeStorageLevel
- the desired storage level at which to cache the edges if necessaryvertexStorageLevel
- the desired storage level at which to cache the vertices if necessaryevidence$18
- (undocumented)evidence$19
- (undocumented)- Returns:
- (undocumented)
-
graphToGraphOps
public static <VD,ED> GraphOps<VD,ED> graphToGraphOps(Graph<VD, ED> g, scala.reflect.ClassTag<VD> evidence$20, scala.reflect.ClassTag<ED> evidence$21) Implicitly extracts theGraphOps
member from a graph.To improve modularity the Graph type only contains a small set of basic operations. All the convenience operations are defined in the
GraphOps
class which may be shared across multiple graph implementations.- Parameters:
g
- (undocumented)evidence$20
- (undocumented)evidence$21
- (undocumented)- Returns:
- (undocumented)
-
vertices
An RDD containing the vertices and their associated attributes.- Returns:
- an RDD containing the vertices in this graph
- Note:
- vertex ids are unique.
-
edges
An RDD containing the edges and their associated attributes. The entries in the RDD contain just the source id and target id along with the edge data.- Returns:
- an RDD containing the edges in this graph
- See Also:
-
Edge
for the edge type.Graph#triplets
to get an RDD which contains all the edges along with their vertex data.
-
triplets
An RDD containing the edge triplets, which are edges along with the vertex data associated with the adjacent vertices. The caller should useedges()
if the vertex data are not needed, i.e. if only the edge data and adjacent vertex ids are needed.- Returns:
- an RDD containing edge triplets
- Example:
- This operation might be used to evaluate a graph
coloring where we would like to check that both vertices are a
different color.
type Color = Int val graph: Graph[Color, Int] = GraphLoader.edgeListFile("hdfs://file.tsv") val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum
-
persist
Caches the vertices and edges associated with this graph at the specified storage level, ignoring any target storage levels previously set.- Parameters:
newLevel
- the level at which to cache the graph.- Returns:
- A reference to this graph for convenience.
-
cache
Caches the vertices and edges associated with this graph at the previously-specified target storage levels, which default toMEMORY_ONLY
. This is used to pin a graph in memory enabling multiple queries to reuse the same construction process.- Returns:
- (undocumented)
-
checkpoint
public abstract void checkpoint()Mark this Graph for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext.setCheckpointDir() and all references to its parent RDDs will be removed. It is strongly recommended that this Graph is persisted in memory, otherwise saving it on a file will require recomputation. -
isCheckpointed
public abstract boolean isCheckpointed()Return whether this Graph has been checkpointed or not. This returns true iff both the vertices RDD and edges RDD have been checkpointed.- Returns:
- (undocumented)
-
getCheckpointFiles
Gets the name of the files to which this Graph was checkpointed. (The vertices RDD and edges RDD are checkpointed separately.)- Returns:
- (undocumented)
-
unpersist
Uncaches both vertices and edges of this graph. This is useful in iterative algorithms that build a new graph in each iteration.- Parameters:
blocking
- Whether to block until all data is unpersisted (default: false)- Returns:
- (undocumented)
-
unpersistVertices
Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative algorithms that modify the vertex attributes but reuse the edges. This method can be used to uncache the vertex attributes of previous iterations once they are no longer needed, improving GC performance.- Parameters:
blocking
- Whether to block until all data is unpersisted (default: false)- Returns:
- (undocumented)
-
partitionBy
Repartitions the edges in the graph according topartitionStrategy
.- Parameters:
partitionStrategy
- the partitioning strategy to use when partitioning the edges in the graph.- Returns:
- (undocumented)
-
partitionBy
Repartitions the edges in the graph according topartitionStrategy
.- Parameters:
partitionStrategy
- the partitioning strategy to use when partitioning the edges in the graph.numPartitions
- the number of edge partitions in the new graph.- Returns:
- (undocumented)
-
mapVertices
public abstract <VD2> Graph<VD2,ED> mapVertices(scala.Function2<Object, VD, VD2> map, scala.reflect.ClassTag<VD2> evidence$3, scala.Predef.$eq$colon$eq<VD, VD2> eq) Transforms each vertex attribute in the graph using the map function.- Parameters:
map
- the function from a vertex object to a new vertex valueevidence$3
- (undocumented)eq
- (undocumented)- Returns:
- (undocumented)
- Example:
- We might use this operation to change the vertex values
from one type to another to initialize an algorithm.
val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file") val root = 42 var bfsGraph = rawGraph.mapVertices[Int]((vid, data) => if (vid == root) 0 else Math.MaxValue)
- Note:
- The new graph has the same structure. As a consequence the underlying index structures can be reused.
-
mapEdges
public <ED2> Graph<VD,ED2> mapEdges(scala.Function1<Edge<ED>, ED2> map, scala.reflect.ClassTag<ED2> evidence$4) Transforms each edge attribute in the graph using the map function. The map function is not passed the vertex value for the vertices adjacent to the edge. If vertex values are desired, usemapTriplets
.- Parameters:
map
- the function from an edge object to a new edge value.evidence$4
- (undocumented)- Returns:
- (undocumented)
- Example:
- This function might be used to initialize edge attributes.
- Note:
- This graph is not changed and that the new graph has the same structure. As a consequence the underlying index structures can be reused.
-
mapEdges
public abstract <ED2> Graph<VD,ED2> mapEdges(scala.Function2<Object, scala.collection.Iterator<Edge<ED>>, scala.collection.Iterator<ED2>> map, scala.reflect.ClassTag<ED2> evidence$5) Transforms each edge attribute using the map function, passing it a whole partition at a time. The map function is given an iterator over edges within a logical partition as well as the partition's ID, and it should return a new iterator over the new values of each edge. The new iterator's elements must correspond one-to-one with the old iterator's elements. If adjacent vertex values are desired, usemapTriplets
.- Parameters:
map
- a function that takes a partition id and an iterator over all the edges in the partition, and must return an iterator over the new values for each edge in the order of the input iteratorevidence$5
- (undocumented)- Returns:
- (undocumented)
- Note:
- This does not change the structure of the graph or modify the values of this graph. As a consequence the underlying index structures can be reused.
-
mapTriplets
public <ED2> Graph<VD,ED2> mapTriplets(scala.Function1<EdgeTriplet<VD, ED>, ED2> map, scala.reflect.ClassTag<ED2> evidence$6) Transforms each edge attribute using the map function, passing it the adjacent vertex attributes as well. If adjacent vertex values are not required, consider usingmapEdges
instead.- Parameters:
map
- the function from an edge object to a new edge value.evidence$6
- (undocumented)- Returns:
- (undocumented)
- Example:
- This function might be used to initialize edge
attributes based on the attributes associated with each vertex.
val rawGraph: Graph[Int, Int] = someLoadFunction() val graph = rawGraph.mapTriplets[Int]( edge => edge.src.data - edge.dst.data)
- Note:
- This does not change the structure of the graph or modify the values of this graph. As a consequence the underlying index structures can be reused.
-
mapTriplets
public <ED2> Graph<VD,ED2> mapTriplets(scala.Function1<EdgeTriplet<VD, ED>, ED2> map, TripletFields tripletFields, scala.reflect.ClassTag<ED2> evidence$7) Transforms each edge attribute using the map function, passing it the adjacent vertex attributes as well. If adjacent vertex values are not required, consider usingmapEdges
instead.- Parameters:
map
- the function from an edge object to a new edge value.tripletFields
- which fields should be included in the edge triplet passed to the map function. If not all fields are needed, specifying this can improve performance.evidence$7
- (undocumented)- Returns:
- (undocumented)
- Example:
- This function might be used to initialize edge
attributes based on the attributes associated with each vertex.
val rawGraph: Graph[Int, Int] = someLoadFunction() val graph = rawGraph.mapTriplets[Int]( edge => edge.src.data - edge.dst.data)
- Note:
- This does not change the structure of the graph or modify the values of this graph. As a consequence the underlying index structures can be reused.
-
mapTriplets
public abstract <ED2> Graph<VD,ED2> mapTriplets(scala.Function2<Object, scala.collection.Iterator<EdgeTriplet<VD, ED>>, scala.collection.Iterator<ED2>> map, TripletFields tripletFields, scala.reflect.ClassTag<ED2> evidence$8) Transforms each edge attribute a partition at a time using the map function, passing it the adjacent vertex attributes as well. The map function is given an iterator over edge triplets within a logical partition and should yield a new iterator over the new values of each edge in the order in which they are provided. If adjacent vertex values are not required, consider usingmapEdges
instead.- Parameters:
map
- the iterator transformtripletFields
- which fields should be included in the edge triplet passed to the map function. If not all fields are needed, specifying this can improve performance.evidence$8
- (undocumented)- Returns:
- (undocumented)
- Note:
- This does not change the structure of the graph or modify the values of this graph. As a consequence the underlying index structures can be reused.
-
reverse
Reverses all edges in the graph. If this graph contains an edge from a to b then the returned graph contains an edge from b to a.- Returns:
- (undocumented)
-
subgraph
public abstract Graph<VD,ED> subgraph(scala.Function1<EdgeTriplet<VD, ED>, Object> epred, scala.Function2<Object, VD, Object> vpred) Restricts the graph to only the vertices and edges satisfying the predicates. The resulting subgraph satisfiesV' = {v : for all v in V where vpred(v)} E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)}
- Parameters:
epred
- the edge predicate, which takes a triplet and evaluates to true if the edge is to remain in the subgraph. Note that only edges where both vertices satisfy the vertex predicate are considered.vpred
- the vertex predicate, which takes a vertex object and evaluates to true if the vertex is to be included in the subgraph- Returns:
- the subgraph containing only the vertices and edges that satisfy the predicates
-
mask
public abstract <VD2,ED2> Graph<VD,ED> mask(Graph<VD2, ED2> other, scala.reflect.ClassTag<VD2> evidence$9, scala.reflect.ClassTag<ED2> evidence$10) Restricts the graph to only the vertices and edges that are also inother
, but keeps the attributes from this graph.- Parameters:
other
- the graph to project this graph ontoevidence$9
- (undocumented)evidence$10
- (undocumented)- Returns:
- a graph with vertices and edges that exist in both the current graph and
other
, with vertex and edge data from the current graph
-
groupEdges
Merges multiple edges between two vertices into a single edge. For correct results, the graph must have been partitioned usingpartitionBy
.- Parameters:
merge
- the user-supplied commutative associative function to merge edge attributes for duplicate edges.- Returns:
- The resulting graph with a single edge for each (source, dest) vertex pair.
-
aggregateMessages
public <A> VertexRDD<A> aggregateMessages(scala.Function1<EdgeContext<VD, ED, A>, scala.runtime.BoxedUnit> sendMsg, scala.Function2<A, A, A> mergeMsg, TripletFields tripletFields, scala.reflect.ClassTag<A> evidence$11) Aggregates values from the neighboring edges and vertices of each vertex. The user-suppliedsendMsg
function is invoked on each edge of the graph, generating 0 or more messages to be sent to either vertex in the edge. ThemergeMsg
function is then used to combine all messages destined to the same vertex.- Parameters:
sendMsg
- runs on each edge, sending messages to neighboring vertices using theEdgeContext
.mergeMsg
- used to combine messages fromsendMsg
destined to the same vertex. This combiner should be commutative and associative.tripletFields
- which fields should be included in theEdgeContext
passed to thesendMsg
function. If not all fields are needed, specifying this can improve performance.evidence$11
- (undocumented)- Returns:
- (undocumented)
- Example:
- We can use this function to compute the in-degree of each
vertex
val rawGraph: Graph[_, _] = Graph.textFile("twittergraph") val inDeg: RDD[(VertexId, Int)] = rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
- Note:
- By expressing computation at the edge level we achieve maximum parallelism. This is one of the core functions in the Graph API that enables neighborhood level computation. For example this function can be used to count neighbors satisfying a predicate or implement PageRank.
-
outerJoinVertices
public abstract <U,VD2> Graph<VD2,ED> outerJoinVertices(RDD<scala.Tuple2<Object, U>> other, scala.Function3<Object, VD, scala.Option<U>, VD2> mapFunc, scala.reflect.ClassTag<U> evidence$13, scala.reflect.ClassTag<VD2> evidence$14, scala.Predef.$eq$colon$eq<VD, VD2> eq) Joins the vertices with entries in thetable
RDD and merges the results usingmapFunc
. The input table should contain at most one entry for each vertex. If no entry inother
is provided for a particular vertex in the graph, the map function receivesNone
.- Parameters:
other
- the table to join with the vertices in the graph. The table should contain at most one entry for each vertex.mapFunc
- the function used to compute the new vertex values. The map function is invoked for all vertices, even those that do not have a corresponding entry in the table.evidence$13
- (undocumented)evidence$14
- (undocumented)eq
- (undocumented)- Returns:
- (undocumented)
- Example:
- This function is used to update the vertices with new values based on external data.
For example we could add the out-degree to each vertex record:
val rawGraph: Graph[_, _] = Graph.textFile("webgraph") val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees val graph = rawGraph.outerJoinVertices(outDeg) { (vid, data, optDeg) => optDeg.getOrElse(0) }
-
ops
The associatedGraphOps
object.- Returns:
- (undocumented)
-