全部产品
Search
文档中心

实时数仓Hologres:Hologres Dynamic Table运维刷新任务

更新时间:Sep 18, 2024

当Dynamic Table的数据源基表发生变化时,需要通过刷新Dynamic Table来更新数据。Dynamic Table将根据设定的刷新开始时间和刷新间隔,自动在后台执行刷新任务。本文将为您介绍如何查看和维护Dynamic Table的刷新任务。

查看刷新任务

查看运行中的刷新任务

通过hologres.hg_dynamic_table_refresh_activity查看

您可以通过hologres.hg_dynamic_table_refresh_activity查看正在运行中的刷新任务(包括全量刷新和增量刷新),以及资源消耗等。hologres.hg_dynamic_table_refresh_activity系统表字段介绍详情,请参见hologres.hg_dynamic_table_refresh_activity系统表

--查看当前正在运行的刷新任务
SELECT
    pid,
    query_id,
    refresh_mode,
    'RUNNING' as status,
    refresh_start,
    extract(epoch from duration) as duration, -- seconds
    serverless_queue_time_ms::bigint / 1000 AS serverless_queue_time_sec,
    serverless_resource_used_time_ms::bigint / 1000 AS serverless_resource_used_time_sec,
    serverless_allocated_cores
FROM
    hologres.hg_dynamic_table_refresh_activity
WHERE datname = '${database}'
  AND table_write = quote_ident('${schema}') || '.' || quote_ident('${tableName}')
ORDER BY refresh_start DESC
limit 2000;

通过hg_stat_activity查看

您可以通过hg_stat_activity系统视图查看正在运行中的刷新任务,在hg_stat_activity中不同的刷新模式显示存在差异:

  • 全量刷新:会展示INSERT语句。

  • 增量刷新:会展示Refresh任务。

通过监控指标查看刷新任务

您可以通过查看QPS、RPS、Latency等指标,确认Dynamic Table刷新任务的执行情况,其中Command Type为refresh,即为Dynamic Table的刷新任务。关于监控指标详情,请参见Hologres管控台的监控指标

若Dynamic Table刷新任务,在Serverless Computing资源中运行,您也可以Serverless Computing相关指标中进行查看。

说明

Dynamic Table刷新任务支持通过云监控,创建告警规则,详情请参见云监控

查看历史刷新任务

通过hologres.hg_dynamic_table_refresh_history查看

hologres.hg_dynamic_table_refresh_history系统表会记录近一个月,所有Dynamic Table刷新任务(包括全量刷新、增量刷新、手动刷新)的历史信息。hologres.hg_dynamic_table_refresh_history系统表字段介绍详情,请参见hologres.hg_dynamic_table_refresh_history 系统表

说明
  • 数据默认保留近一个月的记录,无法查询一个月之前的数据。

  • 表Owner仅能查看自身的刷新历史,而Superuser则具备查看所有刷新记录的权限。

  • 示例1:查看增量刷新过去一天的记录。

    --示例1:查看增量刷新过去一天的记录
    SELECT
        query_id,
        refresh_mode,
        status,
        refresh_start,
        duration,
        refresh_latency / 1000 AS refresh_latency_second,
        serverless_allocated_cores,
        queue_time_ms::bigint / 1000 AS queue_time_second,
        serverless_resource_used_time_ms::bigint / 1000 AS serverless_resource_used_time_second
    FROM
        hologres.hg_dynamic_table_refresh_history
    WHERE
        refresh_start >= CURRENT_DATE - INTERVAL '1 day'
        AND dynamic_table_name = '<dynamic_table>'
        AND refresh_mode = 'incremental'
    ORDER BY
        refresh_start DESC
    limit 100;
    
  • 示例2:查看当前实例,过去1天所有的刷新任务。

    --查看实例内过去1天所有的刷新记录
    SELECT 
    query_id,
        refresh_mode,
        status,
        refresh_start,
        duration,
        refresh_latency / 1000 AS refresh_latency_second,
        serverless_allocated_cores,
        queue_time_ms::bigint / 1000 AS queue_time_second,
        serverless_resource_used_time_ms::bigint / 1000 AS serverless_resource_used_time_second
    FROM hologres.hg_dynamic_table_refresh_history where refresh_start >= CURRENT_DATE - INTERVAL '1 day'
  • 示例3:查看指定表过去一天的刷新记录。

    --查看某个表过去一天的刷新记录
    SELECT 
    query_id,
        refresh_mode,
        status,
        refresh_start,
        duration,
        refresh_latency / 1000 AS refresh_latency_second,
         serverless_allocated_cores,
        queue_time_ms::bigint / 1000 AS queue_time_second,
        serverless_resource_used_time_ms::bigint / 1000 AS serverless_resource_used_time_second
    FROM hologres.hg_dynamic_table_refresh_history where schema_name='<scehma_name>' and dynamic_table_name='<dynamic_table>' and  refresh_start >= CURRENT_DATE - INTERVAL '1 day'

通过慢Query日志查看

您可以通过慢Query日志查看Dynamic Table的刷新任务,Command Type为refresh。通过慢Query日志查看详情,请参见慢Query日志查看与分析

查看刷新任务的执行计划

与普通Query一致,支持通过EXPLAIN和EXPLAIN ANALYZE查看刷新任务的执行计划和运行信息,以便于分析刷新任务的性能瓶颈,帮助业务进一步Query调优。示例如下:

 explain refresh dynamic table hmtest.dt_order_lineitem;
                                                                                                  QUERY PLAN                                                 
                                                 
-------------------------------------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------
 Gather  (cost=0.00..10.13 rows=1 width=16)
   ->  Insert  (cost=0.00..10.13 rows=1 width=16)
         ->  Redistribution  (cost=0.00..10.11 rows=1 width=16)
               ->  Final HashAggregate  (cost=0.00..10.11 rows=1 width=16)
                     Group Key: orders.o_orderpriority
                     ->  Redistribution  (cost=0.00..10.11 rows=10 width=16)
                           Hash Key: orders.o_orderpriority
                           ->  Partial HashAggregate  (cost=0.00..10.11 rows=10 width=16)
                                 Group Key: orders.o_orderpriority
                                 ->  Hash Left Semi Join  (cost=0.00..10.11 rows=1000 width=8)
                                       Hash Cond: (orders.o_orderkey = lineitem.l_orderkey)
                                       ->  Redistribution  (cost=0.00..5.03 rows=1000 width=16)
                                             Hash Key: orders.o_orderkey
                                             ->  Local Gather  (cost=0.00..5.01 rows=1000 width=16)
                                                   ->  Seq Scan on orders  (cost=0.00..5.01 rows=1000 width=16)
                                                         Filter: ((o_orderdate >= '1996-07-01 00:00:00+08'::timestamp with time zone) AND (o_orderdate < '199
6-10-01 00:00:00+08'::timestamp with time zone))
                                       ->  Hash  (cost=5.03..5.03 rows=1000 width=8)
                                             ->  Redistribution  (cost=0.00..5.03 rows=1000 width=8)
                                                   Hash Key: lineitem.l_orderkey
                                                   ->  Local Gather  (cost=0.00..5.03 rows=1000 width=8)
                                                         ->  Seq Scan on lineitem  (cost=0.00..5.03 rows=1000 width=8)
                                                               Filter: (l_commitdate < l_receiptdate)
 Optimizer: HQO version 2.1.0
(23 rows)

手动刷新

支持对Dynamic Table执行手动刷新,语法如下:

refresh DYNAMIC TABLE <dynamic_schema_name.dynamic_table_name>;
说明

如果表属性中设置了自动刷新,同时又执行了手动刷新,则相当于并行执行刷新操作。两者均可正常执行,系统最终将确保只有一份最新的数据,不会产生多份数据的情况。

设置刷新超时时长

与普通Query一样,Dynamic Table的刷新任务也支持设置超时时长。

表级别设置

创建Dynamic Table表时,设置刷新任务超时时长,对该表的所有刷新任务都生效。以下SQL代码以tpch_10g公共数据集为例,在执行之前,请确保已成功导入tpch_10g公共数据集。具体操作,请参见新建公共数据集导入任务

--创建表时设置刷新任务超时时间。
CREATE DYNAMIC TABLE tpch_q1_batch
  WITH (
    refresh_mode='full',
    auto_refresh_enable='true',
    full_auto_refresh_interval='1 hours',
    refresh_guc_statement_timeout='30 mins'--刷新超时时间为30 mins
       ) 
AS
  SELECT
        l_returnflag,
        l_linestatus,
        SUM(l_quantity) AS sum_qty,
        SUM(l_extendedprice) AS sum_base_price,
        SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
        SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
        AVG(l_quantity) AS avg_qty,
        AVG(l_extendedprice) AS avg_price,
        AVG(l_discount) AS avg_disc,
        COUNT(*) asAScount_order
FROM
        hologres_dataset_tpch_10.lineitem
WHERE
        l_shipdate <= DATE '1998-12-01' - INTERVAL '120' DAY
GROUP BY
        l_returnflag,
        l_linestatus;

Session级别设置

手动设置超时时长,SQL示例如下。

SET statement_timeout = <time>;
refresh DYNAMIC TABLE <dynamic_schema_name.dynamic_table_name>;

关于更多设置详情,请参见修改活跃Query超时时间

取消刷新任务

取消正在运行的刷新任务

如果您发现刷新任务长时间运行未结束或出现卡顿等问题,可以通过pg_cancel_backend取消正在运行的刷新任务。

您可以通过如下命令,取消单个正在运行的刷新任务。

// pid为刷新任务ID
SELECT pg_cancel_backend(<pid>);

上述参数pid,为刷新任务ID,您可以通过查看刷新任务,获取刷新任务ID(query_id),详情请参见查看刷新任务

说明

批量取消正在运行的刷新任务,同普通Query的方式相同,详情请参见终止Query

取消表的所有刷新任务

如果Dynamic Table已经设置了刷新任务,您可以通过ALTER DYNAMIC TABLE命令,在表级别取消后续所有的刷新任务。

ALTER DYNAMIC TABLE [IF EXISTS ] [<schema>.]<table_name> set (auto_refresh_enable=false);
重要

请谨慎操作,否则可能导致后续数据无法更新。