PageRank is an algorithm used to rank web pages. The input of PageRank is a directed graph. Each vertex represents a web page. Each edge represents a link between two web pages. If the web pages are not connected, no edge exists between the vertices.
How it works
How the PageRank algorithm works:
Initialization: A vertex value indicates the rank value of PageRank. The rank value is of the DOUBLE type. During initialization, the value of each vertex is
1/TotalNumVertices
.Iteration formula:
PageRank(i) = 0.15/TotalNumVertices + 0.85 × Value of sum
sum is used to add up
PageRank(j)/out_degree(j)
of each vertex that points to center i. j in the formula refers to each vertex that points to center i.
The PageRank algorithm is suitable for MaxCompute Graph programs. Each vertex j maintains its PageRank value and sends PageRank(j)/out_degree(j)
to its adjacent vertices (for voting) in each iteration. In the next iteration, each vertex recalculates the PageRank value by using the iteration formula.
Prerequisites
You have set up the required testing environment by writing a Graph job.
Test preparation
This test is conducted by submitting jobs to the cluster from the MaxCompute client. You can also perform local testing first. For more details, see Local debugging.
Prepare the JAR file for the test program, named graph-examples.jar, and store it in the data\resources folder within the MaxCompute client's bin directory.
Prepare the test tables and resources for the PageRank.
Create test tables
CREATE TABLE pagerank_in(vertex STRING, des_1 STRING, des_2 STRING); CREATE TABLE pagerank_out(vertex_id STRING, vertex_value DOUBLE);
Add test sources
-- Use the -f override option to ignore files on the first addition. add jar data\resources\graph-examples.jar -f;
Run the Tunnel command to import the data.txt file from the MaxCompute client bin directory into the
pagerank_in
table.tunnel upload data.txt pagerank_in;
The following shows the data from the
data.txt
file:1,2,4 2,1,3 4,2,3 3,1,2
Test procedure
Execute the PageRank test in MaxCompute client.
jar -resources graph-examples.jar -classpath data\resources\graph-examples.jar
com.aliyun.odps.graph.PageRank pagerank_in pagerank_out
Expected results
After the job successfully completes, the contents of the output table pagerank_out
are as follows.
+------------+--------------+
| vertex_id | vertex_value |
+------------+--------------+
| 1 | 0.2781238395149928 |
| 2 | 0.3245614688676814 |
| 3 | 0.24161225195637787 |
| 4 | 0.155702636559485 |
+------------+--------------+
Sample code
import java.io.IOException;
import org.apache.log4j.Logger;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
public class PageRank {
private final static Logger LOG = Logger.getLogger(PageRank.class);
public static class PageRankVertex extends
Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void compute(
ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
Iterable<DoubleWritable> messages) throws IOException {
if (context.getSuperstep() == 0) {
setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
} else if (context.getSuperstep() >= 1) {
double sum = 0;
for (DoubleWritable msg : messages) {
sum += msg.get();
}
DoubleWritable vertexValue = new DoubleWritable(
(0.15f / context.getTotalNumVertices()) + 0.85f * sum);
setValue(vertexValue);
}
if (hasEdges()) {
context.sendMessageToNeighbors(this, new DoubleWritable(getValue()
.get() / getEdges().size()));
}
}
@Override
public void cleanup(
WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
throws IOException {
context.write(getId(), getValue());
}
}
public static class PageRankVertexReader extends
GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void load(
LongWritable recordNum,
WritableRecord record,
MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
throws IOException {
PageRankVertex vertex = new PageRankVertex();
vertex.setValue(new DoubleWritable(0));
vertex.setId((Text) record.get(0));
System.out.println(record.get(0));
for (int i = 1; i < record.size(); i++) {
Writable edge = record.get(i);
System.out.println(edge.toString());
if (!( edge.equals(NullWritable.get()))) {
vertex.addEdge(new Text(edge.toString()), NullWritable.get());
}
}
LOG.info("vertex edgs size: "
+ (vertex.hasEdges() ? vertex.getEdges().size() : 0));
context.addVertexRequest(vertex);
}
}
private static void printUsage() {
System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
System.exit(-1);
}
public static void main(String[] args) throws IOException {
if (args.length < 2)
printUsage();
GraphJob job = new GraphJob();
job.setGraphLoaderClass(PageRankVertexReader.class);
job.setVertexClass(PageRankVertex.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 3)
job.setMaxIteration(Integer.parseInt(args[2]));
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
Description:
Row 23: Define the PageRankVertex class.
The vertex value indicates the current PageRank value of the vertex (web page).
The compute() method uses the following iteration formula to update the vertex value:
PageRank(i) = 0.15/TotalNumVertices + 0.85 × Value of sum
The cleanup() method writes the vertex and its PageRank value to the result table.
Row 55: Define the PageRankVertexReader class, load a graph, and resolve each record in the table into a vertex. The first column of the table is the source vertices and other columns are the destination vertices.
Row 88: Include the main function, define the GraphJob class, and specify the maximum number of iterations, the input and output tables, and the implementation of Vertex and GraphLoader. By default, a maximum of 30 iterations can be performed.