准备工作
该工具通过文件流接口快速导入数据到Cassandra集群,是最快地将线下数据迁移到线上Cassandra集群方法之一,准备工作如下:
线上Cassandra集群
线下数据,SSTable格式或者csv格式。
同VPC准备一台独立的ECS,开放安全组,确保能访问Cassandra集群
1. 准备同VPC下客户端ECS
建议独立的ECS,不要和线上Cassandra集群混用,混用会影响线上服务。
2. 创建schema
$ cqlsh -f schema.cql -u USERNAME -p PASSWORD [host]
3. 准备数据
3.1 SSTable数据格式
按data/${keyspace}/${table} 格式组织目录,将SSTable数据放入目录,如下示例
ls /tmp/quote/historical_prices/
md-1-big-CompressionInfo.db md-1-big-Data.db md-1-big-Digest.crc32 md-1-big-Filter.db md-1-big-Index.db md-1-big-Statistics.db md-1-big-Summary.db md-1-big-TOC.txt
上述示例中keyspace为quota,table为historical_prices
导入数据
执行sstableloader
,在Cassandra发行包bin目录下,指定数据目录 data/${ks}/${table}
${cassandra_home}/bin/sstableloader -d <ip address of the node> data/${ks}/${table}
静候SSTable数据导入成功,使用cqlsh检查 bin/cqlsh -u USERNAME -p PASSWORD [host]
$ bin/cqlsh
cqlsh> select * from quote.historical_prices;
ticker | date | adj_close | close | high | low | open | volume
--------+---------------------------------+-----------+-----------+-----------+-----------+-----------+--------
ORCL | 2019-10-29 16:00:00.000000+0000 | 26.160000 | 26.160000 | 26.809999 | 25.629999 | 26.600000 | 181000
ORCL | 2019-10-28 16:00:00.000000+0000 | 26.559999 | 26.559999 | 26.700001 | 22.600000 | 22.900000 | 555000
3.2 csv数据格式
csv格式数据需要先将csv数据转成SSTable格式,Cassandra给我们提供了CQLSSTableWriter工具,用于生成SSTable,通过它可以将任意格式数据转化成SSTable格式。 因为csv格式也是需要自己预先组织,所以需要自己编写csv格式解析代码,然后编译执行。 该工具使用示例代码如下,完整工具参考git repo
// Prepare SSTable writer
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
// set output directory
builder.inDirectory(outputDir)
// set target schema
.forTable(SCHEMA)
// set CQL statement to put data
.using(INSERT_STMT)
// set partitioner if needed
// default is Murmur3Partitioner so set if you use different one.
.withPartitioner(new Murmur3Partitioner());
CQLSSTableWriter writer = builder.build();
//TODO: 读取csv文件,迭代读取每一行
while ((line = csvReader.read()) != null)
{
writer.addRow(ticker,
DATE_FORMAT.parse(line.get(0)),
new BigDecimal(line.get(1)),
new BigDecimal(line.get(2)),
new BigDecimal(line.get(3)),
new BigDecimal(line.get(4)),
Long.parseLong(line.get(6)),
new BigDecimal(line.get(5)));
}
writer.close();
执行自定义程序生成SSTable格式数据后,照3.1 章节导入数据。