作者:禅与计算机程序设计艺术
1.简介
1.1.什么是Socket?
Socket又称"套接字",应用程序通常通过"套接字"向网络中某台计算机请求服务或等待网络中的数据。简单的说,一个进程(应用层)可以通过一个"套接字"(传输层)与另一个进程建立起通信连接。Socket也是一种IPC(Inter-Process Communication,进程间通信)机制。通信双方各自占用两端独立的socket资源,进行数据的收发。如下图所示:
- 每个socket都由一个全局唯一的4字节长地址标识
- socket分为三类:
- 流式Socket:提供面向流的数据传输方式,例如TCP Socket、UDP Socket等。
- 数据报式Socket:提供不可靠的数据传输方式,例如ICMP Socket、IGMP Socket等。
- 可选Socket:可以利用其他协议来实现对原始IP包的处理。例如RAW Socket、UNIX Domain Socket、TUN/TAP Socket等。
1.2.Socket编程模型
传统上,客户端与服务器之间需要建立通信连接,完成数据的收发。在早期的开发阶段,客户端到服务器之间的数据交换主要依赖于“阻塞式”I/O模型和系统调用。阻塞式I/O模型是指,用户程序需要等待直至数据读写或系统调用返回才可以继续运行。这种模型虽然简单易用,但是效率低下,不适合高并发场景下的需求。因此,后来又引入了非阻塞式I/O模型,允许用户程序异步地进行I/O操作,由内核负责切换线程,提高了程序的响应速度。
随着互联网的发展,越来越多的服务需要被部署在分布式环境下,服务器数量逐渐增加,客户机数量也在快速增长。为了满足高并发需求,新的服务架构设计通常采用“事件驱动”的模型。在这种模型中,服务器不再主动向客户端发送消息,而是当客户端请求时,服务器将产生一个事件通知客户端,客户端通过轮询的方式来获取新的数据。因此,对于服务器来说,只需要监听来自客户端的事件通知即可。
但是,如果要实现这种架构,就需要统一标准的API来描述Socket接口,方便不同语言之间的交互。目前,国际标准组织IETF已经定义了Socket API,它包括四种类型:
- Stream Sockets:用于基于TCP/IP协议的流式通信,如TCP Socket;
- Datagram Sockets:用于基于UDP/IP协议的消息通信,如UDP Socket;
- Raw Sockets:允许原始IP包的读写,如RAW Socket;
- Connectionless Sockets:不建立连接,只支持发送、接收数据,如ICMP Socket、IGMP Socket。
因此,对于不同的编程语言来说,都有相应的Socket库来实现这些功能,比如Java里面的java.net.Socket和javax.net.ssl.SSLSocket,Python里面的是socket模块,Ruby里面的是Ruby's built-in TCPServer和TCPSocket,PHP里面的是fsockopen函数。
综上所述,Socket编程模型包含三个重要层次:
- 传输层:处理底层的网络连接细节,如Socket地址、端口号等。
- 网络层:实现网络层的各种功能,如路由、拥塞控制、分包与重组等。
- 应用层:封装应用层的各种协议,如HTTP、FTP、SMTP等。
Socket编程模型旨在屏蔽底层网络传输协议的复杂性,让开发人员更加关注业务逻辑。
1.3.Socket编程的特点
相比于传统的C/S模式,Socket模式更加灵活可控。下面列举Socket编程的一些特点:
- 非阻塞式I/O模型:Socket是由操作系统维护的,不会引起程序的阻塞,可以同时处理多个连接,有效提高了并发性能。
- 支持异步处理:IO多路复用模型使得Socket可以同时监视多个Socket,并根据状态情况进行相应的读写操作。
- 事件驱动型架构:Socket的事件驱动型架构使得服务器端不需要主动向客户端推送信息,只需等待客户端的请求即可。
- 支持多种协议:Socket支持多种协议,如TCP、UDP、SCTP、DCCP等。
- 自动错误处理:Socket提供了自动错误处理机制,能够识别出诸如网络断开、连接超时、远程主机崩溃等异常情况,并进行相应的错误恢复。
- 跨平台兼容性好:Socket具有良好的跨平台兼容性,几乎可以在任何平台上进行Socket编程。
二、Socket编程基础知识
2.1.创建套接字
创建一个Socket最常用的方法就是调用socket()函数。这个函数需要传入的参数有两个,分别是AF_INET表示使用IPv4协议,SOCK_STREAM表示使用TCP协议。返回值是一个代表Socket的整形变量,失败则返回一个负值。
package main
import (
"fmt"
"net"
"os"
)
func main() {
// 创建一个Socket对象
conn, err := net.Dial("tcp", "www.baidu.com:80")
if err!= nil {
fmt.Println(err)
os.Exit(-1)
}
defer conn.Close()
// 使用Socket进行读写
buffer := make([]byte, 1024)
n, err := conn.Read(buffer)
if err!= nil {
fmt.Println(err)
return
}
fmt.Printf("%d bytes read from server:\n%s\n", n, string(buffer[:n]))
_, err = conn.Write([]byte("GET / HTTP/1.1\r\nHost: www.baidu.com\r\n\r\n"))
if err!= nil {
fmt.Println(err)
return
}
// 读取响应头部
for {
line, err := conn.ReadString('\n')
if err!= nil || len(line) == 0 {
break
}
fmt.Print(line)
}
// 读取响应体
n, err = io.Copy(ioutil.Discard, conn)
if err!= nil {
fmt.Println(err)
return
}
fmt.Printf("\nread %d bytes of content.\n", n)
}
代码注释:
conn, err := net.Dial("tcp", "www.baidu.com:80")
:创建了一个tcp类型的Socket连接到www.baidu.com的80端口,成功返回一个*net.TCPConn对象,失败返回nil及错误。defer conn.Close()
:程序执行结束后关闭Socket连接。_, err = conn.Write([]byte("GET / HTTP/1.1\r\nHost: www.baidu.com\r\n\r\n"))
:向服务器发送一个HTTP GET请求,成功返回nil及错误。for {line, err := conn.ReadString('\n'); if err!= nil || len(line) == 0 {break}}
:读取服务器响应的每一行,直到空行为止。n, err := io.Copy(ioutil.Discard, conn)
:读取服务器响应的内容,成功返回字节数及nil,失败返回0及错误。if err!= nil {fmt.Println(err); return}
:如果有错误发生,打印错误信息并返回。
注意:
如果服务器不存在或无法访问,
net.Dial()
会阻塞到超时或发生其他错误。如果服务器存在多个服务或者端口,需要指定正确的端口号才能连接成功。
如果希望连接过程异步进行,可以使用
net.DialTimeout()
或net.Listen()
配合select{}
,实现更高级的Socket编程。可以使用
netcat
工具来测试Socket的连通性,示例命令:nc -v -z www.baidu.com 80
输出结果:
www.baidu.com (192.168.127.12): tcp port 80 open www.baidu.com (192.168.127.12): tcp port 80 open www.baidu.com (192.168.127.12): Name or service not known
2.2.连接管理
为了提升Socket的可用性,避免因频繁创建和销毁Socket造成资源浪费,Go语言中提供了连接池net.ConnPool
。net.ConnPool
缓存了之前创建的连接,可以重复使用,避免反复创建和销毁造成系统开销。
2.2.1.连接池概念
连接池是一种缓存技术,用来保存并重用连接以减少创建和关闭连接带来的开销。连接池解决的问题是在高并发环境下,频繁创建和销毁Socket造成系统资源消耗过多的问题。
一般来说,连接池的工作原理是预先创建一定数量的连接,然后放入一个池子中供客户端使用。当一个客户端需要新建连接时,就可以从池子中取出一个现有的连接,这样可以降低新建连接的时间。当客户端释放连接时,就可以把连接归还给连接池,供其他客户端使用。
2.2.2.创建连接池
创建一个连接池很简单,只需要调用net.Dial()
创建一个连接,然后调用net.NewConnPool()
创建连接池,并指定最大连接数。
package main
import (
"fmt"
"net"
"sync"
)
var pool *net.ConnPool
func init() {
maxConns := 100 // 最大连接数
dialer := func() (net.Conn, error) {
// 通过调用net.Dial()创建连接
c, err := net.Dial("tcp", "www.baidu.com:80")
return c, err
}
// 创建连接池
var once sync.Once
pool, _ = net.NewConnPool(maxConns, dialer)
}
func handleClient(client net.Conn) {
// 从连接池中取出一个连接
conn, err := pool.Get()
if err!= nil {
client.Close()
fmt.Println(err)
return
}
// 读写连接
go copyData(conn.(net.Conn), client)
go copyData(client, conn.(net.Conn))
}
func copyData(dst io.Writer, src io.Reader) {
buf := make([]byte, 1024)
for {
n, err := src.Read(buf)
if err!= nil || n <= 0 {
break
}
dst.Write(buf[0:n])
}
}
func main() {
listener, err := net.Listen("tcp", ":8000")
if err!= nil {
fmt.Println(err)
return
}
defer listener.Close()
for {
conn, err := listener.Accept()
if err!= nil {
continue
}
go handleClient(conn)
}
}
代码注释:
- 在init()函数中,首先指定最大连接数为100,并创建一个回调函数
dialer()
,该函数用于创建新的连接。 - 在init()函数中,调用
net.NewConnPool()
创建连接池,并指定最大连接数和回调函数。 - 当有客户端连接时,
handleClient()
函数会调用pool.Get()
从连接池中取出一个连接,并启动两个协程分别读写两个连接。 - 函数
copyData()
用于异步复制数据。
注意:
- 连接池中所有连接都是未加密的。
- 更多关于连接池的配置可以参考源码:
src/net/http/transport.go
。
2.3.Socket参数设置
2.3.1.Socket缓冲区大小设置
默认情况下,Socket的缓冲区大小设置为0。当接收到的数据大于缓冲区大小时,可能会出现粘包或拆包现象,导致程序无法正常工作。因此,一般建议设置Socket缓冲区大小为合适的值。
在Go语言中,可以通过调用SetReadBuffer()
和SetWriteBuffer()
函数设置Socket的接收缓冲区和发送缓冲区大小。
package main
import (
"fmt"
"net"
)
func main() {
conn, err := net.Dial("tcp", "www.baidu.com:80")
if err!= nil {
fmt.Println(err)
os.Exit(-1)
}
defer conn.Close()
// 设置缓冲区大小
bufferSize := 1024 * 1024 // 1M
conn.(*net.TCPConn).SetReadBuffer(bufferSize)
conn.(*net.TCPConn).SetWriteBuffer(bufferSize)
// 读写连接
buffer := make([]byte, 1024)
n, err := conn.Read(buffer)
if err!= nil {
fmt.Println(err)
return
}
fmt.Printf("%d bytes read from server:\n%s\n", n, string(buffer[:n]))
_, err = conn.Write([]byte("GET / HTTP/1.1\r\nHost: www.baidu.com\r\n\r\n"))
if err!= nil {
fmt.Println(err)
return
}
// 读取响应头部
for {
line, err := conn.ReadString('\n')
if err!= nil || len(line) == 0 {
break
}
fmt.Print(line)
}
// 读取响应体
n, err = io.Copy(ioutil.Discard, conn)
if err!= nil {
fmt.Println(err)
return
}
fmt.Printf("\nread %d bytes of content.\n", n)
}
代码注释:
conn.(*net.TCPConn).SetReadBuffer(bufferSize)
:设置Socket接收缓冲区大小为1M。conn.(*net.TCPConn).SetWriteBuffer(bufferSize)
:设置Socket发送缓冲区大小为1M。
2.3.2.Socket超时设置
对于Socket连接,最重要的一个参数是超时时间。如果客户端在指定时间内没有收到服务器端的响应,就会触发超时事件,此时客户端应该重新发起连接。否则,客户端认为连接已经断开,并一直等待服务器响应,造成程序的无限等待。
在Go语言中,可以通过调用SetDeadline()
函数设置超时时间。
package main
import (
"fmt"
"net"
"time"
)
func main() {
conn, err := net.Dial("tcp", "www.baidu.com:80")
if err!= nil {
fmt.Println(err)
os.Exit(-1)
}
defer conn.Close()
// 设置超时时间
timeoutDuration := time.Second * 5
conn.(*net.TCPConn).SetDeadline(time.Now().Add(timeoutDuration))
// 读写连接
buffer := make([]byte, 1024)
n, err := conn.Read(buffer)
if err!= nil {
fmt.Println(err)
return
}
fmt.Printf("%d bytes read from server:\n%s\n", n, string(buffer[:n]))
_, err = conn.Write([]byte("GET / HTTP/1.1\r\nHost: www.baidu.com\r\n\r\n"))
if err!= nil {
fmt.Println(err)
return
}
// 读取响应头部
for {
line, err := conn.ReadString('\n')
if err!= nil || len(line) == 0 {
break
}
fmt.Print(line)
}
// 读取响应体
n, err = io.Copy(ioutil.Discard, conn)
if err!= nil {
fmt.Println(err)
return
}
fmt.Printf("\nread %d bytes of content.\n", n)
}
代码注释:
conn.(*net.TCPConn).SetDeadline(time.Now().Add(timeoutDuration))
:设置Socket超时时间为5秒。- 当超过5秒没有收到服务器响应时,程序会抛出超时错误。
2.3.3.Socket超时重试
即使设置了超时时间,但是仍然可能由于网络原因导致Socket连接断开。因此,还需要设置超时重试机制,防止程序陷入无限等待。
package main
import (
"fmt"
"net"
"time"
)
func main() {
conn, err := net.Dial("tcp", "www.baidu.com:80")
if err!= nil {
fmt.Println(err)
os.Exit(-1)
}
defer conn.Close()
// 设置超时时间
timeoutDuration := time.Second * 5
conn.(*net.TCPConn).SetDeadline(time.Now().Add(timeoutDuration))
// 设置超时重试次数
retryCount := 3
deadline := time.Now().Add(retryCount * timeoutDuration)
for i := 0; ; i++ {
_, err = conn.Write([]byte("GET / HTTP/1.1\r\nHost: www.baidu.com\r\n\r\n"))
if err!= nil {
fmt.Println(err)
return
}
// 读取响应头部
headerBytes := make([]byte, 0)
for {
line, err := conn.ReadBytes('\n')
if err!= nil || len(line) == 0 {
break
}
headerBytes = append(headerBytes, line...)
}
responseHeaderStr := strings.TrimSpace(string(headerBytes))
responseHeader := http.Response{StatusLine: responseHeaderStr}
statusCode := responseHeader.StatusCode
// 判断是否应该重试
if statusCode >= 200 && statusCode < 300 {
break
} else if time.Now().After(deadline) {
fmt.Println("connect to baidu failed.")
return
} else {
fmt.Println("connect to baidu failed, will try again after some seconds...")
<-time.After((i+1)*timeoutDuration) // 重试间隔
}
}
// 读取响应体
bodyBytes := make([]byte, 0)
reader := bufio.NewReader(conn)
contentLength := int(responseHeader.ContentLength)
for {
b, err := reader.Peek(contentLength + 1)
if err!= nil || len(b) > contentLength {
break
}
bodyBytes = append(bodyBytes, b...)
reader.Discard(len(b))
contentLength -= len(b)
}
contentType := responseHeader.Header["Content-Type"][0]
if strings.Contains(contentType, "html") {
fmt.Println(string(bodyBytes))
} else if strings.Contains(contentType, "json") {
jsonObj, err := simplejson.NewJson(bodyBytes)
if err!= nil {
fmt.Println(err)
return
}
fmt.Println(jsonObj)
} else {
fmt.Printf("%d bytes of data received.", len(bodyBytes))
}
}
代码注释:
- 设定超时时间为5秒。
- 将超时重试次数设置为3。
- 用死循环等待响应,超时后退出程序。
- 判断是否应该重试,超时后退出程序。
- 如果响应头中包含html内容,解析HTML内容并打印;如果响应头中包含json内容,解析JSON内容并打印;否则打印字节数。