随着不断增长的监控指标与数据流量,监控系统变得越来越复杂,同时对监控系统的时效性提出了更高的要求。本文介绍基于TairTS轻松搭建高并发场景的秒级监控系统。
TairTS简介
TairTS为Tair自研的Module,依托Tair(企业版)提供实时且高并发的查询、写入性能,支持对历史时序数据的更新或累加,支持时间线级别的TTL设定,保证每条时间线都可以按时间窗口自动滚动,同时采用高效的Gorilla压缩算法与特定存储,极大降低存储成本。更多信息,请参见TS。
秒级监控介绍
本文以上图为例介绍秒级监控架构。Console发布秒级监控配置,对应APP接收配置后通过MQTT协议写入到Collector中,Collector处理数据后写入到Tair数据库中。
高并发查询场景
在高并发查询场景中,TairTS不仅可以保证查询的性能,还支持降采样、属性过滤、分批查询、多种数值函数等条件下的聚合操作,满足不同业务进行多维度筛选与查看,同时支持将批量查询与聚合计算集成到单条命令中,减少网络交互,实现毫秒级响应,帮助您及时定位问题。
高并发写入场景
在高并发写入场景中,随着APP规模的增大,单点的Collector将会成为瓶颈。为此,TairTS支持对历史时序数据的原地更新或累加,保障多Collector并发写入正确性,同时节省内存空间。并发写入的代码示例如下:
import com.aliyun.tair.tairts.TairTs; import com.aliyun.tair.tairts.params.ExtsAggregationParams; import com.aliyun.tair.tairts.params.ExtsAttributesParams; import com.aliyun.tair.tairts.results.ExtsSkeyResult; import redis.clients.jedis.Jedis; public class test { protected static final String HOST = "127.0.0.1"; protected static final int PORT = 6379; public static void main(String[] args) { try { Jedis jedis = new Jedis(HOST, PORT, 2000 * 100); if (!"PONG".equals(jedis.ping())) { System.exit(-1); } TairTs tairTs = new TairTs(jedis); //Cluster模式时如下: //TairTsCluster tairTsCluster = new TairTsCluster(jedisCluster); String pkey = "cpu_load"; String skey1 = "app1"; long startTs = (System.currentTimeMillis() - 100000) / 1000 * 1000; long endTs = System.currentTimeMillis() / 1000 * 1000; String startTsStr = String.valueOf(startTs); String endTsStr = String.valueOf(endTs); tairTs.extsdel(pkey, skey1); long num = 5; //Collector A 并发更新。 for (int i = 0; i < num; i++) { double val = i; long ts = startTs + i*1000; String tsStr = String.valueOf(ts); ExtsAttributesParams params = new ExtsAttributesParams(); params.dataEt(1000000000); String addRet = tairTs.extsrawincr(pkey, skey1, tsStr, val, params); } ExtsAggregationParams paramsAgg = new ExtsAggregationParams(); paramsAgg.maxCountSize(10); paramsAgg.aggAvg(1000); System.out.println("Collector A并发更新后结果:"); ExtsSkeyResult rangeByteRet = tairTs.extsrange(pkey, skey1, startTsStr, endTsStr, paramsAgg); for (int i = 0; i < num; i++) { System.out.println(" ts: " + rangeByteRet.getDataPoints().get(i).getTs() + ", value: " + rangeByteRet.getDataPoints().get(i).getDoubleValue()); } //Collector B并发更新。 for (int i = 0; i < num; i++) { double val = i; long ts = startTs + i*1000; String tsStr = String.valueOf(ts); ExtsAttributesParams params = new ExtsAttributesParams(); params.dataEt(1000000000); String addRet = tairTs.extsrawincr(pkey, skey1, tsStr, val, params); } System.out.println("Collector B 并发更新后结果:"); rangeByteRet = tairTs.extsrange(pkey, skey1, startTsStr, endTsStr, paramsAgg); for (int i = 0; i < num; i++) { System.out.println(" ts: " + rangeByteRet.getDataPoints().get(i).getTs() + ", value: " + rangeByteRet.getDataPoints().get(i).getDoubleValue()); } } catch (Exception e) { e.printStackTrace(); } } }
执行结果:
Collector A并发更新后结果: ts: 1597049266000, value: 0.0 ts: 1597049267000, value: 1.0 ts: 1597049268000, value: 2.0 ts: 1597049269000, value: 3.0 ts: 1597049270000, value: 4.0 Collector B并发更新后结果: ts: 1597049266000, value: 0.0 ts: 1597049267000, value: 2.0 ts: 1597049268000, value: 4.0 ts: 1597049269000, value: 6.0 ts: 1597049270000, value: 8.0