暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

golang源码分析:go-mysql-server(2)

        做完准备工作后就开始启动server,定义如下:包含listener、handler和sessionManager三部分

    // Server bundles the three parts a running server is made of: the MySQL
    // protocol listener, the command handler, and the session manager.
    type Server struct {
    Listener *mysql.Listener
    handler mysql.Handler
    sessionMgr *SessionManager
    }
    复制

    首先看下默认server的定义

      s, err := server.NewDefaultServer(config, engine)
      复制

      源码位于github.com/dolthub/go-mysql-server@v0.14.0/server/server.go

        // NewDefaultServer builds a Server from cfg and e by delegating to
        // NewServer with DefaultSessionBuilder as the session builder and a nil
        // ServerEventListener.
        func NewDefaultServer(cfg Config, e *sqle.Engine) (*Server, error) {
        return NewServer(cfg, e, DefaultSessionBuilder, nil)
        }
        复制

        核心内容就是初始化上述三个内容:

          func NewServer(cfg Config, e *sqle.Engine, sb SessionBuilder, listener ServerEventListener) (*Server, error) {
            sm := NewSessionManager(sb, tracer, e.Analyzer.Catalog.HasDB, e.MemoryManager, e.ProcessList, cfg.Address)
          handler := NewHandler(e, sm, cfg.ConnReadTimeout, cfg.DisableClientMultiStatements, listener)
          return newServerFromHandler(cfg, e, sm, handler)
          复制

          最后调用start函数启动服务,开始监听端口

            func (s *Server) Start() error {
            s.Listener.Accept()
            复制
              // Listener is the MySQL-protocol listener. It holds the listening
              // socket, the auth server and the Handler, plus the per-connection
              // settings (timeouts, buffer size, TLS) described on each field below.
              type Listener struct {
              // Construction parameters, set by NewListener.




              // authServer is the AuthServer object to use for authentication.
              authServer AuthServer




              // handler is the data handler.
              handler Handler




              // This is the main listener socket.
              listener net.Listener




              // Max limit for connections
              maxConns uint64




              // The following parameters are read by multiple connection go
              // routines. They are not protected by a mutex, so they
              // should be set after NewListener, and not changed while
              // Accept is running.




              // ServerVersion is the version we will advertise.
              ServerVersion string




              // TLSConfig is the server TLS config. If set, we will advertise
              // that we support SSL.
              TLSConfig *tls.Config




              // AllowClearTextWithoutTLS needs to be set for the
              // mysql_clear_password authentication method to be accepted
              // by the server when TLS is not in use.
              AllowClearTextWithoutTLS sync2.AtomicBool




              // SlowConnectWarnThreshold if non-zero specifies an amount of time
              // beyond which a warning is logged to identify the slow connection
              SlowConnectWarnThreshold sync2.AtomicDuration




              // The following parameters are changed by the Accept routine.




              // Incrementing ID for connection id.
              connectionID uint32




              // Read timeout on a given connection
              connReadTimeout time.Duration
              // Write timeout on a given connection
              connWriteTimeout time.Duration
              // connReadBufferSize is size of buffer for reads from underlying connection.
              // Reads are unbuffered if it's <=0.
              connReadBufferSize int




              // shutdown indicates that Shutdown method was called.
              shutdown sync2.AtomicBool




              // RequireSecureTransport configures the server to reject connections from insecure clients
              RequireSecureTransport bool
              }
              复制

              其中sessionManager定义位于github.com/dolthub/go-mysql-server@v0.14.0/server/context.go

                // SessionManager holds the state used to track client sessions: the
                // server address, a tracer, a database-existence callback, the memory
                // manager, the process list, the session builder and the session map.
                type SessionManager struct {
                addr string
                tracer trace.Tracer
                // hasDBFunc reports whether a database with the given name exists.
                hasDBFunc func(ctx *sql.Context, name string) bool
                memory *sql.MemoryManager
                processlist sql.ProcessList
                // NOTE(review): mu presumably guards sessions — confirm against the methods.
                mu *sync.Mutex
                builder SessionBuilder
                // NOTE(review): sessions is keyed by a uint32, presumably the connection ID — confirm.
                sessions map[uint32]*managedSession
                pid uint64
                }
                复制

                handler位于github.com/dolthub/go-mysql-server@v0.14.0/server/handler.go

                  func NewHandler(e *sqle.Engine, sm *SessionManager, rt time.Duration, disableMultiStmts bool, listener ServerEventListener) *Handler {
                  复制
                    // Handler carries what is needed to serve client commands: the SQL
                    // engine, the session manager, a read timeout, a switch that disables
                    // multi-statement queries, and a server event listener.
                    type Handler struct {
                    e *sqle.Engine
                    sm *SessionManager
                    readTimeout time.Duration
                    disableMultiStmts bool
                    sel ServerEventListener
                    }
                    复制

                    比如处理查询请求的handler函数ComQuery:先解析sql语句,然后进行参数绑定,再运行查询,最后将查询结果放入用于返回的channel里。

                      func (h *Handler) ComQuery(
                      c *mysql.Conn,
                      query string,
                      callback func(*sqltypes.Result, bool) error,
                      ) error {
                      _, err := h.errorWrappedDoQuery(c, query, MultiStmtModeOff, nil, callback)
                      复制
                        func (h *Handler) errorWrappedDoQuery(
                        c *mysql.Conn,
                        query string,
                        mode MultiStmtMode,
                        bindings map[string]*query.BindVariable,
                        callback func(*sqltypes.Result, bool) error,
                        ) (string, error) {
                        remainder, err := h.doQuery(c, query, mode, bindings, callback)
                        复制
                          func (h *Handler) doQuery(
                          c *mysql.Conn,
                          query string,
                          mode MultiStmtMode,
                          bindings map[string]*query.BindVariable,
                          callback func(*sqltypes.Result, bool) error,
                          ) (string, error) {
                          if parsed == nil {
                          parsed, err = parse.Parse(ctx, query)
                          sqlBindings, err = bindingsToExprs(bindings)
                          ctx, err = ctx.ProcessList.AddProcess(ctx, query)
                          schema, rowIter, err := h.e.QueryNodeWithBindings(ctx, query, parsed, sqlBindings)
                             if ri2, ok := rowIter.(sql.RowIterTypeSelector); ok && ri2.IsNode2() {
                          rowIter2 = rowIter.(sql.RowIter2)
                          row2Chan = make(chan sql.Row2, 512)
                          } else {
                          rowChan = make(chan sql.Row, 512)
                          }
                          复制

                          其中sql语句的解析过程如下,源码位于github.com/dolthub/go-mysql-server@v0.14.0/sql/parse/parse.go

                            // ParseOne forwards query to parse with the multi flag set to true and
                            // returns its results unchanged.
                            // NOTE(review): the two string results are presumably the text of the
                            // parsed statement and the unparsed remainder — confirm against parse.
                            func ParseOne(ctx *sql.Context, query string) (sql.Node, string, string, error) {
                            return parse(ctx, query, true)
                            }
                            复制
                              func parse(ctx *sql.Context, query string, multi bool) (sql.Node, string, string, error) {
                              stmt, ri, err = sqlparser.ParseOne(s)
                              复制

                              github.com/dolthub/vitess@v0.0.0-20221031111135-9aad77e7b39f/go/vt/sqlparser/ast.go

                                func ParseOne(sql string) (Statement, int, error) {
                                  tokenizer := NewStringTokenizer(sql)
                                tokenizer.stopAfterFirstStmt = true
                                tree, err := parseTokenizer(sql, tokenizer)
                                复制
                                  func parseTokenizer(sql string, tokenizer *Tokenizer) (Statement, error) {
                                  s.Start()
                                  复制

                                  启动server后开始监听端口,并在循环中不断accept新连接(源码位于github.com/dolthub/vitess@v0.0.0-20221031111135-9aad77e7b39f/go/mysql/server.go)。每当有连接到来,就启动一个协程调用handle方法来处理该连接:

                                    func (l *Listener) Accept() {
                                       for {
                                    conn, err := l.listener.Accept()
                                        connCount.Add(1)
                                    connAccept.Add(1)
                                    go l.handle(conn, connectionID, acceptTime)
                                    复制
                                      func (l *Listener) handle(conn net.Conn, connectionID uint32, acceptTime time.Time) {
                                            c := newServerConn(conn, l)
                                      c.ConnectionID = connectionID
                                      l.handler.NewConnection(c)
                                      salt, err := c.writeHandshakeV10(l.ServerVersion, l.authServer, l.TLSConfig != nil)
                                      response, err := c.readEphemeralPacketDirect()
                                      user, authMethod, authResponse, err := l.parseClientHandshakePacket(c, true, response)
                                      c.recycleReadPacket()
                                      authServerMethod, err := l.authServer.AuthMethod(user, conn.RemoteAddr().String())
                                      // Set db name.
                                      if err = l.handler.ComInitDB(c, c.schemaName); err != nil {
                                      if err := c.writeOKPacket(0, 0, c.StatusFlags, 0); err != nil {
                                      for {
                                      err := c.handleNextCommand(l.handler)
                                      复制

                                      源码位于github.com/dolthub/vitess@v0.0.0-20221031111135-9aad77e7b39f/go/mysql/conn.go。对于每一个连接,客户端会在其上发送很多命令,所以需要在一个循环中依次读取、解析并处理每一个命令:

                                        func (c *Conn) handleNextCommand(handler Handler) error {
                                        data, err := c.readEphemeralPacket()
                                        switch data[0] {
                                        case ComQuit:
                                        c.recycleReadPacket()
                                        return errors.New("ComQuit")
                                        case ComInitDB:
                                        db := c.parseComInitDB(data)
                                        case ComQuery:
                                        // flush is called at the end of this block.
                                        // To simplify error handling, we do not
                                        // encapsulate it with a defer'd func()
                                        c.startWriterBuffering()


                                        queryStart := time.Now()
                                            query := c.parseComQuery(data)
                                            
                                            c.recycleReadPacket()
                                            multiStatements := !c.DisableClientMultiStatements && c.Capabilities&CapabilityClientMultiStatements != 0


                                        var err error
                                        复制

                                        比如上面提到的查询命令(ComQuery):一个查询包里可能包含多条语句,因此这里循环调用execQuery,每次执行一条语句并返回尚未执行的剩余部分,直到剩余部分为空或出错为止:

                                          for query, err = c.execQuery(query, handler, multiStatements); err == nil && query != ""; {
                                          query, err = c.execQuery(query, handler, multiStatements)
                                          }
                                          if err != nil {
                                          return err
                                          }




                                          timings.Record(queryTimingKey, queryStart)




                                          if err := c.flush(); err != nil {
                                          log.Errorf("Conn %v: Flush() failed: %v", c.ID(), err)
                                          return err
                                          }
                                          case ComFieldList:
                                          case ComPing:
                                          case ComSetOption:
                                          case ComPrepare:
                                          case ComStmtExecute:
                                          case ComStmtSendLongData:
                                          case ComStmtClose:
                                          case ComStmtReset:
                                          case ComStmtFetch:
                                          case ComResetConnection:
                                          复制
                                            func (c *Conn) execQuery(query string, handler Handler, multiStatements bool) (string, error) {
                                            resultsCB := func(qr *sqltypes.Result, more bool) error {
                                            return c.writeOKPacketWithInfo(qr.RowsAffected, qr.InsertID, flag, handler.WarningCount(c), qr.Info)
                                            return c.writeRows(qr)
                                            if multiStatements {
                                            remainder, err = handler.ComMultiQuery(c, query, resultsCB)
                                            } else {
                                            err = handler.ComQuery(c, query, resultsCB)
                                            }
                                            复制

                                            其中的handler是在初始化listener的时候设置的:

                                              return &Listener{
                                              authServer: cfg.AuthServer,
                                              handler: cfg.Handler,
                                              复制
                                                type Handler interface {
                                                NewConnection is called when a connection is created.
                                                It is not established yet. The handler can decide to
                                                set StatusFlags that will be returned by the handshake methods.
                                                In particular, ServerStatusAutocommit might be set.
                                                NewConnection(c *Conn)




                                                ConnectionClosed is called when a connection is closed.
                                                ConnectionClosed(c *Conn)




                                                InitDB is called once at the beginning to set db name,
                                                / and subsequently for every ComInitDB event.
                                                ComInitDB(c *Conn, schemaName string) error




                                                // ComQuery is called when a connection receives a query.
                                                // Note the contents of the query slice may change after
                                                // the first call to callback. So the Handler should not
                                                // hang on to the byte slice.
                                                ComQuery(c *Conn, query string, callback func(res *sqltypes.Result, more bool) error) error




                                                // ComMultiQuery is called when a connection receives a query and the
                                                // client supports MULTI_STATEMENT. It should process the first
                                                // statement in |query| and return the remainder. It will be called
                                                // multiple times until the remainder is |""|.
                                                ComMultiQuery(c *Conn, query string, callback func(res *sqltypes.Result, more bool) error) (string, error)




                                                // ComPrepare is called when a connection receives a prepared
                                                // statement query.
                                                ComPrepare(c *Conn, query string) ([]*querypb.Field, error)




                                                // ComStmtExecute is called when a connection receives a statement
                                                // execute query.
                                                ComStmtExecute(c *Conn, prepare *PrepareData, callback func(*sqltypes.Result) error) error




                                                // WarningCount is called at the end of each query to obtain
                                                // the value to be returned to the client in the EOF packet.
                                                // Note that this will be called either in the context of the
                                                // ComQuery callback if the result does not contain any fields,
                                                // or after the last ComQuery call completes.
                                                WarningCount(c *Conn) uint16




                                                ComResetConnection(c *Conn)
                                                }
                                                复制

                                                总的来说,整个server就是一个tcp服务器,分为认证、连接管理、命令处理等多个部分,每个部分根据mysql协议进行具体处理。

                                                文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                评论