大数据运营怎么做(数据抓取和能力详解)

seoxin 09-11 17:55 9次浏览

在得到的大数据产品体系中,用户分群是数据化精准运营方向的一个重要功能。该项目的落地为数据产品驱动业务发展提供了新的思路。在架构设计中,大数据中心采用了目前业界应用较少的新型数据库同步技术,尽最大可能的保障参与计算的数据素材的实时性,以此保障运营动作的精准性。

本文将围绕数据驱动型项目的技术选型、架构设计、性能优化等方面展开。本文分为四个部分:

第一部分介绍一切事情的起源,以及需求理解后的分层设计;

第二部分是本次分享的重点,介绍数据层的选型方案和整体架构;

第三部分为产品层和应用层的设计;

第四部分为踩坑回顾和未来规划。

用户分群 -- 得到实时大数据运营探索

一、故事的开始是这样的

有一天,运营老师提出这样一个需求:

用户分群 -- 得到实时大数据运营探索

“我想要做这样一个运营动作,筛选出‘目前’买过X课程的但没有买过Y课程的,有Z勋章的且昨天看过Y课程详情页的人群。给该人群中的每一个用户发送一个push,以及一张Y课程的优惠券,促使该用户群体买Y课程。如果用户满足以上条件,说明用户已经有很大的倾向发生购买行为了,这时候推波助澜一下,deal。”

以上需求中,有一个关键字:“目前”。因为该运营动作天生要求实时性,想象一下,如果用户昨天还没有买Y课程,今天早上刚下单,如果基于延迟计算的数据结果(如T+1)执行该运营动作,结果就是push和优惠券恰好姗姗来迟,这对用户来说已经没有促进,而是伤害了。故这种运营动作,数据一定要尽可能的保证实时。

从技术角度来理解下这个需求:从表面上来看,该需求主要是需要实时的计算数据,最后应用计算出的人群在各类运营平台上进行联动操作,比如消息中心、优惠券系统。实际上,这只是表层的,隐藏的技术难点有很多:

  1. 需要将已购表、勋章表等不同结构数据库与用户行为数据进行关联计算,这就要求我们具备异构数据实时聚合以及计算的能力;
  2. 产品层面上,需要提供给运营老师易用的人群定义系统,以及给上层系统提供高吞吐人群数据服务。

如此,我们按职能可以将架构从下到上分为四层:

  1. 最下面一层是负责将各种各样的的数据源实时同步并存储的数据构造层。
  2. 第二层是在第一层准备好的数据素材基础上,进行各种计算的数据计算层。
  3. 第三层为交付给需求方使用的产品层,落地为数据门户中的易用系统,通过点选即可完成复杂的人群生产。
  4. 最上层为用户分群系统与各上层应用分群的业务联动的数据服务层,在约定好的边界下,应用产品层生产好的人群做各类运营动作。

以故事起点的需求为例:数据构造层将已购表、勋章表汇集到一处存储,运营老师在产品层定义人群,数据计算层在数据构造层的基础上做交集并集运算计算出人群uid列表,最后消息中心、优惠劵系统拉走人群文件,遍历发送push并发出优惠券。

用户分群 -- 得到实时大数据运营探索

目前该分层架构图还很空,随着章节的推进,我们会一点一点将该分层架构图填满。

另外需要划重点的是,在四层中,数据构造层和数据计算层是紧密绑定在一起的。因为在大数据领域中,各类计算引擎的执行都是建立在文件的基础上的。比如得到数据门户中的行为分析系统应用的impala计算引擎,其高效的运算就依赖于 parquet列式文件存储。所以,下面我们会先结合这两层一起来做架构选型。

二、数据构造层和数据计算层

综合第一部分的需求分析,我们需要在这两层解决的问题有三个:

  1. 异构数据的实时同步;
  2. 计算引擎;
  3. 承上启下的存储。
用户分群 -- 得到实时大数据运营探索

在大数据中心已有的架构中是否有可以复用的呢?

稳定的数据同步场景,目前大数据中心离线数据仓库采用的是阿里巴巴开源的DataxT+1同步。DataX是阿里巴巴集团内被广泛作为离线大数据仓库和各类存储系统的正向和逆向的传输工具。而T+1同步,是指延迟一日进行数据同步的方式,这种方式相当在凌晨将数据表做一个此时此刻的镜像,每日的镜像按分区保存在数据仓库中,使用时指定最新的分区即可。那我们是否可以通过细化DataX执行周期,来做到准实时呢。理论上是可行的,但是每次的全量同步都会消耗大量的带宽资源和时间。从各种维度来看,都过于粗暴并且不合理。

首先是性能瓶颈:随着业务规模的增长,Select * From MySQL -> Save to LocalFile -> Load to Hive 这种数据流花费的时间将越来越长,流量也将越来越大。从MySQL中Select全量数据,对MySQL的资源消耗必然很大,可能产生连锁反应,影响线上业务的正常服务,即便通过建立专用从库解决,对资源的需求却会大大增加。

为何不采用增量同步?这是因为业务数据库通常是存在删改的,在Hive仓库数据不可更改的背景下,增量拉数据会导致历史数据不一致的问题。

用户分群 -- 得到实时大数据运营探索

故聚合业务数据库的数据,不应该面向数据,合理的技术选型当然是使用数据库 CDC (Change Data Capture)方案,基于各类数据库的变更日志,可以逆流程的还原数据。比如MySQL的Binlog,MongoDB的OPlog,PostgreSQL WAL。CDC的本质是有序且源源不断的数据流,为了便于程序处理,需要有一个能承载有序流的载体。这个载体是毫无争议的,必然是天生为此诞生的Kafka。确认了有序流的载体,基于此,需求可以细化为两个小点:

  1. 如何提取CDC进Kafka;
  2. 如何从Kafka消费数据,逆流程还原数据并存储?

在这条链路中,Kafka是核心枢纽,细化的需求1和2其实就是Kafka的进和出。对于这种场景,Kafka的原始团队推出的Confluent套件中的核心组件Kafka Conncet专注于此。Kafka Connector,可以简单理解为链接Kafka的管道,管道的两端必然有一个是Kafka。

用户分群 -- 得到实时大数据运营探索

如何提取CDC进Kafka?

由于业界业务库仍以MySql为主,故我们以MySql的重心进行选型。下表中,总结了市面上主流的可用工具的特点。其中,省略了很多共性:

  1. 抓取原理均为伪MySql的slave,实时抓取CDC数据;
  2. 必要的功能均支持:支持DDL同步, 支撑基于GTID的 HA;
  3. 均为大数据主语言Java开发,方便后续的原理探索和最佳实践。

所以,我们需要考虑其他方面,包括:运维成本、易用性、与既定架构契合度、活跃度、架构设计理念等。

用户分群 -- 得到实时大数据运营探索

首先被pass的是领英的DataBus,其各方面都落后太多,且官方早已经不维护了。接着就是新生代Maxwell和Debezium,与老兵Canal之争,由于Canal多个维度的模型均落下风,包括不支持Bootstrap,架构和应用的复杂度较复杂。最终我们选择了新生代中更优秀的Debezium,这个架构非常新,由2016初开始迭代,选择它在技术选型上可以称得上比较大胆了。为何做出这个冒险呢?原因如下:

  1. 先进的 Snapshot + Binlog智能切换模式;
  2. 与Confluent集成度非常好,天生以Kafka Connector的模式运行,与我们已确定的Confluent架构非常契合;
  3. 专注做CDC抓取,在官网的描述中,其诞生的初心就是建立一个连接器库,捕获来自各种结构化数据库的更改;
  4. 与得到的存储系统高度契合,包括稳定的Mysql,MongoDB,Postgre的连接器且持续迭代优化;
  5. 最后,红帽出品,根红苗正,热度非常好。

重点介绍一下Debezimu 的核心特性:

首先就是Snapshot + Binlog 智能切换模式,下面的流程图即为一次dibezimu初始化的流程:

用户分群 -- 得到实时大数据运营探索

关键流程点:

  1. 当一个Debezium任务启动时,首先根据Kafka Conncet状态信息判定自己是否是新的任务,如果是,初始化状态信息(由Kafka Conncet服务维护),开始Snapshot模式拉取数据表存量数据;
  2. 获取一个读锁(这将阻塞其他客户端的写),开启一个可重复读事务,以确保Snapshot过程中后续读到的数据都是一致的;
  3. 记录Binlog当前offset。拉取关注表的Schema信息至SchemaDDL Topic,释放读锁,2和3的操作虽然产生了读阻塞,但是由于很轻量,基本是无感知的;
  4. 准备工作完成,Snapshot正式开始,在事务内分页scan 数据至 CDC Topic,在scan完成后提交事务,从之前记录的offset开始Binlog模式;
  5. 在Binlog模式运行过程中持续维护状态信息以及SchemaDDL信息,一旦发生异常,Debezium也可以通过状态信息自愈,如此,Debezium可以按预期稳定持续的跑下去。

以上的操作,仅仅在Confluent Connetor服务中提交一个Debezium任务即可,相当的丝般顺滑。另外,强大的Snapshot+Binlog智能模式,以及其DDL语句的持续跟踪的特性,带来的高扩展性对于平台化建设绝对是强有力的加成。Debezium Connector拉取到的CDC数据以及流程中提到的状态信息和Schema信息,均保存在Kafka独立的Topic中。这是Kafka Connector的设计,不引用其他存储介质,完全封闭在Confluent的生态内,降低了架构的复杂性,另外,存储的完全隔离也对稳定性更有保障。综上,是得到选择Debezium的原因。当然,也承担了这个选择带来的坑。

用户分群 -- 得到实时大数据运营探索

如此,我们已经确定了数据构造层的大部分选型,如该图中,黄色的是已经确认的,Debezium Connector负责CDC数据抓取,抓取到的数据被打入Kafka。对于这两个核心组件,我们还需要额外两个基础组件的支持。Kafka依赖的Zookeeper,以及 Connector依赖的Schema-Registry,其中Schema-Registry是Confluent的另外一个基础组件,用于支撑各个Connector传输进Kafka数据的系列化和反系列化操作。

如何从Kafka消费数据,逆流程还原数据并存储?

在整个数据构造层中,目前唯一不确认的是存储。回到这一章节的开始梳理的,整个数据层需要解决的另外两个问题,就是承上启下的还原存储,和计算引擎的确定。数据存储的所谓启下就是从Kafka中消费CDC数据还原真实数据,所谓呈上就是定制计算引擎使用的格式。由于这两点是绑定的,确定了计算引擎,也就确定了还原存储方案。

现今大数据计算引擎类型多种多样,引擎的确认必须满足真实计算需求,回到需求本身,运营老师发起的人群计算,是可预见的基于规则的自由组合。自由选择是指,对于条件的定义且、或、非的关系,完全由使用者随时思考且定义。这是典型的Ad Hoc即席查询场景。故下表中即为我们可以考虑的可以满足该场景的计算架构。

  1. Hive:Hive是Apache开源的基于HDFS组织数据,基于sql语法提供计算能力的离线大数据仓库计算架构。通过定义描述即可知:该方案的劣势非常明显,不支撑数据更新,需要调度系统驱动密集批计算任务构建还原分区完整数据,只能尽量通过缩小调度任务时间粒度保证尽量实时,但优点也非常明显:计算稳定,开发维护成本低,团队底蕴好。
  2. HBase+ Hive: 这是对方案1的一种升级方案,使用HBase保存全量数据,应用HBase的高吞吐能力处理CDC数据实时逆流程还原。另一面,由于HBase是将数据存储在HDFS,Hive可以通过门面包装HBase表的方式提供计算能力。该方案的最大优点是,可以保证数据实时存储,需要挑战的问题是:Hive分析时会影响HBase的读能力,可能会造成实时写程序背压导致连锁反应,另外,需要多维护两套实时的架构,包括HBase和消息驱动写入,复杂性较高。
  3. Imapla+kudu: 这一对组合是Cloudera开源的MMP架构,在追加场景下(只有增,没有删改)是绝对的黄金组合,使用Imapla保存无变更的归档冷数据,kudu保存新增追加数据,在数据计算时,冷数据路由到Impala引擎,实时数据路由到kudu引擎,已视图的形式提供服务,该组合实现的冷热分离方案能达到相当好的实时分析能力。但在其他包括删改的场景下必须排除不支持变更的Impala单独使用kudu,即便如此,kudu在同时实时写和分析的能力在AD HOC场景也是表现也是相当优异的。但是对比HBase,kudu自己组织数据存储,相对使用HDFS的HBase,需要对数据文件保障投入大量资源。最后,同方案2一样,需要多维护两套实时的架构,复杂性较高。目前得到的行为分析系统已经使用了Imapla,还没有引入kudu,主要的困扰就是运维成本。
用户分群 -- 得到实时大数据运营探索

总结看来,只使用kudu是我们的最佳选择。但是最终,我们在第一版本中选用了方案一,这个只能达到准实时有弊端的方案。为何要选看似最不好的选择呢?

原因相信大家都经历过,就是:时间紧任务重人手不足。当时投入项目的开发资源,在已经引入全新架构Debezium, Connect的情况下,很难在分配精力在另一个新架构。由于基础架构的限制,故选择了团队最熟悉的Hive+调度系统的来实现30分钟延迟的数据准备,当然,这是在需求方认可的情况下!共识是:数据驱动型产品落地的初心是驱动业务发展,在初期,相比上来就平台化的思路,更应该选择快速验证推进。

用户分群 -- 得到实时大数据运营探索

确定了使用Hive + 调度系统的方式后,文件存储也确定下来,必然是HDFS。此时数据层的逻辑基本上已经齐整了,此时只需要处理两个关键点就可以将整个逻辑串起来:

  1. 如何将从Kafka取出的数据库CDC数据放到HDFS供后续的还原,这个自然而然的就选择Confluent官方的 HDFS Connector;
  2. HDFS中已存在的历史数据,如何与从Kafka中取出的Binlog数据做逆流程还原为最新的原始数据?

答案是:根据数据的性质分层组织数据,再基于分层仓库进行Hive merge还原计算。

用户分群 -- 得到实时大数据运营探索

首先,Hive数据仓库使用了简化的三层数仓分层模型:

第一层:ODS数据缓冲层,保存的是HDFS Conector从Kafka拉取到的Binlog文件。

第二层:DWS数据汇总层,这层承载的是完全还原回来的业务数据库,由于Hive每次还原都是按时间点分区保存的全量数据镜像,故可以称之为事实分区表。最新的镜像依赖于上一次全量镜像与Binlog merge计算生成。如此,在使用DWS表必须指定最新分区。

第三层:为了屏蔽DWS层必须指定分区的问题,增加了ADS应用层,ADS表是对DWS表创建的一个逻辑表,其永远通过软连接指向DWS表的最新分区。

DWS和ADS的建设是需要有调度支撑的,这里使用团队底蕴较好的Azkaban。

如何做最关键的 Merge计算?

Hive场景下的Merge计算,是在最近一次镜像的数据集上合并新增Binlog的批计算。这种操作更像是阶段性的执行双流合并。下面的流程图即为一个简单的逻辑演示,左侧是有序的Binlog,右侧是作为基准数据集的存量数据。

用户分群 -- 得到实时大数据运营探索
  1. 首先对Binlog按 主键+事件类型 group by,取出delete 事件与存量数据进行left outer join,这样就从存量数据中剔除了删除数据;
  2. 对剩余Binlog数据进行二次筛选,按主键 group by 取最新的Binlog,无论是insert或是update;
  3. 将步骤2中生成的数据与存量数据合并,生成最新数据。如图中的黄色的1、2是受Binlog的update重放影响的,绿色的4是insert新增的。

至此,数据构造层和数据计算层都已经确定,各种素材都已经准备好了,可以通过Hive Server组件提供人群计算服务了。如何基于这些数据定义人群并应用起来呢?就是产品层和服务层这两层要解决的。

用户分群 -- 得到实时大数据运营探索

三、产品层和应用层

产品层核心问题主要有两个:

  1. 如何将人群构建流程抽象为易用系统;
  2. 如何将复杂的人群定义条件转为优化的Hive SQL语句?

先讨论产品层的第一步问题:复杂的人群定义条件,如何向上抽象易用的系统,向下转化为高效的Hive sql调用数据计算层?再次分析初始需求:(满足X,满足Y,满足Z,不满足M),进行计算机语言转化后 -> (X且Y且Z且) 非 M ,其本质是明显的集合运算。故计算流程可以抽象为下方的图,共分三步:

第一步:计算所有满足人群的交集;

第二步: 计算过滤的集合;

第三步:从满足人群的交集中剔除过滤集合,最终的灰白色即为我们的目标人群。

用户分群 -- 得到实时大数据运营探索

集合条件的自由组合,适合的产品形态当然是下拉框点选,最终向需求方交付的数据门户中的用户分群系统就是这样的形态。初始需求中的点选方案 “且或否”,只是一个简单的例子,且或非集合操作能达成非常灵活的组合,这对转化成的Hive sql的复杂性和执行性能也会有极大的挑战。如果在Hive计算时发生倾斜情况,将拖慢整个查询,计算时长将远超出我们的可接受范围。

而集合运算场景如果按照标准的join思路去解决,就容易发生倾斜问题。比如< 多且> 场景,如果按照多次inner join解决,如图中的演示,蓝色和黄色的join,结果与绿色的join,结果再与紫色的join,多个inner join将不断重复这一流程,在Hive计算引擎将sql转化为MapReduce时,stage的个数将于且条件个数成正比。同时,Hive在进行join计算时,当进行关联的两张表数据量差异过大,很容易产生严重的数据倾斜问题。

用户分群 -- 得到实时大数据运营探索

所以,必须在条件转化为Hivesql时优化处理,消除stage过多和数据倾斜问题,这两点也是Hive工程中必须解决的。考虑最终输出结果是单列的uid列表,最终的解决方案是使用(uid+随机数)Union all + Group By +HavingCount 。对于上面的例子,筛选各个满足的子条件人群时,在各个子人群的用户上增加随机数标识。接着直接将各个子人群Union all合并为一个大数据集。最后对这个大数据集按(uid+随机数)Group By并且Having Count = 子条件个数。这样的操作,有效优化了计算资源消耗和计算时间。

应用层要解决的事情是如何确定好边界,为上层应用提供高效易用的分群数据服务。由于服务层是负责系统之间的交互的,所以确定好边界是最先要做的事情。通过和需求方运营老师和上游系统讨论后,确定最终的边界是:运营老师在数据门户的用户分群系统创建人群,在其他的运营动作系统联动用户分群系统提供的Api服务,将运营动作与人群绑定。人群被产品层构建好后,会被同步到Ali 对象存储和MongoDB两种存储,基于这两种存储引擎,又构建了相应的Api服务。

用户分群 -- 得到实时大数据运营探索

离线场景下,如消息中心系统的push,站内信,以及优惠券系统的发劵,通过构建在对象存储上的Api获取人群CDN地址,下载到系统内遍历文件进行相关处理。

在线场景下,如开机弹窗,首页banner的策略判断。调用基于MongoDB构建的Api,依赖于MongoDB高扩展性,构建合理的索引支撑高吞吐,服务线上的大量实时低RT请求。

至此,分层架构完全确认了。产品层的用户分群系统,将分群条件转化为高效的 Hive sql语句,通过HiveServer 服务调起分群计算任务。最终将结果,也就是uid列表,转存至对象存储OSS和MongoDB,在这两个存储介质之上构建对应的Api分别处理离线和实时两类人群需求。

用户分群 -- 得到实时大数据运营探索

四、踩坑和未来规划

正如前面所述,由于应用了新架构,在项目落地中踩了很多坑。

用户分群 -- 得到实时大数据运营探索

在Debezium方面,由于任务的本质是持续的流式任务,由于各种因素导致的Binlog丢失,是很难能找回来的。对于这种异常,采用对每张表每周定时一次的稳定 Snapshot 来尽量减小可能的损失,从原理和实际情况看,Debezium Snapshot是足够稳定的。另外,紧急情况下出现存量数据不一致问题时,使用备用线路Datax拉取全量数据替代Snapshot,因为datax的全量拉取速度比Debezium Snapshot更快,当然,该操作都需要做更多的人工运维操作。

另外由于架构太新且持续迭代,无法避免官方bug的影响,目前已经累计触发3次官方issue,原因均为对同步数据表的规范验证过于严格,比如Date类型不允许默认值为公元起点,这类issue通过源码分析都可以修复掉,但再尝试向官方提交时发现总是晚了一步,尝试成为commit失败。所以在长期运维的过程中,数据中心会保持各类组件与官网的版本一致。在升级过程中,也踩过版本匹配的坑导致平滑升级失败,如Debezium和Connetor,以及HDFS Connect的版本均必须一致。

在Connetor应用方面,由于我们采用的是集群模式,影响我们最大的是集群的重平衡机制导致任务频繁banlance,尤其在快速提交多个Debezium任务时,在Snapshot的阶段下,任务重新banlance很大概率会导致数据丢失,甚至导致数据库死锁。目前初期的解决方案仍以运维规范为主,不会短时间提交大量connect任务。而长期的优质解决方案,是放弃Connetor集群模式,采用standalone+ k8s的容器模式,完全实现任务隔离。

当然,在发生问题的第一步,及时向用户同步问题并道歉,是我们从《程序员的职业素养》学到的,“要练习的第一件事就是道歉”。

用户分群 -- 得到实时大数据运营探索

在项目的落地过程中,得到还没有做到真正实时的分群,止步在延迟30分钟的准实时。在未来数据团队的规模和能力能cover更复杂的架构时,必然会做一次升级,做到真正的精准运营。另一方面,在新型架构的探索和应用过程中,对异构数据同步场景的平台化思路越来越清晰。异构数据同步的需求不仅仅是在用户分群中,相信在互联网开发中,都会有类似的使用数据库CDC数据的场景。而Debezium+Connector的先进特性带来的高扩展性能对平台化是非常强有力的支持。

目前对这个平台的设想是:开发人员只需点点点配置,就能自动化的启动一个资源隔离的Debezium Connector任务,输出一个实时的CDC日志kafka topic,平台在屏蔽底层复杂任务细节上,支撑任务的全生命周期管理和监控报警等。

  • 暂无推荐