grpc client 代码非常简洁,分三步
1,获取连接
2,初始化客户端
3,发送请求
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())defer conn.Close()
c := pb.NewGreeterClient(conn)
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
首先看下发送请求
type GreeterClient interface {// Sends a greetingSayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error)}
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {out := new(HelloReply)err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)if err != nil {return nil, err}return out, nil}
调用了cc的Invoke方法
type greeterClient struct {cc grpc.ClientConnInterface}
ClientConnInterface的定义如下
type ClientConnInterface interface {// Invoke performs a unary RPC and returns after the response is received// into reply.Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error// NewStream begins a streaming RPC.NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)}
接口包含了两个方法Invoke和NewStream定义在clientconn.go中
接着看下client初始化的代码
func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {return &greeterClient{cc}}
仅仅把connet interface传给了client
最后看下获取连接的实现
func Dial(target string, opts ...DialOption) (*ClientConn, error) {return DialContext(context.Background(), target, opts...)}
clientconn.go的Dial方法返回了ClientConn指针
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {cc := &ClientConn{target: target,csMgr: &connectivityStateManager{},conns: make(map[*addrConn]struct{}),dopts: defaultDialOptions(),blockingpicker: newPickerWrapper(),czData: new(channelzData),firstResolveEvent: grpcsync.NewEvent(),}....cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)....cc.balancerBuildOpts = balancer.BuildOptions{DialCreds: credsClone,CredsBundle: cc.dopts.copts.CredsBundle,Dialer: cc.dopts.copts.Dialer,CustomUserAgent: cc.dopts.copts.UserAgent,ChannelzParentID: cc.channelzID,Target: cc.parsedTarget,}.....rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)}
其中ClientConn的结构体定义如下
type ClientConn struct {ctx context.Contextcancel context.CancelFunctarget stringparsedTarget resolver.Targetauthority stringdopts dialOptionscsMgr *connectivityStateManagerbalancerBuildOpts balancer.BuildOptionsblockingpicker *pickerWrappersafeConfigSelector iresolver.SafeConfigSelectormu sync.RWMutexresolverWrapper *ccResolverWrappersc *ServiceConfigconns map[*addrConn]struct{}// Keepalive parameter can be updated if a GoAway is received.mkp keepalive.ClientParameterscurBalancerName stringbalancerWrapper *ccBalancerWrapperretryThrottler atomic.ValuefirstResolveEvent *grpcsync.EventchannelzID int64 channelz unique identification numberczData *channelzDatalceMu sync.Mutex protects lastConnectionErrorlastConnectionError error}
可以看出Dial仅仅做了connection的初始化
call.go里定义了Invoke方法
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {// allow interceptor to see all applicable call options, which means those// configured as defaults from dial option as well as per-call optionsopts = combine(cc.dopts.callOptions, opts)if cc.dopts.unaryInt != nil {return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)}return invoke(ctx, method, args, reply, cc, opts...)}
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)if err != nil {return err}if err := cs.SendMsg(req); err != nil {return err}return cs.RecvMsg(reply)}
里面分了三步,建立连接,发送请求,获取结果
newClientStream 函数定义在stream.go文件里
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method})callHdr := &transport.CallHdr{Host: cc.authority,Method: method,ContentSubtype: c.contentSubtype,}cs := &clientStream{callHdr: callHdr,ctx: ctx,methodConfig: &mc,opts: opts,callInfo: c,cc: cc,desc: desc,codec: c.codec,cp: cp,comp: comp,cancel: cancel,beginTime: beginTime,firstAttempt: true,onCommit: onCommit,}if err := cs.newAttemptLocked(sh, trInfo); err != nil {cs.finish(err)return nil, err}}
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {newAttempt := &csAttempt{cs: cs,dc: cs.cc.dopts.dc,statsHandler: sh,trInfo: trInfo,}t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)}
getTransport 定义在clientconn.go中
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{Ctx: ctx,FullMethodName: method,})if err != nil {return nil, nil, toRPCErr(err)}return t, done, nil}
pick函数定义在picker_warper.go
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {for{pickResult, err := p.Pick(info)if t, ok := acw.getAddrConn().getReadyTransport(); ok {}}}
在stream.go文件里定义了ClientStream的接口
type ClientStream interface {// Header returns the header metadata received from the server if there// is any. It blocks if the metadata is not ready to read.Header() (metadata.MD, error)// Trailer returns the trailer metadata from the server, if there is any.// It must only be called after stream.CloseAndRecv has returned, or// stream.Recv has returned a non-nil error (including io.EOF).Trailer() metadata.MD// CloseSend closes the send direction of the stream. It closes the stream// when non-nil error is met. It is also not safe to call CloseSend// concurrently with SendMsg.CloseSend() error// Context returns the context for this stream.//// It should not be called until after Header or RecvMsg has returned. Once// called, subsequent client-side retries are disabled.Context() context.Context// SendMsg is generally called by generated code. On error, SendMsg aborts// the stream. If the error was generated by the client, the status is// returned directly; otherwise, io.EOF is returned and the status of// the stream may be discovered using RecvMsg.//// SendMsg blocks until:// - There is sufficient flow control to schedule m with the transport, or// - The stream is done, or// - The stream breaks.//// SendMsg does not wait until the message is received by the server. An// untimely stream closure may result in lost messages. To ensure delivery,// users should ensure the RPC completed successfully using RecvMsg.//// It is safe to have a goroutine calling SendMsg and another goroutine// calling RecvMsg on the same stream at the same time, but it is not safe// to call SendMsg on the same stream in different goroutines. It is also// not safe to call CloseSend concurrently with SendMsg.SendMsg(m interface{}) error// RecvMsg blocks until it receives a message into m or the stream is// done. It returns io.EOF when the stream completes successfully. On// any other error, the stream is aborted and the error contains the RPC// status.//// It is safe to have a goroutine calling SendMsg and another goroutine// calling RecvMsg on the same stream at the same time, but it is not// safe to call RecvMsg on the same stream in different goroutines.RecvMsg(m interface{}) error}
clientstream实现了上述接口
type clientStream struct {callHdr *transport.CallHdropts []CallOptioncallInfo *callInfocc *ClientConndesc *StreamDesccodec baseCodeccp Compressorcomp encoding.Compressorcancel context.CancelFunc // cancels all attemptssentLast bool // sent an end streambeginTime time.TimemethodConfig *MethodConfigctx context.Context // the application's context, wrapped by stats/tracingretryThrottler *retryThrottler // The throttler active when the RPC began.binlog *binarylog.MethodLogger // Binary logger, can be nil.// serverHeaderBinlogged is a boolean for whether server header has been// logged. Server header will be logged when the first time one of those// happens: stream.Header(), stream.Recv().//// It's only read and used by Recv() and Header(), so it doesn't need to be// synchronized.serverHeaderBinlogged boolmu sync.MutexfirstAttempt bool // if true, transparent retry is validnumRetries int // exclusive of transparent retry attempt(s)numRetriesSincePushback int // retries since pushback; to reset backofffinished bool // TODO: replace with atomic cmpxchg or sync.Once?// attempt is the active client stream attempt.// The only place where it is written is the newAttemptLocked method and this method never writes nil.// So, attempt can be nil only inside newClientStream function when clientStream is first created.// One of the first things done after clientStream's creation, is to call newAttemptLocked which either// assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,// then newClientStream calls finish on the clientStream and returns. So, finish method is the only// place where we need to check if the attempt is nil.attempt *csAttempt// TODO(hedging): hedging will have multiple attempts simultaneously.committed bool // active attempt committed for retry?onCommit func()buffer []func(a *csAttempt) error // operations to replay on retrybufferSize int // current size of buffer}
实现了SendMsg和RecvMsg两个方法
func (cs *clientStream) SendMsg(m interface{}) (err error) {hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)op := func(a *csAttempt) error {err := a.sendMsg(m, hdr, payload, data)// nil out the message and uncomp when replaying; they are only needed for// stats which is disabled for subsequent attempts.m, data = nil, nilreturn err}}
func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {data, err = encode(codec, m)compData, err := compress(data, cp, comp)hdr, payload = msgHeader(data, compData)}
实现了数据的编码压缩
紧接着发送数据
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {}if a.statsHandler != nil {a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))}}
最后是接受消息
func (cs *clientStream) RecvMsg(m interface{}) error {err := cs.withRetry(func(a *csAttempt) error {return a.recvMsg(m, recvInfo)}, cs.commitAttemptLocked)}
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{Client: true,RecvTime: time.Now(),Payload: m,// TODO truncate large payload.Data: payInfo.uncompressedBytes,WireLength: payInfo.wireLength + headerLen,Length: len(payInfo.uncompressedBytes),})err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)}
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)if err != nil {return err}if err := c.Unmarshal(d, m); err != nil {return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)}if payInfo != nil {payInfo.uncompressedBytes = d}return nil}


文章转载自golang算法架构leetcode技术php,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




