全部產品
Search
文件中心

MaxCompute:PageRank

更新時間:Dec 05, 2024

PageRank演算法是計算網頁排名的經典演算法。輸入是一個有向圖G,其中頂點表示網頁。如果存在網頁A到網頁B的連結,則存在串連A到B的邊。

基本原理

演算法的基本原理如下:

  • 初始化:點值表示PageRank的rank值(DOUBLE類型)。初始時,所有點取值為1/TotalNumVertices

  • 迭代公式:PageRank(i)=0.15/TotalNumVertices+0.85*sum。其中sum為所有指向i點的點(設為j)PageRank(j)/out_degree(j)的累加值。

由演算法基本原理可以看出,此演算法非常適合使用MaxCompute Graph程式進行求解。每個點j維護其PageRank值,每一輪迭代都將PageRank(j)/out_degree(j)發給其鄰接點(向其投票)。下一輪迭代時,每個點根據迭代公式重新計算PageRank取值。

前提條件

您已通過編寫Graph完成測試所需的環境配置。

測試準備

本測試是在MaxCompute用戶端,提交作業到叢集進行測試。您也可以先進行本地測試,詳情請參見本地調試

  1. 準備好測試程式的JAR包,假設名字為graph-examples.jar,本地存放路徑為MaxCompute用戶端bin目錄下data\resources

  2. 準備好PageRank的測試表和資源。

    1. 建立測試表。

      CREATE TABLE pagerank_in(vertex STRING, des_1 STRING, des_2 STRING);
      CREATE TABLE pagerank_out(vertex_id STRING, vertex_value DOUBLE);
    2. 添加測試資源。

      -- 首次添加忽略-f覆蓋指令。
      add jar data\resources\graph-examples.jar -f;
  3. 使用Tunnel將MaxCompute用戶端bin目錄下data.txt匯入pagerank_in表中。

    tunnel upload data.txt pagerank_in;

    data.txt資料如下:

    1,2,4
    2,1,3
    4,2,3
    3,1,2

測試步驟

在MaxCompute用戶端中執行PageRank測試。

jar -resources graph-examples.jar -classpath data\resources\graph-examples.jar
com.aliyun.odps.graph.PageRank pagerank_in pagerank_out

預期結果

作業成功結束後,輸出表pagerank_out中的內容如下。

+------------+--------------+
| vertex_id  | vertex_value |
+------------+--------------+
| 1          | 0.2781238395149928 |
| 2          | 0.3245614688676814 |
| 3          | 0.24161225195637787 |
| 4          | 0.155702636559485 |
+------------+--------------+

程式碼範例

import java.io.IOException;

import org.apache.log4j.Logger;

import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;

public class PageRank {

  private final static Logger LOG = Logger.getLogger(PageRank.class);

  public static class PageRankVertex extends
      Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {

    @Override
    public void compute(
        ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
        Iterable<DoubleWritable> messages) throws IOException {
      if (context.getSuperstep() == 0) {
        setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
      } else if (context.getSuperstep() >= 1) {
        double sum = 0;
        for (DoubleWritable msg : messages) {
          sum += msg.get();
        }
        DoubleWritable vertexValue = new DoubleWritable(
            (0.15f / context.getTotalNumVertices()) + 0.85f * sum);
        setValue(vertexValue);
      }
      if (hasEdges()) {
        context.sendMessageToNeighbors(this, new DoubleWritable(getValue()
            .get() / getEdges().size()));
      }
    }

    @Override
    public void cleanup(
        WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
        throws IOException {
      context.write(getId(), getValue());
    }
  }

  public static class PageRankVertexReader extends
      GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {

    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
        throws IOException {
      PageRankVertex vertex = new PageRankVertex();
      vertex.setValue(new DoubleWritable(0));
      vertex.setId((Text) record.get(0));
      System.out.println(record.get(0));

      for (int i = 1; i < record.size(); i++) {
        Writable edge = record.get(i);
        System.out.println(edge.toString());
        if (!(edge.equals(NullWritable.get()))) {
          vertex.addEdge(new Text(edge.toString()), NullWritable.get());
        }
      }
      LOG.info("vertex edgs size: "
          + (vertex.hasEdges() ? vertex.getEdges().size() : 0));
      context.addVertexRequest(vertex);
    }

  }

  private static void printUsage() {
    System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
    System.exit(-1);
  }

  public static void main(String[] args) throws IOException {
    if (args.length < 2)
      printUsage();

    GraphJob job = new GraphJob();

    job.setGraphLoaderClass(PageRankVertexReader.class);
    job.setVertexClass(PageRankVertex.class);
    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addOutput(TableInfo.builder().tableName(args[1]).build());

    // default max iteration is 30
    job.setMaxIteration(30);
    if (args.length >= 3)
      job.setMaxIteration(Integer.parseInt(args[2]));

    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}
			

上述代碼,說明如下:

  • 第 23 行:定義PageRankVertex類,其中:

    • 點值表示該點(網頁)的當前PageRank取值。

    • compute()方法使用迭代公式:PageRank(i)=0.15/TotalNumVertices+0.85*sum更新點值。

    • cleanup()方法把點及其PageRank取值寫到結果表中。

  • 第 55 行:定義PageRankVertexReader類,載入圖,將表中每一條記錄解析為一個點,記錄的第一列是起點,其他列為終點。

  • 第 88 行:主程式(main函數),定義GraphJob,指定Vertex/GraphLoader等的實現,以及最大迭代次數(預設30),並指定輸入輸出表。