程序员

云学堂在使用 MaxCompute + DataWorks 中的降费提效实践

作者:admin 2021-07-08 我要评论

本文作者 云学堂大数据研发部 一、企业背景 云学堂 全称 江苏云学堂网络科技有限公司 作为一家教育科技企业 为客户提供数字化企业学习解决方案服务 核心产品和服...

在说正事之前,我要推荐一个福利:你还在原价购买阿里云、腾讯云、华为云服务器吗?那太亏啦!来这里,新购、升级、续费都打折,能够为您省60%的钱呢!2核4G企业级云服务器低至69元/年,点击进去看看吧>>>)

本文作者 云学堂大数据研发部

一、企业背景

云学堂 全称 江苏云学堂网络科技有限公司 作为一家教育科技企业 为客户提供数字化企业学习解决方案服务 核心产品和服务包括 软件SaaS服务、内容解决方案服务、数字化企业学习咨询和运营服务 是企业培训领域的创新型平台公司。

自2011年成立以来 云学堂已服务数万家中国企业 其中包括招行、中粮、一汽等世界500强及新希望六和、同仁堂、贝壳、OPPO等在内的数千家行业头部企业 覆盖千万学员用户 获得业界一致认可与好评。

云学堂现有员工千余人 汇集互联网、企业服务、人才培养与发展等各个领域的精英人才。公司实行北京和苏州双总部运行 营销中心和内容中心设于北京 研发中心设于苏州 同时在上海、广州、深圳、南京、杭州、武汉、青岛、成都、厦门、重庆、西安、济南、石家庄、南昌、合肥、郑州、长沙、天津、沈阳等主要城市设有分支机构。截止2021年3月 云学堂已完成E轮融资 估值超10亿美元。


二、业务需求和痛点

目前云学堂数据团队 负责构建覆盖全公司所有产品数据的大一统数据仓库、数据湖 全面满足内部BI数据分析 以及服务客户的业务数据计算及各种数据应用的开发工作 需求的量比较大也比较复杂 受制于团队规模 为了满足多且复杂的数据计算需求。需要数据团队能更多从数据集群运维方面解放出来 更多的关注业务本身 从而提高业务需求的承载面及落地上线效率 更好的服务内外部的数据用户。


三、实现方案

从2020年下半年我们开始使用 MaxCompute DataWorks 最开始是探索试验 做了一些ETL以及报表的迁移。后来因为业务的需求直接开始使用 MaxCompute DataWorks 开发新的业务 由于前期使用经验不足 所以在项目出现大量的不规范以及不合理的资源配置 导致使用成本比较高 且在调度上时间分配不合理 效率比较低。后来项目进展到一定阶段后 我们又回头对之前不足地方进行了以下改进

数据集成任务 通过对表数据量大小的统计估算 以及源表的网络以及配置信息的分析 合理划分独享调度资源组 独享数据集成资源组 以及按照表优先级顺序调度等多种方式 压缩了总同步时间到合理的范围。也结合下游数据使用job的需求对表的存储形式 di/df/dd 进行了调整 降低了存储成本 提升了下游job对表数据的使用效率。


网络带宽的优化 通过分析数据集成的甘特图以及对应十点网络带宽的使用情况 发现多个数据集成并发打满了整个数据通道的带宽 我们也对宽带进行了调整优化。


Spark任务 我们的报表业务中大部分都是spark任务 我们分析了任务的扫描数据量 运行时间和成本 对资源的使用进行了优化 代码也进行了相应的重构 在保证效率的前提下 大幅降低了开销。


MaxCompute SQL任务 这个类型的前期占比不高 但是都是一些有大数据量扫描的任务 这方面主要是结合上游表数据存储方式的调整以及对分区剪裁的注重 也实现了降费提效 也为后期更多的 SQL 优化提供了方向。


下面对各个点进行更详细的介绍。

数据集成任务的优化

数据集成任务的费用主要来自于独享集成资源组 独享调度资源组。由于 DataWorks 没有开放数据集成的执行日志api 所以我们每日爬取作业运维中心界面的周期任务和周期实例信息并写入 MaxCompute 表进行统计分析 统计每个时间段的同步速率和资源使用情况 使用阿里生态的DATAV生成每日数据集成实例的甘特图和时段曲线图。

CREATE TABLE IF NOT EXISTS odps_di_daily_instance_info_stat_di 
 stat_date STRING COMMENT 统计日期 
 ,project_name STRING COMMENT project name 
 ,table_name STRING COMMENT table name 
 ,node_name STRING COMMENT node name 
 ,begin_waitres_time DATETIME COMMENT 实例开始等待资源的时间 
 ,begin_running_time DATETIME COMMENT 实例开始运行的时间 
 ,finish_time DATETIME COMMENT 实例运行结束的时间 
 ,waitres_times BIGINT COMMENT 实例等待资源花费的时间 
 ,runtimes BIGINT COMMENT 实例完成花费的时间 
 ,res_group_name STRING COMMENT 调度资源 
 ,dires_group_name STRING COMMENT 数据集成资源 
 ,direction STRING COMMENT 进出odps的方向 
 ,concurrent INT COMMENT 数据集成并发数量 
 ,total_records BIGINT COMMENT 传输总记录数 
 ,error_records BIGINT COMMENT 传输错误记录数 
 ,total_bytes BIGINT COMMENT 传输总byte数 
 ,speed_records DOUBLE COMMENT 每秒传输记录数 
 ,speed_bytes DOUBLE COMMENT 每秒传输byte数 
 ,create_time DATETIME COMMENT create time 
 ,update_time DATETIME COMMENT update time 
COMMENT 每日数据集成实例运行情况表 
PARTITIONED BY 
 ds STRING COMMENT 分区 
LIFECYCLE 7200
;


甘特图

image2021-7-1_16-39-24.png

通过甘特图 我们把占用时间较长的数据集成任务的调度时间往前调整 并适当调整同步并发数和调度资源组 降低作业的等待时间 提高资源组的利用率。


曲线图image2021-7-1_16-42-44.png
通过曲线图我们获取不同数据源在不同时间点的同步速度 结合实际的业务需求 对数据源进行分流 提高并发 提高数据集成资源组的利用率。


存储优化
结合下游的使用需求 我们对一些进行增量计算的大表 拆解成dd或di表 同时将其每日完整df表改成周/月快照 这样既减少了存储费用 同时也加快了下游增量计算job的读取速度 减少了下游的数据读取量 减少了下游的成本。


网络带宽的优化

我们产品的数据源 有的位于aliyun VPC 有的位于UCloud的机房 拉取Ucloud的数据源时 多个大表同时拉取数据 会直接将200M带宽占满 导致数据传输速度严重受限 我们考虑费用因素 我们采用3条独立的200M数据通道 并将Ucloud机房的数据集成任务均匀分布到3个200M数据通道中。解决了宽带不足的问题 同时也比租用单条600M带宽要节省很多费用。


Spark作业的优化

目前我们的每日作业中 大约80%都是Spark脚本。按量计费的Spark脚本成本主要来自于核时和内存时 作业使用的核数越多 内存越大 使用时长越长 费用越高。这个就需要借助阿里提供的数据 每个项目空间下的Information Schema视图中的TASKS_HISTORY会记录项目空间内已完成的作业详情 且保留近14天数据。我们将TASKS_HISTORY中的数据备份到ods_ops空间中 基于该数据统计TOP费用的job 并针对其进行优化。同时对Information Schema视图中的TASKS的实时状况每隔5分钟进行统计并落地 以便后期对job进行优化。

INSERT OVERWRITE TABLE ods_cu.odps_task_cost_info_di PARTITION(ds)
SELECT b.task_schema
 ,b.task_name
 ,b.task_type
 ,b.inst_id
 ,b. status 
 ,b.owner_id
 ,u.user_name AS owner_name
 ,b.node_id
 ,b.node_name
 ,b.start_time
 ,b.end_time
 ,b.signature
 ,b.input_gb
 ,b.complexity
 ,b.cost_cpu_hours
 ,b.cost_mem_gb_hours
 ,CASE WHEN b.task_type SQL THEN CAST(b.input_gb * b.complexity * 0.3 AS DECIMAL ( 20 , 2) )
 WHEN b.task_type SQLRT THEN CAST(b.input_gb * b.complexity * 0.3 AS DECIMAL ( 20 , 2) )
 WHEN b.task_type CUPID AND b. status Terminated AND b.cost_cpu_hours b.cost_mem_gb_hours THEN CAST(b.cost_cpu_hours * 0.66 AS DECIMAL ( 20 , 2) )
 WHEN b.task_type CUPID AND b. status Terminated AND b.cost_cpu_hours b.cost_mem_gb_hours THEN CAST(b.cost_mem_gb_hours * 0.66 AS DECIMAL ( 20 , 2) ) 
 ELSE 0 
 END cost_sum
 ,b.settings
 ,TO_CHAR(b.end_time, yyyymmdd ) AS ds
FROM (
 SELECT a.task_schema
 ,a.task_name
 ,a.task_type
 ,a.inst_id
 ,a. status 
 ,CASE WHEN GET_JSON_OBJECT(a.settings, $.SKYNET_ID ) IS NULL THEN NULL 
 ELSE GET_JSON_OBJECT(a.settings, $.SKYNET_ID ) 
 END node_id
 ,CASE WHEN GET_JSON_OBJECT(a.settings, $.SKYNET_NODENAME ) IS NULL THEN NULL 
 ELSE GET_JSON_OBJECT(a.settings, $.SKYNET_NODENAME ) 
 END node_name
 ,CASE WHEN GET_JSON_OBJECT(a.settings, $.SKYNET_ONDUTY ) IS NULL THEN owner_id 
 ELSE GET_JSON_OBJECT(a.settings, $.SKYNET_ONDUTY ) 
 END owner_id
 ,a.start_time
 ,a.end_time
 ,a.signature
 ,CAST(a.input_bytes / 1024 / 1024 / 1024 AS DECIMAL ( 20 , 2) ) AS input_gb
 ,a.complexity
 ,CAST(a.cost_cpu / 100 / 3600 AS DECIMAL ( 20 , 2) ) AS cost_cpu_hours
 ,CAST(a.cost_mem / 1024 / 3600 / 4 AS DECIMAL ( 20 , 2) ) AS cost_mem_gb_hours
 ,a.settings
 FROM ods_cu.odps_tasks_history_di a
 WHERE a.ds ${gmtdate} 
 AND a.signature IS NOT NULL
 AND a.start_time ! a.end_time
LEFT JOIN ods_cu.odps_user_ram_all u
ON b.owner_id u.user_id
-- 其中ods_cu.odps_tasks_history_di为tasks_history的备份表
INSERT INTO TABLE ods_ops.odps_tasks_resource_info_five_min_di PARTITION(ds)
SELECT project_name
 ,task_type
 ,COALESCE(GET_JSON_OBJECT(settings, $.SKYNET_ONDUTY ), - ) AS owner_id
 ,sum(cpu_usage) AS cpu_usage
 ,sum(mem_usage) AS mem_usage
 ,dateadd(
 DATETRUNC(GETDATE(), hh )
 ,CEIL(DATEPART(GETDATE(), mi )/5) * 5
 , mi 
 ) AS stat_time
 , ${gmtdate} AS ds
FROM information_schema.tasks
GROUP BY project_name
 ,task_type
 ,GET_JSON_OBJECT(settings, $.SKYNET_ONDUTY )
;

image2021-7-1_17-13-49.png

image2021-7-1_17-8-37.png

基于上表统计的信息 我们可以得出每日费用的top n 再结合lancher中设置的job的资源和job在运行过程中实时使用的资源折线图 现在 MaxCompute 官方也提供类似的功能 来评估lancher中对资源配置的合理性 再做出优化。资源参数优化后费用仍然比较高的job 我们会进行代码层面的优化 优化流程和分区剪裁 降低job的数据扫描量 减少数据的shuffle 最终实现参数和代码层面的双优化。同时由上表计算出的甘特图也方便我们对job进行编排 提高调度资源的使用率 压缩整体的时间窗口。同时随着后期作业的稳定 不再频繁进行初始化全量跑job 我们这边的费用就很快出现较大的下降 并稳定下来。


ODPS SQL 和PYODPS优化

两者收费的标准是一致的 都是看sql的复杂度和扫描的数据量。目前我们的报表业务需求中 能用sql实现的现在尽量优先使用odps sql实现 所以这部分的占比越来越高。原因如下

我们目前的报表业务中的需求本身也是sql表达 很少有opds/hive sql 无法实现的功能。


Spark job的调优本身就是更复杂的事 而odps sql 的资源调配不用自己设置 只需要自己配置响应的调优参数和在sql层面做优化 相对比较简单。


odps sql 的计费方式看数据量和复杂度 和我们大量的离线sql计算场景比较match 同样的sql作业odps sql能在同样时间内比spark sql调度更多的资源 计算更快。


这方面的作业的开销 在上面统计spark的作业的表中一并被统计了。由于计费维度的不同 这里的优化 主要集中于对sql的优化。

扫描数据量 这个主要结合阿里官方提供的分区剪裁的经验以及源表的存储做优化。准则就是尽量把数据的扫描集中在少数分区 尽量避免分区剪裁失效的写法。


sql复杂度 结合官方的文档说明 这里主要是做了sql拆解简化 降低复杂度。


四、使用 MaxCompute 带来的价值

使用MaxCompute 带来的价值 主要是免去了大数据集群运维的工作 能够做到开箱即用 以及算力和存储的弹性伸缩问题。
使用全托管的MaxCompute 使得数据团队 能够更加关注业务问题本身 能够更高效的解决业务问题 也能够比较好的应付短期需要增加大量算力的情况。
另外在创建数据仓库、数据湖等方面 也能够提供比较全面的解决方案。


更多关于大数据计算、云数仓技术交流 可查看 MaxCompute 官网或者可点击邀请链接或扫码加入 “MaxCompute开发者社区” 钉钉群

2群.jpg


本文转自网络,原文链接:https://developer.aliyun.com/article/785147

版权声明:本文转载自网络,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本站转载出于传播更多优秀技术知识之目的,如有侵权请联系QQ/微信:153890879删除

相关文章
  • 云学堂在使用 MaxCompute + DataWorks

    云学堂在使用 MaxCompute + DataWorks

  • 云原生不仅颠覆了技术栈,背后的每个岗

    云原生不仅颠覆了技术栈,背后的每个岗

  • 让保险更保险,TOP 5 财险新一代核心系

    让保险更保险,TOP 5 财险新一代核心系

  • 在阿里云活动中选购云服务器时,我们应

    在阿里云活动中选购云服务器时,我们应

腾讯云代理商
海外云服务器