暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

TCP/IP Networking in Go

背井 2021-03-03
409

本文译自: https://appliedgo.net/networking/

在TCP/IP层连接2个进程初看让人害怕,但在Go中,它比预想的要简单。

入门: 通过TCP传输string数据

发送方

发送string数据需要简单的3步:

  1. 向接收方进程打开一个连接(connection)

  2. 写入string

  3. 关闭连接


net
包为此提供了一些方法。

ResolveTCPAddr()
接收一个代表着TCP地址的string对象( localhost:80127.0.0.1:80, 或者 [::1]:80, 它们都代表本地机器的80
端口),然后返回一个net.TCPAddr
对象(如果参数无法被解析为合法的地址,返回error)。

DialTCP()
接收一个net.TCPAddr
对象并连接到此地址。它返回的是一个net.TCPConn
对象(如果连接失败,返回的是一个error)。

如果不需要对Dial(即,拨号)
进行更细粒度的控制,我们可以直接使用net.Dial()
。该方法接收一个string类型的地址并返回一个普通的net.Conn
对象。这也足够作为演示用了。但是,如果你需要只有TCP连接能提供的功能,你必须使用基于TCP
的变体(DialTCP
,TCPConn
,TCPAddr
等等)。

拨号成功后,我们可以像处理任何其它的io流一样,处理这个新的连接(connection)。我们甚至可以使用bufio.ReadWriter
对其进行包装,然后方便地使用ReadWriter
中的方法,像ReadString
ReadBytes
WriteString
等。

注意,buffered Writers在写入后,需要主动地调用Flush()
, 如此,所有写入的数据才能转发到底层的网络连接。

最后,每个连接对象都有一个Close()
方法,用来结束通信。

优化

有一些调优的手段可用。举几个栗子:

Dialer
接口提供了这些选项(还有其它的):

  • DeadLine
    Timeout
    ,为不成功的拨号设定超时

  • KeepAlive
    选项,管理连接的存活时间


Conn
接口也提供了deadline设定;有的是设置连接的整体deadline(SetDeadLine
),有的则是针对特定的操作(SetReadDeadLine()
SetWriteDeadLine()
)。

注意,deadline是按挂钟时间(wallclock)计算的固定时间点,新的活动开始时,前一个deadline并不会被重置。因此每个活动都需要主动设置自己的deadline。

下面的示例代码没有使用deadline, 因为代码很简单,当问题出现时,我们很容易定位到。CTRL+C
是我们的deadline触发工具:)。

接收方

接收方需要遵守以下步骤。

  1. 监听本地的某个端口

  2. 当请求进来时,新起一个goroutine处理请求

  3. 在新起的goroutine中,读取数据。可选的,也能发送响应

  4. 关闭连接


监听需要指定一个端口。通常,负责监听的应用(也叫server,服务端)声明它要监听的端口,或者如果它提供了某个标准服务,它会使用该服务特定的端口。例如,Web服务器监听80端口来处理HTTP请求,监听443端口处理HTTPS请求。SSH后台进程默认监听22端口,而WHOIS服务器则监听43端口。

net
包负责实现服务端功能的核心部分是:

net.Listen()
在本地网络地址上创建一个监听器。如果传入的地址中只包含了端口,像:61000
,监听器将监听所有可用的网络接口(network interfaces)。这样做很方便,因为一台电脑通常有至少2个可用的接口,loopback interface和真实的网卡。

监听器的Accept()
方法一直等待着,直到一个新的连接请求进来。然后它接收该请求,将新的连接(connection)返回给调用者。Accept()
通常是写在一个循环中,以便同时处理多个连接请求。每个连接可以由一个新的goroutine来处理,我们将在下面的代码中看到。

代码

不故作摆弄,我更想展示一些有用的代码。我想向服务端发送不同的命令(command),每个命令都有自己独有的数据。服务器能够区分不同的命令,并对数据进行解码。

下面的示例代码中,客户端会发送2类命令: STRING
GOB
。命令由换行符终止。

STRING
命令包含一行string数据,使用bufio
中的简单的读写方法就能处理。

GOB
命令包含一个struct类型数据,它由多个字段组成:一个slice,一个map,甚至一个和该struct类型一样的指针。你将看到,当运行代码时,gob
负责将数据通过网络连接进行传输,没有什么难以理解的东西。

我们这里实现的是一种临时的协议(ad-hoc protocol):客户端和服务端都假定一个命令后会跟着一个换行符,后面再跟着一些数据。对于每个命令,服务器必须知道数据格式以及如何处理数据。

要实现上面说的这些,服务端代码要做2件事。

  1. Listen()
    方法收到新的连接请求时,启动一个新的goroutine来执行handleMessage()
    。该方法会从连接中读取命令名称,再从map中检索出对应的处理器方法,并执行它。

  2. 选中的处理器方法读取并处理命令后面附带的数据。



记住上图,有助于理解代码(原文的图是动态的,这里只提供一个截图,建议去原文看)。

上代码

导入包和定义全局变量(globals)

    package main


    import (
    "bufio"
    "io"
    "log"
    "net"
    "strconv"
    "strings"
    "sync"


    "github.com/pkg/errors"


    "encoding/gob"
    "flag"
    )
    复制

    包含多类型字段的struct, 用于GOB
    命令:

      type complexData struct {
      N int
      S string
      M map[string]int
      P []byte
      C *complexData
      }
      复制


      Port
      是服务端要监听的端口。

        const (
        Port = ":61000"
        )
        复制

        向外的连接(Outgoing connections)

        向外的连接很简单。net.Conn
        同时满足了io.Reader
        io.Writer
        接口,所以,我们可以把一个TCP connection视为一个Reader
        Writer

        Open
        向给定的TCP地址打开一个连接。它返回一个配备了timeout和buffered ReadWriter
         的TCP connection。

          func Open(addr string) (*bufio.ReadWriter, error) {
          log.Println("Dial " + addr)
          conn, err := net.Dial("tcp", addr)
          if err != nil {
          return nil, errors.Wrap(err, "Dialing "+addr+" failed")
          }
          return bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), nil
          }
          复制

          向内的连接(Incomming connections)

          处理向内的(流入的)数据,涉及的步骤更多。按照上面临时的协议,我们接收到的是 命令名
          +\n
          +数据
          。数据的格式取决于所接收的命令。为方便处理,我们封装一个Endpoint
          ,它有以下属性:

          • 它允许注册一个或多个处理器函数,每个函数用来处理特定的命令

          • 它能基于命令名称将收到的命令派发给相应的处理器函数


          HandleFunc
          是用来处理命令的函数。它接收的是由ReadWriter
          接口所包裹的TCP连接。

            type HandleFunc func(*bufio.ReadWriter)
            复制

            Endpoint
            是其它进程发送的数据的接收方。

              type Endpoint struct {
              listener net.Listener
              handler map[string]HandleFunc
              // map不是线程安全的,需要一个mutex来控制访问
              m sync.RWMutex
              }
              复制

              NewEndPoint
              用来创建新的endpoint。为使事情简单,endpoint将监听固定的端口号。

                func NewEndpoint() *Endpoint {
                return &Endpoint{
                handler: map[string]HandleFunc{},
                }
                }
                复制

                AddHandleFunc
                用来注册新的处理器函数。

                  func (e *Endpoint) AddHandleFunc(name string, f HandleFunc) {
                  e.m.Lock()
                  e.handler[name] = f
                  e.m.Unlock()
                  }
                  复制

                  Listen
                  将监听所有网络接口。在监听之前AddHandleFunc()
                  至少要调用一次。

                    func (e *Endpoint) Listen() error {
                    var err error
                    e.listener, err = net.Listen("tcp", Port)
                    if err != nil {
                    return errors.Wrapf(err, "Unable to listen on port %s\n", Port)
                    }
                    log.Println("Listen on", e.listener.Addr().String())
                    for {
                    log.Println("Accept a connection request.")
                    conn, err := e.listener.Accept()
                    if err != nil {
                    log.Println("Failed accepting a connection request:", err)
                    continue
                    }
                    log.Println("Handle incoming messages.")
                    go e.handleMessages(conn)
                    }
                    }
                    复制

                    handleMessages
                    读取连接(connection)的第一行。根据读取到的数据,找到合适的HandleFunc
                    并执行。

                      func (e *Endpoint) handleMessages(conn net.Conn) {
                      // 使用buffered reader包裹,以方便读取数据
                      rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
                      defer conn.Close()
                      // 解析出command,执行对应的处理器函数
                      for {
                      log.Print("Receive command '")
                      cmd, err := rw.ReadString('\n')
                      switch {
                      case err == io.EOF:
                      log.Println("Reached EOF - close this connection.\n ---")
                      return
                      case err != nil:
                      log.Println("\nError reading command. Got: '"+cmd+"'\n", err)
                      return
                      }
                      //移除行尾的换行符
                      cmd = strings.Trim(cmd, "\n ")
                      log.Println(cmd + "'")

                      // 找到command对应的处理器函数,并调用它
                      e.m.RLock()
                      handleCommand, ok := e.handler[cmd]
                      e.m.RUnlock()
                      if !ok {
                      log.Println("Command '" + cmd + "' is not registered.")
                      return
                      }
                      handleCommand(rw)
                      }
                      }
                      复制

                      现在,我们创建2个处理器函数。最容易的是接收string数据。第二个则用来接收并处理GOB
                      数据。

                      handleStrings
                      处理STRING
                      请求。

                        func handleStrings(rw *bufio.ReadWriter) {
                        log.Print("Receive STRING message:")
                        s, err := rw.ReadString('\n')
                        if err != nil {
                        log.Println("Cannot read from connection.\n", err)
                        }
                        s = strings.Trim(s, "\n ")
                        log.Println(s)
                        _, err = rw.WriteString("Thank you.\n")
                        if err != nil {
                        log.Println("Cannot write to connection.\n", err)
                        }
                        err = rw.Flush()
                        if err != nil {
                        log.Println("Flush failed.", err)
                        }
                        }
                        复制

                        handleGob
                        处理GOB
                        请求。它把接收到的GOB
                        数据解码为一个struct。

                          func handleGob(rw *bufio.ReadWriter) {
                          log.Print("Receive GOB data:")
                          var data complexData
                          // 创建一个解码器,直接解码为一个struct变量
                          dec := gob.NewDecoder(rw)
                          err := dec.Decode(&data)
                          if err != nil {
                          log.Println("Error decoding GOB data:", err)
                          return
                          }
                          // 打印complexData以及深一层的数据,证明数据确实传输过来了
                          log.Printf("Outer complexData struct: \n%#v\n", data)


                          log.Printf("Inner complexData struct: \n%#v\n", data.C)
                          }
                          复制

                          客户端和服务端函数

                          万事俱备只欠东风,我们现在来完善客户端和服务端函数。

                          客户端函数连接到服务端,发送STRING
                          GOB
                          请求。

                          服务端监听请求并收到的信息派发给相应的处理器处理。

                          传入 -connect=ip addr
                          来启动客户端。

                            func client(ip string) error {
                            // 测试数据,注意GOB能正确处理map、slice、struct等数据类型
                            testStruct := complexData{
                            N: 23,
                            S: "string data",
                            M: map[string]int{"one": 1, "two": 2, "three": 3},
                            P: []byte("abc"),
                            C: &complexData{
                            N: 256,
                            S: "Recursive structs? Piece of cake!",
                            M: map[string]int{"01": 1, "10": 2, "11": 3},
                            },
                            }
                            // 连接到服务端
                            rw, err := Open(ip + Port)
                            if err != nil {
                            return errors.Wrap(err, "Client: Failed to open connection to "+ip+Port)
                            }
                            // 发送 STRING 请求。先发送命令,再发送数据
                            log.Println("Send the string request.")
                            n, err := rw.WriteString("STRING\n")
                            if err != nil {
                            return errors.Wrap(err, "Could not send the STRING request ("+strconv.Itoa(n)+" bytes written)")
                            }
                            n, err = rw.WriteString("Additional data.\n")
                            if err != nil {
                            return errors.Wrap(err, "Could not send additional STRING data ("+strconv.Itoa(n)+" bytes written)")
                            }
                            log.Println("Flush the buffer.")
                            err = rw.Flush()
                            if err != nil {
                            return errors.Wrap(err, "Flush failed.")
                            }
                            // 读取响应数据
                            log.Println("Read the reply.")
                            response, err := rw.ReadString('\n')
                            if err != nil {
                            return errors.Wrap(err, "Client: Failed to read the reply: '"+response+"'")
                            }


                            log.Println("STRING request: got a response:", response)

                            // 发送 GOB 请求。创建一个编码器,直接编码到rw。发送 命令名 ,然后发送 GOB数据
                            log.Println("Send a struct as GOB:")
                            log.Printf("Outer complexData struct: \n%#v\n", testStruct)
                            log.Printf("Inner complexData struct: \n%#v\n", testStruct.C)
                            enc := gob.NewEncoder(rw)
                            n, err = rw.WriteString("GOB\n")
                            if err != nil {
                            return errors.Wrap(err, "Could not write GOB data ("+strconv.Itoa(n)+" bytes written)")
                            }
                            err = enc.Encode(testStruct)
                            if err != nil {
                            return errors.Wrapf(err, "Encode failed for struct: %#v", testStruct)
                            }
                            err = rw.Flush()
                            if err != nil {
                            return errors.Wrap(err, "Flush failed.")
                            }
                            return nil
                            }
                            复制

                            服务端监听请求并将请求派发给合适的处理器函数。

                              func server() error {
                              endpoint := NewEndpoint()
                              注册处理器函数
                              endpoint.AddHandleFunc("STRING", handleStrings)
                              endpoint.AddHandleFunc("GOB", handleGob)
                              开始监听
                              return endpoint.Listen()
                              }
                              复制

                              Main 主类

                              Main
                              负责启动客户端或服务端,这取决于是否设置了connect
                              参数。没设置时,启动服务端,进行监听。设置了connect
                              参数,则启动客户端,连接到给定的服务端。

                              如果是在同一台机器运行,可以使用 localhost 或则 127.0.0.1 作为地址。

                              main

                                func main() {
                                connect := flag.String("connect", "", "IP address of process to join. If empty, go into listen mode.")
                                flag.Parse()
                                // 如果connect选项不为空,进入客户端模式
                                if *connect != "" {
                                err := client(*connect)
                                if err != nil {
                                log.Println("Error:", errors.WithStack(err))
                                }
                                log.Println("Client done.")
                                return
                                }
                                // 否则进入服务端模式
                                err := server()
                                if err != nil {
                                log.Println("Error:", errors.WithStack(err))
                                }


                                log.Println("Server done.")
                                }
                                复制

                                Lshortfile
                                标识用来在日志信息中记录文件名和行号。

                                  func init() {
                                  log.SetFlags(log.Lshortfile)
                                  }
                                  复制

                                  怎么获取本文中的代码以及如何运行

                                  1. 获取代码,-d
                                    参数防止自动安装binary到$GOPATH/bin
                                    下: 

                                    go get -d github.com/appliedgo/networking
                                    复制
                                  2. 进入源码目录

                                    cd $GOPATH/src/github.com/appliedgo/networking
                                    复制
                                  3. 运行服务端

                                    go run networking.go
                                    复制
                                  4. 打开另一个终端窗口,运行客户端

                                    go run networking.go -connect localhost
                                    复制

                                  Tips

                                  如果你想进一步试验这些代码,下面是一些建议

                                  • 尝试把客户端和服务端运行在不同的机器上(在同一内网下)

                                  • complexData
                                    添加更多的不同类型的字段,看看gob是如何处理它们的

                                  • 同时启动多个客户端,看看服务端是否能正确处理

                                  Links

                                  文章已经很长了,如果你需要一篇更简单的,下面链接中的文章涉及了本文的核心部分,它只发送string数据。没有gob,没有command/data构造。

                                  • A Simple Go TCP Server and TCP Client, https://systembash.com/a-simple-go-tcp-server-and-tcp-client/

                                  更多关于gob
                                  的知识:

                                  • Gobs of data, https://blog.golang.org/gobs-of-data

                                  祝你编码愉快!


                                  勘误

                                  2017-02-09 - Map access: map不是线程安全的。因此,当一个map对象用在不同的goroutine中时,应该使用mutex来控制访问。在上面的代码中,map是在goroutine启动前更新的,所以mutex并不是必需的。然而我还是加了mutex, 这样做的好处是,即使handleMessages
                                  在goroutine中运行着,AddHandleFunc()
                                  也可以被安全地调用。


                                  文章转载自背井,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                  评论