暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

grpc client 源码分析

grpc client 代码非常简洁,分三步

1,获取连接

2,初始化客户端

3,发送请求

    conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
    defer conn.Close()
    复制
      c := pb.NewGreeterClient(conn)
      复制
        r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
        复制

        首先看下发送请求

          type GreeterClient interface {
          // Sends a greeting
          SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)
          }
          复制
            func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
            out := new(HelloReply)
            err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
            if err != nil {
            return nil, err
            }
            return out, nil
            }
            复制

            调用了cc的Invoke方法

              type greeterClient struct {
              cc grpc.ClientConnInterface
              }
              复制

              ClientConnInterface的定义如下

                type ClientConnInterface interface {
                // Invoke performs a unary RPC and returns after the response is received
                // into reply.
                Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
                // NewStream begins a streaming RPC.
                NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
                }
                复制

                接口包含了两个方法Invoke和NewStream定义在clientconn.go中


                接着看下client初始化的代码

                  func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
                  return &greeterClient{cc}
                  }
                  复制

                  仅仅把connet interface传给了client


                  最后看下获取连接的实现

                    func Dial(target string, opts ...DialOption) (*ClientConn, error) {
                    return DialContext(context.Background(), target, opts...)
                    }
                    复制

                    clientconn.go的Dial方法返回了ClientConn指针

                      func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
                      cc := &ClientConn{
                      target: target,
                      csMgr: &connectivityStateManager{},
                      conns: make(map[*addrConn]struct{}),
                      dopts: defaultDialOptions(),
                      blockingpicker: newPickerWrapper(),
                      czData: new(channelzData),
                      firstResolveEvent: grpcsync.NewEvent(),
                      }
                      ....
                      cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
                      resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
                      ....
                      cc.balancerBuildOpts = balancer.BuildOptions{
                      DialCreds: credsClone,
                      CredsBundle: cc.dopts.copts.CredsBundle,
                      Dialer: cc.dopts.copts.Dialer,
                      CustomUserAgent: cc.dopts.copts.UserAgent,
                      ChannelzParentID: cc.channelzID,
                      Target: cc.parsedTarget,
                      }
                      .....
                      rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
                      }
                      复制

                      其中ClientConn的结构体定义如下

                        type ClientConn struct {
                        ctx context.Context
                        cancel context.CancelFunc


                        target string
                        parsedTarget resolver.Target
                        authority string
                        dopts dialOptions
                        csMgr *connectivityStateManager


                        balancerBuildOpts balancer.BuildOptions
                        blockingpicker *pickerWrapper


                        safeConfigSelector iresolver.SafeConfigSelector


                        mu sync.RWMutex
                        resolverWrapper *ccResolverWrapper
                        sc *ServiceConfig
                        conns map[*addrConn]struct{}
                        // Keepalive parameter can be updated if a GoAway is received.
                        mkp keepalive.ClientParameters
                        curBalancerName string
                        balancerWrapper *ccBalancerWrapper
                        retryThrottler atomic.Value


                        firstResolveEvent *grpcsync.Event


                        channelzID int64 channelz unique identification number
                        czData *channelzData


                        lceMu sync.Mutex protects lastConnectionError
                        lastConnectionError error
                        }
                        复制

                        可以看出Dial仅仅做了connection的初始化


                        call.go里定义了Invoke方法

                          func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
                          // allow interceptor to see all applicable call options, which means those
                          // configured as defaults from dial option as well as per-call options
                          opts = combine(cc.dopts.callOptions, opts)


                          if cc.dopts.unaryInt != nil {
                          return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
                          }
                          return invoke(ctx, method, args, reply, cc, opts...)
                          }
                          复制
                            func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
                            cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
                            if err != nil {
                            return err
                            }
                            if err := cs.SendMsg(req); err != nil {
                            return err
                            }
                            return cs.RecvMsg(reply)
                            }
                            复制

                            里面分了三步,建立连接,发送请求,获取结果

                            newClientStream 函数定义在stream.go文件里

                              func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
                              rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method})
                              callHdr := &transport.CallHdr{
                              Host: cc.authority,
                              Method: method,
                              ContentSubtype: c.contentSubtype,
                              }
                              cs := &clientStream{
                              callHdr: callHdr,
                              ctx: ctx,
                              methodConfig: &mc,
                              opts: opts,
                              callInfo: c,
                              cc: cc,
                              desc: desc,
                              codec: c.codec,
                              cp: cp,
                              comp: comp,
                              cancel: cancel,
                              beginTime: beginTime,
                              firstAttempt: true,
                              onCommit: onCommit,
                              }
                              if err := cs.newAttemptLocked(sh, trInfo); err != nil {
                              cs.finish(err)
                              return nil, err
                              }
                              }
                              复制
                                func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
                                newAttempt := &csAttempt{
                                cs: cs,
                                dc: cs.cc.dopts.dc,
                                statsHandler: sh,
                                trInfo: trInfo,
                                }
                                t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
                                 }
                                复制

                                getTransport 定义在clientconn.go中

                                  func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
                                  t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
                                  Ctx: ctx,
                                  FullMethodName: method,
                                  })
                                  if err != nil {
                                  return nil, nil, toRPCErr(err)
                                  }
                                  return t, done, nil
                                  }
                                  复制

                                  pick函数定义在picker_warper.go

                                    func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
                                    for{
                                    pickResult, err := p.Pick(info)
                                    if t, ok := acw.getAddrConn().getReadyTransport(); ok {}
                                    }
                                    }
                                    复制

                                    在stream.go文件里定义了ClientStream的接口

                                      type ClientStream interface {
                                      // Header returns the header metadata received from the server if there
                                      // is any. It blocks if the metadata is not ready to read.
                                      Header() (metadata.MD, error)
                                      // Trailer returns the trailer metadata from the server, if there is any.
                                      // It must only be called after stream.CloseAndRecv has returned, or
                                      // stream.Recv has returned a non-nil error (including io.EOF).
                                      Trailer() metadata.MD
                                      // CloseSend closes the send direction of the stream. It closes the stream
                                      // when non-nil error is met. It is also not safe to call CloseSend
                                      // concurrently with SendMsg.
                                      CloseSend() error
                                      // Context returns the context for this stream.
                                      //
                                      // It should not be called until after Header or RecvMsg has returned. Once
                                      // called, subsequent client-side retries are disabled.
                                      Context() context.Context
                                      // SendMsg is generally called by generated code. On error, SendMsg aborts
                                      // the stream. If the error was generated by the client, the status is
                                      // returned directly; otherwise, io.EOF is returned and the status of
                                      // the stream may be discovered using RecvMsg.
                                      //
                                      // SendMsg blocks until:
                                      // - There is sufficient flow control to schedule m with the transport, or
                                      // - The stream is done, or
                                      // - The stream breaks.
                                      //
                                      // SendMsg does not wait until the message is received by the server. An
                                      // untimely stream closure may result in lost messages. To ensure delivery,
                                      // users should ensure the RPC completed successfully using RecvMsg.
                                      //
                                      // It is safe to have a goroutine calling SendMsg and another goroutine
                                      // calling RecvMsg on the same stream at the same time, but it is not safe
                                      // to call SendMsg on the same stream in different goroutines. It is also
                                      // not safe to call CloseSend concurrently with SendMsg.
                                      SendMsg(m interface{}) error
                                      // RecvMsg blocks until it receives a message into m or the stream is
                                      // done. It returns io.EOF when the stream completes successfully. On
                                      // any other error, the stream is aborted and the error contains the RPC
                                      // status.
                                      //
                                      // It is safe to have a goroutine calling SendMsg and another goroutine
                                      // calling RecvMsg on the same stream at the same time, but it is not
                                      // safe to call RecvMsg on the same stream in different goroutines.
                                      RecvMsg(m interface{}) error
                                      }
                                      复制

                                      clientstream实现了上述接口

                                        type clientStream struct {
                                        callHdr *transport.CallHdr
                                        opts []CallOption
                                        callInfo *callInfo
                                        cc *ClientConn
                                        desc *StreamDesc


                                        codec baseCodec
                                        cp Compressor
                                        comp encoding.Compressor


                                        cancel context.CancelFunc // cancels all attempts


                                        sentLast bool // sent an end stream
                                        beginTime time.Time


                                        methodConfig *MethodConfig


                                        ctx context.Context // the application's context, wrapped by stats/tracing


                                        retryThrottler *retryThrottler // The throttler active when the RPC began.


                                        binlog *binarylog.MethodLogger // Binary logger, can be nil.
                                        // serverHeaderBinlogged is a boolean for whether server header has been
                                        // logged. Server header will be logged when the first time one of those
                                        // happens: stream.Header(), stream.Recv().
                                        //
                                        // It's only read and used by Recv() and Header(), so it doesn't need to be
                                        // synchronized.
                                        serverHeaderBinlogged bool


                                        mu sync.Mutex
                                        firstAttempt bool // if true, transparent retry is valid
                                        numRetries int // exclusive of transparent retry attempt(s)
                                        numRetriesSincePushback int // retries since pushback; to reset backoff
                                        finished bool // TODO: replace with atomic cmpxchg or sync.Once?
                                        // attempt is the active client stream attempt.
                                        // The only place where it is written is the newAttemptLocked method and this method never writes nil.
                                        // So, attempt can be nil only inside newClientStream function when clientStream is first created.
                                        // One of the first things done after clientStream's creation, is to call newAttemptLocked which either
                                        // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
                                        // then newClientStream calls finish on the clientStream and returns. So, finish method is the only
                                        // place where we need to check if the attempt is nil.
                                        attempt *csAttempt
                                        // TODO(hedging): hedging will have multiple attempts simultaneously.
                                        committed bool // active attempt committed for retry?
                                        onCommit func()
                                        buffer []func(a *csAttempt) error // operations to replay on retry
                                        bufferSize int // current size of buffer
                                        }
                                        复制

                                        实现了SendMsg和RecvMsg个方法

                                          func (cs *clientStream) SendMsg(m interface{}) (err error) {
                                          hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
                                          op := func(a *csAttempt) error {
                                          err := a.sendMsg(m, hdr, payload, data)
                                          // nil out the message and uncomp when replaying; they are only needed for
                                          // stats which is disabled for subsequent attempts.
                                          m, data = nil, nil
                                          return err
                                          }
                                          }
                                          复制

                                            func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
                                            data, err = encode(codec, m)
                                            compData, err := compress(data, cp, comp)
                                              hdr, payload = msgHeader(data, compData)
                                              }
                                            复制

                                            实现了数据的编码压缩

                                            紧接着发送数据

                                              func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
                                              if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
                                              }
                                              if a.statsHandler != nil {
                                              a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
                                              }
                                              }
                                              复制

                                              最后是接受消息

                                                func (cs *clientStream) RecvMsg(m interface{}) error {
                                                err := cs.withRetry(func(a *csAttempt) error {
                                                return a.recvMsg(m, recvInfo)
                                                }, cs.commitAttemptLocked)
                                                }
                                                复制
                                                  func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
                                                  err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
                                                  a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
                                                  Client: true,
                                                  RecvTime: time.Now(),
                                                  Payload: m,
                                                  // TODO truncate large payload.
                                                  Data: payInfo.uncompressedBytes,
                                                  WireLength: payInfo.wireLength + headerLen,
                                                  Length: len(payInfo.uncompressedBytes),
                                                  })
                                                  err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
                                                  }
                                                  复制
                                                    func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
                                                    d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
                                                    if err != nil {
                                                    return err
                                                    }
                                                    if err := c.Unmarshal(d, m); err != nil {
                                                    return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
                                                    }
                                                    if payInfo != nil {
                                                    payInfo.uncompressedBytes = d
                                                    }
                                                    return nil
                                                    }
                                                    复制


                                                    文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                                    评论