Golang database/sql包源码学习一

  • Open返回的DB句柄是协程安全的,可以被众多协程共用,通常不需要调用DB.Close
  • sql操作很重,如果可能,应该尽量使用缓存或者异步
  • database/sql包有自动重试等功能,外部通常不需要自己实现
  • 对于普通的sql,通常不要试图使用Prepare来优化查询
  • 使用完rows之后(rows.Next/rows.Scan遍历结束或提前退出),一定要调用rows.Close,否则其占用的连接不会被释放回连接池


  • 避免错误操作,例如LOCK TABLE后用 INSERT会死锁,因为两个操作不是同一个连接,insert的连接没有table lock。
  • 当需要连接,且连接池中没有可用连接时,新的连接就会被创建。
  • 默认没有连接数上限;并发操作很多时可能会创建任意多的连接,导致数据库返回“too many connections”错误,因此可以通过db.SetMaxOpenConns设置上限
  • db.SetMaxIdleConns(N)设置最大空闲连接数
  • db.SetMaxOpenConns(N)设置最大打开连接数
  • 长时间保持空闲连接可能会导致db timeout






// DB is the database handle discussed in this article. It holds the driver,
// the data source name and the connection-pool bookkeeping used by the
// methods excerpted below.
type DB struct {

driver driver.Driver

dsn string

// numClosed is an atomic counter which represents a total number of

// closed connections. Stmt.openStmt checks it before cleaning closed

// connections in Stmt.css.

numClosed uint64

mu sync.Mutex // protects following fields

freeConn []*driverConn

connRequests []chan connRequest

numOpen int // number of opened and pending open connections

// openerCh carries the signals asking for a new connection to be created.

// connectionOpener reads from this channel, and maybeOpenNewConnections sends on it whenever a connection needs to be created.

// The channel is closed when db.Close() is called (author's note; the close call itself is not visible in this excerpt).

openerCh chan struct{}

closed bool

dep map[finalCloser]depSet

lastPut map[*driverConn]string // stacktrace of last conn's put; debug only

maxIdle int // zero means defaultMaxIdleConns; negative means 0

maxOpen int // <= 0 means unlimited

maxLifetime time.Duration // maximum amount of time a connection may be reused

cleanerCh chan struct{}






// Open looks up driverName in the drivers registry and returns a *DB bound
// to that driver and to dataSourceName. It returns an error when no driver
// is registered under driverName.
func Open(driverName, dataSourceName string) (*DB, error) {


driveri, ok := drivers[driverName]


if !ok {

return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)


db := &DB{

driver: driveri,

dsn: dataSourceName,

openerCh: make(chan struct{}, connectionRequestQueueSize),

lastPut: make(map[*driverConn]string),


// Key step: start the background goroutine that opens new connections when requested.

go db.connectionOpener()

return db, nil


// Runs in a separate goroutine, opens new connections when requested.

func (db *DB) connectionOpener() {

for range db.openerCh {




// Open one new connection

func (db *DB) openNewConnection() {

// maybeOpenNewConnections has already executed numOpen++ before sending on db.openerCh.

// If opening the new connection fails, or the DB has already been closed, numOpen must be decremented by one before returning.

ci, err := db.driver.Open(db.dsn)

// Author's note: this step is protected by the mutex.
// NOTE(review): the matching db.mu.Lock() call is not visible in this excerpt; driver.Open above appears before the locked section.


defer db.mu.Unlock()

if db.closed {

if err == nil {






if err != nil {


db.putConnDBLocked(nil, err)




dc := &driverConn{

db: db,

createdAt: nowFunc(),

ci: ci,


if db.putConnDBLocked(dc, err) {

db.addDepLocked(dc, dc)

} else {





// maybeOpenNewConnections notifies connectionOpener to create new connections
// when there are pending connection requests and the number of connections
// has not yet reached the limit.

func (db *DB) maybeOpenNewConnections() {

numRequests := len(db.connRequests)

// With a limit configured, cap the number of opens by the remaining headroom under maxOpen.
if db.maxOpen > 0 {

numCanOpen := db.maxOpen - db.numOpen

if numRequests > numCanOpen {

numRequests = numCanOpen



for numRequests > 0 {

db.numOpen++ // optimistically


if db.closed {



// Signal connectionOpener, which ranges over openerCh.
db.openerCh <- struct{}{}







// Close marks the DB as closed and closes the idle connections held in
// db.freeConn. It returns the last non-nil error reported by those close
// functions, or nil. Calling Close on an already closed DB returns nil.
// NOTE(review): the bodies of the cleanerCh check and of the connRequests
// loop are not visible in this excerpt.
func (db *DB) Close() error {


if db.closed { // Make DB.Close idempotent


return nil



if db.cleanerCh != nil {



var err error

// Collect the close functions first; they are invoked after the pool state has been reset.
fns := make([]func() error, 0, len(db.freeConn))

for _, dc := range db.freeConn {

fns = append(fns, dc.closeDBLocked())


db.freeConn = nil

db.closed = true

for _, req := range db.connRequests {




for _, fn := range fns {

err1 := fn()

if err1 != nil {

err = err1



return err





// Query executes a query that returns rows, typically a SELECT.

// The args are for any placeholder parameters in the query.

func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {

return db.QueryContext(context.Background(), query, args...)


// QueryContext executes a query that returns rows, typically a SELECT.

// The args are for any placeholder parameters in the query.

func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {

var rows *Rows

var err error

// maxBadConnRetries is a constant whose value is 2 (author's note; its definition is not shown in this excerpt).

for i := 0; i < maxBadConnRetries; i++ {

// cachedOrNewConn is the connection-acquisition strategy: prefer an idle connection from the pool; if none is available, wait when the number of connections has reached the limit, otherwise open a new connection directly.

rows, err = db.query(ctx, query, args, cachedOrNewConn)

// Retry only when the connection turned out to be bad.

if err != driver.ErrBadConn {




if err == driver.ErrBadConn {

// Retry once more, this time with the strategy that always opens a new connection.

return db.query(ctx, query, args, alwaysNewConn)


return rows, err


// query obtains a connection according to strategy and runs the query on it
// via queryConn. An error from db.conn is returned unchanged.
func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {

ci, err := db.conn(ctx, strategy)

if err != nil {

return nil, err


// Run the query on the acquired connection.

return db.queryConn(ctx, ci, ci.releaseConn, query, args)


// connRequest represents one request for a new connection

// When there are no idle connections available, DB.conn will create

// a new connRequest and put it on the db.connRequests list.

type connRequest struct {

conn *driverConn

err error


var errDBClosed = errors.New("sql: database is closed")

// conn returns a cached connection or a newly opened one.

func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {


if db.closed {


// If the DB is already closed the error is returned immediately, and callers do not handle this error specially.

return nil, errDBClosed


// Check if the context is expired.

if err := ctx.Err(); err != nil {


return nil, err


lifetime := db.maxLifetime

// Prefer returning an idle connection when possible.

numFree := len(db.freeConn)

if strategy == cachedOrNewConn && numFree > 0 {

// Take the first idle connection and rebuild the idle list without it.

// Open question (author): would it be more efficient to keep all connections in one array and merely mark the ones in use?

conn := db.freeConn[0]

copy(db.freeConn, db.freeConn[1:])

db.freeConn = db.freeConn[:numFree-1]

conn.inUse = true


if conn.expired(lifetime) {

// The connection has exceeded its lifetime: it is closed (author's note; the close call is not visible in this excerpt), and returning this error makes the caller retry.


return nil, driver.ErrBadConn


return conn, nil


// Out of free connections or we were asked not to use one. If we're not

// allowed to open any more connections, make a request and wait.

if db.maxOpen > 0 && db.numOpen >= db.maxOpen {

// Make the connRequest channel. It's buffered so that the

// connectionOpener doesn't block while waiting for the req to be read.

req := make(chan connRequest, 1)

db.connRequests = append(db.connRequests, req)


// Timeout the connection request with the context.

select {

case <-ctx.Done():

return nil, ctx.Err()

case ret, ok := <-req:

if !ok {

return nil, errDBClosed


if ret.err == nil && ret.conn.expired(lifetime) {


return nil, driver.ErrBadConn


return ret.conn, ret.err



db.numOpen++ // optimistically


ci, err := db.driver.Open(db.dsn)

if err != nil {


db.numOpen-- // correct for earlier optimism



return nil, err



dc := &driverConn{

db: db,

createdAt: nowFunc(),

ci: ci,


db.addDepLocked(dc, dc)

dc.inUse = true


return dc, nil


// queryConn executes the query on the given connection.

// The connection gets released by the releaseConn function.

func (db *DB) queryConn(ctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {

// First path: the driver connection implements driver.Queryer, so the query can be attempted directly.
if queryer, ok := dc.ci.(driver.Queryer); ok {

dargs, err := driverArgs(nil, args)

if err != nil {


return nil, err


var rowsi driver.Rows

withLock(dc, func() {

rowsi, err = ctxDriverQuery(ctx, queryer, query, dargs)


// driver.ErrSkip falls through to the prepare-based path below.
if err != driver.ErrSkip {

if err != nil {


return nil, err


// Note: ownership of dc passes to the *Rows, to be freed

// with releaseConn.

rows := &Rows{

dc: dc,

releaseConn: releaseConn,

rowsi: rowsi,



return rows, nil



// Second path: prepare a statement on the connection and obtain the rows from it.
var si driver.Stmt

var err error

withLock(dc, func() {

si, err = ctxDriverPrepare(ctx, dc.ci, query)


if err != nil {


return nil, err


ds := driverStmt{dc, si}

rowsi, err := rowsiFromStatement(ctx, ds, args...)

if err != nil {

withLock(dc, func() {




return nil, err


// Note: ownership of ci passes to the *Rows, to be freed

// with releaseConn.

rows := &Rows{

dc: dc,

releaseConn: releaseConn,

rowsi: rowsi,

closeStmt: si, // must also be closed when the Rows are closed



return rows, nil






// Next reads the next row from the driver into rs.lastcols. It returns true
// when a row was read, and false when the Rows are already closed or the
// driver reported an error (io.EOF included); the error is kept in rs.lasterr.
func (rs *Rows) Next() bool {

if rs.isClosed() {

return false


// Allocate the column buffer on first use, sized to the number of result columns.
if rs.lastcols == nil {

rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns()))


rs.lasterr = rs.rowsi.Next(rs.lastcols)

if rs.lasterr != nil {

// Close the connection if there is a driver error.

if rs.lasterr != io.EOF {


return false


nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)

if !ok {


return false


// The driver is at the end of the current result set.

// Test to see if there is another result set after the current one.

// Only close Rows if there are no further result sets to read.

if !nextResultSet.HasNextResultSet() {



return false


return true




// Scan copies the columns in the current row into the values pointed

// at by dest. The number of values in dest must be the same as the

// number of columns in Rows.


// Scan converts database column values into types that Go understands:


// *string

// *[]byte

// *int, *int8, *int16, *int32, *int64

// *uint, *uint8, *uint16, *uint32, *uint64

// *bool

// *float32, *float64

// *interface{}

// *RawBytes

// any type implementing Scanner (see Scanner docs)


// In the most simple case, if the type of the value from the source

// column is an integer, bool or string type T and dest is of type *T,

// Scan simply assigns the value through the pointer.


// Scan also converts between string and numeric types, as long as no information would be lost in the conversion.

// For example, a float64 with value 300 or a string "300" can be converted automatically to uint16, but not to uint8, because it exceeds that type's maximum value.

// One exception is that scans of some float64 numbers to

// strings may lose information when stringifying. In general, scan

// floating point columns into *float64.


// If a dest argument has type *[]byte, Scan saves in that argument a

// copy of the corresponding data. The copy is owned by the caller and

// can be modified and held indefinitely. The copy can be avoided by

// using an argument of type *RawBytes instead; see the documentation

// for RawBytes for restrictions on its use.


// If an argument has type *interface{}, Scan copies the value

// provided by the underlying driver without conversion. When scanning

// from a source value of type []byte to *interface{}, a copy of the

// slice is made and the caller owns the result.


// Source values of type time.Time may be scanned into values of type

// *time.Time, *interface{}, *string, or *[]byte. When converting to

// the latter two, time.RFC3339Nano is used.


// Source values of type bool may be scanned into types *bool,

// *interface{}, *string, *[]byte, or *RawBytes.


// For scanning into *bool, the source may be true, false, 1, 0, or

// string inputs parseable by strconv.ParseBool.

func (rs *Rows) Scan(dest ...interface{}) error {

if rs.isClosed() {

return errors.New("sql: Rows are closed")


// lastcols is only populated by Next, so nil means Next has not been called yet.
if rs.lastcols == nil {

return errors.New("sql: Scan called without calling Next")


if len(dest) != len(rs.lastcols) {

return fmt.Errorf("sql: expected %d destination arguments in Scan, not %d", len(rs.lastcols), len(dest))


// Convert each column value into its destination; stop at the first failure.
for i, sv := range rs.lastcols {

err := convertAssign(dest[i], sv)

if err != nil {

return fmt.Errorf("sql: Scan error on column index %d: %v", i, err)



return nil




var rowsCloseHook func(*Rows, *error)

// Close closes the Rows, preventing further enumeration. If Next is called

// and returns false and there are no further result sets,

// the Rows are closed automatically and it will suffice to check the

// result of Err. Close is idempotent and does not affect the result of Err.

func (rs *Rows) Close() error {

if !atomic.CompareAndSwapInt32(&rs.closed, 0, 1) {

return nil


if rs.ctxClose != nil {



err := rs.rowsi.Close()

if fn := rowsCloseHook; fn != nil {

fn(rs, &err)


if rs.closeStmt != nil {




return err




在数据库层面,Prepared Statements是和单个数据库连接绑定的。客户端发送一个有占位符的statement到服务端,服务器返回一个statement ID,然后客户端发送ID和参数来执行statement。

当你生成一个Prepared Statement时:

  • 自动在连接池中绑定到一个空闲连接
  • Stmt对象记住绑定了哪个连接
  • 执行Stmt时,尝试使用该连接。如果不可用,例如连接被关闭或繁忙中,会自动re-prepare,绑定到另一个连接。


某些操作在内部使用了Prepared Statement(PS),例如db.Query(sql, param1, param2),并会在最后自动关闭该statement。

// Prepare creates a prepared statement for later queries or executions.

// Multiple queries or executions may be run concurrently from the

// returned statement.

// The caller must call the statement's Close method

// when the statement is no longer needed.

func (db *DB) Prepare(query string) (*Stmt, error) {

return db.PrepareContext(context.Background(), query)


// PrepareContext creates a prepared statement for later queries or executions.

// Multiple queries or executions may be run concurrently from the

// returned statement.

// The caller must call the statement's Close method

// when the statement is no longer needed.


// The provided context is used for the preparation of the statement, not for the

// execution of the statement.

func (db *DB) PrepareContext(ctx context.Context, query string) (*Stmt, error) {

var stmt *Stmt

var err error

for i := 0; i < maxBadConnRetries; i++ {

stmt, err = db.prepare(ctx, query, cachedOrNewConn)

if err != driver.ErrBadConn {




if err == driver.ErrBadConn {

return db.prepare(ctx, query, alwaysNewConn)


return stmt, err


func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) {

// TODO: check if db.driver supports an optional

// driver.Preparer interface and call that instead, if so,

// otherwise we make a prepared statement that's bound

// to a connection, and to execute this prepared statement

// we either need to use this connection (if it's free), else

// get a new connection + re-prepare + execute on that one.

dc, err := db.conn(ctx, strategy)

if err != nil {

return nil, err


var si driver.Stmt

withLock(dc, func() {

si, err = dc.prepareLocked(ctx, query)


if err != nil {

db.putConn(dc, err)

return nil, err


stmt := &Stmt{

db: db,

query: query,

css: []connStmt{{dc, si}},

lastNumClosed: atomic.LoadUint64(&db.numClosed),


db.addDep(stmt, stmt)

db.putConn(dc, nil)

return stmt, nil






func (r *Row) Scan(dest ...interface{}) error {

if r.err != nil {

return r.err


// TODO(bradfitz): for now we need to defensively clone all

// []byte that the driver returned (not permitting

// *RawBytes in Rows.Scan), since we're about to close

// the Rows in our defer, when we return from this function.

// the contract with the driver.Next(...) interface is that it

// can return slices into read-only temporary memory that's

// only valid until the next Scan/Close. But the TODO is that

// for a lot of drivers, this copy will be unnecessary. We

// should provide an optional interface for drivers to

// implement to say, "don't worry, the []bytes that I return

// from Next will not be modified again." (for instance, if

// they were obtained from the network anyway) But for now we

// don't care.

defer r.rows.Close()

for _, dp := range dest {

if _, ok := dp.(*RawBytes); ok {

return errors.New("sql: RawBytes isn't allowed on Row.Scan")



if !r.rows.Next() {

if err := r.rows.Err(); err != nil {

return err


return ErrNoRows


err := r.rows.Scan(dest...)

if err != nil {

return err


// Make sure the query can be processed to completion with no errors.

if err := r.rows.Close(); err != nil {

return err


return nil
