本文译自: https://appliedgo.net/networking/
在TCP/IP层连接2个进程初看让人害怕,但在Go中,它比预想的要简单。
入门: 通过TCP传输string数据
发送方
发送string数据需要简单的3步:
向接收方进程打开一个连接(connection)
写入string
关闭连接
net
包为此提供了一些方法。
ResolveTCPAddr()
接收一个代表着TCP地址的string对象( localhost:80, 127.0.0.1:80, 或者 [::1]:80, 它们都代表本地机器的80
端口),然后返回一个net.TCPAddr
对象(如果参数无法被解析为合法的地址,返回error)。
DialTCP()
接收一个net.TCPAddr
对象并连接到此地址。它返回的是一个net.TCPConn
对象(如果连接失败,返回的是一个error)。
如果不需要对Dial(即,拨号)
进行更细粒度的控制,我们可以直接使用net.Dial()
。该方法接收一个string类型的地址并返回一个普通的net.Conn
对象。这也足够作为演示用了。但是,如果你需要只有TCP连接能提供的功能,你必须使用基于TCP
的变体(DialTCP
,TCPConn
,TCPAddr
等等)。
拨号成功后,我们可以像处理任何其它的io流一样,处理这个新的连接(connection)。我们甚至可以使用bufio.ReadWriter
对其进行包装,然后方便地使用ReadWriter
中的方法,像ReadString
、ReadBytes
、WriteString
等。
注意,buffered Writers在写入后,需要主动地调用Flush()
, 如此,所有写入的数据才能转发到底层的网络连接。
最后,每个连接对象都有一个Close()
方法,用来结束通信。
优化
有一些调优的手段可用。举几个栗子:
Dialer
接口提供了这些选项(还有其它的):
DeadLine
和Timeout
,为不成功的拨号设定超时KeepAlive
选项,管理连接的存活时间
Conn
接口也提供了deadline设定;有的是设置连接的整体deadline(SetDeadLine
),有的则是针对特定的操作(SetReadDeadLine()
和SetWriteDeadLine()
)。
注意,deadline是按挂钟时间(wallclock)计算的固定时间点,新的活动开始时,前一个deadline并不会被重置。因此每个活动都需要主动设置自己的deadline。
下面的示例代码没有使用deadline, 因为代码很简单,当问题出现时,我们很容易定位到。CTRL+C
是我们的deadline触发工具:)。
接收方
接收方需要遵守以下步骤。
监听本地的某个端口
当请求进来时,新起一个goroutine处理请求
在新起的goroutine中,读取数据。可选的,也能发送响应
关闭连接
监听需要指定一个端口。通常,负责监听的应用(也叫server,服务端)声明它要监听的端口,或者如果它提供了某个标准服务,它会使用该服务特定的端口。例如,Web服务器监听80端口来处理HTTP请求,监听443端口处理HTTPS请求。SSH后台进程默认监听22端口,而WHOIS服务器则监听43端口。
net
包负责实现服务端功能的核心部分是:
net.Listen()
在本地网络地址上创建一个监听器。如果传入的地址中只包含了端口,像:61000
,监听器将监听所有可用的网络接口(network interfaces)。这样做很方便,因为一台电脑通常有至少2个可用的接口,loopback interface和真实的网卡。
监听器的Accept()
方法一直等待着,直到一个新的连接请求进来。然后它接收该请求,将新的连接(connection)返回给调用者。Accept()
通常是写在一个循环中,以便同时处理多个连接请求。每个连接可以由一个新的goroutine来处理,我们将在下面的代码中看到。
代码
不故作摆弄,我更想展示一些有用的代码。我想向服务端发送不同的命令(command),每个命令都有自己独有的数据。服务器能够区分不同的命令,并对数据进行解码。
下面的示例代码中,客户端会发送2类命令: STRING
和GOB
。命令由换行符终止。
STRING
命令包含一行string数据,使用bufio
中的简单的读写方法就能处理。
GOB
命令包含一个struct类型数据,它由多个字段组成:一个slice,一个map,甚至一个和该struct类型一样的指针。你将看到,当运行代码时,gob
负责将数据通过网络连接进行传输,没有什么难以理解的东西。
我们这里实现的是一种临时的协议(ad-hoc protocol):客户端和服务端都假定一个命令后会跟着一个换行符,后面再跟着一些数据。对于每个命令,服务器必须知道数据格式以及如何处理数据。
要实现上面说的这些,服务端代码要做2件事。
当
Listen()
方法收到新的连接请求时,启动一个新的goroutine来执行handleMessage()
。该方法会从连接中读取命令名称,再从map中检索出对应的处理器方法,并执行它。选中的处理器方法读取并处理命令后面附带的数据。
记住上图,有助于理解代码(原文的图是动态的,这里只提供一个截图,建议去原文看)。
上代码
导入包和定义全局变量(globals)
package main
import (
"bufio"
"io"
"log"
"net"
"strconv"
"strings"
"sync"
"github.com/pkg/errors"
"encoding/gob"
"flag"
)
复制
包含多类型字段的struct, 用于GOB
命令:
type complexData struct {
N int
S string
M map[string]int
P []byte
C *complexData
}
复制
Port
是服务端要监听的端口。
const (
Port = ":61000"
)
复制
向外的连接(Outgoing connections)
向外的连接很简单。net.Conn
同时满足了io.Reader
和io.Writer
接口,所以,我们可以把一个TCP connection视为一个Reader
或Writer
。
Open
向给定的TCP地址打开一个连接。它返回一个配备了timeout和buffered ReadWriter
的TCP connection。
func Open(addr string) (*bufio.ReadWriter, error) {
log.Println("Dial " + addr)
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, errors.Wrap(err, "Dialing "+addr+" failed")
}
return bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), nil
}
复制
向内的连接(Incomming connections)
处理向内的(流入的)数据,涉及的步骤更多。按照上面临时的协议,我们接收到的是 命令名
+\n
+数据
。数据的格式取决于所接收的命令。为方便处理,我们封装一个Endpoint
,它有以下属性:
它允许注册一个或多个处理器函数,每个函数用来处理特定的命令
它能基于命令名称将收到的命令派发给相应的处理器函数
HandleFunc
是用来处理命令的函数。它接收的是由ReadWriter
接口所包裹的TCP连接。
type HandleFunc func(*bufio.ReadWriter)
复制
Endpoint
是其它进程发送的数据的接收方。
type Endpoint struct {
listener net.Listener
handler map[string]HandleFunc
// map不是线程安全的,需要一个mutex来控制访问
m sync.RWMutex
}
复制
NewEndPoint
用来创建新的endpoint。为使事情简单,endpoint将监听固定的端口号。
func NewEndpoint() *Endpoint {
return &Endpoint{
handler: map[string]HandleFunc{},
}
}
复制
AddHandleFunc
用来注册新的处理器函数。
func (e *Endpoint) AddHandleFunc(name string, f HandleFunc) {
e.m.Lock()
e.handler[name] = f
e.m.Unlock()
}
复制
Listen
将监听所有网络接口。在监听之前AddHandleFunc()
至少要调用一次。
func (e *Endpoint) Listen() error {
var err error
e.listener, err = net.Listen("tcp", Port)
if err != nil {
return errors.Wrapf(err, "Unable to listen on port %s\n", Port)
}
log.Println("Listen on", e.listener.Addr().String())
for {
log.Println("Accept a connection request.")
conn, err := e.listener.Accept()
if err != nil {
log.Println("Failed accepting a connection request:", err)
continue
}
log.Println("Handle incoming messages.")
go e.handleMessages(conn)
}
}
复制
handleMessages
读取连接(connection)的第一行。根据读取到的数据,找到合适的HandleFunc
并执行。
func (e *Endpoint) handleMessages(conn net.Conn) {
// 使用buffered reader包裹,以方便读取数据
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
defer conn.Close()
// 解析出command,执行对应的处理器函数
for {
log.Print("Receive command '")
cmd, err := rw.ReadString('\n')
switch {
case err == io.EOF:
log.Println("Reached EOF - close this connection.\n ---")
return
case err != nil:
log.Println("\nError reading command. Got: '"+cmd+"'\n", err)
return
}
//移除行尾的换行符
cmd = strings.Trim(cmd, "\n ")
log.Println(cmd + "'")
// 找到command对应的处理器函数,并调用它
e.m.RLock()
handleCommand, ok := e.handler[cmd]
e.m.RUnlock()
if !ok {
log.Println("Command '" + cmd + "' is not registered.")
return
}
handleCommand(rw)
}
}
复制
现在,我们创建2个处理器函数。最容易的是接收string数据。第二个则用来接收并处理GOB
数据。
handleStrings
处理STRING
请求。
func handleStrings(rw *bufio.ReadWriter) {
log.Print("Receive STRING message:")
s, err := rw.ReadString('\n')
if err != nil {
log.Println("Cannot read from connection.\n", err)
}
s = strings.Trim(s, "\n ")
log.Println(s)
_, err = rw.WriteString("Thank you.\n")
if err != nil {
log.Println("Cannot write to connection.\n", err)
}
err = rw.Flush()
if err != nil {
log.Println("Flush failed.", err)
}
}
复制
handleGob
处理GOB
请求。它把接收到的GOB
数据解码为一个struct。
func handleGob(rw *bufio.ReadWriter) {
log.Print("Receive GOB data:")
var data complexData
// 创建一个解码器,直接解码为一个struct变量
dec := gob.NewDecoder(rw)
err := dec.Decode(&data)
if err != nil {
log.Println("Error decoding GOB data:", err)
return
}
// 打印complexData以及深一层的数据,证明数据确实传输过来了
log.Printf("Outer complexData struct: \n%#v\n", data)
log.Printf("Inner complexData struct: \n%#v\n", data.C)
}
复制
客户端和服务端函数
万事俱备只欠东风,我们现在来完善客户端和服务端函数。
客户端函数连接到服务端,发送STRING
和GOB
请求。
服务端监听请求并收到的信息派发给相应的处理器处理。
传入 -connect=ip addr
来启动客户端。
func client(ip string) error {
// 测试数据,注意GOB能正确处理map、slice、struct等数据类型
testStruct := complexData{
N: 23,
S: "string data",
M: map[string]int{"one": 1, "two": 2, "three": 3},
P: []byte("abc"),
C: &complexData{
N: 256,
S: "Recursive structs? Piece of cake!",
M: map[string]int{"01": 1, "10": 2, "11": 3},
},
}
// 连接到服务端
rw, err := Open(ip + Port)
if err != nil {
return errors.Wrap(err, "Client: Failed to open connection to "+ip+Port)
}
// 发送 STRING 请求。先发送命令,再发送数据
log.Println("Send the string request.")
n, err := rw.WriteString("STRING\n")
if err != nil {
return errors.Wrap(err, "Could not send the STRING request ("+strconv.Itoa(n)+" bytes written)")
}
n, err = rw.WriteString("Additional data.\n")
if err != nil {
return errors.Wrap(err, "Could not send additional STRING data ("+strconv.Itoa(n)+" bytes written)")
}
log.Println("Flush the buffer.")
err = rw.Flush()
if err != nil {
return errors.Wrap(err, "Flush failed.")
}
// 读取响应数据
log.Println("Read the reply.")
response, err := rw.ReadString('\n')
if err != nil {
return errors.Wrap(err, "Client: Failed to read the reply: '"+response+"'")
}
log.Println("STRING request: got a response:", response)
// 发送 GOB 请求。创建一个编码器,直接编码到rw。发送 命令名 ,然后发送 GOB数据
log.Println("Send a struct as GOB:")
log.Printf("Outer complexData struct: \n%#v\n", testStruct)
log.Printf("Inner complexData struct: \n%#v\n", testStruct.C)
enc := gob.NewEncoder(rw)
n, err = rw.WriteString("GOB\n")
if err != nil {
return errors.Wrap(err, "Could not write GOB data ("+strconv.Itoa(n)+" bytes written)")
}
err = enc.Encode(testStruct)
if err != nil {
return errors.Wrapf(err, "Encode failed for struct: %#v", testStruct)
}
err = rw.Flush()
if err != nil {
return errors.Wrap(err, "Flush failed.")
}
return nil
}
复制
服务端监听请求并将请求派发给合适的处理器函数。
func server() error {
endpoint := NewEndpoint()
注册处理器函数
endpoint.AddHandleFunc("STRING", handleStrings)
endpoint.AddHandleFunc("GOB", handleGob)
开始监听
return endpoint.Listen()
}
复制
Main 主类
Main
负责启动客户端或服务端,这取决于是否设置了connect
参数。没设置时,启动服务端,进行监听。设置了connect
参数,则启动客户端,连接到给定的服务端。
如果是在同一台机器运行,可以使用 localhost 或则 127.0.0.1 作为地址。
main
func main() {
connect := flag.String("connect", "", "IP address of process to join. If empty, go into listen mode.")
flag.Parse()
// 如果connect选项不为空,进入客户端模式
if *connect != "" {
err := client(*connect)
if err != nil {
log.Println("Error:", errors.WithStack(err))
}
log.Println("Client done.")
return
}
// 否则进入服务端模式
err := server()
if err != nil {
log.Println("Error:", errors.WithStack(err))
}
log.Println("Server done.")
}
复制
Lshortfile
标识用来在日志信息中记录文件名和行号。
func init() {
log.SetFlags(log.Lshortfile)
}
复制
怎么获取本文中的代码以及如何运行
获取代码,
-d
参数防止自动安装binary到$GOPATH/bin
下:go get -d github.com/appliedgo/networking
复制进入源码目录
cd $GOPATH/src/github.com/appliedgo/networking
复制运行服务端
go run networking.go
复制打开另一个终端窗口,运行客户端
go run networking.go -connect localhost
复制
Tips
如果你想进一步试验这些代码,下面是一些建议
尝试把客户端和服务端运行在不同的机器上(在同一内网下)
给
complexData
添加更多的不同类型的字段,看看gob是如何处理它们的同时启动多个客户端,看看服务端是否能正确处理
Links
文章已经很长了,如果你需要一篇更简单的,下面链接中的文章涉及了本文的核心部分,它只发送string数据。没有gob,没有command/data构造。
A Simple Go TCP Server and TCP Client, https://systembash.com/a-simple-go-tcp-server-and-tcp-client/
更多关于gob
的知识:
Gobs of data, https://blog.golang.org/gobs-of-data
祝你编码愉快!
勘误
2017-02-09 - Map access: map不是线程安全的。因此,当一个map对象用在不同的goroutine中时,应该使用mutex来控制访问。在上面的代码中,map是在goroutine启动前更新的,所以mutex并不是必需的。然而我还是加了mutex, 这样做的好处是,即使handleMessages
在goroutine中运行着,AddHandleFunc()
也可以被安全地调用。