UCP实验性质的拥塞控制协议实现(存档)。

发布于:2024-05-20 ⋅ 阅读:(129) ⋅ 点赞:(0)

大体实现,缺省模拟双向各丢10%的情况,即折合在20%网络丢包率下工作。

###################

1、快速重传

2、积累确认

3、ARQ自动重传

4、SWS糊涂窗口综合征处理

5、延迟ACK

6、SACK选择确认

7、链接半关闭

8、NAK否定应答

9、滑块窗口

10、指数平均加权退让

等等,感兴趣的童鞋,可以自行研究,这是个网络流控控制协议。

#include <functional>
#include <memory>
#include <list>
#include <chrono>
#include <map>
#include <unordered_set>
#include <unordered_map>

#include <boost/asio.hpp>

typedef uint8_t                                                 Byte;

template <typename T>
std::shared_ptr<T>                                              make_shared_alloc(int length) noexcept 
{
    static_assert(sizeof(T) > 0, "can't make pointer to incomplete type");

    // https://pkg.go.dev/github.com/google/agi/core/os/device
    // ARM64v8a: __ALIGN(8)
    // ARMv7a  : __ALIGN(4)
    // X86_64  : __ALIGN(8)
    // X64     : __ALIGN(4)
    if (length < 1) 
    {
        return NULL;
    }

    T* p = (T*)malloc(length * sizeof(T));
    return std::shared_ptr<T>(p, free);
}

template <typename T, typename... A>
std::shared_ptr<T>                                              make_shared_object(A&&... args) noexcept 
{
    static_assert(sizeof(T) > 0, "can't make pointer to incomplete type");

    void* memory = malloc(sizeof(T));
    if (NULL == memory) 
    {
        return NULL;
    }

    T* m = new (memory) T(std::forward<A&&>(args)...);
    return std::shared_ptr<T>(m,
        [](T* p) noexcept 
        {
            if (NULL != p) 
            {
                p->~T();
                free(p);
            }
        });
}

int                                                             RandomNext(volatile unsigned int* seed) noexcept
{
    unsigned int next = *seed;
    int result;

    next *= 1103515245;
    next += 12345;
    result = (unsigned int)(next / 65536) % 2048;

    next *= 1103515245;
    next += 12345;
    result <<= 10;
    result ^= (unsigned int)(next / 65536) % 1024;

    next *= 1103515245;
    next += 12345;
    result <<= 10;
    result ^= (unsigned int)(next / 65536) % 1024;

    *seed = next;
    return result;
}

int                                                             RandomNext(int min, int max) noexcept
{
    static volatile unsigned int seed = (unsigned int)(GetTickCount() / 1000);

    int v = RandomNext(&seed);
    return v % (max - min + 1) + min;
}

static constexpr int32_t                                        UCP_CMD_SYN = 1;
static constexpr int32_t                                        UCP_CMD_PSH = 2;
static constexpr int32_t                                        UCP_CMD_ACK = 3;
static constexpr int32_t                                        UCP_CMD_FIN = 4;
static constexpr int32_t                                        UCP_CMD_RST = 5;
static constexpr int32_t                                        UCP_CMD_NAK = 6;
static constexpr int32_t                                        UCP_CMD_SYNACK = 7;
static constexpr int32_t                                        UCP_RTO_MAX = 60000;
static constexpr int32_t                                        UCP_STATE_CLOSED = 0;
static constexpr int32_t                                        UCP_STATE_SYN_SENT = 1;
static constexpr int32_t                                        UCP_STATE_SYN_RECVED = 2;
static constexpr int32_t                                        UCP_STATE_ESTABLISHED = 3;
static constexpr int32_t                                        UCP_STATE_CLOSE_WAIT = 4;
static constexpr int32_t                                        UCP_STATE_LAST_ACK = 5;

#define before(seq1, seq2)                                      ((int32_t)(seq1 - seq2) < 0)
#define after(seq2, seq1)                                       (before(seq1, seq2))
#define before_eq(seq1, seq2)                                   ((int32_t)(seq1 - seq2) <= 0)
#define after_eq(seq2, seq1)                                    (before_eq(seq1, seq2))

static inline unsigned short                                    ip_standard_chksum(void* dataptr, int len) noexcept
{
    unsigned int acc;
    unsigned short src;
    unsigned char* octetptr;

    acc = 0;

    /* dataptr may be at odd or even addresses */
    octetptr = (unsigned char*)dataptr;
    while (len > 1)
    {
        /* declare first octet as most significant
           thus assume network order, ignoring host order */
        src = (unsigned short)((*octetptr) << 8);
        octetptr++;

        /* declare second octet as least significant */
        src |= (*octetptr);
        octetptr++;
        acc += src;
        len -= 2;
    }

    if (len > 0)
    {
        /* accumulate remaining octet */
        src = (unsigned short)((*octetptr) << 8);
        acc += src;
    }

    /* add deferred carry bits */
    acc = (unsigned int)((acc >> 16) + (acc & 0x0000ffffUL));
    if ((acc & 0xffff0000UL) != 0)
    {
        acc = (unsigned int)((acc >> 16) + (acc & 0x0000ffffUL));
    }

    /* This maybe a little confusing: reorder sum using htons()
       instead of ntohs() since it has a little less call overhead.
       The caller must invert bits for Internet sum ! */
    return ntohs((unsigned short)acc);
}

static inline unsigned short                                    inet_chksum(void* dataptr, int len) noexcept
{
    return (unsigned short)~ip_standard_chksum(dataptr, len);
}

static uint64_t                                                 __GetTickCount() noexcept 
{
    auto now = std::chrono::high_resolution_clock::now();
    return (uint64_t)std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();;
}

#pragma pack(push, 1)
typedef struct                                                  UcpHeader
{
    uint16_t                                                    session_id;
    uint8_t                                                     cmd;
    uint16_t                                                    wnd;
    uint32_t                                                    local_ts;
    uint32_t                                                    remote_ts;
}                                                               UcpHeader;

typedef struct UcpPshHeader : UcpHeader
{
    uint32_t                                                    seq;
    uint32_t                                                    ack;
}                                                               UcpPshHeader;

typedef struct UcpAckHeader : UcpHeader
{
    uint32_t                                                    ack;
}                                                               UcpAckHeader;
#pragma pack(pop)

class UcpConnection;

class UcpEthernet : public std::enable_shared_from_this<UcpEthernet>
{
    friend class                                                UcpConnection;

public:
    class ConnectionKey
    {
    public:
        boost::asio::ip::address                                host;
        int                                                     port;
        uint16_t                                                session_id;

    public:
        std::size_t                                             operator()(const ConnectionKey& k) const
        {
            return std::hash<boost::asio::ip::address>{}(host) ^ (port << 16 | session_id);
        }

    public:
        bool                                                    operator()(const ConnectionKey& lhs, const ConnectionKey& rhs) const
        {
            return lhs.port == rhs.port && lhs.session_id == rhs.session_id && lhs.host == rhs.host;
        }
    };

    typedef std::shared_ptr<UcpConnection>                      ConnectionPtr;
    typedef std::unordered_map<ConnectionKey, 
        ConnectionPtr, ConnectionKey, ConnectionKey>            ConnectionTable;
    typedef std::function<bool(const ConnectionPtr&)>           AcceptEventHandler;
    typedef std::function<void(UcpConnection*, bool)>           ConnectEventHandler;

public:
    AcceptEventHandler                                          AcceptEvent;

public:
    bool                                                        Turbo = false;
    uint32_t                                                    Retries2 = 8;
    uint32_t                                                    SynRetries = 3;
    uint32_t                                                    SynAckRetries = 3;
    uint32_t                                                    OrphanRetries = 5;
    uint32_t                                                    SackRenegBytes = 3;
    uint32_t                                                    DelackMin = 20;
    uint32_t                                                    Mss = 1400;
    uint32_t                                                    MinRto = 100;
    uint32_t                                                    MaxRto = 60000;
    uint32_t                                                    DefaultRto = 1000;
    uint32_t                                                    FinTimeout = 10000;
    uint32_t                                                    Interval = MinRto;
    uint32_t                                                    IntervalMin = 50;
    uint16_t                                                    ReceiveBufferSize = UINT16_MAX;
    uint32_t                                                    RxPacketLossRate = 0;
    uint32_t                                                    TxPacketLossRate = 10;
    uint32_t                                                    RestTimeMaxLimit = 300000;

public:
    UcpEthernet(const std::shared_ptr<boost::asio::io_context>& context, int bind_port) noexcept;
    UcpEthernet(const std::shared_ptr<boost::asio::io_context>& context, int bind_port, const std::shared_ptr<Byte>& buffer, uint32_t buffer_size) noexcept;
    virtual ~UcpEthernet() noexcept;

public:
    std::shared_ptr<boost::asio::io_context>                    GetContext() noexcept { return context_; }
    uint32_t                                                    Now() const noexcept { return now_; }
    virtual bool                                                Run() noexcept;
    ConnectionPtr                                               Connect(const boost::asio::ip::address& host, int port, const ConnectEventHandler& ac) noexcept;

protected:
    virtual bool                                                Accept(const ConnectionPtr& connection) noexcept;

private:
    void                                                        FlushAll() noexcept;
    void                                                        Update() noexcept;
    void                                                        Finalize() noexcept;
    bool                                                        Rst(uint32_t session_id, uint32_t remote_ts) noexcept;
    bool                                                        PacketInput(const void* packet, uint32_t packet_length) noexcept;
    bool                                                        NextTimeout() noexcept;
    bool                                                        ReceiveLoopback() noexcept;
    ConnectionPtr                                               FindConnection(uint16_t session_id) noexcept;
    void                                                        DeleteConnection(UcpConnection* connection) noexcept;
    bool                                                        Output(const void* packet, uint32_t packet_length, const boost::asio::ip::udp::endpoint& remote_endpoint) noexcept;
    bool                                                        Output(const std::shared_ptr<Byte>& packet, uint32_t packet_length, const boost::asio::ip::udp::endpoint& remote_endpoint) noexcept { return Output(packet.get(), packet_length, remote_endpoint); }

private:
    std::shared_ptr<boost::asio::io_context>                    context_;
    boost::asio::ip::udp::socket                                socket_;
    boost::asio::deadline_timer                                 timeout_;
    uint32_t                                                    now_;
    ConnectionTable                                             connections_;
    std::unordered_set<ConnectionPtr>                           flush_list_;
    std::shared_ptr<Byte>                                       buffer_;
    uint32_t                                                    buffer_size_;
    boost::asio::ip::udp::endpoint                              source_endpoint_;
};

class UcpConnection final : public std::enable_shared_from_this<UcpConnection>
{
    friend class                                                UcpEthernet;
    typedef UcpEthernet::ConnectEventHandler                    ConnectEventHandler;

public:
    typedef std::function<void(uint32_t)>                       SendAsyncCallback;
    typedef std::function<void(uint32_t)>                       ReceiveAsyncCallback;

private:
    struct SendPacket
    {
    public:
        uint32_t                                                seq;
        uint32_t                                                retries;
        uint32_t                                                length;
        std::shared_ptr<Byte>                                   buffer;
        SendAsyncCallback                                       ac;
        uint32_t                                                ac_length;
        uint32_t                                                when;
        uint32_t                                                last;
        uint32_t                                                packet_length;

    public:
        void                                                    operator()() noexcept;
    };

    struct ReceivePacket
    {
        uint32_t                                                seq;
        uint32_t                                                offset;
        std::shared_ptr<Byte>                                   packet;
        uint32_t                                                length;
    };

    struct PacketSequenceOrder                                  /* _NODISCARD */
    {
        bool                                                    operator()(const uint32_t& _Left, const uint32_t& _Right) const noexcept /* strengthened */ 
        {
            return before(_Left, _Right);
        }
    };

    typedef std::shared_ptr<SendPacket>                         SendPacketPtr;
    typedef std::shared_ptr<ReceivePacket>                      ReceivePacketPtr;

public:
    UcpConnection(const std::shared_ptr<UcpEthernet>& ethernet) noexcept;
    ~UcpConnection() noexcept;

private:
    void                                                        ProcessAckShutdown(bool rx, bool tx) noexcept;
    bool                                                        ProcessAckAccumulation(uint32_t ack) noexcept;
    bool                                                        ProcessAck(uint32_t ack_no, const uint8_t* packet, uint32_t packet_length, uint8_t cmd, uint32_t wnd, uint32_t remote_ts, uint32_t local_ts, bool nak) noexcept;
    bool                                                        ProcessHalfoff(uint32_t seq, uint32_t ack, uint32_t wnd, uint32_t remote_ts, uint32_t local_ts) noexcept;
    bool                                                        ProcessPush(uint32_t seq, uint32_t ack_no, const uint8_t* payload, uint32_t payload_size, uint32_t wnd, uint32_t remote_ts, uint32_t local_ts) noexcept;

private:
    bool                                                        ProcessAckRange(uint64_t min, uint64_t max, int origin) noexcept;
    bool                                                        ProcessCommon(uint32_t seq, uint32_t ack, uint32_t wnd, uint32_t remote_ts, uint32_t local_ts, const std::function<bool(uint32_t, bool*)>& h1) noexcept;
    bool                                                        AckNow() noexcept;
    bool                                                        Flush(bool retransmissions) noexcept;

private:
    bool                                                        Rto(uint32_t now, uint32_t wnd, uint32_t remote_ts, uint32_t local_ts) noexcept;
    void                                                        Rto(int32_t rtt) noexcept;
    int32_t                                                     Rtt(uint64_t now, uint64_t local_ts) noexcept;
    bool                                                        Cmd(int32_t cmd) noexcept;
    bool                                                        Cmd(const SendPacketPtr& packet) noexcept;
    bool                                                        Cmd(int32_t cmd, uint32_t seq, const void* buffer, uint32_t buffer_size, std::shared_ptr<Byte>& packet, uint32_t& packet_length) noexcept;
    bool                                                        ReadNative(const void* buffer, uint32_t buffer_size, uint32_t length, const ReceiveAsyncCallback& ac) noexcept;

private:
    void                                                        Finalize() noexcept;
    void                                                        Received(uint16_t len) noexcept;
    bool                                                        ReadSomeIfAckedThenTriggerEvent() noexcept;

public:
    bool                                                        ReadSome(const void* buffer, uint32_t length, const ReceiveAsyncCallback& ac) noexcept;
    bool                                                        Read(const void* buffer, uint32_t length, const ReceiveAsyncCallback& ac) noexcept { return ReadNative(buffer, length, length, ac); }
    void                                                        Close() noexcept;
    void                                                        DeleteAllUnsendPacket() noexcept;
    void                                                        Flush() noexcept { Flush(false); }
    bool                                                        Send(const void* buffer, int buffer_size, SendAsyncCallback ac) noexcept;
    uint32_t                                                    State() const noexcept { return state_; }
    uint32_t                                                    SendBacklogBytesSize() noexcept { return SendBacklogBytesSize(false); }
    uint32_t                                                    SendBacklogBytesSize(bool all) noexcept;

private:
    std::shared_ptr<UcpEthernet>                                ethernet_;
    uint16_t                                                    session_id_;
    std::map<uint32_t, ReceivePacketPtr, PacketSequenceOrder>   rcv_packets_;
    std::map<uint32_t, SendPacketPtr, PacketSequenceOrder>      snd_packets_;
    uint32_t                                                    snd_backlog_;
    uint32_t                                                    snd_seq_;
    uint32_t                                                    snd_wnd_;
    uint32_t                                                    rcv_wnd_;
    uint32_t                                                    lasted_ts_;
    uint32_t                                                    rcv_duplicate_ack_;
    uint32_t                                                    rcv_ack_;
    uint32_t                                                    rcv_ack2_;
    uint32_t                                                    rcv_nxt_;
    uint32_t                                                    rcv_rto_;
    uint32_t                                                    rcv_srtt_;
    uint32_t                                                    rcv_rttval_;
    uint32_t                                                    rcv_frt_;
    uint32_t                                                    rcv_ann_right_edge_;
    uint32_t                                                    trx_seq_;
    uint32_t                                                    trx_last_;
    struct
    {
        uint32_t                                                delack_ts;
        uint32_t                                                snd_last;
        uint32_t                                                snd_when;
        uint32_t                                                snd_retries;
        uint32_t                                                fin_seq;
        struct
        {
            bool                                                ack : 1;
            bool                                                nak : 1;
            bool                                                fin : 1;
            bool                                                fst : 1;
            bool                                                server : 1;
            bool                                                delack : 3;
        };
    }                                                           tf_;
    int32_t                                                     state_;
    struct
    {
        void*                                                   buffer;
        uint32_t                                                length;
        ReceiveAsyncCallback                                    ac;
    }                                                           rcv_event_;
    ConnectEventHandler                                         connect_event_;
    boost::asio::ip::udp::endpoint                              remote_endpoint_;
};

UcpConnection::UcpConnection(const std::shared_ptr<UcpEthernet>& ethernet) noexcept
    : ethernet_(ethernet)
    , session_id_(0)
    , snd_backlog_(0)
    , snd_seq_(0)
    , snd_wnd_(0)
    , rcv_wnd_(ethernet->ReceiveBufferSize)
    , lasted_ts_(0)
    , rcv_duplicate_ack_(0)
    , rcv_ack_(0)
    , rcv_ack2_(0)
    , rcv_nxt_(0)
    , rcv_ann_right_edge_(0)
    , rcv_rto_(ethernet->DefaultRto)
    , rcv_srtt_(0)
    , rcv_rttval_(0)
    , rcv_frt_(0)
    , trx_seq_(0)
    , trx_last_(0)
    , state_(UCP_STATE_CLOSED)
{
    tf_.fst = true;
    tf_.fin = false;
    tf_.ack = false;
    tf_.nak = false;
    tf_.server = false;
    tf_.delack = false;
    tf_.delack_ts = 0;
    tf_.fin_seq = 0;

    tf_.snd_last = 0;
    tf_.snd_when = 0;
    tf_.snd_retries = 0;

    rcv_event_.ac = NULL;
    rcv_event_.length = 0;
    rcv_event_.buffer = NULL;
}

UcpConnection::~UcpConnection() noexcept
{
    Finalize();
}

void UcpConnection::Rto(int32_t rtt) noexcept
{
    rtt = std::max<int32_t>(1, rtt);
    if (rcv_srtt_ == 0)
    {
        rcv_srtt_ = rtt;
        rcv_rttval_ = std::max<int32_t>(1, rtt / 2);
    }
    else
    {
        int32_t delta = rtt - rcv_srtt_;
        if (delta < 0)
        {
            delta = -delta;
        }

        rcv_rttval_ = std::max<int32_t>(1, (3 * rcv_rttval_ + delta) / 4);
        rcv_srtt_ = std::max<int32_t>(1, (7 * rcv_srtt_ + rtt) / 8);
    }

    int32_t rto = rcv_srtt_ + std::max<int32_t>(ethernet_->Interval, 4 * rcv_rttval_);
    rcv_rto_ = std::min<int32_t>(std::max<int32_t>(ethernet_->MinRto, rto), UCP_RTO_MAX);
}

int32_t UcpConnection::Rtt(uint64_t now, uint64_t local_ts) noexcept
{
    if (before(local_ts, now))
    {
        now += 1ULL << 32;
    }

    int32_t rtt = static_cast<int32_t>(now - local_ts);
    return rtt >= 0 ? rtt : -1;
}

bool UcpConnection::ProcessAckRange(uint64_t min, uint64_t max, int origin) noexcept
{
    uint32_t now = ethernet_->Now();
    bool any = false;
    bool fin = false;
    auto ack = [this, &any, &fin, now]() noexcept -> bool
        {
            if (state_ == UCP_STATE_SYN_RECVED)
            {
                state_ = UCP_STATE_ESTABLISHED;
                rcv_ack2_ = ++rcv_ack_;
                rcv_duplicate_ack_ = rcv_ack2_;
                trx_last_ = now;

                tf_.snd_last = 0;
                tf_.snd_when = 0;
                tf_.snd_retries = 0;

                fin = !ethernet_->Accept(shared_from_this());
                return true;
            }
            else if (state_ == UCP_STATE_CLOSE_WAIT)
            {
                ProcessAckShutdown(false, true);
                return true;
            }
            else if (state_ == UCP_STATE_LAST_ACK)
            {
                ProcessAckShutdown(true, true);
                return true;
            }
            else
            {
                return false;
            }
        };

    for (uint32_t i = min; before_eq(i, max) && after_eq(max, trx_seq_); i++)
    {
        printf("Ack: %u, max=%u, origin=%d\n ", (uint32_t)i, (uint32_t)max, origin);
        if (i == snd_seq_ && ack())
        {
            any |= true;
        }
        else
        {
            auto tail = snd_packets_.find(static_cast<uint32_t>(i));
            auto endl = snd_packets_.end();
            if (tail != endl)
            {
                SendPacketPtr left = std::move(tail->second);
                tail = snd_packets_.erase(tail);

                any = true;
                trx_last_ = now;
                (*left)();

                if (tail != endl)
                {
                    SendPacketPtr& reft = tail->second;
                    if (after(reft->seq, max))
                    {
                        break;
                    }
                }
            }

            tail = snd_packets_.begin();
            endl = snd_packets_.end();
            if (tail == endl)
            {
                if (after_eq(max, snd_seq_))
                {
                    i = snd_seq_;
                    any |= ack();
                }

                break;
            }
        }
    }

    if (fin)
    {
        Close();
    }

    return any;
}

bool UcpConnection::ProcessAckAccumulation(uint32_t ack) noexcept
{
    if (rcv_duplicate_ack_ == ack)
    {
        if (rcv_frt_++ >= ethernet_->SackRenegBytes)
        {
            tf_.ack = true;
            tf_.nak = true;
            rcv_frt_ = 0;
        }
    }
    else
    {
        rcv_frt_ = 0;
        rcv_duplicate_ack_ = ack;
    }

    if (before(snd_seq_, ack))
    {
        uint32_t max = ack - 1;
        return ProcessAckRange(snd_seq_, max, 4);
    }

    auto tail = snd_packets_.begin();
    auto endl = snd_packets_.end();
    if (tail != endl)
    {
        SendPacketPtr& pkg = tail->second;
        if (after(ack, pkg->seq))
        {
            return ProcessAckRange(pkg->seq, ack, 2);
        }
    }
    
    return false;
}

bool UcpConnection::ProcessAck(uint32_t ack_no, const uint8_t* packet, uint32_t packet_length, uint8_t cmd, uint32_t wnd, uint32_t remote_ts, uint32_t local_ts, bool nak) noexcept
{
    typedef std::pair<uint32_t, uint32_t> U32U32KeyPair;

    uint32_t now = ethernet_->Now();
    if (!Rto(now, wnd, remote_ts, local_ts))
    {
        return false;
    }

    bool any = false;
    uint8_t* p = (uint8_t*)packet;
    uint32_t len = packet_length;

    any |= ProcessAckAccumulation(ack_no);
    while (len > 0)
    {
        len--;
        if (*p++)
        {
            if (len < 8)
            {
                break;
            }

            uint32_t* u = (uint32_t*)p;
            p += 8;
            len -= 8;

            uint32_t min = ntohl(u[0]);
            uint32_t max = ntohl(u[1]);
            any |= ProcessAckRange(min, max, 1);
        }
        else
        {
            if (len < 4)
            {
                break;
            }

            uint32_t* u = (uint32_t*)p;
            p += 4;
            len -= 4;

            uint32_t key = htonl(*u);
            any |= ProcessAckRange(key, key, 1);
        }
    }

    Flush(nak);
    return any;
}

bool UcpConnection::Cmd(int32_t cmd) noexcept
{
    std::shared_ptr<Byte> packet;
    uint32_t packet_length;

    return Cmd(cmd, snd_seq_, NULL, 0, packet, packet_length);
}

bool UcpConnection::Cmd(int32_t cmd, uint32_t seq, const void* buffer, uint32_t buffer_size, std::shared_ptr<Byte>& packet, uint32_t& packet_length) noexcept
{
    bool PSH = false;
    bool ACK = false;

    packet_length = sizeof(UcpHeader);
    if (cmd == UCP_CMD_PSH)
    {
        if (NULL == buffer || buffer_size == 0)
        {
            return false;
        }

        PSH = true;
        packet_length = sizeof(UcpPshHeader) + buffer_size;
    }
    else if (cmd == UCP_CMD_ACK || cmd == UCP_CMD_NAK || cmd == UCP_CMD_SYN || cmd == UCP_CMD_RST)
    {
        ACK = true;
        packet_length = sizeof(UcpAckHeader);
    }
    else if (cmd == UCP_CMD_FIN || cmd == UCP_CMD_SYNACK)
    {
        PSH = true;
        buffer = NULL;
        buffer_size = 0;
        packet_length = sizeof(UcpPshHeader);
    }

    packet = make_shared_alloc<Byte>(packet_length);
    if (NULL == packet)
    {
        return false;
    }

    UcpHeader* h = (UcpHeader*)packet.get();
    h->cmd = cmd;
    h->local_ts = htonl(lasted_ts_);
    h->remote_ts = htonl(ethernet_->Now());
    h->wnd = htons(rcv_wnd_);
    h->session_id = htons(session_id_);

    if (PSH)
    {
        UcpPshHeader* h2 = (UcpPshHeader*)h;
        h2->ack = htonl(rcv_ack_);
        h2->seq = htonl(snd_seq_);

        Byte* payload = (Byte*)(h2 + 1);
        memcpy(payload, buffer, buffer_size);

        if (NULL == buffer || buffer_size == 0)
        {
            ethernet_->Output(packet, packet_length, remote_endpoint_);
        }
    }
    else
    {
        if (ACK)
        {
            UcpAckHeader* h2 = (UcpAckHeader*)h;
            h2->ack = htonl(cmd != UCP_CMD_SYN ? rcv_ack_ : snd_seq_);
        }

        ethernet_->Output(packet, packet_length, remote_endpoint_);
        if (cmd == UCP_CMD_RST)
        {
            ProcessAckShutdown(true, true);
        }
    }

    return true;
}

bool UcpConnection::Cmd(const SendPacketPtr& packet) noexcept
{
    std::shared_ptr<Byte> buffer;
    uint32_t buffer_size;

    Cmd(UCP_CMD_PSH, packet->seq, packet->buffer.get(), packet->length, buffer, buffer_size);
    if (NULL == buffer || buffer_size == 0)
    {
        return false;
    }

    packet->buffer = buffer;
    packet->packet_length = buffer_size;
    return true;
}

void UcpConnection::Finalize() noexcept
{
    ReceiveAsyncCallback ac = std::move(rcv_event_.ac);
    rcv_event_.ac = NULL;
    rcv_event_.length = 0;
    rcv_event_.buffer = NULL;

    if (NULL != ac)
    {
        ac(0);
    }

    ConnectEventHandler connect_event = std::move(connect_event_);
    connect_event_ = NULL;

    if (NULL != connect_event)
    {
        connect_event(this, false);
    }

    state_ = UCP_STATE_CLOSED;
    tf_.snd_last = 0;
    tf_.snd_when = 0;
    tf_.snd_retries = 0;

    snd_packets_.clear();
    rcv_packets_.clear();

    ethernet_->DeleteConnection(this);
}

void UcpConnection::ProcessAckShutdown(bool rx, bool tx) noexcept
{
    if (rx && tx)
    {
        printf("shutdown_ack/rx&&tx: %s \n", tf_.server ? "server" : "client");
        Finalize();
    }
    else if (tx)
    {
        printf("shutdown_ack/tx: %s \n", tf_.server ? "server" : "client");
        state_ = UCP_STATE_LAST_ACK;

        uint32_t now = ethernet_->Now();
        tf_.snd_last = now;
        tf_.snd_when = now;
        tf_.snd_retries = 1;
    }
}

void UcpConnection::DeleteAllUnsendPacket() noexcept
{
    auto tail = snd_packets_.begin();
    auto endl = snd_packets_.end();
    for (; tail != endl; )
    {
        SendPacketPtr& packet = tail->second;
        if (packet->packet_length == 0)
        {
            if (before(packet->seq, snd_seq_))
            {
                snd_seq_ = packet->seq;
            }

            tail = snd_packets_.erase(tail);
            continue;
        }

        tail++;
    }
}

void UcpConnection::Close() noexcept
{
    if (state_ == UCP_STATE_ESTABLISHED)
    {
        uint32_t now = ethernet_->Now();
        ++snd_seq_;

        tf_.snd_last = now;
        tf_.snd_when = now;
        tf_.snd_retries = 1;
        DeleteAllUnsendPacket();

        state_ = UCP_STATE_CLOSE_WAIT;
        Cmd(UCP_CMD_FIN);
    }
    else if (state_ != UCP_STATE_CLOSE_WAIT && state_ != UCP_STATE_LAST_ACK && state_ != UCP_STATE_CLOSED)
    {
        Cmd(UCP_CMD_RST);
    }
}

bool UcpConnection::Flush(bool retransmissions) noexcept /* 1, 2, 3, 4, 5, 6, 7, 7, 7, 7, 7, 7, 7 */
{
    uint32_t now = ethernet_->Now();
    int32_t rtt = -1;
    bool delack = false;

    if (tf_.delack)
    {
        uint32_t next = tf_.delack_ts + ethernet_->DelackMin;
        if (before(now, next))
        {
            delack = true;
        }
        else
        {
            int32_t delta = Rtt(now, next);
            if (delta > 0)
            {
                lasted_ts_ -= delta;
            }

            tf_.ack = true;
            tf_.delack = false;
            tf_.delack_ts = 0;
        }
    }

    if (state_ >= UCP_STATE_ESTABLISHED)
    {
        for (auto&& kv : snd_packets_)
        {
            bool sent = false;
            SendPacketPtr& packet = kv.second;
            if (state_ == UCP_STATE_ESTABLISHED && packet->packet_length == 0)
            {
                if (snd_wnd_ >= packet->length)
                {
                    sent = true;
                    snd_wnd_ -= packet->length;
                }
                else if (snd_wnd_ > 0)
                {
                    sent = true;
                    snd_wnd_ = 0;
                }

                if (sent && Cmd(packet))
                {
                    printf("send: %u\n", packet->seq);
                    int64_t snd_backlog_new = (int64_t)snd_backlog_ - (int64_t)packet->length;
                    snd_backlog_new = std::max<int64_t>(snd_backlog_new, 0);
                    
                    if (after_eq(packet->seq, trx_seq_))
                    {
                        trx_seq_ = packet->seq;
                    }

                    packet->when = now;
                    packet->last = now;
                    snd_backlog_ = static_cast<uint32_t>(snd_backlog_new);
                }
                else
                {
                    if (snd_wnd_ == 0)
                    {
                        tf_.ack = true;
                    }

                    continue;
                }
            }
            else if (packet->retries <= ethernet_->Retries2)
            {
                uint32_t next = packet->when + rcv_rto_;
                if (retransmissions)
                {
                    sent = true; 
                }
                else if (after_eq(now, next))
                {
                    uint32_t when = ethernet_->Turbo ? packet->when : packet->last;
                    sent = true;
                    rtt = std::max<int32_t>(rtt, std::max<int32_t>(0, Rtt(now, when)));
                }

                if (sent)
                {
                    UcpHeader* h = (UcpHeader*)packet->buffer.get();
                    h->remote_ts = htonl(now);
                    h->local_ts = htonl(lasted_ts_);
                }
            }
            else
            {
                Cmd(UCP_CMD_RST);
                return false;
            }

            if (sent)
            {
                packet->when = now;
                packet->retries++;
                ethernet_->Output(packet->buffer, packet->packet_length, remote_endpoint_);
            }
        }
    }

    if (state_ == UCP_STATE_ESTABLISHED)
    {
        if (tf_.fin)
        {
            ReadSomeIfAckedThenTriggerEvent();
        }
        else
        {
            uint32_t next = trx_last_ + ethernet_->RestTimeMaxLimit;
            if (after_eq(now, next))
            {
                Close();
            }
        }
    }
    else if (state_ == UCP_STATE_CLOSE_WAIT)
    {
        if (tf_.snd_retries <= ethernet_->OrphanRetries)
        {
            uint32_t next = tf_.snd_when + rcv_rto_;
            if (after_eq(now, next))
            {
                uint32_t when = ethernet_->Turbo ? tf_.snd_when : tf_.snd_last;
                tf_.snd_when = now;
                tf_.snd_retries++;
                rtt = std::max<int32_t>(rtt, std::max<int32_t>(0, Rtt(now, when)));

                Cmd(UCP_CMD_FIN);
            }
        }
        else
        {
            Cmd(UCP_CMD_RST);
            return false;
        }
    }
    else if (state_ == UCP_STATE_LAST_ACK)
    {
        uint32_t next = tf_.snd_when + ethernet_->FinTimeout;
        if (after_eq(now, next))
        {
            tf_.snd_when = now;
            tf_.snd_retries++;

            ProcessAckShutdown(true, true);
            return false;
        }
    }
    else if (state_ > UCP_STATE_CLOSED && state_ < UCP_STATE_ESTABLISHED)
    {
        bool synack = (state_ == UCP_STATE_SYN_RECVED);
        if (tf_.snd_retries <= (synack ? ethernet_->SynAckRetries : ethernet_->SynRetries))
        {
            uint32_t next = tf_.snd_when + rcv_rto_;
            if (after_eq(now, next))
            {
                uint32_t when = ethernet_->Turbo ? tf_.snd_when : tf_.snd_last;
                tf_.snd_when = now;
                tf_.snd_retries++;
                rtt = std::max<int32_t>(rtt, std::max<int32_t>(0, Rtt(now, when)));

                Cmd(synack ? UCP_CMD_SYNACK : UCP_CMD_SYN);
            }
        }
        else
        {
            Cmd(UCP_CMD_RST);
            return false;
        }
    }

    if (!delack && tf_.ack)
    {
        AckNow();
        tf_.ack = false;
        tf_.nak = false;
        tf_.delack = false;
        tf_.delack_ts = 0;
    }

    auto& flush_list = ethernet_->flush_list_;
    for (;;)
    {
        auto tail = flush_list.find(shared_from_this());
        auto endl = flush_list.end();
        if (tail != endl)
        {
            flush_list.erase(tail);
        }

        break;
    }

    return true;
}

void UcpConnection::Received(uint16_t len) noexcept
{
    uint32_t rcv_wnd = rcv_wnd_ + len;
    rcv_nxt_ += len;

    if (rcv_wnd > ethernet_->ReceiveBufferSize || rcv_wnd < rcv_wnd_)
    {
        rcv_wnd_ = ethernet_->ReceiveBufferSize;
    }
    else
    {
        rcv_wnd_ = rcv_wnd;
    }

    uint32_t rcv_ann_right_edge = rcv_ann_right_edge_ + 
        std::min<uint32_t>(ethernet_->ReceiveBufferSize >> 2, ethernet_->Mss << 2);
    if (after_eq(rcv_nxt_, rcv_ann_right_edge))
    {
        tf_.ack = true;
        rcv_ann_right_edge_ = rcv_ann_right_edge;
    }
}

bool UcpConnection::AckNow() noexcept
{
    typedef std::pair<uint32_t, uint32_t> U32U32KeyPair;

    ReadSomeIfAckedThenTriggerEvent();
    if (state_ == UCP_STATE_CLOSED)
    {
        Cmd(UCP_CMD_RST);
        return false;
    }

    std::list<uint32_t> aszs;
    std::list<U32U32KeyPair> acks;

    auto ack_tail = acks.rbegin();
    auto ack_endl = acks.rend();
    auto ack_setp =
        [](U32U32KeyPair& kv, UcpAckHeader* h, uint32_t offset) noexcept
        {
            Byte* p = ((Byte*)(h + 1)) + offset;
            if (kv.first != kv.second)
            {
                *p++ = 1;
                *(uint32_t*)p = htonl(kv.first);
                *(uint32_t*)(p + 4) = htonl(kv.second);
            }
            else
            {
                *p++ = 0;
                *(uint32_t*)p = htonl(kv.first);
            }
        };

    for (auto&& kv : rcv_packets_)
    {
        ReceivePacketPtr& node = kv.second;
        ack_tail = acks.rbegin();
        ack_endl = acks.rend();

        if (ack_tail == ack_endl)
        {
            acks.emplace_back(
                std::make_pair(node->seq, node->seq));
        }
        else
        {
            U32U32KeyPair& ukv = *ack_tail;
            if ((ukv.second + 1) == node->seq)
            {
                ukv.second = node->seq;
            }
            else
            {
                acks.emplace_back(
                    std::make_pair(node->seq, node->seq));
            }
        }
    }

    bool f = true;
    ack_tail = acks.rbegin();
    ack_endl = acks.rend();

    if (ack_tail != ack_endl)
    {
        U32U32KeyPair& ukv = *ack_tail;
        if (tf_.fin)
        {
            uint32_t nxt = (ukv.second + 1);
            if (nxt == tf_.fin_seq)
            {
                f = false;
                ukv.second = tf_.fin_seq;
            }
        }
    }

    if (f && tf_.fin)
    {
        acks.emplace_back(
            std::make_pair(tf_.fin_seq, tf_.fin_seq));
    }

    uint32_t mss = ethernet_->Mss;
    for (U32U32KeyPair& ukv : acks)
    {
        uint32_t block_size =
            ukv.first == ukv.second ? 5 : 9;

        auto asz_tail = aszs.rbegin();
        auto asz_endl = aszs.rend();
        if (asz_tail == asz_endl)
        {
            aszs.emplace_back(block_size);
        }
        else
        {
            uint32_t& size = (*asz_tail);
            uint32_t temp = size + block_size;
            if (temp <= mss)
            {
                size = temp;
            }
            else
            {
                aszs.emplace_back(block_size);
            }
        }
    }

    uint32_t now = ethernet_->Now();
    if (aszs.empty())
    {
        std::shared_ptr<Byte> packet = make_shared_alloc<Byte>(sizeof(UcpAckHeader));
        if (NULL != packet)
        {
            UcpAckHeader* h = (UcpAckHeader*)packet.get();
            h->cmd = UCP_CMD_ACK;
            h->local_ts = htonl(lasted_ts_);
            h->remote_ts = htonl(now);
            h->wnd = htons(rcv_wnd_);
            h->session_id = htons(session_id_);
            h->ack = htonl(rcv_ack_);

            ethernet_->Output(packet, sizeof(UcpAckHeader), remote_endpoint_);
        }
    }
    else
    {
        auto asz_tail = aszs.begin();
        auto asz_endl = aszs.end();

        uint32_t offset = 0;
        std::shared_ptr<Byte> packet;

        for (U32U32KeyPair& kv : acks)
        {
        retry:
            if (asz_tail == asz_endl)
            {
                break;
            }

            uint32_t packet_size = *asz_tail + sizeof(UcpAckHeader);
            UcpAckHeader* h = NULL;
            if (NULL != packet)
            {
                h = (UcpAckHeader*)packet.get();
            }
            else
            {
                packet = make_shared_alloc<Byte>(packet_size);
                if (NULL == packet)
                {
                    break;
                }

                h = (UcpAckHeader*)packet.get();
                h->ack = htonl(rcv_ack_);
                h->cmd = UCP_CMD_ACK;
                h->local_ts = htonl(lasted_ts_);
                h->remote_ts = htonl(now);
                h->wnd = htons(rcv_wnd_);
                h->session_id = htons(session_id_);
            }

            uint32_t next = kv.first != kv.second ? offset + 9 : offset + 5;
            if (next > mss)
            {
                uint32_t packet_size = *asz_tail + sizeof(UcpAckHeader);
                ethernet_->Output(packet, packet_size, remote_endpoint_);

                asz_tail++;
                offset = 0;
                packet = NULL;
                goto retry;
            }
            else
            {
                ack_setp(kv, h, offset);
                offset = next;
            }
        }

        if (offset > 0)
        {
            UcpAckHeader* h = (UcpAckHeader*)packet.get();
            if (tf_.nak)
            {
                tf_.nak = false;
                h->cmd = UCP_CMD_NAK;
            }

            uint32_t packet_size = *asz_tail + sizeof(UcpAckHeader);
            ethernet_->Output(packet, packet_size, remote_endpoint_);
        }
    }

    return true;
}

bool UcpConnection::Send(const void* buffer, int buffer_size, SendAsyncCallback ac) noexcept
{
    if (NULL == buffer || buffer_size < 1)
    {
        return false;
    }

    if (NULL == ac)
    {
        return false;
    }

    if (state_ != UCP_STATE_ESTABLISHED)
    {
        return false;
    }

    if (tf_.fin)
    {
        Flush();
        return false;
    }

    uint32_t buffer_offset = 0;
    uint32_t mss = ethernet_->Mss;
    uint32_t buffer_size_raw = static_cast<uint32_t>(buffer_size);

    auto send_tail = snd_packets_.rbegin();
    auto send_endl = snd_packets_.rend();
    if (send_tail != send_endl)
    {
        SendPacketPtr& left_packet = send_tail->second;
        if (left_packet->packet_length == 0)
        {
            uint32_t available_size = mss - left_packet->length;
            if (available_size > 0)
            {
                uint32_t fragment_size = buffer_size_raw;
                if (buffer_size_raw > available_size)
                {
                    fragment_size = available_size;
                    buffer_offset = available_size;
                }

                uint32_t packet_new_size = left_packet->length + fragment_size;
                std::shared_ptr<Byte> packet_new = make_shared_alloc<Byte>(packet_new_size);
                if (NULL == packet_new)
                {
                    return false;
                }
                else
                {
                    (*left_packet)();
                }

                memcpy(packet_new.get(), left_packet->buffer.get(), left_packet->length);
                memcpy(packet_new.get() + left_packet->length, buffer, fragment_size);

                left_packet->ac_length = fragment_size;
                left_packet->ac = std::move(ac);
                left_packet->buffer = packet_new;
                left_packet->length = packet_new_size;

                ac = NULL;
                if (buffer_offset == 0)
                {
                    buffer_size_raw = 0;
                }
                else
                {
                    buffer_size_raw = buffer_size_raw - buffer_offset;
                }
            }
        }
    }

    uint32_t snd_seq_raw = snd_seq_;
    static auto clean_err_packets =
        [](UcpConnection* my, uint32_t n) noexcept
        {
            auto& snd_packets = my->snd_packets_;
            for (;;)
            {
                auto tail = snd_packets.find(n++);
                auto endl = snd_packets.end();
                if (tail == endl)
                {
                    break;
                }

                snd_packets.erase(tail);
            }
        };

    bool flush = false;
    while (buffer_size_raw > 0)
    {
        SendPacketPtr send_packet = make_shared_object<SendPacket>();
        if (NULL == send_packet)
        {
            clean_err_packets(this, snd_seq_raw);
            return false;
        }

        send_packet->seq = ++snd_seq_;
        send_packet->retries = 0;
        send_packet->length = 0;
        send_packet->when = 0;
        send_packet->last = 0;
        send_packet->ac_length = 0;
        send_packet->packet_length = 0;

        if (buffer_size_raw > mss)
        {
            std::shared_ptr<Byte> buffer_new = make_shared_alloc<Byte>(mss);
            if (NULL == buffer_new)
            {
                clean_err_packets(this, snd_seq_raw);
                return false;
            }

            memcpy(buffer_new.get(), (Byte*)buffer + buffer_offset, mss);
            send_packet->length = mss;
            send_packet->buffer = buffer_new;

            if (NULL != ac)
            {
                send_packet->ac = std::move(ac);
                ac = NULL;

                send_packet->ac_length = mss;
            }

            buffer_offset += mss;
            buffer_size_raw -= mss;
            if (!snd_packets_.emplace(send_packet->seq, send_packet).second)
            {
                clean_err_packets(this, snd_seq_raw);
                return false;
            }
        }
        else
        {
            send_packet->length = buffer_size_raw;
            if (buffer_offset == 0)
            {
                std::shared_ptr<Byte> chunk = make_shared_alloc<Byte>(buffer_size);
                if (NULL == chunk)
                {
                    clean_err_packets(this, snd_seq_raw);
                    return false;
                }

                memcpy(chunk.get(), buffer, buffer_size);
                send_packet->buffer = chunk;
                send_packet->length = buffer_size_raw;
                send_packet->ac = ac;
                send_packet->ac_length = buffer_size_raw;
            }
            else
            {
                std::shared_ptr<Byte> buffer_new = make_shared_alloc<Byte>(buffer_size_raw);
                if (NULL == buffer_new)
                {
                    clean_err_packets(this, snd_seq_raw);
                    return false;
                }

                memcpy(buffer_new.get(), (Byte*)buffer + buffer_offset, buffer_size_raw);
                send_packet->buffer = buffer_new;
                send_packet->length = buffer_size_raw;

                if (NULL != ac)
                {
                    send_packet->ac = std::move(ac);
                    ac = NULL;

                    send_packet->ac_length = buffer_size_raw;
                }
            }

            buffer_offset += buffer_size_raw;
            buffer_size_raw = 0;
            if (!snd_packets_.emplace(send_packet->seq, send_packet).second)
            {
                clean_err_packets(this, snd_seq_raw);
                return false;
            }
        }
    }

    uint32_t snd_threshold = std::min<uint32_t>(ethernet_->ReceiveBufferSize >> 1, mss);
    snd_backlog_ += static_cast<uint32_t>(buffer_size);

    std::shared_ptr<boost::asio::io_context>& context = ethernet_->context_;
    if (snd_backlog_ < snd_threshold)
    {
        auto packet_tail = snd_packets_.begin();
        auto packet_endl = snd_packets_.end();
        for (; packet_tail != packet_endl; packet_tail++)
        {
            SendPacketPtr& packet = packet_tail->second;
            if (packet->ac_length != 0)
            {
                (*packet)();
                if (snd_backlog_ >= snd_threshold)
                {
                    break;
                }
            }
        }
    }
    else
    {
        flush = snd_wnd_ > 0;
    }

    if (tf_.fst)
    {
        flush = true;
        tf_.fst = false;
    }

    if (flush)
    {
        Flush();
    }
    else
    {
        auto self = shared_from_this();
        ethernet_->flush_list_.emplace(self);
    }

    return true;
}

uint32_t UcpConnection::SendBacklogBytesSize(bool all) noexcept
{
    if (all)
    {
        uint32_t packet_count = snd_packets_.size();
        if (packet_count == 0)
        {
            return 0;
        }

        auto tail = snd_packets_.rbegin();
        SendPacketPtr packet = tail->second;

        if (packet_count == 1)
        {
            return packet->length;
        }

        uint32_t fragment_size = (packet_count - 1) * ethernet_->Mss;
        return fragment_size + packet->length;
    }
    else
    {
        return snd_backlog_;
    }
}

void UcpConnection::SendPacket::operator()() noexcept
{
    SendPacket* my = this;
    SendAsyncCallback ac = std::move(my->ac);
    if (NULL != ac)
    {
        uint32_t length = my->ac_length;
        my->ac_length = 0;

        ac(length);
    }
}

bool UcpConnection::ReadNative(const void* buffer, uint32_t buffer_size, uint32_t length, const ReceiveAsyncCallback& ac) noexcept
{
    if (NULL == ac)
    {
        return false;
    }

    return ReadSome(buffer, length,
        [this, buffer, buffer_size, length, ac](uint32_t bytes_transferred) noexcept
        {
            std::shared_ptr<boost::asio::io_context>& context = ethernet_->context_;
            if (bytes_transferred == 0)
            {
                ac(0);
            }
            else if (length <= bytes_transferred)
            {
                context->post(
                    [ac, buffer_size]() noexcept
                    {
                        ac(buffer_size);
                    });
            }
            else
            {
                uint8_t* next = ((uint8_t*)buffer) + bytes_transferred;
                uint32_t size = length - bytes_transferred;
                context->post(
                    [this, next, buffer_size, size, ac]() noexcept
                    {
                        if (size == 0 || !ReadNative(next, buffer_size, size, ac))
                        {
                            ac(buffer_size);
                        }
                    });
            }
        });
}

bool UcpConnection::ReadSome(const void* buffer, uint32_t length, const ReceiveAsyncCallback& ac) noexcept
{
    if (NULL == buffer || length == 0)
    {
        return false;
    }

    if (NULL == ac)
    {
        return false;
    }

    if (state_ < UCP_STATE_ESTABLISHED)
    {
        return false;
    }

    ReadSomeIfAckedThenTriggerEvent();
    if (rcv_event_.length != 0)
    {
        return false;
    }

    rcv_event_.ac = ac;
    rcv_event_.length = length;
    rcv_event_.buffer = (void*)buffer;

    ReadSomeIfAckedThenTriggerEvent();
    Flush();
    return true;
}

bool UcpConnection::ReadSomeIfAckedThenTriggerEvent() noexcept
{
    bool fin = false;
    bool any = false;
    uint32_t length = 0;

    if (rcv_event_.length != 0)
    {
        uint32_t remain = rcv_event_.length;
        for (;;)
        {
            auto tail = rcv_packets_.begin();
            auto endl = rcv_packets_.end();
            if (tail == endl)
            {
                break;
            }

            ReceivePacketPtr pkg = tail->second;
            if (pkg->seq != rcv_ack2_)
            {
                break;
            }

            uint32_t bytes_transferred = std::min<uint32_t>(pkg->length - pkg->offset, remain);
            if (bytes_transferred == 0)
            {
                break;
            }
            else
            {
                any = true;
                memcpy(rcv_event_.buffer, pkg->packet.get() + pkg->offset, bytes_transferred);
            }

            pkg->offset += bytes_transferred;
            if (pkg->offset >= pkg->length)
            {
                rcv_ack2_++;
                rcv_packets_.erase(tail);
            }

            remain -= bytes_transferred;
            length += bytes_transferred;
        }
    }

    if ((length == 0) && (tf_.fin && state_ == UCP_STATE_ESTABLISHED) && (before_eq(rcv_ack_, tf_.fin_seq)))
    {
        auto recv_tail = rcv_packets_.begin();
        auto recv_endl = rcv_packets_.end();
        if (recv_tail == recv_endl)
        {
            any = true;
            fin = true;
            rcv_ack_ = tf_.fin_seq;
        }
    }

    if (any)
    {
        ReceiveAsyncCallback ac = std::move(rcv_event_.ac);
        rcv_event_.ac = NULL;
        rcv_event_.length = 0;
        rcv_event_.buffer = NULL;
        if (length > 0)
        {
            uint32_t now = ethernet_->Now();
            trx_last_ = now;

            Received(length);
        }

        if (NULL != ac)
        {
            ac(length);
        }
    }

    if (fin)
    {
        Close();
    }

    return any;
}

bool UcpConnection::Rto(uint32_t now, uint32_t wnd, uint32_t remote_ts, uint32_t local_ts) noexcept
{
    int32_t rtt = Rtt(now, local_ts);
    if (rtt >= 0)
    {
        Rto(rtt);
    }

    snd_wnd_ = wnd;
    lasted_ts_ = remote_ts;
    return true;
}

bool UcpConnection::ProcessPush(uint32_t seq, uint32_t ack_no, const uint8_t* payload, uint32_t payload_size, uint32_t wnd, uint32_t remote_ts, uint32_t local_ts) noexcept
{
    if (NULL == payload || payload_size < 1)
    {
        return false;
    }

    return ProcessCommon(seq, ack_no, wnd, remote_ts, local_ts,
        [this, payload, payload_size, seq](uint32_t now, bool* delay) noexcept
        {
            auto tail = rcv_packets_.find(seq);
            auto endl = rcv_packets_.end();
            if (tail != endl)
            {
                ReceivePacketPtr& left = tail->second;
                if (left->length >= payload_size)
                {
                    return false;
                }
            }

            ReceivePacketPtr packet = make_shared_object<ReceivePacket>();
            if (NULL == packet)
            {
                return false;
            }

            std::shared_ptr<Byte> buffer = make_shared_alloc<Byte>(payload_size);
            if (NULL == buffer)
            {
                return false;
            }

            packet->seq = seq;
            packet->offset = 0;
            packet->packet = buffer;
            packet->length = payload_size;
            memcpy(buffer.get(), payload, payload_size);

            printf("rccv: %u\n", seq);
            if (tail != endl)
            {
                tail->second = packet;
            }
            else
            {
                rcv_packets_.emplace(seq, packet);
            }

            *delay = true;
            if (rcv_wnd_ < payload_size)
            {
                rcv_wnd_ = 0;
            }
            else
            {
                rcv_wnd_ -= payload_size;
            }

            return true;
        });
}

bool UcpConnection::ProcessHalfoff(uint32_t seq, uint32_t ack, uint32_t wnd, uint32_t remote_ts, uint32_t local_ts) noexcept
{
    return ProcessCommon(seq, ack, wnd, remote_ts, local_ts,
        [this, seq](uint32_t now, bool*) noexcept -> bool
        {
            for (;;)
            {
                if (state_ < UCP_STATE_ESTABLISHED)
                {
                    return false;
                }

                if (tf_.fin)
                {
                    return true;
                }

                tf_.ack = true;
                tf_.fin = true;
                tf_.fin_seq = seq;
                DeleteAllUnsendPacket();
                return true;
            }
        });
}

bool UcpConnection::ProcessCommon(uint32_t seq, uint32_t ack_no, uint32_t wnd, uint32_t remote_ts, uint32_t local_ts, const std::function<bool(uint32_t, bool*)>& h1) noexcept
{
    uint32_t now = ethernet_->Now();
    if (!Rto(now, wnd, remote_ts, local_ts))
    {
        return false;
    }

    ProcessAckAccumulation(ack_no);
    ReadSomeIfAckedThenTriggerEvent();

    bool ack = false;
    bool nak = false;
    bool delack = false;
    if ((before_eq(rcv_ack_, seq)) && (ack = h1(now, &delack)))
    {
        auto tail = rcv_packets_.begin();
        auto endl = rcv_packets_.end();
        for (; tail != endl; tail++)
        {
            ReceivePacketPtr& i = tail->second;
            if (i->seq != rcv_ack_)
            {
                if (after(i->seq, rcv_ack_))
                {
                    break;
                }
            }
            else
            {
                rcv_ack_++;
            }
        }
    }

    ReadSomeIfAckedThenTriggerEvent();
    if (rcv_wnd_ == 0)
    {
        ack = true;
        delack = true;
    }

    if (ack)
    {
        tf_.ack = ack;
        tf_.nak = nak;
        if (delack && !tf_.delack)
        {
            if (rcv_wnd_ > 0)
            {
                tf_.delack = true;
                tf_.delack_ts = now;
            }
        }

        Flush();
    }

    return true;
}

UcpEthernet::UcpEthernet(const std::shared_ptr<boost::asio::io_context>& context, int bind_port, const std::shared_ptr<Byte>& buffer, uint32_t buffer_size) noexcept
    : context_(context)
    , socket_(*context)
    , timeout_(*context)
    , now_(0)
    , buffer_(buffer)
    , buffer_size_(buffer_size)
{
    if (bind_port < 0 || bind_port > UINT16_MAX)
    {
        bind_port = 0;
    }

    boost::system::error_code ec;
    socket_.open(boost::asio::ip::udp::v6(), ec);

    if (!ec)
    {
        boost::asio::ip::udp::endpoint bind(boost::asio::ip::address_v6::any(), bind_port);
        socket_.bind(bind, ec);

        if (ec)
        {
            socket_.close(ec);
        }
    }

    now_ = __GetTickCount();
}

UcpEthernet::UcpEthernet(const std::shared_ptr<boost::asio::io_context>& context, int bind_port) noexcept
    : UcpEthernet(context, bind_port, make_shared_alloc<Byte>(UINT16_MAX), UINT16_MAX)
{

}

UcpEthernet::~UcpEthernet() noexcept
{
    Finalize();
}

void UcpEthernet::Finalize() noexcept
{
    for (auto&& kv : connections_)
    {
        ConnectionPtr& connection = kv.second;
        connection->Cmd(UCP_CMD_RST);
    }

    boost::system::error_code ec;
    socket_.close(ec);
    timeout_.cancel(ec);

    flush_list_.clear();
    connections_.clear();
}

void UcpEthernet::DeleteConnection(UcpConnection* connection) noexcept
{
    UcpEthernet::ConnectionKey key;
    key.session_id = connection->session_id_;
    key.host = connection->remote_endpoint_.address();
    key.port = connection->remote_endpoint_.port();

    auto tail = connections_.find(key);
    auto endl = connections_.end();
    if (tail != endl)
    {
        connections_.erase(tail);
    }
}

UcpEthernet::ConnectionPtr UcpEthernet::FindConnection(uint16_t session_id) noexcept
{
    if (session_id == 0)
    {
        return NULL;
    }

    ConnectionKey key;
    key.host = source_endpoint_.address();
    key.port = source_endpoint_.port();
    key.session_id = session_id;

    ConnectionTable::iterator tail = connections_.find(key);
    ConnectionTable::iterator endl = connections_.end();
    return tail != endl ? tail->second : NULL;
}

bool UcpEthernet::PacketInput(const void* packet, uint32_t packet_length) noexcept
{
    if (NULL == packet || packet_length < sizeof(UcpHeader))
    {
        return false;
    }
    else
    {
        uint32_t rx_packet_loss_rate = RxPacketLossRate;
        if (rx_packet_loss_rate > 0)
        {
            uint32_t rate = (uint32_t)RandomNext(0, 100);
            if (rate < rx_packet_loss_rate)
            {
                return false;
            }
        }
    }

    UcpHeader* h = (UcpHeader*)packet;
    uint8_t cmd = h->cmd;
    uint16_t wnd = ntohs(h->wnd);
    uint16_t session_id = ntohs(h->session_id);
    uint32_t local_ts = ntohl(h->local_ts);
    uint32_t remote_ts = ntohl(h->remote_ts);

    if (cmd == UCP_CMD_PSH)
    {
        if (packet_length < sizeof(UcpAckHeader))
        {
            Rst(session_id, remote_ts);
            return false;
        }

        ConnectionPtr connection = FindConnection(session_id);
        if (NULL == connection)
        {
            Rst(session_id, remote_ts);
            return false;
        }

        UcpPshHeader* h2 = (UcpPshHeader*)h;
        uint32_t ack = htonl(h2->ack);
        uint32_t seq = htonl(h2->seq);

        uint8_t* payload = (uint8_t*)(h2 + 1);
        uint32_t payload_size = packet_length - sizeof(*h2);
        return connection->ProcessPush(seq, ack, payload, payload_size, wnd, remote_ts, local_ts);
    }

    bool nak = false;
    if ((cmd == UCP_CMD_ACK) || (nak = (cmd == UCP_CMD_NAK)))
    {
        if (packet_length < sizeof(UcpAckHeader))
        {
            Rst(session_id, remote_ts);
            return false;
        }

        ConnectionPtr connection = FindConnection(session_id);
        if (NULL == connection)
        {
            Rst(session_id, remote_ts);
            return false;
        }

        UcpAckHeader* h2 = (UcpAckHeader*)h;
        uint32_t ack = htonl(h2->ack);

        uint8_t* acks = (uint8_t*)(h2 + 1);
        uint32_t ack_size = packet_length - sizeof(*h2);
        return connection->ProcessAck(ack, acks, ack_size, cmd, wnd, remote_ts, local_ts, nak);
    }
    else if (cmd == UCP_CMD_FIN)
    {
        ConnectionPtr connection = FindConnection(session_id);
        if (NULL == connection)
        {
            Rst(session_id, remote_ts);
            return false;
        }

        UcpPshHeader* h2 = (UcpPshHeader*)h;
        uint32_t ack = htonl(h2->ack);
        uint32_t seq = htonl(h2->seq);

        return connection->ProcessHalfoff(seq, ack, wnd, remote_ts, local_ts);
    }
    else if (cmd == UCP_CMD_SYN)
    {
        if (session_id == 0 || packet_length < sizeof(UcpAckHeader))
        {
            Rst(session_id, remote_ts);
            return false;
        }

        UcpAckHeader* h2 = (UcpAckHeader*)h;
        uint32_t ack = htonl(h2->ack);

        ConnectionPtr connection = FindConnection(session_id);
        if (NULL == connection)
        {
            connection = make_shared_object<UcpConnection>(shared_from_this());
            if (NULL == connection)
            {
                Rst(session_id, remote_ts);
                return false;
            }

            uint32_t now = Now();
            connection->session_id_ = session_id;
            connection->rcv_ack_ = ack + 1;
            connection->rcv_ack2_ = connection->rcv_ack_;
            connection->rcv_duplicate_ack_ = connection->rcv_ack2_;
            connection->snd_seq_ = RandomNext(1, INT32_MAX);
            connection->trx_seq_ = connection->snd_seq_;
            connection->remote_endpoint_ = source_endpoint_;

            connection->tf_.server = true;
            connection->tf_.snd_last = now;
            connection->tf_.snd_when = now;
            connection->tf_.snd_retries = 1;
            connection->state_ = UCP_STATE_SYN_RECVED;

            ConnectionKey key;
            key.session_id = session_id;
            key.host = source_endpoint_.address();
            key.port = source_endpoint_.port();

            connections_.emplace(key, connection);
        }

        connection->lasted_ts_ = remote_ts;
        connection->snd_wnd_ = wnd;
        connection->rcv_nxt_ = 0;
        connection->rcv_ann_right_edge_ = 0;

        return connection->Cmd(UCP_CMD_SYNACK);
    }
    else if (cmd == UCP_CMD_SYNACK)
    {
        if (packet_length < sizeof(UcpPshHeader))
        {
            Rst(session_id, remote_ts);
            return false;
        }
        
        for (;;)
        {
            ConnectionPtr connection = FindConnection(session_id);
            if (NULL != connection)
            {
                UcpPshHeader* h2 = (UcpPshHeader*)h;
                uint32_t ack = htonl(h2->ack);
                uint32_t seq = htonl(h2->seq);
                uint32_t nxt = connection->snd_seq_ + 1;
                if (ack == nxt)
                {
                    uint32_t now = Now();
                    if (connection->state_ == UCP_STATE_SYN_SENT || connection->state_ == UCP_STATE_ESTABLISHED)
                    {
                        if (connection->Rto(now, wnd, remote_ts, local_ts))
                        {
                            if (connection->state_ != UCP_STATE_SYN_SENT)
                            {
                                connection->AckNow();
                            }
                            else
                            {
                                connection->trx_last_ = now;
                                connection->snd_seq_ = nxt;
                                connection->trx_seq_ = connection->snd_seq_;
                                connection->rcv_ack_ = seq + 1;
                                connection->rcv_ack2_ = connection->rcv_ack_;
                                connection->rcv_duplicate_ack_ = connection->rcv_ack2_;
                                connection->state_ = UCP_STATE_ESTABLISHED;
                                connection->AckNow();

                                ConnectEventHandler connect_event = std::move(connection->connect_event_);
                                connection->connect_event_ = NULL;

                                if (NULL != connect_event)
                                {
                                    connect_event(connection.get(), true);
                                }
                            }

                            return true;
                        }
                    }
                }
                else if (before(ack, connection->snd_seq_))
                {
                    if (connection->state_ == UCP_STATE_ESTABLISHED)
                    {
                        connection->AckNow();
                        return true;
                    }
                }
            }

            Rst(session_id, remote_ts);
            return false;
        }
    }
    else if (cmd == UCP_CMD_RST)
    {
        ConnectionPtr connection = FindConnection(session_id);
        if (NULL == connection)
        {
            return false;
        }

        if (packet_length >= sizeof(UcpAckHeader)) 
        {
            UcpAckHeader* h2 = (UcpAckHeader*)h;
            uint32_t ack = htonl(h2->ack);

            uint32_t now = Now();
            if (connection->Rto(now, wnd, remote_ts, local_ts))
            {
                connection->ProcessAckAccumulation(ack);
                connection->ReadSomeIfAckedThenTriggerEvent();
            }
        }

        connection->ProcessAckShutdown(true, true);
        return true;
    }
    else
    {
        Rst(session_id, remote_ts);
        return false;
    }
}

bool UcpEthernet::Rst(uint32_t session_id, uint32_t remote_ts) noexcept
{
    ConnectionPtr connection = FindConnection(session_id);
    if (NULL != connection)
    {
        return connection->Cmd(UCP_CMD_RST);
    }
    else 
    {
        UcpHeader h;
        h.cmd = UCP_CMD_RST;
        h.local_ts = htonl(remote_ts);
        h.remote_ts = htonl(Now());
        h.wnd = htons(0);
        h.session_id = htons(session_id);

        return Output(&h, sizeof(UcpHeader), source_endpoint_);
    }
}

void UcpEthernet::Update() noexcept
{
    ConnectionTable::iterator tail = connections_.begin();
    ConnectionTable::iterator endl = connections_.end();
    now_ = __GetTickCount();

    for (; tail != endl;)
    {
        ConnectionPtr connection = tail->second;
        tail++;

        connection->Flush();
        if (connection->state_ == UCP_STATE_CLOSED)
        {
            DeleteConnection(connection.get());
        }
    }
}

void UcpEthernet::FlushAll() noexcept
{
    auto flush_list = std::move(flush_list_);
    flush_list_.clear();

    for (const ConnectionPtr& connection : flush_list)
    {
        connection->Flush();
    }
}

bool UcpEthernet::Output(const void* packet, uint32_t packet_length, const boost::asio::ip::udp::endpoint& remote_endpoint) noexcept
{
    if (NULL == packet || packet_length < 1)
    {
        return false;
    }

    bool opened = socket_.is_open();
    if (opened)
    {
        uint32_t tx_packet_loss_rate = TxPacketLossRate;
        if (tx_packet_loss_rate > 0)
        {
            uint32_t rate = (uint32_t)RandomNext(0, 100);
            if (rate < tx_packet_loss_rate)
            {
                return true;
            }
        }

        boost::system::error_code ec;
        socket_.send_to(boost::asio::buffer(packet, packet_length), remote_endpoint, boost::asio::socket_base::message_end_of_record, ec);

        if (ec == boost::system::errc::success)
        {
            return true;
        }
    }

    return false;
}

bool UcpEthernet::Run() noexcept
{
    return NextTimeout() && ReceiveLoopback();
}

bool UcpEthernet::NextTimeout() noexcept
{
    bool opened = socket_.is_open();
    if (!opened)
    {
        return false;
    }

    auto self = shared_from_this();
    now_ = __GetTickCount();

    timeout_.expires_from_now(boost::posix_time::milliseconds(1));
    timeout_.async_wait(
        [self, this](boost::system::error_code ec) noexcept
        {
            if (ec == boost::system::errc::operation_canceled)
            {
                return false;
            }
            else if (ec == boost::system::errc::success)
            {
                uint32_t next = std::max<uint32_t>(Interval >> 1, IntervalMin);
                if (after_eq(now_, next))
                {
                    Update();
                }

                FlushAll();
            }

            NextTimeout();
            return true;
        });
    return true;
}

bool UcpEthernet::ReceiveLoopback() noexcept
{
    bool opened = socket_.is_open();
    if (!opened)
    {
        return false;
    }

    auto self = shared_from_this();
    socket_.async_receive_from(boost::asio::buffer(buffer_.get(), buffer_size_), source_endpoint_,
        [self, this](boost::system::error_code ec, std::size_t sz) noexcept
        {
            if (ec != boost::system::errc::operation_canceled)
            {
                if (ec == boost::system::errc::success)
                {
                    PacketInput(buffer_.get(), sz);
                }

                ReceiveLoopback();
            }
        });
    return true;
}

bool UcpEthernet::Accept(const ConnectionPtr& connection) noexcept
{
    AcceptEventHandler event_handler = AcceptEvent;
    if (NULL != event_handler)
    {
        return event_handler(connection);
    }

    return false;
}

UcpEthernet::ConnectionPtr UcpEthernet::Connect(const boost::asio::ip::address& host, int port, const ConnectEventHandler& ac) noexcept
{
    static std::atomic<uint16_t> session_id = RandomNext(1, INT16_MAX);

    if (NULL == ac)
    {
        return NULL;
    }

    if (host.is_multicast() || host.is_unspecified())
    {
        return NULL;
    }

    if (port <= 0 || port > UINT16_MAX)
    {
        return NULL;
    }

    bool opened = socket_.is_open();
    if (!opened)
    {
        return NULL;
    }

    for (;;)
    {
        uint16_t id = ++session_id;
        if (id == 0)
        {
            session_id = 0;
            continue;
        }

        ConnectionKey key;
        key.session_id = id;
        key.host = host;
        key.port = port;

        auto tail = connections_.find(key);
        auto endl = connections_.end();
        if (tail != endl)
        {
            continue;
        }

        std::shared_ptr<UcpConnection> connection = make_shared_object<UcpConnection>(shared_from_this());
        if (NULL == connection)
        {
            return NULL;
        }

        uint32_t now = Now();
        connection->tf_.snd_last = now;
        connection->tf_.snd_when = now;
        connection->tf_.snd_retries = 1;

        connection->connect_event_ = ac;
        connection->lasted_ts_ = 0;
        connection->session_id_ = id;
        connection->state_ = UCP_STATE_SYN_SENT;
        connection->rcv_ack_ = 0;
        connection->rcv_ack2_ = 0;
        connection->rcv_duplicate_ack_ = 0;
        connection->snd_seq_ = RandomNext(1, INT32_MAX);
        connection->trx_seq_ = connection->snd_seq_;

        connection->rcv_ann_right_edge_ = 0;
        connection->rcv_nxt_ = 0;
        connection->rcv_wnd_ = ReceiveBufferSize;
        connection->remote_endpoint_ = boost::asio::ip::udp::endpoint(host, port);

        if (!connections_.emplace(key, connection).second)
        {
            return NULL;
        }

        connection->Cmd(UCP_CMD_SYN);
        return connection;
    }
}

static void receive_loop(UcpEthernet::ConnectionPtr connection, char* buff, int* count) noexcept
{
    connection->Read(buff, 1000,
        [connection, buff, count](uint32_t bytes_transferred) noexcept
        {
            buff[bytes_transferred] = '\x0';
            printf("recv: %s, bytes_transferred: %u\n", buff, bytes_transferred);

            if (bytes_transferred > 0)
            {
                receive_loop(connection, buff, count);
                if ((*count)++ >= 100)
                {
                    connection->Close();
                }
            }
            else
            {
                printf("recv: 0字节,中断链接\n");
            }
        });
}

static void sent_loop(UcpEthernet::ConnectionPtr connection, const char* buffer, int buffer_size) noexcept
{
    connection->Send(buffer, buffer_size,
        [connection, buffer, buffer_size](uint32_t bytes_transferred) noexcept
        {
            if (bytes_transferred != 0)
            {
                sent_loop(connection, buffer, buffer_size);
            }
        });
    connection->Flush();
}

static UcpEthernet::ConnectionPtr server_connection;
static UcpEthernet::ConnectionPtr client_connection;

int main(int argc, const char* argv[])
{
    std::shared_ptr<boost::asio::io_context> context = make_shared_object<boost::asio::io_context>();
    boost::asio::io_context::work work(*context);

    std::shared_ptr<UcpEthernet> server = make_shared_object<UcpEthernet>(context, 55555);
    server->Run();
    server->AcceptEvent = 
        [](const UcpEthernet::ConnectionPtr& connection) noexcept
        {
            static char buff[65536];
            static int count = 0;
            server_connection = connection;

            receive_loop(connection, buff, &count);
            sent_loop(connection->shared_from_this(), "QWERTYUIOP", 10);
            return true;
        };

    std::shared_ptr<UcpEthernet> client = make_shared_object<UcpEthernet>(context, 0);
    client->Run();
    client->Connect(boost::asio::ip::address_v6::loopback(), 55555, 
        [](UcpConnection* connection, bool connected) noexcept
        {
            client_connection = connection->shared_from_this();

            printf("connected is %d\n", connected);
            if (connected)
            {
                static char buff[65536];
                static int count = 0;

                receive_loop(connection->shared_from_this(), buff, &count);
                sent_loop(connection->shared_from_this(), "ASDFGHJKLZ", 10);
            }
        });

    context->run();
    return 0;
}


网站公告

今日签到

点亮在社区的每一天
去签到