在追求数据时效性的今天,如何高效处理低延时的流数据,逐渐成为大家越来越关注的问题。 流数据处理能力已经成为衡量大数据平台计算实力的一个重要指标。Greenplum作为最先进的开源大数据平台,天生具备处理复杂问题的优势。Pivotal的研发团队在开源Greenplum的基础上,提供了新的高速流数据引擎gpKafka,从而将Greenplum强大的SQL处理能力引入到流计算领域。本文重点介绍目前主要的流计算模式,以及gpKafka如何将Greenplum打造成近实时的流计算引擎。
今天,有越来越多的人在讨论流计算和流数据,如同十年前讨论的大数据一样。对大数据的含义,直到今天,不同人对它仍然有不同的理解。有人认为大数据就是Hadoop,有人说大数据是机器学习和AI,有人说大数据是数据在云上等等。这些有差异的理解不利于我们对问题本身的研究和交流。所以,在我们开始讨论流数据和流计算之前,要先对其具体含义做一个相对精确的描述和解释,从而形成一个共识。所以我们依次解释这三个基本概念:流数据,流计算引擎及流计算模式。
什么是流数据
对它下定义的话,非常简单,没有边界的数据。比如车辆的位置信息,设备的运行状态报告,网站的用户点击信息等。尽管它的定义很简单,流数据有几个比较重要的特点。第一个是流数据从产生到处理,存在延迟。因此流数据有两个时间属性:事件时间和处理时间。而处理时间的延迟,并没有严格要求,可能很大,可能很小,可能时大时小变化很大;而这是流数据区别于实时数据的重要方面。流数据不是实时数据,实时数据不考虑事件时间和处理时间的差别。尽管随着硬件性能的提升,很多原生的流处理引擎已经可以支持部分软实时的应用场景,但流数据和实时数据本身并没有什么必然联系,二者之间有交集,但属于不同的应用范畴。流数据第二个特点,它本身是可以做到强一致性的。认为流数据是不可靠的是一种偏见,或者只是为技术上难以实现强一致性找到借口。但根据具体使用场景的不同,应用可以根据实际需求,来决定自己需要达到的一致性目标,比如强一致,最终一致,或者最多一次,最少一次等等。
什么是流计算引擎
定义完流数据,再理解流数据引擎就比较容易。流数据引擎是专门处理无边界数据的计算引擎。它也有两个特点,首先是流计算引擎必须可以满足强一致性要求。如果某个流计算引擎在设计上没有将强一致性作为考虑的目标,那它必定无法保证结果的准确性,那也就不是一个真正的流计算引擎。第二个特点是流计算引擎需要支持所有的或者大部分流计算模式。流计算模式主要有三个不同的属性。我们下面来依次介绍。
首先是时间属性, 一共分三类,事件时间,处理时间及时间无关。那为什么要把将事件时间和处理时间分开呢?原因是通常事件时间和处理时间之间的延时,是不固定的,且变化可能很大。举个例子来说有一个设备在记录我们的位置信息,并将记录通过移动网络实时上传。然而当我们坐上飞机起飞的时候,这个设备仍在工作,但无法将数据上传。而在飞机降落时,它会把累计的数据一起上传。类似的情况在实际应用中非常常见。
窗口,或者叫开窗(英文windowing)指的是如何对流数据添加虚拟的边界,从而将无边界的流数据转变成一个个有边界的数据集;它是处理无边界数据的最常用方法。添加的边界通常有两类,时间边界和事件边界,分别叫做时间窗口和会话窗口;时间窗口有两种类型,固定窗口和滑动窗口;下面用个简单的例子解释下三者的区别。
Key表示我们需要观察的对象,每一行表示这个对象发生的事件,所以,这里我们有来自这三个事件数据流。固定窗口,表示以固定的时间间隔划分数据流,每次处理的是这段时间内的数据。滑动窗口每次也是按照固定时间间隔执行处理函数,假设这个时间间隔为T1,每次处理的数据是从当前时间开始,之前一段时间之内的数据,假设这个时间段为T2。通常T1小于T2。当T1等于T2时,滑动窗口退化为固定窗口。当T1大于T2,滑动窗口等价于对数据进行下采样。而会话窗口的含义是,需要处理的事件,有明确的开始和结束的标志。开始和结束标志之间的一系列事件称为一个会话,而对数据的处理则是以会话为单位。基于会话的场景是使用最多情况, 实现时可以转换为滑动窗口的方式。
介绍完时间和窗口,最后一个属性就是计算的类型,我们称它为运算,也就是我们需要对流数据执行什么样的处理。从简单到复杂,依次是,流数据的内连接,也就是在两个流数据中找到共同的事件或类似的事件;第二个是数据的变换和过滤,比如简单去重,单位转换,到复杂的加密,脱敏等;第三种是最复杂的流数据聚合,即需要执行某种聚合函数从而识别出数据流的某些特征。目前来看,执行流运算最佳的工具就是SQL。
为什么SQL是适合流计算的最佳工具?SQL有强大的表达和计算能力,有完善的标准和众多的厂商支持。它从诞生那一天就遇到各种各样的挑战者,SQL解决复杂分析问题的能力是经过检验的。最重要的一点,这些问题其实都是SQL早已解决过一遍的问题。这也不难理解为什么当下趋势就是越来越多的所谓“真正的流计算引擎”也陆陆续续提供了SQL的支持,例如spark,flink,ksql等。
现在来总结流数据处理的模式,简单说就是这三个属性的排列组合。 绝大部分流计算问题都可以划分到这里面的某一个类别,这些也是流计算引擎需要支持的计算模式。例如在基于事件时间的会话窗口上,进行网络攻击检测;或者以固定窗口,对时间无关的数据进行加工变换等等。
既然,大家都希望在流计数据上执行SQL,一个简单的思路当就是让已有的SQL引擎来支持对流数据的查询。比如大家熟悉的Pipelinedb,Timescaledb等等,他们都是在Postgres上进行扩展,将其改造成流数据引擎。Greenplum也是同样,经过一些增强,就可以将它变成分布式流计算引擎。 接下来我们从gpKafka开始看一下Greenplum如何满足流计算的应用场景。
Greenplum是最先进的开源大数据平台,原生支持数仓分析,机器学习,文本分析,地理信息系统等功能,广泛应用在各个行业。
Kafka最初是设计为一个分布式的日志系统,并广泛用于流数据的消息中间件场景。随着Kafka扩展组件越来越多,它也慢慢演变成为一个完整的流数据计算平台。Kafka的核心组件遵循Unix设计哲学“do one thing and do it well”,因而在实现时牺牲了不重要的功能,确保了最重要的三个功能:高速,可靠,可扩展。
这是kafka的逻辑结构,topic是存放消息的队列。每个topic包含一个或多个partition,partition的数量决定了消费时的并行程度。producer生成消息,消息发送给某个topic,或者直接发给topic中特定的partition。consumer消费消息,consumer以组(consumer group)为单位消费同一个topic的消息。同一个topic可以有多组消费者同时消费,每个组对应不同的app。在同一个组内部,每个partition的消息,同时只能由一个消费者进行消费。不存在同一组内的多个消费者消费同一个partition的情况。因此,增加producer的数量一定会提高发送端的并行度,但增加consumer的数量,则不一定会增加接受端的并行度,因为实际消费端的并行度的上限是由partition的数据决定的。
Kafka是分布式集群,Kafka集群由Broker组成,每一个Broker都是可以独立提供服务的进程实例。消息以分区为组织单元,保存在Broker上。数据会有备份,称为replica, 备份保存在另一个broker上,因此备份的数目不会超过Broker的数目,而分区数没有这个限制,同一Broker上可以有同一topic的多个分区。
在所有的备份中,有一个Leader对外提供读写服务,其它称为replica。只有在leader挂掉的情况下,才会从剩下的replica中选出新的Leader提供服务。为最大化Kafka的吞吐,在流处理管道架构设计上必须考虑如何与Kafka本身的设计相匹配。例如在Broker数目固定的情况下,增加topic和增加partition,本质上都是横向扩展,更需要关注的是如何避免各个partition中数据的倾斜。
gpKafka是Greenplum的Kafka数据加载组件,官方名称为“Greenplum-Kafka integration”。它把Kafka高速的流处理能力和Greenplum强大的SQL执行能力联合起来,大大降低了数据处理的延时,从而将Greenplum引入到近实时应用的场合。gpKafka是经Confluent官方认证的数据加载解决方案,支持Kafka从基本的数据加载到高级的元数据管理功能。gpKafka支持exactly-one等强一致性的使用场景,可运行在Cloudfoundry或者K8s上。 此外在接下来的版本里,gpkafka会增加的两个最重要功能,横向扩展和向Kafka卸载数据。
gpKafka利用gpss从Kafka的特定topic中读取数据,然后转发给Greenplum集群。Gpss全称是Greenplum Stream Server,是Greenplum的下一代数据加载解决方案,相比于gpfdist,GPSS会提供流数据支持及API接口,有更好的扩展性,支持更丰富的功能,并开放更细粒度的任务控制接口。
Gpss在读取Kafka中的消息时,为topic中每一个partition创建一个reader作为consumer。然后把来自于同一个topic的消息汇集到一起,再通过Gpfdist协议转发给各个Segment。Gpfdist协议是在HTTP协议基础上做了增强,用于实现向Greenplum的Segment节点直接发送数据。Gpfdist和Gpss都使用的Gpfdist协议。
Gpss自己实现了Gpfdsit协议的服务端,本身并不包含Gpfdist可执行程序。Gpss有专门的controller服务来管理batch,controller决定何时开始或结束一个加载batch,何时执行数据转换函数。GpKafka目前支持两种定义batch的方式,基于时间的和基于消息数目的。GpKafka把从一个Kafka topic到一个Greenplum表的加载任务,定义为一个job,每个job通过yaml格式的配置来文件定义。每个Gpss进程可以同时执行多个job。最大同时运行的job数量取决于运行Gpss进程的机器的系统资源。
gpKafka支持的运算类型,包括消息数据进入Greenplum之前,经过的所有变换和处理。
灰色部分是正在实现中的功能、会在之后版本支持,主要用于数据脱敏、解密等。Formatter用于解析消息,除了消息长度外,Kafka本身并不对消息的内容做任何额外要求,不会修改和查看消息内容,消息可以是任何格式,比如json,csv,avro,甚至二进制数据。应用程序完全可以根据自己的场景和优化目标决定使用哪种文件格式。Json或csv文本可读性强,但没有压缩,数据量大;压缩格式会明显的增加吞吐,但会增加收发端的CPU负载。通常在Kafka的绝大部分使用场景,网络是明显的瓶颈,比较推荐使用压缩格式。Confluent官方推荐的格式avro。
Formatter的目的就是将这些不同格式的消息进行相应的变换,从而得到Greenplum可以识别的内容。例如对csv是直接解析,而avro格式则要先转换成json。
Formatter之后的处理流程是transform,指的是具体对数据执行何种操作,在接下来会对其有专门的介绍。 最后一步操作是后处理,其主要目的是根据前一步变换的结果,对需要入库的数据进行筛选。它通过指定的过滤条件,来保留需要的列或者数据。例如我们只需要跟踪分析某一个特定用户行为,就可以在加载前识别出有效数据从而避免额外的数据清洗工作。
Transform是gpKafka中最强大最灵活的功能。它通过在外部表上执行函数的方式,在落盘之前将数据进行必要的转换。例如提取图像信息,非结构化到结构化数据的转换,甚至执行机器学习函数等。Greenplum支持用各种常用语言来实现自定义函数,除了SQL外,还包括pl/C,pl/R,pl/python,pl/Java,pl/perl等,方便不同背景的用户在实现时充分权衡开发效率和运行效率。此外,Transform的变换函数在各个segment上并行执行,不存在单点瓶颈。
介绍完运算,接下来介绍时间和窗口。 数据处理时间就是执行transform的时间,可以通过now()函数来获取。而事件时间通常保存在消息的记录中,可以在Mapping中指定。gpKafka通过minibatch的控制数据加载,batch的时间间隔就是数据加载时每次窗口移动的长度,通过配置文件的MINIMAL_INTERVAL指定。POST_BATCH_SQL用来指定窗口函数。窗口函数与transform函数类似,可以是任意合法的SQL或者自定义函数,窗口大小可以通过窗口函数的参数决定,或者由窗口函数本身来控制。
gpKafka默认将数据进行转换后加载到Greenplum中,因此可以保留所有的流原始数据。从而可以方便的进行滑动窗口或者session运算。换句话说,gpKafka是通过滑动窗口的方式来实现对会话窗口的支持。
这里用一个简单的例子来展示滑动窗口究竟是怎么实现的。这个窗口函数非常简单,只是计算窗口时间的消息个数。它的输入参数i,表示了滑动窗口的窗口长度,通过一个简单的where条件实现过滤。
现在让我们简单总结一下:gpKafka完整支持流计算的批处理模式,支持区分事件时间和处理时间,支持固定窗口及滑动窗口,可以通过时间窗口模拟会话窗口, 并可以在这些窗口上执行各种高级的SQL操作,使用Greenplum强大的分析引擎。因此Greenplum可以胜任很多流计算的应用场景,下面我们看一个典型的例子。
这是一个用Greenplum5和gpKafka进行网络日志分析的典型应用,用于找到潜在的安全风险,识别网络攻击等。客户监控所有的网络通信,将抓到的pcap包交给Kafka,然后通过gpKafka持续加载到Greenplum中利用Madlib进行分析训练。最后将训练好的模型发送给spark,进行低延时的各种运算。在Greenplum6中,对并发性和短小查询做了大幅的性能优化,因此在Greenplum6以后的版本中,配合resource group做好资源隔离,很多计算也可以直接在Greenplum中完成。如今,Greenplum正慢慢演进为一个全功能的大数据计算平台,在传统的数仓之外,Greenplum必定会在更多领域发挥越来越大的作用。
本文根据李阳在2019年PostgreSQL杭州沙龙活动中的演讲内容整理。
参考文献
[1] Akidau, Tyler, Slava Chernyak, and Reuven Lax. Streaming Systems: The What, Where, When, and How of Large-Scale Data Processing. First edition. Beijing Boston Farnham Sebastopol Tokyo: O’Reilly, 2018.
[2] Psaltis, Andrew G. Streaming Data: Understanding the Real-Time Pipeline. Shelter Island, NY: Manning Publications, 2017.
[3] https://sookocheff.com/post/k…
[4] https://medium.com/@rem.baba/…