Go必知必会系列:网络通信与Socket

发布于:2023-09-27 ⋅ 阅读:(79) ⋅ 点赞:(0)

作者:禅与计算机程序设计艺术

1.简介

1.1.什么是Socket?

Socket又称"套接字",应用程序通常通过"套接字"向网络中某台计算机请求服务或等待网络中的数据。简单的说,一个进程(应用层)可以通过一个"套接字"(传输层)与另一个进程建立起通信连接。Socket也是一种IPC(Inter-Process Communication,进程间通信)机制。通信双方各自占用两端独立的socket资源,进行数据的收发。如下图所示:

  1. 每个socket都由一个全局唯一的4字节长地址标识
  2. socket分为三类:
    1. 流式Socket:提供面向流的数据传输方式,例如TCP Socket、UDP Socket等。
    2. 数据报式Socket:提供不可靠的数据传输方式,例如ICMP Socket、IGMP Socket等。
    3. 可选Socket:可以利用其他协议来实现对原始IP包的处理。例如RAW Socket、UNIX Domain Socket、TUN/TAP Socket等。

1.2.Socket编程模型

传统上,客户端与服务器之间需要建立通信连接,完成数据的收发。在早期的开发阶段,客户端到服务器之间的数据交换主要依赖于“阻塞式”I/O模型和系统调用。阻塞式I/O模型是指,用户程序需要等待直至数据读写或系统调用返回才可以继续运行。这种模型虽然简单易用,但是效率低下,不适合高并发场景下的需求。因此,后来又引入了非阻塞式I/O模型,允许用户程序异步地进行I/O操作,由内核负责切换线程,提高了程序的响应速度。

随着互联网的发展,越来越多的服务需要被部署在分布式环境下,服务器数量逐渐增加,客户机数量也在快速增长。为了满足高并发需求,新的服务架构设计通常采用“事件驱动”的模型。在这种模型中,服务器不再主动向客户端发送消息,而是当客户端请求时,服务器将产生一个事件通知客户端,客户端通过轮询的方式来获取新的数据。因此,对于服务器来说,只需要监听来自客户端的事件通知即可。

但是,如果要实现这种架构,就需要统一标准的API来描述Socket接口,方便不同语言之间的交互。目前,国际标准组织IETF已经定义了Socket API,它包括四种类型:

  1. Stream Sockets:用于基于TCP/IP协议的流式通信,如TCP Socket;
  2. Datagram Sockets:用于基于UDP/IP协议的消息通信,如UDP Socket;
  3. Raw Sockets:允许原始IP包的读写,如RAW Socket;
  4. Connectionless Sockets:不建立连接,只支持发送、接收数据,如ICMP Socket、IGMP Socket。

因此,对于不同的编程语言来说,都有相应的Socket库来实现这些功能,比如Java里面的java.net.Socket和javax.net.ssl.SSLSocket,Python里面的是socket模块,Ruby里面的是Ruby's built-in TCPServer和TCPSocket,PHP里面的是fsockopen函数。

综上所述,Socket编程模型包含三个重要层次:

  1. 传输层:处理底层的网络连接细节,如Socket地址、端口号等。
  2. 网络层:实现网络层的各种功能,如路由、拥塞控制、分包与重组等。
  3. 应用层:封装应用层的各种协议,如HTTP、FTP、SMTP等。

Socket编程模型旨在屏蔽底层网络传输协议的复杂性,让开发人员更加关注业务逻辑。

1.3.Socket编程的特点

相比于传统的C/S模式,Socket模式更加灵活可控。下面列举Socket编程的一些特点:

  1. 非阻塞式I/O模型:Socket是由操作系统维护的,不会引起程序的阻塞,可以同时处理多个连接,有效提高了并发性能。
  2. 支持异步处理:IO多路复用模型使得Socket可以同时监视多个Socket,并根据状态情况进行相应的读写操作。
  3. 事件驱动型架构:Socket的事件驱动型架构使得服务器端不需要主动向客户端推送信息,只需等待客户端的请求即可。
  4. 支持多种协议:Socket支持多种协议,如TCP、UDP、SCTP、DCCP等。
  5. 自动错误处理:Socket提供了自动错误处理机制,能够识别出诸如网络断开、连接超时、远程主机崩溃等异常情况,并进行相应的错误恢复。
  6. 跨平台兼容性好:Socket具有良好的跨平台兼容性,几乎可以在任何平台上进行Socket编程。

二、Socket编程基础知识

2.1.创建套接字

创建一个Socket最常用的方法就是调用socket()函数。这个函数需要传入的参数有两个,分别是AF_INET表示使用IPv4协议,SOCK_STREAM表示使用TCP协议。返回值是一个代表Socket的整形变量,失败则返回一个负值。

package main

import (
    "fmt"
    "net"
    "os"
)

func main() {
    // 创建一个Socket对象
    conn, err := net.Dial("tcp", "www.baidu.com:80")
    if err!= nil {
        fmt.Println(err)
        os.Exit(-1)
    }
    defer conn.Close()

    // 使用Socket进行读写
    buffer := make([]byte, 1024)
    n, err := conn.Read(buffer)
    if err!= nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("%d bytes read from server:\n%s\n", n, string(buffer[:n]))

    _, err = conn.Write([]byte("GET / HTTP/1.1\r\nHost: www.baidu.com\r\n\r\n"))
    if err!= nil {
        fmt.Println(err)
        return
    }

    // 读取响应头部
    for {
        line, err := conn.ReadString('\n')
        if err!= nil || len(line) == 0 {
            break
        }
        fmt.Print(line)
    }
    // 读取响应体
    n, err = io.Copy(ioutil.Discard, conn)
    if err!= nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("\nread %d bytes of content.\n", n)
}

代码注释:

  1. conn, err := net.Dial("tcp", "www.baidu.com:80"):创建了一个tcp类型的Socket连接到www.baidu.com的80端口,成功返回一个*net.TCPConn对象,失败返回nil及错误。
  2. defer conn.Close():程序执行结束后关闭Socket连接。
  3. _, err = conn.Write([]byte("GET / HTTP/1.1\r\nHost: www.baidu.com\r\n\r\n")):向服务器发送一个HTTP GET请求,成功返回nil及错误。
  4. for {line, err := conn.ReadString('\n'); if err!= nil || len(line) == 0 {break}}:读取服务器响应的每一行,直到空行为止。
  5. n, err := io.Copy(ioutil.Discard, conn):读取服务器响应的内容,成功返回字节数及nil,失败返回0及错误。
  6. if err!= nil {fmt.Println(err); return}:如果有错误发生,打印错误信息并返回。

注意:

  1. 如果服务器不存在或无法访问,net.Dial()会阻塞到超时或发生其他错误。

  2. 如果服务器存在多个服务或者端口,需要指定正确的端口号才能连接成功。

  3. 如果希望连接过程异步进行,可以使用net.DialTimeout()net.Listen()配合select{},实现更高级的Socket编程。

  4. 可以使用netcat工具来测试Socket的连通性,示例命令:

    nc -v -z www.baidu.com 80

    输出结果:

    www.baidu.com (192.168.127.12): tcp port 80 open
    www.baidu.com (192.168.127.12): tcp port 80 open
    www.baidu.com (192.168.127.12): Name or service not known

2.2.连接管理

为了提升Socket的可用性,避免因频繁创建和销毁Socket造成资源浪费,Go语言中提供了连接池net.ConnPoolnet.ConnPool缓存了之前创建的连接,可以重复使用,避免反复创建和销毁造成系统开销。

2.2.1.连接池概念

连接池是一种缓存技术,用来保存并重用连接以减少创建和关闭连接带来的开销。连接池解决的问题是在高并发环境下,频繁创建和销毁Socket造成系统资源消耗过多的问题。

一般来说,连接池的工作原理是预先创建一定数量的连接,然后放入一个池子中供客户端使用。当一个客户端需要新建连接时,就可以从池子中取出一个现有的连接,这样可以降低新建连接的时间。当客户端释放连接时,就可以把连接归还给连接池,供其他客户端使用。

2.2.2.创建连接池

创建一个连接池很简单,只需要调用net.Dial()创建一个连接,然后调用net.NewConnPool()创建连接池,并指定最大连接数。

package main

import (
    "fmt"
    "net"
    "sync"
)

var pool *net.ConnPool

func init() {
    maxConns := 100 // 最大连接数
    dialer := func() (net.Conn, error) {
        // 通过调用net.Dial()创建连接
        c, err := net.Dial("tcp", "www.baidu.com:80")
        return c, err
    }
    // 创建连接池
    var once sync.Once
    pool, _ = net.NewConnPool(maxConns, dialer)
}

func handleClient(client net.Conn) {
    // 从连接池中取出一个连接
    conn, err := pool.Get()
    if err!= nil {
        client.Close()
        fmt.Println(err)
        return
    }

    // 读写连接
    go copyData(conn.(net.Conn), client)
    go copyData(client, conn.(net.Conn))
}

func copyData(dst io.Writer, src io.Reader) {
    buf := make([]byte, 1024)
    for {
        n, err := src.Read(buf)
        if err!= nil || n <= 0 {
            break
        }
        dst.Write(buf[0:n])
    }
}

func main() {
    listener, err := net.Listen("tcp", ":8000")
    if err!= nil {
        fmt.Println(err)
        return
    }
    defer listener.Close()

    for {
        conn, err := listener.Accept()
        if err!= nil {
            continue
        }

        go handleClient(conn)
    }
}

代码注释:

  1. 在init()函数中,首先指定最大连接数为100,并创建一个回调函数dialer(),该函数用于创建新的连接。
  2. 在init()函数中,调用net.NewConnPool()创建连接池,并指定最大连接数和回调函数。
  3. 当有客户端连接时,handleClient()函数会调用pool.Get()从连接池中取出一个连接,并启动两个协程分别读写两个连接。
  4. 函数copyData()用于异步复制数据。

注意:

  1. 连接池中所有连接都是未加密的。
  2. 更多关于连接池的配置可以参考源码:src/net/http/transport.go

2.3.Socket参数设置

2.3.1.Socket缓冲区大小设置

默认情况下,Socket的缓冲区大小设置为0。当接收到的数据大于缓冲区大小时,可能会出现粘包或拆包现象,导致程序无法正常工作。因此,一般建议设置Socket缓冲区大小为合适的值。

在Go语言中,可以通过调用SetReadBuffer()SetWriteBuffer()函数设置Socket的接收缓冲区和发送缓冲区大小。

package main

import (
    "fmt"
    "net"
)

func main() {
    conn, err := net.Dial("tcp", "www.baidu.com:80")
    if err!= nil {
        fmt.Println(err)
        os.Exit(-1)
    }
    defer conn.Close()

    // 设置缓冲区大小
    bufferSize := 1024 * 1024 // 1M
    conn.(*net.TCPConn).SetReadBuffer(bufferSize)
    conn.(*net.TCPConn).SetWriteBuffer(bufferSize)

    // 读写连接
    buffer := make([]byte, 1024)
    n, err := conn.Read(buffer)
    if err!= nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("%d bytes read from server:\n%s\n", n, string(buffer[:n]))

    _, err = conn.Write([]byte("GET / HTTP/1.1\r\nHost: www.baidu.com\r\n\r\n"))
    if err!= nil {
        fmt.Println(err)
        return
    }

    // 读取响应头部
    for {
        line, err := conn.ReadString('\n')
        if err!= nil || len(line) == 0 {
            break
        }
        fmt.Print(line)
    }
    // 读取响应体
    n, err = io.Copy(ioutil.Discard, conn)
    if err!= nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("\nread %d bytes of content.\n", n)
}

代码注释:

  1. conn.(*net.TCPConn).SetReadBuffer(bufferSize):设置Socket接收缓冲区大小为1M。
  2. conn.(*net.TCPConn).SetWriteBuffer(bufferSize):设置Socket发送缓冲区大小为1M。

2.3.2.Socket超时设置

对于Socket连接,最重要的一个参数是超时时间。如果客户端在指定时间内没有收到服务器端的响应,就会触发超时事件,此时客户端应该重新发起连接。否则,客户端认为连接已经断开,并一直等待服务器响应,造成程序的无限等待。

在Go语言中,可以通过调用SetDeadline()函数设置超时时间。

package main

import (
    "fmt"
    "net"
    "time"
)

func main() {
    conn, err := net.Dial("tcp", "www.baidu.com:80")
    if err!= nil {
        fmt.Println(err)
        os.Exit(-1)
    }
    defer conn.Close()

    // 设置超时时间
    timeoutDuration := time.Second * 5
    conn.(*net.TCPConn).SetDeadline(time.Now().Add(timeoutDuration))

    // 读写连接
    buffer := make([]byte, 1024)
    n, err := conn.Read(buffer)
    if err!= nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("%d bytes read from server:\n%s\n", n, string(buffer[:n]))

    _, err = conn.Write([]byte("GET / HTTP/1.1\r\nHost: www.baidu.com\r\n\r\n"))
    if err!= nil {
        fmt.Println(err)
        return
    }

    // 读取响应头部
    for {
        line, err := conn.ReadString('\n')
        if err!= nil || len(line) == 0 {
            break
        }
        fmt.Print(line)
    }
    // 读取响应体
    n, err = io.Copy(ioutil.Discard, conn)
    if err!= nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("\nread %d bytes of content.\n", n)
}

代码注释:

  1. conn.(*net.TCPConn).SetDeadline(time.Now().Add(timeoutDuration)):设置Socket超时时间为5秒。
  2. 当超过5秒没有收到服务器响应时,程序会抛出超时错误。

2.3.3.Socket超时重试

即使设置了超时时间,但是仍然可能由于网络原因导致Socket连接断开。因此,还需要设置超时重试机制,防止程序陷入无限等待。

package main

import (
    "fmt"
    "net"
    "time"
)

func main() {
    conn, err := net.Dial("tcp", "www.baidu.com:80")
    if err!= nil {
        fmt.Println(err)
        os.Exit(-1)
    }
    defer conn.Close()

    // 设置超时时间
    timeoutDuration := time.Second * 5
    conn.(*net.TCPConn).SetDeadline(time.Now().Add(timeoutDuration))

    // 设置超时重试次数
    retryCount := 3
    deadline := time.Now().Add(retryCount * timeoutDuration)
    for i := 0; ; i++ {
        _, err = conn.Write([]byte("GET / HTTP/1.1\r\nHost: www.baidu.com\r\n\r\n"))
        if err!= nil {
            fmt.Println(err)
            return
        }

        // 读取响应头部
        headerBytes := make([]byte, 0)
        for {
            line, err := conn.ReadBytes('\n')
            if err!= nil || len(line) == 0 {
                break
            }
            headerBytes = append(headerBytes, line...)
        }
        responseHeaderStr := strings.TrimSpace(string(headerBytes))
        responseHeader := http.Response{StatusLine: responseHeaderStr}
        statusCode := responseHeader.StatusCode

        // 判断是否应该重试
        if statusCode >= 200 && statusCode < 300 {
            break
        } else if time.Now().After(deadline) {
            fmt.Println("connect to baidu failed.")
            return
        } else {
            fmt.Println("connect to baidu failed, will try again after some seconds...")
            <-time.After((i+1)*timeoutDuration) // 重试间隔
        }
    }
    // 读取响应体
    bodyBytes := make([]byte, 0)
    reader := bufio.NewReader(conn)
    contentLength := int(responseHeader.ContentLength)
    for {
        b, err := reader.Peek(contentLength + 1)
        if err!= nil || len(b) > contentLength {
            break
        }
        bodyBytes = append(bodyBytes, b...)
        reader.Discard(len(b))
        contentLength -= len(b)
    }
    contentType := responseHeader.Header["Content-Type"][0]
    if strings.Contains(contentType, "html") {
        fmt.Println(string(bodyBytes))
    } else if strings.Contains(contentType, "json") {
        jsonObj, err := simplejson.NewJson(bodyBytes)
        if err!= nil {
            fmt.Println(err)
            return
        }
        fmt.Println(jsonObj)
    } else {
        fmt.Printf("%d bytes of data received.", len(bodyBytes))
    }
}

代码注释:

  1. 设定超时时间为5秒。
  2. 将超时重试次数设置为3。
  3. 用死循环等待响应,超时后退出程序。
  4. 判断是否应该重试,超时后退出程序。
  5. 如果响应头中包含html内容,解析HTML内容并打印;如果响应头中包含json内容,解析JSON内容并打印;否则打印字节数。

网站公告

今日签到

点亮在社区的每一天
去签到