准备工作完成后就可以启动 server 了。Server 的定义如下,包含 Listener、handler 和 sessionMgr(SessionManager)三部分:
// Server bundles the three parts of the MySQL-compatible server: the
// protocol listener, the command handler, and the session manager.
type Server struct {
// Listener is the exported protocol listener; Start calls its Accept method.
Listener *mysql.Listener
// handler is the command handler, typed as the mysql.Handler interface.
handler mysql.Handler
// sessionMgr is the session manager.
// NOTE(review): presumably the SessionManager created in NewServer — confirm in newServerFromHandler.
sessionMgr *SessionManager
}
复制
首先看一下默认 server 的创建方式:
s, err := server.NewDefaultServer(config, engine)
复制
源码位于github.com/dolthub/go-mysql-server@v0.14.0/server/server.go
// NewDefaultServer creates a Server for the given config and engine by
// delegating to NewServer with DefaultSessionBuilder as the session builder
// and a nil ServerEventListener. It returns whatever NewServer returns.
func NewDefaultServer(cfg Config, e *sqle.Engine) (*Server, error) {
return NewServer(cfg, e, DefaultSessionBuilder, nil)
}
复制
NewServer 的核心工作就是初始化上述三个部分:先创建 SessionManager,再创建 Handler,最后把它们交给 newServerFromHandler 组装成 Server:
func NewServer(cfg Config, e *sqle.Engine, sb SessionBuilder, listener ServerEventListener) (*Server, error) {
sm := NewSessionManager(sb, tracer, e.Analyzer.Catalog.HasDB, e.MemoryManager, e.ProcessList, cfg.Address)
handler := NewHandler(e, sm, cfg.ConnReadTimeout, cfg.DisableClientMultiStatements, listener)
return newServerFromHandler(cfg, e, sm, handler)
复制
最后调用 Start 方法启动服务,开始在监听端口上接受连接:
func (s *Server) Start() error {
s.Listener.Accept()
复制
// Listener holds the listening socket together with the authentication
// server, the command handler, and the settings applied to the connections
// it accepts (timeouts, buffer size, TLS, connection limit).
type Listener struct {
// Construction parameters, set by NewListener.
// authServer is the AuthServer object to use for authentication.
authServer AuthServer
// handler is the data handler.
handler Handler
// This is the main listener socket.
listener net.Listener
// Max limit for connections
maxConns uint64
// The following parameters are read by multiple connection go
// routines. They are not protected by a mutex, so they
// should be set after NewListener, and not changed while
// Accept is running.
// ServerVersion is the version we will advertise.
ServerVersion string
// TLSConfig is the server TLS config. If set, we will advertise
// that we support SSL.
TLSConfig *tls.Config
// AllowClearTextWithoutTLS needs to be set for the
// mysql_clear_password authentication method to be accepted
// by the server when TLS is not in use.
AllowClearTextWithoutTLS sync2.AtomicBool
// SlowConnectWarnThreshold if non-zero specifies an amount of time
// beyond which a warning is logged to identify the slow connection
SlowConnectWarnThreshold sync2.AtomicDuration
// The following parameters are changed by the Accept routine.
// Incrementing ID for connection id.
connectionID uint32
// Read timeout on a given connection
connReadTimeout time.Duration
// Write timeout on a given connection
connWriteTimeout time.Duration
// connReadBufferSize is size of buffer for reads from underlying connection.
// Reads are unbuffered if it's <=0.
connReadBufferSize int
// shutdown indicates that Shutdown method was called.
shutdown sync2.AtomicBool
// RequireSecureTransport configures the server to reject connections from insecure clients
RequireSecureTransport bool
}
复制
其中sessionManager定义位于github.com/dolthub/go-mysql-server@v0.14.0/server/context.go
// SessionManager keeps the per-connection sessions of the server together
// with the shared objects (tracer, memory manager, process list) that
// NewServer passes to NewSessionManager.
type SessionManager struct {
// addr is an address string.
// NOTE(review): presumably cfg.Address as passed by NewServer — confirm.
addr string
// tracer is the trace.Tracer handed to NewSessionManager.
tracer trace.Tracer
// hasDBFunc reports a bool for a database name.
// NOTE(review): presumably "does this database exist" (NewServer passes
// e.Analyzer.Catalog.HasDB) — confirm.
hasDBFunc func(ctx *sql.Context, name string) bool
// memory is the engine's memory manager.
memory *sql.MemoryManager
// processlist is the engine's process list.
processlist sql.ProcessList
// mu is a mutex.
// NOTE(review): presumably guards sessions — confirm against the methods.
mu *sync.Mutex
// builder is the SessionBuilder supplied to NewSessionManager.
builder SessionBuilder
// sessions maps a uint32 key to its managed session.
// NOTE(review): the key is presumably the connection ID — confirm.
sessions map[uint32]*managedSession
// pid is a uint64 counter or identifier; its use is not visible here.
pid uint64
}
复制
handler 的定义位于 github.com/dolthub/go-mysql-server@v0.14.0/server/handler.go
func NewHandler(e *sqle.Engine, sm *SessionManager, rt time.Duration, disableMultiStmts bool, listener ServerEventListener) *Handler {
复制
// Handler is the server-side command handler. Its fields mirror the
// parameters of NewHandler: engine, session manager, read timeout,
// multi-statement switch, and server event listener.
type Handler struct {
// e is the SQL engine; doQuery runs queries through
// e.QueryNodeWithBindings.
e *sqle.Engine
// sm is the session manager.
sm *SessionManager
// readTimeout is a duration.
// NOTE(review): presumably the connection read timeout (NewHandler's rt) — confirm.
readTimeout time.Duration
// disableMultiStmts is a flag.
// NOTE(review): presumably turns off client multi-statement support — confirm.
disableMultiStmts bool
// sel is the ServerEventListener; NewDefaultServer passes nil for it.
sel ServerEventListener
}
复制
比如处理查询请求的 handler 函数 ComQuery:先解析 SQL 语句,然后进行参数绑定,再执行查询,最后把查询结果行放入带缓冲的 channel(rowChan / row2Chan),并通过 callback 返回结果。
func (h *Handler) ComQuery(
c *mysql.Conn,
query string,
callback func(*sqltypes.Result, bool) error,
) error {
_, err := h.errorWrappedDoQuery(c, query, MultiStmtModeOff, nil, callback)
复制
func (h *Handler) errorWrappedDoQuery(
c *mysql.Conn,
query string,
mode MultiStmtMode,
bindings map[string]*query.BindVariable,
callback func(*sqltypes.Result, bool) error,
) (string, error) {
remainder, err := h.doQuery(c, query, mode, bindings, callback)
复制
func (h *Handler) doQuery(
c *mysql.Conn,
query string,
mode MultiStmtMode,
bindings map[string]*query.BindVariable,
callback func(*sqltypes.Result, bool) error,
) (string, error) {
if parsed == nil {
parsed, err = parse.Parse(ctx, query)
sqlBindings, err = bindingsToExprs(bindings)
ctx, err = ctx.ProcessList.AddProcess(ctx, query)
schema, rowIter, err := h.e.QueryNodeWithBindings(ctx, query, parsed, sqlBindings)
if ri2, ok := rowIter.(sql.RowIterTypeSelector); ok && ri2.IsNode2() {
rowIter2 = rowIter.(sql.RowIter2)
row2Chan = make(chan sql.Row2, 512)
} else {
rowChan = make(chan sql.Row, 512)
}
复制
其中 SQL 语句的解析过程如下,源码位于 github.com/dolthub/go-mysql-server@v0.14.0/sql/parse/parse.go:
// ParseOne parses query by calling parse with multi set to true and returns
// parse's results unchanged: the parsed node, two strings, and an error.
// NOTE(review): the two strings are presumably the text of the parsed
// statement and the unparsed remainder — confirm against parse.
func ParseOne(ctx *sql.Context, query string) (sql.Node, string, string, error) {
return parse(ctx, query, true)
}
复制
func parse(ctx *sql.Context, query string, multi bool) (sql.Node, string, string, error) {
stmt, ri, err = sqlparser.ParseOne(s)
复制
github.com/dolthub/vitess@v0.0.0-20221031111135-9aad77e7b39f/go/vt/sqlparser/ast.go
func ParseOne(sql string) (Statement, int, error) {
tokenizer := NewStringTokenizer(sql)
tokenizer.stopAfterFirstStmt = true
tree, err := parseTokenizer(sql, tokenizer)
复制
func parseTokenizer(sql string, tokenizer *Tokenizer) (Statement, error) {
s.Start()
复制
回到 server 的启动流程:调用 s.Start() 启动 server 后,开始监听端口,并在循环中不断 Accept 新连接(源码位于 github.com/dolthub/vitess@v0.0.0-20221031111135-9aad77e7b39f/go/mysql/server.go);每当有新连接到来,就启动一个协程调用 l.handle 来处理该连接:
func (l *Listener) Accept() {
for {
conn, err := l.listener.Accept()
connCount.Add(1)
connAccept.Add(1)
go l.handle(conn, connectionID, acceptTime)
复制
func (l *Listener) handle(conn net.Conn, connectionID uint32, acceptTime time.Time) {
c := newServerConn(conn, l)
c.ConnectionID = connectionID
l.handler.NewConnection(c)
salt, err := c.writeHandshakeV10(l.ServerVersion, l.authServer, l.TLSConfig != nil)
response, err := c.readEphemeralPacketDirect()
user, authMethod, authResponse, err := l.parseClientHandshakePacket(c, true, response)
c.recycleReadPacket()
authServerMethod, err := l.authServer.AuthMethod(user, conn.RemoteAddr().String())
// Set db name.
if err = l.handler.ComInitDB(c, c.schemaName); err != nil {
if err := c.writeOKPacket(0, 0, c.StatusFlags, 0); err != nil {
for {
err := c.handleNextCommand(l.handler)
复制
源码位于 github.com/dolthub/vitess@v0.0.0-20221031111135-9aad77e7b39f/go/mysql/conn.go。一个连接在其生命周期内会发送很多命令,所以需要在循环中依次读取、解析并处理每一个命令:
func (c *Conn) handleNextCommand(handler Handler) error {
data, err := c.readEphemeralPacket()
switch data[0] {
case ComQuit:
c.recycleReadPacket()
return errors.New("ComQuit")
case ComInitDB:
db := c.parseComInitDB(data)
case ComQuery:
// flush is called at the end of this block.
// To simplify error handling, we do not
// encapsulate it with a defer'd func()
c.startWriterBuffering()
queryStart := time.Now()
query := c.parseComQuery(data)
c.recycleReadPacket()
multiStatements := !c.DisableClientMultiStatements && c.Capabilities&CapabilityClientMultiStatements != 0
var err error
复制
比如上面提到的查询命令(ComQuery):一条查询文本中可能包含多条语句,execQuery 每次执行一条语句并返回尚未执行的剩余部分,循环直到出错或剩余部分为空:
for query, err = c.execQuery(query, handler, multiStatements); err == nil && query != ""; {
query, err = c.execQuery(query, handler, multiStatements)
}
if err != nil {
return err
}
timings.Record(queryTimingKey, queryStart)
if err := c.flush(); err != nil {
log.Errorf("Conn %v: Flush() failed: %v", c.ID(), err)
return err
}
case ComFieldList:
case ComPing:
case ComSetOption:
case ComPrepare:
case ComStmtExecute:
case ComStmtSendLongData:
case ComStmtClose:
case ComStmtReset:
case ComStmtFetch:
case ComResetConnection:
复制
func (c *Conn) execQuery(query string, handler Handler, multiStatements bool) (string, error) {
resultsCB := func(qr *sqltypes.Result, more bool) error {
return c.writeOKPacketWithInfo(qr.RowsAffected, qr.InsertID, flag, handler.WarningCount(c), qr.Info)
return c.writeRows(qr)
if multiStatements {
remainder, err = handler.ComMultiQuery(c, query, resultsCB)
} else {
err = handler.ComQuery(c, query, resultsCB)
}
复制
其中的 handler 是在初始化 Listener 时由配置(cfg.Handler)传入的:
return &Listener{
authServer: cfg.AuthServer,
handler: cfg.Handler,
复制
type Handler interface {
NewConnection is called when a connection is created.
It is not established yet. The handler can decide to
set StatusFlags that will be returned by the handshake methods.
In particular, ServerStatusAutocommit might be set.
NewConnection(c *Conn)
ConnectionClosed is called when a connection is closed.
ConnectionClosed(c *Conn)
InitDB is called once at the beginning to set db name,
/ and subsequently for every ComInitDB event.
ComInitDB(c *Conn, schemaName string) error
// ComQuery is called when a connection receives a query.
// Note the contents of the query slice may change after
// the first call to callback. So the Handler should not
// hang on to the byte slice.
ComQuery(c *Conn, query string, callback func(res *sqltypes.Result, more bool) error) error
// ComMultiQuery is called when a connection receives a query and the
// client supports MULTI_STATEMENT. It should process the first
// statement in |query| and return the remainder. It will be called
// multiple times until the remainder is |""|.
ComMultiQuery(c *Conn, query string, callback func(res *sqltypes.Result, more bool) error) (string, error)
// ComPrepare is called when a connection receives a prepared
// statement query.
ComPrepare(c *Conn, query string) ([]*querypb.Field, error)
// ComStmtExecute is called when a connection receives a statement
// execute query.
ComStmtExecute(c *Conn, prepare *PrepareData, callback func(*sqltypes.Result) error) error
// WarningCount is called at the end of each query to obtain
// the value to be returned to the client in the EOF packet.
// Note that this will be called either in the context of the
// ComQuery callback if the result does not contain any fields,
// or after the last ComQuery call completes.
WarningCount(c *Conn) uint16
ComResetConnection(c *Conn)
}
复制
总的来说,整个 server 就是一个 TCP 服务器,分为认证、连接管理、命令处理等多个部分,每个部分都按照 MySQL 协议进行具体处理。