全部產品
Search
文件中心

ApsaraDB for Cassandra - Deprecated:BulkLoad資料匯入

更新時間:Jul 06, 2024

準備工作

該工具通過檔案流介面快速匯入資料到Cassandra叢集,是最快地將線下資料移轉到線上Cassandra叢集方法之一,準備工作如下:

  • 線上Cassandra叢集

  • 線下資料,SSTable格式或者csv格式。

  • 同VPC準備一台獨立的ECS,開放安全性群組,確保能訪問Cassandra叢集

1. 準備同VPC下用戶端ECS

建議獨立的ECS,不要和線上Cassandra叢集混用,混用會影響線上服務。

2. 建立schema

$ cqlsh -f schema.cql  -u USERNAME -p PASSWORD [host]
                        

3. 準備資料

3.1 SSTable資料格式

按data/${keyspace}/${table} 格式組織目錄,將SSTable資料放入目錄,如下樣本

ls /tmp/quote/historical_prices/
md-1-big-CompressionInfo.db md-1-big-Data.db        md-1-big-Digest.crc32       md-1-big-Filter.db      md-1-big-Index.db       md-1-big-Statistics.db      md-1-big-Summary.db     md-1-big-TOC.txt
                        

上述樣本中keyspace為quota,table為historical_prices

匯入資料

執行sstableloader,在Cassandra發行包bin目錄下,指定資料目錄 data/${ks}/${table}

${cassandra_home}/bin/sstableloader -d <ip address of the node> data/${ks}/${table}
                        

靜候SSTable資料匯入成功,使用cqlsh檢查 bin/cqlsh -u USERNAME -p PASSWORD [host]

$ bin/cqlsh 
cqlsh> select * from quote.historical_prices;
​
 ticker | date                            | adj_close | close     | high      | low       | open      | volume
--------+---------------------------------+-----------+-----------+-----------+-----------+-----------+--------
   ORCL | 2019-10-29 16:00:00.000000+0000 | 26.160000 | 26.160000 | 26.809999 | 25.629999 | 26.600000 | 181000
   ORCL | 2019-10-28 16:00:00.000000+0000 | 26.559999 | 26.559999 | 26.700001 | 22.600000 | 22.900000 | 555000
                        

3.2 csv資料格式

csv格式資料需要先將csv資料轉成SSTable格式,Cassandra給我們提供了CQLSSTableWriter工具,用於產生SSTable,通過它可以將任意格式資料轉化成SSTable格式。 因為csv格式也是需要自己預先組織,所以需要自己編寫csv格式解析代碼,然後編譯執行。 該工具使用範例程式碼如下,完整工具參考git repo

        // Prepare SSTable writer
        CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
        // set output directory
        builder.inDirectory(outputDir)
               // set target schema
               .forTable(SCHEMA)
               // set CQL statement to put data
               .using(INSERT_STMT)
               // set partitioner if needed
               // default is Murmur3Partitioner so set if you use different one.
               .withPartitioner(new Murmur3Partitioner());
        CQLSSTableWriter writer = builder.build();
        //TODO: 讀取csv檔案,迭代讀取每一行
        while ((line = csvReader.read()) != null)
                {
                    writer.addRow(ticker,
                                  DATE_FORMAT.parse(line.get(0)),
                                  new BigDecimal(line.get(1)),
                                  new BigDecimal(line.get(2)),
                                  new BigDecimal(line.get(3)),
                                  new BigDecimal(line.get(4)),
                                  Long.parseLong(line.get(6)),
                                  new BigDecimal(line.get(5)));
                }
                writer.close();
                        

執行自訂程式產生SSTable格式資料後,照3.1 章節匯入資料。