本文由 简悦 SimpRead 转码, 原文地址 www.toutiao.com
作者:尚琎 仁劼什么是人群圈选随着数据时代的发展,各行各业数据平台的体量越来越大,用户个性化运营的诉求也越来越突出,用户标签系统,做为个性化千人
作者:尚琎 仁劼
随着数据时代的发展,各行各业数据平台的体量越来越大,用户个性化运营的诉求也越来越突出,用户标签系统,做为个性化千人千面运营的基础服务,应运而生。如今,几乎所有行业(如互联网、游戏、教育等)都有实时精准营销的需求。通过系统生成用户画像,在营销时通过条件组合筛选用户,快速提取目标群体,例如:
- 电商行业中,商家在运营活动前,需要根据活动的目标群体的特征,圈选出一批目标用户进行广告推送或进行活动条件的判断。
- 游戏行业中,商家需要根据玩家的某些特征进行圈选,针对性地发放大礼包,提高玩家活跃度。
- 教育行业中,需要根据学生不同的特征,推送有针对性的习题,帮助学生查缺补漏。
- 搜索、门户、视频网站等业务中,根据用户的关注热点,推送不同的内容。
以电商平台中一个典型的目标群体圈选场景为例,如服装行业对其潜在客户信息采集,打标,清洗后如下表:
(以上表结构中,第一列为用户身份的唯一标识,往往作为主键,其他列均为标签列。)
如公司想推出一款高端男性运动产品,则可能的圈选条件为:
- 男性,推出产品的受众群体为男性。
- 运动爱好者,运动爱好者更有可能消费运动类产品。
- 一线城市,一线城市用户相比于二三线城市用户,可能更倾向于消费高端产品。
- …
从上述表结构(人群圈选典型表结构,且大都如此,第一列为用户 id,其余皆为标签列)和查询条件可以看出,人群圈选业务都面临一些共同的痛点:
- 用户标签多、标签丰富,标签列可达成百甚至上千列。
- 数据量庞大,用户数多,从而所需运算量也极大。
- 圈选条件组合多样化,没有固定索引可以优化,存储空间占用极大。
- 性能要求高,圈选结果要求及时响应,过长的延时会造成营销人群的不准确。
- 数据更新时效要求高,用户画像要求近实时的更新,过期的人群信息也将直接影响圈选的精准性。
针对以上痛点,本文将从原理层面深度分析,多角度对比讲解如何使用 ClickHouse 搭建人群圈选系统,为何选择 ClickHouse,以及选用 ClickHouse 搭建人群圈选系统的优势。
本文以开源 ElasticSearch(ES)为例,仅针对人群圈选场景与 ClickHouse 做对比。开源版 ES 是一款高效的搜索分析引擎,利用其优秀的索引技术,可以完成各种复杂的条件组合和数据聚合运算。ClickHouse 是最近比较火的一款开源列式存储分析型数据库,它最核心的特点就是极致存储压缩率和查询性能,尤其擅长单个大宽表的查询场景。因此细比两者,相较于 ClickHouse,ES 虽具备人群圈选业务所需的必要能力,但仍有以下 3 方面不足:
成本方面:
开源 ES 的底层存储使用 lucene,主要包含行存储(storefiled),列存储(docvalues)和倒排索引 (invertindex)。行存中_source 字段用于控制 doc 原始数据的存储。在写入数据时,ES 把 doc 原始数据的整个 json 结构体当做一个 string,存储为_source 字段,因此_source 字段对存储占用量大且关闭_source 将不支持 update 操作。同时,索引也是 ES 不可缺少的一部分,ES 默认全列索引,虽可手动设置对特定的列取消索引,但取消索引的列将不可查询。在人群圈选场景下,选取标签过滤条件是任意的,多样的,不断变化的。对任意一条标签列不做索引都是不现实的,因此针对成百上千列的大宽表,全列索引必然使得存储成本翻倍。
ClickHouse 是一款彻底的列式存储数据库,且 ClickHouse 的查询不依赖索引,使用过程中也不强制构建索引,因此不需要保留额外的索引文件。同时 ClickHouse 存储数据的副本数量灵活可配,可将使用成本降至最低。
数据更新与治理方面:
索引为 ES 带来了高效的查询性能,但是索引的构造过程是复杂的,耗时的。每一次索引的构建都需对全列数据进行扫描,排序来生成索引文件。而在人群圈选业务中,人群信息必然是不断增长的。标签的不断更新将会使得 ES 不得不频繁的重构索引,这将对 ES 的性能造成巨大的开销。
ClickHouse 的查询不依赖索引,使用过程中也不强制构建索引。因此对于新增数据,ClickHouse 不涉及索引的更新与维护。
易用性方面:
开源 ES 缺少完备的 sql 支持,查询请求的 json 格式复杂。同时 ES 对多条件过滤聚合的执行策略缺少优化,还以文章开头的典型场景为例,圈出一款高端男性运动产品的受众人群。可得如下 sql:
“SELECT user_id FROM whatever_table WHERE city_level = ‘一线城市’ AND gender = ‘男性’ AND is_like_sports = ‘是’;”
针对以上 sql,ES 的执行会对 3 个标签分别做 3 次索引扫描,之后再将 3 次扫描的结果做 merge,流程如下图所示
而 ClickHouse 的执行则更优雅一些。ClickHouse 采用标准 sql,语法简单且功能强大。在执行 where 语句时,会自动优化形成 prewhere 分层执行,因此二次扫描将基于一次扫描的结果进行,执行流程如下图所示:
显而易见,针对复杂条件过滤的场景,ClickHouse 对多条件筛选流程做出优化,扫描的数据量更小,性能也较 ES 而言更高效。
对比选型完成后,接下来讲解如何基于 ClickHouse 搭建人群圈选系统,回顾文章开头的业务描述和上一部分的典型 sql( “SELECT user_id FROM whatever_table WHERE city_level = ‘一线城市’ AND gender = ‘男性’ AND is_like_sports = ‘是’;” ),再次总结人群圈选业务对数据库能力的要求如下:
- 具备高效的批量数据导入性能。
- 具备处理频繁,实时 update 的能力。
- 具备加列 / 减列的 DDL 能力。
- 可以指定任意列为过滤条件的高效查询能力。
面对以上需求,ClickHouse 如何使用才能在人群圈选场景下物尽其用,扬长避短?
1. insert 代替 update
首先要解决的是 ClickHouse 的异步 update 机制。ClickHouse 对 update 的执行是低效的,ClickHouse 内核中的 MergeTree 存储一旦生成一个 Data Part,这个 Data Part 就不可再更改了。所以从 MergeTree 存储内核层面,ClickHouse 就不擅长做数据更新删除操作。ClickHouse 的语法把 Update 操作也加入到了 Alter Table 的范畴中,它并不支持裸的 Update 操作。
1
|
ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr;
|
注:* 左右滑动阅览
当用户执行一个如上的 Update 操作获得返回时,ClickHouse 内核其实只做了两件事情:
- 检查 Update 操作是否合法;
- 保存 Update 命令到存储文件中,唤醒一个异步处理 merge 和 mutation 的工作线程;异步线程的工作流程极其复杂,总结其精髓描述如下:先查找到需要 update 的数据所在 datapart,之后对整个 datapart 做扫描,更新需要变更的数据,然后再将数据重新落盘生成新的 datapart,最后用新的 datapart 做替代并 remove 掉过期的 datapart。
这就是 ClickHouse 对 update 指令的执行过程,可以看出,频繁的 update 指令对于 ClickHouse 来说将是灾难性的。
因此,我们使用 insert 语句代替 update 语句。当需要对某一指定 user 更新标签时,就重新插入一条该 user 的数据,如对表中 07 号用户进行数据更新:
最终,每个 user 可能都存在多条记录。针对人群圈选场景,同一 user 错乱冗余的信息显然对查询结果产生误导,无法满足精准圈选的需求。接下来讲解如何使用 ClickHouse 进行主键去重,即同一 user,然后 insert 进来的数据覆盖掉已有的数据,实现 update 的效果。
2. 选用 AggregatingMergeTree 表引擎
MergeTree 是 ClickHouse 中最重要,最核心的存储内核,MergeTree 思想上与 LSM-Tree 相似,其实现原理复杂,不在此展开,因为一篇文章也难以讲解清楚。本篇围绕人群圈选场景,着重从功能层面描述如何在人群圈选场景下使用 MergeTree 的变种 AggregatingMergeTree 以及使用 AggregatingMergeTree 可实现的数据聚合效果。AggregatingMergeTree 继承自 MergeTree,存储上和基础的 MergeTree 其实没有任何差异,而是在数据 Merge 的过程中加入了 “额外的合并逻辑”, AggregatingMergeTree 会将相同主键的所有行(在一个数据片段内)替换为单个存储一系列聚合函数状态的行。以文章开头部分的表结构为例,使用 AggregatingMergeTree 表引擎的建表语句如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
CREATE TABLE IF NOT EXISTS whatever_table ON CLUSTER default
(
user_id UInt64,
city_level SimpleAggregateFunction(anyLast, Nullable(Enum('一线城市' = 0, '二线城市' = 1, '三线城市' = 2, '四线城市' = 3))),
gender SimpleAggregateFunction(anyLast, Nullable(Enum('女' = 0, '男' = 1))),
interest_sports SimpleAggregateFunction(anyLast, Nullable(Enum('否' = 0, '是' = 1))),
reg_date SimpleAggregateFunction(anyLast, Datetime),
comment_like_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),
last30d_share_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),
user_like_consume_trend_type SimpleAggregateFunction(anyLast, Nullable(String)),
province SimpleAggregateFunction(anyLast, Nullable(String)),
last_access_version SimpleAggregateFunction(anyLast, Nullable(String)),
others SimpleAggregateFunction(anyLast,Array(String))
)ENGINE = AggregatingMergeTree() partition by toYYYYMMDD(reg_date) ORDER BY user_id;
|
注:* 左右滑动阅览
就以上建标语句展开分析,AggregatingMergeTree 会将除主键(user)外的其余列,配合 anyLast 函数,替换每行数据为一种预聚合状态。其中 anyLast 聚合函数声明聚合策略为保留最后一次的更新数据。
3. 数据一致性保证
上一部分讲述了如何针对人群圈选场景选择表引擎和聚合函数,但是 AggregatingMergeTree 并不能保证任何时候的查询都是聚合过后的结果,并且也没有提供标志位用于查询数据的聚合状态与进度。因此,为了确保数据在查询前处于已聚合的状态,还需手动下发 optimize 指令强制聚合过程的执行。同时方便起见,可自行配置周期性 optimize 指令的下发。例如每 10 分钟执行一次 optimize 指令。optimize 的执行周期可在业务的实时性需求与计算资源之间做权衡。如数据量过大,optimize 生效慢,可按 partition 级别并行下发做优化。optimize 生效后即可实现去重逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
import java.sql.*;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.concurrent.TimeoutException;
public class Main {
private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT);
public static void main(String[] args) throws ClassNotFoundException, SQLException, InterruptedException, ParseException {
String url = "your url";
String username = "your username";
String password = "your password";
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
String connectionStr = "jdbc:clickhouse://" + url + ":8123";
try {
Connection connection = DriverManager.getConnection(connectionStr, username, password);
Statement stmt = connection.createStatement();
// 创建local表
String createLocalTableDDL = "CREATE TABLE IF NOT EXISTS whatever_table ON CLUSTER default " +
"(user_id UInt64, " +
"city_level SimpleAggregateFunction(anyLast, Nullable(Enum('一线城市' = 0, '二线城市' = 1, '三线城市' = 2, '四线城市' = 3))), " +
"gender SimpleAggregateFunction(anyLast, Nullable(Enum('女' = 0, '男' = 1)))," +
"interest_sports SimpleAggregateFunction(anyLast, Nullable(Enum('否' = 0, '是' = 1)))," +
"reg_date SimpleAggregateFunction(anyLast, Datetime)) " +
"comment_like_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),\n" +
"last30d_share_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),\n" +
"user_like_consume_trend_type SimpleAggregateFunction(anyLast, Nullable(String)),\n" +
"province SimpleAggregateFunction(anyLast, Nullable(String)),\n" +
"last_access_version SimpleAggregateFunction(anyLast, Nullable(String)),\n" +
"others SimpleAggregateFunction(anyLast, Array(String)),\n" +
"ENGINE = AggregatingMergeTree() PARTITION by toYYYYMM(reg_date) ORDER BY user_id;";
stmt.execute(createLocalTableDDL);
System.out.println("create local table done.");
// 创建distributed表
String createDistributedTableDDL = "CREATE TABLE IF NOT EXISTS whatever_table_dist ON cluster default " +
"AS default.whatever_table " +
"ENGINE = Distributed(default, default, whatever_table, intHash64(user_id));";
stmt.execute(createDistributedTableDDL);
System.out.println("create distributed table done");
// 插入mock数据
String insertSQL = "INSERT INTO whatever_table(\n" +
"\tuser_id,\n" +
"\tcity_level,\n" +
"\tgender,\n" +
"\tinterest_sports,\n" +
"\treg_date,\n" +
"\tcomment_like_cnt,\n" +
"\tlast30d_share_cnt,\n" +
"\tuser_like_consume_trend_type,\n" +
"\tprovince,\n" +
"\tlast_access_version,\n" +
"\tothers\n" +
"\t)SELECT\n" +
" number as user_id,\n" +
" toUInt32(rand(11)%4) as city_level,\n" +
" toUInt32(rand(30)%2) as gender,\n" +
" toUInt32(rand(28)%2) as interest_sports,\n" +
" (toDateTime('2020-01-01 00:00:00') + rand(1)%(3600*24*30*4)) as reg_date,\n" +
" toUInt32(rand(15)%10) as comment_like_cnt,\n" +
" toUInt32(rand(16)%10) as last30d_share_cnt,\n" +
"randomPrintableASCII(64) as user_like_consume_trend_type,\n" +
"randomPrintableASCII(64) as province,\n" +
"randomPrintableASCII(64) as last_access_version,\n" +
"[randomPrintableASCII(64)] as others\n" +
" FROM numbers(100000);\n";
stmt.execute(insertSQL);
System.out.println("Mock data and insert done.");
System.out.println("Select count(user_id)...");
ResultSet rs = stmt.executeQuery("select count(user_id) from whatever_table_dist");
while (rs.next()) {
int count = rs.getInt(1);
System.out.println("user_id count: " + count);
}
// 数据合并
String optimizeSQL = "OPTIMIZE table whatever_table final;";
// 如数据合并时间过长,可在partition级别并行执行
String optimizeByPartitionSQL = "OPTIMIZE table whatever_table PARTITION 202001 final;";
try {
stmt.execute(optimizeByPartitionSQL);
}catch (SQLTimeoutException e){
// 查看merge进展
// String checkMergeSQL = "select * from system.merges where database = 'default' and table = 'whatever_table';";
Thread.sleep(60*1000);
}
// 人群圈选(city_level='一线城市',gender='男性',interest_sports='是', reg_date<='2020-01-31 23:59:59')
String selectSQL = "SELECT user_id from whatever_table_dist where city_level=0 and gender=1 and interest_sports=1 and reg_date <= NOW();";
rs = stmt.executeQuery(selectSQL);
while (rs.next()) {
int user_id = rs.getInt(1);
System.out.println("Got suitable user: " + user_id);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
|