一、ShardingContext

在Sharding-Jdbc中,我们其实需要抓住一个核心类,也就是ShardingContext,分片上下文,里面定义了下面几个内容:

@RequiredArgsConstructor
@Getter
public final class ShardingContext {
    //分片规则
    private final ShardingRule shardingRule;
    //数据库类型
    private final DatabaseType databaseType;
    //执行引擎
    private final ExecutorEngine executorEngine;
    //是否要在log文件中展示sql语句
    private final boolean showSQL;
}

里面的几个参数很简单易懂,但是里面包含的内容也是整个中间件的核心内容。

二、preparedStatement

这块对应于代码中的jdbc/core/statement,里面包括两部分,一部分是statement,一部分是prepareStatement。考虑到性能问题(prepareStatement可以进行缓存)和代码的优雅性(变量的设置),以及考虑到通常JDBC使用的都是prepareStatement,所以我们着重看下prepareStatement的两部分,即MasterSlavePreparedStatement和ShardingPreparedStatement。

2.1 MasterSlavePreparedStatement

2.1.1 获取connection

支持读写分离。这块可以先看一些他的构造方法,构造方法中一般都有这样的方法:

connection.getConnections(sqlStatement.getType())

这块其实是根据sql的类型,来获取不同的连接

  • 如果是DDL类型,会获取所有的连接,包括master和slave,也就是说对于表结构的修改,会修改包括master和slave
  • 如果是DML类型,也就是写操作,获取的是master的连接
  • 最后,如果是DQL类型,也就是读操作,是根据读写分离的策略获取某个slave连接

后面两种是有本地缓存的,可以避免每次都进行connect构建,提高效率。

当然,这个sqlType是怎么来的?这就需要用到sql解析的模块了。

SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();

这块先提一下,后续再详细讲解。

2.1.2 执行sql

主要有三个方法,

  • executeQuery
public ResultSet executeQuery() throws SQLException {
    Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support executeQuery for DDL");
    return routedStatements.iterator().next().executeQuery();
}

首先说明下,这里的routedStatements是待执行sql的一个集合,这块首先确保只有一个查询语句。然后直接调用PreparedStatement.executeQuery()方法,比较直观。

  • executeUpdate
public int executeUpdate() throws SQLException {
    int result = 0;
    for (PreparedStatement each : routedStatements) {
        result += each.executeUpdate();
    }
    return result;
}

这块逻辑与上面一致,不过不一样的是,可以传入多个update的语句,然后顺序执行,最后调用的是PreparedStatement.executeUpdate()方法。

  • execute
public boolean execute() throws SQLException {
    boolean result = false;
    for (PreparedStatement each : routedStatements) {
        result = each.execute();
    }
    return result;
}

最后就是一些其他的方法的执行了。

总的来说,读写分离这块的内容比较简单,首先sql语句不要怎么变化,再者数据库连接connection基本上都是确定的,所以不需要路由什么的,可以直接运行。

2.2 ShardingPreparedStatement

下面我们看下分库分表的情况下,来执行我们的sql,这块就稍微有些复杂了。

2.2.1 构造方法

首先看一下他的构造方法,构造方法中有一些很奇怪的常量,比如TYPE_FORWARD_ONLY、CONCUR_READ_ONLY等等,这块可以参考这篇博客。其实就是在读写过程中的指针的方向等内容。

2.2.2 sql执行

2.2.2.1 查询

查询的逻辑大概如下:

  • sql路由
  • sql改写
  • sql执行
  • 结果合并

下面看下代码:

public ResultSet executeQuery() throws SQLException {
    ResultSet result;
    try {
        Collection<PreparedStatementUnit> preparedStatementUnits = route();
        List<ResultSet> resultSets = new PreparedStatementExecutor(
                getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();
        result = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) routeResult.getSqlStatement()).merge());
    } finally {
        clearBatch();
    }
    currentResultSet = result;
    return result;
}

首先我们需要看几个类:

  • PreparedStatementUnit

    • SQLExecutionUnit:包括dataSource和sql
    • PreparedStatement

这个类,会用于最终我们到具体的数据库上执行sql,调用route方法,其实就是路由到具体的服务器上面。

private Collection<PreparedStatementUnit> route() throws SQLException {
    Collection<PreparedStatementUnit> result = new LinkedList<>();
    routeResult = routingEngine.route(getParameters());
    for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
        SQLType sqlType = routeResult.getSqlStatement().getType();
        Collection<PreparedStatement> preparedStatements;
        if (SQLType.DDL == sqlType) {
            preparedStatements = generatePreparedStatementForDDL(each);
        } else {
            preparedStatements = Collections.singletonList(generatePreparedStatement(each));
        }
        routedStatements.addAll(preparedStatements);
        for (PreparedStatement preparedStatement : preparedStatements) {
            replaySetParameter(preparedStatement);
            result.add(new PreparedStatementUnit(each, preparedStatement));
        }
    }
    return result;
}

这里面有个routeResult,其实就是根据入参路由到的数据库列表。这边有一个路由引擎,看下这边是怎么路由的:

public SQLRouteResult route(final List<Object> parameters) {
    if (null == sqlStatement) {
        sqlStatement = sqlRouter.parse(logicSQL, parameters.size());
    }
    return sqlRouter.route(logicSQL, parameters, sqlStatement);
}

这边有几个参数,

  • logicSQL:表示逻辑上的sql,因为业务使用时,写的sql其实是逻辑上的sql,用到的数据库和表也是逻辑库和逻辑表
  • parameters:传入的参数
  • sqlStatement:最终的sql语句

首先解析sql,然后路由。这边的解析还是老套路,但是如果解析出来的sql是insert,而且配置了自动生成key,那么会调用自动生成key的方法,生成key,放到对应的column下。解析这块后续再分析。下面我们看下路由:

@Override
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
    SQLRouteResult result = new SQLRouteResult(sqlStatement);
    if (sqlStatement instanceof InsertStatement && null != ((InsertStatement) sqlStatement).getGeneratedKey()) {
        processGeneratedKey(parameters, (InsertStatement) sqlStatement, result);
    }
    //路由
    RoutingResult routingResult = route(parameters, sqlStatement);
    //重写sql
    SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, databaseType, sqlStatement);
    boolean isSingleRouting = routingResult.isSingleRouting();
    if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) {
        processLimit(parameters, (SelectStatement) sqlStatement, isSingleRouting);
    }
    SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting);
    if (routingResult instanceof CartesianRoutingResult) {
        for (CartesianDataSource cartesianDataSource : ((CartesianRoutingResult) routingResult).getRoutingDataSources()) {
            for (CartesianTableReference cartesianTableReference : cartesianDataSource.getRoutingTableReferences()) {
                result.getExecutionUnits().add(new SQLExecutionUnit(cartesianDataSource.getDataSource(), rewriteEngine.generateSQL(cartesianTableReference, sqlBuilder)));
            }
        }
    } else {
        for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
            result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder)));
        }
    }
    if (showSQL) {
        SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits(), parameters);
    }
    return result;
}

先看路由:

private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) {
    Collection<String> tableNames = sqlStatement.getTables().getTableNames();
    RoutingEngine routingEngine;
    //不涉及到具体的表,全路由
    if (tableNames.isEmpty()) {
        routingEngine = new DatabaseAllRoutingEngine(shardingRule.getDataSourceMap());
    } else if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames) || shardingRule.isAllInDefaultDataSource(tableNames)) {
        routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);
    } else {
        // TODO config for cartesian set
        routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
    }
    return routingEngine.route();
}

这里涉及到一个bindingTable的概念,概念如下:指在任何场景下分片规则均一致的主表和子表。例:订单表和订单项表,均按照订单ID分片,则此两张表互为BindingTable关系。BindingTable关系的多表关联查询不会出现笛卡尔积关联,关联查询效率将大大提升。

这里最终返回的结果RoutingResult类似如下:

这里面有几种路由类,后续再详细分析。

获取到数据库表的路由信息后,就到了sql改写的过程。这里也有一个sql改写的引擎:++SQLRewriteEngine++。中间插入了一个判断,routingResult.isSingleRouting(),判断是否路由之后的DB只有一个。这里有个小的优化:如果是select语句,用到了limit,而且最终落到了单片上,那么sql语句是不会被重写的,通过rewrite方法也可以看出来,是和isSingleRouting取反的。

public SQLBuilder rewrite(final boolean isRewriteLimit) {
    SQLBuilder result = new SQLBuilder();
    if (sqlTokens.isEmpty()) {
        result.appendLiterals(originalSQL);
        return result;
    }
    int count = 0;
    sortByBeginPosition();
    for (SQLToken each : sqlTokens) {
        if (0 == count) {
            result.appendLiterals(originalSQL.substring(0, each.getBeginPosition()));
        }
        if (each instanceof TableToken) {
            appendTableToken(result, (TableToken) each, count, sqlTokens);
        } else if (each instanceof ItemsToken) {
            appendItemsToken(result, (ItemsToken) each, count, sqlTokens);
        } else if (each instanceof RowCountToken) {
            appendLimitRowCount(result, (RowCountToken) each, count, sqlTokens, isRewriteLimit);
        } else if (each instanceof OffsetToken) {
            appendLimitOffsetToken(result, (OffsetToken) each, count, sqlTokens, isRewriteLimit);
        } else if (each instanceof OrderByToken) {
            appendOrderByToken(result, count, sqlTokens);
        }
        count++;
    }
    return result;
}

执行完成这步之后,sql语句重写基本完成。如果路由结果涉及到笛卡尔积,还需要对sql进行进一步的重写,因为涉及到关联表。下面涉及到真正转化sql这部分,使用的方法是:rewriteEngine.generateSQL,涉及到sql中的逻辑库表替换为实际的库表,形成SQLExecutionUnit,添加到待执行的sql列表中,最终得到SQLRouteResult。至此,sql路由完成。回到ShardingPreparedStatement的route方法。遍历routeResult,形成预执行的statement集合。

再回到ShardingPreparedStatement的executeQuery()方法,route()得到了Collection

【源码解析】Sharding-Jdbc的执行过程(一)的更多相关文章

  1. Flink 源码解析 —— JobManager 处理 SubmitJob 的过程

    JobManager 处理 SubmitJob https://t.zsxq.com/3JQJMzZ 博客 1.Flink 从0到1学习 -- Apache Flink 介绍 2.Flink 从0到1 ...

  2. Flink 源码解析 —— TaskManager 处理 SubmitJob 的过程

    TaskManager 处理 SubmitJob 的过程 https://t.zsxq.com/eu7mQZj 博客 1.Flink 从0到1学习 -- Apache Flink 介绍 2.Flink ...

  3. Fabric1.4源码解析:客户端创建通道过程

    在使用Fabric创建通道的时候,通常我们执行一条命令完成,这篇文章就解析一下执行这条命令后Fabric源码中执行的流程. peer channel create -o orderer.example ...

  4. 【转】aiohttp 源码解析之 request 的处理过程

    [转自 太阳尚远的博客:http://blog.yeqianfeng.me/2016/04/01/python-yield-expression/] 使用过 python 的 aiohttp 第三方库 ...

  5. MyBatis源码解析(一)——执行流程

    原创作品,可以转载,但是请标注出处地址:http://www.cnblogs.com/V1haoge/p/6603926.html 一.MyBatis简介 MyBatis框架是一种轻量级的ORM框架, ...

  6. Hangfire源码解析-任务是如何执行的?

    一.Hangfire任务执行的流程 任务创建时: 将任务转换为Type并存储(如:HangFireWebTest.TestTask, HangFireWebTest, Version=1.0.0.0, ...

  7. AngularJS源码解析1:angular自启动过程

    angularJS加载进来后,会有一个立即执行函数调用,在源代码的最下面是angular初始化的地方.代码展示: bindJQuery(); publishExternalAPI(angular); ...

  8. 从源码解析 Spring JDBC 异常抽象

    初入学习 JDBC 操作数据库,想必大家都写过下面的代码: 数据库为:H2 如果需要处理特定 SQL 异常,比如 SQL 语句错误,这个时候我们应该怎么办? 查看 SQLException 源码,我们 ...

  9. Fabric1.4源码解析:Peer节点启动过程

    看一下Peer节点的启动过程,通常在Fabric网络中,Peer节点的启动方式有两种,通过Docker容器启动,或者是通过执行命令直接启动. 一般情况下,我们都是执行docker-compose -f ...

随机推荐

  1. Python 爬取网站资源文件

    爬虫原理: 以下来自知乎解释 首先你要明白爬虫怎样工作.想象你是一只蜘蛛,现在你被放到了互联“网”上.那么,你需要把所有的网页都看一遍.怎么办呢?没问题呀,你就随便从某个地方开始,比如说人民日报的首页 ...

  2. nginx TCP 代理&amp; windows傻瓜式安装

    一.下载nginx Windows http://nginx.org/en/download.html 二.解压到目录 三.进入目录并start nginx.exe即可启动 cd d:/java/ng ...

  3. 树莓派自身摄像头的opencv调用

    很多人知道,opencv不能直接对树莓派原装摄像头进行调用,因为raspicam不是V4L驱动,怎样才能使用树莓派原装摄像头,它可比多数usb摄像头清晰和小巧. 具体方法,给树莓派原装摄像头安装一个可 ...

  4. CSS3 box-flex属性和box-orient属性

    比较有意思的是虽然目前没有浏览器支持box-flex,box-orient属性,但CSS3问世以来,这两个属性却一直很火.2014年阿里校招第5题要求使用CSS3中的功能实现三个矩形的布局,总的宽度为 ...

  5. MyBatis(3.2.3) - Dynamic SQL

    Sometimes, static SQL queries may not be sufficient for application requirements. We may have to bui ...

  6. Storm系列(十八)事务介绍

    功能:将多个tuple组合成为一个批次,并保障每个批次的tuple被且仅被处理一次. storm事务处理中,把一个批次的tuple的处理分为两个阶段processing和commit阶段. proce ...

  7. vim的全局替换

    本文出自   http://blog.csdn.net/shuangde800 本文是在学习<使用vi编辑器, Lamb & Robbins编著>时在所记的笔记. 本文内容: 基本 ...

  8. BDIA增强

    SE24     CL_EXITHANDLER的方法GET_INSTANCE中有基本上所有的增强都会走这边,打上断点查找增强名称,或者在程序中全局搜索GET_INSTANCE关键字 然后 SE19 下 ...

  9. UI篇—UITableview

    一.基本介绍 在众多移动应⽤用中,能看到各式各样的表格数据 . 在iOS中,要实现表格数据展示,最常用的做法就是使用UITableView,UITableView继承自UIScrollView,因此支 ...

  10. Java中的会话管理——HttpServlet,Cookies,URL Rewriting(译)

    参考谷歌翻译,关键字直接使用英文,原文地址:http://www.journaldev.com/1907/java-session-management-servlet-httpsession-url ...