1. 业务场景

主要考虑两个因素:

  • 纯粹的数据量角度:

    • 单表数据量过大已经影响查询性能(查看当前数据量)
    • 数据量增长迅速,防患于未然(查看近一月数据量增长情况,对未来做出预期)

    我们目前存在一个数据表1600w+数据,月增长200w左右。

  • 客户实际感知的业务响应速度带来的直观用户体验影响

    我们线上这个千万级的表查询速度已经5s+

2. 方案

2.1. 分表方式

预算这里已经单独使用一个MySQL实例,拥有不用与其他服务共享的,完全的CPU和磁盘IO性能,理论上性能够用,无需再做分库处理。

大多数场景依然还是单表数据量大,扫描行过多,增加了不必要的CPU负担。

水平分表,减少单表扫描行数,应该是较为简单快速的提升性能的一个方式。同时由于单表数据量变小,后续执行DDL时,也能更迅速。

因此使用水平分表。

2.2. 分表算法与分片键的选择。

分表时必须将“同一个客户”的数据落到一个表中,避免触发全表路由,不符预期。

**作为SaaS业务,我们采用企业的订阅号(企业编码)字段作为分片键。**对“同一个客户”的业务理解是字符串类型的企业编码相同的数据,这个字段也是一个表中的字段。

这里采用简单的哈希取模的方式,即:

Math.abs(企业编码.hashCode() % table_number)

缺点:

  • 每家客户的数据量不一样,可能出现几个分表的“数据倾斜”的问题。

优点:

  • 简单易实现,能够保证同一客户数据落到一个表中的业务预期。
  • 通过一致性哈希算法来实现,后期扩容影响范围小,减少业务影响

后续优化方向上可以考虑单独pick出一些数据量大的企业,路由到单独的表中。

根据一致性hash算法,table_number我们需要约定一个固定的值,也就是一致性哈希环中的最大允许数量,使用2的幂数,因此可以用位运算取代模运算提高性能(参考HashMap的容量计算逻辑):

Math.abs(企业编码.hashCode() & (table_number - 1))

2.3. 分表数量

分表的数量上,以3年作为一个业务展望时间:

  • 4500w数据 → 300w一张表 → 15张表 → 取2的4次方 → 16张表 → 表的序号从0到15

一致性hash环的数量,扩大2个量级,取2的6次方64,应该已经绰绰有余了,因此我们有了一个具体的分表算法来计算这个表序号:

int temp = Math.abs(企业编码.hashCode() & (64 - 1));
return temp / (64 / 16);

// ==> 进一步优化 
return (Math.abs(企业编码.hashCode() & 63)) / 4;

后期假设第0张表数据量过多,我们需要二次分表第0张表扩容,我们假设增加一个序号为16的分表,只需要对表0做一次数据迁移即可,其他表按照一致性hash算法,不会发生分表变化。分表算法修改为:

int hashMod = Math.abs(企业编码.hashCode() & 63;
int temp = hashMod / 4;
if (0 == temp) {
	return hashMod / 2 == 0 ? 0 : 16;
} else {
	return temp;
}

如果序号为2的幂数,这里的算法会更简单,也是更标准的一致性hash算法,不过相应的影响的表数量也会更多,不做展开。

2.4. 迁移数据

2.4.1. 数据迁移技术选型

水平分表本质上是将原来一个表的数据按照分表算法将数据分配到n个分表中,因此存在数据迁移问题,即原表→若干分表

  • 阿里云DTS工具(开源版Canal

    由于需要支持分表算法,一个思路是在MySQL中实现一个Java的String#hashCode方法,然后在迁移数据过程中对数据进行清洗,过滤符合当前分表算法的数据进入指定分表。

    但是很遗憾,DTS工具商业版暂不支持该功能,后续Canal可能可以通过代码的方式支持,这一块待完善。

    Untitled

  • 阿里云Dataworks工具(开源版DataX

    这个工具本质是一个大数据治理平台,支持自定义函数(可以用Java语言写UDF),能够满足需求。

选型上,只有Dataworks能够完成需求,下文说明如何通过阿里云Dataworks工具完成数据迁移。

  1. 首先,需要将上文的分表算法用Java语言描述,需要开发一个【UDF】发布到阿里云的Dataworks的函数上:

    1. 我这里使用的是Function Studio的方式发布上去的,代码参考:

      package com.alibaba.dataworks.udf;
      import com.aliyun.odps.udf.UDF;
      
      public class DataTransfer extends UDF {
          public Integer evaluate(String s) {
              int hash = s.hashCode();
              return (Math.abs(hash & 63)) / 4;
          }
      }
      

      需要注意的是,Function Studio后续不再维护,阿里云官方鼓励自己上传jar的方式:

      Untitled

  2. 用【ODPS SQL】创建两个ODPS表,一个用来从源库同步全量数据,一个用来存储分表数据,这里的数据是写文件的,速度很快:

    Untitled

    DROP TABLE IF EXISTS log_odps;
    CREATE TABLE IF NOT EXISTS log_odps(
    	...
    )
    COMMENT ''
    PARTITIONED BY (pt STRING) 
    lifecycle 36500;
    
    DROP TABLE IF EXISTS log_odps_sharding;
    CREATE TABLE IF NOT EXISTS log_odps_sharding(
    	...
    )
    COMMENT ''
    PARTITIONED BY (pt STRING) 
    lifecycle 36500;
    
  3. 用【数据集成】从MySQL数据源同步数据到ODPS中,需要注意这里的清理规则建议选择“Insert Overwrite”,这样后续可以重复执行不需要清理ODPS数据,分区可以随便填写一个字符串:

    Untitled

    Untitled

  4. 同步到ODPS数据后,用【ODPS SQL】,可以用“create … as …”语法,将ODPS中的数据同步到一个新表中,每个分表为一个分区:

    set odps.sql.allow.fullscan=true;
    insert overwrite table log_odps_sharding partition(pt)
    select `(pt)?+.+`, DataTransfer(企业编码) as pt from log_odps;
    
    1. set odps.sql.allow.fullscan=true 开启全表扫描。也可以指定下面的select语句的分区,否则需要开启全表扫描(只有一个分区,无意义)
    2. insert overwrite 写数据前清理数据。
    3. (pt)?+.+除了pt字段以外的字段(Hive SQL这里分区字段会作为SELECT *的一列返回,因此需要排除)
    4. DataTransfer为我们定义的UDF函数,这里我们直接调用函数的结果作为log_odps_sharding的分区字段
    5. 至此,我们的log_odps数据会分布在log_odps_sharding对各个分区中,分区字段则为分表的表序号。
  5. 用【数据集成】将ODPS数据写回原数据源。

    Untitled

    1. 这里的pt值,按照上面的分表算法,我这里直接写1,2,3之类的分表序号即可
    2. 主键冲突策略选择“on duplicate key update”策略,这样可以多次执行实行不停机迁移(下文说明)

2.4.2. 不停机迁移

不停机迁移的方案基本上是两个步骤:

  • 全量数据迁移(数据量大,比较慢,需要提前执行,可以在发版前执行)
  • 增量数据同步(数据量小,比较快,可以在发版后执行)

需要注意,这里增量数据同步追平全量数据时:

  • 会有一段时间的数据不一致的。这个如果有很高的要求可以尝试“实时计算”同步的方案,尽可能缩小增量数据同步时间也就缩小了数据不一致的时间区间(dataworks中有【实时同步】选项)
  • 最终一致性。当增量数据同步完成,追平全量数据的时刻,数据是一致的。

不停机迁移这里由于Dataworks是即时读取数据源数据,而不是像DTS那样可以感知到binlog,因此需要保证数据的修改有“标记”:

  • 数据行上要有逻辑删字段
  • 数据行上要有代表修改时间的字段

在Dataworks上的具体实践:

  1. 全量数据迁移:按照上文流程走即可

  2. 增量数据迁移:

    1. 在新发布的分表代码中,需要对数据进行**“双写”,即:新写入/修改的数据,在对对应分表发生操作的同时,也对原始表进行一次同样的操作,二者需要保证原子性。(如果是同数据库,可以使用本地事务),在此基础上,我们就可以确定:原数据表一定是最新的数据。**
    2. 在上文迁移方案中的第三步“用【数据集成】从MySQL数据源同步数据到ODPS中”过程中,我们可以指定只同步修改时间大于“全量数据同步开始的时间点”的数据
    3. 其余流程不变,再次执行即可。

    由于我们遇到相同数据时选择了“on duplicate key update”策略,Dataworks会帮助我们update数据。由于我们加入了逻辑删字段,即时数据发生了删除,Dataworks也会理解为一次update,因此不会发生数据没有正常删除的问题。

2.5. 异常回滚

由于上述不停机迁移过程中采用了“双写”,因此这里一旦发生异常,只需要切回异常前代码与配置即可。

2.6. 框架选择

市面上分表的框架还是比较多的,比如kingshard**,**ShardingSphere,Mycat等。

这次选型不严谨选用ShardingSphere下的ShardingSphere,主要出于以下考虑:

  • 社区活跃度上,ShardingSphere的社区活跃度很高。
  • ShardingSphere-JDBC无需额外部署服务,JAR包方式接入。
  • 隔壁小伙伴有使用的ShardingSphere经验,技术栈一致,减少维护和学习成本。

ShardingSphere下又有几个产品,ShardingSphere-JDBC,Sharding-Proxy,Sharding-Sidecar(规划中):

Untitled

这里选择了ShardingSphere-JDBC,主要考虑ShardingSphere-JDBC运维成本比较低,无需部署额外的中心化服务,去中心化,分表的配置主动权在开发这里。

当然ShardingSphere-JDBC也会存在一些缺点:

  • 不支持弹性伸缩,每次调整分片算法后,需要将数据手动迁移到各分片上。

    4.1.0版本之后,提供了Sharding-Scaling方案解决弹性伸缩问题,只支持Sharding-Proxy方案。

    Untitled

2.7. ShardingSphere-JDBC使用流程

Maven坐标(SpringBoot starter方式接入):

<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
		<version>latest.version</version>
</dependency>

配置参考:

# sharding-jdbc
# datasource config
spring.shardingsphere.datasource.names = ds0
spring.shardingsphere.datasource.ds0.type = com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds0.url = jdbc:mysql:xxx
spring.shardingsphere.datasource.ds0.username = xxx
spring.shardingsphere.datasource.ds0.password = xxx

# sharding table config

spring.shardingsphere.sharding.tables.[table_name].actual-data-nodes = ${['ds0']}.[table_name]_$->{0..15}
spring.shardingsphere.sharding.tables.[table_name].table-strategy.standard.sharding-column = [shard_key]
spring.shardingsphere.sharding.tables.[table_name].table-strategy.standard.precise-algorithm-class-name = com.maycur.budget.config.sharding.MaycurPreciseShardingAlgorithm

# show sql
spring.shardingsphere.props.sql.show = true
  • spring.shardingsphere.datasource.xx 指定数据源配置
  • spring.shardingsphere.sharding.tables.xx 指定分表配置
  • spring.shardingsphere.props.sql.show 展示实际路由的分表的SQL

这里分片的策略是自己写了一个类:

import java.util.Collection;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;

/**
 * Sharding algorithm.
 *
 * @author <a href="mailto:masaiqi.com@gmail.com">masaiqi</a>
 * @date 2022/7/18 17:39
 */
public class MaycurPreciseShardingAlgorithm implements PreciseShardingAlgorithm<String> {
    @Override
    public String doSharding(Collection<String> collection,
        PreciseShardingValue<String> preciseShardingValue) {
        String value = preciseShardingValue.getValue();
        int shardingKey = getShardingKey(value);

        return collection.toArray()[shardingKey].toString();
    }

    private int getShardingKey(String originKey) {
        return Math.abs(originKey.hashCode() & 63) / 4;
    }
}

需要注意的是,这个类无需交给Spring IOC管理,Sharding-JDBC会负责实例化对象:

public static <T extends ShardingAlgorithm> T newInstance(final String shardingAlgorithmClassName, final Class<T> superShardingAlgorithmClass) {
    Class<?> result = Class.forName(shardingAlgorithmClassName);
    if (!superShardingAlgorithmClass.isAssignableFrom(result)) {
        throw new ShardingSphereException("Class %s should be implement %s", shardingAlgorithmClassName, superShardingAlgorithmClass.getName());
    }
    return (T) result.newInstance();
}

备选方案:分片的策略也可以使用ShardingSphere的行表达式,不过分片key为字符串时,groovy不允许直接取模,需要转换为字符串,再加上没有Java中的String#hashcode方法,比较复杂。

spring.shardingsphere.sharding.tables.[tablename].table-strategy.inline.algorithm-expression=[groovy expression]

Done!启动运行。

3. 参考