import os
import sys
from typing import List, Dict
from alibabacloud_rds20140815.client import Client as Rds20140815Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_rds20140815 import models as rds_20140815_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_console.client import Client as ConsoleClient
from alibabacloud_tea_util.client import Client as UtilClient
from apscheduler.schedulers.blocking import BlockingScheduler
class Sample:
    """Schedule capacity changes for an ApsaraDB RDS Serverless instance.

    All methods are static; the class only serves as a namespace for the
    client factory, the scheduled task and the scheduler entry point.
    """

    def __init__(self):
        pass

    @staticmethod
    def create_client() -> Rds20140815Client:
        """
        Use your AccessKey ID and AccessKey secret to initialize a client.

        Credentials are read from the ``ALIBABA_CLOUD_ACCESS_KEY_ID`` and
        ``ALIBABA_CLOUD_ACCESS_KEY_SECRET`` environment variables.

        @return: Client
        @throws KeyError: if either environment variable is not set
        """
        config = open_api_models.Config(
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        )
        config.endpoint = 'rds.aliyuncs.com'
        return Rds20140815Client(config)

    @staticmethod
    def modify_db_instance_spec(
        config: Dict[str, float],
        dbinstance_id: str = 'rm-bp1t8v93k6e15****',
    ) -> None:
        """
        Modify DB instance spec - the core task function.

        Sends a ModifyDBInstanceSpec request that applies the given
        serverless configuration to the instance, logs the response on
        success and prints the error details on failure.

        @param config: mapping with the keys ``max_capacity``,
            ``min_capacity``, ``auto_pause`` and ``switch_force``; the values
            are passed through to the request's serverless configuration.
        @param dbinstance_id: ID of the instance to modify. Defaults to the
            placeholder ID that was previously hard-coded here.
        @throws KeyError: if ``config`` lacks one of the required keys or the
            credential environment variables are missing (both are read
            before the request's ``try`` block).
        """
        client = Sample.create_client()
        serverless_configuration = rds_20140815_models.ModifyDBInstanceSpecRequestServerlessConfiguration(
            max_capacity=config['max_capacity'],
            min_capacity=config['min_capacity'],
            auto_pause=config['auto_pause'],
            switch_force=config['switch_force'],
        )
        modify_dbinstance_spec_request = rds_20140815_models.ModifyDBInstanceSpecRequest(
            dbinstance_id=dbinstance_id,
            direction='Serverless',
            pay_type='Serverless',
            serverless_configuration=serverless_configuration,
        )
        runtime = util_models.RuntimeOptions(
            read_timeout=50000,
            connect_timeout=50000,
        )
        try:
            resp = client.modify_dbinstance_spec_with_options(modify_dbinstance_spec_request, runtime)
            ConsoleClient.log(UtilClient.to_jsonstring(resp))
        except Exception as error:
            # Plain Python exceptions (e.g. connection failures) have no
            # `message` / `data` attributes, and an SDK error's `data` may be
            # None. Access both defensively so the handler itself cannot
            # raise and mask the original failure.
            message = getattr(error, 'message', None)
            print(message if message is not None else error)
            data = getattr(error, 'data', None)
            if isinstance(data, dict):
                # Diagnostic hint returned by the service, when present.
                print(data.get("Recommend"))

    @staticmethod
    def main(args: List[str]) -> None:
        """
        Register the two daily capacity jobs and run the scheduler.

        Blocks until the scheduler is stopped; Ctrl-C / SystemExit end the
        loop quietly.

        @param args: command-line arguments; currently unused.
        """
        scheduler = BlockingScheduler()
        config_8am = {
            'max_capacity': 8,
            'min_capacity': 4,
            'auto_pause': False,
            'switch_force': True,
        }
        config_9am = {
            'max_capacity': 8,
            'min_capacity': 0.5,
            'auto_pause': False,
            'switch_force': True,
        }
        # NOTE(review): the "8am" profile fires at 07:50, presumably so the
        # higher minimum capacity is in effect by 08:00 - confirm.
        scheduler.add_job(Sample.modify_db_instance_spec, 'cron', hour=7, minute=50, args=[config_8am])
        scheduler.add_job(Sample.modify_db_instance_spec, 'cron', hour=9, minute=0, args=[config_9am])
        try:
            scheduler.start()
        except (KeyboardInterrupt, SystemExit):
            # Deliberate: an interrupt is the normal way to stop this script.
            pass
# Script entry point: forward the command-line arguments (without the
# program name) to Sample.main when this file is executed directly.
if __name__ == '__main__':
    cli_args = sys.argv[1:]
    Sample.main(cli_args)