このトピックでは、スケジュールされた SQL ジョブを別のプロジェクトに同期する方法について説明します。
構成ファイル
[
{
"target_schedule_sql_config": {
"s_sql_job_name": "sql-1704166982-166294",
"project": "sls-ml-demo",
"endpoint": "ap-southeast-1.log.aliyuncs.com"
},
"newly_job_config": {
"description": "",
"displayName": "",
"fromTime": 0,
"toTime": 0,
"source": {
"project": "sls-ml-demo",
"logstore": "cdn_access_log",
"endpoint": "ap-southeast-1.log.aliyuncs.com",
"roleArn": "acs:ram::xxxxx:role/aliyunlogetlrole"
},
"destination": {
"project": "sls-ml-demo",
"logstore": "test_temp",
"endpoint": "ap-southeast-1-intranet.log.aliyuncs.com",
"roleArn": "acs:ram::xxxxx:role/aliyunlogetlrole"
}
}
}
]target_schedule_sql_configは、スケジュールされた SQL ジョブの基本情報を指定します。s_sql_job_nameはジョブ名です。newly_job_configは、スケジュールされた SQL ジョブをコピーするプロジェクトを指定します。descriptionとdisplayNameが空の文字列の場合、元のジョブの対応するフィールドが使用されます。fromTimeとtoTimeは、それぞれ新しく作成されたタスクの開始時刻と終了時刻を設定するために使用されます。値が 0 以下の場合、元のジョブの時間間隔が使用されます。sourceは、新しいジョブが作成されるプロジェクトを設定するために使用されます。destinationは、新しいジョブの SQL 結果データが格納されるログストアを設定するために使用されます。destinationとsourceが同じリージョンにある場合は、トラフィックコストを削減するためにdestinationのendpointを 内部エンドポイント に設定します。roleArnは、コピー先に書き込む権限を持つロールの ARN を指定します。
サンプルコード
# -*- coding: utf-8 -*-
import json
import time
from aliyun.log import *
from aliyun.log.scheduled_sql import *
global_ak_id = ""
global_ak_key = ""
client_map = {}
def get_sls_client(endpoint: str) -> LogClient:
# エンドポイントに対応する LogClient を取得または作成します
sls_client = LogClient(endpoint, global_ak_id, global_ak_key)
if endpoint in client_map.keys():
return client_map[endpoint]
client_map[endpoint] = sls_client
return sls_client
def check_store_item(store_item: dict):
# store_item のキーと値をチェックします
key_names = ["project", "logstore", "endpoint"]
for key in key_names:
if key not in store_item.keys():
raise ValueError(f"logstore config miss key {key}")
if len(store_item[key]) == "":
raise ValueError(f"logstore config miss value [{key}]")
def get_schedule_sql_job_config(endpoint: str, project: str, ssql_job_name: str) -> dict:
# スケジュールされた SQL ジョブの構成を取得します
sls_client = get_sls_client(endpoint)
ssql_job_resp = sls_client.get_scheduled_sql(project, ssql_job_name)
ssql_job = ssql_job_resp.get_scheduled_sql()
# print(type(ssql_job))
# print(json.dumps(ssql_job))
return ssql_job
def make_schedule_sql_name() -> str:
# スケジュールされた SQL ジョブの名前を作成します
# sql-1704166982-166294
now_stamp = int(time.time())
postfix = time.time_ns() % 1000000
job_name = f"sql-{now_stamp}-{postfix}"
return job_name
def create_schedule_sql(s_sql_config: dict):
"""
1. 元の S-SQL からクエリ部分の構成のみをコピーします。新しいタスクでは、対応する開始時刻と終了時刻を確認する必要があります。
2. ソースとデスティネーションのログストアが同じリージョンにあるかどうかを確認します。同じリージョンにある場合は、内部アドレスを使用します。
異なるリージョンにある場合は、パブリックエンドポイントを使用する必要がありますが、パブリックエンドポイントにはコストがかかります。
"""
def make_scheduled_sql_schedule(origin_ssql_job: dict):
# スケジュールされた SQL ジョブのスケジュールを作成します
origin_job_schedule = origin_ssql_job["schedule"]
job_schedule = JobSchedule()
job_schedule.setJobName("")
job_schedule.setDisplayName("")
job_schedule.setDescription("")
job_schedule.setType(origin_job_schedule["type"])
job_schedule.setInterval(origin_job_schedule["interval"])
job_schedule.setDelay(origin_job_schedule["delay"])
job_schedule.setRunImmediately(origin_job_schedule["runImmediately"])
if "hour" in origin_job_schedule.keys():
job_schedule.setHour(origin_job_schedule["hour"])
if "dayOfWeek" in origin_job_schedule.keys():
job_schedule.setDayOfWeek(origin_job_schedule["dayOfWeek"])
if "timeZone" in origin_job_schedule.keys():
job_schedule.setTimeZone(origin_job_schedule["timeZone"])
if "cronExpression" in origin_job_schedule.keys():
job_schedule.setCronExpression(origin_job_schedule["cronExpression"])
return job_schedule
def make_scheduled_sql_config(origin_ssql_job: dict, from_time: int, to_time: int, source_store_config: dict, dest_store_config: dict):
# スケジュールされた SQL ジョブの構成を作成します
source_role_arn = source_store_config["roleArn"]
dest_role_arn = dest_store_config["roleArn"]
origin_job_config = origin_ssql_job["configuration"]
schedule_sql_config = ScheduledSQLConfiguration()
schedule_sql_config.setScript(origin_job_config["script"])
schedule_sql_config.setSqlType(origin_job_config["sqlType"])
schedule_sql_config.setRoleArn(origin_job_config["roleArn"])
if len(source_role_arn) > 0:
schedule_sql_config.setRoleArn(source_role_arn)
schedule_sql_config.setDestRoleArn(origin_job_config["destRoleArn"])
if len(dest_role_arn) > 0:
schedule_sql_config.setDestRoleArn(dest_role_arn)
schedule_sql_config.setSourceLogstore(origin_job_config["sourceLogstore"])
if len(source_store_config["logstore"]) > 0:
schedule_sql_config.setSourceLogstore(source_store_config["logstore"])
schedule_sql_config.setDestEndpoint(origin_job_config["destEndpoint"])
schedule_sql_config.setDestProject(origin_job_config["destProject"])
schedule_sql_config.setDestLogstore(origin_job_config["destLogstore"])
schedule_sql_config.setDestRoleArn(origin_job_config["destRoleArn"])
if len(dest_store_config["project"]) > 0:
schedule_sql_config.setDestProject(dest_store_config["project"])
if len(dest_store_config["logstore"]) > 0:
schedule_sql_config.setDestLogstore(dest_store_config["logstore"])
if len(dest_store_config["endpoint"]) > 0:
schedule_sql_config.setDestEndpoint(dest_store_config["endpoint"])
schedule_sql_config.setMaxRetries(origin_job_config["maxRetries"])
schedule_sql_config.setMaxRunTimeInSeconds(origin_job_config["maxRunTimeInSeconds"])
schedule_sql_config.setDataFormat(origin_job_config["dataFormat"])
schedule_sql_config.setResourcePool(origin_job_config["resourcePool"])
schedule_sql_config.setFromTime(origin_job_config["fromTime"])
schedule_sql_config.setFromTimeExpr(origin_job_config["fromTimeExpr"])
if from_time > 0:
schedule_sql_config.setFromTime(from_time)
schedule_sql_config.setToTime(origin_job_config["toTime"])
schedule_sql_config.setToTimeExpr(origin_job_config["toTimeExpr"])
if to_time > 0:
schedule_sql_config.setToTime(to_time)
schedule_sql_config.setParameters(origin_job_config["parameters"])
return schedule_sql_config
target_job_config = s_sql_config["target_schedule_sql_config"]
ssql_job_name = target_job_config["s_sql_job_name"]
project = target_job_config["project"]
endpoint = target_job_config["endpoint"]
ssql_job = get_schedule_sql_job_config(endpoint, project, ssql_job_name)
newly_job_config = s_sql_config["newly_job_config"]
source_config = newly_job_config["source"]
dest_config = newly_job_config["destination"]
check_store_item(source_config)
check_store_item(dest_config)
from_time, to_time = newly_job_config["fromTime"], newly_job_config["toTime"]
schedule_sql_config = make_scheduled_sql_config(ssql_job, from_time, to_time, source_config, dest_config)
job_schedule = make_scheduled_sql_schedule(ssql_job)
scheduled_sql = ScheduledSQL()
job_name = make_schedule_sql_name()
scheduled_sql.setName(job_name)
scheduled_sql.setConfiguration(schedule_sql_config)
scheduled_sql.setSchedule(job_schedule)
if len(newly_job_config["description"]) > 0:
scheduled_sql.setDescription(newly_job_config["description"])
else:
scheduled_sql.setDescription(ssql_job["description"])
if len(newly_job_config["displayName"]) > 0:
scheduled_sql.setDisplayName(newly_job_config["displayName"])
else:
scheduled_sql.setDisplayName(ssql_job["displayName"])
sls_client = get_sls_client(source_config["endpoint"])
sls_client.create_scheduled_sql(source_config["project"], scheduled_sql)
print(f"sync to \n\tsrc_project {source_config}\n\tdest_project {dest_config}\n\tjob_name {job_name}")
if __name__ == "__main__":
sync_store_config_path = "./sls_tools/sync_ssql.json"
with open(sync_store_config_path, "r") as reader:
sync_map = json.load(reader)
for ssql_config in sync_map:
try:
create_schedule_sql(ssql_config)
except Exception as e:
print(e)