From b8c32f11ef599c3de5ed3edcd46198b119cd0acb Mon Sep 17 00:00:00 2001 From: kingecg Date: Tue, 1 Jul 2025 20:59:02 +0800 Subject: [PATCH] =?UTF-8?q?"=E5=AE=8C=E5=96=84README=E6=96=87=E6=A1=A3?= =?UTF-8?q?=E5=92=8C=E4=BB=A3=E7=A0=81=E6=B3=A8=E9=87=8A=EF=BC=8C=E8=AF=A6?= =?UTF-8?q?=E7=BB=86=E8=AF=B4=E6=98=8E=E6=A1=86=E6=9E=B6=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E5=92=8C=E4=BD=BF=E7=94=A8=E6=96=B9=E6=B3=95"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 236 ++++++++++++++++++++++++++++++++++++++++++++++++++++- network.go | 57 ++++++++----- server.go | 139 +++++++++++++++++++++---------- 3 files changed, 365 insertions(+), 67 deletions(-) diff --git a/README.md b/README.md index 5e8a65d..490e898 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,236 @@ -# network +# 高性能网络通信框架 +一个用 Go 语言编写的高性能、高并发网络通信框架,支持 TCP 和 UDP 协议。该框架专为高吞吐量和低延迟的网络应用设计,适用于游戏服务器、聊天系统、实时数据处理等场景。 + +## 主要特性 + +- **协议支持**:同时支持 TCP 和 UDP 协议 +- **高并发处理**:使用工作协程池高效处理请求 +- **连接分片管理**:通过分片减少锁竞争,提高并发性能 +- **自动连接生命周期管理**:自动处理连接的创建、维护和关闭 +- **内存优化**:使用缓冲区池和对象池减少内存分配和 GC 压力 +- **优雅关闭**:支持服务器的平滑关闭,确保数据不丢失 +- **超时控制**:支持读写超时和空闲连接超时设置 +- **可定制的数据包处理**:灵活的数据包接口,支持自定义协议 + +## 安装 + +```bash +go get github.com/yourusername/network +``` + +## 快速开始 + +### TCP 服务器示例 + +```go +package main + +import ( + "log" + "time" + + "github.com/yourusername/network" +) + +// 自定义数据包 +type MyPacket struct { + Type byte + Length uint16 + Payload []byte +} + +// 实现 Packet 接口 +func (p *MyPacket) Decode(data []byte) error { + if len(data) < 3 { + return errors.New("invalid packet") + } + p.Type = data[0] + p.Length = uint16(data[1])<<8 | uint16(data[2]) + if len(data) < int(3+p.Length) { + return errors.New("incomplete packet") + } + p.Payload = data[3:3+p.Length] + return nil +} + +func (p *MyPacket) Encode() ([]byte, error) { + buf := make([]byte, 3+len(p.Payload)) + buf[0] = p.Type + buf[1] = byte(len(p.Payload) >> 8) + buf[2] = byte(len(p.Payload)) + copy(buf[3:], p.Payload) + return buf, nil +} + +func main() { + // 创建服务器配置 + config := network.ServerConfig{ + Network: "tcp", + Address: ":8080", + WorkerNum: 100, // 工作协程数量 + QueueSize: 1000, // 任务队列大小 + MaxConn: 10000, // 最大连接数 + ReadTimeout: 30 * time.Second, + IdleTimeout: 5 * time.Minute, + } + + // 创建处理函数 + handler := func(conn network.Conn, packet network.Packet) (network.Packet, error) { + p := packet.(*MyPacket) + log.Printf("Received packet: Type=%d, Payload=%s", p.Type, string(p.Payload)) + + // 创建响应 + response := &MyPacket{ + Type: p.Type, + Payload: []byte("Response: " + string(p.Payload)), + } + return response, nil + } + + // 创建数据包工厂函数 + packetFactory := func() network.Packet { + return &MyPacket{} + } + + // 创建并启动服务器 + server := network.NewServer(config, handler, packetFactory) + if err := server.Start(); err != nil { + log.Fatalf("Failed to start server: %v", err) + } + + // 等待信号退出 + // ... + + // 优雅关闭 + server.Stop() +} +``` + +### UDP 服务器示例 + +```go +package main + +import ( + "log" + "time" + + "github.com/yourusername/network" +) + +func main() { + // 创建服务器配置 + config := network.ServerConfig{ + Network: "udp", + Address: ":8081", + WorkerNum: 50, + QueueSize: 500, + ReadTimeout: 5 * time.Second, + } + + // 使用与 TCP 示例相同的 MyPacket 和 handler + + // 创建并启动服务器 + server := network.NewServer(config, handler, packetFactory) + if err := server.Start(); err != nil { + log.Fatalf("Failed to start UDP server: %v", err) + } + + // ... +} +``` + +## API 文档 + +### 核心接口 + +#### `Packet` 接口 + +```go +type Packet interface { + Decode(data []byte) error + Encode() ([]byte, error) +} +``` + +所有数据包必须实现此接口,用于数据的序列化和反序列化。 + +#### `Conn` 接口 + +```go +type Conn interface { + Read(b []byte) (n int, err error) + Write(b []byte) (n int, err error) + Close() error + RemoteAddr() net.Addr +} +``` + +表示一个网络连接,TCP 和 UDP 连接都实现了此接口。 + +#### `Handler` 函数类型 + +```go +type Handler func(conn Conn, packet Packet) (Packet, error) +``` + +用户定义的处理函数,处理接收到的数据包并返回响应。 + +### 主要结构体 + +#### `ServerConfig` 结构体 + +```go +type ServerConfig struct { + Network string // 网络类型: "tcp", "tcp4", "tcp6", "udp", "udp4", "udp6" + Address string // 监听地址,如 ":8080" + WorkerNum int // 工作协程数量 + QueueSize int // 任务队列大小 + MaxConn int // 最大连接数 + ReadTimeout time.Duration // 读取超时 + WriteTimeout time.Duration // 写入超时 + IdleTimeout time.Duration // 空闲连接超时 +} +``` + +#### `HighConcurrentServer` 结构体 + +主要的服务器实现,提供以下方法: + +- `NewServer(config ServerConfig, handler Handler, packetType func() Packet) *HighConcurrentServer`:创建新服务器 +- `Start() error`:启动服务器 +- `Stop()`:停止服务器 +- `GetActiveConnections() int64`:获取当前活动连接数 + +#### `WorkerPool` 结构体 + +工作协程池,用于高效处理任务: + +- `NewWorkerPool(size int, queueSize int) *WorkerPool`:创建工作池 +- `Submit(task func())`:提交任务 +- `Stop()`:停止工作池 + +## 性能优化 + +该框架采用了多种技术来优化性能: + +1. **连接分片**:使用多个分片管理连接,每个分片有自己的锁,减少锁竞争 +2. **工作协程池**:重用协程处理请求,避免频繁创建和销毁协程的开销 +3. **缓冲区池**:使用 `sync.Pool` 复用缓冲区,减少内存分配 +4. **非阻塞 I/O**:使用 channel 进行非阻塞的读写操作 +5. **批量处理**:在可能的情况下批量处理数据包,减少系统调用 + +## 贡献指南 + +欢迎贡献代码、报告问题或提出改进建议。请遵循以下步骤: + +1. Fork 仓库 +2. 创建功能分支 (`git checkout -b feature/amazing-feature`) +3. 提交更改 (`git commit -m 'Add some amazing feature'`) +4. 推送到分支 (`git push origin feature/amazing-feature`) +5. 创建 Pull Request + +## 许可证 + +[MIT](LICENSE) \ No newline at end of file diff --git a/network.go b/network.go index db058eb..6ebbcf6 100644 --- a/network.go +++ b/network.go @@ -1,3 +1,6 @@ +// Package network 提供了一个高性能的网络通信框架,支持TCP和UDP协议。 +// 该框架采用工作池和连接分片管理来实现高并发,支持自定义数据包格式和处理函数。 +// 主要特点包括:高并发连接处理、连接生命周期管理、优雅关闭机制和缓冲区复用。 package network import ( @@ -14,51 +17,63 @@ import ( // ========== 框架核心接口 ========== // Packet 用户自定义数据包必须实现的接口 +// 用户需要实现这个接口来定义自己的数据包格式,包括编码和解码方法 type Packet interface { - // 编码为字节流 + // Encode 将数据包编码为字节流 + // 返回编码后的字节切片和可能的错误 Encode() ([]byte, error) - // 从字节流解码 + + // Decode 从字节流解码为数据包 + // 参数data包含要解码的字节数据 + // 返回可能的解码错误 Decode([]byte) error } // Handler 用户自定义的数据处理函数类型 +// 当收到数据包时,框架会调用这个函数来处理数据 +// 参数conn是产生数据的网络连接,p是解码后的数据包 +// 返回响应数据包和可能的错误 type Handler func(conn net.Conn, p Packet) (Packet, error) // ========== 框架核心结构 ========== // ServerConfig 服务器配置 +// 包含服务器运行所需的各种参数设置 type ServerConfig struct { - Network string // tcp, tcp4, tcp6, udp, udp4, udp6 - Address string // 监听地址 - MaxConn int // 最大连接数 (TCP only) - WorkerNum int // 工作协程数量 - QueueSize int // 任务队列大小 - ReadTimeout time.Duration // 读取超时 - IdleTimeout time.Duration // 空闲超时 + Network string // 网络类型: tcp, tcp4, tcp6, udp, udp4, udp6 + Address string // 监听地址,格式为 "ip:port",如 ":8080" + MaxConn int // 最大连接数 (仅TCP有效) + WorkerNum int // 工作协程数量,默认为CPU核心数的2倍 + QueueSize int // 任务队列大小,默认为1024 + ReadTimeout time.Duration // 读取超时时间,0表示不设置超时 + IdleTimeout time.Duration // 空闲连接超时时间,0表示不清理空闲连接 } // connectionShard 连接分片 +// 用于高效管理大量连接,减少锁竞争 type connectionShard struct { - conns sync.Map // map[net.Conn]*Connection - lock sync.RWMutex - lastUsed time.Time + conns sync.Map // 存储连接的并发安全map,键为net.Conn,值为*Connection + lock sync.RWMutex // 读写锁,保护分片操作 + lastUsed time.Time // 最后使用时间,用于分片管理 } // Connection 连接封装 +// 封装了底层网络连接,提供了缓冲区和通道管理 type Connection struct { - net.Conn - readBuffer *bytes.Buffer - writeBuffer *bytes.Buffer - writeChan chan []byte - closeChan chan struct{} - server *HighConcurrentServer - lastActive time.Time + net.Conn // 嵌入底层网络连接 + readBuffer *bytes.Buffer // 读缓冲区 + writeBuffer *bytes.Buffer // 写缓冲区 + writeChan chan []byte // 写入通道,用于异步写入 + closeChan chan struct{} // 关闭通道,用于通知协程退出 + server *HighConcurrentServer // 所属服务器 + lastActive time.Time // 最后活动时间,用于空闲检测 } // WorkerPool 工作协程池 +// 用于高效处理网络请求,避免为每个连接创建协程 type WorkerPool struct { - taskQueue chan func() - size int + taskQueue chan func() // 任务队列,存储待执行的函数 + size int // 工作协程数量 } // ========== 框架实现 ========== diff --git a/server.go b/server.go index 1ac91ca..a28f669 100644 --- a/server.go +++ b/server.go @@ -1,3 +1,4 @@ +// Package network 提供了一个高性能的网络通信框架,支持TCP和UDP协议。 package network import ( @@ -11,34 +12,45 @@ import ( ) // HighConcurrentServer 高并发服务器 +// 实现了一个高性能、高并发的网络服务器,支持TCP和UDP协议 +// 主要特点包括: +// - 连接分片管理,减少锁竞争 +// - 工作协程池,高效处理请求 +// - 自动管理连接生命周期 +// - 优雅关闭机制 type HighConcurrentServer struct { - config ServerConfig - listener net.Listener - packetConn net.PacketConn - workerPool *WorkerPool - shards []*connectionShard - shardCount int - handler Handler - packetType func() Packet // 用于创建新Packet实例的函数 - activeConns int64 - shutdown chan struct{} - wg sync.WaitGroup + config ServerConfig // 服务器配置 + listener net.Listener // TCP监听器 + packetConn net.PacketConn // UDP包连接 + workerPool *WorkerPool // 工作协程池 + shards []*connectionShard // 连接分片数组 + shardCount int // 分片数量 + handler Handler // 用户自定义处理函数 + packetType func() Packet // 用于创建新Packet实例的工厂函数 + activeConns int64 // 当前活动连接数 + shutdown chan struct{} // 关闭信号通道 + wg sync.WaitGroup // 等待组,用于优雅关闭 } // NewServer 创建新的高并发服务器 +// config: 服务器配置 +// handler: 用户自定义的数据包处理函数 +// packetType: 用于创建新的数据包实例的工厂函数 +// 返回初始化好的服务器实例,但尚未启动 func NewServer(config ServerConfig, handler Handler, packetType func() Packet) *HighConcurrentServer { // 设置默认值 if config.WorkerNum <= 0 { - config.WorkerNum = runtime.NumCPU() * 2 + config.WorkerNum = runtime.NumCPU() * 2 // 默认为CPU核心数的2倍 } if config.QueueSize <= 0 { - config.QueueSize = 1024 + config.QueueSize = 1024 // 默认队列大小为1024 } if config.MaxConn <= 0 { - config.MaxConn = 100000 + config.MaxConn = 100000 // 默认最大连接数为10万 } - // 计算合适的分片数量 + // 计算合适的分片数量,至少为32,且不小于CPU核心数 + // 分片数量为2的幂,便于哈希分配 shardCount := 32 for shardCount < runtime.NumCPU() { shardCount *= 2 @@ -53,41 +65,47 @@ func NewServer(config ServerConfig, handler Handler, packetType func() Packet) * shards: make([]*connectionShard, shardCount), } - // 初始化分片 + // 初始化连接分片 for i := 0; i < shardCount; i++ { server.shards[i] = &connectionShard{} } - // 初始化工作池 + // 初始化工作协程池 server.workerPool = NewWorkerPool(config.WorkerNum, config.QueueSize) return server } // Start 启动服务器 +// 根据配置的网络类型启动相应的服务器(TCP或UDP) +// 返回可能的启动错误 func (s *HighConcurrentServer) Start() error { switch s.config.Network { case "tcp", "tcp4", "tcp6": - return s.startTCP() + return s.startTCP() // 启动TCP服务器 case "udp", "udp4", "udp6": - return s.startUDP() + return s.startUDP() // 启动UDP服务器 default: return fmt.Errorf("unsupported network type: %s", s.config.Network) } } // startTCP 启动TCP服务器 +// 创建监听器并启动接受连接循环和连接管理协程 +// 返回可能的启动错误 func (s *HighConcurrentServer) startTCP() error { + // 创建TCP监听器 ln, err := net.Listen(s.config.Network, s.config.Address) if err != nil { return err } s.listener = ln + // 启动接受连接循环 s.wg.Add(1) go s.acceptLoop() - // 启动连接管理 + // 启动连接管理(清理空闲连接) s.wg.Add(1) go s.manageConnections() @@ -95,13 +113,17 @@ func (s *HighConcurrentServer) startTCP() error { } // startUDP 启动UDP服务器 +// 创建UDP包连接并启动读取循环 +// 返回可能的启动错误 func (s *HighConcurrentServer) startUDP() error { + // 创建UDP包连接 pc, err := net.ListenPacket(s.config.Network, s.config.Address) if err != nil { return err } s.packetConn = pc + // 启动UDP读取循环 s.wg.Add(1) go s.udpReadLoop() @@ -109,31 +131,37 @@ func (s *HighConcurrentServer) startUDP() error { } // acceptLoop TCP接受连接循环 +// 持续接受新的TCP连接并为每个连接创建处理协程 +// 当收到关闭信号或发生错误时退出 func (s *HighConcurrentServer) acceptLoop() { defer s.wg.Done() for { + // 检查是否收到关闭信号 select { case <-s.shutdown: return default: } + // 接受新连接 conn, err := s.listener.Accept() if err != nil { + // 处理临时错误,如"too many open files" if ne, ok := err.(net.Error); ok && ne.Temporary() { - time.Sleep(100 * time.Millisecond) + time.Sleep(100 * time.Millisecond) // 短暂休眠后重试 continue } - return + return // 非临时错误,退出循环 } - // 检查最大连接数 + // 检查是否超过最大连接数限制 if atomic.LoadInt64(&s.activeConns) >= int64(s.config.MaxConn) { - conn.Close() + conn.Close() // 超过限制,直接关闭连接 continue } + // 增加活动连接计数并启动连接处理协程 atomic.AddInt64(&s.activeConns, 1) s.wg.Add(1) go s.handleNewConnection(conn) @@ -141,67 +169,79 @@ func (s *HighConcurrentServer) acceptLoop() { } // handleNewConnection 处理新连接 +// conn: 新接受的TCP连接 +// 为新连接创建Connection对象,启动读写循环,并管理连接的生命周期 func (s *HighConcurrentServer) handleNewConnection(conn net.Conn) { defer s.wg.Done() - defer atomic.AddInt64(&s.activeConns, -1) - defer conn.Close() + defer atomic.AddInt64(&s.activeConns, -1) // 减少活动连接计数 + defer conn.Close() // 确保连接关闭 - // 获取分片 + // 获取连接对应的分片,用于减少锁竞争 shard := s.getShard(conn) c := &Connection{ Conn: conn, - readBuffer: bytes.NewBuffer(make([]byte, 0, 4096)), - writeBuffer: bytes.NewBuffer(make([]byte, 0, 4096)), - writeChan: make(chan []byte, 32), - closeChan: make(chan struct{}), + readBuffer: bytes.NewBuffer(make([]byte, 0, 4096)), // 初始化读缓冲区,容量为4KB + writeBuffer: bytes.NewBuffer(make([]byte, 0, 4096)), // 初始化写缓冲区,容量为4KB + writeChan: make(chan []byte, 32), // 写入通道,缓冲大小为32 + closeChan: make(chan struct{}), // 关闭信号通道 server: s, - lastActive: time.Now(), + lastActive: time.Now(), // 记录初始活动时间 } - // 添加到连接管理 + // 将连接添加到分片管理中 shard.addConn(c) // 启动读写协程 - go c.readLoop() - c.writeLoop() + go c.readLoop() // 读取循环在单独的协程中运行 + c.writeLoop() // 写入循环在当前协程中运行 - // 等待关闭 + // 等待关闭信号 <-c.closeChan - shard.removeConn(c) + shard.removeConn(c) // 从分片中移除连接 } // getShard 获取连接对应的分片 +// conn: 网络连接 +// 返回该连接应该被分配到的分片 +// 使用连接的远程IP地址进行哈希来确定分片,以实现负载均衡 func (s *HighConcurrentServer) getShard(conn net.Conn) *connectionShard { - // 使用连接的远程地址作为分片键 + // 尝试获取TCP地址 addr, ok := conn.RemoteAddr().(*net.TCPAddr) if !ok { - return s.shards[0] + return s.shards[0] // 非TCP连接返回第一个分片 } - // 简单哈希算法 + // 使用IP地址的四个字节进行简单哈希 + // 将IP地址的四个字节相加作为哈希值 hash := addr.IP.To4()[0] + addr.IP.To4()[1] + addr.IP.To4()[2] + addr.IP.To4()[3] - return s.shards[int(hash)%s.shardCount] + return s.shards[int(hash)%s.shardCount] // 使用取模运算确定分片索引 } // udpReadLoop UDP读取循环 +// 持续从UDP连接读取数据包并处理 +// 使用sync.Pool复用缓冲区以减少内存分配 func (s *HighConcurrentServer) udpReadLoop() { defer s.wg.Done() defer s.packetConn.Close() + // 创建缓冲区池,每个缓冲区大小为64KB(UDP最大包大小) bufPool := sync.Pool{ New: func() interface{} { return make([]byte, 65536) }, } for { + // 检查是否收到关闭信号 select { case <-s.shutdown: return default: } + // 从池中获取缓冲区 buf := bufPool.Get().([]byte) n, addr, err := s.packetConn.ReadFrom(buf) if err != nil { + // 处理临时错误 if ne, ok := err.(net.Error); ok && ne.Temporary() { time.Sleep(100 * time.Millisecond) continue @@ -209,18 +249,19 @@ func (s *HighConcurrentServer) udpReadLoop() { return } + // 启动新的协程处理数据包 s.wg.Add(1) go func(data []byte, addr net.Addr) { defer s.wg.Done() - defer bufPool.Put(data) + defer bufPool.Put(data) // 将缓冲区放回池中 - // 处理UDP数据包 + // 解码数据包 packet := s.packetType() if err := packet.Decode(data[:n]); err != nil { return } - // 创建虚拟连接 + // 创建虚拟UDP连接 conn := &udpConn{PacketConn: s.packetConn, addr: addr} // 调用用户处理函数 @@ -229,7 +270,7 @@ func (s *HighConcurrentServer) udpReadLoop() { return } - // 发送响应 + // 发送响应(如果有) if response != nil { respData, err := response.Encode() if err == nil { @@ -241,9 +282,12 @@ func (s *HighConcurrentServer) udpReadLoop() { } // manageConnections 管理连接(关闭空闲连接) +// 定期检查并关闭超过空闲超时时间的连接 +// 当服务器关闭时退出 func (s *HighConcurrentServer) manageConnections() { defer s.wg.Done() + // 创建定时器,每30秒检查一次 ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() @@ -252,16 +296,21 @@ func (s *HighConcurrentServer) manageConnections() { case <-s.shutdown: return case <-ticker.C: + // 如果未设置空闲超时,跳过检查 if s.config.IdleTimeout <= 0 { continue } now := time.Now() + // 遍历所有分片 for _, shard := range s.shards { shard.lock.RLock() + // 遍历分片中的所有连接 shard.conns.Range(func(key, value interface{}) bool { conn := value.(*Connection) + // 检查连接是否超过空闲超时时间 if now.Sub(conn.lastActive) > s.config.IdleTimeout { + // 发送关闭信号 select { case conn.closeChan <- struct{}{}: default: