隨著不斷增長的監控指標與資料流量,監控系統變得越來越複雜,同時對監控系統的時效性提出了更高的要求。本文介紹基於TairTS輕鬆搭建高並發情境的秒級監控系統。
TairTS簡介
TairTS為Tair自研的Module,依託Tair(企業版)提供即時且高並發的查詢、寫入效能,支援對歷史時序資料的更新或累加,支援時間軸層級的TTL設定,保證每條時間軸都可以按時間視窗自動滾動,同時採用高效的Gorilla壓縮演算法與特定儲存,極大降低儲存成本。更多資訊,請參見TS。
秒級監控介紹
本文以上圖為例介紹秒級監控架構。Console發布秒級監控配置,對應APP接收配置後通過MQTT協議寫入到Collector中,Collector處理資料後寫入到Tair資料庫中。
高並發查詢情境
在高並發查詢情境中,TairTS不僅可以保證查詢的效能,還支援降採樣、屬性過濾、分批查詢、多種數值函數等條件下的彙總操作,滿足不同業務進行多維度篩選與查看,同時支援將批量查詢與彙總計算整合到單條命令中,減少網路互動,實現毫秒級響應,協助您及時定位問題。
高並發寫入情境
在高並發寫入情境中,隨著APP規模的增大,單點的Collector將會成為瓶頸。為此,TairTS支援對歷史時序資料的原地更新或累加,保障多Collector並發寫入正確性,同時節省記憶體空間。並發寫入的程式碼範例如下:
import com.aliyun.tair.tairts.TairTs; import com.aliyun.tair.tairts.params.ExtsAggregationParams; import com.aliyun.tair.tairts.params.ExtsAttributesParams; import com.aliyun.tair.tairts.results.ExtsSkeyResult; import redis.clients.jedis.Jedis; public class test { protected static final String HOST = "127.0.0.1"; protected static final int PORT = 6379; public static void main(String[] args) { try { Jedis jedis = new Jedis(HOST, PORT, 2000 * 100); if (!"PONG".equals(jedis.ping())) { System.exit(-1); } TairTs tairTs = new TairTs(jedis); //Cluster模式時如下: //TairTsCluster tairTsCluster = new TairTsCluster(jedisCluster); String pkey = "cpu_load"; String skey1 = "app1"; long startTs = (System.currentTimeMillis() - 100000) / 1000 * 1000; long endTs = System.currentTimeMillis() / 1000 * 1000; String startTsStr = String.valueOf(startTs); String endTsStr = String.valueOf(endTs); tairTs.extsdel(pkey, skey1); long num = 5; //Collector A 並發更新。 for (int i = 0; i < num; i++) { double val = i; long ts = startTs + i*1000; String tsStr = String.valueOf(ts); ExtsAttributesParams params = new ExtsAttributesParams(); params.dataEt(1000000000); String addRet = tairTs.extsrawincr(pkey, skey1, tsStr, val, params); } ExtsAggregationParams paramsAgg = new ExtsAggregationParams(); paramsAgg.maxCountSize(10); paramsAgg.aggAvg(1000); System.out.println("Collector A並發更新後結果:"); ExtsSkeyResult rangeByteRet = tairTs.extsrange(pkey, skey1, startTsStr, endTsStr, paramsAgg); for (int i = 0; i < num; i++) { System.out.println(" ts: " + rangeByteRet.getDataPoints().get(i).getTs() + ", value: " + rangeByteRet.getDataPoints().get(i).getDoubleValue()); } //Collector B並發更新。 for (int i = 0; i < num; i++) { double val = i; long ts = startTs + i*1000; String tsStr = String.valueOf(ts); ExtsAttributesParams params = new ExtsAttributesParams(); params.dataEt(1000000000); String addRet = tairTs.extsrawincr(pkey, skey1, tsStr, val, params); } System.out.println("Collector B 並發更新後結果:"); rangeByteRet = tairTs.extsrange(pkey, skey1, startTsStr, endTsStr, paramsAgg); for (int i = 0; i < num; i++) { System.out.println(" ts: " + rangeByteRet.getDataPoints().get(i).getTs() + ", value: " + rangeByteRet.getDataPoints().get(i).getDoubleValue()); } } catch (Exception e) { e.printStackTrace(); } } }
執行結果:
Collector A並發更新後結果: ts: 1597049266000, value: 0.0 ts: 1597049267000, value: 1.0 ts: 1597049268000, value: 2.0 ts: 1597049269000, value: 3.0 ts: 1597049270000, value: 4.0 Collector B並發更新後結果: ts: 1597049266000, value: 0.0 ts: 1597049267000, value: 2.0 ts: 1597049268000, value: 4.0 ts: 1597049269000, value: 6.0 ts: 1597049270000, value: 8.0