ここでは、Log Service のデータ変換機能を使用して複雑な JSON データを変換する方法について説明します。
それぞれが配列である複数のサブキーを持つ複雑な JSON データを変換
プログラムで作成されたログは統計的な JSON 形式で書き込まれ、通常は基本情報と、それぞれが配列である複数のサブキーを含みます。 たとえば、あるサーバーは 1 分間隔でログを書き込みます。 ログには、ログを生成するサーバーとクライアントのデータ情報ステータスと統計ステータスが含まれます。
サンプルログ
__source__: 1.2.3.4 __topic__: content:{ "service": "search_service", "overall_status": "yellow", "servers": [ { "host": "1.2.3.4", "status": "green" }, { "host": "1.2.3.5", "status": "green" } ], "clients": [ { "host": "1.2.3.6", "status": "green" }, { "host": "1.2.3.7", "status": "red" } ] }
データ変換要件
生ログを
topic
で分割し、overall_type
、client_status
、およびserver_status
に分けます。各
topic
は次のように異なる情報を保存します。overall_type
:サーバー数、クライアント数、全体的なステータス (色)、およびサービス情報を格納します。client_status
:ホストの IP アドレス、ステータス、サービス情報を保存します。server_status
:ホストの IP アドレス、ステータス、サービス情報を保存します。
予想される結果
__source__: 1.2.3.4 __topic__: overall_type client_count: 2 overall_status: yellow server_count: 2 service: search_service __source__: 1.2.3.4 __topic__: client_status host: 1.2.3.7 status: red service: search_service __source__: 1.2.3.4 __topic__: client_status host: 1.2.3.6 status: green service: search_service __source__: 1.2.3.4 __topic__: server_status host: 1.2.3.4 status: green service: search_service __source__: 1.2.3.4 __topic__: server_status host: 1.2.3.5 status: green service: search_service
ソリューション
生ログを 3 つのログに分割し、さらにトピックごとにログを分割します。 分割後、3 つのログは
topic
フィールドを除いて同じ情報を保有します。e_set("__topic__", "server_status,client_status,overall_type") e_split("__topic__")
処理後のログは次のとおりです。
__source__: 1.2.3.4 __topic__: server_status // The topics in the other two logs are client_status and overall_type. Except for the topic field, all the other information in the three logs is the same. content: { ... Same as that in the raw log... }
最初のレイヤーにある
content
フィールドの JSON データを展開し、content
フィールドを削除します。e_json('content',depth=1) e_drop_fields("content")
処理後のログは次のとおりです。
__source__: 1.2.3.4 __topic__: overall_type // The topics in the other two logs are client_status and server_status. Except for the topic field, all the other information in the three logs is the same. clients: [{"host": "1.2.3.6", "status": "green"}, {"host": "1.2.3.7", "status": "red"}] overall_status: yellow servers: [{"host": "1.2.3.4", "status": "green"}, {"host": "1.2.3.5", "status": "green"}] service: search_service
トピックに
overall_type
を含むログの場合、client_count
とserver_count
の値を計算します。e_if(e_search("__topic__==overall_type"), e_compose( e_set("client_count", json_select(v("clients"), "length([*])", default=0)), e_set("server_count", json_select(v("servers"), "length([*])", default=0)) ))
処理後のログは次のとおりです。
__topic__: overall_type server_count: 2 client_count: 2
clients と servers のフィールドを削除します。
e_if(e_search("__topic__==overall_type"), e_drop_fields("clients", "servers"))
ログを
server_status
トピックでさらに分割します。e_if(e_search("__topic__==server_status"), e_compose( e_split("servers"), e_json("servers", depth=1) ))
ログは次の 2 つのログに分割されます。
__topic__: server_status servers: {"host": "1.2.3.4", "status": "green"} host: 1.2.3.4 status: green
__topic__: server_status servers: {"host": "1.2.3.5", "status": "green"} host: 1.2.3.5 status: green
servers フィールドを削除します。
e_if(e_search("__topic__==overall_type"), e_drop_fields("servers"))
ログを
client_status
トピックでさらに分割し、clients フィールドを削除します。e_if(e_search("__topic__==client_status"), e_compose( e_split("clients"), e_json("clients", depth=1), e_drop_fields("clients") ))
ログは次の 2 つのログに分割されます。
__topic__: client_status host: 1.2.3.6 status: green
__topic__: clients host: 1.2.3.7 status: red
まとめると、次のように LOG ドメイン固有言語 (DSL) ルールを使用します。
# Split the raw log by topic. e_set("__topic__", "server_status,client_status,overall_type") e_split("__topic__") e_json('content',depth=1) e_drop_fields("content") # Process the log with the topic overall_type. e_if(e_search("__topic__==overall_type"), e_compose( e_set("client_count", json_select(v("clients"), "length([*])", default=0)), e_set("server_count", json_select(v("servers"), "length([*])", default=0)) )) # Process the log with the topic server_status. e_if(e_search("__topic__==server_status"), e_compose( e_split("servers"), e_json("servers", depth=1) )) e_if(e_search("__topic__==overall_type"), e_drop_fields("servers")) # Process the log with the topic client_status. e_if(e_search("__topic__==client_status"), e_compose( e_split("clients"), e_json("clients", depth=1), e_drop_fields("clients") ))
ソリューションの最適化
上記のソリューションは、content.clients
または content.servers
フィールドが空の場合、うまく動作しません。 生ログが次のとおりであると仮定します。
__source__: 1.2.3.4
__topic__:
content:{
"service": "search_service",
"overall_status": "yellow",
"servers": [ ],
"clients": [ ]
}
上記のソリューションを使用してこの生ログを 3 つのログに分割した場合、トピック client_status
および server_status
を含むログは空になります。
__source__: 1.2.3.4
__topic__: overall_type
client_count: 0
overall_status: yellow
server_count: 0
service: search_service
__source__: 1.2.3.4
__topic__: client_status
service: search_service
__source__: 1.2.3.4
__topic__: server_status
host: 1.2.3.4
status: green
service: search_service
最適化されたソリューション 1
生ログが分割された後、
server_status
およびclient_status
トピックのログが空になるかどうかを確認します。 空である場合、ログを破棄します。# Check whether the log with the topic server_status is empty. If so, discard it. If not, retain it. e_keep(op_and(e_search("__topic__==server_status"), json_select(v("servers"), "length([*])"))) # Check whether the log with the topic client_status is empty. If so, discard it. If not, retain it. e_keep(op_and(e_search("__topic__==client_status"), json_select(v("clients"), "length([*])")))
まとめると、次のように LOG DSL ルールを使用します。
# Split the raw log by topic. e_set("__topic__", "server_status,client_status,overall_type") e_split("__topic__") e_json('content',depth=1) e_drop_fields("content") # Process the log with the topic overall_type. e_if(e_search("__topic__==overall_type"), e_compose( e_set("client_count", json_select(v("clients"), "length([*])", default=0)), e_set("server_count", json_select(v("servers"), "length([*])", default=0)) )) # (New) Check whether the log with the topic server_status is empty. If so, discard it. If not, retain it. e_keep(op_and(e_search("__topic__==server_status"), json_select(v("servers"), "length([*])"))) # Process the log with the topic server_status. e_if(e_search("__topic__==server_status"), e_compose( e_split("servers"), e_json("servers", depth=1) )) e_if(e_search("__topic__==overall_type"), e_drop_fields("servers")) # (New) Check whether the log with the topic client_status is empty. If so, discard it. If not, retain it. e_keep(op_and(e_search("__topic__==client_status"), json_select(v("clients"), "length([*])"))) # Process the log with the topic client_status. e_if(e_search("__topic__==client_status"), e_compose( e_split("clients"), e_json("clients", depth=1), e_drop_fields("clients") ))
最適化されたソリューション 2
生ログを分割する前に、フィールドが空かどうかを確認します。 フィールドが空でない場合は、フィールドに基づいて生ログを分割します。
# Set the initial topic. e_set("__topic__", "server_status") # If the content.servers field is not empty, split the raw log to obtain a log with the topic server_status. e_if(json_select(v("content"), "length(servers[*])"), e_compose( e_set("__topic__", "server_status,overall_type"), e_split("__topic__") )) # If the content.clients field is not empty, further split the raw log to obtain a log with the topic client_status. e_if(op_and(e_search("__topic__==overall_type"), json_select(v("content"), "length(clients[*])")), e_compose( e_set("__topic__", "client_status,overall_type"), e_split("__topic__") ))
まとめると、次のように LOG DSL ルールを使用します。
# Split the raw log. e_set("__topic__", "server_status") # If the content.servers field is not empty, split the raw log to obtain a log with the topic server_status. e_if(json_select(v("content"), "length(servers[*])"), e_compose( e_set("__topic__", "server_status,overall_type"), e_split("__topic__") )) # If the content.clients field is not empty, further split the raw log to obtain a log with the topic client_status. e_if(op_and(e_search("__topic__==overall_type"), json_select(v("content"), "length(clients[*])")), e_compose( e_set("__topic__", "client_status,overall_type"), e_split("__topic__") )) # Process the log with the topic overall_type. e_if(e_search("__topic__==overall_type"), e_compose( e_set("client_count", json_select(v("clients"), "length([*])", default=0)), e_set("server_count", json_select(v("servers"), "length([*])", default=0)) )) # Process the log with the topic server_status. e_if(e_search("__topic__==server_status"), e_compose( e_split("servers"), e_json("servers", depth=1) )) e_if(e_search("__topic__==overall_type"), e_drop_fields("servers")) # Process the log with the topic client_status. e_if(e_search("__topic__==client_status"), e_compose( e_split("clients"), e_json("clients", depth=1), e_drop_fields("clients") ))
ソリューションの比較
ソリューション 1 は、未処理のログから空のログを取得した後にそれらを削除するため、ロジックが冗長です。 ただし、ルールはシンプルで簡単に保守できます。 デフォルトではこのソリューションを使用することをお勧めします。
ソリューション 2 は、分割する前に空のフィールドをチェックするため、処理効率が良くなります。 ただし、このソリューションは冗長なルールを使用します。 このソリューションは、特定のシナリオでのみ使用することを推奨します。たとえば、生ログが分割された後に多数の追加イベントが生成される可能性がある場合などです。
複数レイヤーのネストされた配列を持つ複雑な JSON データの変換
例として、複数レイヤーのネストされた配列を持つ、次のような複雑な JSON データを取り上げます。 login_histories
に保存されているログイン情報の中で、users
フィールド内のさまざまなオブジェクトを、ログインイベント別に分割するとします。
生ログ
__source__: 1.2.3.4 __topic__: content:{ "users": [ { "name": "user1", "login_histories": [ { "date": "2019-10-10 0:0:0", "login_ip": "1.1.1.1" }, { "date": "2019-10-10 1:0:0", "login_ip": "1.1.1.1" }, { ... More logon information... } ] }, { "name": "user2", "login_histories": [ { "date": "2019-10-11 0:0:0", "login_ip": "1.1.1.2" }, { "date": "2019-10-11 1:0:0", "login_ip": "1.1.1.3" }, { ... More logon information... } ] }, { ... More users... } ] }
予想される分割後のログ
__source__: 1.2.3.4 name: user1 date: 2019-10-11 1:0:0 login_ip: 1.1.1.1 __source__: 1.2.3.4 name: user1 date: 2019-10-11 0:0:0 login_ip: 1.1.1.1 __source__: 1.2.3.4 name: user2 date: 2019-10-11 0:0:0 login_ip: 1.1.1.2 __source__: 1.2.3.4 name: user2 date: 2019-10-11 1:0:0 login_ip: 1.1.1.3 ... More logs...
ソリューション
ログを分割し、
content
フィールドのデータを、users
の内容を元に展開します。e_split("content", jmes='users[*]', output='item') e_json("item",depth=1)
処理後のログは次のとおりです。
__source__: 1.2.3.4 __topic__: content:{... Same as that in the raw log...} item: {"name": "user1", "login_histories": [{"date": "2019-10-10 0:0:0", "login_ip": "1.1.1.1"}, {"date": "2019-10-10 1:0:0", "login_ip": "1.1.1.1"}]} login_histories: [{"date": "2019-10-10 0:0:0", "login_ip": "1.1.1.1"}, {"date": "2019-10-10 1:0:0", "login_ip": "1.1.1.1"}] name: user1 __source__: 1.2.3.4 __topic__: content:{... Same as that in the raw log...} item: {"name": "user2", "login_histories": [{"date": "2019-10-11 0:0:0", "login_ip": "1.1.1.2"}, {"date": "2019-10-11 1:0:0", "login_ip": "1.1.1.3"}]} login_histories: [{"date": "2019-10-11 0:0:0", "login_ip": "1.1.1.2"}, {"date": "2019-10-11 1:0:0", "login_ip": "1.1.1.3"}] name: user2
ログを分割し、データを
login_histories
に基づいて展開します。e_split("login_histories") e_json("login_histories", depth=1)
処理後のログは次のとおりです。
__source__: 1.2.3.4 __topic__: content: {... Same as that in the raw log...} date: 2019-10-11 0:0:0 item: {"name": "user2", "login_histories": [{"date": "2019-10-11 0:0:0", "login_ip": "1.1.1.2"}, {"date": "2019-10-11 1:0:0", "login_ip": "1.1.1.3"}]} login_histories: {"date": "2019-10-11 0:0:0", "login_ip": "1.1.1.2"} login_ip: 1.1.1.2 name: user2 __source__: 1.2.3.4 __topic__: content: {... Same as that in the raw log...} date: 2019-10-11 1:0:0 item: {"name": "user2", "login_histories": [{"date": "2019-10-11 0:0:0", "login_ip": "1.1.1.2"}, {"date": "2019-10-11 1:0:0", "login_ip": "1.1.1.3"}]} login_histories: {"date": "2019-10-11 1:0:0", "login_ip": "1.1.1.3"} login_ip: 1.1.1.3 name: user2 __source__: 1.2.3.4 __topic__: content: {... Same as that in the raw log...} date: 2019-10-10 1:0:0 item: {"name": "user1", "login_histories": [{"date": "2019-10-10 0:0:0", "login_ip": "1.1.1.1"}, {"date": "2019-10-10 1:0:0", "login_ip": "1.1.1.1"}]} login_histories: {"date": "2019-10-10 1:0:0", "login_ip": "1.1.1.1"} login_ip: 1.1.1.1 name: user1 __source__: 1.2.3.4 __topic__: content: {... Same as that in the raw log...} date: 2019-10-10 0:0:0 item: {"name": "user1", "login_histories": [{"date": "2019-10-10 0:0:0", "login_ip": "1.1.1.1"}, {"date": "2019-10-10 1:0:0", "login_ip": "1.1.1.1"}]} login_histories: {"date": "2019-10-10 0:0:0", "login_ip": "1.1.1.1"} login_ip: 1.1.1.1 name: user1
無関係なフィールドを削除します。
e_drop_fields("content", "item", "login_histories")
処理後のログは次のとおりです。
__source__: 1.2.3.4 __topic__: name: user1 date: 2019-10-11 1:0:0 login_ip: 1.1.1.1 __source__: 1.2.3.4 __topic__: name: user1 date: 2019-10-11 0:0:0 login_ip: 1.1.1.1 __source__: 1.2.3.4 __topic__: name: user2 date: 2019-10-11 0:0:0 login_ip: 1.1.1.2 __source__: 1.2.3.4 __topic__: name: user2 date: 2019-10-11 1:0:0 login_ip: 1.1.1.3
まとめると、次のように LOG DSL ルールを使用します。
e_split("content", jmes='users[*]', output='item') e_json("item",depth=1) e_split("login_histories") e_json("login_histories", depth=1) e_drop_fields("content", "item", "login_histories")
結論:上記と同様の要件がある場合は、ログを分割し、指定されたフィールドに基づいてデータを展開してから、無関係なフィールドを削除します。