问题来源

简单来说就是企图在一个事务中开启多个协程执行并发查询,会出现busy buffer的问题

接下来这篇文章将尝试深入探讨产生 busy buffer 的根本原因。

为什么会出现 busy buffer?

事务执行的语句的返回的 Rows 如果没有关闭,那么在事务里再次执行其他语句,会返回 busy buffer,这样也对,但有些肤浅,也不够准确。

我们所使用的 database/sql 提供的是一个数据库的抽象,具体负责连接数据库、收发包的则是 go-sql-driver/mysql。当在一个事务中执行一条 exec 语句时,客户端通过 mysqlConn 向 db 发送 CommandPacket,具体的执行流程是:tx.ExecContext -> tx.db.execDC-> ctxDriverExec -> mc.Exec(Execer) -> mc.writeCommandPacketStr(mysqlConn)。

向 mysqlConn 连接中的 buffer中写数据之前,需要预先申请空间,多出的4个字节为头部。一个 mysqlConn 连接中在同一时间只有一个 buffer 可以用,如果 buffer 被占用,就会返回 ErrBusyBuffer,实际上就是 busy buffer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 来自:go-sql-driver/mysql@1.4.0/packets.go#L419
func (mc *mysqlConn) writeCommandPacketStr(command byte, arg string) error {
 // Reset Packet Sequence
    mc.sequence = 0
    pktLen := 1 + len(arg)
    data := mc.buf.takeBuffer(pktLen + 4)
    if data == nil {
    // cannot take the buffer. Something must be wrong with the connection
        errLog.Print(ErrBusyBuffer)
        return errBadConnNoWrite
     }
    // Add command byte
    data[4] = command
    // Add arg
    copy(data[5:], arg)
     // Send CMD packet
    return mc.writePacket(data)
}
// 来自:go-sql-driver/mysql@1.4.0/buffer.go#L112
// takeBuffer returns a buffer with the requested size.
// If possible, a slice from the existing buffer is returned.
// Otherwise a bigger buffer is made.
// Only one buffer (total) can be used at a time.(一次仅仅只能有一个 buffer 被使用)
func (b *buffer) takeBuffer(length int) []byte {
    // 如果 buffer 有值,就会返回 nil (即不能分配 buffer)
    if b.length > 0 {
        return nil
    }
    // test (cheap) general case first
    if length <= defaultBufSize || length <= cap(b.buf) {
        return b.buf[:length]
    }
    if length < maxPacketSize {
        b.buf = make([]byte, length)
        return b.buf
    }
    return make([]byte, length)
}

那么 buffer 什么时候清空,然后 buffer.length 重新等于 0,可以执行新的查询呢?把 buffer 里面的内容全部读出来就可以了,也就是调用 readPacket()。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 来自:go-sql-driver/mysql@1.4.0/packets.go#L27
// Read packet to buffer ‘data’
func (mc *mysqlConn) readPacket() ([]byte, error) {
     var prevData []byte
     for {
         // read packet header
         data, err := mc.buf.readNext(4)
         //…校验
         // read packet body [pktLen bytes]
         data, err = mc.buf.readNext(pktLen)
         //…其他判断
         prevData = append(prevData, data)
     }
}
// 来自:go-sql-driver/mysql@1.4.0/buffer.go#L94
func (b *buffer) readNext(need int) ([]byte, error) {
     if b.length < need {
         // refill
        if err := b.fill(need); err != nil {
         return nil, err
     }
 }

     offset := b.idx
     b.idx += need
     b.length -= need
     return b.buf[offset:b.idx], nil
}

为什么不关闭 Rows 会出现 busy buffer

首先,这里的 Rows 是执行 query 返回的结果,当我们执行 rows, err := tx.Query(“SELECT 1”) 拿到 Rows 时,数据还在 buffer 里面,并没有读出来。

这时如果有其他协程在这个事务里执行语句,要向 db 发包,就会出现 busy buffer,因为在一个事务里,只有一个连接可以用。

前面提到,把数据从 buffer 中读出来就可以重新利用这一条 mysqlConn 发包,那么把 Rows 中的数据读出来还会出现这个问题吗?是可以的,加上一句 for rows.Next(){rows1.Scan()}就不会出现 busy buffer 了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 来自:go-sql-driver/mysql@1.4.0/rows.go
func (rows *textRows) Next(dest []driver.Value) error {
 if mc := rows.mc; mc != nil {
 if err := mc.error(); err != nil {
 return err
 }
 // Fetch next row from stream
 // 这里会调用 mc.readPacket() 将 buffer 里的数据读出来。
 return rows.readRow(dest)
 }
 return io.EOF
}

当然关闭也是可以的,我们看一下关闭 rows 后 buffer 会怎样。会由 mysqlConn 来负责处理没有读的 buffer,会调用 mc.readUntilEOF() 将 buffer 里的内容全部读出来,直到 EOF。mc.discardResults() 也会调用 mc.readUntilEOF(),而 mc.readUntilEOF() 则会调用 mc.readPacket()。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 来自:go-sql-driver/mysql@1.4.0/rows.go
func (rows *mysqlRows) Close() (err error) {
    if f := rows.finish; f != nil {
    f()
    rows.finish = nil
    }
    mc := rows.mc
    if mc == nil {
      return nil
    }
 if err := mc.error(); err != nil {
    return err
 }
 // Remove unread packets from stream
 if !rows.rs.done {
    err = mc.readUntilEOF()
 }
 if err == nil {
    if err = mc.discardResults(); err != nil {
        return err
    }
 }
rows.mc = nil
 return err
}

当然了,打开的资源是要关闭的,因此只执行 for rows.Next(){rows1.Scan()}是不够的,还要关闭 rows.

为什么不要在一个事务中执行并发的查询?

首先不要用并发的协程去操作同一个连接,实际上上面的分析也说明了,如果你这么做了,go-sql-driver/mysql 很有可能会返回给你一个 busy buffer。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 来自 src/database/sql/driver.go
// Conn is a connection to a database. It is not used concurrently
// by multiple goroutines.
//
// Conn is assumed to be stateful.
type Conn interface {
 // Prepare returns a prepared statement, bound to this connection.
 Prepare(query string) (Stmt, error)
 // Close invalidates and potentially stops any current
 // prepared statements and transactions, marking this
 // connection as no longer in use.
 //
 // Because the sql package maintains a free pool of
 // connections and only calls Close when there’s a surplus of
 // idle connections, it shouldn’t be necessary for drivers to
 // do their own connection caching.
 Close() error
 // Begin starts and returns a new transaction.
 //
 // Deprecated: Drivers should implement ConnBeginTx instead (or additionally).
 Begin() (Tx, error)
}

一个事务只有一个连接,当调用 db.Begin() 开启一个事务时,sql.DB 会从连接池中取出一个连接或者创建一个新的连接分配给这个事务。

可以在一个事务中执行并发的更新吗?

是可以的,因为更新或者插入的操作调用的是 Exec(),db 返回的 result 在这里就直接从 buffer 中读出来了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 来自:go-sql-driver/mysql@1.4.0/connection.go
// Internal function to execute commands
func (mc *mysqlConn) exec(query string) error {
 // Send command
 if err := mc.writeCommandPacketStr(comQuery, query); err != nil {
 return mc.markBadConn(err)
 }
 // Read Result
 resLen, err := mc.readResultSetHeaderPacket()
 if err != nil {
 return err
 }
 if resLen > 0 {
 // columns
 if err := mc.readUntilEOF(); err != nil {
 return err
 }
 // rows
 if err := mc.readUntilEOF(); err != nil {
 return err
 }
 }
 return mc.discardResults()
}

转载:https://medium.com/impopper-engineering/go-concurrency-query-in-mysql-transactions-f6018c7b16b2