程序员

【详谈 Delta Lake 】系列技术专题 之 Streaming(流式计算)

作者:admin 2021-07-05 我要评论

前言 本文翻译自大数据技术公司 Databricks 针对数据湖 Delta Lake 系列技术文章。众所周知 Databricks 主导着开源大数据社区 Apache Spark、Delta Lake 以及 ML...

在说正事之前,我要推荐一个福利:你还在原价购买阿里云、腾讯云、华为云服务器吗?那太亏啦!来这里,新购、升级、续费都打折,能够为您省60%的钱呢!2核4G企业级云服务器低至69元/年,点击进去看看吧>>>)
前言

本文翻译自大数据技术公司 Databricks 针对数据湖 Delta Lake 系列技术文章。众所周知 Databricks 主导着开源大数据社区 Apache Spark、Delta Lake 以及 ML Flow 等众多热门技术 而 Delta Lake 作为数据湖核心存储引擎方案给企业带来诸多的优势。

此外 阿里云和 Apache Spark 及 Delta Lake 的原厂 Databricks 引擎团队合作 推出了基于阿里云的企业版全托管 Spark 产品——Databricks 数据洞察 该产品原生集成企业版 Delta Engine 引擎 无需额外配置 提供高性能计算能力。有兴趣的同学可以搜索 Databricks 数据洞察 或 阿里云 Databricks 进入官网 或者直接访问 https://www.aliyun.com/product/bigdata/spark 了解详情。


译者 冯加亮 加亮 阿里云计算平台事业部大数据工程师


image

Delta Lake 技术系列 - 流式计算目录Chapter-01 ?使用 Delta Lake 解决流式数据入湖的难题Chapter-02 ?使用 Delta Lake 简化股票行情数据的分析Chapter-03 ?Tilting Point 游戏公司是如何使用 Delta Lake 处理流数据Chapter-04 使用 Delta Lake 构建流媒体视频的解决方案本文介绍内容

Delta Lake 系列电子书由 Databricks 出版 阿里云计算平台事业部大数据生态企业团队翻译 旨在帮助领导者和实践者了解 Delta Lake 的全部功能以及它所处的场景。在本文中 Delta Lake 系列-实时流处理场景 The Delta Lake Series Streaming ,通过客户最佳实践案例 介绍使用 Delta Lake 做流式数据计算的场景。

imageChapter-01 使用 Delta Lake 解决流式数据计算的难题

传统流式数据和数仓数据可分成数据湖和数据仓库两部分。

数据湖的不足很难合并来自不同系统的流数据。在数据湖中想要更新数据几乎是不可能的 尤其在涉及到财务对帐 数据调整 使得流式数据更新尤为重要。数据湖的查询速度通常很慢。优化存储非常困难 并且通常需要复杂的逻辑数据仓库的不足局限于使用 SQL 分析在场景需要的情况下同时访问流数据和存储数据是非常困难的数据仓库的可伸缩性不好计算和存储无法分离 使得数据仓库使用起来较为昂贵Delta Lake的优势

Databrick 官网 Delta Lake 指南 https://docs.databricks.com/delta/index.html

以上内容介绍了数据湖和数据仓库的局限性 那么 Delta Lake 是如何解决以上的问题呢

Delta Lake 中的表既可作为数据源进行大数据分析又可作为目的源表进行流式实时写入 真正实现批流一体。Delta 表支持增删操作Delta Lake 支持 ACID 使得创建兼容的数据解决方案很容易具有专业的机器学习 ETL 处理 数据分析和查询简单高效计算和存储分离 支持更高效的解决方案


image

Chapter-02 Delta Lake 在股票行情分析的应用

实时分析股票数据是一项复杂的工作 这其中有大量的流数据需要实时维护同时还要处理历史数据 事务一致性方面面临很大挑战 这些问题在 Delta Lake 架构中通过Apache Spark 的可伸缩性、流计算和灵活的数据分析的能力 以及 ACID 事务性可以轻松解决。

使用 Delta Lake 解决方案架构图通过此架构图 我们清晰的看到将股票价格数据和金融数据实时数据写入 Delta Lake 的两张表里面。然后我们通过读取 Delta Lake 表中的数据做 ETL 数据清洗 并将清洗后的数据写入第三个 Delta Lake 表 用于下游分析

image

实时的流数据有两类 Fundamentals data 和 Price data 为了模拟这两种数据 我们在Delta Lake 中创建了 Delta 表 使用 .format(‘delta’)并指向 OSS 数据存储

%pyspark
# Create Fundamental Data (Databricks Delta table)
dfBaseFund spark \
.read \
.format(‘delta’) \
.load(‘/delta/stocksFundamentals’)
%pyspark
# Create Price Data (Databricks Delta table)
dfBasePrice spark \
.read \
.format(‘delta’) \
.load(‘/delta/stocksDailyPrices’)

接下来 我们通过开始和结束日期筛选出来有用的数据 然后将该日期范围的价格和基本数据合并到 OSS

%pyspark
# Determine start and end date of available data
row dfBasePrice.agg(
 func.max(dfBasePrice.price_date).alias(“maxDate”),
 func.min(dfBasePrice.price_date).alias(“minDate”)
).collect()[0]
startDate row[“minDate”]
endDate row[“maxDate”]
# Define our date range function
def daterange(start_date, end_date):
 for n in range(int ((end_date - start_date).days)):
 yield start_date datetime.timedelta(n)
# Define combinePriceAndFund information by date and
def combinePriceAndFund(theDate):
 dfFund dfBaseFund.where(dfBaseFund.price_date theDate)
 dfPrice dfBasePrice
 .where(dfBasePrice.price_date theDate)
 .drop(‘price_date’)
# Drop the updated column
 dfPriceWFund dfPrice.join(dfFund, [‘ticker’]).drop(‘updated’)
# Save data to OSS
 dfPriceWFund
 .write
 .format(‘delta’)
 .mode(‘append’)
 .save(‘/delta/stocksDailyPricesWFund’)
# Loop through dates to complete fundamentals price ETL process
for single_date in daterange(
startDate, (endDate datetime.timedelta(days 1))
 print ‘Starting ’ single_date.strftime(‘%Y-%m-%d’)
 start datetime.datetime.now()
 combinePriceAndFund(single_date)
 end datetime.datetime.now()
 print (end - start)

现在我们有一系列价格数据写入到 oss 中的 /delta/stocksDailyPricesWFund。我们通过读取 OSS 指定路径的数据.format(“Delta”)来创建 Delta Lake 表。

%pyspark
dfPriceWithFundamentals 
spark.readStream \
 .format(“delta”) \
 .load(“/delta/stocksDailyPricesWFund”)
// Create temporary view of the data
dfPriceWithFundamentals.createOrReplaceTempView(“priceWithFundamentals”)

创建一个视图允许我们实时计算价格/收益比率进行分析。

%sql
CREATE OR REPLACE TEMPORARY VIEW viewPE AS
select ticker,
price_date,
first(close) as price,
(close/eps_basic_net) as pe
from priceWithFundamentals
where eps_basic_net 0
group by ticker, price_date, pe

实时分析股票流数据

%sql
select *
from viewPE
where ticker “AAPL”
order by price_date

image

由于整合数据集源数据是 Delta Lake 表 所以这个视图不仅仅显示批处理数据 同时也要展示新的数据流。如下图所示

image

与此同时 使用 Structured Streaming 不仅仅只是实时的将数据写入 Delta Lake 同时也要保持主键的唯一性。

image

最后 我们演示了如何使用 Delta Lake 简化股票实时数据分析。通过 Spark Structured Streaming 和 Delta Lake 我们可以使用 Databricks 集成的 Workspace 来创建一个具有数据湖和数据仓库优点的高性能、可扩展的解决方案。

image

Databricks 统一数据平台消除了通常与流和事务一致性相关的数据工程 使数据工程和数据科学团队能够专注于他们的股票数据。

image

Chapter-03 Tilting Point 游戏公司使用 Delta Lake 处理流数据背景

Tilting Point 新一代游戏运营商 在为顶级开发工作室提供专家资源、服务和运营支持 优化高质量的实时游戏等方面 做的都很成功。Tilting Point 通过其用户获取基金和世界级的技术平台 通过绩效营销和游戏直播 帮助开发商实现盈利。通过使用Delta Lake 架构, Tilting Point 公司可以实时的利用高质量的数据进行数据分析。

业务需求分析

Tilting Point 的团队之前按小时进行游戏分析报告的批处理工作 由于业务需要 他们希望能够在5-10分钟内完成实时报告。

他们还希望根据玩家的实时行为做出游戏内的 LiveOps 决策 将实时数据提供给服务系统 提供关于 LiveOps 变化的实时告警 这些告警会对游戏体验产生很大的影响 我们通过采集这些实时的变化参数将游戏体验尽可能做到极致。

此外 他们必须单独存储加密的个人身份信息 (PII) 数据 以保持 GDPR 的合规。

实时处理数据流的挑战

Tilting Point 有一个专有的软件开发工具 开发人员可以与之集成 将数据从游戏服务器发送到 AWS 中托管的服务器。该服务会删除所有 PII 数据 然后将原始数据发送到 Amazon Firehose 终端。然后 Firehose 将 JSON 格式的数据持续转储到 S3

为了清洗原始数据并使其快速用于分析 团队考虑将连续数据从 Firehose 推送到消息队列(例如 Kafka, Kinesis ) 然后使用 Spark 的 Structured Streaming Databricks Structured Streaming 来连续处理数据并写入 Delta Lake 表。

虽然这种架构听起来非常适合以秒为单位处理低延迟的数据 但实际上 Tilting Point 对它们的 pipeline 并没有这么低的延迟需求。他们希望在分钟级 就能得到可供分析的数据。因此 他们决定取消消息队列来简化架构 而是使用 S3 作为结构化流作业的数据源。使用 S3 作为连续流数据源的关键挑战是如何实时识别最近更新的文件。

每隔几分钟列出所有文件有两个主要问题

高延迟 列出包含大量文件的目录中的所有文件会有很高的性能开销 并增加处理时间。高成本 每隔几分钟列出大量文件会迅速增加S3成本。利用 Structured Streaming 使用 blob 作为 Delta Lake 数据存储。

为了从 S3 云 blob 存储中获取连续流数据 Tilting Point 使用了 Databricks 的“S3- sqs”数据源选项。S3- sqs 提供了从 S3 增量流数据的封装 而不需要对最近处理的文件再编写任何状态管理的代码。

以下是 Tilting Point pipeline:

配置 Amazon S3 事件通知 通过 SNS 向 SQS 发送新的数据。Tilting Point 使用 S3-SQS 方式源读取到达S3中的新数据。方式如下:
%pyspark
spark.readStream \
 .format(“s3-sqs”) \
 .option(“fileFormat”, “json”) \
 .option(“queueUrl”, ...) \
 .schema(...) \
 .load()
Tilting Point 使用 Structured Streaming 进行数据清洗和转换 基于游戏实时流数据 使用 Spark Streaming foreachBatch API 写入到30个不同的 Delta Lake 表中。流作业过程中会生成大量小文件,这将影响下游消费者的性能 因此 每天都会运行一个优化作业来合并表中的小文件 以便 Delta Lake 表在读取数据时具有良好的性能。

image

使用 Delta Lake 架构 带来的便利 增加选配项“s3-sqs” 可以增量加载 S3 中的新文件 有助于快速处理新文件 而不会在列出文件时产生太多开销不会显示文件状态管理 无需显示的进行文件状态管理。更低的操作成本:由于我们使用 S3 作为 Firehose 和 Structured Streaming 作业之间的 checkpoint 停止和执行数据的操作负担相对较低数据读写可靠 Delta Lake 提供 ACID(optimistic concurrency control)事务保证。这有助于读写更加可靠。文件压缩:在流处理过程中 会产生很多临时小文件,这会影响读写性能。在 Delta 之前我们必须建立一个不同的表来编写压缩数据。在 Delta Lake 中 由于 ACID 事务 我们可以压缩文件并安全地将数据重写回相同的表中。快照隔离: Delta Lake 的快照允许我们在流作业执行时 修改和压缩数据时 Delta 表也可以正常读取。回滚:写入错误的情况下 Delta Lake 的 Time Travel(Time Travel) 可以帮助我们回滚到表的前一个版本。

image

Chapter-04 使用 Delta Lake 构建流媒体视频的解决方案

随着传统的付费电视继续停滞不前 内容所有者开始接受直接消费者( D2C )订阅和广告支持的流媒体服务 以从他们的内容库中赚钱。对于那些整个商业模式都围绕着生产优质内容 然后将其授权给分销商的公司来说 向现在玻璃体验的转变需要新的创新力 比如为向消费者提供内容建立媒体供应链 支持各种设备和操作系统的应用程序 并执行帐单和客户服务等客户关系功能。

由于大多数服务都是按月更新的 订阅服务运营商需要随时向用户证明其价值。流媒体视频的一般质量问题(包括缓冲、延迟、像素化、抖动、丢包和空白屏幕)会对业务产生重大影响 无论是增加用户流失率还是降低视频参与度。

浏览频道 点击进入和退出应用程序 从不同的设备同时登录等等。而且 由于电视的本质 最重要、最引人注目、吸引最多观众的活动往往会出问题。如果你开始在社交媒体上收到投诉 你如何判断这些投诉是某个用户独有的 还是地区性或全国性的问题?如果是全国性的 它是跨所有设备还是只跨特定类型(例如 OEM 可能更新了旧设备类型上的操作系统 最终导致与客户端的兼容性问题)?

当考虑到用户的数量、他们正在采取的操作的数量以及体验中的切换(服务器到CDN到ISP到家庭网络到客户端)的数量时 识别、纠正和防止查看者体验质量问题成为一个大数据问题。服务质量( QoS )有助于分析这些数据流 以便您能够理解哪里出了问题、在哪里出了问题以及为什么出了问题。最终 你可以进行预测分析 了解可能出现的问题以及如何提前作出补救措施。

服务质量解决方案概述

这个解决方案的目的是为了统一改善其 QoS 系统的流媒体视频平台。它基于 AWS 实验室提供的 AWS 流媒体分析解决方案 我们随后在此基础上添加了 Databricks 作为统一数据分析平台 用于实时分析和高级分析功能。

通过使用 Databricks(Databricks cumstomers) 流媒体平台可以通过始终利用由健壮和可靠的数据管道提供的最完整和最新的数据集来获得更快的见解。通过使用协作环境加速数据科学 这减少了新特性上市的时间。它为管理端到端机器学习生命周期提供支持 并通过为数据工程和数据科学提供统一的平台 降低软件开发所有周期的运营成本

视频 QoS 解决方案架构

由于低延迟的监控警报和视频流量高峰时所需的高度可伸缩的基础设施等复杂性 直接的架构选择是Delta Architecture -像Lambda和Kappa架构这样的标准大数据架构在维护多种类型管道(流和批处理)所需的操作方面都存在缺点 并且缺乏对统一的数据工程和数据科学方法的支持。

Delta 架构优势 数据工程师可以经济有效的方式连续开发数据管道 而不必在批处理和流式之间进行选择 真正的批流一体。数据分析师可以获得接近实时的数据分析结果 来帮助他们做 BI 查询。数据科学家可以开发更好的机器学习模型 使用更可靠的数据集 支持版本回退 便于计算和查询。

image


以下是 Delta Lake 经典的三级数据表架构。我们针对每一层级的数据表分别做了如下定义:


image

Bronze 表 存储原生数据 Raw Data 存放的表或摄入表通常是原生格式的原始数据集 JSON, CSV or txt )。Silver 表 该表是在对 Bronze 表的数据进行加工处理的基础上生成的中间表 对Bronze 表做了清洗/转换可以作为数据科学训练的数据。Gold 表 基于业务数据表 表数据已经高度集成 可以用于 BI 报表展示的数据。

在完全使用流数据计算的场景里 在 Delta Lake 中间表 DataFrames 的选择上是在延迟 /sla 成本之间做的权衡 例如实时监控报警和基于新内容的推荐系统更新 。

QoS 体系结构是集中在数据处理的解决方案,他不是一个完整的视频点播( VoD )解决方案,通过与一些服务主件的结合例如结合亚马逊网管服务 避免其他的运维工作为数据分析师专注于数据和分析提供保证。

image

数据写入 Delta Lake数据准备

在 QoS 解决方案中的两个数据源(应用程序事件和 CDN 日志)都使用了 JSON 格式。

为了让整个组织能够直接查询数据 Bronze to Silver Pipeline 将所有原始数据格式转换为Delta格式。

视频 APP 事件日志

基于该体系结构 视频应用程序事件被直接推送到 Kinesis Stream 然后使用模式 append 写入到 Delta Lake。

在流处理场景下会产生大量的小文件 大量小文件的存在会严重影响数据系统的读性能。Delta Lake 提供了 OPTIMIZE optimize 性能优化 命令 可以将小文件进行合并压缩。

时间戳和消息类型都是从 JSON 事件中提取的 以便能够对数据进行分区 以及选择想要处理的事件类型。将事件的单个 Kinesis 流与 Delta Lake“events” 表结合在一起 降低操作难度。

CDN 日志

CDN 日志被传送到 S3 所以处理它们的最简单的方法是 Databricks Auto Loader 它在新数据文件到达 S3 时增量地、高效地处理它们 而不需要任何额外的设置。

%pyspark
auto_loader_df spark.readStream.format(“cloudFiles”) \
 .option(“cloudFiles.format”, “json”) \
 .option(“cloudFiles.region”, region) \
 .load(input_location)
 anonymized_df auto_loader_df.select(‘*’, ip_
anonymizer(‘requestip’).alias(‘ip’))\
 .drop(‘requestip’)\
 .withColumn(“origin”, map_ip_to_location(col(‘ip’)))

本文转自网络,原文链接:https://developer.aliyun.com/article/785029

版权声明:本文转载自网络,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本站转载出于传播更多优秀技术知识之目的,如有侵权请联系QQ/微信:153890879删除

相关文章
  • 【详谈 Delta Lake 】系列技术专题 之

    【详谈 Delta Lake 】系列技术专题 之

  • Serverless 崛起背后的五大挑战

    Serverless 崛起背后的五大挑战

  • SAP(HANA+S/4)上云基础环境部署最佳

    SAP(HANA+S/4)上云基础环境部署最佳

  • 云起体验实验室全新发布,完成体验领定

    云起体验实验室全新发布,完成体验领定

腾讯云代理商
海外云服务器