Hikari数据源介绍

jimmy 2018年09月23日 5,375次浏览

介绍

官网地址: https://github.com/brettwooldridge/HikariCP

快速,简单,可靠的数据源,spring boot2.0已经将HikariCP做为了默认的数据源链接池,在官网测试中秒杀一切其他数据源,比如commons-dbcp,tomcat,c3po,druid。

请输入图片描述

基本设计

Hikari链接池采用了很多优化来提高并发数,可参考这里
所有数据库链接池都遵守基本的设计规则,实现 javax.sql.DataSource 接口,里面最重要的方法就是 Connection getConnection() throws SQLException; 用于获取一个Connection, 一个Connection就是一个数据库链接,就是一个TCP链接,建立TCP链接是需要进行3次握手的,这降低来链接的使用效率,也是各种数据库链接池存在的原因。

数据库链接池通过事先建立好Connection并缓存起来,这样应用需要做数据查询的时候,直接从缓存中拿到Connection就可以使用来。数据库链接池还能够检测异常的链接,释放闲置的链接。

HikariDataSource

Hikari中提供的DataSource是HikariDataSource ,HikariDataSource 实现了 HikariConfig,和数据库的各种参数超时时间配置就正HikariaConfig中。

其中提供两种初始化方式,一种是默认的构造函数,单new一个HikariDataSource时,数据源的链接不会建立,需要等到第一次调用HikariDataSource的getConnection方法。数据源建立后的相关信息保存在 HikariDataSource 中变量HikariPool pool。

另一种建立方式是调用带有HikariConfig的构造函数,这种方式适合多个数据源的建立,共享同一份配置。 这种方式在调用构造函数的时候就建立了数据源的链接。

HikariDataSource的所有数据源获取都委托给了HikariPool,一个数据源会有一个HikariPool,一个HikariPool中有一个ConcurrentBag,一个ConcurrentBag中多个PoolEntry,一个PoolEntry对应一个Connection。

HikariPool

HikariPool中的基本field介绍。

PoolEntryCreator POOL_ENTRY_CREATOR:用于创建PoolEntry,也就是用于创建Connection了,创建Connection是委托给驱动程序的。

Collection addConnectionQueue :就是一个LinkedBlockingQueue,不过不能修改其中的内容。当正在创建底层Connection的时候这个Queue会有值。用户后续判断当前线程池里面还是否需要创建新的链接。

ThreadPoolExecutor addConnectionExecutor:创建Connection链接的执行是有这个线程池调度的。使用新的链接池不会而不使用当前的工作线程,为了不影响工作线程的执行(比如会导致工作线程超时)。

ThreadPoolExecutor closeConnectionExecutor:关闭Connection的链接是有这个线程池调度的。

ConcurrentBag connectionBag:这个是最重要的,每次获取Connection都是从这里面获取,采用了ThreadLocal来减少竞争。

ProxyLeakTaskFactory leakTaskFactory;参考ProxyLeakTask用于检测Connection泄漏。

ScheduledFuture<?> houseKeeperTask:用于管理链接池里面的链接,比如链接不够用了要创建链接,链接最大生存时间到了要关闭链接。线程池中的Connection就是有它初始化的。

ConcurrentBag

ConcurrentBag设计主要为了解决并发对资源的争用,其中主要有CopyOnWriteArrayList sharedList和ThreadLocal threadList,SynchronousQueue handoffQueue,当一个线程获取链接的时候首先从自己的ThreadLocal中的threadList获取,当获取失败的时候才从sharedList获取,当从sharedList获取还是失败的话,就等待在 handoffQueue,这是一个同步的Queue,当其他线程释放链接的时候,自己就会被唤醒。

public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
  {
     // 从threadList获取链接
     final List<Object> list = threadList.get();
     for (int i = list.size() - 1; i >= 0; i--) {
        final Object entry = list.remove(i);
        final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
        if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
           return bagEntry;
        }
     }

     // 从sharedList获取链接
     final int waiting = waiters.incrementAndGet();
     try {
        for (T bagEntry : sharedList) {
           if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
              // If we may have stolen another waiter\'s connection, request another bag add.
              if (waiting > 1) {
                 listener.addBagItem(waiting - 1);
              }
              return bagEntry;
           }
        }
        listener.addBagItem(waiting);
        timeout = timeUnit.toNanos(timeout);
        do {
           final long start = currentTime();
           //等待其他线程释放链接,超过timeout就获取失败了
           final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
           if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
              return bagEntry;
           }

           timeout -= elapsedNanos(start);
        } while (timeout > 10_000);

        return null;
     }
     finally {
        waiters.decrementAndGet();
     }
  }